Module brevettiai.platform.models.platform_backend
Expand source code
import os
import logging
import numpy as np
import urllib
import requests
from brevettiai.io import path as path_utils
from .tag import Tag
from pydantic import Field, BaseModel
from typing import Union, List
from uuid import UUID
log = logging.getLogger(__name__)
class PlatformBackend(BaseModel):
host: str = Field(default_factory=lambda: os.getenv("BREVETTIAI_HOST_NAME", "https://platform.brevetti.ai"))
output_segmentation_dir: str = Field(default="output_segmentations")
bucket_region: str = Field(default_factory=lambda: os.getenv("AWS_REGION", "eu-west-1"))
data_bucket: str = Field(default_factory=lambda: os.getenv("BREVETTIAI_DATA_BUCKET", "s3://data.criterion.ai"))
custom_job_id: str = Field(default="a0aaad69-c032-41c1-a68c-e9a15a5fb18c",
description="uuid of model type to use for custom jobs")
@property
def s3_endpoint(self):
return f"s3.{self.bucket_region}.amazonaws.com"
def resource_path(self, uuid: Union[str, UUID]) -> str:
"""
Get location of a resource
"""
return path_utils.join(self.data_bucket, str(uuid))
def prepare_runtime(self):
# Determine runtime
on_sagemaker = os.environ.get("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI") is not None
# Initialize services
if on_sagemaker:
from brevettiai.interfaces import sagemaker
sagemaker.load_hyperparameters_cmd_args()
def get_download_link(self, path):
if path.startswith("s3://"):
target = path[5:].split("/", 1)[1]
return f"{self.host}/download?path={urllib.parse.quote(target, safe='')}"
else:
raise ValueError("Can only provide download links on s3")
def get_root_tags(self, id, api_key) -> List[Tag]:
r = requests.get(f"{self.host}/api/resources/roottags?key={api_key}&id={id}")
if r.ok:
return [Tag.parse_obj(x) for x in r.json()]
else:
log.warning("Could not get root tags")
return []
def get_annotation_url(self, s3_image_path, annotation_name=None,
bbox=None, zoom=None, screen_size=1024, test_report_id=None, model_id=None,
min_zoom=2, max_zoom=300):
"""
Get url to annotation file
:param s3_image_path: Name of image file
:param annotation_name: Name of annotation file, if any
:param bbox: Selects zoom and center for the bbox
:param zoom: Zoom level [2-300] related to screen pixel size (if None zoom will be calculated from bbox)
:param screen_size: default screen size in pixels
:param test_report_id:
:param model_id:
:param min_zoom:
:param max_zoom:
"""
uri_length = 36
rm_keys = [self.data_bucket, ".tiles/", "/dzi.json"]
image_key = s3_image_path
for rm_key in rm_keys:
image_key = image_key.replace(rm_key, "")
image_key = image_key.lstrip("/")
dataset_id = image_key[:uri_length]
image_rel_path = "/".join(image_key.split("/")[1:])
url_info = dict(file=image_rel_path)
if annotation_name:
url_info["annotationFile"] = annotation_name
if test_report_id:
url_info["testReportId"] = test_report_id
if model_id:
url_info["modelId"] = model_id
if bbox is not None:
url_info["x"], url_info["y"] = np.array(bbox).reshape(2, 2).mean(0).astype(np.int)
# NB: This will be overwritten if zoom is provided
zoom = (100 * screen_size / np.array(bbox).reshape(2, 2).T.dot([-1, 1]))
url_info["zoom"] = int(zoom.clip(min=min_zoom, max=max_zoom).min())
if zoom is not None:
url_info["zoom"] = zoom
return "https://platform.brevetti.ai/data/{}?".format(dataset_id) + urllib.parse.urlencode(url_info)
@property
def custom_model_type(self):
from .web_api_types import ModelType
return ModelType(id=self.custom_job_id, name="custom job")
backend = PlatformBackend()
Classes
class PlatformBackend (**data: Any)
-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class PlatformBackend(BaseModel): host: str = Field(default_factory=lambda: os.getenv("BREVETTIAI_HOST_NAME", "https://platform.brevetti.ai")) output_segmentation_dir: str = Field(default="output_segmentations") bucket_region: str = Field(default_factory=lambda: os.getenv("AWS_REGION", "eu-west-1")) data_bucket: str = Field(default_factory=lambda: os.getenv("BREVETTIAI_DATA_BUCKET", "s3://data.criterion.ai")) custom_job_id: str = Field(default="a0aaad69-c032-41c1-a68c-e9a15a5fb18c", description="uuid of model type to use for custom jobs") @property def s3_endpoint(self): return f"s3.{self.bucket_region}.amazonaws.com" def resource_path(self, uuid: Union[str, UUID]) -> str: """ Get location of a resource """ return path_utils.join(self.data_bucket, str(uuid)) def prepare_runtime(self): # Determine runtime on_sagemaker = os.environ.get("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI") is not None # Initialize services if on_sagemaker: from brevettiai.interfaces import sagemaker sagemaker.load_hyperparameters_cmd_args() def get_download_link(self, path): if path.startswith("s3://"): target = path[5:].split("/", 1)[1] return f"{self.host}/download?path={urllib.parse.quote(target, safe='')}" else: raise ValueError("Can only provide download links on s3") def get_root_tags(self, id, api_key) -> List[Tag]: r = requests.get(f"{self.host}/api/resources/roottags?key={api_key}&id={id}") if r.ok: return [Tag.parse_obj(x) for x in r.json()] else: log.warning("Could not get root tags") return [] def get_annotation_url(self, s3_image_path, annotation_name=None, bbox=None, zoom=None, screen_size=1024, test_report_id=None, model_id=None, min_zoom=2, max_zoom=300): """ Get url to annotation file :param s3_image_path: Name of image file :param annotation_name: Name of annotation file, if any :param bbox: Selects zoom and center for the bbox :param zoom: Zoom level [2-300] related to screen pixel size (if None zoom will be calculated from bbox) :param screen_size: default screen size in pixels :param test_report_id: :param model_id: :param min_zoom: :param max_zoom: """ uri_length = 36 rm_keys = [self.data_bucket, ".tiles/", "/dzi.json"] image_key = s3_image_path for rm_key in rm_keys: image_key = image_key.replace(rm_key, "") image_key = image_key.lstrip("/") dataset_id = image_key[:uri_length] image_rel_path = "/".join(image_key.split("/")[1:]) url_info = dict(file=image_rel_path) if annotation_name: url_info["annotationFile"] = annotation_name if test_report_id: url_info["testReportId"] = test_report_id if model_id: url_info["modelId"] = model_id if bbox is not None: url_info["x"], url_info["y"] = np.array(bbox).reshape(2, 2).mean(0).astype(np.int) # NB: This will be overwritten if zoom is provided zoom = (100 * screen_size / np.array(bbox).reshape(2, 2).T.dot([-1, 1])) url_info["zoom"] = int(zoom.clip(min=min_zoom, max=max_zoom).min()) if zoom is not None: url_info["zoom"] = zoom return "https://platform.brevetti.ai/data/{}?".format(dataset_id) + urllib.parse.urlencode(url_info) @property def custom_model_type(self): from .web_api_types import ModelType return ModelType(id=self.custom_job_id, name="custom job")
Ancestors
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var bucket_region : str
var custom_job_id : str
var data_bucket : str
var host : str
var output_segmentation_dir : str
Instance variables
var custom_model_type
-
Expand source code
@property def custom_model_type(self): from .web_api_types import ModelType return ModelType(id=self.custom_job_id, name="custom job")
var s3_endpoint
-
Expand source code
@property def s3_endpoint(self): return f"s3.{self.bucket_region}.amazonaws.com"
Methods
def get_annotation_url(self, s3_image_path, annotation_name=None, bbox=None, zoom=None, screen_size=1024, test_report_id=None, model_id=None, min_zoom=2, max_zoom=300)
-
Get url to annotation file :param s3_image_path: Name of image file :param annotation_name: Name of annotation file, if any :param bbox: Selects zoom and center for the bbox :param zoom: Zoom level [2-300] related to screen pixel size (if None zoom will be calculated from bbox) :param screen_size: default screen size in pixels :param test_report_id: :param model_id: :param min_zoom: :param max_zoom:
Expand source code
def get_annotation_url(self, s3_image_path, annotation_name=None, bbox=None, zoom=None, screen_size=1024, test_report_id=None, model_id=None, min_zoom=2, max_zoom=300): """ Get url to annotation file :param s3_image_path: Name of image file :param annotation_name: Name of annotation file, if any :param bbox: Selects zoom and center for the bbox :param zoom: Zoom level [2-300] related to screen pixel size (if None zoom will be calculated from bbox) :param screen_size: default screen size in pixels :param test_report_id: :param model_id: :param min_zoom: :param max_zoom: """ uri_length = 36 rm_keys = [self.data_bucket, ".tiles/", "/dzi.json"] image_key = s3_image_path for rm_key in rm_keys: image_key = image_key.replace(rm_key, "") image_key = image_key.lstrip("/") dataset_id = image_key[:uri_length] image_rel_path = "/".join(image_key.split("/")[1:]) url_info = dict(file=image_rel_path) if annotation_name: url_info["annotationFile"] = annotation_name if test_report_id: url_info["testReportId"] = test_report_id if model_id: url_info["modelId"] = model_id if bbox is not None: url_info["x"], url_info["y"] = np.array(bbox).reshape(2, 2).mean(0).astype(np.int) # NB: This will be overwritten if zoom is provided zoom = (100 * screen_size / np.array(bbox).reshape(2, 2).T.dot([-1, 1])) url_info["zoom"] = int(zoom.clip(min=min_zoom, max=max_zoom).min()) if zoom is not None: url_info["zoom"] = zoom return "https://platform.brevetti.ai/data/{}?".format(dataset_id) + urllib.parse.urlencode(url_info)
def get_download_link(self, path)
-
Expand source code
def get_download_link(self, path): if path.startswith("s3://"): target = path[5:].split("/", 1)[1] return f"{self.host}/download?path={urllib.parse.quote(target, safe='')}" else: raise ValueError("Can only provide download links on s3")
-
Expand source code
def get_root_tags(self, id, api_key) -> List[Tag]: r = requests.get(f"{self.host}/api/resources/roottags?key={api_key}&id={id}") if r.ok: return [Tag.parse_obj(x) for x in r.json()] else: log.warning("Could not get root tags") return []
def prepare_runtime(self)
-
Expand source code
def prepare_runtime(self): # Determine runtime on_sagemaker = os.environ.get("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI") is not None # Initialize services if on_sagemaker: from brevettiai.interfaces import sagemaker sagemaker.load_hyperparameters_cmd_args()
def resource_path(self, uuid: Union[str, uuid.UUID]) ‑> str
-
Get location of a resource
Expand source code
def resource_path(self, uuid: Union[str, UUID]) -> str: """ Get location of a resource """ return path_utils.join(self.data_bucket, str(uuid))