Module brevettiai.platform.models.job

Expand source code
import json
import logging
import tempfile
import os
import argparse
import requests
import urllib
import pkg_resources

import numpy as np
from pydantic import BaseModel, Field, PrivateAttr, root_validator, parse_obj_as, validator
from typing import Callable, Optional, List, Dict, Any, Type, Union, Iterable
from uuid import uuid4
from datetime import datetime
from urllib.parse import quote

from brevettiai import Module
from brevettiai.model import ModelMetadata
from brevettiai.interfaces import vue_schema_utils as vue
from brevettiai.interfaces.git import GitRepositoryState
from brevettiai.io import IoTools, io_tools
from brevettiai.platform.models import PlatformBackend
from brevettiai.platform.models import backend as default_backend
from brevettiai.platform.platform_credentials import DefaultJobCredentialsChain
from brevettiai.platform.models import Dataset, Tag
from brevettiai.utils import argparse_utils

log = logging.getLogger(__name__)

custom_json_encoders = {
    Module: lambda x: x.get_config()
}


def parse_job_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--host', help='path to platform host', default=None)
    parser.add_argument('--data_bucket', help='location of dataset bucket', default=None)
    parser.add_argument('--job_dir', help='location to write checkpoints and export models', default=None)
    parser.add_argument('--model_id', help='Assigned id to model', default='unknown')
    parser.add_argument('--test_id', help='Assigned id to test', default='unknown')
    parser.add_argument('--api_key', help="Job api key", default=None)
    parser.add_argument('--info_file', type=str, help='Info file with test job info', required=False)
    parser.add_argument('--cache_path', type=str, help='Cache path', required=False)
    parser.add_argument('--raygun_api_key', help='api key for raygun', default=None)
    parser.add_argument('--run_locally', help='Run the job locally', default=False)
    return parser.parse_known_args()


class JobSettings(BaseModel):
    """
    Base class for job settings
    """
    extra: Dict[str, Any] = Field(default_factory=dict, vue=vue.SchemaConfig(exclude=True))

    class Config:
        arbitrary_types_allowed = True
        json_encoders = custom_json_encoders

    @classmethod
    def platform_schema(cls):
        """Utility function to get vue schema"""
        builder = vue.from_pydantic_model(cls)
        return builder

    @root_validator(pre=True, allow_reuse=True)
    def parse_settings(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Extra validator parsing settings from the platform"""

        # Parse extra args
        all_required_field_names = {field.alias for field in cls.__fields__.values() if
                                    field.alias != 'extra'}  # to support alias

        extra: Dict[str, Any] = values.get("Extra", {})
        for field_name in list(values):
            if field_name not in all_required_field_names:
                extra[field_name] = values.pop(field_name)
        values['extra'] = extra

        # Parse string as json variables
        properties = cls.schema()["properties"]
        try:
            for k, v in values.items():
                if isinstance(v, str):
                    field_types = ([properties[k]["type"]] if "type" in properties[k]
                                   else [t["type"] for t in properties[k]["anyOf"]])
                    if "string" not in field_types:
                        values[k] = None if v == "" else json.loads(v)
        except (ValueError, KeyError):
            pass

        return values


class Job(BaseModel):
    """
    Model defining a job on the Brevetti platform.
    Use it as a base class for your jobs.
    """
    id: str = Field(default_factory=lambda: str(uuid4()))
    run_id: str = Field(default_factory=lambda: datetime.now().strftime("%Y-%m-%dT%H%M%S"))
    name: str
    datasets: List[Dataset] = Field(default_factory=list)
    models: List[dict] = Field(default_factory=list)
    settings: JobSettings = Field(default_factory=dict)
    tags: List[Tag] = Field(default_factory=list)
    api_key: Optional[str]
    host_name: Optional[str]
    charts_url: Optional[str]
    complete_url: Optional[str]
    remote_url: Optional[str]
    security_credentials_url: Optional[str]  # Deprecated
    parent: Optional[dict]
    model_path: Optional[str]

    _job_dir: str = PrivateAttr(default=None)
    _backend: PlatformBackend = PrivateAttr(default=None)
    _io: IoTools = PrivateAttr(default=None)
    _temp_dir_reference: Optional[tempfile.TemporaryDirectory] = PrivateAttr(default=None)
    _temp_dir: str = PrivateAttr(default=None)
    _job_output: dict = PrivateAttr(default_factory=dict)

    class Config:
        json_encoders = custom_json_encoders

    def __init__(self, io=io_tools, backend=default_backend, cache_path=None, job_dir=None, **data) -> None:
        # Inject backend and io into datasets that are not yet initialized
        for ds in data.get("datasets", []):
            if not isinstance(ds, Dataset):
                ds["io"] = io
                ds["backend"] = backend

        super().__init__(**data)

        self._io = io
        self._backend = backend
        self._temp_dir = cache_path
        self._job_dir = job_dir

    def get_metadata(self) -> ModelMetadata:
        """
        Build metadata object, containing information to transfer to external users of the model
        """
        return ModelMetadata(
            id=self.id,
            run_id=self.run_id,
            name=self.name,
            host_name=self.host_name,
            producer=type(self).__name__
        )

    def start(self, resolve_access_rights: bool = True, cache_remote_files: bool = True, set_credentials: bool = True, complete_job: bool = True):
        """
        Start the job
        """
        if self._temp_dir is None:
            self._temp_dir_reference = tempfile.TemporaryDirectory(prefix=f"brevettiai-job-{self.id}-")
            self._temp_dir = self._temp_dir_reference.name

        if cache_remote_files:
            self.io.set_cache_root(self.temp_path("cache", dir=True))

        if set_credentials:
            self.io.minio.credentials = self.io.minio.credentials or DefaultJobCredentialsChain()
            self.io.minio.credentials.set_credentials(type="JobCredentials", user=self.id, secret=self.api_key)

        if resolve_access_rights:
            self.resolve_access_rights()

        self.upload_job_output()
        package_path = self.run()

        if complete_job:
            self.complete(package_path=package_path)
        return package_path

    def run(self) -> Optional[str]:
        """
        Override this to run your job
        Return the path to the model in the temp dir to upload
        """
        return None

    def resolve_access_rights(self) -> None:
        """
        Resolve access rights of this job
        :return:
        """
        self.io.resolve_access_rights(path=self.job_dir, resource_id=self.id, resource_type="job", mode="w")
        for ds in self.datasets:
            ds.resolve_access_rights()
        for model in self.models:
            self.io.resolve_access_rights(path=model['bucket'], resource_id=model['id'], resource_type="job", mode="w")

    @property
    def io(self):
        """Io reference for file handling"""
        assert self._io is not None, "Job not initialized, use Job.init()"
        return self._io

    @property
    def backend(self):
        """Reference to the backend"""
        assert self._backend is not None, "Job not initialized, use Job.init()"
        return self._backend

    @property
    def job_dir(self) -> str:
        return self._job_dir or self.backend.resource_path(self.id)

    def prepare_path(self, *paths, dir: bool = False) -> str:
        dir_ = paths if dir else paths[:-1]
        folder = self.io.path.join(*dir_)
        self.io.make_dirs(folder)
        return folder if dir else self.io.path.join(folder, paths[-1])

    def artifact_path(self, *paths, dir: bool = False) -> str:
        """
        Get path in the artifact directory tree
        :param paths: N path arguments
        :param dir: this is a directory
        :return:
        """
        return self.prepare_path(self.job_dir, "artifacts", *paths, dir=dir)

    def temp_path(self, *paths, dir=False):
        """
        Get path in the temp directory tree
        :param paths: N path arguments
        :param dir: this is a directory
        :return:
        """
        return self.prepare_path(self._temp_dir, *paths, dir=dir)

    def upload_artifact(self, artifact_name, payload, is_file=False):
        """
        Upload an artifact with a given name
        :param artifact_name: target artifact name
        :param payload: source
        :param is_file: whether payload is a path to the source file
        :return:
        """
        artifact_path = self.artifact_path(*((artifact_name,) if isinstance(artifact_name, str) else artifact_name))
        log.info('Uploading {} to {}'.format(artifact_name, artifact_path))
        if is_file:
            self.io.copy(payload, artifact_path)
        else:
            self.io.write_file(artifact_path, payload)

        return artifact_path.replace(self.io.path.join(self.job_dir, ''), '')

    def get_annotation_url(self, s3_image_path, annotation_name=None,
                           bbox=None, zoom=None, screen_size=1024, test_report_id=None, model_id=None,
                           min_zoom=2, max_zoom=300):
        """
        Get url to annotation file
        :param s3_image_path: Name of image file
        :param annotation_name: Name of annotation file, if any
        :param bbox: Selects zoom and center for the bbox
        :param zoom: Zoom level [2-300] related to screen pixel size (if None zoom will be calculated from bbox)
        :param screen_size: default screen size in pixels
        :param test_report_id:
        :param model_id:
        :param min_zoom:
        :param max_zoom:
        """

        uri_length = 36
        rm_keys = [self.backend.data_bucket, ".tiles/", "/dzi.json"]
        image_key = s3_image_path
        for rm_key in rm_keys:
            image_key = image_key.replace(rm_key, "")
        image_key = image_key.lstrip("/")
        dataset_id = image_key[:uri_length]
        image_rel_path = "/".join(image_key.split("/")[1:])

        url_info = dict(file=image_rel_path)

        if annotation_name:
            url_info["annotationFile"] = annotation_name

        if test_report_id:
            url_info["testReportId"] = test_report_id

        if model_id:
            url_info["modelId"] = model_id

        if bbox is not None:
            url_info["x"], url_info["y"] = np.array(bbox).reshape(2, 2).mean(0).astype(np.int)
            # NB: This will be overwritten if zoom is provided
            zoom = (100 * screen_size / np.array(bbox).reshape(2, 2).T.dot([-1, 1]))
            url_info["zoom"] = int(zoom.clip(min=min_zoom, max=max_zoom).min())

        if zoom:
            url_info["zoom"] = zoom

        return "https://platform.brevetti.ai/data/{}?".format(dataset_id) + urllib.parse.urlencode(url_info)

    def add_output_metric(self, key, metric):
        """
        Add an output metric for the job comparison module
        :param key:
        :param metric:
        :return:
        """
        self._job_output[key] = metric

    def add_output_metrics(self, metrics):
        """
        Add a number of metrics to the job
        :param metrics:
        :return:
        """
        self._job_output.update(metrics)

    def upload_job_output(self, path="output.json"):
        """
        Upload / update the output.json artifact containing parsed settings, etc.
        :param path:
        :return:
        """
        # Load relevant environment variables
        environment = {x: os.getenv(x) for x in ("BUILD_ID",)}

        # Add git info if available
        for name, object_type in (("git", type(self)), ("brevettiai git", Job)):
            try:
                environment[name] = GitRepositoryState.from_type(object_type)
            except Exception:
                pass

        # Add python module information
        environment["modules"] = list(sorted(f"{d.project_name}=={d.version}" for d in pkg_resources.working_set))

        payload = JobOutput(
            output=self._job_output,
            environment=environment,
            config=self
        ).json(indent=2, exclude={"job": {"api_key"}})
        if self.api_key:
            payload = payload.replace(self.api_key, "*" * len(self.api_key))
        return self.upload_artifact(path, payload)

    def get_remote_monitor(self):
        """Retrieve remote monitor object if available"""
        from brevettiai.interfaces.remote_monitor import RemoteMonitor
        return RemoteMonitor(root=self.host_name, path=self.remote_url) if self.remote_url else None

    def complete(self, tmp_package_path=None, package_path=None, output_args=''):
        """
        Complete job by uploading package to gcs and notifying api
        :param tmp_package_path: Path to tar archive with python package
        :param package_path: package path on gcs
        :param output_args:
        :return:
        """
        self.upload_job_output()

        complete_url_args = ""
        if package_path is None:
            if tmp_package_path is not None:
                artifact_path = self.artifact_path("saved_model.tar.gz")
                self.io.copy(tmp_package_path, artifact_path)
                complete_url_args = quote(artifact_path)
        else:
            artifact_path = self.io.path.relpath(package_path, self.job_dir)
            assert ".." not in artifact_path, "Illegal package path. It should be an artifact of the model"
            complete_url_args = quote(artifact_path)

        # Clean temp dir if autogenerated
        if self._temp_dir_reference is not None:
            self._temp_dir_reference.cleanup()
            self._temp_dir_reference = None

        if self.complete_url:
            complete_url = self.host_name + self.complete_url + complete_url_args + output_args
            try:
                r = requests.post(complete_url)
                log.info(f'Job completed: {complete_url.split("&", 1)[-1]}')
                return r
            except requests.exceptions.HTTPError as e:
                log.warning("HTTP error on complete job", exc_info=e)
            except requests.exceptions.RequestException as e:
                log.warning("No Response on complete job", exc_info=e)
        else:
            log.warning("No known completion url, backend not notified")

    @classmethod
    def init(cls, job_id: Optional[str] = None, api_key: Optional[str] = None,
             cache_path: Optional[str] = None, info_file: Optional[str] = None,
             type_selector: Union[Iterable[Type['Job']], Callable[[dict], Type['Job']]] = None,
             job_config: Optional[dict] = None,
             log_level=logging.INFO, io=io_tools, backend=default_backend, **kwargs) -> 'Job':
        """
        Initialize a job
        :param job_id: id of job to find on the backend
        :param api_key: Api key for access if the job contains remote resources
        :param info_file: filename of info file to use, overwrites job id
        :param cache_path: Path to use for caching remote resources and as temporary storage
        :param type_selector: list of different job types to match
        :param job_config: configuration of the job
        :param log_level: logging level
        :param io: IoTools object managing data access and reads / writes
        :param backend: PlatformBackend object containing info on the backend to use
        """
        # Setup logging
        logging.basicConfig()
        log.root.setLevel(log_level)
        logging.getLogger("urllib3").setLevel(logging.WARNING)
        logging.getLogger("charset_normalizer").setLevel(logging.WARNING)

        backend.prepare_runtime()

        # Parse args
        args, extra_args = parse_job_args()

        if args.raygun_api_key is not None:
            from brevettiai.interfaces import raygun
            raygun.setup_raygun(api_key=args.raygun_api_key)

        # Job API: from input or input args
        job_id = job_id or (args.test_id if args.model_id == "unknown" else args.model_id)
        api_key = api_key or args.api_key
        job_dir = args.job_dir or backend.resource_path(job_id)
        info_file = info_file or args.info_file or io.path.join(job_dir, "info.json")
        cache_path = cache_path or args.cache_path

        io.minio.credentials = io.minio.credentials or DefaultJobCredentialsChain()

        if job_config is None:
            try:
                job_config = io.read_file(info_file)
            except KeyError:
                io.minio.credentials.set_credentials(type="JobCredentials", user=job_id, secret=api_key)
                io.resolve_access_rights(job_dir, resource_id=job_id, resource_type="job", mode="w")
                job_config = io.read_file(info_file)

            job_config = json.loads(job_config)

        # Overload config with extras
        job_config.update(io=io, backend=backend, cache_path=cache_path, **kwargs)

        # overload
        settings_args = argparse_utils.parse_args_from_dict(extra_args, target=job_config["settings"])
        argparse_utils.overload_dict_from_args(settings_args, target=job_config["settings"])
        if type_selector is not None:
            try:
                job = parse_obj_as(type_selector, job_config)
            except TypeError:
                try:
                    job = parse_obj_as(Union[tuple(type_selector)], job_config)
                except TypeError:
                    job = type_selector(job_config).parse_obj(job_config)
        else:
            job = cls.parse_obj({**job_config, "_job_dir": job_dir})
        log.info(f"{type(job)} initialized")
        return job

    @classmethod
    def from_model_spec(cls, model, schema=None, config=None, job_dir=None, **kwargs):
        """
        Build job object from model specification
        :param model:
        :param schema:
        :param config:
        :param kwargs:
        :param job_dir:
        :return:
        """
        cdict = {} if config is None else config.__dict__

        if schema is None:
            schema = vue.SchemaBuilder().schema

        if isinstance(model["settings"], str):
            model["settings"] = json.loads(model["settings"])

        config = cls(
            name=model["name"],
            id=model["id"],
            settings=model["settings"],
            tags=model.get("tags", []),
            platform_schema=schema,
            job_dir=job_dir,
            datasets=cdict.get("datasets", []),
            api_key=cdict.get("api_key"),
            charts_url=cdict.get("charts_url"),
            complete_url=cdict.get("complete_url"),
            remote_url=cdict.get("remote_url"),
            host_name=cdict.get("host_name"),
            **kwargs,
        )
        config.model_path = model.get("model_path", None)
        return config


class JobOutput(BaseModel):
    output: dict
    environment: Dict[str, Any]
    job: Job = Field(alias="config")

    class Config:
        json_encoders = custom_json_encoders

Functions

def parse_job_args()
Expand source code
def parse_job_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--host', help='path to platform host', default=None)
    parser.add_argument('--data_bucket', help='location of dataset bucket', default=None)
    parser.add_argument('--job_dir', help='location to write checkpoints and export models', default=None)
    parser.add_argument('--model_id', help='Assigned id to model', default='unknown')
    parser.add_argument('--test_id', help='Assigned id to test', default='unknown')
    parser.add_argument('--api_key', help="Job api key", default=None)
    parser.add_argument('--info_file', type=str, help='Info file with test job info', required=False)
    parser.add_argument('--cache_path', type=str, help='Cache path', required=False)
    parser.add_argument('--raygun_api_key', help='api key for raygun', default=None)
    parser.add_argument('--run_locally', help='Run the job locally', default=False)
    return parser.parse_known_args()
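
A usage sketch (the printed message is illustrative): parse_known_args() returns the recognized namespace together with the leftover argv, so unrecognized flags can later be applied as settings overrides by Job.init.

args, extra_args = parse_job_args()
if args.model_id != "unknown":
    print(f"running model job {args.model_id}")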

Classes

class Job (io=<brevettiai.io.utils.IoTools object>, backend=PlatformBackend(host='https://platform.brevetti.ai', output_segmentation_dir='output_segmentations', bucket_region='eu-west-1', data_bucket='s3://data.criterion.ai', custom_job_id='a0aaad69-c032-41c1-a68c-e9a15a5fb18c'), cache_path=None, job_dir=None, **data)

Model defining a job on the Brevetti platform. Use it as a base class for your jobs.

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.
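
A minimal subclassing sketch; the settings field, artifact name, and commented-out calls are illustrative assumptions, not part of the module:

from typing import Optional

from brevettiai.platform.models.job import Job, JobSettings


class MySettings(JobSettings):
    epochs: int = 10  # illustrative setting


class MyJob(Job):
    settings: MySettings = MySettings()

    def run(self) -> Optional[str]:
        # start() calls run(); artifacts are written below <job_dir>/artifacts
        self.upload_artifact("epochs.txt", str(self.settings.epochs))
        return None  # or a path to a packaged model in the temp dir


# job = MyJob.init()  # configuration normally arrives via command-line args
# job.start()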

Expand source code
class Job(BaseModel):
    """
    Model defining a job on the Brevetti platform.
    Use it as a base class for your jobs.
    """
    id: str = Field(default_factory=lambda: str(uuid4()))
    run_id: str = Field(default_factory=lambda: datetime.now().strftime("%Y-%m-%dT%H%M%S"))
    name: str
    datasets: List[Dataset] = Field(default_factory=list)
    models: List[dict] = Field(default_factory=list)
    settings: JobSettings = Field(default_factory=dict)
    tags: List[Tag] = Field(default_factory=list)
    api_key: Optional[str]
    host_name: Optional[str]
    charts_url: Optional[str]
    complete_url: Optional[str]
    remote_url: Optional[str]
    security_credentials_url: Optional[str]  # Deprecated
    parent: Optional[dict]
    model_path: Optional[str]

    _job_dir: str = PrivateAttr(default=None)
    _backend: PlatformBackend = PrivateAttr(default=None)
    _io: IoTools = PrivateAttr(default=None)
    _temp_dir_reference: Optional[tempfile.TemporaryDirectory] = PrivateAttr(default=None)
    _temp_dir: str = PrivateAttr(default=None)
    _job_output: dict = PrivateAttr(default_factory=dict)

    class Config:
        json_encoders = custom_json_encoders

    def __init__(self, io=io_tools, backend=default_backend, cache_path=None, job_dir=None, **data) -> None:
        # Inject backend and io into datasets that are not yet initialized
        for ds in data.get("datasets", []):
            if not isinstance(ds, Dataset):
                ds["io"] = io
                ds["backend"] = backend

        super().__init__(**data)

        self._io = io
        self._backend = backend
        self._temp_dir = cache_path
        self._job_dir = job_dir

    def get_metadata(self) -> ModelMetadata:
        """
        Build metadata object, containing information to transfer to external users of the model
        """
        return ModelMetadata(
            id=self.id,
            run_id=self.run_id,
            name=self.name,
            host_name=self.host_name,
            producer=type(self).__name__
        )

    def start(self, resolve_access_rights: bool = True, cache_remote_files: bool = True, set_credentials: bool = True, complete_job: bool = True):
        """
        Start the job
        """
        if self._temp_dir is None:
            self._temp_dir_reference = tempfile.TemporaryDirectory(prefix=f"brevettiai-job-{self.id}-")
            self._temp_dir = self._temp_dir_reference.name

        if cache_remote_files:
            self.io.set_cache_root(self.temp_path("cache", dir=True))

        if set_credentials:
            self.io.minio.credentials = self.io.minio.credentials or DefaultJobCredentialsChain()
            self.io.minio.credentials.set_credentials(type="JobCredentials", user=self.id, secret=self.api_key)

        if resolve_access_rights:
            self.resolve_access_rights()

        self.upload_job_output()
        package_path = self.run()

        if complete_job:
            self.complete(package_path=package_path)
        return package_path

    def run(self) -> Optional[str]:
        """
        Override this to run your job
        Return the path to the model in the temp dir to upload
        """
        return None

    def resolve_access_rights(self) -> None:
        """
        Resolve access rights of this job
        :return:
        """
        self.io.resolve_access_rights(path=self.job_dir, resource_id=self.id, resource_type="job", mode="w")
        for ds in self.datasets:
            ds.resolve_access_rights()
        for model in self.models:
            self.io.resolve_access_rights(path=model['bucket'], resource_id=model['id'], resource_type="job", mode="w")

    @property
    def io(self):
        """Io reference for file handling"""
        assert self._io is not None, "Job not initialized, use Job.init()"
        return self._io

    @property
    def backend(self):
        """Reference to the backend"""
        assert self._backend is not None, "Job not initialized, use Job.init()"
        return self._backend

    @property
    def job_dir(self) -> str:
        return self._job_dir or self.backend.resource_path(self.id)

    def prepare_path(self, *paths, dir: bool = False) -> str:
        dir_ = paths if dir else paths[:-1]
        folder = self.io.path.join(*dir_)
        self.io.make_dirs(folder)
        return folder if dir else self.io.path.join(folder, paths[-1])

    def artifact_path(self, *paths, dir: bool = False) -> str:
        """
        Get path in the artifact directory tree
        :param paths: N path arguments
        :param dir: this is a directory
        :return:
        """
        return self.prepare_path(self.job_dir, "artifacts", *paths, dir=dir)

    def temp_path(self, *paths, dir=False):
        """
        Get path in the temp directory tree
        :param paths: N path arguments
        :param dir: this is a directory
        :return:
        """
        return self.prepare_path(self._temp_dir, *paths, dir=dir)

    def upload_artifact(self, artifact_name, payload, is_file=False):
        """
        Upload an artifact with a given name
        :param artifact_name: target artifact name
        :param payload: source
        :param is_file: whether payload is a path to the source file
        :return:
        """
        artifact_path = self.artifact_path(*((artifact_name,) if isinstance(artifact_name, str) else artifact_name))
        log.info('Uploading {} to {}'.format(artifact_name, artifact_path))
        if is_file:
            self.io.copy(payload, artifact_path)
        else:
            self.io.write_file(artifact_path, payload)

        return artifact_path.replace(self.io.path.join(self.job_dir, ''), '')

    def get_annotation_url(self, s3_image_path, annotation_name=None,
                           bbox=None, zoom=None, screen_size=1024, test_report_id=None, model_id=None,
                           min_zoom=2, max_zoom=300):
        """
        Get url to annotation file
        :param s3_image_path: Name of image file
        :param annotation_name: Name of annotation file, if any
        :param bbox: Selects zoom and center for the bbox
        :param zoom: Zoom level [2-300] related to screen pixel size (if None zoom will be calculated from bbox)
        :param screen_size: default screen size in pixels
        :param test_report_id:
        :param model_id:
        :param min_zoom:
        :param max_zoom:
        """

        uri_length = 36
        rm_keys = [self.backend.data_bucket, ".tiles/", "/dzi.json"]
        image_key = s3_image_path
        for rm_key in rm_keys:
            image_key = image_key.replace(rm_key, "")
        image_key = image_key.lstrip("/")
        dataset_id = image_key[:uri_length]
        image_rel_path = "/".join(image_key.split("/")[1:])

        url_info = dict(file=image_rel_path)

        if annotation_name:
            url_info["annotationFile"] = annotation_name

        if test_report_id:
            url_info["testReportId"] = test_report_id

        if model_id:
            url_info["modelId"] = model_id

        if bbox is not None:
            url_info["x"], url_info["y"] = np.array(bbox).reshape(2, 2).mean(0).astype(np.int)
            # NB: This will be overwritten if zoom is provided
            zoom = (100 * screen_size / np.array(bbox).reshape(2, 2).T.dot([-1, 1]))
            url_info["zoom"] = int(zoom.clip(min=min_zoom, max=max_zoom).min())

        if zoom:
            url_info["zoom"] = zoom

        return "https://platform.brevetti.ai/data/{}?".format(dataset_id) + urllib.parse.urlencode(url_info)

    def add_output_metric(self, key, metric):
        """
        Add an output metric for the job comparison module
        :param key:
        :param metric:
        :return:
        """
        self._job_output[key] = metric

    def add_output_metrics(self, metrics):
        """
        Add a number of metrics to the job
        :param metrics:
        :return:
        """
        self._job_output.update(metrics)

    def upload_job_output(self, path="output.json"):
        """
        Upload / update the output.json artifact containing parsed settings, etc.
        :param path:
        :return:
        """
        # Load relevant environment variables
        environment = {x: os.getenv(x) for x in ("BUILD_ID",)}

        # Add git info if available
        for name, object_type in (("git", type(self)), ("brevettiai git", Job)):
            try:
                environment[name] = GitRepositoryState.from_type(object_type)
            except Exception:
                pass

        # Add python module information
        environment["modules"] = list(sorted(f"{d.project_name}=={d.version}" for d in pkg_resources.working_set))

        payload = JobOutput(
            output=self._job_output,
            environment=environment,
            config=self
        ).json(indent=2, exclude={"job": {"api_key"}})
        if self.api_key:
            payload = payload.replace(self.api_key, "*" * len(self.api_key))
        return self.upload_artifact(path, payload)

    def get_remote_monitor(self):
        """Retrieve remote monitor object if available"""
        from brevettiai.interfaces.remote_monitor import RemoteMonitor
        return RemoteMonitor(root=self.host_name, path=self.remote_url) if self.remote_url else None

    def complete(self, tmp_package_path=None, package_path=None, output_args=''):
        """
        Complete job by uploading package to gcs and notifying api
        :param tmp_package_path: Path to tar archive with python package
        :param package_path: package path on gcs
        :param output_args:
        :return:
        """
        self.upload_job_output()

        complete_url_args = ""
        if package_path is None:
            if tmp_package_path is not None:
                artifact_path = self.artifact_path("saved_model.tar.gz")
                self.io.copy(tmp_package_path, artifact_path)
                complete_url_args = quote(artifact_path)
        else:
            artifact_path = self.io.path.relpath(package_path, self.job_dir)
            assert ".." not in artifact_path, "Illegal package path. It should be an artifact of the model"
            complete_url_args = quote(artifact_path)

        # Clean temp dir if autogenerated
        if self._temp_dir_reference is not None:
            self._temp_dir_reference.cleanup()
            self._temp_dir_reference = None

        if self.complete_url:
            complete_url = self.host_name + self.complete_url + complete_url_args + output_args
            try:
                r = requests.post(complete_url)
                log.info(f'Job completed: {complete_url.split("&", 1)[-1]}')
                return r
            except requests.exceptions.HTTPError as e:
                log.warning("HTTP error on complete job", exc_info=e)
            except requests.exceptions.RequestException as e:
                log.warning("No Response on complete job", exc_info=e)
        else:
            log.warning("No known completion url, backend not notified")

    @classmethod
    def init(cls, job_id: Optional[str] = None, api_key: Optional[str] = None,
             cache_path: Optional[str] = None, info_file: Optional[str] = None,
             type_selector: Union[Iterable[Type['Job']], Callable[[dict], Type['Job']]] = None,
             job_config: Optional[dict] = None,
             log_level=logging.INFO, io=io_tools, backend=default_backend, **kwargs) -> 'Job':
        """
        Initialize a job
        :param job_id: id of job to find on the backend
        :param api_key: Api key for access if the job contains remote resources
        :param info_file: filename of info file to use, overwrites job id
        :param cache_path: Path to use for caching remote resources and as temporary storage
        :param type_selector: list of different job types to match
        :param job_config: configuration of the job
        :param log_level: logging level
        :param io: IoTools object managing data access and reads / writes
        :param backend: PlatformBackend object containing info on the backend to use
        """
        # Setup logging
        logging.basicConfig()
        log.root.setLevel(log_level)
        logging.getLogger("urllib3").setLevel(logging.WARNING)
        logging.getLogger("charset_normalizer").setLevel(logging.WARNING)

        backend.prepare_runtime()

        # Parse args
        args, extra_args = parse_job_args()

        if args.raygun_api_key is not None:
            from brevettiai.interfaces import raygun
            raygun.setup_raygun(api_key=args.raygun_api_key)

        # Job API: from input or input args
        job_id = job_id or (args.test_id if args.model_id == "unknown" else args.model_id)
        api_key = api_key or args.api_key
        job_dir = args.job_dir or backend.resource_path(job_id)
        info_file = info_file or args.info_file or io.path.join(job_dir, "info.json")
        cache_path = cache_path or args.cache_path

        io.minio.credentials = io.minio.credentials or DefaultJobCredentialsChain()

        if job_config is None:
            try:
                job_config = io.read_file(info_file)
            except KeyError:
                io.minio.credentials.set_credentials(type="JobCredentials", user=job_id, secret=api_key)
                io.resolve_access_rights(job_dir, resource_id=job_id, resource_type="job", mode="w")
                job_config = io.read_file(info_file)

            job_config = json.loads(job_config)

        # Overload config with extras
        job_config.update(io=io, backend=backend, cache_path=cache_path, **kwargs)

        # overload
        settings_args = argparse_utils.parse_args_from_dict(extra_args, target=job_config["settings"])
        argparse_utils.overload_dict_from_args(settings_args, target=job_config["settings"])
        if type_selector is not None:
            try:
                job = parse_obj_as(type_selector, job_config)
            except TypeError:
                try:
                    job = parse_obj_as(Union[tuple(type_selector)], job_config)
                except TypeError:
                    job = type_selector(job_config).parse_obj(job_config)
        else:
            job = cls.parse_obj({**job_config, "_job_dir": job_dir})
        log.info(f"{type(job)} initialized")
        return job

    @classmethod
    def from_model_spec(cls, model, schema=None, config=None, job_dir=None, **kwargs):
        """
        Build job object from model specification
        :param model:
        :param schema:
        :param config:
        :param kwargs:
        :param job_dir:
        :return:
        """
        cdict = {} if config is None else config.__dict__

        if schema is None:
            schema = vue.SchemaBuilder().schema

        if isinstance(model["settings"], str):
            model["settings"] = json.loads(model["settings"])

        config = cls(
            name=model["name"],
            id=model["id"],
            settings=model["settings"],
            tags=model.get("tags", []),
            platform_schema=schema,
            job_dir=job_dir,
            datasets=cdict.get("datasets", []),
            api_key=cdict.get("api_key"),
            charts_url=cdict.get("charts_url"),
            complete_url=cdict.get("complete_url"),
            remote_url=cdict.get("remote_url"),
            host_name=cdict.get("host_name"),
            **kwargs,
        )
        config.model_path = model.get("model_path", None)
        return config

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var Config
var api_key : Optional[str]
var charts_url : Optional[str]
var complete_url : Optional[str]
var datasets : List[Dataset]
var host_name : Optional[str]
var id : str
var model_path : Optional[str]
var models : List[dict]
var name : str
var parent : Optional[dict]
var remote_url : Optional[str]
var run_id : str
var security_credentials_url : Optional[str]
var settings : JobSettings
var tags : List[Tag]

Static methods

def from_model_spec(model, schema=None, config=None, job_dir=None, **kwargs)

Build job object from model specification
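
A hedged sketch of the expected model spec; the id, name and settings values are placeholders:

model_spec = {
    "id": "00000000-0000-0000-0000-000000000000",  # placeholder model id
    "name": "example model",
    "settings": "{}",  # JSON string, decoded by from_model_spec
}
job = Job.from_model_spec(model_spec)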

Expand source code
@classmethod
def from_model_spec(cls, model, schema=None, config=None, job_dir=None, **kwargs):
    """
    Build job object from model specification
    :param model:
    :param schema:
    :param config:
    :param kwargs:
    :param job_dir:
    :return:
    """
    cdict = {} if config is None else config.__dict__

    if schema is None:
        schema = vue.SchemaBuilder().schema

    if isinstance(model["settings"], str):
        model["settings"] = json.loads(model["settings"])

    config = cls(
        name=model["name"],
        id=model["id"],
        settings=model["settings"],
        tags=model.get("tags", []),
        platform_schema=schema,
        job_dir=job_dir,
        datasets=cdict.get("datasets", []),
        api_key=cdict.get("api_key"),
        charts_url=cdict.get("charts_url"),
        complete_url=cdict.get("complete_url"),
        remote_url=cdict.get("remote_url"),
        host_name=cdict.get("host_name"),
        **kwargs,
    )
    config.model_path = model.get("model_path", None)
    return config
def init(job_id: Optional[str] = None, api_key: Optional[str] = None, cache_path: Optional[str] = None, info_file: Optional[str] = None, type_selector: Union[Iterable[Type[ForwardRef('Job')]], Callable[[dict], Type[ForwardRef('Job')]]] = None, job_config: Optional[dict] = None, log_level=20, io=<brevettiai.io.utils.IoTools object>, backend=PlatformBackend(host='https://platform.brevetti.ai', output_segmentation_dir='output_segmentations', bucket_region='eu-west-1', data_bucket='s3://data.criterion.ai', custom_job_id='a0aaad69-c032-41c1-a68c-e9a15a5fb18c'), **kwargs) ‑> Job

Initialize a job

:param job_id: id of job to find on the backend
:param api_key: Api key for access if the job contains remote resources
:param info_file: filename of info file to use, overwrites job id
:param cache_path: Path to use for caching remote resources and as temporary storage
:param type_selector: list of different job types to match
:param job_config: configuration of the job
:param log_level: logging level
:param io: IoTools object managing data access and reads / writes
:param backend: PlatformBackend object containing info on the backend to use
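
A usage sketch; the job id and api key are placeholders that the platform normally supplies through the command-line arguments parsed by parse_job_args:

from brevettiai.platform.models.job import Job

job = Job.init(
    job_id="00000000-0000-0000-0000-000000000000",  # placeholder job id
    api_key="<job api key>",                        # placeholder key
)
job.start()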

Expand source code
@classmethod
def init(cls, job_id: Optional[str] = None, api_key: Optional[str] = None,
         cache_path: Optional[str] = None, info_file: Optional[str] = None,
         type_selector: Union[Iterable[Type['Job']], Callable[[dict], Type['Job']]] = None,
         job_config: Optional[dict] = None,
         log_level=logging.INFO, io=io_tools, backend=default_backend, **kwargs) -> 'Job':
    """
    Initialize a job
    :param job_id: id of job to find on the backend
    :param api_key: Api key for access if the job contains remote resources
    :param info_file: filename of info file to use, overwrites job id
    :param cache_path: Path to use for caching remote resources and as temporary storage
    :param type_selector: list of different job types to match
    :param job_config: configuration of the job
    :param log_level: logging level
    :param io: IoTools object managing data access and reads / writes
    :param backend: PlatformBackend object containing info on the backend to use
    """
    # Setup logging
    logging.basicConfig()
    log.root.setLevel(log_level)
    logging.getLogger("urllib3").setLevel(logging.WARNING)
    logging.getLogger("charset_normalizer").setLevel(logging.WARNING)

    backend.prepare_runtime()

    # Parse args
    args, extra_args = parse_job_args()

    if args.raygun_api_key is not None:
        from brevettiai.interfaces import raygun
        raygun.setup_raygun(api_key=args.raygun_api_key)

    # Job API: from input or input args
    job_id = job_id or (args.test_id if args.model_id == "unknown" else args.model_id)
    api_key = api_key or args.api_key
    job_dir = args.job_dir or backend.resource_path(job_id)
    info_file = info_file or args.info_file or io.path.join(job_dir, "info.json")
    cache_path = cache_path or args.cache_path

    io.minio.credentials = io.minio.credentials or DefaultJobCredentialsChain()

    if job_config is None:
        try:
            job_config = io.read_file(info_file)
        except KeyError:
            io.minio.credentials.set_credentials(type="JobCredentials", user=job_id, secret=api_key)
            io.resolve_access_rights(job_dir, resource_id=job_id, resource_type="job", mode="w")
            job_config = io.read_file(info_file)

        job_config = json.loads(job_config)

    # Overload config with extras
    job_config.update(io=io, backend=backend, cache_path=cache_path, **kwargs)

    # overload
    settings_args = argparse_utils.parse_args_from_dict(extra_args, target=job_config["settings"])
    argparse_utils.overload_dict_from_args(settings_args, target=job_config["settings"])
    if type_selector is not None:
        try:
            job = parse_obj_as(type_selector, job_config)
        except TypeError:
            try:
                job = parse_obj_as(Union[tuple(type_selector)], job_config)
            except TypeError:
                job = type_selector(job_config).parse_obj(job_config)
    else:
        job = cls.parse_obj({**job_config, "_job_dir": job_dir})
    log.info(f"{type(job)} initialized")
    return job

Instance variables

var backend

Reference to the backend

Expand source code
@property
def backend(self):
    """Reference to the backend"""
    assert self._backend is not None, "Job not initialized, use Job.init()"
    return self._backend
var io

Io reference for file handling

Expand source code
@property
def io(self):
    """Io reference for file handling"""
    assert self._io is not None, "Job not initialized, use Job.init()"
    return self._io
var job_dir : str
Expand source code
@property
def job_dir(self) -> str:
    return self._job_dir or self.backend.resource_path(self.id)

Methods

def add_output_metric(self, key, metric)

Add an output metric for the job comparison module

Expand source code
def add_output_metric(self, key, metric):
    """
    Add an output metric for the job comparison module
    :param key:
    :param metric:
    :return:
    """
    self._job_output[key] = metric
def add_output_metrics(self, metrics)

Add a number of metrics to the job
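
A short sketch, assuming a started job instance; metrics registered here end up in the "output" section of the output.json artifact written by upload_job_output():

job.add_output_metric("accuracy", 0.97)
job.add_output_metrics({"precision": 0.95, "recall": 0.93})
job.upload_job_output()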

Expand source code
def add_output_metrics(self, metrics):
    """
    Add a number of metrics to the job
    :param metrics:
    :return:
    """
    self._job_output.update(metrics)
def artifact_path(self, *paths, dir: bool = False) ‑> str

Get path in the artifact directory tree

:param paths: N path arguments
:param dir: this is a directory
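
A sketch, assuming a started job instance; the segment names are illustrative:

report_file = job.artifact_path("reports", "summary.json")  # <job_dir>/artifacts/reports/summary.json
plot_dir = job.artifact_path("plots", dir=True)  # the returned path is itself a directory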

Expand source code
def artifact_path(self, *paths, dir: bool = False) -> str:
    """
    Get path in the artifact directory tree
    :param paths: N path arguments
    :param dir: this is a directory
    :return:
    """
    return self.prepare_path(self.job_dir, "artifacts", *paths, dir=dir)
def complete(self, tmp_package_path=None, package_path=None, output_args='')

Complete job by uploading package to gcs and notifying api

:param tmp_package_path: Path to tar archive with python package
:param package_path: package path on gcs

Expand source code
def complete(self, tmp_package_path=None, package_path=None, output_args=''):
    """
    Complete job by uploading package to gcs and notifying api
    :param tmp_package_path: Path to tar archive with python package
    :param package_path: package path on gcs
    :param output_args:
    :return:
    """
    self.upload_job_output()

    complete_url_args = ""
    if package_path is None:
        if tmp_package_path is not None:
            artifact_path = self.artifact_path("saved_model.tar.gz")
            self.io.copy(tmp_package_path, artifact_path)
            complete_url_args = quote(artifact_path)
    else:
        artifact_path = self.io.path.relpath(package_path, self.job_dir)
        assert ".." not in artifact_path, "Illegal package path. It should be an artifact of the model"
        complete_url_args = quote(artifact_path)

    # Clean temp dir if autogenerated
    if self._temp_dir_reference is not None:
        self._temp_dir_reference.cleanup()
        self._temp_dir_reference = None

    if self.complete_url:
        complete_url = self.host_name + self.complete_url + complete_url_args + output_args
        try:
            r = requests.post(complete_url)
            log.info(f'Job completed: {complete_url.split("&", 1)[-1]}')
            return r
        except requests.exceptions.HTTPError as e:
            log.warning("HTTP error on complete job", exc_info=e)
        except requests.exceptions.RequestException as e:
            log.warning("No Response on complete job", exc_info=e)
    else:
        log.warning("No known completion url, backend not notified")
def get_annotation_url(self, s3_image_path, annotation_name=None, bbox=None, zoom=None, screen_size=1024, test_report_id=None, model_id=None, min_zoom=2, max_zoom=300)

Get url to annotation file

:param s3_image_path: Name of image file
:param annotation_name: Name of annotation file, if any
:param bbox: Selects zoom and center for the bbox
:param zoom: Zoom level [2-300] related to screen pixel size (if None zoom will be calculated from bbox)
:param screen_size: default screen size in pixels
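
A sketch with a hypothetical dataset id and file names; the data bucket prefix is stripped, leaving the dataset id and the file's relative path encoded in the url:

url = job.get_annotation_url(
    "s3://data.criterion.ai/00000000-0000-0000-0000-000000000000/images/img_001.png",
    annotation_name="img_001.json",
    bbox=(100, 120, 300, 360),  # (x1, y1, x2, y2) sets view center and zoom
)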

Expand source code
def get_annotation_url(self, s3_image_path, annotation_name=None,
                       bbox=None, zoom=None, screen_size=1024, test_report_id=None, model_id=None,
                       min_zoom=2, max_zoom=300):
    """
    Get url to annotation file
    :param s3_image_path: Name of image file
    :param annotation_name: Name of annotation file, if any
    :param bbox: Selects zoom and center for the bbox
    :param zoom: Zoom level [2-300] related to screen pixel size (if None zoom will be calculated from bbox)
    :param screen_size: default screen size in pixels
    :param test_report_id:
    :param model_id:
    :param min_zoom:
    :param max_zoom:
    """

    uri_length = 36
    rm_keys = [self.backend.data_bucket, ".tiles/", "/dzi.json"]
    image_key = s3_image_path
    for rm_key in rm_keys:
        image_key = image_key.replace(rm_key, "")
    image_key = image_key.lstrip("/")
    dataset_id = image_key[:uri_length]
    image_rel_path = "/".join(image_key.split("/")[1:])

    url_info = dict(file=image_rel_path)

    if annotation_name:
        url_info["annotationFile"] = annotation_name

    if test_report_id:
        url_info["testReportId"] = test_report_id

    if model_id:
        url_info["modelId"] = model_id

    if bbox is not None:
        url_info["x"], url_info["y"] = np.array(bbox).reshape(2, 2).mean(0).astype(np.int)
        # NB: This will be overwritten if zoom is provided
        zoom = (100 * screen_size / np.array(bbox).reshape(2, 2).T.dot([-1, 1]))
        url_info["zoom"] = int(zoom.clip(min=min_zoom, max=max_zoom).min())

    if zoom:
        url_info["zoom"] = zoom

    return "https://platform.brevetti.ai/data/{}?".format(dataset_id) + urllib.parse.urlencode(url_info)
def get_metadata(self) ‑> ModelMetadata

Build metadata object, containing information to transfer to external users of the model

Expand source code
def get_metadata(self) -> ModelMetadata:
    """
    Build metadata object, containing information to transfer to external users of the model
    """
    return ModelMetadata(
        id=self.id,
        run_id=self.run_id,
        name=self.name,
        host_name=self.host_name,
        producer=type(self).__name__
    )
def get_remote_monitor(self)

Retrieve remote monitor object if available

Expand source code
def get_remote_monitor(self):
    """Retrieve remote monitor object if available"""
    from brevettiai.interfaces.remote_monitor import RemoteMonitor
    return RemoteMonitor(root=self.host_name, path=self.remote_url) if self.remote_url else None
def prepare_path(self, *paths, dir: bool = False) ‑> str
Expand source code
def prepare_path(self, *paths, dir: bool = False) -> str:
    dir_ = paths if dir else paths[:-1]
    folder = self.io.path.join(*dir_)
    self.io.make_dirs(folder)
    return folder if dir else self.io.path.join(folder, paths[-1])
def resolve_access_rights(self) ‑> None

Resolve access rights of this job

Expand source code
def resolve_access_rights(self) -> None:
    """
    Resolve access rights of this job
    :return:
    """
    self.io.resolve_access_rights(path=self.job_dir, resource_id=self.id, resource_type="job", mode="w")
    for ds in self.datasets:
        ds.resolve_access_rights()
    for model in self.models:
        self.io.resolve_access_rights(path=model['bucket'], resource_id=model['id'], resource_type="job", mode="w")
def run(self) ‑> Optional[str]

Override this to run your job. Return the path to the model in the temp dir to upload.

Expand source code
def run(self) -> Optional[str]:
    """
    Override this to run your job
    Return the path to the model in the temp dir to upload
    """
    return None
def start(self, resolve_access_rights: bool = True, cache_remote_files: bool = True, set_credentials: bool = True, complete_job: bool = True)

Start the job

Expand source code
def start(self, resolve_access_rights: bool = True, cache_remote_files: bool = True, set_credentials: bool = True, complete_job: bool = True):
    """
    Start the job
    """
    if self._temp_dir is None:
        self._temp_dir_reference = tempfile.TemporaryDirectory(prefix=f"brevettiai-job-{self.id}-")
        self._temp_dir = self._temp_dir_reference.name

    if cache_remote_files:
        self.io.set_cache_root(self.temp_path("cache", dir=True))

    if set_credentials:
        self.io.minio.credentials = self.io.minio.credentials or DefaultJobCredentialsChain()
        self.io.minio.credentials.set_credentials(type="JobCredentials", user=self.id, secret=self.api_key)

    if resolve_access_rights:
        self.resolve_access_rights()

    self.upload_job_output()
    package_path = self.run()

    if complete_job:
        self.complete(package_path=package_path)
    return package_path
def temp_path(self, *paths, dir=False)

Get path in the temp directory tree

:param paths: N path arguments
:param dir: this is a directory

Expand source code
def temp_path(self, *paths, dir=False):
    """
    Get path in the temp directory tree
    :param paths: N path arguments
    :param dir: this is a directory
    :return:
    """
    return self.prepare_path(self._temp_dir, *paths, dir=dir)
def upload_artifact(self, artifact_name, payload, is_file=False)

Upload an artifact with a given name

:param artifact_name: target artifact name
:param payload: source
:param is_file: whether payload is a path to the source file
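
A sketch, assuming a started job instance; the artifact names and local path are illustrative:

import json

job.upload_artifact("metrics.json", json.dumps({"loss": 0.1}))  # in-memory payload
job.upload_artifact("model.tar.gz", "/tmp/model.tar.gz", is_file=True)  # copy a local file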

Expand source code
def upload_artifact(self, artifact_name, payload, is_file=False):
    """
    Upload an artifact with a given name
    :param artifact_name: target artifact name
    :param payload: source
    :param is_file: whether payload is a path to the source file
    :return:
    """
    artifact_path = self.artifact_path(*((artifact_name,) if isinstance(artifact_name, str) else artifact_name))
    log.info('Uploading {} to {}'.format(artifact_name, artifact_path))
    if is_file:
        self.io.copy(payload, artifact_path)
    else:
        self.io.write_file(artifact_path, payload)

    return artifact_path.replace(self.io.path.join(self.job_dir, ''), '')
def upload_job_output(self, path='output.json')

Upload / update the output.json artifact containing parsed settings, etc.

Expand source code
def upload_job_output(self, path="output.json"):
    """
    Upload / update the output.json artifact containing parsed settings, etc.
    :param path:
    :return:
    """
    # Load relevant environment variables
    environment = {x: os.getenv(x) for x in ("BUILD_ID",)}

    # Add git info if available
    for name, object_type in (("git", type(self)), ("brevettiai git", Job)):
        try:
            environment[name] = GitRepositoryState.from_type(object_type)
        except Exception:
            pass

    # Add python module information
    environment["modules"] = list(sorted(f"{d.project_name}=={d.version}" for d in pkg_resources.working_set))

    payload = JobOutput(
        output=self._job_output,
        environment=environment,
        config=self
    ).json(indent=2, exclude={"job": {"api_key"}})
    if self.api_key:
        payload = payload.replace(self.api_key, "*" * len(self.api_key))
    return self.upload_artifact(path, payload)
class JobOutput (**data: Any)

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class JobOutput(BaseModel):
    output: dict
    environment: Dict[str, Any]
    job: Job = Field(alias="config")

    class Config:
        json_encoders = custom_json_encoders

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var Config
var environment : Dict[str, Any]
var job : Job
var output : dict
class JobSettings (**data: Any)

Base class for job settings

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.
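
A minimal sketch of a settings subclass; the field names are illustrative. Unknown keys are collected in extra, and string values for non-string fields are decoded as JSON by parse_settings:

from typing import List

from brevettiai.platform.models.job import JobSettings


class TrainingSettings(JobSettings):
    learning_rate: float = 1e-3  # illustrative fields
    classes: List[str] = []


s = TrainingSettings.parse_obj({"learning_rate": "0.01", "run_name": "demo"})
assert s.learning_rate == 0.01          # JSON string decoded to a number
assert s.extra == {"run_name": "demo"}  # unknown key captured in extra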

Expand source code
class JobSettings(BaseModel):
    """
    Base class for job settings
    """
    extra: Dict[str, Any] = Field(default_factory=dict, vue=vue.SchemaConfig(exclude=True))

    class Config:
        arbitrary_types_allowed = True
        json_encoders = custom_json_encoders

    @classmethod
    def platform_schema(cls):
        """Utility function to get vue schema"""
        builder = vue.from_pydantic_model(cls)
        return builder

    @root_validator(pre=True, allow_reuse=True)
    def parse_settings(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Extra validator parsing settings from the platform"""

        # Parse extra args
        all_required_field_names = {field.alias for field in cls.__fields__.values() if
                                    field.alias != 'extra'}  # to support alias

        extra: Dict[str, Any] = values.get("Extra", {})
        for field_name in list(values):
            if field_name not in all_required_field_names:
                extra[field_name] = values.pop(field_name)
        values['extra'] = extra

        # Parse string as json variables
        properties = cls.schema()["properties"]
        try:
            for k, v in values.items():
                if isinstance(v, str):
                    field_types = ([properties[k]["type"]] if "type" in properties[k]
                                   else [t["type"] for t in properties[k]["anyOf"]])
                    if "string" not in field_types:
                        values[k] = None if v == "" else json.loads(v)
        except (ValueError, KeyError):
            pass

        return values

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var Config
var extra : Dict[str, Any]

Static methods

def parse_settings(values: Dict[str, Any]) ‑> Dict[str, Any]

Extra validator parsing settings from the platform
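
A behavior sketch, assuming an illustrative Thresholds settings class: empty strings for non-string fields become None, and other strings are decoded as JSON before regular pydantic validation:

from typing import Optional

from brevettiai.platform.models.job import JobSettings


class Thresholds(JobSettings):
    threshold: Optional[float]  # illustrative field


assert Thresholds.parse_obj({"threshold": ""}).threshold is None
assert Thresholds.parse_obj({"threshold": "0.5"}).threshold == 0.5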

Expand source code
@root_validator(pre=True, allow_reuse=True)
def parse_settings(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    """Extra validator parsing settings from the platform"""

    # Parse extra args
    all_required_field_names = {field.alias for field in cls.__fields__.values() if
                                field.alias != 'extra'}  # to support alias

    extra: Dict[str, Any] = values.get("Extra", {})
    for field_name in list(values):
        if field_name not in all_required_field_names:
            extra[field_name] = values.pop(field_name)
    values['extra'] = extra

    # Parse string as json variables
    properties = cls.schema()["properties"]
    try:
        for k, v in values.items():
            if isinstance(v, str):
                field_types = ([properties[k]["type"]] if "type" in properties[k]
                               else [t["type"] for t in properties[k]["anyOf"]])
                if "string" not in field_types:
                    values[k] = None if v == "" else json.loads(v)
    except (ValueError, KeyError):
        pass

    return values
def platform_schema()

Utility function to get vue schema

Expand source code
@classmethod
def platform_schema(cls):
    """Utility function to get vue schema"""
    builder = vue.from_pydantic_model(cls)
    return builder