Module brevettiai.data.sample_integrity

Expand source code
import logging
import mmh3
import json

from brevettiai.interfaces import vue_schema_utils as vue
from brevettiai.io import io_tools

from io import BytesIO
import pandas as pd
import numpy as np

log = logging.getLogger(__name__)


def merge_sample_identification(df, dfid, on="etag"):
    """
    Merge sample identification traits onto a dataframe, such that values (excluding NA) are transferred to the dataframe
    :param df: Dataframe
    :param dfid: identification dataframe, with its index named by the 'on' parameter, by default 'etag'
    :param on: column name on df to match with the identification index
    :return: (df, extra_ids) - the merged dataframe, and the ids not present among the samples
    """
    # Reindex id file to match new samples
    extra_ids = dfid[~dfid.index.isin(df[on])]
    dfid = dfid.reindex(df[on])

    # combine sample identification information with samples
    for c in dfid.columns:
        col = dfid[c]
        mask = col.isna()
        if mask.any() and c in df.columns:
            df.loc[~mask.values, c] = col[~mask].values
        else:
            df[c] = col.values
    return df, extra_ids


def load_sample_identification(df, path, column="purpose", io=io_tools, **kwargs):
    """
    Load and join sample identification information onto dataframe of samples
    :param df: sample dataframe
    :param path: path to sample id file
    :param column: name of split column
    :param io: io module used to read the file
    :param kwargs: extra args for io_tools.read_file
    :return: df, extra_ids
    """
    dfid = pd.read_csv(BytesIO(io.read_file(path, **kwargs)), index_col="etag")
    if column not in dfid.columns:
        dfid = dfid.rename(columns={dfid.columns[0]: column})
    return merge_sample_identification(df, dfid, on="etag")


def save_sample_identification(df, path, known_ids=None, column="purpose", io=io_tools):
    columns = ["etag", column]
    df = df[columns].set_index("etag")
    if df.index.has_duplicates:
        log.info("Duplicate etag entries among samples, saving highest priority purpose")
        df = df.iloc[np.argsort(df[column].map({"train": 1, "devel": 2, "development": 2, "test": 3}).fillna(4))]
        df = df[~df.index.duplicated(keep="first")]
    io.write_file(path, pd.concat([df, known_ids]).to_csv(header=True))


class SampleSplit(vue.VueSettingsModule):
    MODE_MURMURHASH3 = "murmurhash3"
    MODE_SORTED_PERMUTATION = "sorted_permutation"

    def __init__(self, stratification: list = None, uniqueness: list = None, split: float = 0.8, seed: int = -1,
                 mode=MODE_SORTED_PERMUTATION):
        """
        :param stratification: regex string applied to df.path, or a list of columns to select
        :param uniqueness: regex string applied to df.path, or a list of columns to select
        :param split: fraction of samples to assign the purpose to
        :param seed: seed for the assignment; a negative value gives a non-deterministic split
        :param mode: 'sorted_permutation' or 'murmurhash3'
        """
        self.stratification = stratification
        try:
            if isinstance(uniqueness, str):
                uniqueness = json.loads(uniqueness)
        except json.JSONDecodeError:
            pass
        self.uniqueness = uniqueness or ["etag"]
        self.split = split
        self.seed = seed
        self.mode = mode
        assert self.mode in {self.MODE_MURMURHASH3, self.MODE_SORTED_PERMUTATION}

    def assign(self, df, purpose="train", remainder=None, column="purpose"):
        """
        Assign purpose column randomly to non-assigned samples based on stratification, uniqueness and split strategy.

        Definitions:
        * Stratification: grouping of samples which should be treated as individual groups,
        meaning every group is split according to the target split fraction,
        and uniqueness is applied on a per-group basis
        * Uniqueness: grouping of samples which must be treated as a single sample, and thus be assigned the same purpose.

        :param df: pd.DataFrame of samples; if the purpose column does not exist it is added
        :param purpose: purpose to be assigned
        :param remainder: purpose to assign remainder samples, or None to leave unassigned
        :param column: column for assignment of split category
        """
        # Ensure columns
        if column not in df:
            df[column] = pd.NA
        columns = df.columns

        split = self.split
        stratification = self.stratification
        uniqueness = self.uniqueness

        if split == 0 or not df[column].isna().any():  # Assign no samples
            pass
        elif split == 1:  # Assign all samples
            df.loc[df[column].isna(), column] = purpose
        else:
            # Parse regex stratification and uniqueness strategies
            if isinstance(stratification, str) and stratification:
                df["_stratification"] = df.path.str.extract(stratification)[0]
                stratification = ["_stratification"]
            assert stratification is None or all(x in df.columns for x in stratification), \
                "stratification should be None or in columns"

            if isinstance(uniqueness, str) and uniqueness:
                df["_uniqueness"] = df.path.str.extract(uniqueness)[0]
                uniqueness = ["_uniqueness"]
            assert uniqueness is None or all(x in df.columns for x in uniqueness), \
                "uniqueness should be None or in columns"

            seed = None if self.seed < 0 else self.seed
            rng = seed if isinstance(seed, np.random.RandomState) else np.random.RandomState(seed)

            def _split(g):
                if uniqueness:
                    items = g[uniqueness + [column]].copy()
                    items["_purpose_prio"] = items.purpose.map({"train": 1, "test": 2})
                    items = items.sort_values("_purpose_prio")[uniqueness + [column]]
                    unique_items = items.groupby(uniqueness).purpose.agg(["size", "first"])
                    unique_items.columns = ["samples", column]
                    unique_items = unique_items.reset_index()
                else:
                    unique_items = g[[column]].reset_index(drop=True)
                    unique_items["samples"] = 1

                # split unmarked items
                unmarked = unique_items[unique_items[column].isna()]

                # mode
                if unmarked.size > 0:
                    if self.mode == self.MODE_MURMURHASH3:
                        # Random seed for this stratified group
                        mmh_seed = rng.randint(0x7FFFFFFF)

                        # Extract uniqueness for hashing
                        if uniqueness:
                            unique_df = unmarked[uniqueness]
                        else:
                            unique_df = pd.DataFrame(unmarked.index)

                        # Apply mmh3 hashing
                        hash_ = unique_df.apply(lambda x: mmh3.hash("_".join(map(str, x)), seed=mmh_seed, signed=False),
                                                axis=1)

                        # Assign
                        unique_items.loc[hash_[hash_ < 0xFFFFFFFF * split].index, column] = purpose
                    else:  # MODE_SORTED_PERMUTATION (default)
                        # Select unmarked to assign
                        items_count = unique_items.samples.sum()
                        marked_count = unique_items.samples[unique_items[column] == purpose].sum()
                        assign_count = items_count * split - marked_count
                        unmarked = rng.permutation(unmarked.index)

                        cdf = unique_items.samples[unmarked].cumsum()
                        ix = np.searchsorted(cdf.values, assign_count, side="right")
                        if len(cdf.values) > ix:
                            ix = ix - (rng.rand() > ((assign_count - cdf.values[ix - 1]) / (cdf.values[ix] - cdf.values[ix - 1])))

                        # Assign
                        unique_items.loc[cdf.iloc[:ix+1].index, column] = purpose

                if uniqueness:
                    g.loc[:, column] = unique_items.set_index(uniqueness) \
                        .loc[g[uniqueness].set_index(uniqueness).index][column].values
                else:
                    g.loc[:, column] = unique_items[column].values
                return g

            if stratification:
                df = df.groupby(stratification).apply(_split)
            else:
                df = _split(df)

        if remainder:
            df.loc[df[column].isna(), column] = remainder

        # Ensure etag is unique across all stratified groups
        #df.loc[:, column] = df.groupby("etag").first()[column].reindex(df.etag).values
        return df[columns]

    def update_unassigned(self, df, id_path,
                          purpose="train", remainder="devel", column="purpose", io=io_tools):
        """
        Update sample purposes in id_path, which may hold previous dataset splits and sample ids.
        Unassigned samples are then assigned, and id_path is updated
        :param df: pd.DataFrame containing the samples
        :param id_path: path to the identification csv file
        :param purpose: Purpose to assign
        :param remainder: Purpose to assign to the remainder, or None to leave unassigned
        :param column: Column to assign split purposes to
        :param io: io module used to read and write the identification file
        :return: df with updated purpose assignments
        """

        log.info("Looking for previous train / development split")

        known_ids = None
        if io.isfile(id_path):
            df, known_ids = load_sample_identification(df, id_path, column=column, io=io)
            log.info("Using train / development split from run cached in artifacts")
        else:
            log.info("No initial sample identification file found")

        df = self.assign(df, purpose=purpose, remainder=remainder, column=column)

        save_sample_identification(df, id_path, known_ids=known_ids, column=column, io=io)

        return df

Functions

def load_sample_identification(df, path, column='purpose', io=<brevettiai.io.utils.IoTools object>, **kwargs)

Load and join sample identification information onto a dataframe of samples.

:param df: sample dataframe
:param path: path to sample id file
:param column: name of split column
:param io: io module used to read the file
:param kwargs: extra args for io_tools.read_file
:return: df, extra_ids
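
For illustration, a minimal usage sketch; the sample values and the artifact path are assumptions, not part of the library:

import pandas as pd
from brevettiai.data.sample_integrity import load_sample_identification

# Hypothetical sample dataframe; sample tables carry at least an 'etag' column
samples = pd.DataFrame({
    "etag": ["a1", "b2", "c3"],
    "path": ["s3://bucket/a.png", "s3://bucket/b.png", "s3://bucket/c.png"],
})

# Join previously saved purposes onto the samples; ids without a matching
# sample are returned separately so they can be re-saved later
samples, extra_ids = load_sample_identification(samples, "artifacts/sample_identification.csv")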

Expand source code
def load_sample_identification(df, path, column="purpose", io=io_tools, **kwargs):
    """
    Load and join sample identification information onto dataframe of samples
    :param df: sample dataframe
    :param path: path to sample id file
    :param column: name of split column
    :param io: io module used to read the file
    :param kwargs: extra args for io_tools.read_file
    :return: df, extra_ids
    """
    dfid = pd.read_csv(BytesIO(io.read_file(path, **kwargs)), index_col="etag")
    if column not in dfid.columns:
        dfid = dfid.rename(columns={dfid.columns[0]: column})
    return merge_sample_identification(df, dfid, on="etag")
def merge_sample_identification(df, dfid, on='etag')

Merge sample identification traits onto a dataframe, such that values (excluding NA) are transferred to the dataframe.

:param df: Dataframe
:param dfid: identification dataframe, with its index named by the 'on' parameter, by default 'etag'
:param on: column name on df to match with the identification index
:return: (df, extra_ids) - the merged dataframe, and the ids not present among the samples
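
A small worked example of the merge semantics, using hand-made dataframes (the etags are invented):

import pandas as pd
from brevettiai.data.sample_integrity import merge_sample_identification

df = pd.DataFrame({"etag": ["a1", "b2"], "purpose": [pd.NA, pd.NA]})
dfid = pd.DataFrame({"purpose": ["train", "test"]},
                    index=pd.Index(["a1", "x9"], name="etag"))

df, extra_ids = merge_sample_identification(df, dfid)
# df: 'a1' now has purpose 'train'; 'b2' keeps its NA value
# extra_ids: the row for 'x9', which matched no sample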

Expand source code
def merge_sample_identification(df, dfid, on="etag"):
    """
    Merge sample identification traits onto a dataframe, such that values (excluding NA) are transferred to the dataframe
    :param df: Dataframe
    :param dfid: identification dataframe, with its index named by the 'on' parameter, by default 'etag'
    :param on: column name on df to match with the identification index
    :return: (df, extra_ids) - the merged dataframe, and the ids not present among the samples
    """
    # Reindex id file to match new samples
    extra_ids = dfid[~dfid.index.isin(df[on])]
    dfid = dfid.reindex(df[on])

    # combine sample identification information with samples
    for c in dfid.columns:
        col = dfid[c]
        mask = col.isna()
        if mask.any() and c in df.columns:
            df.loc[~mask.values, c] = col[~mask].values
        else:
            df[c] = col.values
    return df, extra_ids
def save_sample_identification(df, path, known_ids=None, column='purpose', io=<brevettiai.io.utils.IoTools object>)
Expand source code
def save_sample_identification(df, path, known_ids=None, column="purpose", io=io_tools):
    columns = ["etag", column]
    df = df[columns].set_index("etag")
    if df.index.has_duplicates:
        log.info("Duplicate etag entries among samples, saving highest priority purpose")
        df = df.iloc[np.argsort(df[column].map({"train": 1, "devel": 2, "development": 2, "test": 3}).fillna(4))]
        df = df[~df.index.duplicated(keep="first")]
    io.write_file(path, pd.concat([df, known_ids]).to_csv(header=True))
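
Duplicate handling keeps the highest-priority purpose (train before devel/development before test) per etag. A minimal sketch, assuming an illustrative artifact path:

import pandas as pd
from brevettiai.data.sample_integrity import save_sample_identification

# 'a1' appears twice; 'train' outranks 'devel', so 'train' is persisted for it
df = pd.DataFrame({"etag": ["a1", "a1", "b2"], "purpose": ["devel", "train", "test"]})
save_sample_identification(df, "artifacts/sample_identification.csv")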

Classes

class SampleSplit (stratification: list = None, uniqueness: list = None, split: float = 0.8, seed: int = -1, mode='sorted_permutation')

Base class for serializable modules

:param stratification: regex string applied to df.path, or a list of columns to select
:param uniqueness: regex string applied to df.path, or a list of columns to select
:param split: fraction of samples to assign the purpose to
:param seed: seed for the assignment; a negative value gives a non-deterministic split
:param mode: 'sorted_permutation' or 'murmurhash3'
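
A construction sketch; the regex and the folder layout it implies are illustrative assumptions:

from brevettiai.data.sample_integrity import SampleSplit

# 80/20 split where all frames sharing a capture id in their path are
# treated as one sample and therefore get the same purpose
splitter = SampleSplit(
    split=0.8,
    uniqueness=r"(capture_\d+)",
    seed=42,
    mode=SampleSplit.MODE_SORTED_PERMUTATION,
)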

Expand source code
class SampleSplit(vue.VueSettingsModule):
    MODE_MURMURHASH3 = "murmurhash3"
    MODE_SORTED_PERMUTATION = "sorted_permutation"

    def __init__(self, stratification: list = None, uniqueness: list = None, split: float = 0.8, seed: int = -1,
                 mode=MODE_SORTED_PERMUTATION):
        """
        :param stratification: regex string applied to df.path, or a list of columns to select
        :param uniqueness: regex string applied to df.path, or a list of columns to select
        :param split: fraction of samples to assign the purpose to
        :param seed: seed for the assignment; a negative value gives a non-deterministic split
        :param mode: 'sorted_permutation' or 'murmurhash3'
        """
        self.stratification = stratification
        try:
            if isinstance(uniqueness, str):
                uniqueness = json.loads(uniqueness)
        except json.JSONDecodeError:
            pass
        self.uniqueness = uniqueness or ["etag"]
        self.split = split
        self.seed = seed
        self.mode = mode
        assert self.mode in {self.MODE_MURMURHASH3, self.MODE_SORTED_PERMUTATION}

    def assign(self, df, purpose="train", remainder=None, column="purpose"):
        """
        Assign purpose column randomly to non-assigned samples based on stratification, uniqueness and split strategy.

        Definitions:
        * Stratification: grouping of samples which should be treated as individual groups,
        meaning every group is split according to the target split fraction,
        and uniqueness is applied on a per-group basis
        * Uniqueness: grouping of samples which must be treated as a single sample, and thus be assigned the same purpose.

        :param df: pd.DataFrame of samples; if the purpose column does not exist it is added
        :param purpose: purpose to be assigned
        :param remainder: purpose to assign remainder samples, or None to leave unassigned
        :param column: column for assignment of split category
        """
        # Ensure columns
        if column not in df:
            df[column] = pd.NA
        columns = df.columns

        split = self.split
        stratification = self.stratification
        uniqueness = self.uniqueness

        if split == 0 or not df[column].isna().any():  # Assign no samples
            pass
        elif split == 1:  # Assign all samples
            df.loc[df[column].isna(), column] = purpose
        else:
            # Parse regex stratification and uniqueness strategies
            if isinstance(stratification, str) and stratification:
                df["_stratification"] = df.path.str.extract(stratification)[0]
                stratification = ["_stratification"]
            assert stratification is None or all(x in df.columns for x in stratification), \
                "stratification should be None or in columns"

            if isinstance(uniqueness, str) and uniqueness:
                df["_uniqueness"] = df.path.str.extract(uniqueness)[0]
                uniqueness = ["_uniqueness"]
            assert uniqueness is None or all(x in df.columns for x in uniqueness), \
                "uniqueness should be None or in columns"

            seed = None if self.seed < 0 else self.seed
            rng = seed if isinstance(seed, np.random.RandomState) else np.random.RandomState(seed)

            def _split(g):
                if uniqueness:
                    items = g[uniqueness + [column]].copy()
                    items["_purpose_prio"] = items.purpose.map({"train": 1, "test": 2})
                    items = items.sort_values("_purpose_prio")[uniqueness + [column]]
                    unique_items = items.groupby(uniqueness).purpose.agg(["size", "first"])
                    unique_items.columns = ["samples", column]
                    unique_items = unique_items.reset_index()
                else:
                    unique_items = g[[column]].reset_index(drop=True)
                    unique_items["samples"] = 1

                # split unmarked items
                unmarked = unique_items[unique_items[column].isna()]

                # mode
                if unmarked.size > 0:
                    if self.mode == self.MODE_MURMURHASH3:
                        # Random seed for this stratified group
                        mmh_seed = rng.randint(0x7FFFFFFF)

                        # Extract uniqueness for hashing
                        if uniqueness:
                            unique_df = unmarked[uniqueness]
                        else:
                            unique_df = pd.DataFrame(unmarked.index)

                        # Apply mmh3 hashing
                        hash_ = unique_df.apply(lambda x: mmh3.hash("_".join(map(str, x)), seed=mmh_seed, signed=False),
                                                axis=1)

                        # Assign
                        unique_items.loc[hash_[hash_ < 0xFFFFFFFF * split].index, column] = purpose
                    else:  # MODE_SORTED_PERMUTATION (default)
                        # Select unmarked to assign
                        items_count = unique_items.samples.sum()
                        marked_count = unique_items.samples[unique_items[column] == purpose].sum()
                        assign_count = items_count * split - marked_count
                        unmarked = rng.permutation(unmarked.index)

                        cdf = unique_items.samples[unmarked].cumsum()
                        ix = np.searchsorted(cdf.values, assign_count, side="right")
                        if len(cdf.values) > ix:
                            ix = ix - (rng.rand() > ((assign_count - cdf.values[ix - 1]) / (cdf.values[ix] - cdf.values[ix - 1])))

                        # Assign
                        unique_items.loc[cdf.iloc[:ix+1].index, column] = purpose

                if uniqueness:
                    g.loc[:, column] = unique_items.set_index(uniqueness) \
                        .loc[g[uniqueness].set_index(uniqueness).index][column].values
                else:
                    g.loc[:, column] = unique_items[column].values
                return g

            if stratification:
                df = df.groupby(stratification).apply(_split)
            else:
                df = _split(df)

        if remainder:
            df.loc[df[column].isna(), column] = remainder

        # Ensure etag is unique across all stratified groups
        #df.loc[:, column] = df.groupby("etag").first()[column].reindex(df.etag).values
        return df[columns]

    def update_unassigned(self, df, id_path,
                          purpose="train", remainder="devel", column="purpose", io=io_tools):
        """
        Update sample purposes in id_path, which may hold previous dataset splits and sample ids.
        Unassigned samples are then assigned, and id_path is updated
        :param df: pd.DataFrame containing the samples
        :param id_path: path to the identification csv file
        :param purpose: Purpose to assign
        :param remainder: Purpose to assign to the remainder, or None to leave unassigned
        :param column: Column to assign split purposes to
        :param io: io module used to read and write the identification file
        :return: df with updated purpose assignments
        """

        log.info("Looking for previous train / development split")

        known_ids = None
        if io.isfile(id_path):
            df, known_ids = load_sample_identification(df, id_path, column=column, io=io)
            log.info("Using train / development split from run cached in artifacts")
        else:
            log.info("No initial sample identification file found")

        df = self.assign(df, purpose=purpose, remainder=remainder, column=column)

        save_sample_identification(df, id_path, known_ids=known_ids, column=column, io=io)

        return df

Ancestors

brevettiai.interfaces.vue_schema_utils.VueSettingsModule

Class variables

var MODE_MURMURHASH3
var MODE_SORTED_PERMUTATION

Methods

def assign(self, df, purpose='train', remainder=None, column='purpose')

Assign purpose column randomly to non-assigned samples based on stratification, uniqueness and split strategy.

Definitions:
* Stratification: grouping of samples which should be treated as individual groups, meaning every group is split according to the target split fraction, and uniqueness is applied on a per-group basis.
* Uniqueness: grouping of samples which must be treated as a single sample, and thus be assigned the same purpose.

:param df: pd.DataFrame of samples; if the purpose column does not exist it is added
:param purpose: purpose to be assigned
:param remainder: purpose to assign remainder samples, or None to leave unassigned
:param column: column for assignment of split category
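
A minimal sketch of a call, with an invented sample table; 'etag' is the default uniqueness key:

import pandas as pd
from brevettiai.data.sample_integrity import SampleSplit

df = pd.DataFrame({
    "etag": [f"e{i}" for i in range(10)],
    "path": [f"s3://bucket/img_{i}.png" for i in range(10)],
})

# Assign ~80% of the samples to 'train' and the remainder to 'devel'
df = SampleSplit(split=0.8, seed=0).assign(df, purpose="train", remainder="devel")
print(df.purpose.value_counts())  # roughly 8 train / 2 devel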

Expand source code
def assign(self, df, purpose="train", remainder=None, column="purpose"):
    """
    Assign purpose column randomly to non-assigned samples based on stratification, uniqueness and split strategy.

    Definitions:
    * Stratification: grouping of samples which should be treated as individual groups,
    meaning every group is split according to the target split fraction,
    and uniqueness is applied on a per-group basis
    * Uniqueness: grouping of samples which must be treated as a single sample, and thus be assigned the same purpose.

    :param df: pd.DataFrame of samples; if the purpose column does not exist it is added
    :param purpose: purpose to be assigned
    :param remainder: purpose to assign remainder samples, or None to leave unassigned
    :param column: column for assignment of split category
    """
    # Ensure columns
    if column not in df:
        df[column] = pd.NA
    columns = df.columns

    split = self.split
    stratification = self.stratification
    uniqueness = self.uniqueness

    if split == 0 or not df[column].isna().any():  # Assign no samples
        pass
    elif split == 1:  # Assign all samples
        df.loc[df[column].isna(), column] = purpose
    else:
        # Parse regex stratification and uniqueness strategies
        if isinstance(stratification, str) and stratification:
            df["_stratification"] = df.path.str.extract(stratification)[0]
            stratification = ["_stratification"]
        assert stratification is None or all(x in df.columns for x in stratification), \
            "stratification should be None or in columns"

        if isinstance(uniqueness, str) and uniqueness:
            df["_uniqueness"] = df.path.str.extract(uniqueness)[0]
            uniqueness = ["_uniqueness"]
        assert uniqueness is None or all(x in df.columns for x in uniqueness), \
            "uniqueness should be None or in columns"

        seed = None if self.seed < 0 else self.seed
        rng = seed if isinstance(seed, np.random.RandomState) else np.random.RandomState(seed)

        def _split(g):
            if uniqueness:
                items = g[uniqueness + [column]].copy()
                items["_purpose_prio"] = items.purpose.map({"train": 1, "test": 2})
                items = items.sort_values("_purpose_prio")[uniqueness + [column]]
                unique_items = items.groupby(uniqueness).purpose.agg(["size", "first"])
                unique_items.columns = ["samples", column]
                unique_items = unique_items.reset_index()
            else:
                unique_items = g[[column]].reset_index(drop=True)
                unique_items["samples"] = 1

            # split unmarked items
            unmarked = unique_items[unique_items[column].isna()]

            # mode
            if unmarked.size > 0:
                if self.mode == self.MODE_MURMURHASH3:
                    # Random seed for this stratified group
                    mmh_seed = rng.randint(0x7FFFFFFF)

                    # Extract uniqueness for hashing
                    if uniqueness:
                        unique_df = unmarked[uniqueness]
                    else:
                        unique_df = pd.DataFrame(unmarked.index)

                    # Apply mmh3 hashing
                    hash_ = unique_df.apply(lambda x: mmh3.hash("_".join(map(str, x)), seed=mmh_seed, signed=False),
                                            axis=1)

                    # Assign
                    unique_items.loc[hash_[hash_ < 0xFFFFFFFF * split].index, column] = purpose
                else:  # MODE_SORTED_PERMUTATION (default)
                    # Select unmarked to assign
                    items_count = unique_items.samples.sum()
                    marked_count = unique_items.samples[unique_items[column] == purpose].sum()
                    assign_count = items_count * split - marked_count
                    unmarked = rng.permutation(unmarked.index)

                    cdf = unique_items.samples[unmarked].cumsum()
                    ix = np.searchsorted(cdf.values, assign_count, side="right")
                    if len(cdf.values) > ix:
                        ix = ix - (rng.rand() > ((assign_count - cdf.values[ix - 1]) / (cdf.values[ix] - cdf.values[ix - 1])))

                    # Assign
                    unique_items.loc[cdf.iloc[:ix+1].index, column] = purpose

            if uniqueness:
                g.loc[:, column] = unique_items.set_index(uniqueness) \
                    .loc[g[uniqueness].set_index(uniqueness).index][column].values
            else:
                g.loc[:, column] = unique_items[column].values
            return g

        if stratification:
            df = df.groupby(stratification).apply(_split)
        else:
            df = _split(df)

    if remainder:
        df.loc[df[column].isna(), column] = remainder

    # Ensure etag is unique across all stratified groups
    #df.loc[:, column] = df.groupby("etag").first()[column].reindex(df.etag).values
    return df[columns]
def update_unassigned(self, df, id_path, purpose='train', remainder='devel', column='purpose', io=<brevettiai.io.utils.IoTools object>)

Update sample purposes in id_path, which may hold previous dataset splits and sample ids. Unassigned samples are then assigned, and id_path is updated.

:param df: pd.DataFrame containing the samples
:param id_path: path to the identification csv file
:param purpose: Purpose to assign
:param remainder: Purpose to assign to the remainder, or None to leave unassigned
:param column: Column to assign split purposes to
:param io: io module used to read and write the identification file
:return: df with updated purpose assignments
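
A usage sketch, assuming df is a sample dataframe as in the assign example and the csv path is illustrative; an existing file is reused, new samples are assigned, and the file is written back:

from brevettiai.data.sample_integrity import SampleSplit

df = SampleSplit(split=0.8, seed=42).update_unassigned(
    df, "artifacts/sample_identification.csv")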

Expand source code
def update_unassigned(self, df, id_path,
                      purpose="train", remainder="devel", column="purpose", io=io_tools):
    """
    Update sample purposes in id_path, which may hold previous dataset splits and sample ids.
    Unassigned samples are then assigned, and id_path is updated
    :param df: pd.DataFrame containing the samples
    :param id_path: path to the identification csv file
    :param purpose: Purpose to assign
    :param remainder: Purpose to assign to the remainder, or None to leave unassigned
    :param column: Column to assign split purposes to
    :param io: io module used to read and write the identification file
    :return: df with updated purpose assignments
    """

    log.info("Looking for previous train / development split")

    known_ids = None
    if io.isfile(id_path):
        df, known_ids = load_sample_identification(df, id_path, column=column, io=io)
        log.info("Using train / development split from run cached in artifacts")
    else:
        log.info("No initial sample identification file found")

    df = self.assign(df, purpose=purpose, remainder=remainder, column=column)

    save_sample_identification(df, id_path, known_ids=known_ids, column=column, io=io)

    return df

Inherited members