Module brevettiai.interfaces.pivot

Expand source code
import json

import pandas as pd

from brevettiai.io import io_tools
from brevettiai.data.sample_tools import join_dataset_meta
from brevettiai.utils.pandas_utils import explode


def pivot_fields(fields, rows=None, cols=None):
    """
    Build pivot export fields dict

    Each entry of ``fields`` is normalized to a dict that carries its own ``key``:
    a string value becomes the ``label``, any other value is unpacked as a mapping
    of extra attributes next to the key. The normalized fields are then split into
    the three lists of the returned dict; a field appears in exactly one of them,
    with rows taking precedence over columns.

    :param fields: field dict, {key:label,...} or {key:{attribute:value,...},...}
    :param rows: iterable of field keys to start in row selector
    :param cols: iterable of field keys to start in column selector,
        keys that are also in rows are skipped
    :return: vue-pivot-table fields dict with "rowFields", "colFields" and "fields" lists
    :raises KeyError: if a key in rows or cols is not a key of fields
    """
    # rows/cols are documented as iterables and are needed several times below
    # (ordering + membership tests), so materialize them once; a one-shot
    # iterator would otherwise be exhausted after the first pass.
    rows = list(rows or ())
    cols = list(cols or ())
    row_keys = set(rows)
    col_keys = set(cols)

    normalized = {
        key: {"key": key, "label": value} if isinstance(value, str) else {"key": key, **value}
        for key, value in fields.items()
    }

    return dict(
        rowFields=[normalized[key] for key in rows],
        colFields=[normalized[key] for key in cols if key not in row_keys],
        # Everything not preselected stays in the pool of available fields
        fields=[field for key, field in normalized.items()
                if key not in row_keys and key not in col_keys],
    )


def pivot_data(df, fields, datasets=None, tags=None, agg=None):
    """
    Build pivot ready dataframe with precalculated object groups

    Note that ``fields`` is modified in place when datasets and tags are given:
    a dict receives the metadata fields, a list is extended with their keys.

    :param df: sample dataframe with dataset_id to join on if datasets and tags are not None
    :param fields: field dict, {key:label,...} updated with metadata fields if datasets and tags are not None,
        alternatively a list of field keys
    :param datasets: datasets to build metadata from
    :param tags: tag root tree, to find parent tags
    :param agg: Aggregate parameter dictionary, uses count column as default (weight 1 for all samples if nonexistent)
    :return: vue-pivot-table export ready dataframe
    """
    # Work on a copy so the caller's aggregation dict is left untouched;
    # the count column is always summed.
    agg = (agg or {}).copy()
    agg["count"] = "sum"
    if "count" not in df:
        # Copy before adding the column so the caller's dataframe is not modified
        df = df.copy()
        df["count"] = 1

    if isinstance(fields, dict):
        fieldlist = list(fields.keys())
    else:
        fieldlist = fields
        fields = {f: f for f in fields}

    if datasets is not None and tags is not None:
        df, meta_fields = join_dataset_meta(df, datasets, tags)
        fieldlist.extend(list(meta_fields.keys()))
        fields.update(meta_fields)

    # List-like cells are turned into tuples before exploding and grouping.
    # DataFrame.applymap is deprecated since pandas 2.1 in favour of DataFrame.map,
    # so prefer the new name when the installed pandas provides it. The check is
    # made on the class so that a column named "map" cannot be mistaken for it.
    elementwise = df.map if hasattr(pd.DataFrame, "map") else df.applymap
    df = elementwise(lambda x: tuple(x) if pd.api.types.is_list_like(x) else x)
    # NOTE(review): explode presumably adds the "id" column named by duplicate_id,
    # which is grouped on below - confirm against brevettiai.utils.pandas_utils.
    df = explode(df[fieldlist + list(agg.keys())], on=fieldlist, duplicate_id="id")

    df = df.groupby(fieldlist + ["id"]).agg(agg).reset_index()
    return df


def get_default_fields(df):
    """
    Build default pivot fields structure from dataframe

    Only the default fields that exist as columns of ``df`` are returned, so a
    dataframe without e.g. a category column is handled gracefully.

    :param df: sample dataframe; each value of a "category" column must be iterable
        (its elements are flattened into the sort list)
    :return: field dict, {key:label,...}, the category field additionally carries a "sort" list
    """
    fields = {}
    # Only touch the category column when it exists; accessing it
    # unconditionally raises AttributeError on dataframes without it.
    if "category" in df:
        fields["category"] = {
            "label": "Category",
            # Flatten the distinct category values into a single list of their elements
            "sort": [a for b in df["category"].unique() for a in b],
        }
    fields.update({"folder": "Folder", "dataset_id": "Dataset Id", "purpose": "Purpose"})
    return {k: v for k, v in fields.items() if k in df}


def export_pivot_table(pivot_dir, df, fields=None, datasets=None, tags=None, rows=None, cols=None, **data_args):
    """
    Export a pivot table built with :py:func:pivot_data and :py:func:pivot_fields

    Two JSON files are written to pivot_dir: "summary_fields.json" with the field
    layout and "classification_summary.json" with the aggregated records.

    :param pivot_dir: directory to write the two JSON files to
    :param df: sample dataframe, forwarded to pivot_data
    :param fields: field dict, defaults to get_default_fields(df) when empty or None
    :param datasets: forwarded to pivot_data
    :param tags: forwarded to pivot_data
    :param rows: field keys to start in row selector, forwarded to pivot_fields
    :param cols: field keys to start in column selector, forwarded to pivot_fields
    :param data_args: extra keyword arguments forwarded to pivot_data
    :return: None
    """
    if not fields:
        fields = get_default_fields(df)

    # The data is built before the field layout: pivot_data is documented to
    # update the fields with metadata fields, which the layout must include.
    pivot_df = pivot_data(df, fields, datasets, tags, **data_args)
    field_layout = pivot_fields(fields, rows, cols)

    fields_path = io_tools.path.join(pivot_dir, "summary_fields.json")
    io_tools.write_file(fields_path, json.dumps(field_layout))

    records_path = io_tools.path.join(pivot_dir, "classification_summary.json")
    io_tools.write_file(records_path, json.dumps(pivot_df.to_dict("records")))

Functions

def export_pivot_table(pivot_dir, df, fields=None, datasets=None, tags=None, rows=None, cols=None, **data_args)

Build and export a pivot table using the `pivot_data` and `pivot_fields` functions. Parameters: `pivot_dir`, `df`, `fields`, `datasets`, `tags`, `rows`, `cols`.

Expand source code
def export_pivot_table(pivot_dir, df, fields=None, datasets=None, tags=None, rows=None, cols=None, **data_args):
    """
    Export a pivot table built with :py:func:pivot_data and :py:func:pivot_fields

    Two JSON files are written to pivot_dir: "summary_fields.json" with the field
    layout and "classification_summary.json" with the aggregated records.

    :param pivot_dir: directory to write the two JSON files to
    :param df: sample dataframe, forwarded to pivot_data
    :param fields: field dict, defaults to get_default_fields(df) when empty or None
    :param datasets: forwarded to pivot_data
    :param tags: forwarded to pivot_data
    :param rows: field keys to start in row selector, forwarded to pivot_fields
    :param cols: field keys to start in column selector, forwarded to pivot_fields
    :param data_args: extra keyword arguments forwarded to pivot_data
    :return: None
    """
    if not fields:
        fields = get_default_fields(df)

    # The data is built before the field layout: pivot_data is documented to
    # update the fields with metadata fields, which the layout must include.
    pivot_df = pivot_data(df, fields, datasets, tags, **data_args)
    field_layout = pivot_fields(fields, rows, cols)

    fields_path = io_tools.path.join(pivot_dir, "summary_fields.json")
    io_tools.write_file(fields_path, json.dumps(field_layout))

    records_path = io_tools.path.join(pivot_dir, "classification_summary.json")
    io_tools.write_file(records_path, json.dumps(pivot_df.to_dict("records")))
def get_default_fields(df)

Build the default pivot fields structure from a dataframe. Parameters: `df`.

Expand source code
def get_default_fields(df):
    """
    Build default pivot fields structure from dataframe

    Only the default fields that exist as columns of ``df`` are returned, so a
    dataframe without e.g. a category column is handled gracefully.

    :param df: sample dataframe; each value of a "category" column must be iterable
        (its elements are flattened into the sort list)
    :return: field dict, {key:label,...}, the category field additionally carries a "sort" list
    """
    fields = {}
    # Only touch the category column when it exists; accessing it
    # unconditionally raises AttributeError on dataframes without it.
    if "category" in df:
        fields["category"] = {
            "label": "Category",
            # Flatten the distinct category values into a single list of their elements
            "sort": [a for b in df["category"].unique() for a in b],
        }
    fields.update({"folder": "Folder", "dataset_id": "Dataset Id", "purpose": "Purpose"})
    return {k: v for k, v in fields.items() if k in df}
def pivot_data(df, fields, datasets=None, tags=None, agg=None)

Build a pivot-ready dataframe with precalculated object groups. Parameters: `df` – sample dataframe with dataset_id to join on if datasets and tags are not None; `fields` – field dict, {key: label, …}, updated with metadata fields if datasets and tags are not None; `datasets` – datasets to build metadata from; `tags` – tag root tree, used to find parent tags; `agg` – aggregate parameter dictionary, uses the count column as default (weight 1 for all samples if nonexistent). Returns a vue-pivot-table export-ready dataframe.

Expand source code
def pivot_data(df, fields, datasets=None, tags=None, agg=None):
    """
    Build pivot ready dataframe with precalculated object groups

    Note that ``fields`` is modified in place when datasets and tags are given:
    a dict receives the metadata fields, a list is extended with their keys.

    :param df: sample dataframe with dataset_id to join on if datasets and tags are not None
    :param fields: field dict, {key:label,...} updated with metadata fields if datasets and tags are not None,
        alternatively a list of field keys
    :param datasets: datasets to build metadata from
    :param tags: tag root tree, to find parent tags
    :param agg: Aggregate parameter dictionary, uses count column as default (weight 1 for all samples if nonexistent)
    :return: vue-pivot-table export ready dataframe
    """
    # Work on a copy so the caller's aggregation dict is left untouched;
    # the count column is always summed.
    agg = (agg or {}).copy()
    agg["count"] = "sum"
    if "count" not in df:
        # Copy before adding the column so the caller's dataframe is not modified
        df = df.copy()
        df["count"] = 1

    if isinstance(fields, dict):
        fieldlist = list(fields.keys())
    else:
        fieldlist = fields
        fields = {f: f for f in fields}

    if datasets is not None and tags is not None:
        df, meta_fields = join_dataset_meta(df, datasets, tags)
        fieldlist.extend(list(meta_fields.keys()))
        fields.update(meta_fields)

    # List-like cells are turned into tuples before exploding and grouping.
    # DataFrame.applymap is deprecated since pandas 2.1 in favour of DataFrame.map,
    # so prefer the new name when the installed pandas provides it. The check is
    # made on the class so that a column named "map" cannot be mistaken for it.
    elementwise = df.map if hasattr(pd.DataFrame, "map") else df.applymap
    df = elementwise(lambda x: tuple(x) if pd.api.types.is_list_like(x) else x)
    # NOTE(review): explode presumably adds the "id" column named by duplicate_id,
    # which is grouped on below - confirm against brevettiai.utils.pandas_utils.
    df = explode(df[fieldlist + list(agg.keys())], on=fieldlist, duplicate_id="id")

    df = df.groupby(fieldlist + ["id"]).agg(agg).reset_index()
    return df
def pivot_fields(fields, rows=None, cols=None)

Build the pivot export fields dict. Parameters: `fields` – field dict, {key: label, …}; `rows` – iterable of field keys to start in the row selector; `cols` – iterable of field keys to start in the column selector. Returns a vue-pivot-table fields dict.

Expand source code
def pivot_fields(fields, rows=None, cols=None):
    """
    Build pivot export fields dict

    Each entry of ``fields`` is normalized to a dict that carries its own ``key``:
    a string value becomes the ``label``, any other value is unpacked as a mapping
    of extra attributes next to the key. The normalized fields are then split into
    the three lists of the returned dict; a field appears in exactly one of them,
    with rows taking precedence over columns.

    :param fields: field dict, {key:label,...} or {key:{attribute:value,...},...}
    :param rows: iterable of field keys to start in row selector
    :param cols: iterable of field keys to start in column selector,
        keys that are also in rows are skipped
    :return: vue-pivot-table fields dict with "rowFields", "colFields" and "fields" lists
    :raises KeyError: if a key in rows or cols is not a key of fields
    """
    # rows/cols are documented as iterables and are needed several times below
    # (ordering + membership tests), so materialize them once; a one-shot
    # iterator would otherwise be exhausted after the first pass.
    rows = list(rows or ())
    cols = list(cols or ())
    row_keys = set(rows)
    col_keys = set(cols)

    normalized = {
        key: {"key": key, "label": value} if isinstance(value, str) else {"key": key, **value}
        for key, value in fields.items()
    }

    return dict(
        rowFields=[normalized[key] for key in rows],
        colFields=[normalized[key] for key in cols if key not in row_keys],
        # Everything not preselected stays in the pool of available fields
        fields=[field for key, field in normalized.items()
                if key not in row_keys and key not in col_keys],
    )