Module brevettiai.data.data_generator
Expand source code
import logging
import inspect
from collections import OrderedDict
import copy
import numpy as np
import pandas as pd
from abc import ABC, abstractmethod
from pandas.core.dtypes.common import is_signed_integer_dtype, is_unsigned_integer_dtype
from pydantic import Field
from pydantic.typing import Literal
from tqdm import tqdm
from typing import Tuple, Dict, Any, Optional, ClassVar
from brevettiai.io import load_file_safe
from brevettiai.platform.models import IoBaseModel
from brevettiai.interfaces import vue_schema_utils as vue
log = logging.getLogger(__name__)
try:
    import tensorflow as tf
    from brevettiai.data.tf_utils import NumpyStringIterator
except ImportError as ex:
    log.warn("Tensorflow not installed; Some data loading functionality may not work")
    from unittest.mock import MagicMock
    tf = MagicMock()
weighing_presets = OrderedDict([
    ("uniform", lambda x: 1),
    ("count", lambda x: x),
    ("square root", np.sqrt),
    ("log", np.log),
    ("logx+1", lambda x: np.log(x) + 1)])
def parse_weighing(weighing):
    if isinstance(weighing, str):
        weighing = weighing_presets[weighing]
    def _get_weights_safe(count, group):
        try:
            return weighing(count, group)
        except TypeError:
            return weighing(count)
    return _get_weights_safe
sampling_groupby_presets = OrderedDict([
    ("", None),
    ("None", None),
    ("Class", ["category"]),
    ("Dataset / Class", ["dataset_id", "category"]),
    ("Dataset / Folder", ["dataset_id", "folder"]),
])
def weighted_dataset_selector(weight):
    def selector_gen():
        cweight = np.cumsum(weight)
        step = weight.min() / 2
        state = 0
        while True:
            state += step
            yield np.sum(cweight < state % 1.0)
    return selector_gen
def item_mapping(df):
    mapping = {}
    for name in df.columns:
        col = df[name]
        mapping_name = f"_{name}_mapping"
        if col.dtype.name == "category":
            df.loc[:, mapping_name] = col.cat.codes
            lookup_tbl = tf.ragged.constant(col.cat.categories.values, name=f"{name}lookup")
            mapping[mapping_name] = name, lambda x, tbl=lookup_tbl: tbl[tf.cast(x, tf.int32)]
        if col.apply(pd.api.types.is_list_like).any():
            if col.apply(pd.api.types.is_hashable).all():
                grp = df.groupby(name)
                df.loc[:, mapping_name] = grp.ngroup()
                try:
                    lookup_tbl = tf.ragged.constant([k for k, v in grp], name=f"{name}lookup")
                except ValueError:
                    lookup_tbl = tf.ragged.constant([tuple(k) for k, v in grp], name=f"{name}lookup")
            else:
                df.loc[:, mapping_name] = np.arange(col.size)
                try:
                    lookup_tbl = tf.constant(col.values.tolist())
                except ValueError:
                    lookup_tbl = tf.ragged.constant(col.values.tolist())
            mapping[mapping_name] = name, lambda x, tbl=lookup_tbl: tbl[tf.cast(x, tf.int32)]
        else:
            mapping[name] = name, lambda x: x
    return df[mapping.keys()], mapping
def _downcast(s):
    if is_signed_integer_dtype(s.dtype):
        return pd.to_numeric(s, downcast="integer")
    elif is_unsigned_integer_dtype(s.dtype):
        return pd.to_numeric(s, downcast="unsigned")
    return s
def get_dataset(df, shuffle, repeat, seed=None):
    """
    Build simple tensorflow dataset from pandas dataframe
    :param df:
    :param shuffle:
    :param repeat:
    :param seed: seed or np.random.RandomState
    :return:
    """
    rand = seed if isinstance(seed, np.random.RandomState) else np.random.RandomState(seed=seed)
    if shuffle:
        df = df.iloc[rand.permutation(np.arange(len(df)))]
    ds = tf.data.Dataset.from_tensor_slices({c: df[c].values for c in df.columns})
    if repeat:
        ds = ds.repeat(-1 if repeat is True else repeat)
    if shuffle:
        ds = ds.shuffle(min(len(df), 1024),
                        seed=np.frombuffer(rand.bytes(8), dtype=np.int64)[0],
                        reshuffle_each_iteration=False)
    return ds
def build_dataset_from_samples(samples, groupby="category", weighing="uniform", shuffle=True, repeat=True, seed=None):
    """
    Build tensorflow dataset from pandas dataframe with oversampling of groups
    :param samples:
    :param groupby:
    :param weighing:
    :param shuffle:
    :param repeat:
    :param seed: seed or np.random.RandomState
    :return:
    """
    rand = seed if isinstance(seed, np.random.RandomState) else np.random.RandomState(seed=seed)
    ds_metadata = {}
    if not isinstance(samples, pd.DataFrame):
        samples = pd.DataFrame(samples)
    else:
        samples = samples.copy()
    samples = samples.apply(_downcast, axis=0)
    if isinstance(groupby, str):
        groupby = sampling_groupby_presets.get(groupby, [groupby])
    sample_grouper, weight = None, None
    if groupby is not None:
        sampling_group_col_name = "_sampling_group"
        sample_grouper = samples.groupby(groupby)
        samples[sampling_group_col_name] = sample_grouper.ngroup()
        weighing_fn = parse_weighing(weighing)
        weight = np.array([weighing_fn(len(i), x) for x, i in sample_grouper.groups.items()])
        weight = weight / weight.sum()
    # Perform mapping of ragged tuple elements and categoricals before input to tensorflow dataset
    samples, colmapping = item_mapping(samples)
    # Perform oversampling of datasets
    if groupby is not None:
        datasets = [get_dataset(v[colmapping.keys()], shuffle=shuffle, repeat=repeat, seed=rand)
                    for key, v in sample_grouper]
        if shuffle:
            ds = tf.data.experimental.sample_from_datasets(datasets, weights=weight,
                                                           seed=np.frombuffer(rand.bytes(8), dtype=np.int64)[0])
        else:
            selector = tf.data.Dataset.from_generator(weighted_dataset_selector(weight), tf.int64)
            ds = tf.data.experimental.choose_from_datasets(datasets, selector)
        ds_metadata["sample_weight"] = dict(zip(sample_grouper.groups.keys(), weight))
    else:
        ds = get_dataset(samples[colmapping.keys()], shuffle=shuffle, repeat=repeat, seed=rand)
    # Reverse map indexes
    ds = ds.map(lambda x: {name: func(x[k]) for k, (name, func) in colmapping.items()})
    ds._ds_metadata = ds_metadata
    return ds
def map_output_structure(x, structure):
    keys = tf.nest.flatten(structure)
    return tf.nest.pack_sequence_as(structure, [x[k] for k in keys])
class DataGenerator:
    def __init__(self, samples, batch_size: int = 32, shuffle: bool = False, repeat: bool = False,
                 sampling_groupby: str = None,
                 sampling_group_weighing: str = "uniform", seed: int = None,
                 output_structure: tuple = None, max_epoch_samples: int = np.inf):
        """
        Dataset helper based on Tensorflow datasets, capable of seeding, weighted sampling, and tracking datasets for
        logs.
        :param samples: Pandas dataframe with inputs
        :param batch_size: Number of samples per batch
        :param shuffle: Shuffle items in dataset
        :param repeat: Repeat samples from dataset
        :param sampling_groupby: Stratified sample columns to group by when weighing each sample group for sampling
        :param sampling_group_weighing: Stratified sampling weighing function used to weigh the sample groups; supply a function or select from ["uniform", "count", "square root", "log"]
        :param seed: Seeding of dataset
        :param output_structure: default output structure (tuples with keys) of dataset or None for full dictionary
        :param max_epoch_samples: Max number of samples per epoch
        """
        self.random = seed if isinstance(seed, np.random.RandomState) else np.random.RandomState(seed=seed)
        self.output_structure = output_structure
        self._tfds_actions = []
        if not isinstance(samples, pd.DataFrame):
            samples = pd.DataFrame(list(samples) if isinstance(samples, np.ndarray) else samples)
        self.total_sample_count = len(samples)
        self.epoch_samples = min(max_epoch_samples, self.total_sample_count)
        self.batch_size = batch_size
        self._dataset = self.dataset = build_dataset_from_samples(
            samples=samples, groupby=sampling_groupby, weighing=sampling_group_weighing,
            shuffle=shuffle, repeat=repeat, seed=self.random)
    def get_samples(self, batch=True, structure=None) -> tf.data.Dataset:
        """
        Get tensorflow samples as tensorflow dataset without applying maps for e.g. loading data
        :param batch: output batched
        :param structure: Structure of output, (None, "__default__" or structure of keys)
        :return:
        """
        ds = self.dataset.batch(self.batch_size) if batch else self.dataset
        structure = self.output_structure if structure == "__default__" else structure
        if structure is not None:
            ds = ds.map(lambda x: map_output_structure(x, structure))
        return ds
    def get_samples_numpy(self, *args, **kwargs):
        """
        Get numpy iterator of samples in dataset, similar interface as .get_samples()
        :return:
        """
        return NumpyStringIterator(self.get_samples(*args, **kwargs))
    def get_dataset(self, batch=True, structure="__default__") -> tf.data.Dataset:
        """
        Get tensorflow dataset
        :param batch: output batched
        :param structure: Structure of output, (None, "__default__" or structure of keys)
        :return:
        """
        ds = self.dataset.batch(self.batch_size)
        ds = self.build_dataset(ds)
        assert isinstance(ds, tf.data.Dataset), "Return value of build_dataset must be tensorflow dataset"
        ds = self.apply_tfds_actions(ds)
        ds = ds if batch else ds.unbatch()
        structure = self.output_structure if structure == "__default__" else structure
        if structure is not None:
            ds = ds.map(lambda x: map_output_structure(x, structure))
        return ds.prefetch(tf.data.experimental.AUTOTUNE)
    def get_dataset_numpy(self, *args, **kwargs):
        """
        Get numpy iterator of Dataset, similar interface as .get_dataset()
        :return:
        """
        return NumpyStringIterator(self.get_dataset(*args, **kwargs))
    def build_dataset(self, ds: tf.data.Dataset) -> tf.data.Dataset:
        """Extend this function to apply special functions to the dataset"""
        return ds
    def apply_tfds_actions(self, tfds):
        for action, func_kwargs, kwargs in self._tfds_actions:
            apply_unbatched = hasattr(action, "apply_unbatched") and action.apply_unbatched
            if apply_unbatched:
                tfds = tfds.unbatch()
            tfds = tfds.map(lambda x: action(x, **func_kwargs), **kwargs)
            if apply_unbatched:
                tfds = tfds.batch(self.batch_size)
        return tfds
    def get_dataset_actions(self):
        """
        Get variable actions performed on datasets.
        :return: list of actions, each action consisting of (callable,
        args for callable, and args for tensorflow dataset map)
        """
        return self._tfds_actions
    def map(self, map_func, num_parallel_calls=tf.data.experimental.AUTOTUNE, **kwargs):
        """
        :param map_func: an action or a list of actions, for lists None items are skipped
        """
        if isinstance(map_func, list):
            generator = self
            for map_func in map_func:
                if map_func is not None:
                    generator = generator.map(map_func, **kwargs)
            return generator
        else:
            func_kwargs = dict(
                seed=int(self.random.randint(1 << 32, dtype=np.uint64))
            )
            # Check function signature for extra kwargs
            allowed = inspect.signature(map_func).parameters
            if not any(k for k, v in reversed(allowed.items()) if v.kind == v.VAR_KEYWORD):
                func_kwargs = {k: v for k, v in func_kwargs.items() if k in allowed or "kwargs" in allowed}
            output = copy.copy(self)
            output._tfds_actions = [*output._tfds_actions,
                                    (map_func, func_kwargs, dict(num_parallel_calls=num_parallel_calls, **kwargs))]
            return output
    def get_debug_info(self):
        try:
            return self._dataset._ds_metadata
        except Exception:
            return {}
    def __len__(self):
        """The number of batches per epoch"""
        return int(np.ceil(self.epoch_samples / self.batch_size))
    def __iter__(self):
        return iter(self.get_dataset())
class DataGeneratorMap(ABC):
    """
    Interface for a mapping function for the datagenerator
    Use datagenerator.map(object: DataGeneratorMap) to apply.
    Attributes:
        apply_unbatched:    The mapping is performed on batches or not
    """
    apply_unbatched = False
    @abstractmethod
    def __call__(self, x, seed: int, *args, **kwargs) -> dict:
        """
        function to apply map
        :param x: dictionary containing keys with data
        :param seed: randomly generated seed for pseudorandom generation
        :return: dictionary containing keys with data (parameter x)
        """
        return x
class FileLoader(DataGeneratorMap, IoBaseModel):
    """
    Basic File loading module for DataGenerator
    """
    type: Literal["FileLoader"] = "FileLoader"
    path_key: str = Field(default="path", exclude=True)
    output_key: str = Field(default="data", exclude=True)
    metadata_spec: ClassVar[dict] = dict()
    @property
    def apply_unbatched(self):
        """When using in datagenerator, do so on samples, not batches"""
        return True
    def load_file_safe(self, path):
        return load_file_safe(path, io=self._io)
    def load(self, path, metadata: Optional[dict] = None) -> Tuple[Any, Dict[str, Any]]:
        """Loading function, returning data and no metadata about the load"""
        data = tf.py_function(self.load_file_safe, [path], tf.string, name="read_image")
        return data, {}
    def __call__(self, x, *args, **kwargs):
        """Add loaded data to the output key"""
        metadata = {k: x[k] if factory is None else factory(x[k])
                    for k, factory in self.metadata_spec.items() if k in x}
        data, meta = self.load(x[self.path_key], metadata=metadata)
        x[self.output_key] = data
        x.update(meta)
        return x
class StratifiedSampler(vue.VueSettingsModule):
    def __init__(self, batch_size: int = 32, groupby: list = None,
                 group_weighing: str = "uniform", max_epoch_samples: int = 10**9,
                 seed: int = -1):
        """
        https://en.wikipedia.org/wiki/Stratified_sampling
        :param batch_size: Number of samples per batch
        :param groupby: Stratified sample columns to group by when weighing each sample group for sampling
        :param group_weighing: Stratified sampling weighing function used to weigh the sample groups;
        supply a function or select from ["uniform", "count", "square root", "log"]
        :param seed: Seeding of dataset
        """
        self.batch_size = batch_size
        self.groupby = groupby or None
        self.group_weighing = group_weighing
        self.max_epoch_samples = max_epoch_samples
        self.seed = seed
    def get(self, samples, shuffle: bool = False, repeat: bool = False, **kwargs) -> DataGenerator:
        """
        :param samples: Pandas dataframe with inputs
        :param shuffle: Shuffle items in dataset
        :param repeat: Repeat samples from dataset
        :param max_epoch_samples: Max number of samples per epoch
        """
        kwargs["batch_size"] = kwargs.get("batch_size", self.batch_size)
        kwargs["max_epoch_samples"] = kwargs.get("max_epoch_samples", self.max_epoch_samples)
        kwargs["seed"] = kwargs.get("seed", None if self.seed < 0 else self.seed)
        return DataGenerator(samples, shuffle=shuffle, repeat=repeat,
                             sampling_groupby=self.groupby, sampling_group_weighing=self.group_weighing,
                             **kwargs)
    @classmethod
    def to_schema(cls, builder, name, ptype, default, **kwargs):
        if name == "group_weighing":
            builder.add_field(vue.select("Sampling Group Weighing", model=name, default=ptype(default), **kwargs,
                                         values=list(weighing_presets.keys())))
        else:
            return super().to_schema(builder=builder, name=name, ptype=ptype, default=default, **kwargs)
def predict_dataset(model, dataset, map_output=None):
    """
    Predict results of model given dataset
    :param model:
    :param dataset:
    :param map_output:
    :return:
    """
    prediction_func = model.predict_on_batch if isinstance(model, tf.keras.Model) else model
    ds = tf.data.Dataset.zip((dataset.get_samples(batch=True), dataset.get_dataset()))
    for samples, (x, y) in tqdm(ds.take(len(dataset)), total=len(dataset), mininterval=2):
        if isinstance(x, dict):
            yhat = prediction_func(**x)
        else:
            yhat = prediction_func(x)
        if not isinstance(yhat, dict):
            outputs = tuple(x.name.split("/")[0] for x in model.outputs)
            if len(outputs) == 1:
                yhat = {outputs[0]: yhat}
            else:
                yhat = {k: v for k, v in zip(outputs, yhat)}
        if map_output is not None:
            yhat = map_output(yhat)
        yield {**samples, **yhat}
class OneHotEncoder(vue.VueSettingsModule):
    def __init__(self, classes, input_key="category", output_key="onehot"):
        self.classes = classes
        self.input_key = input_key
        self.output_key = output_key
        items = len(classes)
        assert items > 0, "Number of classes should be larger than zero"
        # Build mapping table to indices
        self.class_table = tf.lookup.StaticHashTable(
            initializer=tf.lookup.KeyValueTensorInitializer(
                keys=tf.constant(classes),
                values=tf.range(items),
            ),
            default_value=tf.constant(items),
            name="class_weight"
        )
        # Build encoding table from indices to encoding
        self.encoding = tf.eye(items + 1, items)
    @classmethod
    def to_schema(cls, builder, name, ptype, default, **kwargs):
        if name in {"input_key", "output_key"}:
            return
        else:
            return super().to_schema(builder=builder, name=name, ptype=ptype, default=default, **kwargs)
    def encode(self, item):
        class_idx = self.class_table.lookup(item)
        enc = tf.gather(self.encoding, class_idx)
        return enc
    def __call__(self, x, *args, **kwargs):
        x[self.output_key] = self.encode(x[self.input_key])
        return x
def build_image_data_generator(samples, classes=None, image=None, augmentation=None, *args, **kwargs):
    """
    Utility function for building a default image dataset with images at "path" and class definitions at "category"
    outputting image and onehot encoded class
    :param samples: Pandas dataframe of samples, with at least columns (path, category)
    :param classes: list of classes or none to autodetect from samples
    :param image: kwargs for ImagePipeline
    :param augmentation: kwargs for ImageAugmenter
    :param args: args for DataGenerator
    :param kwargs: kwargs for DataGenerator
    :return: (image, onehot)
    """
    from brevettiai.data.image import ImagePipeline
    if classes is None:
        class_space = set(samples.category.unique())
        classes = set(item for sublist in class_space for item in sublist if item != "__UNLABELED__")
        classes = list(sorted(classes))
    image = image or {}
    image = ImagePipeline(**image) if isinstance(image, dict) else image
    ds = DataGenerator(samples, output_structure=("img", "onehot"), *args, **kwargs) \
        .map(image)
    if augmentation is not None:
        from brevettiai.data.image.image_augmenter import ImageAugmenter
        augmentation = ImageAugmenter(**augmentation) if isinstance(augmentation, dict) else augmentation
        ds = ds.map(augmentation)
    ds = ds.map(OneHotEncoder(classes=classes))
    return ds
Functions
def build_dataset_from_samples(samples, groupby='category', weighing='uniform', shuffle=True, repeat=True, seed=None)
Build tensorflow dataset from pandas dataframe with oversampling of groups
:param samples: pandas dataframe (or dataframe-convertible object) of samples
:param groupby: column name, list of column names, or preset from sampling_groupby_presets to group samples by
:param weighing: weighing function, or preset name from weighing_presets
:param shuffle: shuffle the samples
:param repeat: repeat the dataset
:param seed: seed or np.random.RandomState
:return: tf.data.Dataset yielding sample dictionaries
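A minimal usage sketch (the dataframe below is illustrative and not part of the module):

import pandas as pd
from brevettiai.data.data_generator import build_dataset_from_samples

# Hypothetical sample table; "category" is the default groupby column
samples = pd.DataFrame({
    "path": ["a.bmp", "b.bmp", "c.bmp", "d.bmp"],
    "category": ["good", "good", "good", "defect"],
})
# Uniform group weights oversample the minority "defect" group
ds = build_dataset_from_samples(samples, groupby="category", weighing="uniform",
                                shuffle=True, repeat=True, seed=42)
for sample in ds.take(4):  # each element is a dict of column name -> tensor
    print(sample["path"].numpy(), sample["category"].numpy())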
def build_image_data_generator(samples, classes=None, image=None, augmentation=None, *args, **kwargs)
Utility function for building a default image dataset with images at "path" and class definitions at "category", outputting image and onehot encoded class
:param samples: Pandas dataframe of samples, with at least columns (path, category)
:param classes: list of classes, or None to autodetect from samples
:param image: kwargs for ImagePipeline
:param augmentation: kwargs for ImageAugmenter
:param args: args for DataGenerator
:param kwargs: kwargs for DataGenerator
:return: (image, onehot)
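A construction sketch; the dataframe content is illustrative, and the defaults of ImagePipeline (reading from "path" and writing "img") are assumed rather than defined in this module:

import pandas as pd
from brevettiai.data.data_generator import build_image_data_generator

samples = pd.DataFrame({
    "path": ["img/a.bmp", "img/b.bmp"],   # hypothetical image paths
    "category": ["good", "defect"],
})
# Passing classes explicitly skips the autodetection from samples.category
generator = build_image_data_generator(samples, classes=["good", "defect"],
                                       image={}, augmentation=None,
                                       batch_size=2, shuffle=True, repeat=True)
# Iterating generator.get_dataset() yields (image, onehot) batches,
# loading and decoding the images referenced by "path".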
def get_dataset(df, shuffle, repeat, seed=None)
Build simple tensorflow dataset from pandas dataframe
:param df: pandas dataframe with one column per output key
:param shuffle: shuffle the rows
:param repeat: repeat the dataset; True repeats indefinitely, an integer repeats that many times
:param seed: seed or np.random.RandomState
:return: tf.data.Dataset yielding dictionaries of column values
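A small sketch with an illustrative dataframe:

import pandas as pd
from brevettiai.data.data_generator import get_dataset

df = pd.DataFrame({"path": ["a.bmp", "b.bmp", "c.bmp"], "label": [0, 1, 0]})
ds = get_dataset(df, shuffle=True, repeat=False, seed=0)
for item in ds:  # each element is a dict of column name -> tensor
    print(item["path"].numpy(), item["label"].numpy())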
def item_mapping(df)
def map_output_structure(x, structure)
def parse_weighing(weighing)
def predict_dataset(model, dataset, map_output=None)
Predict results of model given dataset
:param model: Keras model or callable prediction function
:param dataset: DataGenerator whose default output structure yields (input, target) pairs
:param map_output: optional function applied to the prediction dictionary before yielding
:return: generator of dictionaries combining the sample information with the model outputs
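A hedged usage sketch: model and generator are assumed to exist, the generator's default output_structure is assumed to be a two-tuple such as ("img", "onehot") so its get_dataset() yields (x, y) pairs, and model is assumed to be a Keras model:

results = list(predict_dataset(model, generator))
# each item is a dict combining the sample columns with one key per model output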
def weighted_dataset_selector(weight)
Classes
class DataGenerator (samples, batch_size: int = 32, shuffle: bool = False, repeat: bool = False, sampling_groupby: str = None, sampling_group_weighing: str = 'uniform', seed: int = None, output_structure: tuple = None, max_epoch_samples: int = inf)
Dataset helper based on Tensorflow datasets, capable of seeding, weighted sampling, and tracking datasets for logs.
:param samples: Pandas dataframe with inputs
:param batch_size: Number of samples per batch
:param shuffle: Shuffle items in dataset
:param repeat: Repeat samples from dataset
:param sampling_groupby: Stratified sampling columns to group by when weighing each sample group for sampling
:param sampling_group_weighing: Stratified sampling weighing function used to weigh the sample groups; supply a function or select from ["uniform", "count", "square root", "log"]
:param seed: Seeding of dataset
:param output_structure: default output structure (tuple of keys) of dataset, or None for full dictionary
:param max_epoch_samples: Max number of samples per epoch
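A minimal sketch with an illustrative dataframe:

import pandas as pd
from brevettiai.data.data_generator import DataGenerator

samples = pd.DataFrame({
    "path": ["a.bmp", "b.bmp", "c.bmp", "d.bmp"],
    "category": ["good", "good", "defect", "defect"],
})
generator = DataGenerator(samples, batch_size=2, shuffle=True, repeat=False,
                          sampling_groupby="Class", sampling_group_weighing="uniform",
                          seed=0)
print(len(generator))                        # batches per epoch
batch = next(iter(generator.get_dataset()))  # dict of column name -> batched tensor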
Methods
def apply_tfds_actions(self, tfds)
def build_dataset(self, ds: tensorflow.python.data.ops.dataset_ops.DatasetV2) ‑> tensorflow.python.data.ops.dataset_ops.DatasetV2
Extend this function to apply special functions to the dataset
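A hypothetical subclass illustrating where build_dataset hooks into get_dataset(); caching is only an example transformation:

import tensorflow as tf
from brevettiai.data.data_generator import DataGenerator

class CachedDataGenerator(DataGenerator):
    def build_dataset(self, ds: tf.data.Dataset) -> tf.data.Dataset:
        # ds arrives batched; any transformation returning a tf.data.Dataset is allowed
        return ds.cache()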
def get_dataset(self, batch=True, structure='__default__') ‑> tensorflow.python.data.ops.dataset_ops.DatasetV2
Get tensorflow dataset
:param batch: output batched
:param structure: Structure of output (None, "__default__" or structure of keys)
:return: tf.data.Dataset with the mapped actions applied
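A short sketch of the structure argument, assuming a generator whose samples contain "path" and "category" keys; "__default__" falls back to the output_structure given to the constructor:

ds_default = generator.get_dataset()                               # uses output_structure
ds_dict = generator.get_dataset(structure=None)                    # full sample dictionaries
ds_tuple = generator.get_dataset(structure=("path", "category"))   # (path, category) tuples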
def get_dataset_actions(self)
Get variable actions performed on datasets.
:return: list of actions, each action consisting of (callable, args for callable, and args for tensorflow dataset map)
def get_dataset_numpy(self, *args, **kwargs)
Get numpy iterator of Dataset, similar interface as .get_dataset()
def get_debug_info(self)
def get_samples(self, batch=True, structure=None) ‑> tensorflow.python.data.ops.dataset_ops.DatasetV2
Get tensorflow samples as tensorflow dataset without applying maps for e.g. loading data
:param batch: output batched
:param structure: Structure of output (None, "__default__" or structure of keys)
:return: tf.data.Dataset without the mapped actions applied
def get_samples_numpy(self, *args, **kwargs)
Get numpy iterator of samples in dataset, similar interface as .get_samples()
def map(self, map_func, num_parallel_calls=-1, **kwargs)
:param map_func: an action or a list of actions; for lists, None items are skipped
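A sketch chaining several actions; samples is an illustrative dataframe as in the DataGenerator example above:

from brevettiai.data.data_generator import DataGenerator, FileLoader, OneHotEncoder

generator = DataGenerator(samples, batch_size=4)
generator = generator.map([
    FileLoader(path_key="path", output_key="data"),  # loads raw file bytes per sample
    None,                                            # None entries are skipped
    OneHotEncoder(classes=["good", "defect"]),       # adds an "onehot" key
])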
class DataGeneratorMap
Interface for a mapping function for the DataGenerator. Use datagenerator.map(object: DataGeneratorMap) to apply it.
Attributes
apply_unbatched- If True, the dataset is unbatched before the mapping is applied and re-batched afterwards
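A toy implementation sketch; the added "path_length" key is purely illustrative:

import tensorflow as tf
from brevettiai.data.data_generator import DataGeneratorMap

class PathLength(DataGeneratorMap):
    apply_unbatched = False  # operate directly on batches

    def __call__(self, x, seed: int, *args, **kwargs) -> dict:
        x["path_length"] = tf.strings.length(x["path"])
        return x

# applied with generator.map(PathLength())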
 
Ancestors
- abc.ABC
 
Subclasses
- FileLoader
Class variables
var apply_unbatched
class FileLoader (io=<brevettiai.io.utils.IoTools object>, **data)
Basic File loading module for DataGenerator
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
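A usage sketch, assuming the default io backend can resolve the paths in the sample table:

from brevettiai.data.data_generator import DataGenerator, FileLoader

loader = FileLoader(path_key="path", output_key="data")
generator = DataGenerator(samples, batch_size=2).map(loader)
# every sample in generator.get_dataset() now carries a "data" key with the raw file bytes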
Ancestors
- DataGeneratorMap
 - abc.ABC
 - IoBaseModel
 - pydantic.main.BaseModel
 - pydantic.utils.Representation
 
Class variables
var metadata_spec : ClassVar[dict]
var output_key : str
var path_key : str
var type : typing_extensions.Literal['FileLoader']
Instance variables
var apply_unbatched
When using in datagenerator, do so on samples, not batches
Methods
def load(self, path, metadata: Optional[dict] = None) ‑> Tuple[Any, Dict[str, Any]]
Loading function, returning data and no metadata about the load
def load_file_safe(self, path)
class OneHotEncoder (classes, input_key='category', output_key='onehot')
Base class for serializable modules
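A minimal sketch of the encoding behaviour:

import tensorflow as tf
from brevettiai.data.data_generator import OneHotEncoder

encoder = OneHotEncoder(classes=["good", "defect"])
encoded = encoder({"category": tf.constant("defect")})["onehot"]
# -> [0., 1.]; classes not in the table map to the all-zero row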
Ancestors
- brevettiai.interfaces.vue_schema_utils.VueSettingsModule
Methods
def encode(self, item)
class StratifiedSampler (batch_size: int = 32, groupby: list = None, group_weighing: str = 'uniform', max_epoch_samples: int = 1000000000, seed: int = -1)
Base class for serializable modules
https://en.wikipedia.org/wiki/Stratified_sampling
:param batch_size: Number of samples per batch
:param groupby: Stratified sampling columns to group by when weighing each sample group for sampling
:param group_weighing: Stratified sampling weighing function used to weigh the sample groups; supply a function or select from ["uniform", "count", "square root", "log"]
:param max_epoch_samples: Max number of samples per epoch
:param seed: Seeding of dataset; negative values mean no fixed seed
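A construction sketch; samples is a hypothetical dataframe containing at least the groupby columns:

from brevettiai.data.data_generator import StratifiedSampler

sampler = StratifiedSampler(batch_size=16, groupby=["dataset_id", "category"],
                            group_weighing="square root", seed=42)
generator = sampler.get(samples, shuffle=True, repeat=True)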
Ancestors
- brevettiai.interfaces.vue_schema_utils.VueSettingsModule
Methods
def get(self, samples, shuffle: bool = False, repeat: bool = False, **kwargs) ‑> DataGenerator
:param samples: Pandas dataframe with inputs
:param shuffle: Shuffle items in dataset
:param repeat: Repeat samples from dataset
:param kwargs: additional DataGenerator keyword arguments (e.g. batch_size, max_epoch_samples, seed overrides)
:return: DataGenerator