Module brevettiai.data.image.multi_frame_imager
Expand source code
import tensorflow as tf
from pydantic import BaseModel, PrivateAttr, Field
from typing import List
class MultiFrameImager(BaseModel):
    """
    Module to concatenate multiple image frames to a sequence
    call generate_paths and set_image_pipeline before use
    """
    frames: List[int] = Field(default=(0,),
        description="List of frames relative to the target frame to load as channels, in the order of the list")
    _image_pipeline = PrivateAttr(default=None)
    _image_pipelines: dict = PrivateAttr(default_factory=dict)
    _frame_columns = PrivateAttr(default=None)
    _target_size_image_ix = PrivateAttr(default=None)
    # path is xxx/[id].xxx
    _frame_id_extractor = PrivateAttr(default=r"^(?P<prefix>.+\D)(?P<frame>\d+)(?P<postfix>\.\S+)$")
    _frame_prefix_length = PrivateAttr(default=6)
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Index of image to match target size (first time 0 occurs in frames)
        self._target_size_image_ix = next((i for i, f in enumerate(self.frames) if f == 0), None)
    @property
    def apply_unbatched(self):
        if hasattr(self._image_pipeline, "apply_unbatched"):
            return self._image_pipeline.apply_unbatched
        else:
            return False
    def frame_index_format(self, is_prefixed):
        """Return format of frame index"""
        return f"0{self._frame_prefix_length}d" if is_prefixed else "00d"
    def generate_paths(self, df):
        info = df["path"].str.extract(self._frame_id_extractor)
        info["dataset_id"] = df.get("dataset_id", "unknown")
        is_prefixed = info.groupby("dataset_id")\
            .apply(lambda x: (x["frame"].str.len() == self._frame_prefix_length).all())
        info["fmt"] = is_prefixed.apply(self.frame_index_format).reindex(info["dataset_id"]).values
        info["frame"] = info["frame"].astype(int)
        self._frame_columns = []
        for frame_offset in self.frames:
            if frame_offset != 0:
                key = f"path_t{frame_offset}"
                df[key] = info.apply(
                    lambda x: f'{x["prefix"]}{format(x["frame"] + frame_offset, x["fmt"])}{x["postfix"]}',
                    axis="columns")
                self._frame_columns.append(key)
            else:
                self._frame_columns.append("path")
        return df
    def set_image_pipeline(self, image_pipeline):
        self._image_pipeline = image_pipeline
        self._image_pipelines = {"path": image_pipeline}
        for key in set(self._frame_columns).difference({image_pipeline.path_key}):
            if isinstance(image_pipeline, BaseModel):
                ip = type(image_pipeline)(**image_pipeline.dict())
            else:
                ip = image_pipeline.copy()
            if hasattr(ip, "segmentation"):
                ip.segmentation = None
            ip.path_key = key
            self._image_pipelines[key] = ip
    def loading_extra_frames(self):
        return tuple(self.frames) != (0,)
    def __call__(self, x, *args, **kwargs):
        imgs = []
        for path_key in self._frame_columns:
            ip = self._image_pipelines[path_key]
            imgs.append(ip(x)[ip.output_key])
        if self._target_size_image_ix is not None and self.loading_extra_frames():
            # extract target size for both batched and unbatched states
            sh = tf.shape(imgs[self._target_size_image_ix])[-3:-1]
            for ix in range(len(imgs)):
                # Resize to match
                imgs[ix] = tf.image.resize(imgs[ix], sh)
        return {**x, self._image_pipeline.output_key: tf.concat(imgs, -1)}
    @property
    def output_shape(self):
        if self._image_pipeline is None:
            raise AttributeError('Image pipeline not added yet, call set_image_pipeline')
        else:
            sh = self._image_pipeline.output_shape
            return (*sh[:2], sh[2]*len(self.frames))
    def __getattr__(self, item):
        if self._image_pipeline is None:
            raise AttributeError(f'Image pipeline not added yet, attribute {item} not found')
        else:
            return getattr(self._image_pipeline, item)
Classes
class MultiFrameImager (**kwargs)
Module to concatenate multiple image frames into a sequence. Call generate_paths and set_image_pipeline before use.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
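A minimal usage sketch. StubImagePipeline below is an assumption for illustration only, standing in for any dict-in/dict-out image pipeline exposing path_key, output_key and copy(); the file paths are hypothetical.

import copy
import pandas as pd
import tensorflow as tf
from brevettiai.data.image.multi_frame_imager import MultiFrameImager

class StubImagePipeline:
    # Hypothetical stand-in for an image pipeline: reads the file at x[path_key]
    # and returns the decoded image under output_key.
    path_key = "path"
    output_key = "img"

    def copy(self):
        return copy.copy(self)

    def __call__(self, x):
        img = tf.io.decode_png(tf.io.read_file(x[self.path_key]), channels=1)
        return {**x, self.output_key: tf.cast(img, tf.float32)}

df = pd.DataFrame({"path": ["seq/frame_000011.png"], "dataset_id": ["ds0"]})
imager = MultiFrameImager(frames=[-1, 0, 1])     # previous, target and next frame
df = imager.generate_paths(df)                   # adds "path_t-1" and "path_t1" columns
imager.set_image_pipeline(StubImagePipeline())
# For one row converted to tensors, the three frames end up concatenated on the channel axis:
# out = imager({k: tf.constant(v) for k, v in df.iloc[0].items()})
# out["img"].shape[-1] == 3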
Ancestors
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var frames : List[int]
List of frames relative to the target frame to load as channels, in the order of the list.
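For example, to stack the two preceding frames with the target frame (offsets are illustrative):

MultiFrameImager(frames=[-2, -1, 0])   # two preceding frames plus the target frame, concatenated as channels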
Instance variables
var apply_unbatched
Delegates to the wrapped image pipeline's apply_unbatched flag; False if the pipeline does not define it.
var output_shape
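The concatenated output keeps the wrapped pipeline's spatial dimensions and multiplies the channel count by the number of frames; a worked illustration with assumed shape values:

# Assuming the wrapped pipeline reports output_shape == (256, 256, 3):
# MultiFrameImager(frames=[-1, 0, 1])  ->  output_shape == (256, 256, 9)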
Methods
def frame_index_format(self, is_prefixed)
Return format of frame index
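With the default prefix length of 6, the two returned format specifications behave as follows (plain Python format calls, shown for illustration):

format(7, "006d")   # '000007' - zero-padded frame index
format(7, "00d")    # '7'      - unpadded frame index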
def generate_paths(self, df)
Extract the frame index from each row's "path" and add a path_t&lt;offset&gt; column to the dataframe for every non-zero frame offset, returning the updated dataframe.
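A small sketch of the generated columns, assuming zero-padded six-digit frame indices in hypothetical file names:

import pandas as pd
from brevettiai.data.image.multi_frame_imager import MultiFrameImager

df = pd.DataFrame({"path": ["seq/frame_000011.png"], "dataset_id": ["ds0"]})
df = MultiFrameImager(frames=[-1, 0, 1]).generate_paths(df)
# df["path_t-1"].iloc[0] == "seq/frame_000010.png"
# df["path_t1"].iloc[0]  == "seq/frame_000012.png"   (the target frame stays in "path")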
def loading_extra_frames(self)
Return True when frames other than the target frame (offset 0) are to be loaded.
def set_image_pipeline(self, image_pipeline)
Store the image pipeline and create a copy of it for each extra frame column, pointing each copy at its own path column and disabling segmentation on the copies.
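A sketch of the resulting state (attribute names refer to the private fields in the source above; values are illustrative and continue the usage sketch):

# With frames=[-1, 0, 1] and a pipeline whose path_key is "path":
# sorted(imager._image_pipelines) == ["path", "path_t-1", "path_t1"]
# Each copy reads from its own path column; segmentation is set to None on copies that have the attribute.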