Module brevettiai.data.sample_integrity
Expand source code
import logging
import mmh3
import json
from brevettiai.interfaces import vue_schema_utils as vue
from brevettiai.io import io_tools
from io import BytesIO
import pandas as pd
import numpy as np
log = logging.getLogger(__name__)
def merge_sample_identification(df, dfid, on="etag"):
    """
    Merge sample identification traits onto dataframe, such that values (excluding NA) are transferred to the dataframe
    :param df: Dataframe
    :param dfid: identification dataframe, with index as parameter named by on, by default 'etag'
    :param on: column name on df to match with identification
    :return: df, extra_ids (merged dataframe, and ids not present among samples)
    """
    # Ids present in the identification frame but without a matching sample;
    # returned to the caller so they are not silently lost
    extra_ids = dfid[~dfid.index.isin(df[on])]
    # Reindex id file to match new samples (one row per sample, NA where the id is unknown)
    dfid = dfid.reindex(df[on])
    # combine sample identification information with samples
    for c in dfid.columns:
        col = dfid[c]
        mask = col.isna()
        if mask.any() and c in df.columns:
            # Column already exists and identification has gaps: only overwrite rows where
            # identification provides a value. Use .loc instead of chained indexing
            # (df[c][...] = ...) which may write to a temporary copy and be lost.
            df.loc[~mask.values, c] = col[~mask].values
        else:
            # New column, or identification covers every sample: take the values wholesale
            df[c] = col.values
    return df, extra_ids
def load_sample_identification(df, path, column="purpose", io=io_tools, **kwargs):
    """
    Load and join sample identification information onto dataframe of samples
    :param df: sample dataframe
    :param path: path to sample id file
    :param column: name of split column
    :param io: io module exposing read_file, defaults to io_tools
    :param kwargs: extra args for io_tools.read_file
    :return: df, extra_ids
    """
    dfid = pd.read_csv(BytesIO(io.read_file(path, **kwargs)), index_col="etag")
    if column not in dfid.columns:
        # Fall back to interpreting the first csv column as the split column.
        # rename returns a new frame; assign it back (previously the result was discarded,
        # so the fallback never took effect)
        dfid = dfid.rename(columns={dfid.columns[0]: column})
    return merge_sample_identification(df, dfid, on="etag")
def save_sample_identification(df, path, known_ids=None, column="purpose", io=io_tools):
    """
    Persist sample identification (etag -> split purpose) as csv
    :param df: sample dataframe containing 'etag' and the split column
    :param path: destination path for the csv file
    :param known_ids: optional dataframe of ids without matching samples, appended to the output
    :param column: name of split column
    :param io: io module exposing write_file, defaults to io_tools
    """
    columns = ["etag", column]
    df = df[columns].set_index("etag")
    if df.index.has_duplicates:
        log.info("Duplicate etag entries among samples, saving highest priority purpose")
        # Order rows by purpose priority (train < devel/development < test < unknown),
        # using the configured split column rather than a hard-coded 'purpose' attribute
        df = df.iloc[np.argsort(df[column].map({"train": 1, "devel": 2, "development": 2, "test": 3}).fillna(4))]
        # Keep only the first (highest priority) row per etag
        df = df[~df.index.duplicated(keep="first")]
    # pd.concat replaces DataFrame.append (removed in pandas 2.0);
    # known_ids may be None when no previous identification file existed
    output = df if known_ids is None else pd.concat([df, known_ids])
    io.write_file(path, output.to_csv(header=True))
class SampleSplit(vue.VueSettingsModule):
    """
    Split strategy assigning a purpose (e.g. train/test) to samples of a dataframe,
    honouring stratification groups and uniqueness constraints.
    """
    # Deterministic assignment by hashing the uniqueness key
    MODE_MURMURHASH3 = "murmurhash3"
    # Assignment by random permutation of the unassigned items
    MODE_SORTED_PERMUTATION = "sorted_permutation"

    def __init__(self, stratification: list = None, uniqueness: list = None, split: float = 0.8, seed: int = -1,
                 mode=MODE_SORTED_PERMUTATION):
        """
        :param stratification: As regex string performed on df.path or list selecting columns
        :param uniqueness: As regex string performed on df.path or list selecting columns
        :param split: fraction of samples to apply the purpose on
        :param seed: seeding for assignment; a negative value means no fixed seed
        :param mode: 'sorted_permutation' or 'murmurhash3'
        :return:
        """
        self.stratification = stratification
        try:
            # uniqueness may arrive json-encoded (e.g. from a settings schema); try to decode
            if isinstance(uniqueness, str):
                uniqueness = json.loads(uniqueness)
        except json.JSONDecodeError:
            # Not json: keep the raw string (later treated as a regex applied to df.path)
            pass
        # Default to per-etag uniqueness when none given
        self.uniqueness = uniqueness or ["etag"]
        self.split = split
        self.seed = seed
        self.mode = mode
        assert self.mode in {self.MODE_MURMURHASH3, self.MODE_SORTED_PERMUTATION}

    def assign(self, df, purpose="train", remainder=None, column="purpose"):
        """
        Assign purpose column randomly to non-assigned samples based on stratification, uniqueness and split strategy.
        Definitions:
        * Stratification: Grouping of samples which should be treated as individual groups.
        meaning every group must be split according to the sample split target percentage,
        and uniqueness is performed on a per group basis
        * Uniqueness: grouping of samples which must be treated as a single sample, thus be assigned the same purpose.
        :param df: pd.DataFrame of samples if purpose column does not exist it is added
        :param purpose: purpose to be assigned
        :param remainder: purpose to assign remainder samples, or None to leave unassigned
        :param column: column for assignment of split category
        """
        # Ensure columns
        if column not in df:
            df[column] = pd.NA
        columns = df.columns
        split = self.split
        stratification = self.stratification
        uniqueness = self.uniqueness
        # NOTE(review): the conditions below read df.purpose directly instead of df[column];
        # behavior when column != "purpose" should be confirmed
        if split == 0 or ~df.purpose.isna().any():  # Assign no samples
            pass
        elif split == 1:  # Assign all samples
            df.loc[df.purpose.isna(), column] = purpose
        else:
            # Parse regex stratification and uniqueness strategies
            if isinstance(stratification, str) and stratification:
                # First capture group of the regex applied to the path defines the stratum
                df["_stratification"] = df.path.str.extract(stratification)[0]
                stratification = ["_stratification"]
            assert stratification is None or all(x in df.columns for x in stratification), \
                "stratification should be None or in columns"
            if isinstance(uniqueness, str) and uniqueness:
                # First capture group of the regex applied to the path defines the uniqueness key
                df["_uniqueness"] = df.path.str.extract(uniqueness)[0]
                uniqueness = ["_uniqueness"]
            assert uniqueness is None or all(x in df.columns for x in uniqueness), \
                "uniqueness should be None or in columns"
            # Negative seed -> unseeded RandomState (nondeterministic between runs)
            seed = None if self.seed < 0 else self.seed
            # NOTE(review): seed is int or None at this point, so the isinstance branch
            # looks unreachable here — possibly kept to allow passing a RandomState directly
            rng = seed if isinstance(seed, np.random.RandomState) else np.random.RandomState(seed)

            def _split(g):
                # Assign purposes within a single stratified group g
                if uniqueness:
                    items = g[uniqueness + [column]].copy()
                    # Sort already-marked rows first so each uniqueness group keeps
                    # its existing purpose (picked up by the "first" aggregation below)
                    items["_purpose_prio"] = items.purpose.map({"train": 1, "test": 2})
                    items = items.sort_values("_purpose_prio")[uniqueness + [column]]
                    # One row per uniqueness group: its sample count and current purpose
                    unique_items = items.groupby(uniqueness).purpose.agg(["size", "first"])
                    unique_items.columns = ["samples", column]
                    unique_items = unique_items.reset_index()
                else:
                    # No uniqueness constraint: every sample is its own group of size 1
                    unique_items = g[[column]].reset_index(drop=True)
                    unique_items["samples"] = 1
                # split unmarked items
                unmarked = unique_items[unique_items.purpose.isna()]
                # mode
                if unmarked.size > 0:
                    if self.mode == self.MODE_MURMURHASH3:
                        # Random seed for this stratified group
                        mmh_seed = rng.randint(0x7FFFFFFF)
                        # Extract uniqueness for hashing
                        if uniqueness:
                            unique_df = unmarked[uniqueness]
                        else:
                            unique_df = pd.DataFrame(unmarked.index)
                        # Apply mmh3 hashing
                        hash_ = unique_df.apply(lambda x: mmh3.hash("_".join(map(str, x)), seed=mmh_seed, signed=False),
                                                axis=1)
                        # Assign purpose where the unsigned 32-bit hash falls below the split fraction
                        unique_items.loc[hash_[hash_ < 0xFFFFFFFF * split].index, column] = purpose
                    elif self.mode == self.MODE_SORTED_PERMUTATION or True:  # default
                        # Select unmarked to assign
                        items_count = unique_items.samples.sum()
                        marked_count = unique_items.samples[unique_items.purpose == purpose].sum()
                        # Number of samples still needed to reach the target split fraction
                        assign_count = items_count * split - marked_count
                        unmarked = rng.permutation(unmarked.index)
                        # Cumulative sample counts over the permuted unmarked groups
                        cdf = unique_items.samples[unmarked].cumsum()
                        ix = np.searchsorted(cdf.values, assign_count, side="right")
                        if len(cdf.values) > ix:
                            # Stochastic rounding of the boundary group: include it with
                            # probability proportional to the remaining fractional count
                            ix = ix - (rng.rand() > ((assign_count - cdf.values[ix - 1]) / (cdf.values[ix] - cdf.values[ix - 1])))
                        # Assign
                        unique_items.loc[cdf.iloc[:ix+1].index, column] = purpose
                if uniqueness:
                    # Broadcast each group's purpose back to every sample in that group
                    g.loc[:, column] = unique_items.set_index(uniqueness) \
                        .loc[g[uniqueness].set_index(uniqueness).index].purpose.values
                else:
                    g.loc[:, column] = unique_items.purpose.values
                return g

            if stratification:
                # Split each stratified group independently
                df = df.groupby(stratification).apply(_split)
            else:
                df = _split(df)
        if remainder:
            df.loc[df.purpose.isna(), column] = remainder
        # Ensure etag is unique across all stratified groups
        #df.loc[:, column] = df.groupby("etag").first()[column].reindex(df.etag).values
        # Drop helper columns (_stratification/_uniqueness) by restoring the original column set
        return df[columns]

    def update_unassigned(self, df, id_path,
                          purpose="train", remainder="devel", column="purpose", io=io_tools):
        """
        Updates sample purpose in id_path that may hold previous dataset splits and sample ids
        Unassigned samples are also assigned and id_path is updated
        :param df: pd.DataFrame containing the samples
        :param id_path: path to the identification csv file
        :param purpose: Purpose to assign
        :param remainder: Purpose to assign to remainder or none to leave unassigned
        :param column: Column to assign split purposes to
        :param io: io module exposing isfile, defaults to io_tools
        :return: dataframe with split purposes assigned
        """
        log.info("Looking for previous train / development split")
        known_ids = None
        if io.isfile(id_path):
            # Reuse the split from a previous run; keep ids without matching samples
            df, known_ids = load_sample_identification(df, id_path, column=column, io=io)
            log.info("Using train / development split from run cached in artifacts")
        else:
            log.info("No initial sample identification file found")
        # Assign any remaining unassigned samples, then persist the updated identification
        df = self.assign(df, purpose=purpose, remainder=remainder, column=column)
        save_sample_identification(df, id_path, known_ids=known_ids, column=column, io=io)
        return df
Functions
def load_sample_identification(df, path, column='purpose', io=<brevettiai.io.utils.IoTools object>, **kwargs)- 
Load and join sample identification information onto dataframe of samples :param df: sample dataframe :param path: path to sample id file :param column: name of split column :param kwargs: extra args for io_tools.read_file :return: df, extra_ids
Expand source code
def load_sample_identification(df, path, column="purpose", io=io_tools, **kwargs): """ Load and join sample identification information onto dataframe of samples :param df: sample dataframe :param path: path to sample id file :param column: name of split column :param kwargs: extra args for io_tools.read_file :return: df, extra_ids """ dfid = pd.read_csv(BytesIO(io.read_file(path, **kwargs)), index_col="etag") if column not in dfid.columns: dfid.rename(columns={dfid.columns[0]: column}) return merge_sample_identification(df, dfid, on="etag") def merge_sample_identification(df, dfid, on='etag')- 
Merge sample identification traits onto dataframe, such that values (excluding NA) are transferred to the dataframe :param df: Dataframe :param dfid: identification dataframe, with index as parameter named by on, by default 'etag' :param on: column name on df to match with identification :return: df, extra_ids (merged dataframe, and ids not present among samples)
Expand source code
def merge_sample_identification(df, dfid, on="etag"): """ Merge sample identification traits onto dataframe, such that values (excluding NA) are transfered to the dataframe :param df: Dataframe :param dfid: identification dataframe, with index as parameter named by on, by default 'etag' :param on: column name on df to match with identification :return: df, extra_ids (merged dataframe and ids, and ids not present among samples """ # Reindex id file to match new samples extra_ids = dfid[~dfid.index.isin(df[on])] dfid = dfid.reindex(df[on]) # combine sample identification information with samples for c in dfid.columns: col = dfid[c] mask = col.isna() if mask.any() and c in df.columns: df[c][~mask.values] = col[~mask].values else: df[c] = col.values return df, extra_ids def save_sample_identification(df, path, known_ids=None, column='purpose', io=<brevettiai.io.utils.IoTools object>)- 
Expand source code
def save_sample_identification(df, path, known_ids=None, column="purpose", io=io_tools): columns = ["etag", column] df = df[columns].set_index("etag") if df.index.has_duplicates: log.info("Duplicate etag entries among samples, saving highest priority purpose") df = df.iloc[np.argsort(df.purpose.map({"train": 1, "devel": 2, "development": 2, "test": 3}).fillna(4))] df = df[~df.index.duplicated(keep="first")] io.write_file(path, df.append(known_ids).to_csv(header=True)) 
Classes
class SampleSplit (stratification: list = None, uniqueness: list = None, split: float = 0.8, seed: int = -1, mode='sorted_permutation')- 
Base class for serializable modules
:param stratification: As regex string performed on df.path or list selecting columns :param uniqueness: As regex string performed on df.path or list selecting columns :param split: fraction of samples to apply the purpose on :param seed: seeding for assignment :param mode: 'sorted_permutation' or 'murmurhash3' :return:
Expand source code
class SampleSplit(vue.VueSettingsModule): MODE_MURMURHASH3 = "murmurhash3" MODE_SORTED_PERMUTATION = "sorted_permutation" def __init__(self, stratification: list = None, uniqueness: list = None, split: float = 0.8, seed: int = -1, mode=MODE_SORTED_PERMUTATION): """ :param stratification: As regex string performed on df.path or list selecting columns :param uniqueness: As regex string performed on df.path or list selecting columns :param split: fraction of samples to apply the purpose on :param seed: seeding for assignment :param mode: ' or 'murmurhash3' :return: """ self.stratification = stratification try: if isinstance(uniqueness, str): uniqueness = json.loads(uniqueness) except json.JSONDecodeError: pass self.uniqueness = uniqueness or ["etag"] self.split = split self.seed = seed self.mode = mode assert self.mode in {self.MODE_MURMURHASH3, self.MODE_SORTED_PERMUTATION} def assign(self, df, purpose="train", remainder=None, column="purpose"): """ Assign purpose column randomly to non-assigned samples based on stratification, uniqueness and split strategy. Definitions: * Stratification: Grouping of samples which should be treated as individual groups. meaning every group must be split according to the sample split target percentage, and uniqueness is performed on a per group basis * Uniqueness: grouping of samples which must be treated as a single sample, thus be assigned the same purpose. 
:param df: pd.DataFrame of samples if purpose column does not exist it is added :param purpose: purpose to be assigned :param remainder: purpose to assign remainder samples, or None to leave unassigned :param column: column for assignment of split category """ # Ensure columns if column not in df: df[column] = pd.NA columns = df.columns split = self.split stratification = self.stratification uniqueness = self.uniqueness if split == 0 or ~df.purpose.isna().any(): # Assign no samples pass elif split == 1: # Assign all samples df.loc[df.purpose.isna(), column] = purpose else: # Parse regex stratification and uniqueness strategies if isinstance(stratification, str) and stratification: df["_stratification"] = df.path.str.extract(stratification)[0] stratification = ["_stratification"] assert stratification is None or all(x in df.columns for x in stratification), \ "stratification should be None or in columns" if isinstance(uniqueness, str) and uniqueness: df["_uniqueness"] = df.path.str.extract(uniqueness)[0] uniqueness = ["_uniqueness"] assert uniqueness is None or all(x in df.columns for x in uniqueness), \ "uniqueness should be None or in columns" seed = None if self.seed < 0 else self.seed rng = seed if isinstance(seed, np.random.RandomState) else np.random.RandomState(seed) def _split(g): if uniqueness: items = g[uniqueness + [column]].copy() items["_purpose_prio"] = items.purpose.map({"train": 1, "test": 2}) items = items.sort_values("_purpose_prio")[uniqueness + [column]] unique_items = items.groupby(uniqueness).purpose.agg(["size", "first"]) unique_items.columns = ["samples", column] unique_items = unique_items.reset_index() else: unique_items = g[[column]].reset_index(drop=True) unique_items["samples"] = 1 # split unmarked items unmarked = unique_items[unique_items.purpose.isna()] # mode if unmarked.size > 0: if self.mode == self.MODE_MURMURHASH3: # Random seed for this stratified group mmh_seed = rng.randint(0x7FFFFFFF) # Extract uniqueness for hashing if 
uniqueness: unique_df = unmarked[uniqueness] else: unique_df = pd.DataFrame(unmarked.index) # Apply mmh3 hashing hash_ = unique_df.apply(lambda x: mmh3.hash("_".join(map(str, x)), seed=mmh_seed, signed=False), axis=1) # Assign unique_items.loc[hash_[hash_ < 0xFFFFFFFF * split].index, column] = purpose elif self.mode == self.MODE_SORTED_PERMUTATION or True: # default # Select unmarked to assign items_count = unique_items.samples.sum() marked_count = unique_items.samples[unique_items.purpose == purpose].sum() assign_count = items_count * split - marked_count unmarked = rng.permutation(unmarked.index) cdf = unique_items.samples[unmarked].cumsum() ix = np.searchsorted(cdf.values, assign_count, side="right") if len(cdf.values) > ix: ix = ix - (rng.rand() > ((assign_count - cdf.values[ix - 1]) / (cdf.values[ix] - cdf.values[ix - 1]))) # Assign unique_items.loc[cdf.iloc[:ix+1].index, column] = purpose if uniqueness: g.loc[:, column] = unique_items.set_index(uniqueness) \ .loc[g[uniqueness].set_index(uniqueness).index].purpose.values else: g.loc[:, column] = unique_items.purpose.values return g if stratification: df = df.groupby(stratification).apply(_split) else: df = _split(df) if remainder: df.loc[df.purpose.isna(), column] = remainder # Ensure etag is unique across all stratified groups #df.loc[:, column] = df.groupby("etag").first()[column].reindex(df.etag).values return df[columns] def update_unassigned(self, df, id_path, purpose="train", remainder="devel", column="purpose", io=io_tools): """ Updates sample purpose in id_path that may hold previous dataset splits and sample ids Unassigned samples are also assigned and id_path is updated :param df: pd.DataFrame containing the samples :param id_path: path to the identification csv file :param purpose: Purpose to assign :param remainder: Purpose to assign to remainder or none to leave unassigned :param column: Column to assign split purposes to :return: """ log.info("Looking for previous train / development split") 
known_ids = None if io.isfile(id_path): df, known_ids = load_sample_identification(df, id_path, column=column, io=io) log.info("Using train / development split from run cached in artifacts") else: log.info("No initial sample identification file found") df = self.assign(df, purpose=purpose, remainder=remainder, column=column) save_sample_identification(df, id_path, known_ids=known_ids, column=column, io=io) return dfAncestors
Class variables
var MODE_MURMURHASH3var MODE_SORTED_PERMUTATION
Methods
def assign(self, df, purpose='train', remainder=None, column='purpose')- 
Assign purpose column randomly to non-assigned samples based on stratification, uniqueness and split strategy.
Definitions: * Stratification: Grouping of samples which should be treated as individual groups. meaning every group must be split according to the sample split target percentage, and uniqueness is performed on a per group basis * Uniqueness: grouping of samples which must be treated as a single sample, thus be assigned the same purpose.
:param df: pd.DataFrame of samples if purpose column does not exist it is added :param purpose: purpose to be assigned :param remainder: purpose to assign remainder samples, or None to leave unassigned :param column: column for assignment of split category
Expand source code
def assign(self, df, purpose="train", remainder=None, column="purpose"): """ Assign purpose column randomly to non-assigned samples based on stratification, uniqueness and split strategy. Definitions: * Stratification: Grouping of samples which should be treated as individual groups. meaning every group must be split according to the sample split target percentage, and uniqueness is performed on a per group basis * Uniqueness: grouping of samples which must be treated as a single sample, thus be assigned the same purpose. :param df: pd.DataFrame of samples if purpose column does not exist it is added :param purpose: purpose to be assigned :param remainder: purpose to assign remainder samples, or None to leave unassigned :param column: column for assignment of split category """ # Ensure columns if column not in df: df[column] = pd.NA columns = df.columns split = self.split stratification = self.stratification uniqueness = self.uniqueness if split == 0 or ~df.purpose.isna().any(): # Assign no samples pass elif split == 1: # Assign all samples df.loc[df.purpose.isna(), column] = purpose else: # Parse regex stratification and uniqueness strategies if isinstance(stratification, str) and stratification: df["_stratification"] = df.path.str.extract(stratification)[0] stratification = ["_stratification"] assert stratification is None or all(x in df.columns for x in stratification), \ "stratification should be None or in columns" if isinstance(uniqueness, str) and uniqueness: df["_uniqueness"] = df.path.str.extract(uniqueness)[0] uniqueness = ["_uniqueness"] assert uniqueness is None or all(x in df.columns for x in uniqueness), \ "uniqueness should be None or in columns" seed = None if self.seed < 0 else self.seed rng = seed if isinstance(seed, np.random.RandomState) else np.random.RandomState(seed) def _split(g): if uniqueness: items = g[uniqueness + [column]].copy() items["_purpose_prio"] = items.purpose.map({"train": 1, "test": 2}) items = 
items.sort_values("_purpose_prio")[uniqueness + [column]] unique_items = items.groupby(uniqueness).purpose.agg(["size", "first"]) unique_items.columns = ["samples", column] unique_items = unique_items.reset_index() else: unique_items = g[[column]].reset_index(drop=True) unique_items["samples"] = 1 # split unmarked items unmarked = unique_items[unique_items.purpose.isna()] # mode if unmarked.size > 0: if self.mode == self.MODE_MURMURHASH3: # Random seed for this stratified group mmh_seed = rng.randint(0x7FFFFFFF) # Extract uniqueness for hashing if uniqueness: unique_df = unmarked[uniqueness] else: unique_df = pd.DataFrame(unmarked.index) # Apply mmh3 hashing hash_ = unique_df.apply(lambda x: mmh3.hash("_".join(map(str, x)), seed=mmh_seed, signed=False), axis=1) # Assign unique_items.loc[hash_[hash_ < 0xFFFFFFFF * split].index, column] = purpose elif self.mode == self.MODE_SORTED_PERMUTATION or True: # default # Select unmarked to assign items_count = unique_items.samples.sum() marked_count = unique_items.samples[unique_items.purpose == purpose].sum() assign_count = items_count * split - marked_count unmarked = rng.permutation(unmarked.index) cdf = unique_items.samples[unmarked].cumsum() ix = np.searchsorted(cdf.values, assign_count, side="right") if len(cdf.values) > ix: ix = ix - (rng.rand() > ((assign_count - cdf.values[ix - 1]) / (cdf.values[ix] - cdf.values[ix - 1]))) # Assign unique_items.loc[cdf.iloc[:ix+1].index, column] = purpose if uniqueness: g.loc[:, column] = unique_items.set_index(uniqueness) \ .loc[g[uniqueness].set_index(uniqueness).index].purpose.values else: g.loc[:, column] = unique_items.purpose.values return g if stratification: df = df.groupby(stratification).apply(_split) else: df = _split(df) if remainder: df.loc[df.purpose.isna(), column] = remainder # Ensure etag is unique across all stratified groups #df.loc[:, column] = df.groupby("etag").first()[column].reindex(df.etag).values return df[columns] def update_unassigned(self, df, id_path, 
purpose='train', remainder='devel', column='purpose', io=<brevettiai.io.utils.IoTools object>)- 
Updates sample purpose in id_path that may hold previous dataset splits and sample ids Unassigned samples are also assigned and id_path is updated :param df: pd.DataFrame containing the samples :param id_path: path to the identification csv file :param purpose: Purpose to assign :param remainder: Purpose to assign to remainder or none to leave unassigned :param column: Column to assign split purposes to :return:
Expand source code
def update_unassigned(self, df, id_path, purpose="train", remainder="devel", column="purpose", io=io_tools): """ Updates sample purpose in id_path that may hold previous dataset splits and sample ids Unassigned samples are also assigned and id_path is updated :param df: pd.DataFrame containing the samples :param id_path: path to the identification csv file :param purpose: Purpose to assign :param remainder: Purpose to assign to remainder or none to leave unassigned :param column: Column to assign split purposes to :return: """ log.info("Looking for previous train / development split") known_ids = None if io.isfile(id_path): df, known_ids = load_sample_identification(df, id_path, column=column, io=io) log.info("Using train / development split from run cached in artifacts") else: log.info("No initial sample identification file found") df = self.assign(df, purpose=purpose, remainder=remainder, column=column) save_sample_identification(df, id_path, known_ids=known_ids, column=column, io=io) return df 
Inherited members