Module brevettiai.platform.models.dataset
import ast
import hashlib
import json
import logging
import re
import urllib.parse
from io import BytesIO
from typing import Optional, List, Dict, Any
from uuid import uuid4
import numpy as np
import pandas as pd
from pydantic import BaseModel, Field, PrivateAttr, root_validator
from brevettiai.data.image import ImageKeys
from brevettiai.data.sample_integrity import load_sample_identification, save_sample_identification, SampleSplit
from brevettiai.data.sample_tools import BrevettiDatasetSamples, get_samples, save_samples
from brevettiai.io import IoTools, io_tools
from brevettiai.platform.models import Tag
from brevettiai.platform.models import PlatformBackend
from brevettiai.platform.models import backend as platform_backend
log = logging.getLogger(__name__)
DATASET_ROOT = "__root__"
__all__ = ["tif2dzi", "BrevettiDatasetSamples", "load_sample_identification", "save_sample_identification",
"SampleSplit", "get_samples", "save_samples", "Dataset"]
DATASET_LOCATIONS = dict(
annotations=".annotations",
meta=".meta",
samples=".samples",
data="",
)
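# Resolve a category for a sequence of folder names (ordered leaf first): return the mapping value of the
# first folder found in `mapping`, otherwise fall back to the default category / leaf folder name.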
def get_category(mapping, keys, default=None):
try:
key, keys = keys[0], keys[1:]
default = default or key
if keys:
return mapping.get(key, get_category(mapping, keys, default=default))
else:
return mapping.get(key, default)
except IndexError:
return mapping.get(default[0], default)
class Dataset(BaseModel):
"""
Model defining a dataset on the Brevetti platform
"""
id: str = Field(default_factory=lambda: str(uuid4()))
bucket: Optional[str]
name: str
locked: bool = False
reference: Optional[str] = ""
notes: Optional[str] = ""
tags: List[Tag] = Field(default_factory=list, description="testsds")
_io: IoTools = PrivateAttr(default=None)
_backend: PlatformBackend = PrivateAttr(default=None)
_uri_offset = PrivateAttr(default=None)
def __init__(self, io=io_tools, backend=platform_backend, resolve_access_rights: bool = False, **data) -> None:
super().__init__(**data)
self._io = io
self._backend = backend
if self.bucket is None:
self.bucket = backend.resource_path(self.id)
if resolve_access_rights:
self.resolve_access_rights()
@root_validator(pre=True, allow_reuse=True)
def parse_settings(cls, values: Dict[str, Any]) -> Dict[str, Any]:
values["tags"] = [Tag(id=x, name="<Unknown>") for x in values.pop("tagIds", tuple())]
return values
@property
def backend(self):
return self._backend
@property
def io(self):
assert self._io is not None, "Remember to call start_job()"
return self._io
def resolve_access_rights(self):
self.io.resolve_access_rights(path=self.bucket, resource_id=self.id, resource_type="dataset", mode='w')
def get_image_samples(self, sample_filter=r".+(\.tif|\.bmp|\.jpeg|\.jpg|\.png|\.JPG)$",
annotations=False, location="data", **kwargs):
"""
:param sample_filter: Filter samples by regex
:param annotations: boolean, or dict Load annotation paths
:param location: location in dataset ("data", "annotations", "samples", or path in dataset to search under)
:param kwargs:
:return: Dataframe of samples
Mapping from path to category:
Start from the leaf folder, and work towards the dataset root folder. If folder in class_mapping then apply
its key as the category. If no match is found, apply leaf folder name
Example:
class_mapping={
"A": ["Tilted"],
"B": ["Tilted"],
"1": ["Crimp"]
}
If classes is True or a list/set of categories. The filter is applied after the mapping.
"""
log.info(f"Getting image samples from dataset '{self.name}' [{self.get_uri()}]")
samples = self.find_files(self.get_location(location), sample_filter=sample_filter, exclude_hidden=True,
**kwargs)
if annotations is not False:
for ann in annotations if isinstance(annotations, (tuple, list)) else [annotations]:
samples = self.merge_annotations(samples, **(ann if isinstance(ann, dict) else {}))
samples["bucket"] = self.bucket
samples["dataset"] = self.name
samples["dataset_id"] = str(self.id)
samples["reference"] = self.reference or "N/A"
samples["url"] = samples.path.apply(self.get_sample_uri)
log.info(f"Contents: {samples.category.value_counts().to_dict()}")
return samples
def get_annotations(self, filter=None):
log.info(f"Getting annotations from dataset '{self.name}' [{self.get_uri()}] with filter: {filter}")
samples = self.find_files(self.get_location("annotations"),
sample_filter=filter,
default_category_folder_index=-2, full_path=True)
return samples
def merge_annotations(self, samples, filter=None, duplicates="last", how="inner", prefix="segmentation_"):
"""
:param samples: Samples to merge with annotations
:param filter: Annotation filter
:param duplicates: What to do about duplicate annotations
True: include all, False: remove all, 'first': keep first, 'last' keep last
:param how: Join mode between samples and annotations
:param prefix: naming prefix for annotation file paths
:return:
"""
samples.index = samples.path.apply(self.get_ds_path)
ann = self.get_annotations(filter=filter).set_index("folder")
del ann["category"]
if duplicates is not True:
ann = ann[~ann.index.duplicated(keep=duplicates)]
samples = samples.join(ann.add_prefix(prefix), how=how)
mask = samples.select_dtypes(include=["number", "bool", "object"]).columns
samples[mask] = samples[mask].fillna("")
return samples.reset_index(drop=True)
def get_samples(self, target):
"""
Get samples from sample definition file located in .samples
:param target: file path from bucket/.samples/
:return: pandas dataframe of samples
"""
target = (target,) if isinstance(target, str) else target
sample_file = self.get_location("samples", *target)
sep = self.io.path.get_sep(self.bucket)
if self.io.isfile(sample_file):
df = pd.read_csv(BytesIO(self.io.read_file(sample_file)), sep=";")
if df.shape[1] == 1:
df = pd.read_csv(BytesIO(self.io.read_file(sample_file)))
else:
df = pd.DataFrame(dict(path=["NaN"], category=[("NaN",)]))[:0]
if ImageKeys.SIZE in df:
df[ImageKeys.SIZE] = df[ImageKeys.SIZE].apply(lambda sz: np.array(json.loads(sz)))
if ImageKeys.BOUNDING_BOX in df:
df[ImageKeys.BOUNDING_BOX] = df[ImageKeys.BOUNDING_BOX].apply(lambda bbox: np.array(json.loads(bbox)))
df["folder"] = df.path.str.rsplit(sep, 2).str[-2].fillna(DATASET_ROOT) # path is still relative path
df.path = df.path.apply(lambda pp: self.io.path.join(self.bucket, pp))
if "segmentation_path" in df:
df.segmentation_path = df.segmentation_path.fillna("")
df.segmentation_path = df.segmentation_path.apply(lambda sp: self.io.path.join(self.bucket, sp))
df["bucket"] = self.bucket
df["dataset"] = self.name
df["dataset_id"] = str(self.id)
df["url"] = df.path.apply(self.get_sample_uri)
df["category"] = df.category.apply(ast.literal_eval)
return df
def save_samples(self, target, df):
"""
Save samples in dataset to samples file
:param target:
:param df:
:return:
"""
target = (target,) if isinstance(target, str) else target
sample_file = self.get_location("samples", *target)
# Only save samples from this dataset
df = df[df.path.str.startswith(self.bucket)]
# Only save non known columns
df = df.iloc[:, ~df.columns.isin({"folder", "bucket", "dataset", "dataset_id", "url"})]
df.path = df.path.apply(self.get_ds_path)
if "segmentation_path" in df:
df.segmentation_path = df.segmentation_path.apply(self.get_ds_path)
self.upload(sample_file, df.to_csv(index=False, sep=";"))
def get_meta(self, filter=None):
return self.find_files(self.get_location("meta"),
sample_filter=filter,
default_category_folder_index=-2, full_path=True)
def find_files(self, path=None, *args, **kwargs):
path = path or self.bucket
return pd.DataFrame(self.sample_walk(path, *args, **kwargs),
columns=("category", "folder", "path", "etag"))
def sample_walk(self, bucket, sample_filter=None, class_mapping: dict = None, classes: list = None,
default_category=(DATASET_ROOT,),
exclude_hidden=False, default_category_folder_index=-1, full_path=False, calculate_md5=True,
**kwargs):
class_mapping = class_mapping or {}
classes = classes or []
if isinstance(sample_filter, (list, tuple, set)):
sample_filter = "|".join(map(str, sample_filter))
if isinstance(sample_filter, str):
sample_filter = re.compile(sample_filter).search
class_mapping = {k: ((v,) if isinstance(v, str) else tuple(v)) for k, v in class_mapping.items()}
bucket_offset = len(bucket)
sep = self.io.path.get_sep(bucket)
for r, dirs, files in self.io.walk(bucket, exclude_hidden=exclude_hidden, include_object=True):
rel_path = r[bucket_offset:].strip(sep)
folders = [] if rel_path == '' else [DATASET_ROOT] + rel_path.split(sep)
def_cat = (folders[default_category_folder_index],) if folders else default_category
category = get_category(class_mapping, folders[::-1], def_cat)
if classes:
if isinstance(category, str): # If category is string make sure it is in allowed classes
if category not in classes:
continue
elif len(category) > 0: # IF category is not empty filter allowed classes
category = tuple(c for c in category if c in classes)
if len(category) == 0:
continue
if len(files) > 0:
for file in files:
if isinstance(file, tuple):
file, fobj = file
else:
fobj = None
if sample_filter is None or sample_filter(file.lower()):
folder = rel_path if full_path else (DATASET_ROOT, *folders)[-1]
path = self.io.path.join(r, file)
if calculate_md5 and (fobj is None or len(fobj.etag) != 32 or "-" in fobj.etag):
etag = self.io.get_md5(path)
elif fobj is None:
etag = hashlib.sha1(path.encode("utf8")).hexdigest()
else:
etag = fobj.etag
yield category, folder, path, etag
@property
def uri_offset(self):
if self._uri_offset is not None:
return self._uri_offset
self._uri_offset = self.bucket.find("/", self.bucket.find("://") + 3) + 1
return self._uri_offset
def get_uri(self):
return f"{self.backend.host}/data/{self.id}"
def get_sample_uri(self, path):
return f"{self.backend.host}/download?path={urllib.parse.quote(path[self.uri_offset:], safe='')}"
def upload(self, path, data):
pth = self.get_location(path)
return self.io.write_file(pth, data)
def get_ds_path(self, path):
return path[len(self.bucket) + 1:]
def get_location(self, mode, *path):
"""Get path to object, prefixing 'annotations', 'data', 'samples' with . if they are in the first argument """
location = DATASET_LOCATIONS.get(mode, mode)
path = (location, *path) if location else path
return self.io.path.join(self.bucket, *path)
def __str__(self):
return json.dumps({k: v for k, v in self.__dict__.items() if not k.startswith("_")})
def tif2dzi(path, bucket):
if ".tif" in path:
rel_path = path.replace(bucket, "").strip("/")
return io_tools.path.join(bucket.strip("/"), ".tiles", rel_path, "dzi.json")
else:
return path
Functions
def get_samples(datasets, target, *args, **kwargs)
-
Utility function for getting samples across multiple datasets by sample files

:param datasets:
:param target:
:param args:
:param kwargs:
:return:
def get_samples(datasets, target, *args, **kwargs):
    """
    Utility function for getting samples across multiple datasets by sample files
    :param datasets:
    :param target:
    :param args:
    :param kwargs:
    :return:
    """
    samples = (d.get_samples(target) for d in sorted(datasets, key=lambda x: x.id))
    return pd.concat(samples).reset_index(drop=True)
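A minimal usage sketch; the dataset ids, names and sample-file name below are hypothetical:

from brevettiai.platform.models.dataset import Dataset, get_samples

# When bucket is omitted it is resolved from the dataset id via the platform backend
datasets = [Dataset(id="dataset-a", name="Dataset A"), Dataset(id="dataset-b", name="Dataset B")]
df = get_samples(datasets, "samples.csv")  # reads <bucket>/.samples/samples.csv from each dataset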
def load_sample_identification(df, path, column='purpose', io=<brevettiai.io.utils.IoTools object>, **kwargs)
-
Load and join sample identification information onto dataframe of samples

:param df: sample dataframe
:param path: path to sample id file
:param column: name of split column
:param kwargs: extra args for io_tools.read_file
:return: df, extra_ids
def load_sample_identification(df, path, column="purpose", io=io_tools, **kwargs):
    """
    Load and join sample identification information onto dataframe of samples
    :param df: sample dataframe
    :param path: path to sample id file
    :param column: name of split column
    :param kwargs: extra args for io_tools.read_file
    :return: df, extra_ids
    """
    dfid = pd.read_csv(BytesIO(io.read_file(path, **kwargs)), index_col="etag")
    if column not in dfid.columns:
        dfid.rename(columns={dfid.columns[0]: column})
    return merge_sample_identification(df, dfid, on="etag")
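A hedged sketch of re-joining a previously saved split onto a sample dataframe; the artifact path is hypothetical:

# df must contain an 'etag' column, e.g. as returned by Dataset.get_image_samples
df, extra_ids = load_sample_identification(df, "s3://my-bucket/artifacts/sample_identification.csv")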
def save_sample_identification(df, path, known_ids=None, column='purpose', io=<brevettiai.io.utils.IoTools object>)
-
def save_sample_identification(df, path, known_ids=None, column="purpose", io=io_tools):
    columns = ["etag", column]
    df = df[columns].set_index("etag")
    if df.index.has_duplicates:
        log.info("Duplicate etag entries among samples, saving highest priority purpose")
        df = df.iloc[np.argsort(df.purpose.map({"train": 1, "devel": 2, "development": 2, "test": 3}).fillna(4))]
        df = df[~df.index.duplicated(keep="first")]
    io.write_file(path, df.append(known_ids).to_csv(header=True))
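The corresponding save, again with a hypothetical path:

save_sample_identification(df, "s3://my-bucket/artifacts/sample_identification.csv", column="purpose")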
def save_samples(datasets, target, df)
-
def save_samples(datasets, target, df):
    for d in datasets:
        d.save_samples(target, df)
def tif2dzi(path, bucket)
-
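tif2dzi maps a .tif path inside a dataset bucket to the corresponding dzi.json tile descriptor under .tiles. For example, assuming '/'-joined paths (bucket and file names are hypothetical):

tif2dzi("s3://my-bucket/my-dataset/images/scan_01.tif", "s3://my-bucket/my-dataset")
# -> 's3://my-bucket/my-dataset/.tiles/images/scan_01.tif/dzi.json'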
Classes
class BrevettiDatasetSamples (classes: list = None, class_mapping: dict = None, annotations: dict = None, calculate_md5: bool = False, walk: bool = True, samples_file_name: str = None, contains_column: str = None, contains_regex: str = None)
-
Base class for serializable modules
:param classes: Force samples to be of the categories in this list
:param class_mapping: dict of mapping from path to (category) class. See example for description
class BrevettiDatasetSamples(vue.VueSettingsModule):
    def __init__(self, classes: list = None, class_mapping: dict = None, annotations: dict = None,
                 calculate_md5: bool = False, walk: bool = True, samples_file_name: str = None,
                 contains_column: str = None, contains_regex: str = None):
        """
        :param classes: Force samples to be of the categories in this list
        :param class_mapping: dict of mapping from path to (category) class. See example for description
        """
        self.classes = classes or []
        self.class_mapping = class_mapping or {}
        self.annotations = annotations or {}
        self.calculate_md5 = calculate_md5
        self.walk = walk
        self.samples_file_name = samples_file_name or ""
        self.contains_column = contains_column or ""
        self.contains_regex = contains_regex or ""

    def get_image_samples(self, datasets, *args, **kwargs):
        """
        :param sample_filter: Filter samples by regex
        :param annotations: boolean, or dict Load annotation paths
        :param kwargs:
        :return: Dataframe of samples

        Mapping from path to category:
        Start from the leaf folder, and work towards the dataset root folder. If folder in class_mapping then apply
        its key as the category. If no match is found, apply leaf folder name
        Example:
        class_mapping={
            "A": ["Tilted"],
            "B": ["Tilted"],
            "1": ["Crimp"]
        }
        If classes is True or a list/set of categories. The filter is applied after the mapping.
        """
        with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
            futures = [executor.submit(ds.get_image_samples, *args, **{**self.__dict__, **kwargs})
                       for ds in sorted(datasets, key=lambda x: x.id)]
            return pd.concat([f.result() for f in futures]).reset_index(drop=True)

    def get_samples(self, datasets, walk=None, *args, **kwargs):
        """
        Utility function for getting samples across multiple datasets by sample files
        :param datasets:
        :param target:
        :param args:
        :param kwargs:
        :return:
        """
        walk = walk if walk is not None else self.walk
        if walk:
            df = self.get_image_samples(datasets, *args, **kwargs)
        else:
            df = get_samples(datasets, self.samples_file_name)
        if self.contains_column:
            df = df[df[self.contains_column].str.contains(self.contains_regex, regex=True, na=False)]
        assert not df.empty, "No samples found"
        return df
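A usage sketch; the categories, mapping and datasets list are hypothetical:

sampler = BrevettiDatasetSamples(classes=["Crimp", "Tilted"],
                                 class_mapping={"A": ["Tilted"], "B": ["Tilted"], "1": ["Crimp"]})
df = sampler.get_samples(datasets)  # walks each dataset in parallel and concatenates the image samples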
Methods
def get_image_samples(self, datasets, *args, **kwargs)
-
:param sample_filter: Filter samples by regex
:param annotations: boolean or dict; load annotation paths
:param kwargs:
:return: Dataframe of samples

Mapping from path to category:
Start from the leaf folder and work towards the dataset root folder. If a folder is in class_mapping, apply
its value as the category. If no match is found, apply the leaf folder name.
Example:
    class_mapping={
        "A": ["Tilted"],
        "B": ["Tilted"],
        "1": ["Crimp"]
    }
If classes is given as a list/set of categories, the class filter is applied after the mapping.
def get_samples(self, datasets, walk=None, *args, **kwargs)
-
Utility function for getting samples across multiple datasets by sample files

:param datasets:
:param target:
:param args:
:param kwargs:
:return:
class Dataset (io=<brevettiai.io.utils.IoTools object>, backend=PlatformBackend(host='https://platform.brevetti.ai', output_segmentation_dir='output_segmentations', bucket_region='eu-west-1', data_bucket='s3://data.criterion.ai', custom_job_id='a0aaad69-c032-41c1-a68c-e9a15a5fb18c'), resolve_access_rights: bool = False, **data)
-
Model defining a dataset on the Brevetti platform
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
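A minimal usage sketch; the id is hypothetical, and when bucket is omitted it is resolved through the platform backend:

ds = Dataset(id="00000000-0000-0000-0000-000000000000", name="my dataset")
samples = ds.get_image_samples(class_mapping={"A": ["Tilted"], "B": ["Tilted"], "1": ["Crimp"]})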
Ancestors
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var bucket : Optional[str]
var id : str
var locked : bool
var name : str
var notes : Optional[str]
var reference : Optional[str]
var tags : List[Tag]
Static methods
def parse_settings(values: Dict[str, Any]) ‑> Dict[str, Any]
-
Instance variables
var backend
-
var io
-
var uri_offset
-
Methods
def find_files(self, path=None, *args, **kwargs)
-
def get_annotations(self, filter=None)
-
def get_ds_path(self, path)
-
def get_image_samples(self, sample_filter='.+(\\.tif|\\.bmp|\\.jpeg|\\.jpg|\\.png|\\.JPG)$', annotations=False, location='data', **kwargs)
-
:param sample_filter: Filter samples by regex
:param annotations: boolean or dict; load annotation paths
:param location: location in dataset ("data", "annotations", "samples", or path in dataset to search under)
:param kwargs:
:return: Dataframe of samples

Mapping from path to category:
Start from the leaf folder and work towards the dataset root folder. If a folder is in class_mapping, apply
its value as the category. If no match is found, apply the leaf folder name.
Example:
    class_mapping={
        "A": ["Tilted"],
        "B": ["Tilted"],
        "1": ["Crimp"]
    }
If classes is given as a list/set of categories, the class filter is applied after the mapping.
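A sketch of also merging annotation paths onto the image samples, assuming ds is a Dataset instance; the keyword values are illustrative:

samples = ds.get_image_samples(annotations=dict(duplicates="last", how="left"))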
def get_location(self, mode, *path)
-
Get the full path to an object in the dataset. Known locations given as the first argument map to their dot-prefixed folders ('annotations' -> '.annotations', 'meta' -> '.meta', 'samples' -> '.samples'), while 'data' maps to the dataset root; any other value is used as-is.
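For example, with the bucket abbreviated as <bucket>:

ds.get_location("annotations", "img_001.png.json")  # -> <bucket>/.annotations/img_001.png.json
ds.get_location("data")                             # -> <bucket>
ds.get_location("some/other/file.csv")              # -> <bucket>/some/other/file.csv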
def get_meta(self, filter=None)
-
def get_sample_uri(self, path)
-
def get_samples(self, target)
-
Get samples from sample definition file located in .samples

:param target: file path from bucket/.samples/
:return: pandas dataframe of samples
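A hedged round-trip sketch; the file name is hypothetical and samples is a dataframe from get_image_samples:

ds.save_samples("my_samples.csv", samples)  # writes <bucket>/.samples/my_samples.csv
df = ds.get_samples("my_samples.csv")       # reads it back with absolute paths and parsed categories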
def get_uri(self)
-
def merge_annotations(self, samples, filter=None, duplicates='last', how='inner', prefix='segmentation_')
-
:param samples: Samples to merge with annotations
:param filter: Annotation filter
:param duplicates: What to do about duplicate annotations.
    True: include all, False: remove all, 'first': keep first, 'last': keep last
:param how: Join mode between samples and annotations
:param prefix: naming prefix for annotation file paths
:return:
def resolve_access_rights(self)
-
def sample_walk(self, bucket, sample_filter=None, class_mapping: dict = None, classes: list = None, default_category=('__root__',), exclude_hidden=False, default_category_folder_index=-1, full_path=False, calculate_md5=True, **kwargs)
-
def save_samples(self, target, df)
-
Save samples in dataset to samples file

:param target:
:param df:
:return:
def upload(self, path, data)
-
class SampleSplit (stratification: list = None, uniqueness: list = None, split: float = 0.8, seed: int = -1, mode='sorted_permutation')
-
Base class for serializable modules
:param stratification: As regex string performed on df.path or list selecting columns
:param uniqueness: As regex string performed on df.path or list selecting columns
:param split: fraction of samples to apply the purpose on
:param seed: seeding for assignment
:param mode: 'sorted_permutation' or 'murmurhash3'
:return:
class SampleSplit(vue.VueSettingsModule):
    MODE_MURMURHASH3 = "murmurhash3"
    MODE_SORTED_PERMUTATION = "sorted_permutation"

    def __init__(self, stratification: list = None, uniqueness: list = None, split: float = 0.8, seed: int = -1,
                 mode=MODE_SORTED_PERMUTATION):
        """
        :param stratification: As regex string performed on df.path or list selecting columns
        :param uniqueness: As regex string performed on df.path or list selecting columns
        :param split: fraction of samples to apply the purpose on
        :param seed: seeding for assignment
        :param mode: ' or 'murmurhash3'
        :return:
        """
        self.stratification = stratification
        try:
            if isinstance(uniqueness, str):
                uniqueness = json.loads(uniqueness)
        except json.JSONDecodeError:
            pass
        self.uniqueness = uniqueness or ["etag"]
        self.split = split
        self.seed = seed
        self.mode = mode
        assert self.mode in {self.MODE_MURMURHASH3, self.MODE_SORTED_PERMUTATION}

    def assign(self, df, purpose="train", remainder=None, column="purpose"):
        """
        Assign purpose column randomly to non-assigned samples based on stratification, uniqueness and split strategy.

        Definitions:
        * Stratification: Grouping of samples which should be treated as individual groups. meaning every group
          must be split according to the sample split target percentage, and uniqueness is performed on a per group basis
        * Uniqueness: grouping of samples which must be treated as a single sample, thus be assigned the same purpose.

        :param df: pd.DataFrame of samples if purpose column does not exist it is added
        :param purpose: purpose to be assigned
        :param remainder: purpose to assign remainder samples, or None to leave unassigned
        :param column: column for assignment of split category
        """
        # Ensure columns
        if column not in df:
            df[column] = pd.NA
        columns = df.columns

        split = self.split
        stratification = self.stratification
        uniqueness = self.uniqueness

        if split == 0 or ~df.purpose.isna().any():
            # Assign no samples
            pass
        elif split == 1:
            # Assign all samples
            df.loc[df.purpose.isna(), column] = purpose
        else:
            # Parse regex stratification and uniqueness strategies
            if isinstance(stratification, str) and stratification:
                df["_stratification"] = df.path.str.extract(stratification)[0]
                stratification = ["_stratification"]
            assert stratification is None or all(x in df.columns for x in stratification), \
                "stratification should be None or in columns"

            if isinstance(uniqueness, str) and uniqueness:
                df["_uniqueness"] = df.path.str.extract(uniqueness)[0]
                uniqueness = ["_uniqueness"]
            assert uniqueness is None or all(x in df.columns for x in uniqueness), \
                "uniqueness should be None or in columns"

            seed = None if self.seed < 0 else self.seed
            rng = seed if isinstance(seed, np.random.RandomState) else np.random.RandomState(seed)

            def _split(g):
                if uniqueness:
                    items = g[uniqueness + [column]].copy()
                    items["_purpose_prio"] = items.purpose.map({"train": 1, "test": 2})
                    items = items.sort_values("_purpose_prio")[uniqueness + [column]]
                    unique_items = items.groupby(uniqueness).purpose.agg(["size", "first"])
                    unique_items.columns = ["samples", column]
                    unique_items = unique_items.reset_index()
                else:
                    unique_items = g[[column]].reset_index(drop=True)
                    unique_items["samples"] = 1

                # split unmarked items
                unmarked = unique_items[unique_items.purpose.isna()]

                # mode
                if unmarked.size > 0:
                    if self.mode == self.MODE_MURMURHASH3:
                        # Random seed for this stratified group
                        mmh_seed = rng.randint(0x7FFFFFFF)
                        # Extract uniqueness for hashing
                        if uniqueness:
                            unique_df = unmarked[uniqueness]
                        else:
                            unique_df = pd.DataFrame(unmarked.index)
                        # Apply mmh3 hashing
                        hash_ = unique_df.apply(lambda x: mmh3.hash("_".join(map(str, x)), seed=mmh_seed, signed=False), axis=1)
                        # Assign
                        unique_items.loc[hash_[hash_ < 0xFFFFFFFF * split].index, column] = purpose
                    elif self.mode == self.MODE_SORTED_PERMUTATION or True:  # default
                        # Select unmarked to assign
                        items_count = unique_items.samples.sum()
                        marked_count = unique_items.samples[unique_items.purpose == purpose].sum()
                        assign_count = items_count * split - marked_count

                        unmarked = rng.permutation(unmarked.index)
                        cdf = unique_items.samples[unmarked].cumsum()
                        ix = np.searchsorted(cdf.values, assign_count, side="right")
                        if len(cdf.values) > ix:
                            ix = ix - (rng.rand() > ((assign_count - cdf.values[ix - 1]) /
                                                     (cdf.values[ix] - cdf.values[ix - 1])))
                        # Assign
                        unique_items.loc[cdf.iloc[:ix+1].index, column] = purpose

                if uniqueness:
                    g.loc[:, column] = unique_items.set_index(uniqueness) \
                        .loc[g[uniqueness].set_index(uniqueness).index].purpose.values
                else:
                    g.loc[:, column] = unique_items.purpose.values
                return g

            if stratification:
                df = df.groupby(stratification).apply(_split)
            else:
                df = _split(df)

        if remainder:
            df.loc[df.purpose.isna(), column] = remainder

        # Ensure etag is unique across all stratified groups
        #df.loc[:, column] = df.groupby("etag").first()[column].reindex(df.etag).values
        return df[columns]

    def update_unassigned(self, df, id_path, purpose="train", remainder="devel", column="purpose", io=io_tools):
        """
        Updates sample purpose in id_path that may hold previous dataset splits and sample ids
        Unassigned samples are also assigned and id_path is updated

        :param df: pd.DataFrame containing the samples
        :param id_path: path to the identification csv file
        :param purpose: Purpose to assign
        :param remainder: Purpose to assign to remainder or none to leave unassigned
        :param column: Column to assign split purposes to
        :return:
        """
        log.info("Looking for previous train / development split")
        known_ids = None
        if io.isfile(id_path):
            df, known_ids = load_sample_identification(df, id_path, column=column, io=io)
            log.info("Using train / development split from run cached in artifacts")
        else:
            log.info("No initial sample identification file found")

        df = self.assign(df, purpose=purpose, remainder=remainder, column=column)
        save_sample_identification(df, id_path, known_ids=known_ids, column=column, io=io)
        return df
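A usage sketch of an 80/20 split keyed on etag; the column names assume a dataframe from Dataset.get_image_samples:

splitter = SampleSplit(split=0.8, uniqueness=["etag"], seed=42)
df = splitter.assign(df, purpose="train", remainder="devel")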
Class variables
var MODE_MURMURHASH3
var MODE_SORTED_PERMUTATION
Methods
def assign(self, df, purpose='train', remainder=None, column='purpose')
-
Assign purpose column randomly to non-assigned samples based on stratification, uniqueness and split strategy.
Definitions:
* Stratification: Grouping of samples which should be treated as individual groups, meaning every group must be
  split according to the sample split target percentage, and uniqueness is performed on a per-group basis
* Uniqueness: grouping of samples which must be treated as a single sample, and thus be assigned the same purpose.

:param df: pd.DataFrame of samples; if the purpose column does not exist it is added
:param purpose: purpose to be assigned
:param remainder: purpose to assign remainder samples, or None to leave unassigned
:param column: column for assignment of split category
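A sketch of a stratified assignment, splitting each folder group independently; the column choices are illustrative:

df = SampleSplit(split=0.8, stratification=["folder"], uniqueness=["etag"]).assign(df, purpose="train",
                                                                                   remainder="devel")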
def update_unassigned(self, df, id_path, purpose='train', remainder='devel', column='purpose', io=<brevettiai.io.utils.IoTools object>)
-
Updates sample purpose in id_path that may hold previous dataset splits and sample ids.
Unassigned samples are also assigned and id_path is updated.

:param df: pd.DataFrame containing the samples
:param id_path: path to the identification csv file
:param purpose: Purpose to assign
:param remainder: Purpose to assign to remainder or none to leave unassigned
:param column: Column to assign split purposes to
:return:
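A sketch of reusing a split cached in a job's artifacts; the artifact path is hypothetical and splitter is a SampleSplit instance:

df = splitter.update_unassigned(df, "s3://my-bucket/artifacts/sample_identification.csv")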