Module brevettiai.platform.model_archive
Expand source code
import hashlib
import logging
import os
import re
import tarfile
from dataclasses import dataclass
from io import BytesIO
from typing import Dict
from pydantic import Field
from brevettiai.model.metadata import ModelMetadata
from brevettiai.platform import Job
from brevettiai.utils.model_version import get_model_version
log = logging.getLogger(__name__)
@dataclass
class ModelArchiveAsset:
    """
    Record describing a single asset stored in a model archive.

    Holds where the asset lives inside the archive and a hash of its content.
    """

    # Location (member name) of the asset inside the archive
    file: str
    # Hash of the asset content, e.g. "sha256:<hexdigest>"
    hash: str
class ModelArchiveMetadata(ModelMetadata):
    """
    Model metadata extended with a table of the assets stored in the archive.

    The assets mapping lets users of an archive check that it contains what they need.
    """

    # Maps asset name -> ModelArchiveAsset (file location in the archive and content hash)
    assets: Dict[str, ModelArchiveAsset] = Field(
        default_factory=dict,
        description="Key value pairs of name and path to asset",
    )
class ModelArchive:
    """
    ModelArchive Wrapper class, to help create archives with assets and metadata

    The archive is a gzipped tar file. `open_read` / `open_write` return this object,
    which acts as the context manager; leaving a write context stores the metadata
    (including the asset table) in the archive under `METADATA_URI`.
    """
    METADATA_URI = "metadata.json"
    """location of metadata file"""

    def __init__(self, path=None, metadata: ModelMetadata = None, job: Job = None):
        """
        Initialize model archive.
        Can use job or metadata as source; if neither is given, metadata is loaded
        from the existing archive at `path`.

        Notes:
            * Use the context manager to write assets.
            * When entering the write context previously existing assets are not copied to the new archive

        Usage:
            in Job.run:
            ```python
            archive = ModelArchive(job=self)
            with archive.open_write() as writer:
                writer.add_asset(name="onnx", arcname="model.onnx", file=self.temp_path("model.onnx"))

            with archive.open_read() as reader:
                with reader.get_asset("onnx") as fp:
                    data = fp.read()
            ...
            return archive.upload(self)
            ```

        Args:
            path: location of archive (Must end with tar.gz). If empty and a job is given,
                a sanitized, lower-cased f"{job.name}.tar.gz" from `job.temp_path` is used
            metadata: optional ModelMetadata
            job: optional job to gather metadata from if metadata keyword is missing

        Raises:
            ValueError: if neither `path` nor `job` is given
            IOError: if the metadata entry of an existing archive is not a regular file
        """
        # Set first so the object is consistent even if loading metadata fails below
        self._archive = None

        if not path:
            if job is None:
                raise ValueError("Either 'path' or 'job' must be given to locate the archive")
            # Replace every run of characters outside the allowed set with "_"
            safe_name = re.sub(r"[^-a-zA-Z0-9_.()]+", "_", job.name)
            path = job.temp_path(f"{safe_name}.tar.gz".lower())
        self.path = path

        if metadata is not None:
            self.metadata = ModelArchiveMetadata.parse_obj(metadata)
        elif job is not None:
            self.metadata = ModelArchiveMetadata.parse_obj(job.get_metadata())
        else:
            with tarfile.open(self.path, "r:gz") as arc:
                fp = arc.extractfile(ModelArchive.METADATA_URI)
                # extractfile returns None for members that are not regular files or links
                if fp is None:
                    raise IOError(f"'{ModelArchive.METADATA_URI}' in archive is not a regular file")
                with fp:
                    self.metadata = ModelArchiveMetadata.parse_raw(fp.read())

    def _write_metadata(self, archive):
        """Serialize the metadata as indented JSON and add it to `archive` as METADATA_URI."""
        payload = self.metadata.json(indent=2).encode("utf-8")
        info = tarfile.TarInfo(name=ModelArchive.METADATA_URI)
        info.size = len(payload)
        archive.addfile(tarinfo=info, fileobj=BytesIO(payload))

    def add_asset(self, name, arcname, file) -> ModelArchiveAsset:
        """
        Add an asset to the archive

        Args:
            name: name of asset
            arcname: name of file location in archive to save asset
            file: Path to asset to add to archive

        Returns:
            ModelArchiveAsset

        Raises:
            IOError: if the archive is not open, or not open in write mode
        """
        if self._archive is None:
            raise IOError("Archive not open. use: 'with archive.open_write() as writer:'")
        if self._archive.mode != "w":
            raise IOError("Archive not in write mode. use: 'with archive.open_write() as writer:'")

        # Normalize path so members always use forward slashes
        arcname = os.path.normpath(arcname).replace("\\", "/")

        # Hash is computed from the file on disk before it is added
        asset = ModelArchiveAsset(file=arcname, hash=self._get_hash(file))
        self._archive.add(name=file, arcname=arcname)
        self.metadata.assets[name] = asset
        return asset

    def open_read(self):
        """
        Open the archive for reading (no-op if already open for reading).

        Returns:
            self, to be used as a context manager

        Raises:
            IOError: if the archive is currently open in write mode
        """
        if self._archive is None:
            self._archive = tarfile.open(self.path, "r:gz")
        if self._archive.mode != "r":
            raise IOError("Archive already opened in write mode")
        return self

    def open_write(self):
        """
        Open the archive for writing (no-op if already open for writing).

        Returns:
            self, to be used as a context manager

        Raises:
            IOError: if the archive is currently open in read mode
        """
        if self._archive is None:
            self._archive = tarfile.open(self.path, "w:gz")
        if self._archive.mode != "w":
            raise IOError("Archive already opened in read mode")
        return self

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Write metadata when in write mode, then close the archive and forget the handle."""
        archive, self._archive = self._archive, None
        if archive is None:
            # Context entered without open_read()/open_write(); nothing to close
            return
        try:
            if archive.mode == "w":
                self._write_metadata(archive)
        finally:
            # Always release the file handle, even if writing metadata failed
            archive.close()

    def get_asset(self, name):
        """
        Get a file object for reading an asset from the archive

        Args:
            name: name of asset as registered in the metadata

        Returns:
            result of `TarFile.extractfile` for the asset's file

        Raises:
            IOError: if the archive is not open, or not open in read mode
            KeyError: if no asset with that name is listed in the metadata
        """
        if self._archive is None:
            raise IOError("Archive not open. use: 'with archive.open_read() as reader:'")
        if self._archive.mode != "r":
            raise IOError("Archive not in read mode. use: 'with archive.open_read() as reader:'")
        try:
            asset = self.metadata.assets[name]
        except KeyError as ex:
            raise KeyError(f"Asset '{name}' not in archive") from ex
        return self._archive.extractfile(asset.file)

    @staticmethod
    def _get_hash(filename):
        """Return "sha256:<hexdigest>" of the file content, read in 4K blocks."""
        digest = hashlib.sha256()
        with open(filename, "rb") as f:
            for block in iter(lambda: f.read(4096), b""):
                digest.update(block)
        return f"sha256:{digest.hexdigest()}"

    @property
    def versioned_name(self) -> str:
        """
        Get a versioned name according to the specification name.tar.gz -> name.version.tar.gz
        where the version is generated by `get_model_version`

        Returns:
            Base name of the archive with the version inserted after its first component
        """
        archive_name = os.path.basename(self.path)
        # Split off at most the last two extensions, e.g. ["name", "tar", "gz"]
        parts = archive_name.rsplit(".", 2)
        parts.insert(1, str(get_model_version(self.path)))
        return ".".join(parts)

    def upload(self, job: Job) -> str:
        """
        Generate archive version and upload model archive to job artifacts

        Args:
            job: Job to upload the archive as artifact

        Returns:
            Path to uploaded artifact

        Raises:
            IOError: if the archive file does not exist or is still open
        """
        if not os.path.isfile(self.path):
            raise IOError("Archive does not exist, use 'with' keyword build archive")
        if self._archive is not None and not self._archive.closed:
            raise IOError("Archive file not closed")

        archive_artifact_path = job.artifact_path(self.versioned_name)
        job.io.copy(self.path, archive_artifact_path)
        return archive_artifact_path

    @classmethod
    def from_job(cls, job, tmpdir):
        """
        Load the model archive referenced by a job

        Args:
            job: job whose `model_path` (relative to `job.job_dir`) points to the archive
            tmpdir: local directory the archive is copied to before it is opened

        Returns:
            ModelArchive for the local copy, with metadata loaded from the archive

        Raises:
            AttributeError: if the job has no model path
        """
        if not job.model_path:
            raise AttributeError("Job does not contain a ModelArchive")

        model_archive_path = job.io.path.join(job.job_dir, job.model_path)
        tmp_archive_path = os.path.join(tmpdir, os.path.basename(model_archive_path))
        job.io.copy(model_archive_path, tmp_archive_path)
        return cls(path=tmp_archive_path)
Classes
class ModelArchive (path=None, metadata: ModelMetadata = None, job: Job = None)
-
ModelArchive Wrapper class, to help create archives with assets and metadata
Initialize the model archive. Either a job or metadata can be used as the metadata source; if neither is given, the metadata is loaded from the existing archive.
Notes
- Use the context manager to write assets.
- When entering the write context previously existing assets are not copied to the new archive
Usage
in Job.run:
archive = ModelArchive(job=self)
with archive.open_write() as writer:
    writer.add_asset(name="onnx", arcname="model.onnx", file=self.temp_path("model.onnx"))
with archive.open_read() as reader:
    with reader.get_asset("onnx") as fp:
        data = fp.read()
...
return archive.upload(self)
Args
path
- location of the archive (must end with tar.gz); if a job is given and path is None, a sanitized, lower-cased f"{job.name}.tar.gz" obtained from job.temp_path is used
metadata
- optional ModelMetadata
job
- optional job to gather metadata from if metadata keyword is missing
Expand source code
class ModelArchive:
    """
    ModelArchive Wrapper class, to help create archives with assets and metadata

    The archive is a gzipped tar file. `open_read` / `open_write` return this object,
    which acts as the context manager; leaving a write context stores the metadata
    (including the asset table) in the archive under `METADATA_URI`.
    """
    METADATA_URI = "metadata.json"
    """location of metadata file"""

    def __init__(self, path=None, metadata: ModelMetadata = None, job: Job = None):
        """
        Initialize model archive.
        Can use job or metadata as source; if neither is given, metadata is loaded
        from the existing archive at `path`.

        Notes:
            * Use the context manager to write assets.
            * When entering the write context previously existing assets are not copied to the new archive

        Usage:
            in Job.run:
            ```python
            archive = ModelArchive(job=self)
            with archive.open_write() as writer:
                writer.add_asset(name="onnx", arcname="model.onnx", file=self.temp_path("model.onnx"))

            with archive.open_read() as reader:
                with reader.get_asset("onnx") as fp:
                    data = fp.read()
            ...
            return archive.upload(self)
            ```

        Args:
            path: location of archive (Must end with tar.gz). If empty and a job is given,
                a sanitized, lower-cased f"{job.name}.tar.gz" from `job.temp_path` is used
            metadata: optional ModelMetadata
            job: optional job to gather metadata from if metadata keyword is missing

        Raises:
            ValueError: if neither `path` nor `job` is given
            IOError: if the metadata entry of an existing archive is not a regular file
        """
        # Set first so the object is consistent even if loading metadata fails below
        self._archive = None

        if not path:
            if job is None:
                raise ValueError("Either 'path' or 'job' must be given to locate the archive")
            # Replace every run of characters outside the allowed set with "_"
            safe_name = re.sub(r"[^-a-zA-Z0-9_.()]+", "_", job.name)
            path = job.temp_path(f"{safe_name}.tar.gz".lower())
        self.path = path

        if metadata is not None:
            self.metadata = ModelArchiveMetadata.parse_obj(metadata)
        elif job is not None:
            self.metadata = ModelArchiveMetadata.parse_obj(job.get_metadata())
        else:
            with tarfile.open(self.path, "r:gz") as arc:
                fp = arc.extractfile(ModelArchive.METADATA_URI)
                # extractfile returns None for members that are not regular files or links
                if fp is None:
                    raise IOError(f"'{ModelArchive.METADATA_URI}' in archive is not a regular file")
                with fp:
                    self.metadata = ModelArchiveMetadata.parse_raw(fp.read())

    def _write_metadata(self, archive):
        """Serialize the metadata as indented JSON and add it to `archive` as METADATA_URI."""
        payload = self.metadata.json(indent=2).encode("utf-8")
        info = tarfile.TarInfo(name=ModelArchive.METADATA_URI)
        info.size = len(payload)
        archive.addfile(tarinfo=info, fileobj=BytesIO(payload))

    def add_asset(self, name, arcname, file) -> ModelArchiveAsset:
        """
        Add an asset to the archive

        Args:
            name: name of asset
            arcname: name of file location in archive to save asset
            file: Path to asset to add to archive

        Returns:
            ModelArchiveAsset

        Raises:
            IOError: if the archive is not open, or not open in write mode
        """
        if self._archive is None:
            raise IOError("Archive not open. use: 'with archive.open_write() as writer:'")
        if self._archive.mode != "w":
            raise IOError("Archive not in write mode. use: 'with archive.open_write() as writer:'")

        # Normalize path so members always use forward slashes
        arcname = os.path.normpath(arcname).replace("\\", "/")

        # Hash is computed from the file on disk before it is added
        asset = ModelArchiveAsset(file=arcname, hash=self._get_hash(file))
        self._archive.add(name=file, arcname=arcname)
        self.metadata.assets[name] = asset
        return asset

    def open_read(self):
        """
        Open the archive for reading (no-op if already open for reading).

        Returns:
            self, to be used as a context manager

        Raises:
            IOError: if the archive is currently open in write mode
        """
        if self._archive is None:
            self._archive = tarfile.open(self.path, "r:gz")
        if self._archive.mode != "r":
            raise IOError("Archive already opened in write mode")
        return self

    def open_write(self):
        """
        Open the archive for writing (no-op if already open for writing).

        Returns:
            self, to be used as a context manager

        Raises:
            IOError: if the archive is currently open in read mode
        """
        if self._archive is None:
            self._archive = tarfile.open(self.path, "w:gz")
        if self._archive.mode != "w":
            raise IOError("Archive already opened in read mode")
        return self

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Write metadata when in write mode, then close the archive and forget the handle."""
        archive, self._archive = self._archive, None
        if archive is None:
            # Context entered without open_read()/open_write(); nothing to close
            return
        try:
            if archive.mode == "w":
                self._write_metadata(archive)
        finally:
            # Always release the file handle, even if writing metadata failed
            archive.close()

    def get_asset(self, name):
        """
        Get a file object for reading an asset from the archive

        Args:
            name: name of asset as registered in the metadata

        Returns:
            result of `TarFile.extractfile` for the asset's file

        Raises:
            IOError: if the archive is not open, or not open in read mode
            KeyError: if no asset with that name is listed in the metadata
        """
        if self._archive is None:
            raise IOError("Archive not open. use: 'with archive.open_read() as reader:'")
        if self._archive.mode != "r":
            raise IOError("Archive not in read mode. use: 'with archive.open_read() as reader:'")
        try:
            asset = self.metadata.assets[name]
        except KeyError as ex:
            raise KeyError(f"Asset '{name}' not in archive") from ex
        return self._archive.extractfile(asset.file)

    @staticmethod
    def _get_hash(filename):
        """Return "sha256:<hexdigest>" of the file content, read in 4K blocks."""
        digest = hashlib.sha256()
        with open(filename, "rb") as f:
            for block in iter(lambda: f.read(4096), b""):
                digest.update(block)
        return f"sha256:{digest.hexdigest()}"

    @property
    def versioned_name(self) -> str:
        """
        Get a versioned name according to the specification name.tar.gz -> name.version.tar.gz
        where the version is generated by `get_model_version`

        Returns:
            Base name of the archive with the version inserted after its first component
        """
        archive_name = os.path.basename(self.path)
        # Split off at most the last two extensions, e.g. ["name", "tar", "gz"]
        parts = archive_name.rsplit(".", 2)
        parts.insert(1, str(get_model_version(self.path)))
        return ".".join(parts)

    def upload(self, job: Job) -> str:
        """
        Generate archive version and upload model archive to job artifacts

        Args:
            job: Job to upload the archive as artifact

        Returns:
            Path to uploaded artifact

        Raises:
            IOError: if the archive file does not exist or is still open
        """
        if not os.path.isfile(self.path):
            raise IOError("Archive does not exist, use 'with' keyword build archive")
        if self._archive is not None and not self._archive.closed:
            raise IOError("Archive file not closed")

        archive_artifact_path = job.artifact_path(self.versioned_name)
        job.io.copy(self.path, archive_artifact_path)
        return archive_artifact_path

    @classmethod
    def from_job(cls, job, tmpdir):
        """
        Load the model archive referenced by a job

        Args:
            job: job whose `model_path` (relative to `job.job_dir`) points to the archive
            tmpdir: local directory the archive is copied to before it is opened

        Returns:
            ModelArchive for the local copy, with metadata loaded from the archive

        Raises:
            AttributeError: if the job has no model path
        """
        if not job.model_path:
            raise AttributeError("Job does not contain a ModelArchive")

        model_archive_path = job.io.path.join(job.job_dir, job.model_path)
        tmp_archive_path = os.path.join(tmpdir, os.path.basename(model_archive_path))
        job.io.copy(model_archive_path, tmp_archive_path)
        return cls(path=tmp_archive_path)
Class variables
var METADATA_URI
-
location of metadata file
Static methods
def from_job(job, tmpdir)
-
Expand source code
@classmethod
def from_job(cls, job, tmpdir):
    """
    Load the model archive referenced by a job

    Args:
        job: job whose `model_path` (relative to `job.job_dir`) points to the archive
        tmpdir: local directory the archive is copied to before it is opened

    Returns:
        Archive object constructed from the local copy

    Raises:
        AttributeError: if the job has no model path
    """
    if not job.model_path:
        raise AttributeError("Job does not contain a ModelArchive")

    remote_path = job.io.path.join(job.job_dir, job.model_path)
    local_path = os.path.join(tmpdir, os.path.basename(remote_path))
    job.io.copy(remote_path, local_path)
    return cls(path=local_path)
Instance variables
var versioned_name : str
-
Get a versioned name according to the specification name.tar.gz -> name.version.tar.gz, where the version is generated by get_model_version.
Returns: the base name of the archive with the version inserted after its first component.
Expand source code
@property
def versioned_name(self) -> str:
    """
    Get a versioned name according to the specification name.tar.gz -> name.version.tar.gz
    where the version is generated by `get_model_version`

    Returns:
        Base name of the archive with the version inserted after its first component
    """
    version = str(get_model_version(self.path))
    # Split off at most the last two extensions, e.g. ["name", "tar", "gz"]
    parts = os.path.basename(self.path).rsplit(".", 2)
    parts.insert(1, version)
    return ".".join(parts)
Methods
def add_asset(self, name, arcname, file) ‑> ModelArchiveAsset
-
Add an asset to the archive
Args
name
- name of asset
arcname
- name of file location in archive to save asset
file
- Path to asset to add to archive
Returns
ModelArchiveAsset
Expand source code
def add_asset(self, name, arcname, file) -> ModelArchiveAsset:
    """
    Add an asset to the archive

    Args:
        name: name of asset
        arcname: name of file location in archive to save asset
        file: Path to asset to add to archive

    Returns:
        ModelArchiveAsset

    Raises:
        IOError: if the archive is not open, or not open in write mode
    """
    # The hints name open_write(), the method that actually opens the archive for writing
    if self._archive is None:
        raise IOError("Archive not open. use: 'with archive.open_write() as writer:'")
    if self._archive.mode != "w":
        raise IOError("Archive not in write mode. use: 'with archive.open_write() as writer:'")

    # Normalize path so members always use forward slashes
    arcname = os.path.normpath(arcname).replace("\\", "/")

    # Hash is computed from the file on disk before it is added
    asset = ModelArchiveAsset(file=arcname, hash=self._get_hash(file))
    self._archive.add(name=file, arcname=arcname)
    self.metadata.assets[name] = asset
    return asset
def get_asset(self, name)
-
Expand source code
def get_asset(self, name):
    """
    Get a file object for reading an asset from the archive

    Args:
        name: name of asset as registered in the metadata

    Returns:
        result of `TarFile.extractfile` for the asset's file

    Raises:
        IOError: if the archive is not open, or not open in read mode
        KeyError: if no asset with that name is listed in the metadata
    """
    # The hints name open_read(), the method that actually opens the archive for reading
    if self._archive is None:
        raise IOError("Archive not open. use: 'with archive.open_read() as reader:'")
    if self._archive.mode != "r":
        raise IOError("Archive not in read mode. use: 'with archive.open_read() as reader:'")
    try:
        asset = self.metadata.assets[name]
    except KeyError as ex:
        raise KeyError(f"Asset '{name}' not in archive") from ex
    return self._archive.extractfile(asset.file)
def open_read(self)
-
Expand source code
def open_read(self):
    """
    Open the archive for reading (no-op if already open for reading).

    Returns:
        self, to be used as a context manager

    Raises:
        IOError: if the archive is currently open in write mode
    """
    handle = self._archive
    if handle is None:
        handle = self._archive = tarfile.open(self.path, "r:gz")
    if handle.mode != "r":
        raise IOError("Archive already opened in write mode")
    return self
def open_write(self)
-
Expand source code
def open_write(self):
    """
    Open the archive for writing (no-op if already open for writing).

    Returns:
        self, to be used as a context manager

    Raises:
        IOError: if the archive is currently open in read mode
    """
    handle = self._archive
    if handle is None:
        handle = self._archive = tarfile.open(self.path, "w:gz")
    if handle.mode != "w":
        raise IOError("Archive already opened in read mode")
    return self
def upload(self, job: Job) ‑> str
-
Generate archive version and upload model archive to job artifacts
Args
job
- Job to upload the archive as artifact
Returns
Path to uploaded artifact
Expand source code
def upload(self, job: Job) -> str:
    """
    Generate archive version and upload model archive to job artifacts

    Args:
        job: Job to upload the archive as artifact

    Returns:
        Path to uploaded artifact

    Raises:
        IOError: if the archive file does not exist or is still open
    """
    if not os.path.isfile(self.path):
        raise IOError("Archive does not exist, use 'with' keyword build archive")

    handle = self._archive
    if handle is not None and not handle.closed:
        raise IOError("Archive file not closed")

    # The artifact is stored under the versioned file name
    destination = job.artifact_path(self.versioned_name)
    job.io.copy(self.path, destination)
    return destination
class ModelArchiveAsset (file: str, hash: str)
-
Data holder for file names and hashes of assets in model archive
Expand source code
class ModelArchiveAsset:
    """
    Record describing a single asset stored in a model archive.

    Holds where the asset lives inside the archive and a hash of its content.
    """

    # Location (member name) of the asset inside the archive
    file: str
    # Hash of the asset content
    hash: str
Subclasses
- pydantic.dataclasses._Pydantic_ModelArchiveAsset_94595108563664
Class variables
var file : str
var hash : str
class ModelArchiveMetadata (**data: Any)
-
Metadata for model archives, adding an assets field so that users of the archive can check whether it contains what is needed.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class ModelArchiveMetadata(ModelMetadata):
    """
    Model metadata extended with a table of the assets stored in the archive.

    The assets mapping lets users of an archive check that it contains what they need.
    """

    # Maps asset name -> ModelArchiveAsset (file location in the archive and content hash)
    assets: Dict[str, ModelArchiveAsset] = Field(
        default_factory=dict,
        description="Key value pairs of name and path to asset",
    )
Ancestors
- ModelMetadata
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var assets : Dict[str, ModelArchiveAsset]