Module brevettiai.data.image.image_loader
import json
import tensorflow as tf
import numpy as np
from pydantic import Field
from pydantic.typing import Literal
from typing import Optional, ClassVar, Type
from brevettiai.data import FileLoader
from brevettiai.data.image import ImageKeys
from brevettiai.data.image.image_processor import ImageProcessor
from brevettiai.data.tf_types import TfRange
class ScalingProcessor(ImageProcessor):
    type: Literal["ScalingProcessor"] = "ScalingProcessor"
    def process(self, image):
        """Process image according to processor"""
        max_ = tf.reduce_max(image)
        min_ = tf.reduce_min(image)
        image = (image - min_) / (max_ - min_)
        return image
class CropResizeProcessor(ImageProcessor):
    type: Literal["CropResizeProcessor"] = "CropResizeProcessor"
    output_height: int = Field(default=0, ge=0, description="Leave at 0 to infer")
    output_width: int = Field(default=0, ge=0, description="Leave at 0 to infer")
    roi_horizontal_offset: int = Field(
        default=0, ge=0, description="Horizontal coordinate of the top-left corner of the bounding box in image.")
    roi_vertical_offset: int = Field(
        default=0, ge=0, description="Vertical coordinate of the top-left corner of the bounding box in image.")
    roi_width: int = Field(default=0, ge=0, description="Width of the bounding box. Zero uses image boundary")
    roi_height: int = Field(default=0, ge=0, description="Height of the bounding box. Zero uses image boundary")
    interpolation: Literal["bilinear", "nearest"] = Field(
        default="bilinear", description="Interpolation mode of cropping and resizing")
    def output_size(self, input_height, input_width):
        """Calculated output size of output after postprocessing, given input image sizes"""
        height = self.roi_height or input_height
        width = self.roi_width or input_width
        return self.output_height or height, self.output_width or width
    def crop_size(self, input_height, input_width):
        """Calculate the crop size; ROI extents of zero fall back to the image boundary"""
        height = input_height - self.roi_vertical_offset if self.roi_height == 0 else self.roi_height
        width = input_width - self.roi_horizontal_offset if self.roi_width == 0 else self.roi_width
        return height, width
    def bbox(self, input_height, input_width):
        """
        Calculate the bounding box in pixel coordinates [y1, x1, y2, x2],
        with both endpoints included in the region of interest
        """
        height, width = self.crop_size(input_height, input_width)
        return self.roi_vertical_offset, self.roi_horizontal_offset, \
            self.roi_vertical_offset + height - 1, self.roi_horizontal_offset + width - 1
    def scale(self, input_height, input_width):
        """
        Calculate the output image scale given the input image size
        Returns the scale in height then width (sy, sx)
        """
        crop_height, crop_width = self.crop_size(input_height, input_width)
        output_height, output_width = self.output_size(input_height, input_width)
        return (crop_height - 1) / (output_height - 1), (crop_width - 1) / (output_width - 1)
    def affine_transform(self, input_height, input_width):
        sy, sx = self.scale(input_height, input_width)
        return np.array([
            [sx,  0, self.roi_horizontal_offset],
            [0, sy, self.roi_vertical_offset],
            [0, 0, 1]
        ])
    def process(self, image):
        shape = tf.shape(image)[:2]
        input_height, input_width = shape[0], shape[1]
        size = self.output_size(input_height, input_width)
        # Normalize bounding box to match crop_and_resize
        # https://www.tensorflow.org/api_docs/python/tf/image/crop_and_resize
        norm = tf.cast([input_height, input_width, input_height, input_width], tf.float32)-1
        bbox = tf.cast(self.bbox(input_height, input_width), tf.float32)
        boxes = [bbox / norm]
        # Crop and resize, attach batch dimension to match tf call
        return tf.image.crop_and_resize(
            image[None], boxes, box_indices=[0], crop_size=size, method=self.interpolation,
            extrapolation_value=0.0
        )[0]
class ImageLoader(FileLoader):
    type: Literal["ImageLoader"] = "ImageLoader"
    output_key: str = Field(default="img", exclude=True)
    postprocessor: Optional[CropResizeProcessor] = Field(default_factory=CropResizeProcessor)
    channels: Literal[0, 1, 3, 4] = Field(default=0, description="Number of channels in images, 0 to autodetect")
    def output_shape(self, image_height=None, image_width=None):
        output_channels = self.channels if self.channels else None
        return (*self.postprocessor.output_size(image_height, image_width), output_channels)
    def load(self, path, metadata=None, postprocess=True):
        data, meta = super().load(path, metadata)
        if tf.strings.length(data) > 0:
            image = tf.io.decode_image(data, expand_animations=False, channels=self.channels)
            _image_file_shape = tf.convert_to_tensor(tf.shape(image))
            if postprocess and self.postprocessor is not None:
                image = self.postprocessor.process(image)
        else:
            if postprocess and self.postprocessor is not None:
                image = tf.constant(0, dtype=tf.float32, shape=(1, 1, 1))
            else:
                image = tf.constant(0, dtype=tf.uint8, shape=(1, 1, 1))
            _image_file_shape = tf.convert_to_tensor(tf.shape(image))
        meta["_image_file_shape"] = _image_file_shape
        return image, meta
class BcimgSequenceLoader(ImageLoader):
    type: Literal["BcimgSequenceLoader"] = "BcimgSequenceLoader"
    range_meta: ClassVar[Type] = TfRange
    metadata_spec = {ImageKeys.SEQUENCE_RANGE: range_meta.build}
    def output_shape(self, image_height=None, image_width=None):
        output_channels = self.channels if self.channels else None
        return (*self.postprocessor.output_size(image_height, image_width), output_channels)
    def load_sequence(self, path, postprocess=True):
        try:
            path = path.item()
        except AttributeError:
            pass
        header = json.loads(self._io.read_file(path))["Image"]
        if header["DType"] == "eGrayScale8":
            channels = 1
        else:
            raise NotImplementedError(f"dtype of bcimg.json '{header['DType']}' not implemented")
        shape = np.array((
            int(header["Frames"]),
            int(header["OriginalSize"]["Height"]),
            int(header["OriginalSize"]["Width"]),
            channels
        ), np.int32)
        sequence_fmt = self._io.path.join(path[:-10].decode(), "image_files", f"{{:06d}}.{header['Format']}").format
        sequence_files = np.array([sequence_fmt(i) for i in range(shape[0])])
        return sequence_files, shape
    def load(self, path, metadata=None, postprocess=True):
        files, shape = tf.numpy_function(self.load_sequence, [path], [tf.string, tf.int32], name="load_header")
        if metadata is not None:
            # Select frames
            if ImageKeys.SEQUENCE_RANGE in metadata:
                files = metadata[ImageKeys.SEQUENCE_RANGE].slice(files)
        images, meta = tf.map_fn(
            fn=lambda x: super(BcimgSequenceLoader, self).load(x, metadata, postprocess=postprocess),
            elems=files,
            fn_output_signature=(tf.float32, {'_image_file_shape': tf.int32}),
            parallel_iterations=16
        )
        _image_file_shape = meta["_image_file_shape"][0]
        return images, {"_image_file_shape": _image_file_shape, "_sequence_files": files}
Classes
class BcimgSequenceLoader (io=<brevettiai.io.utils.IoTools object>, **data)
Basic File loading module for DataGenerator
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Ancestors
- ImageLoader
- FileLoader
- DataGeneratorMap
- abc.ABC
- IoBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var metadata_spec : ClassVar[dict]
var range_meta : ClassVar[Type[+CT_co]]
    An object for slicing tensors
var type : typing_extensions.Literal['BcimgSequenceLoader']
Methods
def load_sequence(self, path, postprocess=True)
    Read the bcimg.json header and enumerate the sequence's frame image files; returns (files, shape), with shape as (frames, height, width, channels).
def output_shape(self, image_height=None, image_width=None)
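A minimal usage sketch, assuming eager execution; the bcimg.json path below is a placeholder, and frames can be restricted by passing ImageKeys.SEQUENCE_RANGE metadata:

import tensorflow as tf
from brevettiai.data.image.image_loader import BcimgSequenceLoader

loader = BcimgSequenceLoader(channels=1)
# "data/sample/bcimg.json" is an assumed path to a sequence header
images, meta = loader.load(tf.constant("data/sample/bcimg.json"))
# images: float32 tensor of shape (frames, height, width, channels)
# meta["_sequence_files"]: per-frame image paths that were read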
Inherited members
class CropResizeProcessor (**data: Any)
Base class for implementing the interface for image processors
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Ancestors
- ImageProcessor
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var interpolation : typing_extensions.Literal['bilinear', 'nearest']
var output_height : int
var output_width : int
var roi_height : int
var roi_horizontal_offset : int
var roi_vertical_offset : int
var roi_width : int
var type : typing_extensions.Literal['CropResizeProcessor']
Methods
def affine_transform(self, input_height, input_width)
def bbox(self, input_height, input_width)
    Calculate the bounding box in pixel coordinates [y1, x1, y2, x2], with both endpoints included in the region of interest.
def crop_size(self, input_height, input_width)
    Calculate the crop size; ROI extents of zero fall back to the image boundary.
def output_size(self, input_height, input_width)
    Calculate the output size after postprocessing, given the input image size.
def scale(self, input_height, input_width)
    Calculate the output image scale given the input image size. Returns the scale in height then width (sy, sx).
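A short worked example of the ROI geometry above, with illustrative values:

from brevettiai.data.image.image_loader import CropResizeProcessor

proc = CropResizeProcessor(
    roi_horizontal_offset=10, roi_vertical_offset=20,
    roi_width=100, roi_height=50,
    output_width=50, output_height=25,
)
print(proc.crop_size(480, 640))    # (50, 100): nonzero ROI extents override the image boundary
print(proc.output_size(480, 640))  # (25, 50)
print(proc.bbox(480, 640))         # (20, 10, 69, 109): y1, x1, y2, x2, both corners inside the ROI
print(proc.scale(480, 640))        # (49/24, 99/49) ≈ (2.04, 2.02): sy, sx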
Inherited members
class ImageLoader (io=<brevettiai.io.utils.IoTools object>, **data)
Basic File loading module for DataGenerator
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Ancestors
- FileLoader
- DataGeneratorMap
- abc.ABC
- IoBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Subclasses
- BcimgSequenceLoader
Class variables
var channels : typing_extensions.Literal[0, 1, 3, 4]
var output_key : str
var postprocessor : Optional[CropResizeProcessor]
var type : typing_extensions.Literal['ImageLoader']
Methods
def output_shape(self, image_height=None, image_width=None)
    Return the output shape (height, width, channels) after postprocessing; channels is None when autodetected.
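A minimal usage sketch; the file path and ROI values are illustrative assumptions:

import tensorflow as tf
from brevettiai.data.image.image_loader import ImageLoader, CropResizeProcessor

loader = ImageLoader(
    channels=1,
    postprocessor=CropResizeProcessor(roi_width=224, roi_height=224),
)
print(loader.output_shape())  # (224, 224, 1), independent of input size since the ROI is fixed
image, meta = loader.load(tf.constant("data/sample.png"))  # placeholder path
# image: float32 tensor of shape (224, 224, 1)
# meta["_image_file_shape"]: shape of the decoded image before postprocessing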
Inherited members
class ScalingProcessor (**data: Any)
Base class for implementing the interface for image processors
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Ancestors
- ImageProcessor
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var type : typing_extensions.Literal['ScalingProcessor']
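A minimal sketch of the min-max scaling performed by process:

import tensorflow as tf
from brevettiai.data.image.image_loader import ScalingProcessor

proc = ScalingProcessor()
image = tf.constant([[50., 100.], [150., 200.]])
print(proc.process(image))  # [[0., 1/3], [2/3, 1.]]; note a constant image would divide by zero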
Inherited members