Module brevettiai.data.image.image_loader
Expand source code
import json
import tensorflow as tf
import numpy as np
from pydantic import Field
from pydantic.typing import Literal
from typing import Optional, ClassVar, Type
from brevettiai.data import FileLoader
from brevettiai.data.image import ImageKeys
from brevettiai.data.image.image_processor import ImageProcessor
from brevettiai.data.tf_types import TfRange
class ScalingProcessor(ImageProcessor):
    type: Literal["ScalingProcessor"] = "ScalingProcessor"

    def process(self, image):
        """Process image according to processor"""
        max_ = tf.reduce_max(image)
        min_ = tf.reduce_min(image)
        image = (image - min_) / (max_ - min_)
        return image

class CropResizeProcessor(ImageProcessor):
    type: Literal["CropResizeProcessor"] = "CropResizeProcessor"
    output_height: int = Field(default=0, ge=0, description="Leave at 0 to infer")
    output_width: int = Field(default=0, ge=0, description="Leave at 0 to infer")
    roi_horizontal_offset: int = Field(
        default=0, ge=0, description="Horizontal coordinate of the top-left corner of the bounding box in image.")
    roi_vertical_offset: int = Field(
        default=0, ge=0, description="Vertical coordinate of the top-left corner of the bounding box in image.")
    roi_width: int = Field(default=0, ge=0, description="Width of the bounding box. Zero uses image boundary")
    roi_height: int = Field(default=0, ge=0, description="Height of the bounding box. Zero uses image boundary")
    interpolation: Literal["bilinear", "nearest"] = Field(
        default="bilinear", description="Interpolation mode of cropping and resizing")

    def output_size(self, input_height, input_width):
        """Calculate the output size after postprocessing, given the input image size"""
        height = self.roi_height or input_height
        width = self.roi_width or input_width
        return self.output_height or height, self.output_width or width

    def crop_size(self, input_height, input_width):
        height = input_height - self.roi_vertical_offset if self.roi_height == 0 else self.roi_height
        width = input_width - self.roi_horizontal_offset if self.roi_width == 0 else self.roi_width
        return height, width

    def bbox(self, input_height, input_width):
        """
        Calculate the bounding box in pixel coordinates [y1, x1, y2, x2];
        both corner points are included in the region of interest
        """
        height, width = self.crop_size(input_height, input_width)
        return self.roi_vertical_offset, self.roi_horizontal_offset, \
            self.roi_vertical_offset + height - 1, self.roi_horizontal_offset + width - 1

    def scale(self, input_height, input_width):
        """
        Calculate the output image scale given the input image size;
        returns the scale in height then width (sy, sx)
        """
        crop_height, crop_width = self.crop_size(input_height, input_width)
        output_height, output_width = self.output_size(input_height, input_width)
        return (crop_height - 1) / (output_height - 1), (crop_width - 1) / (output_width - 1)

    def affine_transform(self, input_height, input_width):
        sy, sx = self.scale(input_height, input_width)
        return np.array([
            [sx, 0, self.roi_horizontal_offset],
            [0, sy, self.roi_vertical_offset],
            [0, 0, 1]
        ])

    def process(self, image):
        shape = tf.shape(image)[:2]
        input_height, input_width = shape[0], shape[1]
        size = self.output_size(input_height, input_width)
        # Normalize bounding box to match crop_and_resize
        # https://www.tensorflow.org/api_docs/python/tf/image/crop_and_resize
        norm = tf.cast([input_height, input_width, input_height, input_width], tf.float32) - 1
        bbox = tf.cast(self.bbox(input_height, input_width), tf.float32)
        boxes = [bbox / norm]
        # Crop and resize, attach batch dimension to match tf call
        return tf.image.crop_and_resize(
            image[None], boxes, box_indices=[0], crop_size=size, method=self.interpolation,
            extrapolation_value=0.0
        )[0]

class ImageLoader(FileLoader):
    type: Literal["ImageLoader"] = "ImageLoader"
    output_key: str = Field(default="img", exclude=True)
    postprocessor: Optional[CropResizeProcessor] = Field(default_factory=CropResizeProcessor)
    channels: Literal[0, 1, 3, 4] = Field(default=0, description="Number of channels in images, 0 to autodetect")

    def output_shape(self, image_height=None, image_width=None):
        output_channels = self.channels if self.channels else None
        return (*self.postprocessor.output_size(image_height, image_width), output_channels)

    def load(self, path, metadata=None, postprocess=True):
        data, meta = super().load(path, metadata)
        if tf.strings.length(data) > 0:
            image = tf.io.decode_image(data, expand_animations=False, channels=self.channels)
            _image_file_shape = tf.convert_to_tensor(tf.shape(image))
            if postprocess and self.postprocessor is not None:
                image = self.postprocessor.process(image)
        else:
            # Empty file: emit a 1x1x1 placeholder matching the dtype of the normal path
            if postprocess and self.postprocessor is not None:
                image = tf.constant(0, dtype=tf.float32, shape=(1, 1, 1))
            else:
                image = tf.constant(0, dtype=tf.uint8, shape=(1, 1, 1))
            _image_file_shape = tf.convert_to_tensor(tf.shape(image))
        meta["_image_file_shape"] = _image_file_shape
        return image, meta

class BcimgSequenceLoader(ImageLoader):
    type: Literal["BcimgSequenceLoader"] = "BcimgSequenceLoader"
    range_meta: ClassVar[Type] = TfRange
    metadata_spec = {ImageKeys.SEQUENCE_RANGE: range_meta.build}

    def output_shape(self, image_height=None, image_width=None):
        output_channels = self.channels if self.channels else None
        return (*self.postprocessor.output_size(image_height, image_width), output_channels)

    def load_sequence(self, path, postprocess=True):
        try:
            path = path.item()
        except AttributeError:
            pass
        header = json.loads(self._io.read_file(path))["Image"]
        if header["DType"] == "eGrayScale8":
            channels = 1
        else:
            raise NotImplementedError(f"dtype of bcimg.json '{header['DType']}' not implemented")
        shape = np.array((
            int(header["Frames"]),
            int(header["OriginalSize"]["Height"]),
            int(header["OriginalSize"]["Width"]),
            channels
        ), np.int32)
        # Strip the 10-character 'bcimg.json' suffix to get the sequence directory
        sequence_fmt = self._io.path.join(path[:-10].decode(), "image_files", f"{{:06d}}.{header['Format']}").format
        sequence_files = np.array([sequence_fmt(i) for i in range(shape[0])])
        return sequence_files, shape

    def load(self, path, metadata=None, postprocess=True):
        files, shape = tf.numpy_function(self.load_sequence, [path], [tf.string, tf.int32], name="load_header")
        if metadata is not None:
            # Select frames
            if ImageKeys.SEQUENCE_RANGE in metadata:
                files = metadata[ImageKeys.SEQUENCE_RANGE].slice(files)
        images, meta = tf.map_fn(
            fn=lambda x: super(BcimgSequenceLoader, self).load(x, metadata, postprocess=postprocess),
            elems=files,
            fn_output_signature=(tf.float32, {'_image_file_shape': tf.int32}),
            parallel_iterations=16
        )
        _image_file_shape = meta["_image_file_shape"][0]
        return images, {"_image_file_shape": _image_file_shape, "_sequence_files": files}
Classes
class BcimgSequenceLoader (io=<brevettiai.io.utils.IoTools object>, **data)
-
Basic File loading module for DataGenerator
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Ancestors
- ImageLoader
- FileLoader
- DataGeneratorMap
- abc.ABC
- IoBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var metadata_spec : ClassVar[dict]
var range_meta : ClassVar[Type[+CT_co]]
-
An object for slicing tensors
var type : typing_extensions.Literal['BcimgSequenceLoader']
Methods
def load_sequence(self, path, postprocess=True)
-
Read the bcimg.json header and build the array of per-frame file paths together with the sequence shape (frames, height, width, channels).
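For orientation, load_sequence expects the bcimg.json header to look roughly like this (an illustrative sketch shown as the parsed Python dict; field values are made up, and only the fields the code reads are included):

header = {
    "Image": {
        "DType": "eGrayScale8",  # only eGrayScale8 (1 channel) is implemented
        "Frames": 100,           # number of frames in the sequence
        "OriginalSize": {"Height": 1024, "Width": 1280},
        "Format": "png",         # file extension of the per-frame images
    }
}
# Frame paths are then formatted as <sequence dir>/image_files/000000.png, 000001.png, ...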
def output_shape(self, image_height=None, image_width=None)
-
Return the output shape (height, width, channels) after postprocessing; channels is None when set to autodetect (0).
Inherited members
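A minimal usage sketch (assumptions: the placeholder path points at a readable bcimg.json, and the loader's IoTools backend can fetch it):

import tensorflow as tf
from brevettiai.data.image.image_loader import BcimgSequenceLoader

loader = BcimgSequenceLoader(channels=1)
# "path/to/sequence/bcimg.json" is a placeholder path
frames, meta = loader.load(tf.constant("path/to/sequence/bcimg.json"))
# frames: float32 tensor (num_frames, height, width, 1) after postprocessing;
# meta["_sequence_files"] holds the per-frame file paths.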
class CropResizeProcessor (**data: Any)
-
Base class defining the interface for image processors
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Ancestors
- ImageProcessor
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var interpolation : typing_extensions.Literal['bilinear', 'nearest']
var output_height : int
var output_width : int
var roi_height : int
var roi_horizontal_offset : int
var roi_vertical_offset : int
var roi_width : int
var type : typing_extensions.Literal['CropResizeProcessor']
Methods
def affine_transform(self, input_height, input_width)
-
Return the 3×3 affine matrix mapping output pixel coordinates (x, y, 1) in homogeneous form to input pixel coordinates.
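As a sanity check (a sketch with assumed ROI and image sizes), the matrix maps output pixel coordinates to the corresponding input coordinates:

import numpy as np
from brevettiai.data.image.image_loader import CropResizeProcessor

p = CropResizeProcessor(roi_horizontal_offset=64, roi_vertical_offset=32,
                        roi_width=256, roi_height=256,
                        output_width=128, output_height=128)
A = p.affine_transform(input_height=1024, input_width=1280)
print(A @ [0, 0, 1])      # [64., 32., 1.]   -> the ROI's top-left corner
print(A @ [127, 127, 1])  # [319., 287., 1.] -> the ROI's bottom-right corner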
def bbox(self, input_height, input_width)
-
Calculate the bounding box in pixel coordinates [y1, x1, y2, x2]; both corner points are included in the region of interest
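For example, with assumed values roi_vertical_offset=10, roi_horizontal_offset=20, roi_height=100 and roi_width=200:

from brevettiai.data.image.image_loader import CropResizeProcessor

p = CropResizeProcessor(roi_vertical_offset=10, roi_horizontal_offset=20,
                        roi_height=100, roi_width=200)
print(p.bbox(480, 640))  # (10, 20, 109, 219): y2 = 10 + 100 - 1, x2 = 20 + 200 - 1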
def crop_size(self, input_height, input_width)
-
Return the crop size (height, width); a zero ROI dimension extends the crop to the image boundary.
def output_size(self, input_height, input_width)
-
Calculate the output size after postprocessing, given the input image size
def scale(self, input_height, input_width)
-
Calculate the output image scale given the input image size. Returns the scale in height then width (sy, sx).
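The (N-1)/(M-1) form is endpoint alignment, matching tf.image.crop_and_resize: the first and last pixel centers of the crop map exactly onto the first and last pixel centers of the output. A quick check with assumed sizes:

from brevettiai.data.image.image_loader import CropResizeProcessor

p = CropResizeProcessor(roi_height=256, roi_width=256,
                        output_height=128, output_width=128)
print(p.scale(1024, 1280))  # (255/127, 255/127) ≈ (2.008, 2.008)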
Inherited members
class ImageLoader (io=<brevettiai.io.utils.IoTools object>, **data)
-
Basic File loading module for DataGenerator
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Ancestors
- FileLoader
- DataGeneratorMap
- abc.ABC
- IoBaseModel
- pydantic.main.BaseModel
- pydantic.utils.Representation
Subclasses
- BcimgSequenceLoader
Class variables
var channels : typing_extensions.Literal[0, 1, 3, 4]
var output_key : str
var postprocessor : Optional[CropResizeProcessor]
var type : typing_extensions.Literal['ImageLoader']
Methods
def output_shape(self, image_height=None, image_width=None)
-
Return the output shape (height, width, channels) after postprocessing; channels is None when set to autodetect (0).
Inherited members
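A minimal end-to-end sketch (the file path is a placeholder; reading goes through the loader's IoTools, so any backend IoTools supports will work):

import tensorflow as tf
from brevettiai.data.image.image_loader import ImageLoader, CropResizeProcessor

# Crop a 256x256 ROI at offset (y=32, x=64) and resize it to 128x128
processor = CropResizeProcessor(roi_vertical_offset=32, roi_horizontal_offset=64,
                                roi_height=256, roi_width=256,
                                output_height=128, output_width=128)
loader = ImageLoader(channels=1, postprocessor=processor)

image, meta = loader.load(tf.constant("path/to/image.png"))  # placeholder path
# image: float32 tensor of shape (128, 128, 1)
# meta["_image_file_shape"]: shape of the decoded file before postprocessing
print(loader.output_shape())  # (128, 128, 1)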
class ScalingProcessor (**data: Any)
-
Base class defining the interface for image processors
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Ancestors
- ImageProcessor
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var type : typing_extensions.Literal['ScalingProcessor']
Inherited members
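ScalingProcessor min-max normalizes the whole image to [0, 1]; note that a constant image would divide by zero. A quick sketch:

import tensorflow as tf
from brevettiai.data.image.image_loader import ScalingProcessor

image = tf.constant([[0., 50.], [100., 200.]])
print(ScalingProcessor().process(image))
# [[0.   0.25]
#  [0.5  1.  ]]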