Module brevettiai.data.image.image_loader

Expand source code
import json
import tensorflow as tf
import numpy as np

from pydantic import Field
from pydantic.typing import Literal
from typing import Optional, ClassVar, Type
from brevettiai.data import FileLoader
from brevettiai.data.image import ImageKeys
from brevettiai.data.image.image_processor import ImageProcessor
from brevettiai.data.tf_types import TfRange


class ScalingProcessor(ImageProcessor):
    """Image processor that min-max normalizes an image to the range [0, 1]."""
    type: Literal["ScalingProcessor"] = "ScalingProcessor"

    def process(self, image):
        """
        Rescale image so that its smallest value maps to 0 and its largest value maps to 1.

        A constant image (max equals min) has no value range to stretch. It is returned as
        all zeros rather than the NaN values a plain 0 / 0 division would produce.
        """
        max_ = tf.reduce_max(image)
        min_ = tf.reduce_min(image)
        scaled = (image - min_) / (max_ - min_)
        # The scalar condition broadcasts over the image; only the degenerate case is replaced
        return tf.where(tf.equal(max_, min_), tf.zeros_like(scaled), scaled)


class CropResizeProcessor(ImageProcessor):
    """Image processor cropping a region of interest and resizing it to a fixed output size."""
    type: Literal["CropResizeProcessor"] = "CropResizeProcessor"
    output_height: int = Field(default=0, ge=0, description="Leave at 0 to infer")
    output_width: int = Field(default=0, ge=0, description="Leave at 0 to infer")

    roi_horizontal_offset: int = Field(
        default=0, ge=0, description="Horizontal coordinate of the top-left corner of the bounding box in image.")
    roi_vertical_offset: int = Field(
        default=0, ge=0, description="Vertical coordinate of the top-left corner of the bounding box in image.")
    roi_width: int = Field(default=0, ge=0, description="Width of the bounding box. Zero uses image boundary")
    roi_height: int = Field(default=0, ge=0, description="Height of the bounding box. Zero uses image boundary")

    interpolation: Literal["bilinear", "nearest"] = Field(
        default="bilinear", description="Interpolation mode of cropping and resizing")

    def output_size(self, input_height, input_width):
        """
        Calculate the (height, width) of the output after postprocessing, given input image sizes.

        An explicit output size wins; otherwise the roi size is used, and if that is also
        zero the input size is returned.
        """
        height = self.roi_height or input_height
        width = self.roi_width or input_width
        return self.output_height or height, self.output_width or width

    def crop_size(self, input_height, input_width):
        """
        Calculate the (height, width) of the cropped region.

        A roi dimension of zero extends the crop from the roi offset to the image boundary.
        """
        height = input_height - self.roi_vertical_offset if self.roi_height == 0 else self.roi_height
        width = input_width - self.roi_horizontal_offset if self.roi_width == 0 else self.roi_width
        return height, width

    def bbox(self, input_height, input_width):
        """
        Calculate bounding box specified in pixel coordinates [y1, x1, y2, x2]
        The points both being included in the region of interest
        """
        height, width = self.crop_size(input_height, input_width)
        return self.roi_vertical_offset, self.roi_horizontal_offset, \
            self.roi_vertical_offset + height - 1, self.roi_horizontal_offset + width - 1

    def scale(self, input_height, input_width):
        """
        Calculate output image scale given input image size
        returns scale in height then width (sy, sx)

        Note: an output dimension of exactly 1 makes the corresponding denominator zero.
        """
        crop_height, crop_width = self.crop_size(input_height, input_width)
        # output_size takes (height, width); passing them swapped transposes inferred sizes
        output_height, output_width = self.output_size(input_height, input_width)
        return (crop_height-1) / (output_height-1), (crop_width-1) / (output_width-1)

    def affine_transform(self, input_height, input_width):
        """
        Build a 3x3 homogeneous matrix with the scales on the diagonal and the roi offsets as translation.

        Rows are ordered x then y. Since the scale is crop size over output size, this
        presumably maps output pixel coordinates (x, y, 1) to input image coordinates.
        """
        sy, sx = self.scale(input_height, input_width)

        return np.array([
            [sx,  0, self.roi_horizontal_offset],
            [0, sy, self.roi_vertical_offset],
            [0, 0, 1]
        ])

    def process(self, image):
        """Crop the region of interest from image and resize it to the output size."""
        shape = tf.shape(image)[:2]
        input_height, input_width = shape[0], shape[1]

        size = self.output_size(input_height, input_width)

        # Normalize bounding box to match crop_and_resize
        # https://www.tensorflow.org/api_docs/python/tf/image/crop_and_resize
        norm = tf.cast([input_height, input_width, input_height, input_width], tf.float32)-1
        bbox = tf.cast(self.bbox(input_height, input_width), tf.float32)
        boxes = [bbox / norm]

        # Crop and resize, attach batch dimension to match tf call
        return tf.image.crop_and_resize(
            image[None], boxes, box_indices=[0], crop_size=size, method=self.interpolation,
            extrapolation_value=0.0
        )[0]


class ImageLoader(FileLoader):
    """File loader decoding image files and optionally cropping / resizing them."""
    type: Literal["ImageLoader"] = "ImageLoader"
    output_key: str = Field(default="img", exclude=True)
    # May be None, in which case images are returned as decoded
    postprocessor: Optional[CropResizeProcessor] = Field(default_factory=CropResizeProcessor)
    channels: Literal[0, 1, 3, 4] = Field(default=0, description="Number of channels in images, 0 to autodetect")

    def output_shape(self, image_height=None, image_width=None):
        """
        Calculate the (height, width, channels) shape of loaded images.

        Channels is None when the channel count is autodetected. Without a postprocessor
        the image keeps the given input height and width.
        """
        output_channels = self.channels if self.channels else None
        if self.postprocessor is None:
            # Nothing rescales the image, so the input size is the output size
            return image_height, image_width, output_channels
        return (*self.postprocessor.output_size(image_height, image_width), output_channels)

    def load(self, path, metadata=None, postprocess=True):
        """
        Load and decode the image at path.

        Returns the image and a metadata dict where "_image_file_shape" holds the shape of
        the decoded image before postprocessing. Empty file content yields a 1x1x1 zero
        placeholder image instead of a decoding error.
        """
        data, meta = super().load(path, metadata)

        if tf.strings.length(data) > 0:
            image = tf.io.decode_image(data, expand_animations=False, channels=self.channels)
            _image_file_shape = tf.convert_to_tensor(tf.shape(image))

            if postprocess and self.postprocessor is not None:
                image = self.postprocessor.process(image)
        else:
            # Placeholder dtype follows the branch above: float32 when postprocessing
            # (presumably matching the postprocessor output), uint8 otherwise
            if postprocess and self.postprocessor is not None:
                image = tf.constant(0, dtype=tf.float32, shape=(1, 1, 1))
            else:
                image = tf.constant(0, dtype=tf.uint8, shape=(1, 1, 1))
            _image_file_shape = tf.convert_to_tensor(tf.shape(image))

        meta["_image_file_shape"] = _image_file_shape

        return image, meta


class BcimgSequenceLoader(ImageLoader):
    """Loader for bcimg image sequences described by a JSON header file."""
    type: Literal["BcimgSequenceLoader"] = "BcimgSequenceLoader"
    range_meta: ClassVar[Type] = TfRange
    # The sequence range metadata entry is built with range_meta.build
    metadata_spec = {ImageKeys.SEQUENCE_RANGE: range_meta.build}

    def output_shape(self, image_height=None, image_width=None):
        """
        Calculate the (height, width, channels) shape of a single loaded frame.

        Channels is None when the channel count is autodetected. Without a postprocessor
        the frame keeps the given input height and width.
        """
        output_channels = self.channels if self.channels else None
        if self.postprocessor is None:
            # Nothing rescales the image, so the input size is the output size
            return image_height, image_width, output_channels
        return (*self.postprocessor.output_size(image_height, image_width), output_channels)

    def load_sequence(self, path, postprocess=True):
        """
        Read the sequence header at path and list the frame files it describes.

        Returns an array of frame file paths and an int32 array
        (frames, height, width, channels) taken from the header.
        Raises NotImplementedError for any header DType other than "eGrayScale8".
        The postprocess argument is not used here.
        """
        try:
            # Unwrap numpy scalars / 0-d arrays to the underlying Python object
            path = path.item()
        except AttributeError:
            pass
        header = json.loads(self._io.read_file(path))["Image"]

        if header["DType"] == "eGrayScale8":
            channels = 1
        else:
            raise NotImplementedError(f"dtype of bcimg.json '{header['DType']}' not implemented")
        shape = np.array((
            int(header["Frames"]),
            int(header["OriginalSize"]["Height"]),
            int(header["OriginalSize"]["Width"]),
            channels
        ), np.int32)
        # Drop the last 10 characters of the path (presumably the "bcimg.json" file name)
        # to get the sequence folder; frames are zero padded six digit file names
        sequence_fmt = self._io.path.join(path[:-10].decode(), "image_files", f"{{:06d}}.{header['Format']}").format
        sequence_files = np.array([sequence_fmt(i) for i in range(shape[0])])
        return sequence_files, shape

    def load(self, path, metadata=None, postprocess=True):
        """
        Load all (or the selected range of) frames of the sequence at path.

        Returns the stacked frames and a metadata dict with the file shape of the first
        frame ("_image_file_shape") and the loaded frame paths ("_sequence_files").
        """
        files, shape = tf.numpy_function(self.load_sequence, [path], [tf.string, tf.int32], name="load_header")

        if metadata is not None:
            # Select frames
            if ImageKeys.SEQUENCE_RANGE in metadata:
                files = metadata[ImageKeys.SEQUENCE_RANGE].slice(files)

        # ImageLoader.load only yields float32 after postprocessing; raw decoded images
        # are uint8, so the declared map_fn output dtype has to follow the same condition
        image_dtype = tf.float32 if postprocess and self.postprocessor is not None else tf.uint8

        images, meta = tf.map_fn(
            fn=lambda x: super(BcimgSequenceLoader, self).load(x, metadata, postprocess=postprocess),
            elems=files,
            fn_output_signature=(image_dtype, {'_image_file_shape': tf.int32}),
            parallel_iterations=16
        )
        _image_file_shape = meta["_image_file_shape"][0]
        return images, {"_image_file_shape": _image_file_shape, "_sequence_files": files}

Classes

class BcimgSequenceLoader (io=<brevettiai.io.utils.IoTools object>, **data)

Basic File loading module for DataGenerator

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class BcimgSequenceLoader(ImageLoader):
    """Loader for bcimg image sequences described by a JSON header file."""
    type: Literal["BcimgSequenceLoader"] = "BcimgSequenceLoader"
    range_meta: ClassVar[Type] = TfRange
    # The sequence range metadata entry is built with range_meta.build
    metadata_spec = {ImageKeys.SEQUENCE_RANGE: range_meta.build}

    def output_shape(self, image_height=None, image_width=None):
        """
        Calculate the (height, width, channels) shape of a single loaded frame.

        Channels is None when the channel count is autodetected. Without a postprocessor
        the frame keeps the given input height and width.
        """
        output_channels = self.channels if self.channels else None
        if self.postprocessor is None:
            # Nothing rescales the image, so the input size is the output size
            return image_height, image_width, output_channels
        return (*self.postprocessor.output_size(image_height, image_width), output_channels)

    def load_sequence(self, path, postprocess=True):
        """
        Read the sequence header at path and list the frame files it describes.

        Returns an array of frame file paths and an int32 array
        (frames, height, width, channels) taken from the header.
        Raises NotImplementedError for any header DType other than "eGrayScale8".
        The postprocess argument is not used here.
        """
        try:
            # Unwrap numpy scalars / 0-d arrays to the underlying Python object
            path = path.item()
        except AttributeError:
            pass
        header = json.loads(self._io.read_file(path))["Image"]

        if header["DType"] == "eGrayScale8":
            channels = 1
        else:
            raise NotImplementedError(f"dtype of bcimg.json '{header['DType']}' not implemented")
        shape = np.array((
            int(header["Frames"]),
            int(header["OriginalSize"]["Height"]),
            int(header["OriginalSize"]["Width"]),
            channels
        ), np.int32)
        # Drop the last 10 characters of the path (presumably the "bcimg.json" file name)
        # to get the sequence folder; frames are zero padded six digit file names
        sequence_fmt = self._io.path.join(path[:-10].decode(), "image_files", f"{{:06d}}.{header['Format']}").format
        sequence_files = np.array([sequence_fmt(i) for i in range(shape[0])])
        return sequence_files, shape

    def load(self, path, metadata=None, postprocess=True):
        """
        Load all (or the selected range of) frames of the sequence at path.

        Returns the stacked frames and a metadata dict with the file shape of the first
        frame ("_image_file_shape") and the loaded frame paths ("_sequence_files").
        """
        files, shape = tf.numpy_function(self.load_sequence, [path], [tf.string, tf.int32], name="load_header")

        if metadata is not None:
            # Select frames
            if ImageKeys.SEQUENCE_RANGE in metadata:
                files = metadata[ImageKeys.SEQUENCE_RANGE].slice(files)

        # ImageLoader.load only yields float32 after postprocessing; raw decoded images
        # are uint8, so the declared map_fn output dtype has to follow the same condition
        image_dtype = tf.float32 if postprocess and self.postprocessor is not None else tf.uint8

        images, meta = tf.map_fn(
            fn=lambda x: super(BcimgSequenceLoader, self).load(x, metadata, postprocess=postprocess),
            elems=files,
            fn_output_signature=(image_dtype, {'_image_file_shape': tf.int32}),
            parallel_iterations=16
        )
        _image_file_shape = meta["_image_file_shape"][0]
        return images, {"_image_file_shape": _image_file_shape, "_sequence_files": files}

Ancestors

Class variables

var metadata_spec : ClassVar[dict]
var range_meta : ClassVar[Type[+CT_co]]

An object for slicing tensors

var type : typing_extensions.Literal['BcimgSequenceLoader']

Methods

def load_sequence(self, path, postprocess=True)
Expand source code
def load_sequence(self, path, postprocess=True):
    """
    Parse the sequence header found at path and enumerate its frame files.

    Returns a tuple of (array of frame file paths, int32 array of
    frames, height, width, channels). Only headers with DType "eGrayScale8"
    are supported; anything else raises NotImplementedError.
    """
    try:
        # numpy scalars / 0-d arrays expose item(); plain objects are used as they are
        path = path.item()
    except AttributeError:
        pass

    header = json.loads(self._io.read_file(path))["Image"]
    if header["DType"] != "eGrayScale8":
        raise NotImplementedError(f"dtype of bcimg.json '{header['DType']}' not implemented")
    channels = 1

    frame_count = int(header["Frames"])
    original_size = header["OriginalSize"]
    shape = np.array(
        (frame_count, int(original_size["Height"]), int(original_size["Width"]), channels),
        np.int32,
    )

    # The sequence folder is the path without its last 10 characters
    sequence_dir = path[:-10].decode()
    sequence_fmt = self._io.path.join(sequence_dir, "image_files", f"{{:06d}}.{header['Format']}").format
    sequence_files = np.array(list(map(sequence_fmt, range(frame_count))))
    return sequence_files, shape
def output_shape(self, image_height=None, image_width=None)
Expand source code
def output_shape(self, image_height=None, image_width=None):
    """
    Calculate the (height, width, channels) shape of a single loaded frame.

    Channels is None when the channel count is autodetected. Without a postprocessor
    the frame keeps the given input height and width.
    """
    output_channels = self.channels if self.channels else None
    if self.postprocessor is None:
        # Nothing rescales the image, so the input size is the output size
        return image_height, image_width, output_channels
    return (*self.postprocessor.output_size(image_height, image_width), output_channels)

Inherited members

class CropResizeProcessor (**data: Any)

Base class for implementing the interface for image processors

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class CropResizeProcessor(ImageProcessor):
    """Image processor cropping a region of interest and resizing it to a fixed output size."""
    type: Literal["CropResizeProcessor"] = "CropResizeProcessor"
    output_height: int = Field(default=0, ge=0, description="Leave at 0 to infer")
    output_width: int = Field(default=0, ge=0, description="Leave at 0 to infer")

    roi_horizontal_offset: int = Field(
        default=0, ge=0, description="Horizontal coordinate of the top-left corner of the bounding box in image.")
    roi_vertical_offset: int = Field(
        default=0, ge=0, description="Vertical coordinate of the top-left corner of the bounding box in image.")
    roi_width: int = Field(default=0, ge=0, description="Width of the bounding box. Zero uses image boundary")
    roi_height: int = Field(default=0, ge=0, description="Height of the bounding box. Zero uses image boundary")

    interpolation: Literal["bilinear", "nearest"] = Field(
        default="bilinear", description="Interpolation mode of cropping and resizing")

    def output_size(self, input_height, input_width):
        """
        Calculate the (height, width) of the output after postprocessing, given input image sizes.

        An explicit output size wins; otherwise the roi size is used, and if that is also
        zero the input size is returned.
        """
        height = self.roi_height or input_height
        width = self.roi_width or input_width
        return self.output_height or height, self.output_width or width

    def crop_size(self, input_height, input_width):
        """
        Calculate the (height, width) of the cropped region.

        A roi dimension of zero extends the crop from the roi offset to the image boundary.
        """
        height = input_height - self.roi_vertical_offset if self.roi_height == 0 else self.roi_height
        width = input_width - self.roi_horizontal_offset if self.roi_width == 0 else self.roi_width
        return height, width

    def bbox(self, input_height, input_width):
        """
        Calculate bounding box specified in pixel coordinates [y1, x1, y2, x2]
        The points both being included in the region of interest
        """
        height, width = self.crop_size(input_height, input_width)
        return self.roi_vertical_offset, self.roi_horizontal_offset, \
            self.roi_vertical_offset + height - 1, self.roi_horizontal_offset + width - 1

    def scale(self, input_height, input_width):
        """
        Calculate output image scale given input image size
        returns scale in height then width (sy, sx)

        Note: an output dimension of exactly 1 makes the corresponding denominator zero.
        """
        crop_height, crop_width = self.crop_size(input_height, input_width)
        # output_size takes (height, width); passing them swapped transposes inferred sizes
        output_height, output_width = self.output_size(input_height, input_width)
        return (crop_height-1) / (output_height-1), (crop_width-1) / (output_width-1)

    def affine_transform(self, input_height, input_width):
        """
        Build a 3x3 homogeneous matrix with the scales on the diagonal and the roi offsets as translation.

        Rows are ordered x then y. Since the scale is crop size over output size, this
        presumably maps output pixel coordinates (x, y, 1) to input image coordinates.
        """
        sy, sx = self.scale(input_height, input_width)

        return np.array([
            [sx,  0, self.roi_horizontal_offset],
            [0, sy, self.roi_vertical_offset],
            [0, 0, 1]
        ])

    def process(self, image):
        """Crop the region of interest from image and resize it to the output size."""
        shape = tf.shape(image)[:2]
        input_height, input_width = shape[0], shape[1]

        size = self.output_size(input_height, input_width)

        # Normalize bounding box to match crop_and_resize
        # https://www.tensorflow.org/api_docs/python/tf/image/crop_and_resize
        norm = tf.cast([input_height, input_width, input_height, input_width], tf.float32)-1
        bbox = tf.cast(self.bbox(input_height, input_width), tf.float32)
        boxes = [bbox / norm]

        # Crop and resize, attach batch dimension to match tf call
        return tf.image.crop_and_resize(
            image[None], boxes, box_indices=[0], crop_size=size, method=self.interpolation,
            extrapolation_value=0.0
        )[0]

Ancestors

  • ImageProcessor
  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var interpolation : typing_extensions.Literal['bilinear', 'nearest']
var output_height : int
var output_width : int
var roi_height : int
var roi_horizontal_offset : int
var roi_vertical_offset : int
var roi_width : int
var type : typing_extensions.Literal['CropResizeProcessor']

Methods

def affine_transform(self, input_height, input_width)
Expand source code
def affine_transform(self, input_height, input_width):
    """
    Build a 3x3 homogeneous matrix from the per-axis scale and the roi offsets.

    The first row carries the horizontal scale and offset, the second row the vertical ones.
    """
    scale_y, scale_x = self.scale(input_height, input_width)
    offset_x = self.roi_horizontal_offset
    offset_y = self.roi_vertical_offset

    x_row = [scale_x, 0, offset_x]
    y_row = [0, scale_y, offset_y]
    homogeneous_row = [0, 0, 1]
    return np.array([x_row, y_row, homogeneous_row])
def bbox(self, input_height, input_width)

Calculate the bounding box specified in pixel coordinates [y1, x1, y2, x2]. Both points are included in the region of interest.

Expand source code
def bbox(self, input_height, input_width):
    """
    Return the region of interest as pixel coordinates (y1, x1, y2, x2).

    Both corner points are inside the region, hence the "- 1" on the far corner.
    """
    top = self.roi_vertical_offset
    left = self.roi_horizontal_offset
    crop_height, crop_width = self.crop_size(input_height, input_width)
    bottom = top + crop_height - 1
    right = left + crop_width - 1
    return top, left, bottom, right
def crop_size(self, input_height, input_width)
Expand source code
def crop_size(self, input_height, input_width):
    """
    Return the (height, width) of the cropped region.

    A roi dimension of zero means the crop runs from the roi offset to the image boundary.
    """
    if self.roi_height == 0:
        height = input_height - self.roi_vertical_offset
    else:
        height = self.roi_height

    if self.roi_width == 0:
        width = input_width - self.roi_horizontal_offset
    else:
        width = self.roi_width

    return height, width
def output_size(self, input_height, input_width)

Calculated output size of output after postprocessing, given input image sizes

Expand source code
def output_size(self, input_height, input_width):
    """
    Return the (height, width) of the postprocessed output for the given input image size.

    Per axis, the first non-zero value of output size and roi size is used,
    falling back to the input size.
    """
    inferred_height = self.roi_height if self.roi_height else input_height
    inferred_width = self.roi_width if self.roi_width else input_width

    height = self.output_height if self.output_height else inferred_height
    width = self.output_width if self.output_width else inferred_width
    return height, width
def scale(self, input_height, input_width)

Calculate the output image scale given the input image size. Returns the scale in height then width (sy, sx).

Expand source code
def scale(self, input_height, input_width):
    """
    Calculate output image scale given input image size
    returns scale in height then width (sy, sx)

    Note: an output dimension of exactly 1 makes the corresponding denominator zero.
    """
    crop_height, crop_width = self.crop_size(input_height, input_width)
    # output_size takes (height, width); passing them swapped transposes inferred sizes
    output_height, output_width = self.output_size(input_height, input_width)
    return (crop_height-1) / (output_height-1), (crop_width-1) / (output_width-1)

Inherited members

class ImageLoader (io=<brevettiai.io.utils.IoTools object>, **data)

Basic File loading module for DataGenerator

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class ImageLoader(FileLoader):
    """File loader decoding image files and optionally cropping / resizing them."""
    type: Literal["ImageLoader"] = "ImageLoader"
    output_key: str = Field(default="img", exclude=True)
    # May be None, in which case images are returned as decoded
    postprocessor: Optional[CropResizeProcessor] = Field(default_factory=CropResizeProcessor)
    channels: Literal[0, 1, 3, 4] = Field(default=0, description="Number of channels in images, 0 to autodetect")

    def output_shape(self, image_height=None, image_width=None):
        """
        Calculate the (height, width, channels) shape of loaded images.

        Channels is None when the channel count is autodetected. Without a postprocessor
        the image keeps the given input height and width.
        """
        output_channels = self.channels if self.channels else None
        if self.postprocessor is None:
            # Nothing rescales the image, so the input size is the output size
            return image_height, image_width, output_channels
        return (*self.postprocessor.output_size(image_height, image_width), output_channels)

    def load(self, path, metadata=None, postprocess=True):
        """
        Load and decode the image at path.

        Returns the image and a metadata dict where "_image_file_shape" holds the shape of
        the decoded image before postprocessing. Empty file content yields a 1x1x1 zero
        placeholder image instead of a decoding error.
        """
        data, meta = super().load(path, metadata)

        if tf.strings.length(data) > 0:
            image = tf.io.decode_image(data, expand_animations=False, channels=self.channels)
            _image_file_shape = tf.convert_to_tensor(tf.shape(image))

            if postprocess and self.postprocessor is not None:
                image = self.postprocessor.process(image)
        else:
            # Placeholder dtype follows the branch above: float32 when postprocessing
            # (presumably matching the postprocessor output), uint8 otherwise
            if postprocess and self.postprocessor is not None:
                image = tf.constant(0, dtype=tf.float32, shape=(1, 1, 1))
            else:
                image = tf.constant(0, dtype=tf.uint8, shape=(1, 1, 1))
            _image_file_shape = tf.convert_to_tensor(tf.shape(image))

        meta["_image_file_shape"] = _image_file_shape

        return image, meta

Ancestors

Subclasses

Class variables

var channels : typing_extensions.Literal[0, 1, 3, 4]
var output_key : str
var postprocessor : Optional[CropResizeProcessor]
var type : typing_extensions.Literal['ImageLoader']

Methods

def output_shape(self, image_height=None, image_width=None)
Expand source code
def output_shape(self, image_height=None, image_width=None):
    """
    Calculate the (height, width, channels) shape of loaded images.

    Channels is None when the channel count is autodetected. Without a postprocessor
    the image keeps the given input height and width.
    """
    output_channels = self.channels if self.channels else None
    if self.postprocessor is None:
        # Nothing rescales the image, so the input size is the output size
        return image_height, image_width, output_channels
    return (*self.postprocessor.output_size(image_height, image_width), output_channels)

Inherited members

class ScalingProcessor (**data: Any)

Base class for implementing the interface for image processors

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class ScalingProcessor(ImageProcessor):
    """Image processor that min-max normalizes an image to the range [0, 1]."""
    type: Literal["ScalingProcessor"] = "ScalingProcessor"

    def process(self, image):
        """
        Rescale image so that its smallest value maps to 0 and its largest value maps to 1.

        A constant image (max equals min) has no value range to stretch. It is returned as
        all zeros rather than the NaN values a plain 0 / 0 division would produce.
        """
        max_ = tf.reduce_max(image)
        min_ = tf.reduce_min(image)
        scaled = (image - min_) / (max_ - min_)
        # The scalar condition broadcasts over the image; only the degenerate case is replaced
        return tf.where(tf.equal(max_, min_), tf.zeros_like(scaled), scaled)

Ancestors

  • ImageProcessor
  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var type : typing_extensions.Literal['ScalingProcessor']

Inherited members