Module brevettiai.data.data_generator
Expand source code
import logging
import inspect
from collections import OrderedDict
import copy

import numpy as np
import pandas as pd
from abc import ABC, abstractmethod
from pandas.core.dtypes.common import is_signed_integer_dtype, is_unsigned_integer_dtype
from pydantic import Field
from pydantic.typing import Literal
from tqdm import tqdm
from typing import Tuple, Dict, Any, Optional, ClassVar

from brevettiai.io import load_file_safe
from brevettiai.platform.models import IoBaseModel
from brevettiai.interfaces import vue_schema_utils as vue

log = logging.getLogger(__name__)

try:
    import tensorflow as tf
    from brevettiai.data.tf_utils import NumpyStringIterator
except ImportError as ex:
    log.warning("Tensorflow not installed; some data loading functionality may not work")
    from unittest.mock import MagicMock
    tf = MagicMock()
weighing_presets = OrderedDict([
    ("uniform", lambda x: 1),
    ("count", lambda x: x),
    ("square root", np.sqrt),
    ("log", np.log),
    ("logx+1", lambda x: np.log(x) + 1)])


def parse_weighing(weighing):
    if isinstance(weighing, str):
        weighing = weighing_presets[weighing]

    def _get_weights_safe(count, group):
        try:
            return weighing(count, group)
        except TypeError:
            return weighing(count)
    return _get_weights_safe


sampling_groupby_presets = OrderedDict([
    ("", None),
    ("None", None),
    ("Class", ["category"]),
    ("Dataset / Class", ["dataset_id", "category"]),
    ("Dataset / Folder", ["dataset_id", "folder"]),
])
def weighted_dataset_selector(weight):
    def selector_gen():
        cweight = np.cumsum(weight)
        step = weight.min() / 2
        state = 0
        while True:
            state += step
            yield np.sum(cweight < state % 1.0)
    return selector_gen
def item_mapping(df):
    mapping = {}
    for name in df.columns:
        col = df[name]
        mapping_name = f"_{name}_mapping"
        if col.dtype.name == "category":
            df.loc[:, mapping_name] = col.cat.codes
            lookup_tbl = tf.ragged.constant(col.cat.categories.values, name=f"{name}lookup")
            mapping[mapping_name] = name, lambda x, tbl=lookup_tbl: tbl[tf.cast(x, tf.int32)]
        if col.apply(pd.api.types.is_list_like).any():
            if col.apply(pd.api.types.is_hashable).all():
                grp = df.groupby(name)
                df.loc[:, mapping_name] = grp.ngroup()
                try:
                    lookup_tbl = tf.ragged.constant([k for k, v in grp], name=f"{name}lookup")
                except ValueError:
                    lookup_tbl = tf.ragged.constant([tuple(k) for k, v in grp], name=f"{name}lookup")
            else:
                df.loc[:, mapping_name] = np.arange(col.size)
                try:
                    lookup_tbl = tf.constant(col.values.tolist())
                except ValueError:
                    lookup_tbl = tf.ragged.constant(col.values.tolist())
            mapping[mapping_name] = name, lambda x, tbl=lookup_tbl: tbl[tf.cast(x, tf.int32)]
        else:
            mapping[name] = name, lambda x: x
    return df[mapping.keys()], mapping
def _downcast(s):
    if is_signed_integer_dtype(s.dtype):
        return pd.to_numeric(s, downcast="integer")
    elif is_unsigned_integer_dtype(s.dtype):
        return pd.to_numeric(s, downcast="unsigned")
    return s
def get_dataset(df, shuffle, repeat, seed=None):
    """
    Build simple tensorflow dataset from pandas dataframe
    :param df: pandas DataFrame with one sample per row
    :param shuffle: shuffle the samples
    :param repeat: repeat the dataset; True for indefinitely, or an integer repeat count
    :param seed: seed or np.random.RandomState
    :return: tf.data.Dataset of sample dictionaries
    """
    rand = seed if isinstance(seed, np.random.RandomState) else np.random.RandomState(seed=seed)
    if shuffle:
        df = df.iloc[rand.permutation(np.arange(len(df)))]
    ds = tf.data.Dataset.from_tensor_slices({c: df[c].values for c in df.columns})
    if repeat:
        ds = ds.repeat(-1 if repeat is True else repeat)
    if shuffle:
        ds = ds.shuffle(min(len(df), 1024),
                        seed=np.frombuffer(rand.bytes(8), dtype=np.int64)[0],
                        reshuffle_each_iteration=False)
    return ds
def build_dataset_from_samples(samples, groupby="category", weighing="uniform", shuffle=True, repeat=True, seed=None):
    """
    Build tensorflow dataset from pandas dataframe with oversampling of groups
    :param samples: pandas DataFrame of samples
    :param groupby: column name(s) or preset name to group samples by for weighted sampling
    :param weighing: weighing function or preset name ("uniform", "count", "square root", "log", "logx+1")
    :param shuffle: shuffle the samples
    :param repeat: repeat the dataset; True for indefinitely, or an integer repeat count
    :param seed: seed or np.random.RandomState
    :return: tf.data.Dataset of sample dictionaries
    """
    rand = seed if isinstance(seed, np.random.RandomState) else np.random.RandomState(seed=seed)
    ds_metadata = {}
    if not isinstance(samples, pd.DataFrame):
        samples = pd.DataFrame(samples)
    else:
        samples = samples.copy()
    samples = samples.apply(_downcast, axis=0)

    if isinstance(groupby, str):
        groupby = sampling_groupby_presets.get(groupby, [groupby])

    sample_grouper, weight = None, None
    if groupby is not None:
        sampling_group_col_name = "_sampling_group"
        sample_grouper = samples.groupby(groupby)
        samples[sampling_group_col_name] = sample_grouper.ngroup()
        weighing_fn = parse_weighing(weighing)
        weight = np.array([weighing_fn(len(i), x) for x, i in sample_grouper.groups.items()])
        weight = weight / weight.sum()

    # Perform mapping of ragged tuple elements and categoricals before input to tensorflow dataset
    samples, colmapping = item_mapping(samples)

    # Perform oversampling of datasets
    if groupby is not None:
        datasets = [get_dataset(v[colmapping.keys()], shuffle=shuffle, repeat=repeat, seed=rand)
                    for key, v in sample_grouper]
        if shuffle:
            ds = tf.data.experimental.sample_from_datasets(datasets, weights=weight,
                                                           seed=np.frombuffer(rand.bytes(8), dtype=np.int64)[0])
        else:
            selector = tf.data.Dataset.from_generator(weighted_dataset_selector(weight), tf.int64)
            ds = tf.data.experimental.choose_from_datasets(datasets, selector)
        ds_metadata["sample_weight"] = dict(zip(sample_grouper.groups.keys(), weight))
    else:
        ds = get_dataset(samples[colmapping.keys()], shuffle=shuffle, repeat=repeat, seed=rand)

    # Reverse map indexes
    ds = ds.map(lambda x: {name: func(x[k]) for k, (name, func) in colmapping.items()})
    ds._ds_metadata = ds_metadata
    return ds
def map_output_structure(x, structure):
    keys = tf.nest.flatten(structure)
    return tf.nest.pack_sequence_as(structure, [x[k] for k in keys])
class DataGenerator:
    def __init__(self, samples, batch_size: int = 32, shuffle: bool = False, repeat: bool = False,
                 sampling_groupby: str = None,
                 sampling_group_weighing: str = "uniform", seed: int = None,
                 output_structure: tuple = None, max_epoch_samples: int = np.inf):
        """
        Dataset helper based on Tensorflow datasets, capable of seeding, weighted sampling, and tracking datasets for
        logs.
        :param samples: Pandas dataframe with inputs
        :param batch_size: Number of samples per batch
        :param shuffle: Shuffle items in dataset
        :param repeat: Repeat samples from dataset
        :param sampling_groupby: Stratified sample columns to group by when weighing each sample group for sampling
        :param sampling_group_weighing: Stratified sampling weighing function used to weigh the sample groups;
            supply a function or select from ["uniform", "count", "square root", "log"]
        :param seed: Seeding of dataset
        :param output_structure: default output structure (tuples with keys) of dataset or None for full dictionary
        :param max_epoch_samples: Max number of samples per epoch
        """
        self.random = seed if isinstance(seed, np.random.RandomState) else np.random.RandomState(seed=seed)
        self.output_structure = output_structure
        self._tfds_actions = []

        if not isinstance(samples, pd.DataFrame):
            samples = pd.DataFrame(list(samples) if isinstance(samples, np.ndarray) else samples)
        self.total_sample_count = len(samples)
        self.epoch_samples = min(max_epoch_samples, self.total_sample_count)
        self.batch_size = batch_size

        self._dataset = self.dataset = build_dataset_from_samples(
            samples=samples, groupby=sampling_groupby, weighing=sampling_group_weighing,
            shuffle=shuffle, repeat=repeat, seed=self.random)

    def get_samples(self, batch=True, structure=None) -> tf.data.Dataset:
        """
        Get tensorflow samples as tensorflow dataset without applying maps for e.g. loading data
        :param batch: output batched
        :param structure: Structure of output (None, "__default__" or structure of keys)
        :return:
        """
        ds = self.dataset.batch(self.batch_size) if batch else self.dataset
        structure = self.output_structure if structure == "__default__" else structure
        if structure is not None:
            ds = ds.map(lambda x: map_output_structure(x, structure))
        return ds

    def get_samples_numpy(self, *args, **kwargs):
        """
        Get numpy iterator of samples in dataset, similar interface as .get_samples()
        :return:
        """
        return NumpyStringIterator(self.get_samples(*args, **kwargs))

    def get_dataset(self, batch=True, structure="__default__") -> tf.data.Dataset:
        """
        Get tensorflow dataset
        :param batch: output batched
        :param structure: Structure of output (None, "__default__" or structure of keys)
        :return:
        """
        ds = self.dataset.batch(self.batch_size)
        ds = self.build_dataset(ds)
        assert isinstance(ds, tf.data.Dataset), "Return value of build_dataset must be tensorflow dataset"
        ds = self.apply_tfds_actions(ds)
        ds = ds if batch else ds.unbatch()

        structure = self.output_structure if structure == "__default__" else structure
        if structure is not None:
            ds = ds.map(lambda x: map_output_structure(x, structure))
        return ds.prefetch(tf.data.experimental.AUTOTUNE)

    def get_dataset_numpy(self, *args, **kwargs):
        """
        Get numpy iterator of Dataset, similar interface as .get_dataset()
        :return:
        """
        return NumpyStringIterator(self.get_dataset(*args, **kwargs))

    def build_dataset(self, ds: tf.data.Dataset) -> tf.data.Dataset:
        """Extend this function to apply special functions to the dataset"""
        return ds

    def apply_tfds_actions(self, tfds):
        for action, func_kwargs, kwargs in self._tfds_actions:
            apply_unbatched = hasattr(action, "apply_unbatched") and action.apply_unbatched
            if apply_unbatched:
                tfds = tfds.unbatch()
            tfds = tfds.map(lambda x: action(x, **func_kwargs), **kwargs)
            if apply_unbatched:
                tfds = tfds.batch(self.batch_size)
        return tfds

    def get_dataset_actions(self):
        """
        Get variable actions performed on datasets.
        :return: list of actions, each action consisting of (callable,
            args for callable, and args for tensorflow dataset map)
        """
        return self._tfds_actions

    def map(self, map_func, num_parallel_calls=tf.data.experimental.AUTOTUNE, **kwargs):
        """
        :param map_func: an action or a list of actions, for lists None items are skipped
        """
        if isinstance(map_func, list):
            generator = self
            for map_func in map_func:
                if map_func is not None:
                    generator = generator.map(map_func, **kwargs)
            return generator
        else:
            func_kwargs = dict(
                seed=int(self.random.randint(1 << 32, dtype=np.uint64))
            )
            # Check function signature for extra kwargs
            allowed = inspect.signature(map_func).parameters
            if not any(k for k, v in reversed(allowed.items()) if v.kind == v.VAR_KEYWORD):
                func_kwargs = {k: v for k, v in func_kwargs.items() if k in allowed or "kwargs" in allowed}

            output = copy.copy(self)
            output._tfds_actions = [*output._tfds_actions,
                                    (map_func, func_kwargs, dict(num_parallel_calls=num_parallel_calls, **kwargs))]
            return output

    def get_debug_info(self):
        try:
            return self._dataset._ds_metadata
        except Exception:
            return {}

    def __len__(self):
        """The number of batches per epoch"""
        return int(np.ceil(self.epoch_samples / self.batch_size))

    def __iter__(self):
        return iter(self.get_dataset())
class DataGeneratorMap(ABC):
    """
    Interface for a mapping function for the datagenerator.
    Use datagenerator.map(object: DataGeneratorMap) to apply.

    Attributes:
        apply_unbatched: If True, the mapping is applied to individual samples rather than whole batches
    """
    apply_unbatched = False

    @abstractmethod
    def __call__(self, x, seed: int, *args, **kwargs) -> dict:
        """
        function to apply map
        :param x: dictionary containing keys with data
        :param seed: randomly generated seed for pseudorandom generation
        :return: dictionary containing keys with data (parameter x)
        """
        return x
class FileLoader(DataGeneratorMap, IoBaseModel):
    """
    Basic File loading module for DataGenerator
    """
    type: Literal["FileLoader"] = "FileLoader"
    path_key: str = Field(default="path", exclude=True)
    output_key: str = Field(default="data", exclude=True)

    metadata_spec: ClassVar[dict] = dict()

    @property
    def apply_unbatched(self):
        """When using in datagenerator, do so on samples, not batches"""
        return True

    def load_file_safe(self, path):
        return load_file_safe(path, io=self._io)

    def load(self, path, metadata: Optional[dict] = None) -> Tuple[Any, Dict[str, Any]]:
        """Loading function, returning data and no metadata about the load"""
        data = tf.py_function(self.load_file_safe, [path], tf.string, name="read_image")
        return data, {}

    def __call__(self, x, *args, **kwargs):
        """Add loaded data to the output key"""
        metadata = {k: x[k] if factory is None else factory(x[k])
                    for k, factory in self.metadata_spec.items() if k in x}
        data, meta = self.load(x[self.path_key], metadata=metadata)
        x[self.output_key] = data
        x.update(meta)
        return x
class StratifiedSampler(vue.VueSettingsModule):
    def __init__(self, batch_size: int = 32, groupby: list = None,
                 group_weighing: str = "uniform", max_epoch_samples: int = 10**9,
                 seed: int = -1):
        """
        https://en.wikipedia.org/wiki/Stratified_sampling
        :param batch_size: Number of samples per batch
        :param groupby: Stratified sample columns to group by when weighing each sample group for sampling
        :param group_weighing: Stratified sampling weighing function used to weigh the sample groups;
            supply a function or select from ["uniform", "count", "square root", "log"]
        :param seed: Seeding of dataset
        """
        self.batch_size = batch_size
        self.groupby = groupby or None
        self.group_weighing = group_weighing
        self.max_epoch_samples = max_epoch_samples
        self.seed = seed

    def get(self, samples, shuffle: bool = False, repeat: bool = False, **kwargs) -> DataGenerator:
        """
        :param samples: Pandas dataframe with inputs
        :param shuffle: Shuffle items in dataset
        :param repeat: Repeat samples from dataset
        :param max_epoch_samples: Max number of samples per epoch
        """
        kwargs["batch_size"] = kwargs.get("batch_size", self.batch_size)
        kwargs["max_epoch_samples"] = kwargs.get("max_epoch_samples", self.max_epoch_samples)
        kwargs["seed"] = kwargs.get("seed", None if self.seed < 0 else self.seed)
        return DataGenerator(samples, shuffle=shuffle, repeat=repeat,
                             sampling_groupby=self.groupby, sampling_group_weighing=self.group_weighing,
                             **kwargs)

    @classmethod
    def to_schema(cls, builder, name, ptype, default, **kwargs):
        if name == "group_weighing":
            builder.add_field(vue.select("Sampling Group Weighing", model=name, default=ptype(default), **kwargs,
                                         values=list(weighing_presets.keys())))
        else:
            return super().to_schema(builder=builder, name=name, ptype=ptype, default=default, **kwargs)
def predict_dataset(model, dataset, map_output=None):
    """
    Predict results of model given dataset
    :param model: keras model or callable producing predictions for a batch
    :param dataset: DataGenerator to draw samples from
    :param map_output: optional function applied to the prediction dictionary
    :return: generator of sample dictionaries merged with model predictions, one per batch
    """
    prediction_func = model.predict_on_batch if isinstance(model, tf.keras.Model) else model
    ds = tf.data.Dataset.zip((dataset.get_samples(batch=True), dataset.get_dataset()))
    for samples, (x, y) in tqdm(ds.take(len(dataset)), total=len(dataset), mininterval=2):
        if isinstance(x, dict):
            yhat = prediction_func(**x)
        else:
            yhat = prediction_func(x)

        if not isinstance(yhat, dict):
            outputs = tuple(x.name.split("/")[0] for x in model.outputs)
            if len(outputs) == 1:
                yhat = {outputs[0]: yhat}
            else:
                yhat = {k: v for k, v in zip(outputs, yhat)}

        if map_output is not None:
            yhat = map_output(yhat)
        yield {**samples, **yhat}
class OneHotEncoder(vue.VueSettingsModule):
    def __init__(self, classes, input_key="category", output_key="onehot"):
        self.classes = classes
        self.input_key = input_key
        self.output_key = output_key

        items = len(classes)
        assert items > 0, "Number of classes should be larger than zero"

        # Build mapping table to indices
        self.class_table = tf.lookup.StaticHashTable(
            initializer=tf.lookup.KeyValueTensorInitializer(
                keys=tf.constant(classes),
                values=tf.range(items),
            ),
            default_value=tf.constant(items),
            name="class_weight"
        )
        # Build encoding table from indices to encoding
        self.encoding = tf.eye(items + 1, items)

    @classmethod
    def to_schema(cls, builder, name, ptype, default, **kwargs):
        if name in {"input_key", "output_key"}:
            return
        else:
            return super().to_schema(builder=builder, name=name, ptype=ptype, default=default, **kwargs)

    def encode(self, item):
        class_idx = self.class_table.lookup(item)
        enc = tf.gather(self.encoding, class_idx)
        return enc

    def __call__(self, x, *args, **kwargs):
        x[self.output_key] = self.encode(x[self.input_key])
        return x
def build_image_data_generator(samples, classes=None, image=None, augmentation=None, *args, **kwargs):
    """
    Utility function for building a default image dataset with images at "path" and class definitions at "category",
    outputting image and onehot encoded class
    :param samples: Pandas dataframe of samples, with at least columns (path, category)
    :param classes: list of classes or None to autodetect from samples
    :param image: kwargs for ImagePipeline
    :param augmentation: kwargs for ImageAugmenter
    :param args: args for DataGenerator
    :param kwargs: kwargs for DataGenerator
    :return: DataGenerator outputting (image, onehot)
    """
    from brevettiai.data.image import ImagePipeline
    if classes is None:
        class_space = set(samples.category.unique())
        classes = set(item for sublist in class_space for item in sublist if item != "__UNLABELED__")
        classes = list(sorted(classes))

    image = image or {}
    image = ImagePipeline(**image) if isinstance(image, dict) else image

    ds = DataGenerator(samples, output_structure=("img", "onehot"), *args, **kwargs) \
        .map(image)

    if augmentation is not None:
        from brevettiai.data.image.image_augmenter import ImageAugmenter
        augmentation = ImageAugmenter(**augmentation) if isinstance(augmentation, dict) else augmentation
        ds = ds.map(augmentation)

    ds = ds.map(OneHotEncoder(classes=classes))
    return ds
Functions
def build_dataset_from_samples(samples, groupby='category', weighing='uniform', shuffle=True, repeat=True, seed=None)
-
Build tensorflow dataset from pandas dataframe with oversampling of groups.
:param samples: pandas DataFrame of samples
:param groupby: column name(s) or preset name to group samples by for weighted sampling
:param weighing: weighing function or preset name ("uniform", "count", "square root", "log", "logx+1")
:param shuffle: shuffle the samples
:param repeat: repeat the dataset; True for indefinitely, or an integer repeat count
:param seed: seed or np.random.RandomState
:return: tf.data.Dataset of sample dictionaries
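A minimal usage sketch (not part of the library documentation); the sample table and its "path" values are illustrative only, and TensorFlow must be installed:

import pandas as pd
from brevettiai.data.data_generator import build_dataset_from_samples

# Hypothetical sample table; "category" is the default groupby column
samples = pd.DataFrame({
    "path": ["a.bmp", "b.bmp", "c.bmp", "d.bmp"],
    "category": ["good", "good", "good", "bad"],
})

# Oversample so "good" and "bad" are drawn with equal probability ("uniform" weighing)
ds = build_dataset_from_samples(samples, groupby="category", weighing="uniform", shuffle=True, repeat=True)
for item in ds.take(4):
    print(item["path"].numpy(), item["category"].numpy())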
def build_image_data_generator(samples, classes=None, image=None, augmentation=None, *args, **kwargs)
-
Utility function for building a default image dataset with images at "path" and class definitions at "category", outputting image and onehot encoded class.
:param samples: Pandas dataframe of samples, with at least columns (path, category)
:param classes: list of classes or None to autodetect from samples
:param image: kwargs for ImagePipeline
:param augmentation: kwargs for ImageAugmenter
:param args: args for DataGenerator
:param kwargs: kwargs for DataGenerator
:return: DataGenerator outputting (image, onehot)
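An illustrative sketch of the intended call pattern; the image paths are placeholders, the categories are given as tuples so class autodetection can iterate them, and it is assumed that ImagePipeline can be constructed with default settings:

import pandas as pd
from brevettiai.data.data_generator import build_image_data_generator

samples = pd.DataFrame({
    "path": ["img_0.png", "img_1.png"],     # placeholder image paths
    "category": [("good",), ("defect",)],   # tuples, so autodetection iterates each entry
})

generator = build_image_data_generator(samples, batch_size=2, shuffle=True, repeat=True)
image_batch, onehot_batch = next(iter(generator))  # DataGenerator yielding (image, onehot) batches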
def get_dataset(df, shuffle, repeat, seed=None)
-
Build simple tensorflow dataset from pandas dataframe.
:param df: pandas DataFrame with one sample per row
:param shuffle: shuffle the samples
:param repeat: repeat the dataset; True for indefinitely, or an integer repeat count
:param seed: seed or np.random.RandomState
:return: tf.data.Dataset of sample dictionaries
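A minimal sketch of direct use, assuming TensorFlow is installed; column names are illustrative:

import pandas as pd
from brevettiai.data.data_generator import get_dataset

df = pd.DataFrame({"path": ["a.bmp", "b.bmp"], "label": [0, 1]})
ds = get_dataset(df, shuffle=True, repeat=False, seed=42)
for item in ds:
    print(item)  # dict of tensors, one entry per column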
def item_mapping(df)
-
def map_output_structure(x, structure)
-
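A small sketch of how a sample dictionary is packed into an output structure:

import tensorflow as tf
from brevettiai.data.data_generator import map_output_structure

x = {"img": tf.zeros((2, 2)), "onehot": tf.constant([0.0, 1.0])}
img, onehot = map_output_structure(x, structure=("img", "onehot"))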
def parse_weighing(weighing)
-
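A short sketch showing the preset lookup and the fallback from two-argument to one-argument weighing functions; the group names are invented:

from brevettiai.data.data_generator import parse_weighing

weigh = parse_weighing("square root")
weigh(9, "some_group")  # np.sqrt takes one argument, so this falls back to weigh(9) -> 3.0

weigh = parse_weighing(lambda count, group: 0.5 if group == "background" else count)
weigh(10, "background")  # -> 0.5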
def predict_dataset(model, dataset, map_output=None)
-
Predict results of model given dataset.
:param model: keras model or callable producing predictions for a batch
:param dataset: DataGenerator to draw samples from
:param map_output: optional function applied to the prediction dictionary
:return: generator of sample dictionaries merged with model predictions, one per batch
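An illustrative sketch with a toy model and generator, both invented for this example; in practice the model and DataGenerator come from a training run:

import pandas as pd
import tensorflow as tf
from brevettiai.data.data_generator import DataGenerator, predict_dataset

samples = pd.DataFrame({"x": [[0.0], [1.0], [2.0], [3.0]], "y": [[0.0], [1.0], [2.0], [3.0]]})
generator = DataGenerator(samples, batch_size=2, output_structure=("x", "y"))
model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,), name="out")])

for row in predict_dataset(model, generator):
    print(row.keys())  # raw sample fields merged with the model outputs, one dict per batch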
def weighted_dataset_selector(weight)
-
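A small sketch of the deterministic selector used when shuffling is disabled:

import numpy as np
from brevettiai.data.data_generator import weighted_dataset_selector

weight = np.array([0.75, 0.25])
selector = weighted_dataset_selector(weight)()
indices = [next(selector) for _ in range(8)]
# dataset indices interleaved deterministically, roughly in proportion to the weights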
Classes
class DataGenerator (samples, batch_size: int = 32, shuffle: bool = False, repeat: bool = False, sampling_groupby: str = None, sampling_group_weighing: str = 'uniform', seed: int = None, output_structure: tuple = None, max_epoch_samples: int = inf)
-
Dataset helper based on Tensorflow datasets, capable of seeding, weighted sampling, and tracking datasets for logs.
:param samples: Pandas dataframe with inputs
:param batch_size: Number of samples per batch
:param shuffle: Shuffle items in dataset
:param repeat: Repeat samples from dataset
:param sampling_groupby: Stratified sample columns to group by when weighing each sample group for sampling
:param sampling_group_weighing: Stratified sampling weighing function used to weigh the sample groups; supply a function or select from ["uniform", "count", "square root", "log"]
:param seed: Seeding of dataset
:param output_structure: default output structure (tuples with keys) of dataset or None for full dictionary
:param max_epoch_samples: Max number of samples per epoch
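A minimal usage sketch (column names are illustrative); "Class" is one of the sampling_groupby presets and maps to ["category"]:

import pandas as pd
from brevettiai.data.data_generator import DataGenerator

samples = pd.DataFrame({
    "path": ["a.bmp", "b.bmp", "c.bmp", "d.bmp"],
    "category": ["good", "good", "bad", "bad"],
})

generator = DataGenerator(samples, batch_size=2, shuffle=True, repeat=True,
                          sampling_groupby="Class", sampling_group_weighing="uniform", seed=42)
print(len(generator))              # batches per epoch
print(generator.get_debug_info())  # {"sample_weight": ...} when sampling_groupby is set
batch = next(iter(generator.get_samples()))  # batched dict of raw sample fields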
Methods
def apply_tfds_actions(self, tfds)
-
def build_dataset(self, ds: tensorflow.python.data.ops.dataset_ops.DatasetV2) ‑> tensorflow.python.data.ops.dataset_ops.DatasetV2
-
Extend this function to apply special functions to the dataset
def get_dataset(self, batch=True, structure='__default__') ‑> tensorflow.python.data.ops.dataset_ops.DatasetV2
-
Get tensorflow dataset
:param batch: output batched
:param structure: Structure of output (None, "__default__" or structure of keys)
:return:
def get_dataset_actions(self)
-
Get variable actions performed on datasets.
:return: list of actions, each action consisting of (callable, args for callable, and args for tensorflow dataset map)
def get_dataset_numpy(self, *args, **kwargs)
-
Get numpy iterator of Dataset, similar interface as .get_dataset() :return:
def get_debug_info(self)
-
def get_samples(self, batch=True, structure=None) ‑> tensorflow.python.data.ops.dataset_ops.DatasetV2
-
Get tensorflow samples as tensorflow dataset without applying maps for e.g. loading data
:param batch: output batched
:param structure: Structure of output (None, "__default__" or structure of keys)
:return:
def get_samples_numpy(self, *args, **kwargs)
-
Get numpy iterator of samples in dataset, similar interface as .get_samples() :return:
def map(self, map_func, num_parallel_calls=-1, **kwargs)
-
:param map_func: an action or a list of actions, for lists None items are skipped
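A sketch of chaining maps; upper_path is a hypothetical action, and the returned copy must be kept since map does not modify the generator in place:

import pandas as pd
import tensorflow as tf
from brevettiai.data.data_generator import DataGenerator

def upper_path(x, seed=None):
    # hypothetical per-batch action operating on the sample dictionary
    x["path"] = tf.strings.upper(x["path"])
    return x

generator = DataGenerator(pd.DataFrame({"path": ["a.bmp", "b.bmp"]}), batch_size=2)
mapped = generator.map(upper_path)           # single action; returns a copy with the action appended
mapped = generator.map([upper_path, None])   # lists are applied in order, None entries are skipped
batch = next(iter(mapped.get_dataset(structure=None)))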
class DataGeneratorMap
-
Interface for a mapping function for the datagenerator. Use datagenerator.map(object: DataGeneratorMap) to apply.
Attributes
apply_unbatched
- If True, the mapping is applied to individual samples rather than whole batches
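A hypothetical subclass sketch; the "data" key and the noise operation are assumptions for illustration only:

import tensorflow as tf
from brevettiai.data.data_generator import DataGeneratorMap

class AddNoise(DataGeneratorMap):
    """Add Gaussian noise to a float tensor stored under the assumed key 'data'."""
    apply_unbatched = False  # operate on whole batches

    def __call__(self, x, seed: int, *args, **kwargs) -> dict:
        # the seed argument is injected by DataGenerator.map for reproducibility
        noise = tf.random.stateless_normal(tf.shape(x["data"]), seed=[seed, 0], stddev=0.1)
        x["data"] = x["data"] + noise
        return x

# applied with: generator = generator.map(AddNoise())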
Ancestors
- abc.ABC
Subclasses
Class variables
var apply_unbatched
class FileLoader (io=<brevettiai.io.utils.IoTools object>, **data)
-
Basic File loading module for DataGenerator
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Ancestors
- DataGeneratorMap
- abc.ABC
- IoBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Subclasses
Class variables
var metadata_spec : ClassVar[dict]
var output_key : str
var path_key : str
var type : typing_extensions.Literal['FileLoader']
Instance variables
var apply_unbatched
-
When using in datagenerator, do so on samples, not batches
Methods
def load(self, path, metadata: Optional[dict] = None) ‑> Tuple[Any, Dict[str, Any]]
-
Loading function, returning data and no metadata about the load
def load_file_safe(self, path)
-
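An illustrative sketch; the paths are placeholders, and in practice they would point at files readable through the configured io backend:

import pandas as pd
from brevettiai.data.data_generator import DataGenerator, FileLoader

samples = pd.DataFrame({"path": ["file_a.txt", "file_b.txt"], "category": ["a", "b"]})  # placeholder paths
generator = DataGenerator(samples, batch_size=2).map(FileLoader())
batch = next(iter(generator.get_dataset(structure=None)))
# batch["data"] holds the raw bytes read from batch["path"]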
class OneHotEncoder (classes, input_key='category', output_key='onehot')
-
Base class for serializable modules
Ancestors
- brevettiai.interfaces.vue_schema_utils.VueSettingsModule
Methods
def encode(self, item)
-
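A small usage sketch:

import tensorflow as tf
from brevettiai.data.data_generator import OneHotEncoder

encoder = OneHotEncoder(classes=["good", "defect"])
encoder.encode(tf.constant("defect"))   # -> [0., 1.]
encoder.encode(tf.constant("unknown"))  # unknown classes map to the all-zero row
encoder({"category": tf.constant(["good", "defect"])})  # adds an "onehot" entry to the sample dict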
Inherited members
class StratifiedSampler (batch_size: int = 32, groupby: list = None, group_weighing: str = 'uniform', max_epoch_samples: int = 1000000000, seed: int = -1)
-
Base class for serializable modules
https://en.wikipedia.org/wiki/Stratified_sampling
:param batch_size: Number of samples per batch
:param groupby: Stratified sample columns to group by when weighing each sample group for sampling
:param group_weighing: Stratified sampling weighing function used to weigh the sample groups; supply a function or select from ["uniform", "count", "square root", "log"]
:param seed: Seeding of dataset
Ancestors
- brevettiai.interfaces.vue_schema_utils.VueSettingsModule
Methods
def get(self, samples, shuffle: bool = False, repeat: bool = False, **kwargs) ‑> DataGenerator
-
:param samples: Pandas dataframe with inputs
:param shuffle: Shuffle items in dataset
:param repeat: Repeat samples from dataset
:param max_epoch_samples: Max number of samples per epoch
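A minimal sketch (column names are illustrative):

import pandas as pd
from brevettiai.data.data_generator import StratifiedSampler

samples = pd.DataFrame({
    "path": ["a.bmp", "b.bmp", "c.bmp", "d.bmp"],
    "category": ["good", "good", "good", "bad"],
    "dataset_id": ["ds1", "ds1", "ds2", "ds2"],
})

sampler = StratifiedSampler(batch_size=2, groupby=["dataset_id", "category"], group_weighing="uniform", seed=1)
generator = sampler.get(samples, shuffle=True, repeat=True)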
Inherited members