Module brevettiai.platform.models.platform_backend

Expand source code
import os
import logging
import numpy as np
import urllib
import requests
from brevettiai.io import path as path_utils
from .tag import Tag
from pydantic import Field, BaseModel
from typing import Union, List
from uuid import UUID

# Module-level logger, named after this module
log = logging.getLogger(__name__)


class PlatformBackend(BaseModel):
    """
    Connection settings and url helpers for the platform backend.

    ``host``, ``bucket_region`` and ``data_bucket`` are read from environment
    variables when an instance is created, falling back to fixed defaults.
    """
    # Base url of the platform web application (env: BREVETTIAI_HOST_NAME)
    host: str = Field(default_factory=lambda: os.getenv("BREVETTIAI_HOST_NAME", "https://platform.brevetti.ai"))
    # Not referenced by any method of this class
    output_segmentation_dir: str = Field(default="output_segmentations")
    # AWS region used to build the s3 endpoint (env: AWS_REGION)
    bucket_region: str = Field(default_factory=lambda: os.getenv("AWS_REGION", "eu-west-1"))
    # Root location under which resources are stored (env: BREVETTIAI_DATA_BUCKET)
    data_bucket: str = Field(default_factory=lambda: os.getenv("BREVETTIAI_DATA_BUCKET", "s3://data.criterion.ai"))
    custom_job_id: str = Field(default="a0aaad69-c032-41c1-a68c-e9a15a5fb18c",
                               description="uuid of model type to use for custom jobs")

    @property
    def s3_endpoint(self):
        """
        Host name of the s3 endpoint for the configured bucket region
        """
        return f"s3.{self.bucket_region}.amazonaws.com"

    def resource_path(self, uuid: Union[str, UUID]) -> str:
        """
        Get location of a resource
        :param uuid: id of the resource
        :return: the data bucket joined with the string form of the id
        """
        return path_utils.join(self.data_bucket, str(uuid))

    def prepare_runtime(self):
        """
        Initialize runtime specific services; currently only SageMaker is handled
        """
        # Determine runtime
        on_sagemaker = os.environ.get("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI") is not None

        # Initialize services
        if on_sagemaker:
            from brevettiai.interfaces import sagemaker
            sagemaker.load_hyperparameters_cmd_args()

    def get_download_link(self, path):
        """
        Get a platform download link for a file on s3
        :param path: path starting with "s3://"
        :return: download url on the configured host
        :raises ValueError: if path is not an s3 path
        """
        if path.startswith("s3://"):
            # Drop the "s3://" scheme and the bucket name, keeping only the key
            target = path[5:].split("/", 1)[1]
            return f"{self.host}/download?path={urllib.parse.quote(target, safe='')}"
        else:
            raise ValueError("Can only provide download links on s3")

    def get_root_tags(self, id, api_key) -> List[Tag]:
        """
        Get the root tags of a resource
        :param id: id of the resource
        :param api_key: key used to authorize the request
        :return: list of tags; empty if the request did not succeed
        """
        # Let requests build the query string so special characters in the
        # key or id are percent-encoded instead of corrupting the url
        r = requests.get(f"{self.host}/api/resources/roottags",
                         params={"key": str(api_key), "id": str(id)})
        if r.ok:
            return [Tag.parse_obj(x) for x in r.json()]
        else:
            log.warning("Could not get root tags")
            return []

    def get_annotation_url(self, s3_image_path, annotation_name=None,
                           bbox=None, zoom=None, screen_size=1024, test_report_id=None, model_id=None,
                           min_zoom=2, max_zoom=300):
        """
        Get url to annotation file
        :param s3_image_path: Name of image file
        :param annotation_name: Name of annotation file, if any
        :param bbox: Selects zoom and center for the bbox, four values read as two (x, y) points
        :param zoom: Zoom level [2-300] related to screen pixel size (if None zoom will be calculated from bbox)
        :param screen_size: default screen size in pixels
        :param test_report_id: added to the url as "testReportId" when given
        :param model_id: added to the url as "modelId" when given
        :param min_zoom: lower bound for the zoom calculated from bbox
        :param max_zoom: upper bound for the zoom calculated from bbox
        :return: url on the configured host
        """

        # Length of a textual uuid; the dataset id is assumed to lead the key
        uri_length = 36
        rm_keys = [self.data_bucket, ".tiles/", "/dzi.json"]
        image_key = s3_image_path
        for rm_key in rm_keys:
            image_key = image_key.replace(rm_key, "")
        image_key = image_key.lstrip("/")
        dataset_id = image_key[:uri_length]
        image_rel_path = "/".join(image_key.split("/")[1:])

        url_info = dict(file=image_rel_path)

        if annotation_name:
            url_info["annotationFile"] = annotation_name

        if test_report_id:
            url_info["testReportId"] = test_report_id

        if model_id:
            url_info["modelId"] = model_id

        if bbox is not None:
            corners = np.array(bbox).reshape(2, 2)
            # Builtin int replaces the np.int alias removed from NumPy
            center_x, center_y = corners.mean(0).astype(int)
            url_info["x"], url_info["y"] = int(center_x), int(center_y)
            # Width and height of the box: second point minus first point
            extent = corners.T.dot([-1, 1])
            # Kept in its own name so an explicit zoom argument is not clobbered
            bbox_zoom = 100 * screen_size / extent
            url_info["zoom"] = int(bbox_zoom.clip(min=min_zoom, max=max_zoom).min())

        # An explicitly provided zoom takes precedence over the bbox estimate
        if zoom is not None:
            url_info["zoom"] = zoom

        return "{}/data/{}?".format(self.host, dataset_id) + urllib.parse.urlencode(url_info)

    @property
    def custom_model_type(self):
        """
        Model type used for custom jobs, identified by custom_job_id
        """
        from .web_api_types import ModelType
        return ModelType(id=self.custom_job_id, name="custom job")


# Default backend instance created at import time; environment-backed field defaults are resolved here
backend = PlatformBackend()

Classes

class PlatformBackend (**data: Any)

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class PlatformBackend(BaseModel):
    """
    Connection settings and url helpers for the platform backend.

    ``host``, ``bucket_region`` and ``data_bucket`` are read from environment
    variables when an instance is created, falling back to fixed defaults.
    """
    # Base url of the platform web application (env: BREVETTIAI_HOST_NAME)
    host: str = Field(default_factory=lambda: os.getenv("BREVETTIAI_HOST_NAME", "https://platform.brevetti.ai"))
    # Not referenced by any method of this class
    output_segmentation_dir: str = Field(default="output_segmentations")
    # AWS region used to build the s3 endpoint (env: AWS_REGION)
    bucket_region: str = Field(default_factory=lambda: os.getenv("AWS_REGION", "eu-west-1"))
    # Root location under which resources are stored (env: BREVETTIAI_DATA_BUCKET)
    data_bucket: str = Field(default_factory=lambda: os.getenv("BREVETTIAI_DATA_BUCKET", "s3://data.criterion.ai"))
    custom_job_id: str = Field(default="a0aaad69-c032-41c1-a68c-e9a15a5fb18c",
                               description="uuid of model type to use for custom jobs")

    @property
    def s3_endpoint(self):
        """
        Host name of the s3 endpoint for the configured bucket region
        """
        return f"s3.{self.bucket_region}.amazonaws.com"

    def resource_path(self, uuid: Union[str, UUID]) -> str:
        """
        Get location of a resource
        :param uuid: id of the resource
        :return: the data bucket joined with the string form of the id
        """
        return path_utils.join(self.data_bucket, str(uuid))

    def prepare_runtime(self):
        """
        Initialize runtime specific services; currently only SageMaker is handled
        """
        # Determine runtime
        on_sagemaker = os.environ.get("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI") is not None

        # Initialize services
        if on_sagemaker:
            from brevettiai.interfaces import sagemaker
            sagemaker.load_hyperparameters_cmd_args()

    def get_download_link(self, path):
        """
        Get a platform download link for a file on s3
        :param path: path starting with "s3://"
        :return: download url on the configured host
        :raises ValueError: if path is not an s3 path
        """
        if path.startswith("s3://"):
            # Drop the "s3://" scheme and the bucket name, keeping only the key
            target = path[5:].split("/", 1)[1]
            return f"{self.host}/download?path={urllib.parse.quote(target, safe='')}"
        else:
            raise ValueError("Can only provide download links on s3")

    def get_root_tags(self, id, api_key) -> List[Tag]:
        """
        Get the root tags of a resource
        :param id: id of the resource
        :param api_key: key used to authorize the request
        :return: list of tags; empty if the request did not succeed
        """
        # Let requests build the query string so special characters in the
        # key or id are percent-encoded instead of corrupting the url
        r = requests.get(f"{self.host}/api/resources/roottags",
                         params={"key": str(api_key), "id": str(id)})
        if r.ok:
            return [Tag.parse_obj(x) for x in r.json()]
        else:
            log.warning("Could not get root tags")
            return []

    def get_annotation_url(self, s3_image_path, annotation_name=None,
                           bbox=None, zoom=None, screen_size=1024, test_report_id=None, model_id=None,
                           min_zoom=2, max_zoom=300):
        """
        Get url to annotation file
        :param s3_image_path: Name of image file
        :param annotation_name: Name of annotation file, if any
        :param bbox: Selects zoom and center for the bbox, four values read as two (x, y) points
        :param zoom: Zoom level [2-300] related to screen pixel size (if None zoom will be calculated from bbox)
        :param screen_size: default screen size in pixels
        :param test_report_id: added to the url as "testReportId" when given
        :param model_id: added to the url as "modelId" when given
        :param min_zoom: lower bound for the zoom calculated from bbox
        :param max_zoom: upper bound for the zoom calculated from bbox
        :return: url on the configured host
        """

        # Length of a textual uuid; the dataset id is assumed to lead the key
        uri_length = 36
        rm_keys = [self.data_bucket, ".tiles/", "/dzi.json"]
        image_key = s3_image_path
        for rm_key in rm_keys:
            image_key = image_key.replace(rm_key, "")
        image_key = image_key.lstrip("/")
        dataset_id = image_key[:uri_length]
        image_rel_path = "/".join(image_key.split("/")[1:])

        url_info = dict(file=image_rel_path)

        if annotation_name:
            url_info["annotationFile"] = annotation_name

        if test_report_id:
            url_info["testReportId"] = test_report_id

        if model_id:
            url_info["modelId"] = model_id

        if bbox is not None:
            corners = np.array(bbox).reshape(2, 2)
            # Builtin int replaces the np.int alias removed from NumPy
            center_x, center_y = corners.mean(0).astype(int)
            url_info["x"], url_info["y"] = int(center_x), int(center_y)
            # Width and height of the box: second point minus first point
            extent = corners.T.dot([-1, 1])
            # Kept in its own name so an explicit zoom argument is not clobbered
            bbox_zoom = 100 * screen_size / extent
            url_info["zoom"] = int(bbox_zoom.clip(min=min_zoom, max=max_zoom).min())

        # An explicitly provided zoom takes precedence over the bbox estimate
        if zoom is not None:
            url_info["zoom"] = zoom

        return "{}/data/{}?".format(self.host, dataset_id) + urllib.parse.urlencode(url_info)

    @property
    def custom_model_type(self):
        """
        Model type used for custom jobs, identified by custom_job_id
        """
        from .web_api_types import ModelType
        return ModelType(id=self.custom_job_id, name="custom job")

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var bucket_region : str
var custom_job_id : str
var data_bucket : str
var host : str
var output_segmentation_dir : str

Instance variables

var custom_model_type
Expand source code
@property
def custom_model_type(self):
    """
    Model type used for custom jobs, identified by custom_job_id
    """
    # Imported at call time rather than module import time
    # (presumably to avoid a circular import - TODO confirm)
    from .web_api_types import ModelType

    model_type = ModelType(id=self.custom_job_id, name="custom job")
    return model_type
var s3_endpoint
Expand source code
@property
def s3_endpoint(self):
    """
    Host name of the s3 endpoint for the configured bucket region
    """
    region = self.bucket_region
    return "s3.{}.amazonaws.com".format(region)

Methods

def get_annotation_url(self, s3_image_path, annotation_name=None, bbox=None, zoom=None, screen_size=1024, test_report_id=None, model_id=None, min_zoom=2, max_zoom=300)

Get url to annotation file.

Parameters:

  • s3_image_path: Name of image file
  • annotation_name: Name of annotation file, if any
  • bbox: Selects zoom and center for the bbox
  • zoom: Zoom level [2-300] related to screen pixel size (if None, zoom will be calculated from bbox)
  • screen_size: Default screen size in pixels
  • test_report_id: Added to the url as "testReportId" when given
  • model_id: Added to the url as "modelId" when given
  • min_zoom: Lower bound for the zoom calculated from bbox
  • max_zoom: Upper bound for the zoom calculated from bbox

Expand source code
def get_annotation_url(self, s3_image_path, annotation_name=None,
                       bbox=None, zoom=None, screen_size=1024, test_report_id=None, model_id=None,
                       min_zoom=2, max_zoom=300):
    """
    Get url to annotation file
    :param s3_image_path: Name of image file
    :param annotation_name: Name of annotation file, if any
    :param bbox: Selects zoom and center for the bbox, four values read as two (x, y) points
    :param zoom: Zoom level [2-300] related to screen pixel size (if None zoom will be calculated from bbox)
    :param screen_size: default screen size in pixels
    :param test_report_id: added to the url as "testReportId" when given
    :param model_id: added to the url as "modelId" when given
    :param min_zoom: lower bound for the zoom calculated from bbox
    :param max_zoom: upper bound for the zoom calculated from bbox
    :return: url on the configured host
    """

    # Length of a textual uuid; the dataset id is assumed to lead the key
    uri_length = 36
    rm_keys = [self.data_bucket, ".tiles/", "/dzi.json"]
    image_key = s3_image_path
    for rm_key in rm_keys:
        image_key = image_key.replace(rm_key, "")
    image_key = image_key.lstrip("/")
    dataset_id = image_key[:uri_length]
    image_rel_path = "/".join(image_key.split("/")[1:])

    url_info = dict(file=image_rel_path)

    if annotation_name:
        url_info["annotationFile"] = annotation_name

    if test_report_id:
        url_info["testReportId"] = test_report_id

    if model_id:
        url_info["modelId"] = model_id

    if bbox is not None:
        corners = np.array(bbox).reshape(2, 2)
        # Builtin int replaces the np.int alias removed from NumPy
        center_x, center_y = corners.mean(0).astype(int)
        url_info["x"], url_info["y"] = int(center_x), int(center_y)
        # Width and height of the box: second point minus first point
        extent = corners.T.dot([-1, 1])
        # Kept in its own name so an explicit zoom argument is not clobbered
        bbox_zoom = 100 * screen_size / extent
        url_info["zoom"] = int(bbox_zoom.clip(min=min_zoom, max=max_zoom).min())

    # An explicitly provided zoom takes precedence over the bbox estimate
    if zoom is not None:
        url_info["zoom"] = zoom

    return "{}/data/{}?".format(self.host, dataset_id) + urllib.parse.urlencode(url_info)
def get_download_link(self, path)

Expand source code
def get_download_link(self, path):
    """
    Get a platform download link for a file on s3
    :param path: path starting with "s3://"
    :return: download url on the configured host
    :raises ValueError: if path is not an s3 path
    """
    scheme = "s3://"
    if not path.startswith(scheme):
        raise ValueError("Can only provide download links on s3")

    # Skip the scheme and the bucket name; what remains is the object key
    without_scheme = path[len(scheme):]
    key = without_scheme.split("/", 1)[1]
    quoted_key = urllib.parse.quote(key, safe='')
    return f"{self.host}/download?path={quoted_key}"
def get_root_tags(self, id, api_key) ‑> List[Tag]
Expand source code
def get_root_tags(self, id, api_key) -> List[Tag]:
    """
    Get the root tags of a resource
    :param id: id of the resource
    :param api_key: key used to authorize the request
    :return: list of tags; empty if the request did not succeed
    """
    # Let requests build the query string so special characters in the
    # key or id are percent-encoded instead of corrupting the url
    r = requests.get(f"{self.host}/api/resources/roottags",
                     params={"key": str(api_key), "id": str(id)})
    if r.ok:
        return [Tag.parse_obj(x) for x in r.json()]
    else:
        log.warning("Could not get root tags")
        return []
def prepare_runtime(self)
Expand source code
def prepare_runtime(self):
    """
    Initialize runtime specific services; currently only SageMaker is handled
    """
    # The runtime is treated as SageMaker when this variable is present
    if "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI" not in os.environ:
        return

    # Imported only when needed, so other runtimes never load the interface
    from brevettiai.interfaces import sagemaker
    sagemaker.load_hyperparameters_cmd_args()
def resource_path(self, uuid: Union[str, uuid.UUID]) ‑> str

Get location of a resource

Expand source code
def resource_path(self, uuid: Union[str, UUID]) -> str:
    """
    Get location of a resource
    :param uuid: id of the resource
    :return: the data bucket joined with the string form of the id
    """
    resource_id = str(uuid)
    bucket = self.data_bucket
    return path_utils.join(bucket, resource_id)