Module brevettiai.platform.models.dataset

Expand source code
import ast
import hashlib
import json
import logging
import re
import urllib.parse
from io import BytesIO
from typing import Optional, List, Dict, Any
from uuid import uuid4

import numpy as np
import pandas as pd
from pydantic import BaseModel, Field, PrivateAttr, root_validator

from brevettiai.data.image import ImageKeys
from brevettiai.data.sample_integrity import load_sample_identification, save_sample_identification, SampleSplit
from brevettiai.data.sample_tools import BrevettiDatasetSamples, get_samples, save_samples
from brevettiai.io import IoTools, io_tools
from brevettiai.platform.models import Tag
from brevettiai.platform.models import PlatformBackend
from brevettiai.platform.models import backend as platform_backend

log = logging.getLogger(__name__)

DATASET_ROOT = "__root__"

__all__ = ["tif2dzi", "BrevettiDatasetSamples", "load_sample_identification", "save_sample_identification",
           "SampleSplit", "get_samples", "save_samples", "Dataset"]

DATASET_LOCATIONS = dict(
    annotations=".annotations",
    meta=".meta",
    samples=".samples",
    data="",
)


def get_category(mapping, keys, default=None):
    try:
        key, keys = keys[0], keys[1:]
        default = default or key
        if keys:
            return mapping.get(key, get_category(mapping, keys, default=default))
        else:
            return mapping.get(key, default)
    except IndexError:
        return mapping.get(default[0], default)


class Dataset(BaseModel):
    """
    Model defining a dataset on the Brevetti platform
    """
    id: str = Field(default_factory=lambda: str(uuid4()))
    bucket: Optional[str]
    name: str
    locked: bool = False
    reference: Optional[str] = ""
    notes: Optional[str] = ""
    tags: List[Tag] = Field(default_factory=list, description="testsds")

    _io: IoTools = PrivateAttr(default=None)
    _backend: PlatformBackend = PrivateAttr(default=None)
    _uri_offset = PrivateAttr(default=None)

    def __init__(self, io=io_tools, backend=platform_backend, resolve_access_rights: bool = False, **data) -> None:
        super().__init__(**data)

        self._io = io
        self._backend = backend

        if self.bucket is None:
            self.bucket = backend.resource_path(self.id)

        if resolve_access_rights:
            self.resolve_access_rights()

    @root_validator(pre=True, allow_reuse=True)
    def parse_settings(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        values["tags"] = [Tag(id=x, name="<Unknown>") for x in values.pop("tagIds", tuple())]
        return values

    @property
    def backend(self):
        return self._backend

    @property
    def io(self):
        assert self._io is not None, "Remember to call start_job()"
        return self._io

    def resolve_access_rights(self):
        self.io.resolve_access_rights(path=self.bucket, resource_id=self.id, resource_type="dataset", mode='w')

    def get_image_samples(self, sample_filter=r".+(\.tif|\.bmp|\.jpeg|\.jpg|\.png|\.JPG)$",
                          annotations=False, location="data", **kwargs):
        """
        :param sample_filter: Filter samples by regex
        :param annotations: boolean, or dict Load annotation paths
        :param location: location in dataset ("data", "annotations", "samples", or path in dataset to search under)
        :param kwargs:
        :return: Dataframe of samples

        Mapping from path to category:
        Start from the leaf folder, and work towards the dataset root folder. If folder in class_mapping then apply
        its key as the category. If no match is found, apply leaf folder name
        Example:
        class_mapping={
        "A": ["Tilted"],
        "B": ["Tilted"],
        "1": ["Crimp"]
        }
        If classes is True or a list/set of categories. The filter is applied after the mapping.
        """
        log.info(f"Getting image samples from dataset '{self.name}' [{self.get_uri()}]")
        samples = self.find_files(self.get_location(location), sample_filter=sample_filter, exclude_hidden=True,
                                  **kwargs)
        if annotations is not False:
            for ann in annotations if isinstance(annotations, (tuple, list)) else [annotations]:
                samples = self.merge_annotations(samples, **(ann if isinstance(ann, dict) else {}))

        samples["bucket"] = self.bucket
        samples["dataset"] = self.name
        samples["dataset_id"] = str(self.id)
        samples["reference"] = self.reference or "N/A"
        samples["url"] = samples.path.apply(self.get_sample_uri)
        log.info(f"Contents: {samples.category.value_counts().to_dict()}")
        return samples

    def get_annotations(self, filter=None):
        log.info(f"Getting annotations from dataset '{self.name}' [{self.get_uri()}] with filter: {filter}")
        samples = self.find_files(self.get_location("annotations"),
                                  sample_filter=filter,
                                  default_category_folder_index=-2, full_path=True)
        return samples

    def merge_annotations(self, samples, filter=None, duplicates="last", how="inner", prefix="segmentation_"):
        """
        :param samples: Samples to merge with annotations
        :param filter: Annotation filter
        :param duplicates: What to do about duplicate annotations
        True: include all, False: remove all, 'first': keep first, 'last' keep last
        :param how: Join mode between samples and annotations
        :param prefix: naming prefix for annotation file paths
        :return:
        """
        samples.index = samples.path.apply(self.get_ds_path)
        ann = self.get_annotations(filter=filter).set_index("folder")
        del ann["category"]
        if duplicates is not True:
            ann = ann[~ann.index.duplicated(keep=duplicates)]
        samples = samples.join(ann.add_prefix(prefix), how=how)
        mask = samples.select_dtypes(include=["number", "bool", "object"]).columns
        samples[mask] = samples[mask].fillna("")
        return samples.reset_index(drop=True)

    def get_samples(self, target):
        """
        Get samples from sample definition file located in .samples
        :param target: file path from bucket/.samples/
        :return: pandas dataframe of samples
        """
        target = (target,) if isinstance(target, str) else target
        sample_file = self.get_location("samples", *target)
        sep = self.io.path.get_sep(self.bucket)
        if self.io.isfile(sample_file):
            df = pd.read_csv(BytesIO(self.io.read_file(sample_file)), sep=";")
            if df.shape[1] == 1:
                df = pd.read_csv(BytesIO(self.io.read_file(sample_file)))
        else:
            df = pd.DataFrame(dict(path=["NaN"], category=[("NaN",)]))[:0]
        if ImageKeys.SIZE in df:
            df[ImageKeys.SIZE] = df[ImageKeys.SIZE].apply(lambda sz: np.array(json.loads(sz)))
        if ImageKeys.BOUNDING_BOX in df:
            df[ImageKeys.BOUNDING_BOX] = df[ImageKeys.BOUNDING_BOX].apply(lambda bbox: np.array(json.loads(bbox)))
        df["folder"] = df.path.str.rsplit(sep, 2).str[-2].fillna(DATASET_ROOT)  # path is still relative path
        df.path = df.path.apply(lambda pp: self.io.path.join(self.bucket, pp))
        if "segmentation_path" in df:
            df.segmentation_path = df.segmentation_path.fillna("")
            df.segmentation_path = df.segmentation_path.apply(lambda sp: self.io.path.join(self.bucket, sp))
        df["bucket"] = self.bucket
        df["dataset"] = self.name
        df["dataset_id"] = str(self.id)
        df["url"] = df.path.apply(self.get_sample_uri)
        df["category"] = df.category.apply(ast.literal_eval)
        return df

    def save_samples(self, target, df):
        """
        Save samples in dataset to samples file
        :param target:
        :param df:
        :return:
        """
        target = (target,) if isinstance(target, str) else target
        sample_file = self.get_location("samples", *target)

        # Only save samples from this dataset
        df = df[df.path.str.startswith(self.bucket)]
        # Only save non known columns
        df = df.iloc[:, ~df.columns.isin({"folder", "bucket", "dataset", "dataset_id", "url"})]
        df.path = df.path.apply(self.get_ds_path)
        if "segmentation_path" in df:
            df.segmentation_path = df.segmentation_path.apply(self.get_ds_path)
        self.upload(sample_file, df.to_csv(index=False, sep=";"))

    def get_meta(self, filter=None):
        return self.find_files(self.get_location("meta"),
                               sample_filter=filter,
                               default_category_folder_index=-2, full_path=True)

    def find_files(self, path=None, *args, **kwargs):
        path = path or self.bucket
        return pd.DataFrame(self.sample_walk(path, *args, **kwargs),
                            columns=("category", "folder", "path", "etag"))

    def sample_walk(self, bucket, sample_filter=None, class_mapping: dict = None, classes: list = None,
                    default_category=(DATASET_ROOT,),
                    exclude_hidden=False, default_category_folder_index=-1, full_path=False, calculate_md5=True,
                    **kwargs):
        class_mapping = class_mapping or {}
        classes = classes or []

        if isinstance(sample_filter, (list, tuple, set)):
            sample_filter = "|".join(map(str, sample_filter))
        if isinstance(sample_filter, str):
            sample_filter = re.compile(sample_filter).search
        class_mapping = {k: ((v,) if isinstance(v, str) else tuple(v)) for k, v in class_mapping.items()}

        bucket_offset = len(bucket)
        sep = self.io.path.get_sep(bucket)
        for r, dirs, files in self.io.walk(bucket, exclude_hidden=exclude_hidden, include_object=True):
            rel_path = r[bucket_offset:].strip(sep)
            folders = [] if rel_path == '' else [DATASET_ROOT] + rel_path.split(sep)
            def_cat = (folders[default_category_folder_index],) if folders else default_category
            category = get_category(class_mapping, folders[::-1], def_cat)

            if classes:
                if isinstance(category, str):  # If category is string make sure it is in allowed classes
                    if category not in classes:
                        continue
                elif len(category) > 0:  # IF category is not empty filter allowed classes
                    category = tuple(c for c in category if c in classes)
                    if len(category) == 0:
                        continue

            if len(files) > 0:
                for file in files:
                    if isinstance(file, tuple):
                        file, fobj = file
                    else:
                        fobj = None

                    if sample_filter is None or sample_filter(file.lower()):
                        folder = rel_path if full_path else (DATASET_ROOT, *folders)[-1]
                        path = self.io.path.join(r, file)
                        if calculate_md5 and (fobj is None or len(fobj.etag) != 32 or "-" in fobj.etag):
                            etag = self.io.get_md5(path)
                        elif fobj is None:
                            etag = hashlib.sha1(path.encode("utf8")).hexdigest()
                        else:
                            etag = fobj.etag
                        yield category, folder, path, etag

    @property
    def uri_offset(self):
        if self._uri_offset is not None:
            return self._uri_offset
        self._uri_offset = self.bucket.find("/", self.bucket.find("://") + 3) + 1
        return self._uri_offset

    def get_uri(self):
        return f"{self.backend.host}/data/{self.id}"

    def get_sample_uri(self, path):
        return f"{self.backend.host}/download?path={urllib.parse.quote(path[self.uri_offset:], safe='')}"

    def upload(self, path, data):
        pth = self.get_location(path)
        return self.io.write_file(pth, data)

    def get_ds_path(self, path):
        return path[len(self.bucket) + 1:]

    def get_location(self, mode, *path):
        """Get path to object, prefixing 'annotations', 'data', 'samples' with . if they are in the first argument """
        location = DATASET_LOCATIONS.get(mode, mode)

        path = (location, *path) if location else path
        return self.io.path.join(self.bucket, *path)

    def __str__(self):
        return json.dumps({k: v for k, v in self.__dict__.items() if not k.startswith("_")})


def tif2dzi(path, bucket):
    if ".tif" in path:
        rel_path = path.replace(bucket, "").strip("/")
        return io_tools.path.join(bucket.strip("/"), ".tiles", rel_path, "dzi.json")
    else:
        return path

Functions

def get_samples(datasets, target, *args, **kwargs)

Utility function for getting samples across multiple datasets by sample files.

:param datasets: datasets to collect samples from
:param target: sample file path relative to <bucket>/.samples/
:param args:
:param kwargs:
:return: pandas dataframe of samples
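For illustration, a minimal usage sketch; the dataset names and the sample file name "my_samples.csv" below are placeholders, not defined by the library:

from brevettiai.platform.models.dataset import Dataset, get_samples

# Hypothetical datasets; buckets are resolved from the platform backend when omitted
datasets = [Dataset(name="dataset-a"), Dataset(name="dataset-b")]

# Read <bucket>/.samples/my_samples.csv from each dataset and concatenate the result
df = get_samples(datasets, "my_samples.csv")
print(df[["dataset", "path", "category"]].head())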

Expand source code
def get_samples(datasets, target, *args, **kwargs):
    """
    Utility function for getting samples across multiple datasets by sample files
    :param datasets:
    :param target:
    :param args:
    :param kwargs:
    :return:
    """
    samples = (d.get_samples(target) for d in sorted(datasets, key=lambda x: x.id))
    return pd.concat(samples).reset_index(drop=True)
def load_sample_identification(df, path, column='purpose', io=io_tools, **kwargs)

Load and join sample identification information onto a dataframe of samples.

:param df: sample dataframe
:param path: path to sample id file
:param column: name of split column
:param kwargs: extra args for io_tools.read_file
:return: df, extra_ids
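A small usage sketch, assuming an identification csv indexed by etag has previously been written; the path below is a placeholder:

import pandas as pd
from brevettiai.io import io_tools
from brevettiai.data.sample_integrity import load_sample_identification

# Toy sample frame; in practice this comes from Dataset.get_image_samples
df = pd.DataFrame({"path": ["a.png", "b.png"], "etag": ["e1", "e2"]})

id_path = "artifacts/sample_identification.csv"  # placeholder path
if io_tools.isfile(id_path):
    # Join the stored purpose column onto df by etag; extra_ids holds entries not matched in df
    df, extra_ids = load_sample_identification(df, id_path, column="purpose")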

Expand source code
def load_sample_identification(df, path, column="purpose", io=io_tools, **kwargs):
    """
    Load and join sample identification information onto dataframe of samples
    :param df: sample dataframe
    :param path: path to sample id file
    :param column: name of split column
    :param kwargs: extra args for io_tools.read_file
    :return: df, extra_ids
    """
    dfid = pd.read_csv(BytesIO(io.read_file(path, **kwargs)), index_col="etag")
    if column not in dfid.columns:
        dfid = dfid.rename(columns={dfid.columns[0]: column})
    return merge_sample_identification(df, dfid, on="etag")
def save_sample_identification(df, path, known_ids=None, column='purpose', io=io_tools)
Expand source code
def save_sample_identification(df, path, known_ids=None, column="purpose", io=io_tools):
    columns = ["etag", column]
    df = df[columns].set_index("etag")
    if df.index.has_duplicates:
        log.info("Duplicate etag entries among samples, saving highest priority purpose")
        df = df.iloc[np.argsort(df.purpose.map({"train": 1, "devel": 2, "development": 2, "test": 3}).fillna(4))]
        df = df[~df.index.duplicated(keep="first")]
    io.write_file(path, df.append(known_ids).to_csv(header=True))
def save_samples(datasets, target, df)
Expand source code
def save_samples(datasets, target, df):
    for d in datasets:
        d.save_samples(target, df)
def tif2dzi(path, bucket)
Expand source code
def tif2dzi(path, bucket):
    if ".tif" in path:
        rel_path = path.replace(bucket, "").strip("/")
        return io_tools.path.join(bucket.strip("/"), ".tiles", rel_path, "dzi.json")
    else:
        return path

Classes

class BrevettiDatasetSamples (classes: list = None, class_mapping: dict = None, annotations: dict = None, calculate_md5: bool = False, walk: bool = True, samples_file_name: str = None, contains_column: str = None, contains_regex: str = None)

Base class for serializable modules

:param classes: Force samples to be of the categories in this list
:param class_mapping: dict mapping from path to (category) class. See example for description
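A hedged configuration sketch; the dataset, folder names and mapping values are illustrative only:

from brevettiai.data.sample_tools import BrevettiDatasetSamples
from brevettiai.platform.models.dataset import Dataset

sample_settings = BrevettiDatasetSamples(
    class_mapping={"A": ["Tilted"], "B": ["Tilted"], "1": ["Crimp"]},  # folder name -> category
    classes=["Tilted", "Crimp"],  # keep only these categories after the mapping
    calculate_md5=False,
)

datasets = [Dataset(name="example-dataset")]  # hypothetical dataset
df = sample_settings.get_samples(datasets)    # walks the dataset buckets and returns one dataframe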

Expand source code
class BrevettiDatasetSamples(vue.VueSettingsModule):
    def __init__(self, classes: list = None, class_mapping: dict = None, annotations: dict = None,
                 calculate_md5: bool = False, walk: bool = True, samples_file_name: str = None,
                 contains_column: str = None, contains_regex: str = None):
        """
        :param classes: Force samples to be of the categories in this list
        :param class_mapping: dict of mapping from path to (category) class. See example for description
        """
        self.classes = classes or []
        self.class_mapping = class_mapping or {}
        self.annotations = annotations or {}
        self.calculate_md5 = calculate_md5
        self.walk = walk
        self.samples_file_name = samples_file_name or ""
        self.contains_column = contains_column or ""
        self.contains_regex = contains_regex or ""

    def get_image_samples(self, datasets, *args, **kwargs):
        """
        :param sample_filter: Filter samples by regex
        :param annotations: boolean, or dict Load annotation paths
        :param kwargs:
        :return: Dataframe of samples

        Mapping from path to category:
        Start from the leaf folder, and work towards the dataset root folder. If folder in class_mapping then apply
        its key as the category. If no match is found, apply leaf folder name
        Example:
        class_mapping={
        "A": ["Tilted"],
        "B": ["Tilted"],
        "1": ["Crimp"]
        }
        If classes is True or a list/set of categories. The filter is applied after the mapping.
        """

        with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
            futures = [executor.submit(ds.get_image_samples, *args, **{**self.__dict__, **kwargs})
                       for ds in sorted(datasets, key=lambda x: x.id)]
        return pd.concat([f.result() for f in futures]).reset_index(drop=True)

    def get_samples(self, datasets, walk=None, *args, **kwargs):
        """
        Utility function for getting samples across multiple datasets by sample files
        :param datasets:
        :param target:
        :param args:
        :param kwargs:
        :return:
        """
        walk = walk if walk is not None else self.walk

        if walk:
            df = self.get_image_samples(datasets, *args, **kwargs)
        else:
            df = get_samples(datasets, self.samples_file_name)
        if self.contains_column:
            df = df[df[self.contains_column].str.contains(self.contains_regex, regex=True, na=False)]
        assert not df.empty, "No samples found"
        return df

Ancestors

  • vue.VueSettingsModule

Methods

def get_image_samples(self, datasets, *args, **kwargs)

:param sample_filter: Filter samples by regex
:param annotations: boolean, or dict of options, to load annotation paths
:param kwargs:
:return: Dataframe of samples

Mapping from path to category: start from the leaf folder and work towards the dataset root folder. If a folder is found in class_mapping, its mapped value is applied as the category; if no match is found, the leaf folder name is applied. Example:

class_mapping={
    "A": ["Tilted"],
    "B": ["Tilted"],
    "1": ["Crimp"]
}

If classes is given as a list/set of categories, that filter is applied after the mapping.

Expand source code
def get_image_samples(self, datasets, *args, **kwargs):
    """
    :param sample_filter: Filter samples by regex
    :param annotations: boolean, or dict Load annotation paths
    :param kwargs:
    :return: Dataframe of samples

    Mapping from path to category:
    Start from the leaf folder, and work towards the dataset root folder. If folder in class_mapping then apply
    its key as the category. If no match is found, apply leaf folder name
    Example:
    class_mapping={
    "A": ["Tilted"],
    "B": ["Tilted"],
    "1": ["Crimp"]
    }
    If classes is True or a list/set of categories. The filter is applied after the mapping.
    """

    with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
        futures = [executor.submit(ds.get_image_samples, *args, **{**self.__dict__, **kwargs})
                   for ds in sorted(datasets, key=lambda x: x.id)]
    return pd.concat([f.result() for f in futures]).reset_index(drop=True)
def get_samples(self, datasets, walk=None, *args, **kwargs)

Utility function for getting samples across multiple datasets, either by walking the dataset buckets or by reading the configured sample file (samples_file_name).

:param datasets: datasets to collect samples from
:param walk: override the configured walk setting; when False, samples are read from the sample file
:return: pandas dataframe of samples

Expand source code
def get_samples(self, datasets, walk=None, *args, **kwargs):
    """
    Utility function for getting samples across multiple datasets by sample files
    :param datasets:
    :param target:
    :param args:
    :param kwargs:
    :return:
    """
    walk = walk if walk is not None else self.walk

    if walk:
        df = self.get_image_samples(datasets, *args, **kwargs)
    else:
        df = get_samples(datasets, self.samples_file_name)
    if self.contains_column:
        df = df[df[self.contains_column].str.contains(self.contains_regex, regex=True, na=False)]
    assert not df.empty, "No samples found"
    return df


class Dataset (io=io_tools, backend=platform_backend, resolve_access_rights: bool = False, **data)

Model defining a dataset on the Brevetti platform

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.
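A hedged construction sketch; the name and bucket are placeholders. When bucket is omitted it is derived from the platform backend and the dataset id:

from brevettiai.platform.models.dataset import Dataset

ds = Dataset(name="my-dataset", bucket="s3://data.criterion.ai/my-dataset-id")

# Walk the bucket and list image samples as a pandas DataFrame
samples = ds.get_image_samples()
print(samples[["category", "folder", "path", "etag"]].head())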

Expand source code
class Dataset(BaseModel):
    """
    Model defining a dataset on the Brevetti platform
    """
    id: str = Field(default_factory=lambda: str(uuid4()))
    bucket: Optional[str]
    name: str
    locked: bool = False
    reference: Optional[str] = ""
    notes: Optional[str] = ""
    tags: List[Tag] = Field(default_factory=list, description="testsds")

    _io: IoTools = PrivateAttr(default=None)
    _backend: PlatformBackend = PrivateAttr(default=None)
    _uri_offset = PrivateAttr(default=None)

    def __init__(self, io=io_tools, backend=platform_backend, resolve_access_rights: bool = False, **data) -> None:
        super().__init__(**data)

        self._io = io
        self._backend = backend

        if self.bucket is None:
            self.bucket = backend.resource_path(self.id)

        if resolve_access_rights:
            self.resolve_access_rights()

    @root_validator(pre=True, allow_reuse=True)
    def parse_settings(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        values["tags"] = [Tag(id=x, name="<Unknown>") for x in values.pop("tagIds", tuple())]
        return values

    @property
    def backend(self):
        return self._backend

    @property
    def io(self):
        assert self._io is not None, "Remember to call start_job()"
        return self._io

    def resolve_access_rights(self):
        self.io.resolve_access_rights(path=self.bucket, resource_id=self.id, resource_type="dataset", mode='w')

    def get_image_samples(self, sample_filter=r".+(\.tif|\.bmp|\.jpeg|\.jpg|\.png|\.JPG)$",
                          annotations=False, location="data", **kwargs):
        """
        :param sample_filter: Filter samples by regex
        :param annotations: boolean, or dict Load annotation paths
        :param location: location in dataset ("data", "annotations", "samples", or path in dataset to search under)
        :param kwargs:
        :return: Dataframe of samples

        Mapping from path to category:
        Start from the leaf folder, and work towards the dataset root folder. If folder in class_mapping then apply
        its key as the category. If no match is found, apply leaf folder name
        Example:
        class_mapping={
        "A": ["Tilted"],
        "B": ["Tilted"],
        "1": ["Crimp"]
        }
        If classes is True or a list/set of categories. The filter is applied after the mapping.
        """
        log.info(f"Getting image samples from dataset '{self.name}' [{self.get_uri()}]")
        samples = self.find_files(self.get_location(location), sample_filter=sample_filter, exclude_hidden=True,
                                  **kwargs)
        if annotations is not False:
            for ann in annotations if isinstance(annotations, (tuple, list)) else [annotations]:
                samples = self.merge_annotations(samples, **(ann if isinstance(ann, dict) else {}))

        samples["bucket"] = self.bucket
        samples["dataset"] = self.name
        samples["dataset_id"] = str(self.id)
        samples["reference"] = self.reference or "N/A"
        samples["url"] = samples.path.apply(self.get_sample_uri)
        log.info(f"Contents: {samples.category.value_counts().to_dict()}")
        return samples

    def get_annotations(self, filter=None):
        log.info(f"Getting annotations from dataset '{self.name}' [{self.get_uri()}] with filter: {filter}")
        samples = self.find_files(self.get_location("annotations"),
                                  sample_filter=filter,
                                  default_category_folder_index=-2, full_path=True)
        return samples

    def merge_annotations(self, samples, filter=None, duplicates="last", how="inner", prefix="segmentation_"):
        """
        :param samples: Samples to merge with annotations
        :param filter: Annotation filter
        :param duplicates: What to do about duplicate annotations
        True: include all, False: remove all, 'first': keep first, 'last' keep last
        :param how: Join mode between samples and annotations
        :param prefix: naming prefix for annotation file paths
        :return:
        """
        samples.index = samples.path.apply(self.get_ds_path)
        ann = self.get_annotations(filter=filter).set_index("folder")
        del ann["category"]
        if duplicates is not True:
            ann = ann[~ann.index.duplicated(keep=duplicates)]
        samples = samples.join(ann.add_prefix(prefix), how=how)
        mask = samples.select_dtypes(include=["number", "bool", "object"]).columns
        samples[mask] = samples[mask].fillna("")
        return samples.reset_index(drop=True)

    def get_samples(self, target):
        """
        Get samples from sample definition file located in .samples
        :param target: file path from bucket/.samples/
        :return: pandas dataframe of samples
        """
        target = (target,) if isinstance(target, str) else target
        sample_file = self.get_location("samples", *target)
        sep = self.io.path.get_sep(self.bucket)
        if self.io.isfile(sample_file):
            df = pd.read_csv(BytesIO(self.io.read_file(sample_file)), sep=";")
            if df.shape[1] == 1:
                df = pd.read_csv(BytesIO(self.io.read_file(sample_file)))
        else:
            df = pd.DataFrame(dict(path=["NaN"], category=[("NaN",)]))[:0]
        if ImageKeys.SIZE in df:
            df[ImageKeys.SIZE] = df[ImageKeys.SIZE].apply(lambda sz: np.array(json.loads(sz)))
        if ImageKeys.BOUNDING_BOX in df:
            df[ImageKeys.BOUNDING_BOX] = df[ImageKeys.BOUNDING_BOX].apply(lambda bbox: np.array(json.loads(bbox)))
        df["folder"] = df.path.str.rsplit(sep, 2).str[-2].fillna(DATASET_ROOT)  # path is still relative path
        df.path = df.path.apply(lambda pp: self.io.path.join(self.bucket, pp))
        if "segmentation_path" in df:
            df.segmentation_path = df.segmentation_path.fillna("")
            df.segmentation_path = df.segmentation_path.apply(lambda sp: self.io.path.join(self.bucket, sp))
        df["bucket"] = self.bucket
        df["dataset"] = self.name
        df["dataset_id"] = str(self.id)
        df["url"] = df.path.apply(self.get_sample_uri)
        df["category"] = df.category.apply(ast.literal_eval)
        return df

    def save_samples(self, target, df):
        """
        Save samples in dataset to samples file
        :param target:
        :param df:
        :return:
        """
        target = (target,) if isinstance(target, str) else target
        sample_file = self.get_location("samples", *target)

        # Only save samples from this dataset
        df = df[df.path.str.startswith(self.bucket)]
        # Only save non known columns
        df = df.iloc[:, ~df.columns.isin({"folder", "bucket", "dataset", "dataset_id", "url"})]
        df.path = df.path.apply(self.get_ds_path)
        if "segmentation_path" in df:
            df.segmentation_path = df.segmentation_path.apply(self.get_ds_path)
        self.upload(sample_file, df.to_csv(index=False, sep=";"))

    def get_meta(self, filter=None):
        return self.find_files(self.get_location("meta"),
                               sample_filter=filter,
                               default_category_folder_index=-2, full_path=True)

    def find_files(self, path=None, *args, **kwargs):
        path = path or self.bucket
        return pd.DataFrame(self.sample_walk(path, *args, **kwargs),
                            columns=("category", "folder", "path", "etag"))

    def sample_walk(self, bucket, sample_filter=None, class_mapping: dict = None, classes: list = None,
                    default_category=(DATASET_ROOT,),
                    exclude_hidden=False, default_category_folder_index=-1, full_path=False, calculate_md5=True,
                    **kwargs):
        class_mapping = class_mapping or {}
        classes = classes or []

        if isinstance(sample_filter, (list, tuple, set)):
            sample_filter = "|".join(map(str, sample_filter))
        if isinstance(sample_filter, str):
            sample_filter = re.compile(sample_filter).search
        class_mapping = {k: ((v,) if isinstance(v, str) else tuple(v)) for k, v in class_mapping.items()}

        bucket_offset = len(bucket)
        sep = self.io.path.get_sep(bucket)
        for r, dirs, files in self.io.walk(bucket, exclude_hidden=exclude_hidden, include_object=True):
            rel_path = r[bucket_offset:].strip(sep)
            folders = [] if rel_path == '' else [DATASET_ROOT] + rel_path.split(sep)
            def_cat = (folders[default_category_folder_index],) if folders else default_category
            category = get_category(class_mapping, folders[::-1], def_cat)

            if classes:
                if isinstance(category, str):  # If category is string make sure it is in allowed classes
                    if category not in classes:
                        continue
                elif len(category) > 0:  # IF category is not empty filter allowed classes
                    category = tuple(c for c in category if c in classes)
                    if len(category) == 0:
                        continue

            if len(files) > 0:
                for file in files:
                    if isinstance(file, tuple):
                        file, fobj = file
                    else:
                        fobj = None

                    if sample_filter is None or sample_filter(file.lower()):
                        folder = rel_path if full_path else (DATASET_ROOT, *folders)[-1]
                        path = self.io.path.join(r, file)
                        if calculate_md5 and (fobj is None or len(fobj.etag) != 32 or "-" in fobj.etag):
                            etag = self.io.get_md5(path)
                        elif fobj is None:
                            etag = hashlib.sha1(path.encode("utf8")).hexdigest()
                        else:
                            etag = fobj.etag
                        yield category, folder, path, etag

    @property
    def uri_offset(self):
        if self._uri_offset is not None:
            return self._uri_offset
        self._uri_offset = self.bucket.find("/", self.bucket.find("://") + 3) + 1
        return self._uri_offset

    def get_uri(self):
        return f"{self.backend.host}/data/{self.id}"

    def get_sample_uri(self, path):
        return f"{self.backend.host}/download?path={urllib.parse.quote(path[self.uri_offset:], safe='')}"

    def upload(self, path, data):
        pth = self.get_location(path)
        return self.io.write_file(pth, data)

    def get_ds_path(self, path):
        return path[len(self.bucket) + 1:]

    def get_location(self, mode, *path):
        """Get path to object, prefixing 'annotations', 'data', 'samples' with . if they are in the first argument """
        location = DATASET_LOCATIONS.get(mode, mode)

        path = (location, *path) if location else path
        return self.io.path.join(self.bucket, *path)

    def __str__(self):
        return json.dumps({k: v for k, v in self.__dict__.items() if not k.startswith("_")})

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var bucket : Optional[str]
var id : str
var locked : bool
var name : str
var notes : Optional[str]
var reference : Optional[str]
var tags : List[Tag]

Static methods

def parse_settings(values: Dict[str, Any]) ‑> Dict[str, Any]
Expand source code
@root_validator(pre=True, allow_reuse=True)
def parse_settings(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    values["tags"] = [Tag(id=x, name="<Unknown>") for x in values.pop("tagIds", tuple())]
    return values

Instance variables

var backend
Expand source code
@property
def backend(self):
    return self._backend
var io
Expand source code
@property
def io(self):
    assert self._io is not None, "Remember to call start_job()"
    return self._io
var uri_offset
Expand source code
@property
def uri_offset(self):
    if self._uri_offset is not None:
        return self._uri_offset
    self._uri_offset = self.bucket.find("/", self.bucket.find("://") + 3) + 1
    return self._uri_offset

Methods

def find_files(self, path=None, *args, **kwargs)
Expand source code
def find_files(self, path=None, *args, **kwargs):
    path = path or self.bucket
    return pd.DataFrame(self.sample_walk(path, *args, **kwargs),
                        columns=("category", "folder", "path", "etag"))
def get_annotations(self, filter=None)
Expand source code
def get_annotations(self, filter=None):
    log.info(f"Getting annotations from dataset '{self.name}' [{self.get_uri()}] with filter: {filter}")
    samples = self.find_files(self.get_location("annotations"),
                              sample_filter=filter,
                              default_category_folder_index=-2, full_path=True)
    return samples
def get_ds_path(self, path)
Expand source code
def get_ds_path(self, path):
    return path[len(self.bucket) + 1:]
def get_image_samples(self, sample_filter='.+(\\.tif|\\.bmp|\\.jpeg|\\.jpg|\\.png|\\.JPG)$', annotations=False, location='data', **kwargs)

:param sample_filter: Filter samples by regex
:param annotations: boolean, or dict of options, to load annotation paths
:param location: location in dataset ("data", "annotations", "samples", or a path in the dataset to search under)
:param kwargs:
:return: Dataframe of samples

Mapping from path to category: start from the leaf folder and work towards the dataset root folder. If a folder is found in class_mapping, its mapped value is applied as the category; if no match is found, the leaf folder name is applied. Example:

class_mapping={
    "A": ["Tilted"],
    "B": ["Tilted"],
    "1": ["Crimp"]
}

If classes is given as a list/set of categories, that filter is applied after the mapping.
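A minimal sketch of how the mapping is resolved, using the module-level get_category helper directly; the folder names are illustrative:

from brevettiai.platform.models.dataset import get_category

# Mapping of folder name -> category tuple, as normalized by sample_walk
mapping = {"A": ("Tilted",), "B": ("Tilted",), "1": ("Crimp",)}

# Folders are searched leaf-first; a sample under <bucket>/good/A/ matches folder "A"
print(get_category(mapping, ["A", "good", "__root__"]))  # -> ('Tilted',)

# No folder matches the mapping, so the leaf folder name is used
print(get_category(mapping, ["C", "good", "__root__"]))  # -> 'C'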

Expand source code
def get_image_samples(self, sample_filter=r".+(\.tif|\.bmp|\.jpeg|\.jpg|\.png|\.JPG)$",
                      annotations=False, location="data", **kwargs):
    """
    :param sample_filter: Filter samples by regex
    :param annotations: boolean, or dict Load annotation paths
    :param location: location in dataset ("data", "annotations", "samples", or path in dataset to search under)
    :param kwargs:
    :return: Dataframe of samples

    Mapping from path to category:
    Start from the leaf folder, and work towards the dataset root folder. If folder in class_mapping then apply
    its key as the category. If no match is found, apply leaf folder name
    Example:
    class_mapping={
    "A": ["Tilted"],
    "B": ["Tilted"],
    "1": ["Crimp"]
    }
    If classes is True or a list/set of categories. The filter is applied after the mapping.
    """
    log.info(f"Getting image samples from dataset '{self.name}' [{self.get_uri()}]")
    samples = self.find_files(self.get_location(location), sample_filter=sample_filter, exclude_hidden=True,
                              **kwargs)
    if annotations is not False:
        for ann in annotations if isinstance(annotations, (tuple, list)) else [annotations]:
            samples = self.merge_annotations(samples, **(ann if isinstance(ann, dict) else {}))

    samples["bucket"] = self.bucket
    samples["dataset"] = self.name
    samples["dataset_id"] = str(self.id)
    samples["reference"] = self.reference or "N/A"
    samples["url"] = samples.path.apply(self.get_sample_uri)
    log.info(f"Contents: {samples.category.value_counts().to_dict()}")
    return samples
def get_location(self, mode, *path)

Get path to object, prefixing 'annotations', 'data', 'samples' with . if they are in the first argument
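For illustration, assuming a placeholder bucket of s3://bucket/dataset-id and a '/'-joined S3-style path joiner:

from brevettiai.platform.models.dataset import Dataset

ds = Dataset(name="example", bucket="s3://bucket/dataset-id")

ds.get_location("samples", "split.csv")  # -> 's3://bucket/dataset-id/.samples/split.csv'
ds.get_location("data")                  # -> 's3://bucket/dataset-id'
ds.get_location("notes/readme.md")       # -> 's3://bucket/dataset-id/notes/readme.md'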

Expand source code
def get_location(self, mode, *path):
    """Get path to object, prefixing 'annotations', 'data', 'samples' with . if they are in the first argument """
    location = DATASET_LOCATIONS.get(mode, mode)

    path = (location, *path) if location else path
    return self.io.path.join(self.bucket, *path)
def get_meta(self, filter=None)
Expand source code
def get_meta(self, filter=None):
    return self.find_files(self.get_location("meta"),
                           sample_filter=filter,
                           default_category_folder_index=-2, full_path=True)
def get_sample_uri(self, path)
Expand source code
def get_sample_uri(self, path):
    return f"{self.backend.host}/download?path={urllib.parse.quote(path[self.uri_offset:], safe='')}"
def get_samples(self, target)

Get samples from a sample definition file located in .samples

:param target: file path from bucket/.samples/
:return: pandas dataframe of samples

Expand source code
def get_samples(self, target):
    """
    Get samples from sample definition file located in .samples
    :param target: file path from bucket/.samples/
    :return: pandas dataframe of samples
    """
    target = (target,) if isinstance(target, str) else target
    sample_file = self.get_location("samples", *target)
    sep = self.io.path.get_sep(self.bucket)
    if self.io.isfile(sample_file):
        df = pd.read_csv(BytesIO(self.io.read_file(sample_file)), sep=";")
        if df.shape[1] == 1:
            df = pd.read_csv(BytesIO(self.io.read_file(sample_file)))
    else:
        df = pd.DataFrame(dict(path=["NaN"], category=[("NaN",)]))[:0]
    if ImageKeys.SIZE in df:
        df[ImageKeys.SIZE] = df[ImageKeys.SIZE].apply(lambda sz: np.array(json.loads(sz)))
    if ImageKeys.BOUNDING_BOX in df:
        df[ImageKeys.BOUNDING_BOX] = df[ImageKeys.BOUNDING_BOX].apply(lambda bbox: np.array(json.loads(bbox)))
    df["folder"] = df.path.str.rsplit(sep, 2).str[-2].fillna(DATASET_ROOT)  # path is still relative path
    df.path = df.path.apply(lambda pp: self.io.path.join(self.bucket, pp))
    if "segmentation_path" in df:
        df.segmentation_path = df.segmentation_path.fillna("")
        df.segmentation_path = df.segmentation_path.apply(lambda sp: self.io.path.join(self.bucket, sp))
    df["bucket"] = self.bucket
    df["dataset"] = self.name
    df["dataset_id"] = str(self.id)
    df["url"] = df.path.apply(self.get_sample_uri)
    df["category"] = df.category.apply(ast.literal_eval)
    return df
def get_uri(self)
Expand source code
def get_uri(self):
    return f"{self.backend.host}/data/{self.id}"
def merge_annotations(self, samples, filter=None, duplicates='last', how='inner', prefix='segmentation_')

:param samples: Samples to merge with annotations
:param filter: Annotation filter
:param duplicates: What to do about duplicate annotations. True: include all, False: remove all, 'first': keep first, 'last': keep last
:param how: Join mode between samples and annotations
:param prefix: naming prefix for annotation file paths
:return:
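A hedged usage sketch; this is what get_image_samples does internally when annotations is not False. The dataset name is a placeholder:

from brevettiai.platform.models.dataset import Dataset

ds = Dataset(name="example-dataset")  # hypothetical dataset
samples = ds.get_image_samples(annotations=False)

# Join annotation files whose folder under .annotations matches the sample's
# dataset-relative path, keeping the last duplicate and prefixing the joined columns
samples = ds.merge_annotations(samples, duplicates="last", how="inner", prefix="segmentation_")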

Expand source code
def merge_annotations(self, samples, filter=None, duplicates="last", how="inner", prefix="segmentation_"):
    """
    :param samples: Samples to merge with annotations
    :param filter: Annotation filter
    :param duplicates: What to do about duplicate annotations
    True: include all, False: remove all, 'first': keep first, 'last' keep last
    :param how: Join mode between samples and annotations
    :param prefix: naming prefix for annotation file paths
    :return:
    """
    samples.index = samples.path.apply(self.get_ds_path)
    ann = self.get_annotations(filter=filter).set_index("folder")
    del ann["category"]
    if duplicates is not True:
        ann = ann[~ann.index.duplicated(keep=duplicates)]
    samples = samples.join(ann.add_prefix(prefix), how=how)
    mask = samples.select_dtypes(include=["number", "bool", "object"]).columns
    samples[mask] = samples[mask].fillna("")
    return samples.reset_index(drop=True)
def resolve_access_rights(self)
Expand source code
def resolve_access_rights(self):
    self.io.resolve_access_rights(path=self.bucket, resource_id=self.id, resource_type="dataset", mode='w')
def sample_walk(self, bucket, sample_filter=None, class_mapping: dict = None, classes: list = None, default_category=('__root__',), exclude_hidden=False, default_category_folder_index=-1, full_path=False, calculate_md5=True, **kwargs)
Expand source code
def sample_walk(self, bucket, sample_filter=None, class_mapping: dict = None, classes: list = None,
                default_category=(DATASET_ROOT,),
                exclude_hidden=False, default_category_folder_index=-1, full_path=False, calculate_md5=True,
                **kwargs):
    class_mapping = class_mapping or {}
    classes = classes or []

    if isinstance(sample_filter, (list, tuple, set)):
        sample_filter = "|".join(map(str, sample_filter))
    if isinstance(sample_filter, str):
        sample_filter = re.compile(sample_filter).search
    class_mapping = {k: ((v,) if isinstance(v, str) else tuple(v)) for k, v in class_mapping.items()}

    bucket_offset = len(bucket)
    sep = self.io.path.get_sep(bucket)
    for r, dirs, files in self.io.walk(bucket, exclude_hidden=exclude_hidden, include_object=True):
        rel_path = r[bucket_offset:].strip(sep)
        folders = [] if rel_path == '' else [DATASET_ROOT] + rel_path.split(sep)
        def_cat = (folders[default_category_folder_index],) if folders else default_category
        category = get_category(class_mapping, folders[::-1], def_cat)

        if classes:
            if isinstance(category, str):  # If category is string make sure it is in allowed classes
                if category not in classes:
                    continue
            elif len(category) > 0:  # IF category is not empty filter allowed classes
                category = tuple(c for c in category if c in classes)
                if len(category) == 0:
                    continue

        if len(files) > 0:
            for file in files:
                if isinstance(file, tuple):
                    file, fobj = file
                else:
                    fobj = None

                if sample_filter is None or sample_filter(file.lower()):
                    folder = rel_path if full_path else (DATASET_ROOT, *folders)[-1]
                    path = self.io.path.join(r, file)
                    if calculate_md5 and (fobj is None or len(fobj.etag) != 32 or "-" in fobj.etag):
                        etag = self.io.get_md5(path)
                    elif fobj is None:
                        etag = hashlib.sha1(path.encode("utf8")).hexdigest()
                    else:
                        etag = fobj.etag
                    yield category, folder, path, etag
def save_samples(self, target, df)

Save samples in dataset to a samples file

:param target: file path from bucket/.samples/
:param df: dataframe of samples to save
:return:

Expand source code
def save_samples(self, target, df):
    """
    Save samples in dataset to samples file
    :param target:
    :param df:
    :return:
    """
    target = (target,) if isinstance(target, str) else target
    sample_file = self.get_location("samples", *target)

    # Only save samples from this dataset
    df = df[df.path.str.startswith(self.bucket)]
    # Only save non known columns
    df = df.iloc[:, ~df.columns.isin({"folder", "bucket", "dataset", "dataset_id", "url"})]
    df.path = df.path.apply(self.get_ds_path)
    if "segmentation_path" in df:
        df.segmentation_path = df.segmentation_path.apply(self.get_ds_path)
    self.upload(sample_file, df.to_csv(index=False, sep=";"))
def upload(self, path, data)
Expand source code
def upload(self, path, data):
    pth = self.get_location(path)
    return self.io.write_file(pth, data)
class SampleSplit (stratification: list = None, uniqueness: list = None, split: float = 0.8, seed: int = -1, mode='sorted_permutation')

Base class for serializable modules

:param stratification: As regex string performed on df.path or list selecting columns
:param uniqueness: As regex string performed on df.path or list selecting columns
:param split: fraction of samples to apply the purpose on
:param seed: seeding for assignment
:param mode: 'sorted_permutation' or 'murmurhash3'
:return:
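A hedged configuration sketch; the column names match the sample frames produced by Dataset.get_image_samples:

from brevettiai.data.sample_integrity import SampleSplit

# 80/20 split where samples sharing an etag always get the same purpose,
# stratified per dataset so every dataset is split to the same ratio
splitter = SampleSplit(
    stratification=["dataset_id"],
    uniqueness=["etag"],
    split=0.8,
    seed=42,
    mode=SampleSplit.MODE_SORTED_PERMUTATION,
)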

Expand source code
class SampleSplit(vue.VueSettingsModule):
    MODE_MURMURHASH3 = "murmurhash3"
    MODE_SORTED_PERMUTATION = "sorted_permutation"

    def __init__(self, stratification: list = None, uniqueness: list = None, split: float = 0.8, seed: int = -1,
                 mode=MODE_SORTED_PERMUTATION):
        """
        :param stratification: As regex string performed on df.path or list selecting columns
        :param uniqueness: As regex string performed on df.path or list selecting columns
        :param split: fraction of samples to apply the purpose on
        :param seed: seeding for assignment
        :param mode: 'sorted_permutation' or 'murmurhash3'
        :return:
        """
        self.stratification = stratification
        try:
            if isinstance(uniqueness, str):
                uniqueness = json.loads(uniqueness)
        except json.JSONDecodeError:
            pass
        self.uniqueness = uniqueness or ["etag"]
        self.split = split
        self.seed = seed
        self.mode = mode
        assert self.mode in {self.MODE_MURMURHASH3, self.MODE_SORTED_PERMUTATION}

    def assign(self, df, purpose="train", remainder=None, column="purpose"):
        """
        Assign purpose column randomly to non-assigned samples based on stratification, uniqueness and split strategy.

        Definitions:
        * Stratification: Grouping of samples which should be treated as individual groups.
        meaning every group must be split according to the sample split target percentage,
        and uniqueness is performed on a per group basis
        * Uniqueness: grouping of samples which must be treated as a single sample, thus be assigned the same purpose.

        :param df: pd.DataFrame of samples if purpose column does not exist it is added
        :param purpose: purpose to be assigned
        :param remainder: purpose to assign remainder samples, or None to leave unassigned
        :param column: column for assignment of split category
        """
        # Ensure columns
        if column not in df:
            df[column] = pd.NA
        columns = df.columns

        split = self.split
        stratification = self.stratification
        uniqueness = self.uniqueness

        if split == 0 or ~df.purpose.isna().any():  # Assign no samples
            pass
        elif split == 1:  # Assign all samples
            df.loc[df.purpose.isna(), column] = purpose
        else:
            # Parse regex stratification and uniqueness strategies
            if isinstance(stratification, str) and stratification:
                df["_stratification"] = df.path.str.extract(stratification)[0]
                stratification = ["_stratification"]
            assert stratification is None or all(x in df.columns for x in stratification), \
                "stratification should be None or in columns"

            if isinstance(uniqueness, str) and uniqueness:
                df["_uniqueness"] = df.path.str.extract(uniqueness)[0]
                uniqueness = ["_uniqueness"]
            assert uniqueness is None or all(x in df.columns for x in uniqueness), \
                "uniqueness should be None or in columns"

            seed = None if self.seed < 0 else self.seed
            rng = seed if isinstance(seed, np.random.RandomState) else np.random.RandomState(seed)

            def _split(g):
                if uniqueness:
                    items = g[uniqueness + [column]].copy()
                    items["_purpose_prio"] = items.purpose.map({"train": 1, "test": 2})
                    items = items.sort_values("_purpose_prio")[uniqueness + [column]]
                    unique_items = items.groupby(uniqueness).purpose.agg(["size", "first"])
                    unique_items.columns = ["samples", column]
                    unique_items = unique_items.reset_index()
                else:
                    unique_items = g[[column]].reset_index(drop=True)
                    unique_items["samples"] = 1

                # split unmarked items
                unmarked = unique_items[unique_items.purpose.isna()]

                # mode
                if unmarked.size > 0:
                    if self.mode == self.MODE_MURMURHASH3:
                        # Random seed for this stratified group
                        mmh_seed = rng.randint(0x7FFFFFFF)

                        # Extract uniqueness for hashing
                        if uniqueness:
                            unique_df = unmarked[uniqueness]
                        else:
                            unique_df = pd.DataFrame(unmarked.index)

                        # Apply mmh3 hashing
                        hash_ = unique_df.apply(lambda x: mmh3.hash("_".join(map(str, x)), seed=mmh_seed, signed=False),
                                                axis=1)

                        # Assign
                        unique_items.loc[hash_[hash_ < 0xFFFFFFFF * split].index, column] = purpose
                    elif self.mode == self.MODE_SORTED_PERMUTATION or True:  # default
                        # Select unmarked to assign
                        items_count = unique_items.samples.sum()
                        marked_count = unique_items.samples[unique_items.purpose == purpose].sum()
                        assign_count = items_count * split - marked_count
                        unmarked = rng.permutation(unmarked.index)

                        cdf = unique_items.samples[unmarked].cumsum()
                        ix = np.searchsorted(cdf.values, assign_count, side="right")
                        if len(cdf.values) > ix:
                            ix = ix - (rng.rand() > ((assign_count - cdf.values[ix - 1]) / (cdf.values[ix] - cdf.values[ix - 1])))

                        # Assign
                        unique_items.loc[cdf.iloc[:ix+1].index, column] = purpose

                if uniqueness:
                    g.loc[:, column] = unique_items.set_index(uniqueness) \
                        .loc[g[uniqueness].set_index(uniqueness).index].purpose.values
                else:
                    g.loc[:, column] = unique_items.purpose.values
                return g

            if stratification:
                df = df.groupby(stratification).apply(_split)
            else:
                df = _split(df)

        if remainder:
            df.loc[df.purpose.isna(), column] = remainder

        # Ensure etag is unique across all stratified groups
        #df.loc[:, column] = df.groupby("etag").first()[column].reindex(df.etag).values
        return df[columns]

    def update_unassigned(self, df, id_path,
                          purpose="train", remainder="devel", column="purpose", io=io_tools):
        """
        Updates sample purpose in id_path that may hold previous dataset splits and sample ids
        Unassigned samples are also assigned and id_path is updated
        :param df: pd.DataFrame containing the samples
        :param id_path: path to the identification csv file
        :param purpose: Purpose to assign
        :param remainder: Purpose to assign to remainder or none to leave unassigned
        :param column: Column to assign split purposes to
        :return:
        """

        log.info("Looking for previous train / development split")

        known_ids = None
        if io.isfile(id_path):
            df, known_ids = load_sample_identification(df, id_path, column=column, io=io)
            log.info("Using train / development split from run cached in artifacts")
        else:
            log.info("No initial sample identification file found")

        df = self.assign(df, purpose=purpose, remainder=remainder, column=column)

        save_sample_identification(df, id_path, known_ids=known_ids, column=column, io=io)

        return df

Ancestors

  • vue.VueSettingsModule

Class variables

var MODE_MURMURHASH3
var MODE_SORTED_PERMUTATION

Methods

def assign(self, df, purpose='train', remainder=None, column='purpose')

Assign the purpose column randomly to non-assigned samples based on the stratification, uniqueness and split strategy.

Definitions:

  • Stratification: grouping of samples that should be treated as individual groups, meaning every group is split according to the sample split target percentage, and uniqueness is evaluated on a per-group basis.
  • Uniqueness: grouping of samples that must be treated as a single sample and therefore assigned the same purpose.

:param df: pd.DataFrame of samples; if the purpose column does not exist it is added
:param purpose: purpose to be assigned
:param remainder: purpose to assign to remainder samples, or None to leave unassigned
:param column: column for assignment of split category
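A small sketch on a toy DataFrame; the paths and etags are made up:

import pandas as pd
from brevettiai.data.sample_integrity import SampleSplit

df = pd.DataFrame({
    "path": [f"s3://bucket/ds/img_{i}.png" for i in range(10)],
    "etag": [f"etag_{i}" for i in range(10)],
})

splitter = SampleSplit(uniqueness=["etag"], split=0.8, seed=0)

# Adds a 'purpose' column, marks roughly 80% of the unique etags as train and the rest as devel
df = splitter.assign(df, purpose="train", remainder="devel")
print(df.purpose.value_counts())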

Expand source code
def assign(self, df, purpose="train", remainder=None, column="purpose"):
    """
    Assign purpose column randomly to non-assigned samples based on stratification, uniqueness and split strategy.

    Definitions:
    * Stratification: Grouping of samples which should be treated as individual groups.
    meaning every group must be split according to the sample split target percentage,
    and uniqueness is performed on a per group basis
    * Uniqueness: grouping of samples which must be treated as a single sample, thus be assigned the same purpose.

    :param df: pd.DataFrame of samples if purpose column does not exist it is added
    :param purpose: purpose to be assigned
    :param remainder: purpose to assign remainder samples, or None to leave unassigned
    :param column: column for assignment of split category
    """
    # Ensure columns
    if column not in df:
        df[column] = pd.NA
    columns = df.columns

    split = self.split
    stratification = self.stratification
    uniqueness = self.uniqueness

    if split == 0 or ~df.purpose.isna().any():  # Assign no samples
        pass
    elif split == 1:  # Assign all samples
        df.loc[df.purpose.isna(), column] = purpose
    else:
        # Parse regex stratification and uniqueness strategies
        if isinstance(stratification, str) and stratification:
            df["_stratification"] = df.path.str.extract(stratification)[0]
            stratification = ["_stratification"]
        assert stratification is None or all(x in df.columns for x in stratification), \
            "stratification should be None or in columns"

        if isinstance(uniqueness, str) and uniqueness:
            df["_uniqueness"] = df.path.str.extract(uniqueness)[0]
            uniqueness = ["_uniqueness"]
        assert uniqueness is None or all(x in df.columns for x in uniqueness), \
            "uniqueness should be None or in columns"

        seed = None if self.seed < 0 else self.seed
        rng = seed if isinstance(seed, np.random.RandomState) else np.random.RandomState(seed)

        def _split(g):
            if uniqueness:
                items = g[uniqueness + [column]].copy()
                items["_purpose_prio"] = items.purpose.map({"train": 1, "test": 2})
                items = items.sort_values("_purpose_prio")[uniqueness + [column]]
                unique_items = items.groupby(uniqueness).purpose.agg(["size", "first"])
                unique_items.columns = ["samples", column]
                unique_items = unique_items.reset_index()
            else:
                unique_items = g[[column]].reset_index(drop=True)
                unique_items["samples"] = 1

            # split unmarked items
            unmarked = unique_items[unique_items.purpose.isna()]

            # mode
            if unmarked.size > 0:
                if self.mode == self.MODE_MURMURHASH3:
                    # Random seed for this stratified group
                    mmh_seed = rng.randint(0x7FFFFFFF)

                    # Extract uniqueness for hashing
                    if uniqueness:
                        unique_df = unmarked[uniqueness]
                    else:
                        unique_df = pd.DataFrame(unmarked.index)

                    # Apply mmh3 hashing
                    hash_ = unique_df.apply(lambda x: mmh3.hash("_".join(map(str, x)), seed=mmh_seed, signed=False),
                                            axis=1)

                    # Assign
                    unique_items.loc[hash_[hash_ < 0xFFFFFFFF * split].index, column] = purpose
                elif self.mode == self.MODE_SORTED_PERMUTATION or True:  # default
                    # Select unmarked to assign
                    items_count = unique_items.samples.sum()
                    marked_count = unique_items.samples[unique_items.purpose == purpose].sum()
                    assign_count = items_count * split - marked_count
                    unmarked = rng.permutation(unmarked.index)

                    cdf = unique_items.samples[unmarked].cumsum()
                    ix = np.searchsorted(cdf.values, assign_count, side="right")
                    if len(cdf.values) > ix:
                        ix = ix - (rng.rand() > ((assign_count - cdf.values[ix - 1]) / (cdf.values[ix] - cdf.values[ix - 1])))

                    # Assign
                    unique_items.loc[cdf.iloc[:ix+1].index, column] = purpose

            if uniqueness:
                g.loc[:, column] = unique_items.set_index(uniqueness) \
                    .loc[g[uniqueness].set_index(uniqueness).index].purpose.values
            else:
                g.loc[:, column] = unique_items.purpose.values
            return g

        if stratification:
            df = df.groupby(stratification).apply(_split)
        else:
            df = _split(df)

    if remainder:
        df.loc[df.purpose.isna(), column] = remainder

    # Ensure etag is unique across all stratified groups
    #df.loc[:, column] = df.groupby("etag").first()[column].reindex(df.etag).values
    return df[columns]
def update_unassigned(self, df, id_path, purpose='train', remainder='devel', column='purpose', io=io_tools)

Updates sample purpose in id_path, which may hold previous dataset splits and sample ids. Unassigned samples are also assigned and id_path is updated.

:param df: pd.DataFrame containing the samples
:param id_path: path to the identification csv file
:param purpose: Purpose to assign
:param remainder: Purpose to assign to the remainder, or None to leave unassigned
:param column: Column to assign split purposes to
:return:
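A hedged workflow sketch; the identification file path is a placeholder (typically a job artifact path):

import pandas as pd
from brevettiai.data.sample_integrity import SampleSplit

# Toy sample frame; in practice this is produced by Dataset.get_image_samples
df = pd.DataFrame({"path": ["a.png", "b.png", "c.png"], "etag": ["e1", "e2", "e3"]})

splitter = SampleSplit(uniqueness=["etag"], split=0.8)

# Reuse the split stored at id_path if it exists, assign any unassigned samples,
# then write the updated identification file back
df = splitter.update_unassigned(df, id_path="artifacts/sample_identification.csv",
                                purpose="train", remainder="devel")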

Expand source code
def update_unassigned(self, df, id_path,
                      purpose="train", remainder="devel", column="purpose", io=io_tools):
    """
    Updates sample purpose in id_path that may hold previous dataset splits and sample ids
    Unassigned samples are also assigned and id_path is updated
    :param df: pd.DataFrame containing the samples
    :param id_path: path to the identification csv file
    :param purpose: Purpose to assign
    :param remainder: Purpose to assign to remainder or none to leave unassigned
    :param column: Column to assign split purposes to
    :return:
    """

    log.info("Looking for previous train / development split")

    known_ids = None
    if io.isfile(id_path):
        df, known_ids = load_sample_identification(df, id_path, column=column, io=io)
        log.info("Using train / development split from run cached in artifacts")
    else:
        log.info("No initial sample identification file found")

    df = self.assign(df, purpose=purpose, remainder=remainder, column=column)

    save_sample_identification(df, id_path, known_ids=known_ids, column=column, io=io)

    return df
