Module brevettiai.platform.aipackage
Expand source code
import hashlib
import logging
import os
import re
from dataclasses import dataclass
from typing import Dict, ClassVar
from io import BytesIO
from pydantic import Field
from brevettiai.model.metadata import ModelMetadata
from brevettiai.platform import Job
from brevettiai.utils.model_version import get_model_version
import py7zr
log = logging.getLogger(__name__)
MSG_OPEN_ERROR = "Error opening archive"
@dataclass
class AIPackageAsset:
"""
Data holder for file names and hashes of assets in model archive
"""
file: str
hash: str
class AIPackageMetadata(ModelMetadata):
"""
Metadata for model archives
adding an assets field for users of the archive to check if it contains what is needed
"""
AIPACKAGE_VERSION: ClassVar[str] = "1.0"
"""Version of ai package"""
version: str = Field(default=AIPACKAGE_VERSION, const=True, description="aipackage version number marker")
assets: Dict[str, AIPackageAsset] = Field(default_factory=dict,
description="Key value pairs of name and path to asset")
class AIPackage:
"""
Helps to create archives with different assets and their metadata
"""
METADATA_URI: ClassVar[str] = "metadata.json"
"""location of metadata file"""
def __init__(self, path=None, metadata: AIPackageMetadata = None, job: Job = None, password: str = None):
"""
Initialize ai package.
Can use job or metadata as source, if neither are given metadata is loaded from current archive
Notes:
* Use the context manager to write assets.
Usage:
in Job.run:
```python
archive = AIPackage(job=self)
with model_archive.open_write() as writer:
writer.add_asset(name="onnx", arcname="model.onnx", file=self.temp_path("model.onnx"))
with archive.open_read() as reader:
with reader.get_asset("onnx") as fp:
data = fp.read()
...
return archive.upload(self)
```
Args:
path: location of archive (Must end with .aipkg), if job given and path is None f"{job.name}.aipkg" is chosen
metadata: optional AIPackage metadata for new packages
job: optional job to gather metadata from if metadata keyword is missing
password: optional password to protect the archive. default = None
"""
self.path = path or job.temp_path(f"{re.sub('[^-a-zA-Z0-9_.()]+', '_', job.name)}.aipkg".lower())
self.password = password
try:
if metadata is not None:
self.metadata = AIPackageMetadata.parse_obj(metadata)
elif job is not None:
self.metadata = AIPackageMetadata.parse_obj(job.get_metadata())
if os.path.isfile(self.path):
with py7zr.SevenZipFile(self.path, 'r', password=self.password) as arc:
targets = arc.read(targets=AIPackage.METADATA_URI)
self.metadata = AIPackageMetadata.parse_raw(targets[AIPackage.METADATA_URI].read())
else:
if not os.path.isfile(self.path):
raise TypeError("Model archive needs 'metadata' to create archive")
with py7zr.SevenZipFile(self.path, 'r', password=self.password) as arc:
targets = arc.read(targets=AIPackage.METADATA_URI)
self.metadata = AIPackageMetadata.parse_raw(targets[AIPackage.METADATA_URI].read())
except py7zr.Bad7zFile as ex:
raise IOError(MSG_OPEN_ERROR) from ex
except py7zr.PasswordRequired as ex:
raise IOError("Password required for opening archive") from ex
self._archive = None
@property
def closed(self):
return True if self._archive is None else self._archive.fp.closed
def _write_metadata(self, archive):
archive.writestr(self.metadata.json(indent=2), arcname=AIPackage.METADATA_URI)
def open_read(self):
if self._archive is None:
try:
self._archive = py7zr.SevenZipFile(self.path, "r", password=self.password)
except py7zr.Bad7zFile as ex:
raise IOError(MSG_OPEN_ERROR) from ex
if self._archive.mode != "r":
raise IOError("Archive already opened in write mode")
return self
def open_write(self):
if self._archive is None:
try:
self._archive = py7zr.SevenZipFile(self.path, "w", password=self.password)
except py7zr.Bad7zFile as ex:
raise IOError(MSG_OPEN_ERROR) from ex
self.metadata.assets.clear()
if self._archive.mode != "w":
raise IOError("Archive already opened in read mode")
return self
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self._archive.mode == "w":
self._write_metadata(self._archive)
self._archive.close()
self._archive = None
def get_asset(self, name, validate=True):
if self._archive is None:
raise IOError("Archive not open. use: 'with archive.open_read() as reader:'")
if self._archive.mode != "r":
raise IOError("Archive not in read mode. use: 'with archive.open_read() as reader:'")
try:
asset = self.metadata.assets[name]
except KeyError as ex:
raise KeyError(f"Asset '{name} not in archive'") from ex
try:
data = self._archive.read(targets=asset.file)[asset.file]
if validate and self._get_hash(data) != asset.hash:
raise IOError("Asset file does not match asset hash")
return data
except py7zr.Bad7zFile as ex:
raise IOError("Error extracting asset from archive") from ex
def add_asset(self, name, arcname, file) -> AIPackageAsset:
"""
Add an asset to the archive
Args:
name: name of asset
arcname: name of file location in archive to save asset
file: Path to asset to add to archive
Returns:
ModelArchiveAsset
"""
if self._archive is None:
raise IOError("Archive not open. use: 'with archive.open_write() as writer:'")
if self._archive.mode != "w":
raise IOError("Archive not in write mode. use: 'with archive.open_write() as writer:'")
# Normalize path
arcname = os.path.normpath(arcname).replace("\\", "/")
# Create asset
asset = AIPackageAsset(file=arcname, hash=self._get_hash(file))
self._archive.write(file, arcname=arcname)
self.metadata.assets[name] = asset
return asset
@staticmethod
def _get_hash(data):
sha256 = hashlib.sha256()
if isinstance(data, BytesIO):
for byte_block in iter(lambda: data.read(4096), b""):
sha256.update(byte_block)
data.seek(0)
else:
with open(data, "rb") as fp:
# Read and update hash string value in blocks of 4K
for byte_block in iter(lambda: fp.read(4096), b""):
sha256.update(byte_block)
return f"sha256:{sha256.hexdigest()}"
@property
def versioned_name(self) -> str:
"""
Get a versioned name according to the specification name.tar.gz -> name.version.tar.gz
where the version is generated by `get_model_version`
Returns: versioned file name
"""
if not os.path.isfile(self.path):
raise IOError("Archive does not exist, use 'with' keyword build archive")
if not self.closed:
raise IOError("Archive file not closed")
archive_name = os.path.basename(self.path)
split_name = archive_name.rsplit(".", 1)
split_name.insert(1, str(get_model_version(self.path)))
return ".".join(split_name)
@classmethod
def from_job(cls, job, tmpdir):
if not job.model_path:
raise AttributeError("Job does not contain an AiPackage")
model_archive_path = job.io.path.join(job.job_dir, job.model_path)
tmp_archive_path = os.path.join(tmpdir, os.path.basename(model_archive_path))
job.io.copy(model_archive_path, tmp_archive_path)
return cls(path=tmp_archive_path)
Classes
class AIPackage (path=None, metadata: AIPackageMetadata = None, job: Job = None, password: str = None)-
Helps to create archives with different assets and their metadata
Initialize ai package. Can use job or metadata as source, if neither are given metadata is loaded from current archive
Notes
- Use the context manager to write assets.
Usage
in Job.run:
archive = AIPackage(job=self) with model_archive.open_write() as writer: writer.add_asset(name="onnx", arcname="model.onnx", file=self.temp_path("model.onnx")) with archive.open_read() as reader: with reader.get_asset("onnx") as fp: data = fp.read() ... return archive.upload(self)Args
path- location of archive (Must end with .aipkg), if job given and path is None f"{job.name}.aipkg" is chosen
metadata- optional AIPackage metadata for new packages
job- optional job to gather metadata from if metadata keyword is missing
password- optional password to protect the archive. default = None
Expand source code
class AIPackage: """ Helps to create archives with different assets and their metadata """ METADATA_URI: ClassVar[str] = "metadata.json" """location of metadata file""" def __init__(self, path=None, metadata: AIPackageMetadata = None, job: Job = None, password: str = None): """ Initialize ai package. Can use job or metadata as source, if neither are given metadata is loaded from current archive Notes: * Use the context manager to write assets. Usage: in Job.run: ```python archive = AIPackage(job=self) with model_archive.open_write() as writer: writer.add_asset(name="onnx", arcname="model.onnx", file=self.temp_path("model.onnx")) with archive.open_read() as reader: with reader.get_asset("onnx") as fp: data = fp.read() ... return archive.upload(self) ``` Args: path: location of archive (Must end with .aipkg), if job given and path is None f"{job.name}.aipkg" is chosen metadata: optional AIPackage metadata for new packages job: optional job to gather metadata from if metadata keyword is missing password: optional password to protect the archive. default = None """ self.path = path or job.temp_path(f"{re.sub('[^-a-zA-Z0-9_.()]+', '_', job.name)}.aipkg".lower()) self.password = password try: if metadata is not None: self.metadata = AIPackageMetadata.parse_obj(metadata) elif job is not None: self.metadata = AIPackageMetadata.parse_obj(job.get_metadata()) if os.path.isfile(self.path): with py7zr.SevenZipFile(self.path, 'r', password=self.password) as arc: targets = arc.read(targets=AIPackage.METADATA_URI) self.metadata = AIPackageMetadata.parse_raw(targets[AIPackage.METADATA_URI].read()) else: if not os.path.isfile(self.path): raise TypeError("Model archive needs 'metadata' to create archive") with py7zr.SevenZipFile(self.path, 'r', password=self.password) as arc: targets = arc.read(targets=AIPackage.METADATA_URI) self.metadata = AIPackageMetadata.parse_raw(targets[AIPackage.METADATA_URI].read()) except py7zr.Bad7zFile as ex: raise IOError(MSG_OPEN_ERROR) from ex except py7zr.PasswordRequired as ex: raise IOError("Password required for opening archive") from ex self._archive = None @property def closed(self): return True if self._archive is None else self._archive.fp.closed def _write_metadata(self, archive): archive.writestr(self.metadata.json(indent=2), arcname=AIPackage.METADATA_URI) def open_read(self): if self._archive is None: try: self._archive = py7zr.SevenZipFile(self.path, "r", password=self.password) except py7zr.Bad7zFile as ex: raise IOError(MSG_OPEN_ERROR) from ex if self._archive.mode != "r": raise IOError("Archive already opened in write mode") return self def open_write(self): if self._archive is None: try: self._archive = py7zr.SevenZipFile(self.path, "w", password=self.password) except py7zr.Bad7zFile as ex: raise IOError(MSG_OPEN_ERROR) from ex self.metadata.assets.clear() if self._archive.mode != "w": raise IOError("Archive already opened in read mode") return self def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): if self._archive.mode == "w": self._write_metadata(self._archive) self._archive.close() self._archive = None def get_asset(self, name, validate=True): if self._archive is None: raise IOError("Archive not open. use: 'with archive.open_read() as reader:'") if self._archive.mode != "r": raise IOError("Archive not in read mode. use: 'with archive.open_read() as reader:'") try: asset = self.metadata.assets[name] except KeyError as ex: raise KeyError(f"Asset '{name} not in archive'") from ex try: data = self._archive.read(targets=asset.file)[asset.file] if validate and self._get_hash(data) != asset.hash: raise IOError("Asset file does not match asset hash") return data except py7zr.Bad7zFile as ex: raise IOError("Error extracting asset from archive") from ex def add_asset(self, name, arcname, file) -> AIPackageAsset: """ Add an asset to the archive Args: name: name of asset arcname: name of file location in archive to save asset file: Path to asset to add to archive Returns: ModelArchiveAsset """ if self._archive is None: raise IOError("Archive not open. use: 'with archive.open_write() as writer:'") if self._archive.mode != "w": raise IOError("Archive not in write mode. use: 'with archive.open_write() as writer:'") # Normalize path arcname = os.path.normpath(arcname).replace("\\", "/") # Create asset asset = AIPackageAsset(file=arcname, hash=self._get_hash(file)) self._archive.write(file, arcname=arcname) self.metadata.assets[name] = asset return asset @staticmethod def _get_hash(data): sha256 = hashlib.sha256() if isinstance(data, BytesIO): for byte_block in iter(lambda: data.read(4096), b""): sha256.update(byte_block) data.seek(0) else: with open(data, "rb") as fp: # Read and update hash string value in blocks of 4K for byte_block in iter(lambda: fp.read(4096), b""): sha256.update(byte_block) return f"sha256:{sha256.hexdigest()}" @property def versioned_name(self) -> str: """ Get a versioned name according to the specification name.tar.gz -> name.version.tar.gz where the version is generated by `get_model_version` Returns: versioned file name """ if not os.path.isfile(self.path): raise IOError("Archive does not exist, use 'with' keyword build archive") if not self.closed: raise IOError("Archive file not closed") archive_name = os.path.basename(self.path) split_name = archive_name.rsplit(".", 1) split_name.insert(1, str(get_model_version(self.path))) return ".".join(split_name) @classmethod def from_job(cls, job, tmpdir): if not job.model_path: raise AttributeError("Job does not contain an AiPackage") model_archive_path = job.io.path.join(job.job_dir, job.model_path) tmp_archive_path = os.path.join(tmpdir, os.path.basename(model_archive_path)) job.io.copy(model_archive_path, tmp_archive_path) return cls(path=tmp_archive_path)Class variables
var METADATA_URI : ClassVar[str]-
location of metadata file
Static methods
def from_job(job, tmpdir)-
Expand source code
@classmethod def from_job(cls, job, tmpdir): if not job.model_path: raise AttributeError("Job does not contain an AiPackage") model_archive_path = job.io.path.join(job.job_dir, job.model_path) tmp_archive_path = os.path.join(tmpdir, os.path.basename(model_archive_path)) job.io.copy(model_archive_path, tmp_archive_path) return cls(path=tmp_archive_path)
Instance variables
var closed-
Expand source code
@property def closed(self): return True if self._archive is None else self._archive.fp.closed var versioned_name : str-
Get a versioned name according to the specification name.tar.gz -> name.version.tar.gz where the version is generated by
get_model_versionReturns: versioned file name
Expand source code
@property def versioned_name(self) -> str: """ Get a versioned name according to the specification name.tar.gz -> name.version.tar.gz where the version is generated by `get_model_version` Returns: versioned file name """ if not os.path.isfile(self.path): raise IOError("Archive does not exist, use 'with' keyword build archive") if not self.closed: raise IOError("Archive file not closed") archive_name = os.path.basename(self.path) split_name = archive_name.rsplit(".", 1) split_name.insert(1, str(get_model_version(self.path))) return ".".join(split_name)
Methods
def add_asset(self, name, arcname, file) ‑> AIPackageAsset-
Add an asset to the archive
Args
name- name of asset
arcname- name of file location in archive to save asset
file- Path to asset to add to archive
Returns
ModelArchiveAsset
Expand source code
def add_asset(self, name, arcname, file) -> AIPackageAsset: """ Add an asset to the archive Args: name: name of asset arcname: name of file location in archive to save asset file: Path to asset to add to archive Returns: ModelArchiveAsset """ if self._archive is None: raise IOError("Archive not open. use: 'with archive.open_write() as writer:'") if self._archive.mode != "w": raise IOError("Archive not in write mode. use: 'with archive.open_write() as writer:'") # Normalize path arcname = os.path.normpath(arcname).replace("\\", "/") # Create asset asset = AIPackageAsset(file=arcname, hash=self._get_hash(file)) self._archive.write(file, arcname=arcname) self.metadata.assets[name] = asset return asset def get_asset(self, name, validate=True)-
Expand source code
def get_asset(self, name, validate=True): if self._archive is None: raise IOError("Archive not open. use: 'with archive.open_read() as reader:'") if self._archive.mode != "r": raise IOError("Archive not in read mode. use: 'with archive.open_read() as reader:'") try: asset = self.metadata.assets[name] except KeyError as ex: raise KeyError(f"Asset '{name} not in archive'") from ex try: data = self._archive.read(targets=asset.file)[asset.file] if validate and self._get_hash(data) != asset.hash: raise IOError("Asset file does not match asset hash") return data except py7zr.Bad7zFile as ex: raise IOError("Error extracting asset from archive") from ex def open_read(self)-
Expand source code
def open_read(self): if self._archive is None: try: self._archive = py7zr.SevenZipFile(self.path, "r", password=self.password) except py7zr.Bad7zFile as ex: raise IOError(MSG_OPEN_ERROR) from ex if self._archive.mode != "r": raise IOError("Archive already opened in write mode") return self def open_write(self)-
Expand source code
def open_write(self): if self._archive is None: try: self._archive = py7zr.SevenZipFile(self.path, "w", password=self.password) except py7zr.Bad7zFile as ex: raise IOError(MSG_OPEN_ERROR) from ex self.metadata.assets.clear() if self._archive.mode != "w": raise IOError("Archive already opened in read mode") return self
class AIPackageAsset (file: str, hash: str)-
Data holder for file names and hashes of assets in model archive
Expand source code
class AIPackageAsset: """ Data holder for file names and hashes of assets in model archive """ file: str hash: strSubclasses
- pydantic.dataclasses._Pydantic_AIPackageAsset_94140164754416
Class variables
var file : strvar hash : str
class AIPackageMetadata (**data: Any)-
Metadata for model archives adding an assets field for users of the archive to check if it contains what is needed
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class AIPackageMetadata(ModelMetadata): """ Metadata for model archives adding an assets field for users of the archive to check if it contains what is needed """ AIPACKAGE_VERSION: ClassVar[str] = "1.0" """Version of ai package""" version: str = Field(default=AIPACKAGE_VERSION, const=True, description="aipackage version number marker") assets: Dict[str, AIPackageAsset] = Field(default_factory=dict, description="Key value pairs of name and path to asset")Ancestors
- ModelMetadata
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var AIPACKAGE_VERSION : ClassVar[str]-
Version of ai package
var assets : Dict[str, AIPackageAsset]var version : str