Module brevettiai.utils.upload_data

Functionality and script to upload data to the Brevetti AI Platform

Expand source code
#!/usr/bin/env python
"""
Functionality and script to upload data to the [Brevetti AI Platform](https://platform.brevetti.ai)

"""
import argparse
import concurrent
import concurrent.futures  # 'import concurrent' alone does not reliably expose the 'futures' submodule
import os
import os.path
import time

from tqdm import tqdm

from brevettiai.platform import PlatformAPI
from brevettiai.platform.models.dataset import Dataset


def recursive_relative_paths(path, reverse=False):
    """Yield ``(absolute_path, dataset_relative_path)`` pairs for every file under *path*.

    Walks the tree top-down with :func:`os.walk`; relative paths are normalized
    to forward slashes so they are valid dataset keys on Windows as well.

    :param path: root folder to walk
    :param reverse: if True, visit subdirectories in reverse order. Note that
        only the directory traversal order is reversed; files within each
        directory keep the order os.walk reports them in.
    """
    for root, dirs, files in os.walk(path):
        if reverse:
            # os.walk honours in-place mutation of 'dirs' for top-down walks
            dirs[:] = dirs[::-1]
        for file in files:
            file_path = os.path.join(root, file)
            dataset_path = os.path.relpath(file_path, path)
            yield (file_path, dataset_path.replace("\\", "/"))


def filtered_generator(path, filter_files, reverse=False):
    """Yield ``(absolute_path, dataset_relative_path)`` pairs not listed in *filter_files*.

    :param path: root folder to walk
    :param filter_files: **sorted** array-like supporting ``searchsorted``
        (e.g. a sorted numpy array) of dataset-relative paths to skip
    :param reverse: forwarded to :func:`recursive_relative_paths`
    """
    # Bug fix: 'reverse' was previously accepted but never forwarded.
    for (disk_path, dataset_path) in recursive_relative_paths(path, reverse=reverse):
        ix = filter_files.searchsorted(dataset_path)
        # searchsorted returns the insertion point; the file is skipped only
        # when the element at that index is an exact match.
        if ix >= len(filter_files) or filter_files[ix] != dataset_path:
            yield disk_path, dataset_path

def copy_recursive(dataset, generator):
    """Upload files yielded by *generator* to *dataset* using a thread pool.

    :param dataset: dataset object exposing ``get_location`` and ``io.copy``
    :param generator: iterable of ``(source_path, dataset_relative_path)`` pairs
    :raises Exception: re-raises the first exception raised by any upload
    """
    def upload_to_ds(ds, src, ds_target):
        # Resolve the remote location for the target path, then copy the file.
        pth = ds.get_location(ds_target)
        ds.io.copy(src, pth)
        return pth

    with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
        futures = [
            executor.submit(upload_to_ds, dataset, disk_path, dataset_path)
            for disk_path, dataset_path in generator
        ]
        # Iterate inside the 'with' block so the progress bar advances while
        # uploads run. Previously this loop ran after the block, i.e. after
        # executor shutdown had already waited for every future, so the bar
        # only appeared once all uploads were finished. Calling result()
        # re-raises any upload exception instead of silently discarding it.
        for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures)):
            future.result()


"""
Example usage:

python -m brevettiai.utils.upload_data my_local_folder --dataset_name "My new dataset name" --username my_name@my_domain.com --password *****
"""
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('input_folder', help='Absolute path to the folder containing the Dataset')
    parser.add_argument('--dataset_name', help='Name of the dataset as it will appear on the platform')
    parser.add_argument('--reference', help='Reference Field for the dataset')
    parser.add_argument('--username', help='Brevetti-AI platform username (https://platform.brevetti.ai)')
    parser.add_argument('--password', help='Brevetti-AI platform password (https://platform.brevetti.ai)')
    parser.add_argument('--dataset_id', help="Id of existing dataset to upload to")
    parser.add_argument('--overwrite', help="Overwrite data in existing dataset (only used if uploading to an existing dataset)", type=bool, default=False)
    parser.add_argument('--reverse', help="Reverse order of upload", type=bool, default=False)

    args = parser.parse_args()

    credentials = {}
    if "username" in args:
        credentials["username"] = args.username
    if "password" in args:
        credentials["password"] = args.password

    platform = PlatformAPI(**credentials, cache_remote_files=False, remember_me=True)

    if args.dataset_id:
        dataset = platform.get_dataset(args.dataset_id, write_access=True)
    else:
        ds_name = args.dataset_name if args.dataset_name else os.path.basename(args.input_folder)
        dataset = Dataset(name=ds_name, reference=args.reference)
        print(f'Creating dataset {ds_name} on platform')
        dataset = platform.create(dataset, write_access=True)

    if not args.overwrite and args.dataset_id:
        import numpy as np
        remote_files = np.array([f"{x[0]}/{y}"[len(dataset.bucket)+1:] for x in dataset.io.walk(dataset.bucket) for y in x[2]])
        remote_files.sort()
        generator = filtered_generator(args.input_folder, remote_files, reverse=args.reverse)
        print(f'Copying files to s3...')
    else:
        generator = recursive_relative_paths(args.input_folder, reverse=args.reverse)
        print('Copy entire dataset to s3...')

    start_procedure = time.time()

    copy_recursive(dataset, generator)

    print('End copy...')
    print(f'Dataset Created-Posted in {time.time() - start_procedure}s...')

Functions

def copy_recursive(dataset, generator)
Expand source code
def copy_recursive(dataset, generator):
    """Upload files yielded by *generator* to *dataset* using a thread pool.

    :param dataset: dataset object exposing ``get_location`` and ``io.copy``
    :param generator: iterable of ``(source_path, dataset_relative_path)`` pairs
    :raises Exception: re-raises the first exception raised by any upload
    """
    def upload_to_ds(ds, src, ds_target):
        # Resolve the remote location for the target path, then copy the file.
        pth = ds.get_location(ds_target)
        ds.io.copy(src, pth)
        return pth

    with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
        futures = [
            executor.submit(upload_to_ds, dataset, disk_path, dataset_path)
            for disk_path, dataset_path in generator
        ]
        # Iterate inside the 'with' block so the progress bar advances while
        # uploads run. Previously this loop ran after the block, i.e. after
        # executor shutdown had already waited for every future, so the bar
        # only appeared once all uploads were finished. Calling result()
        # re-raises any upload exception instead of silently discarding it.
        for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures)):
            future.result()
def filtered_generator(path, filter_files, reverse=False)
Expand source code
def filtered_generator(path, filter_files, reverse=False):
    """Yield ``(absolute_path, dataset_relative_path)`` pairs not listed in *filter_files*.

    :param path: root folder to walk
    :param filter_files: **sorted** array-like supporting ``searchsorted``
        (e.g. a sorted numpy array) of dataset-relative paths to skip
    :param reverse: forwarded to :func:`recursive_relative_paths`
    """
    # Bug fix: 'reverse' was previously accepted but never forwarded.
    for (disk_path, dataset_path) in recursive_relative_paths(path, reverse=reverse):
        ix = filter_files.searchsorted(dataset_path)
        # searchsorted returns the insertion point; the file is skipped only
        # when the element at that index is an exact match.
        if ix >= len(filter_files) or filter_files[ix] != dataset_path:
            yield disk_path, dataset_path
def recursive_relative_paths(path, reverse=False)
Expand source code
def recursive_relative_paths(path, reverse=False):
    """Yield ``(file_path, relative_path)`` pairs for every file below *path*.

    Relative paths use forward slashes regardless of platform. When
    *reverse* is set, subdirectories are visited in reverse order (files
    within each directory keep their os.walk order).
    """
    for root, dirs, names in os.walk(path):
        if reverse:
            # In-place mutation steers the remainder of the top-down walk.
            dirs.reverse()
        for name in names:
            full = os.path.join(root, name)
            rel = os.path.relpath(full, path).replace("\\", "/")
            yield full, rel