Module brevettiai.platform.web_api

Expand source code
import base64
import json
import os
import re
import requests
import uuid
from urllib.parse import urlparse, unquote
from pydantic import BaseModel, PrivateAttr, parse_raw_as
from cryptography.fernet import Fernet, InvalidToken
from typing import ClassVar, List, Union, Type

from brevettiai.platform.models import Job, JobSettings
from brevettiai.io.utils import IoTools, prompt_for_password
from brevettiai.platform.models import backend, PlatformBackend
from brevettiai.platform.platform_credentials import PlatformDatasetCredentials
from brevettiai.platform.models import web_api_types as api
from brevettiai.platform.models import Dataset, Tag

# Names of the environment variables `PlatformAPI` reads as fallback for the username and password
_ENV_BREVETTI_AI_USER = "BREVETTI_AI_USER"
_ENV_BREVETTI_AI_PW = "BREVETTI_AI_PW"


class WebApiConfig(BaseModel):
    """
    Configuration object for `PlatformAPI`

    Stores the login credentials as a Fernet encrypted ``username:password`` token and
    reads/writes itself as JSON in the file given by `_config_file`.
    """
    # Encrypted "username:password" token; empty until `set_credentials` has been called
    secret: bytes = b''
    _config_file: ClassVar[str] = os.path.join(os.path.expanduser("~"), ".brevetti", "webapi")
    _modified: bool = PrivateAttr(default=False)

    @staticmethod
    def _get_fernet():
        """Build the Fernet instance used to encrypt and decrypt the secret

        The key is the value of `uuid.getnode()` (6 bytes) followed by a fixed suffix, so a
        secret can only be decrypted where `uuid.getnode()` returns the same value as when
        it was encrypted.
        """
        node = uuid.getnode()
        key = base64.urlsafe_b64encode(node.to_bytes(6, 'little') +
                                       b'Q\x19$v>8Lx\xbaQ\x86T\x06$\x91\x04x\x1a\xc7\xa5/\x83~\xe6+m')
        return Fernet(key)

    def set_credentials(self, username: str, password: str):
        """Set credentials for later retrieval

        Args:
            username: platform user name
            password: platform password
        """
        self.secret = self._get_fernet().encrypt(f"{username}:{password}".encode())
        self._modified = True

    def get_credentials(self):
        """Get Username and password for platform login

        Returns:
            username, password

        Raises:
            ValueError: if the stored secret cannot be decrypted
        """
        try:
            decoded = self._get_fernet().decrypt(self.secret).decode()
        except InvalidToken as ex:
            raise ValueError("Invalid secret") from ex
        # Split on the first colon only, a password may itself contain colons
        return tuple(decoded.split(":", 1))

    @property
    def is_modified(self):
        """Is the configuration modified?"""
        return self._modified

    @staticmethod
    def load():
        """Load WebApiConfig from config_file"""
        return WebApiConfig.parse_file(WebApiConfig._config_file)

    def save(self):
        """Save WebApiConfig to config_file, creating the parent directory if needed"""
        os.makedirs(os.path.dirname(WebApiConfig._config_file), exist_ok=True)
        with open(WebApiConfig._config_file, "w") as fp:
            fp.write(self.json())


class PlatformAPI:
    """
    Client for the platform web api

    Logs in when constructed and offers helpers to list, create, update and delete platform
    objects and to initialize model and test report jobs.
    """

    def __init__(self, username=None, password=None, host=None, remember_me=False):
        """
        Log in to the platform

        Args:
            username: user name; falls back to the BREVETTI_AI_USER environment variable
            password: password; falls back to the BREVETTI_AI_PW environment variable
            host: host url or `PlatformBackend`; defaults to the module level `backend`
            remember_me: save the credentials to the config file after a successful login
        """
        self.host = host or backend
        self.session = requests.session()

        username = username or os.getenv(_ENV_BREVETTI_AI_USER)
        password = password or os.getenv(_ENV_BREVETTI_AI_PW)

        try:
            self.config = WebApiConfig.load()
        except IOError:
            # No readable config file; start from an empty configuration
            self.config = WebApiConfig()
        self.user = self.login(username, password, remember_me=remember_me)
        self._io = IoTools().factory()
        self._io.minio.credentials = PlatformDatasetCredentials(self)

    @property
    def host(self):
        """Host url; taken from the backend object when a `PlatformBackend` was given"""
        return self._host.host if isinstance(self._host, PlatformBackend) else self._host

    @host.setter
    def host(self, host):
        self._host = host

    @property
    def io(self):
        """Io tools created for this api object"""
        return self._io

    @property
    def backend(self):
        """
        Backend object of the api

        Raises:
            AttributeError: if the host was not given as a `PlatformBackend`
        """
        if isinstance(self._host, PlatformBackend):
            return self._host
        else:
            raise AttributeError("Backend unknown")

    def login(self, username, password, remember_me=False):
        """
        Log in to the platform

        When both username and password are given they are used and stored on the config;
        otherwise the credentials stored in the config are used. If the config holds no valid
        credentials the missing values are asked for interactively.

        Args:
            username: user name or None
            password: password or None
            remember_me: save the config if the credentials were modified

        Returns:
            Decoded json response of the token request

        Raises:
            PermissionError: if the token request fails
        """
        try:
            if username and password:
                self.config.set_credentials(username, password)
            else:
                username, password = self.config.get_credentials()
        except ValueError:
            if username is None:
                username = input(f"{self.host} - username: ")
            if password is None:
                password = prompt_for_password()
            self.config.set_credentials(username, password)

        res = self.session.post(self.host + "/api/account/token", data=dict(userName=username, password=password))
        if not res.ok:
            raise PermissionError(f"Could not log in: {res.reason}")

        data = res.json()
        if remember_me and self.config.is_modified:
            self.config.save()
        return data

    def _request(self, method, url, headers=None, **kwargs):
        """
        Shared implementation of the `_http_*` helpers

        Args:
            method: name of the session method to call ("get", "post" or "delete")
            url: url to request
            headers: request headers; when None and the url is on the host,
                the anti forgery headers are used
            **kwargs: passed on to the session method

        Returns:
            The response

        Raises:
            PermissionError: on a 401 response
            requests.HTTPError: on any other unsuccessful response
        """
        if headers is None and url.startswith(self.host):
            headers = self.antiforgery_headers

        r = getattr(self.session, method)(url, headers=headers, **kwargs)
        if not r.ok:
            if r.status_code == 401:
                raise PermissionError("Not authorized")
            # Attach the response so callers can inspect status code and body
            raise requests.HTTPError(r.reason, response=r)
        return r

    def _http_get(self, url, headers=None, **kwargs):
        """GET request with default headers and error handling, see `_request`"""
        return self._request("get", url, headers=headers, **kwargs)

    def _http_post(self, url, headers=None, **kwargs):
        """POST request with default headers and error handling, see `_request`"""
        return self._request("post", url, headers=headers, **kwargs)

    def _http_delete(self, url, headers=None, **kwargs):
        """DELETE request with default headers and error handling, see `_request`"""
        return self._request("delete", url, headers=headers, **kwargs)

    @property
    def antiforgery_headers(self):
        """
        Get anti forgery headers from platform

        Requests a new token from the platform on every access.

        :return: content type, authorization and X-XSRF-TOKEN headers
        """
        headers = {
            "Content-Type": "application/json",
            "Authorization": "Bearer " + self.user['token']
        }
        r = self.session.get(self.host + "/api/account/antiforgery", headers=headers)
        return {**headers, 'X-XSRF-TOKEN': r.json()['requestToken']}

    def get_dataset(self, id: str = None, write_access=False, **kwargs) -> Union[Dataset, List[Dataset]]:
        """
        Get dataset, or list of all datasets

        Args:
            id: dataset id
            write_access: resolve accessrights to the dataset
            **kwargs: Extended search criteria: use ('name', 'reference' 'locked', ...)

        Returns:
            dataset if id is given, otherwise list of datasets
        """
        url = self.host + "/api/data"
        url = url if id is None else url + "/" + id
        r = self._http_get(url)

        if id is None:
            # Search criteria are compared against the raw json entries
            return [Dataset(**x, backend=self.backend, io=self._io, resolve_access_rights=write_access)
                    for x in r.json() if all(x.get(k) == v for k, v in kwargs.items())]
        else:
            return Dataset(**r.json(), backend=self.backend, io=self._io, resolve_access_rights=write_access)

    def get_tag(self, id=None) -> Union[Tag, List[Tag]]:
        """
        Get tag or list of all tags

        Args:
            id: Tag id

        Returns:
            tag if id is given, otherwise a list of tags
        """
        url = self.host + "/api/resources/tags"
        url = url if id is None else url + "/" + id
        r = self._http_get(url)
        return parse_raw_as(Union[Tag, List[Tag]], r.content)

    def get_model(self, id=None, **kwargs) -> Union[api.Model, List[api.Model]]:
        """
        Get model or list of all models

        Args:
            id: model id
            **kwargs: Extended search criteria: use ('name', 'reference' 'locked', ...)

        Returns:
            model if id is given, otherwise a list of models
        """
        url = self.host + "/api/models"
        url = url if id is None else url + "/" + id
        r = self._http_get(url)
        if id is None:
            models = parse_raw_as(List[api.Model], r.content)
            return [x for x in models if all(getattr(x, k, None) == v for k, v in kwargs.items())]
        else:
            return parse_raw_as(api.Model, r.content)

    def get_report(self, id=None, **kwargs) -> Union[api.Report, List[api.Report]]:
        """
        Get test report, or list of all reports

        Args:
            id: report id
            **kwargs: Extended search criteria: use ('name', 'reference' 'locked', ...)

        Returns:
            report if id is given, otherwise a list of reports
        """
        url = self.host + "/api/reports"
        url = url if id is None else url + "/" + id
        r = self._http_get(url)
        if id is None:
            reports = parse_raw_as(List[api.Report], r.content)
            return [x for x in reports if all(getattr(x, k, None) == v for k, v in kwargs.items())]
        else:
            return parse_raw_as(api.Report, r.content)

    def get_artifacts(self, obj: Union[api.Model, api.Report], prefix: str = '') -> List[api.FileEntry]:
        """
        Get artifacts for model or test report

        Args:
            obj: model/test report object
            prefix: object prefix (folder)

        Returns:
            List of files

        Raises:
            NotImplementedError: if obj is neither a model nor a report
        """
        if isinstance(obj, api.Model):
            url = f"{self.host}/api/models/{obj.id}/artifacts"
        elif isinstance(obj, api.Report):
            url = f"{self.host}/api/reports/{obj.id}/artifacts"
        else:
            raise NotImplementedError("Artifacts not available for type")

        # Pass the prefix as a query parameter so characters like '&' and '#' are encoded
        r = self._http_get(url, params={"prefix": prefix})
        return parse_raw_as(List[api.FileEntry], r.content)

    def get_application(self, id=None) -> Union[api.Application, List[api.Application]]:
        """
        Get application by id

        Args:
            id: id to look for among the related ids of the applications

        Returns:
            the application if exactly one matches the id, otherwise a list of applications
        """
        projects = self.get_project()
        applications = [a for p in projects for a in p.applications]
        if id is not None:
            applications = [a for a in applications if id in a.related_ids]
            if len(applications) == 1:
                return applications[0]
        return applications

    def get_device(self, id=None) -> Union[api.Device, List[api.Device]]:
        """
        Get device or list of all devices

        Args:
            id: device id

        Returns:
            device if the platform returns a single object, otherwise a list of devices
        """
        url = self.host + "/api/devices"
        url = url if id is None else url + "/" + id
        r = self._http_get(url)
        # Accept a single object as well as a list, like the other getters
        return parse_raw_as(Union[api.Device, List[api.Device]], r.content)

    def get_project(self, id=None) -> Union[api.Project, List[api.Project]]:
        """
        Get project or list of all projects

        Args:
            id: project id

        Returns:
            project or list of projects, depending on the response
        """
        url = self.host + "/api/resources/projects"
        url = url if id is None else url + "/" + id
        r = self._http_get(url)
        return parse_raw_as(Union[api.Project, List[api.Project]], r.content)

    def get_modeltype(self, id=None, master=False) -> Union[api.ModelType, List[api.ModelType]]:
        """
        Get model type

        Args:
            id: model type id
            master: use master mode (requests the `master/` instead of the `resources/` endpoint)

        Returns:
            model type or list of model types, depending on the response
        """
        url = f"{self.host}/api/{'master/' if master else 'resources/'}modeltypes/{id if id else ''}"
        r = self._http_get(url)
        return parse_raw_as(Union[api.ModelType, List[api.ModelType]], r.content)

    def get_reporttype(self, id=None, master=False) -> Union[api.ReportType, List[api.ReportType]]:
        """
        Get report type

        Args:
            id: report type id
            master: use master mode (requests the `master/` instead of the `resources/` endpoint)

        Returns:
            report type or list of report types, depending on the response
        """
        url = f"{self.host}/api/{'master/' if master else 'resources/'}reporttypes/{id if id else ''}"
        r = self._http_get(url)
        return parse_raw_as(Union[api.ReportType, List[api.ReportType]], r.content)

    def get_available_model_types(self):
        """
        List all available model types
        """
        r = self._http_get(f"{self.host}/api/models/availabletypes")
        return parse_raw_as(List[api.ModelType], r.content)

    def create(self, obj: Union[Dataset, Tag, api.Model, api.Report], **kwargs):
        """
        Create an object on the platform

        Args:
            obj: dataset, tag, model or report to create
            **kwargs: for datasets passed on to `get_dataset`; for reports the value of
                `submitToCloud` is sent with the request

        Returns:
            The created object

        Raises:
            NotImplementedError: if the type of obj is not supported
        """
        if isinstance(obj, Dataset):
            payload = obj.dict(include={"name", "reference", "notes", "locked"}, by_alias=True)
            payload["tagIds"] = [tag.id for tag in obj.tags]
            r = self._http_post(f"{self.host}/api/data/", json=payload)
            return self.get_dataset(r.json()["datasetId"], **kwargs)
        elif isinstance(obj, Tag):
            payload = obj.dict(include={"name", "parent_id"}, by_alias=True, exclude_none=True)
            self._http_post(f"{self.host}/api/resources/tags/", json=payload)

            # TODO: return tag id on api
            # Until then the new tag is found by name among the children of its parent
            parent = self.get_tag(obj.parent_id)
            tag = next(filter(lambda x: x.name == obj.name, (parent.children if isinstance(parent, Tag) else parent)))
            return tag
        elif isinstance(obj, api.Model):
            payload = obj.dict(include={"name", "model_type_id", "application_id",
                                        "settings", "dataset_ids", "tag_ids"}, by_alias=True)
            # NOTE(review): presumably "datasets" is the alias of `dataset_ids` - verify against api.Model
            payload["datasetIds"] = payload.pop("datasets")
            r = self._http_post(f"{self.host}/api/models", json=payload)
            return api.Model.parse_raw(r.content)
        elif isinstance(obj, api.Report):
            payload = obj.dict(include={"name", "parent_id", "parent_type", "report_type_id",
                                        "model_ids", "settings", "dataset_ids", "tag_ids"}, by_alias=True)
            # NOTE(review): presumably "models" is the alias of `model_ids` - verify against api.Report
            payload["modelIds"] = payload.pop("models")
            payload["submitToCloud"] = kwargs.get("submitToCloud", False)

            r = self._http_post(f"{self.host}/api/reports", json=payload)
            report = self.get_report(r.json()["id"])
            return report
        else:
            raise NotImplementedError(f"create not implemented for type {type(obj)}")

    def update(self, obj, master=False):
        """
        Update an object on the platform

        Args:
            obj: dataset, tag, model type or report type to update
            master: must be set to update model types and report types

        Raises:
            PermissionError: if a model type or report type is updated without `master`
            NotImplementedError: if the type of obj is not supported
        """
        # One elif chain, so the final else only triggers for unsupported types
        if isinstance(obj, Dataset):
            payload = obj.dict(include={"name", "reference", "notes", "locked"})
            payload["tagIds"] = [tag.id for tag in obj.tags]
            self._http_post(f"{self.host}/api/data/{obj.id}", json=payload)
        elif isinstance(obj, Tag):
            payload = obj.dict(include={"name", "parent_id"}, by_alias=True, exclude_none=True)
            self._http_post(f"{self.host}/api/resources/tags/{obj.id}", json=payload)
        elif isinstance(obj, api.ModelType):
            if not master:
                raise PermissionError("Not authorized")
            self._http_post(f"{self.host}/api/master/modeltypes/update", json=obj.dict(by_alias=True))
        elif isinstance(obj, api.ReportType):
            if not master:
                raise PermissionError("Not authorized")
            self._http_post(f"{self.host}/api/master/reporttypes/update", json=obj.dict(by_alias=True))
        else:
            raise NotImplementedError(f"update not implemented for type {type(obj)}")

    def delete(self, obj: Union[Dataset, Tag, api.Model, api.Report, api.SftpUser]):
        """
        Delete an object on the platform

        Args:
            obj: dataset, tag, model, report or sftp user to delete

        Raises:
            NotImplementedError: if the type of obj is not supported
        """
        if isinstance(obj, Dataset):
            self._http_delete(f"{self.host}/api/data/{obj.id}")
        elif isinstance(obj, Tag):
            self._http_delete(f"{self.host}/api/resources/tags/{obj.id}")
        elif isinstance(obj, api.Model):
            self._http_delete(f"{self.host}/api/models/{obj.id}")
        elif isinstance(obj, api.Report):
            self._http_delete(f"{self.host}/api/reports/{obj.id}")
        elif isinstance(obj, api.SftpUser):
            self._http_delete(f"{self.host}/api/data/{obj.folder}/sftp/{obj.user_name}")
        else:
            raise NotImplementedError(f"delete not implemented for type {type(obj)}")

    def update_dataset_permission(self, id, user_id, group_id=None, permission_type="Editor"):
        """
        Update dataset permissions for user

        Args:
            id: dataset id
            user_id: id of the user
            group_id: id of the group
            permission_type: permission type sent to the platform

        Returns:
            The response of the request
        """
        payload = {"groupId": group_id, "userId": user_id, "resourceId": id, "objectType": 0,
                   "permissionType": permission_type}
        r = self._http_post(self.host + "/api/admin/datasets/" + id + "/permissions", json=payload)
        return r

    def get_dataset_sts_assume_role_response(self, guid):
        """
        Get the security credentials response for a dataset

        Args:
            guid: dataset id

        Returns:
            Response body as text
        """
        cred = self._http_get(f"{self.host}/api/data/{guid}/securitycredentials")
        return cred.text

    def get_schema(self, obj: Union[api.ModelType, api.ReportType]):
        """
        Get schema for a certain model type

        Args:
            obj: modeltype or report type

        Returns:
            Decoded json of the document at `obj.settings_schema_path`
        """
        # Empty headers: the schema is requested without the platform headers
        r = self._http_get(obj.settings_schema_path, headers={})
        return r.json()

    def get_userinfo(self):
        """
        Get info on user
        """
        url = self.host + "/api/manage/index"
        r = self._http_get(url)
        return api.User.parse_raw(r.content)

    def get_sftp_users(self, dataset, **kwargs) -> List[api.SftpUser]:
        """
        Get the sftp users of a dataset

        Args:
            dataset: dataset to list the users of
            **kwargs: passed on to the request

        Returns:
            List of sftp users; users without a folder get the dataset id as folder
        """
        r = self._http_get(f"{self.host}/api/data/{dataset.id}/sftp", **kwargs)
        users = parse_raw_as(List[api.SftpUser], r.content)
        for user in users:
            user.folder = user.folder or dataset.id
        return users

    def create_sftp_user(self, dataset, **kwargs) -> api.SftpUser:
        """
        Create an sftp user for a dataset

        Args:
            dataset: dataset to create the user for
            **kwargs: passed on to the request

        Returns:
            The created sftp user
        """
        r = self._http_post(f"{self.host}/api/data/{dataset.id}/sftp", **kwargs)
        return api.SftpUser.parse_raw(r.content)

    def create_model(self, name, datasets, settings: JobSettings = None,
                     model_type=None, tags=None, application: api.Application = None):
        """
        Create a model on the platform

        Args:
            name: name of the model
            datasets: datasets to use; their ids are sent to the platform
            settings: settings object with a `json()` method, or a json serializable object
            model_type: model type; defaults to the custom model type of the backend
            tags: tags to attach; their ids are sent to the platform
            application: application to create the model under

        Returns:
            Model after its creation on the platform
        """
        tags = tags or []
        settings = settings or {}
        try:
            settings = settings.json()
        except AttributeError:
            # Not a settings object; serialize it as plain json
            settings = json.dumps(settings)

        model_type = model_type or self.backend.custom_model_type

        application_id = None if application is None else application.id
        # id, api_key and created are placeholders; `create` does not send them to the platform
        model = api.Model(name=name, dataset_ids=[x.id for x in datasets], model_type_id=model_type.id,
                          model_type_status=model_type.status, settings=settings, application_id=application_id,
                          tag_ids=[x.id for x in tags], id="<unknown>", api_key="<unknown>", created="<unknown>")
        return self.create(model)

    def create_testreport(self, name, model, datasets, report_type, settings, tags=None, submitToCloud=False):
        """
        Create a test report on the platform

        Args:
            name: name of the report
            model: model to test
            datasets: datasets to use; their ids are sent to the platform
            report_type: report type
            settings: settings object with a `json()` method, or a json serializable object
            tags: tags to attach; their ids are sent to the platform
            submitToCloud: start test report in the cloud

        Returns:
            Test report after its creation on the platform
        """
        tags = tags or []
        try:
            settings = settings.json()
        except AttributeError:
            # Not a settings object; serialize it as plain json
            settings = json.dumps(settings)

        # id, api_key and created are placeholders; `create` does not send them to the platform
        report = api.Report(name=name, model_ids=[model.id], dataset_ids=[x.id for x in datasets],
                            id="<unknown>", api_key="<unknown>", created="<unknown>",
                            parent_id=model.id, parent_name=model.name, parent_type="model",
                            settings=settings, tag_ids=[x.id for x in tags], report_type_id=report_type.id)
        return self.create(report, submitToCloud=submitToCloud)

    def initialize_training(self, model: Union[str, api.Model], job_type: Type[Job] = None,
                            submitToCloud=False) -> Union[Job, None]:
        """
        Start training flow of a model

        Args:
            model: model or model id
            job_type: job type to initialize; defaults to `Job`
            submitToCloud: start model in the cloud

        Returns:
            Job if submitToCloud is false, otherwise None. None is also returned if the model
            is already completed.
        """
        payload = {"submitToCloud": "true" if submitToCloud else "false"}

        if isinstance(model, str):
            model = self.get_model(model)

        if model.completed:
            print("Model already completed")
            return None

        r = self._http_post(f"{self.host}/api/models/{model.id}/start", params=payload)
        if submitToCloud:
            return None
        # The response holds the url of the job configuration
        job_config = self.download_url(r.json()).json()
        type_selector = job_type or Job

        # NOTE(review): uses the module level `backend`, not `self.backend` - confirm this is
        # intended when the api was created with a custom host
        job = Job.init(type_selector=type_selector, job_config=job_config, backend=backend)
        return job

    def initialize_report(self, report: Union[str, api.Report], job_type: Type[Job] = None) -> Union[Job, None]:
        """
        Start flow of a test report

        Args:
            report: report or report id
            job_type: job type to initialize; defaults to `Job`

        Returns:
            Job, or None if the report is already completed
        """

        if isinstance(report, str):
            report = self.get_report(report)

        if report.completed:
            print("Report already completed")
            return None

        job_type = job_type or Job
        # A single job class is wrapped in a list; anything else is passed on unchanged
        is_job_class = isinstance(job_type, type) and issubclass(job_type, Job)
        type_selector = [job_type] if is_job_class else job_type

        # NOTE(review): uses the module level `backend`, not `self.backend` - confirm this is
        # intended when the api was created with a custom host
        job = Job.init(job_id=report.id, api_key=report.api_key,
                       type_selector=type_selector, backend=backend)
        return job

    def stop_model_training(self, model):
        """
        Stop training of model

        Args:
            model: model to stop

        Returns:
            Model parsed from the response
        """
        r = self._http_post(f"{self.host}/api/models/{model.id}/stop")
        return api.Model.parse_raw(r.content)

    def download_url(self, url, dst=None, headers=None):
        """
        Download the content of an url

        Args:
            url: url to download; urls starting with "/api/" are resolved against the host
            dst: target file, or existing directory in which case the file name is taken from
                the content-disposition header or, failing that, from the url path
            headers: request headers, see `_http_get`

        Returns:
            The response if dst is None, otherwise the path of the target file

        Raises:
            ValueError: if dst is a directory and no file name could be determined
        """
        if url.startswith("/api/"):
            url = self.host + url

        r = self._http_get(url, stream=True, headers=headers)
        if r.status_code == requests.codes.ok:
            if dst is not None:
                if os.path.isdir(dst):
                    try:
                        d = r.headers['content-disposition']
                        fname = re.findall('filename="(.+)"', d)[0]
                    except (KeyError, IndexError):
                        # No usable header; fall back to the last part of the url path
                        fname = os.path.basename(unquote(urlparse(url).path))
                    if not fname:
                        raise ValueError(f"Could not determine a file name for {url}")
                    dst = os.path.join(dst, fname)
                else:
                    if os.path.dirname(dst):
                        os.makedirs(os.path.dirname(dst), exist_ok=True)
                with open(dst, 'wb') as f:
                    for data in r.iter_content(chunk_size=65536):
                        f.write(data)
            else:
                return r
        return dst


if __name__ == "__main__":
    # Manual walk through of the api against a live platform.
    # Needs valid credentials and the external `image_segmentation` package; the hard coded
    # ids below presumably only exist on the original author's account.
    # Imports and setup
    from brevettiai.platform import BrevettiAI

    from image_segmentation.image_segmentation import ImageSegmentationJobSettings,  ImageSegmentationJob
    web: PlatformAPI = BrevettiAI()
    # Create a model on the first two datasets, run it and delete it again
    ds = web.get_dataset()[:2]
    model = web.create_model("test api model", settings=ImageSegmentationJobSettings(), datasets=ds)
    job = web.initialize_training(model, job_type=ImageSegmentationJob)
    job.start()
    web.delete(model)

    # Delete all sftp users of a specific dataset
    ds = web.get_dataset('8098021d-79ed-43b5-956a-adecadfde66b')
    users_ = web.get_sftp_users(ds)
    for user_ in users_:
        web.delete(user_)
    # Exercise the remaining getters
    web.get_userinfo()
    schema = web.get_schema(web.get_reporttype()[0])
    mt = web.get_available_model_types()
    web.get_model()
    web.get_report()
    artifacts = web.get_artifacts(web.get_model()[0])
    dev = web.get_device()
    app = web.get_application("a1c111ec-2043-41ef-b2f1-f0912b55bd3b")

    web.get_model('000863da-f567-4a74-b48b-5d63da061555').get_datasets(web)
    mm = web.get_report()

Classes

class PlatformAPI (username=None, password=None, host=None, remember_me=False)
Expand source code
class PlatformAPI:
    def __init__(self, username=None, password=None, host=None, remember_me=False):
        """
        Set up the session and log in to the platform

        Args:
            username: user name; the BREVETTI_AI_USER environment variable is used if not given
            password: password; the BREVETTI_AI_PW environment variable is used if not given
            host: host url or `PlatformBackend`; the module level `backend` is used if not given
            remember_me: save the credentials to the config file after login
        """
        self.host = host if host else backend
        self.session = requests.session()

        # Environment variables are the fallback for missing credentials
        if not username:
            username = os.getenv(_ENV_BREVETTI_AI_USER)
        if not password:
            password = os.getenv(_ENV_BREVETTI_AI_PW)

        try:
            stored_config = WebApiConfig.load()
        except IOError:
            # No readable config file; use an empty configuration
            stored_config = WebApiConfig()
        self.config = stored_config

        self.user = self.login(username, password, remember_me=remember_me)

        self._io = IoTools().factory()
        self._io.minio.credentials = PlatformDatasetCredentials(self)

    @property
    def host(self):
        """Host url; read from the backend object when a `PlatformBackend` was given"""
        if isinstance(self._host, PlatformBackend):
            return self._host.host
        return self._host

    @host.setter
    def host(self, host):
        """Store the host; either a url or a `PlatformBackend` (see the `host` getter)"""
        self._host = host

    @property
    def io(self):
        """Io tools object created in `__init__`"""
        return self._io

    @property
    def backend(self):
        """
        Backend object of the api

        Raises:
            AttributeError: if the host was not given as a `PlatformBackend`
        """
        if not isinstance(self._host, PlatformBackend):
            raise AttributeError("Backend unknown")
        return self._host

    def login(self, username, password, remember_me=False):
        """
        Log in to the platform

        Given credentials are used and stored on the config; if either is missing the
        credentials stored in the config are used, and if those are invalid the missing
        values are asked for interactively.

        Args:
            username: user name or None
            password: password or None
            remember_me: save the config if the credentials were modified

        Returns:
            Decoded json response of the token request

        Raises:
            PermissionError: if the token request fails
        """
        try:
            if not (username and password):
                username, password = self.config.get_credentials()
            else:
                self.config.set_credentials(username, password)
        except ValueError:
            # No usable stored credentials; ask for whatever is missing
            if username is None:
                username = input(f"{self.host} - username: ")
            if password is None:
                password = prompt_for_password()
            self.config.set_credentials(username, password)

        token_url = self.host + "/api/account/token"
        form = {"userName": username, "password": password}
        res = self.session.post(token_url, data=form)
        if not res.ok:
            raise PermissionError(f"Could not log in: {res.reason}")

        token_data = res.json()
        if remember_me and self.config.is_modified:
            self.config.save()
        return token_data

    def _http_get(self, url, headers=None, **kwargs):
        """
        GET request through the session

        Uses the anti forgery headers when no headers are given and the url is on the host.

        Raises:
            PermissionError: on a 401 response
            requests.HTTPError: on any other unsuccessful response
        """
        needs_default_headers = headers is None and url.startswith(self.host)
        if needs_default_headers:
            headers = self.antiforgery_headers

        response = self.session.get(url, headers=headers, **kwargs)
        if response.ok:
            return response
        if response.status_code == 401:
            raise PermissionError("Not authorized")
        raise requests.HTTPError(response.reason)

    def _http_post(self, url, headers=None, **kwargs):
        """
        POST request through the session

        Uses the anti forgery headers when no headers are given and the url is on the host.

        Raises:
            PermissionError: on a 401 response
            requests.HTTPError: on any other unsuccessful response
        """
        needs_default_headers = headers is None and url.startswith(self.host)
        if needs_default_headers:
            headers = self.antiforgery_headers

        response = self.session.post(url, headers=headers, **kwargs)
        if response.ok:
            return response
        if response.status_code == 401:
            raise PermissionError("Not authorized")
        raise requests.HTTPError(response.reason)

    def _http_delete(self, url, headers=None, **kwargs):
        """
        DELETE request through the session

        Uses the anti forgery headers when no headers are given and the url is on the host.

        Raises:
            PermissionError: on a 401 response
            requests.HTTPError: on any other unsuccessful response
        """
        needs_default_headers = headers is None and url.startswith(self.host)
        if needs_default_headers:
            headers = self.antiforgery_headers

        response = self.session.delete(url, headers=headers, **kwargs)
        if response.ok:
            return response
        if response.status_code == 401:
            raise PermissionError("Not authorized")
        raise requests.HTTPError(response.reason)

    @property
    def antiforgery_headers(self):
        """
        Get anti forgery headers from platform

        A new request token is fetched from the platform on every access.

        :return: content type, authorization and X-XSRF-TOKEN headers
        """
        base_headers = {
            "Content-Type": "application/json",
            "Authorization": "Bearer " + self.user['token']
        }
        token_response = self.session.get(self.host + "/api/account/antiforgery", headers=base_headers)

        merged = dict(base_headers)
        merged['X-XSRF-TOKEN'] = token_response.json()['requestToken']
        return merged

    def get_dataset(self, id: str = None, write_access=False, **kwargs) -> Union[Dataset, List[Dataset]]:
        """
        Fetch a single dataset, or all datasets matching the search criteria

        Args:
            id: dataset id
            write_access: resolve accessrights to the dataset
            **kwargs: Extended search criteria: use ('name', 'reference' 'locked', ...),
                compared against the raw json entries

        Returns:
            dataset if id is given, otherwise list of datasets
        """
        url = self.host + "/api/data"
        if id is not None:
            url = url + "/" + id
        response = self._http_get(url)

        if id is not None:
            return Dataset(**response.json(), backend=self.backend, io=self._io,
                           resolve_access_rights=write_access)

        datasets = []
        for entry in response.json():
            if all(entry.get(key) == value for key, value in kwargs.items()):
                datasets.append(Dataset(**entry, backend=self.backend, io=self._io,
                                        resolve_access_rights=write_access))
        return datasets

    def get_tag(self, id=None) -> Union[Tag, List[Tag]]:
        """
        Fetch a single tag or all tags

        Args:
            id: Tag id

        Returns:
            tag if id is given, otherwise a list of tags
        """
        url = self.host + "/api/resources/tags"
        if id is not None:
            url = "/".join((url, id))
        response = self._http_get(url)
        return parse_raw_as(Union[Tag, List[Tag]], response.content)

    def get_model(self, id=None, **kwargs) -> Union[api.Model, List[api.Model]]:
        """
        Fetch a single model, or all models matching the search criteria

        Args:
            id: model id
            **kwargs: Extended search criteria: use ('name', 'reference' 'locked', ...),
                compared against the attributes of the parsed models

        Returns:
            model if id is given, otherwise a list of models
        """
        url = self.host + "/api/models"
        if id is not None:
            url = url + "/" + id
        response = self._http_get(url)

        if id is not None:
            return parse_raw_as(api.Model, response.content)

        matches = []
        for candidate in parse_raw_as(List[api.Model], response.content):
            if all(getattr(candidate, key, None) == value for key, value in kwargs.items()):
                matches.append(candidate)
        return matches

    def get_report(self, id=None, **kwargs) -> Union[api.Report, List[api.Report]]:
        """
        Fetch a single test report, or all reports matching the search criteria

        Args:
            id: report id
            **kwargs: Extended search criteria: use ('name', 'reference' 'locked', ...),
                compared against the attributes of the parsed reports

        Returns:
            report if id is given, otherwise a list of reports
        """
        url = self.host + "/api/reports"
        if id is not None:
            url = url + "/" + id
        response = self._http_get(url)

        if id is not None:
            return parse_raw_as(api.Report, response.content)

        matches = []
        for candidate in parse_raw_as(List[api.Report], response.content):
            if all(getattr(candidate, key, None) == value for key, value in kwargs.items()):
                matches.append(candidate)
        return matches

    def get_artifacts(self, obj: Union[api.Model, api.Report], prefix: str = '') -> List[api.FileEntry]:
        """
        Get artifacts for model or test report

        Args:
            obj: model/test report object
            prefix: object prefix (folder)

        Returns:
            List of files

        Raises:
            NotImplementedError: if obj is neither a model nor a report
        """
        if isinstance(obj, api.Model):
            url = f"{self.host}/api/models/{obj.id}/artifacts"
        elif isinstance(obj, api.Report):
            url = f"{self.host}/api/reports/{obj.id}/artifacts"
        else:
            raise NotImplementedError("Artifacts not available for type")

        # Pass the prefix as a query parameter so characters like '&' and '#' are encoded
        r = self._http_get(url, params={"prefix": prefix})
        return parse_raw_as(List[api.FileEntry], r.content)

    def get_application(self, id=None) -> Union[api.Application, List[api.Application]]:
        """
        Look up applications across all projects

        Args:
            id: id to look for among the related ids of the applications

        Returns:
            the application if exactly one matches the id, otherwise a list of applications
        """
        applications = []
        for project in self.get_project():
            applications.extend(project.applications)

        if id is None:
            return applications

        matches = [candidate for candidate in applications if id in candidate.related_ids]
        return matches[0] if len(matches) == 1 else matches

    def get_device(self, id=None):
        """
        Get device or list of all devices

        Args:
            id: device id

        Returns:
            device if the platform returns a single object, otherwise a list of devices
        """
        url = self.host + "/api/devices"
        url = url if id is None else url + "/" + id
        r = self._http_get(url)
        # Accept a single object as well as a list, like the other getters
        return parse_raw_as(Union[api.Device, List[api.Device]], r.content)

    def get_project(self, id=None) -> Union[api.Project, List[api.Project]]:
        """
        Fetch a single project or all projects

        Args:
            id: project id

        Returns:
            project or list of projects, depending on the response
        """
        url = self.host + "/api/resources/projects"
        if id is not None:
            url = "/".join((url, id))
        response = self._http_get(url)
        return parse_raw_as(Union[api.Project, List[api.Project]], response.content)

    def get_modeltype(self, id=None, master=False) -> Union[api.ModelType, List[api.ModelType]]:
        """
        Fetch a model type, or the list of model types

        Args:
            id: model type id
            master: use master mode (requests the `master/` instead of the `resources/` endpoint)

        Returns:
            model type or list of model types, depending on the response
        """
        scope = "master/" if master else "resources/"
        suffix = id if id else ""
        response = self._http_get(f"{self.host}/api/{scope}modeltypes/{suffix}")
        return parse_raw_as(Union[api.ModelType, List[api.ModelType]], response.content)

    def get_reporttype(self, id=None, master=False) -> Union[api.ReportType, List[api.ReportType]]:
        """
        Get a report type, or the list of report types

        Args:
            id: report type id; a falsy value requests the collection endpoint
            master: use the `master/` endpoint instead of the `resources/` endpoint

        Returns:
            Report type or list of report types, depending on what the response body parses as
        """
        scope = "master/" if master else "resources/"
        suffix = id if id else ""
        response = self._http_get(f"{self.host}/api/{scope}reporttypes/{suffix}")
        return parse_raw_as(Union[api.ReportType, List[api.ReportType]], response.content)

    def get_available_model_types(self):
        """
        List all available model types

        Returns:
            List of model types parsed from the `/api/models/availabletypes` response
        """
        types_url = f"{self.host}/api/models/availabletypes"
        response = self._http_get(types_url)
        return parse_raw_as(List[api.ModelType], response.content)

    def create(self, obj: Union[Dataset, Tag, api.Model, api.Report], **kwargs):
        """
        Create an object on the platform

        Args:
            obj: dataset, tag, model or report describing what to create
            **kwargs: for datasets, forwarded to `get_dataset`;
                for reports, `submitToCloud` is sent along with the request

        Returns:
            The created object as fetched or parsed from the platform

        Raises:
            NotImplementedError: if obj is of an unsupported type
        """
        if isinstance(obj, Dataset):
            body = obj.dict(include={"name", "reference", "notes", "locked"}, by_alias=True)
            body["tagIds"] = [t.id for t in obj.tags]
            response = self._http_post(f"{self.host}/api/data/", json=body)
            return self.get_dataset(response.json()["datasetId"], **kwargs)

        if isinstance(obj, Tag):
            body = obj.dict(include={"name", "parent_id"}, by_alias=True, exclude_none=True)
            self._http_post(f"{self.host}/api/resources/tags/", json=body)

            # TODO: return tag id on api
            # Until then the new tag is looked up by name among its siblings
            parent = self.get_tag(obj.parent_id)
            siblings = parent.children if isinstance(parent, Tag) else parent
            return next(filter(lambda x: x.name == obj.name, siblings))

        if isinstance(obj, api.Model):
            body = obj.dict(include={"name", "model_type_id", "application_id",
                                     "settings", "dataset_ids", "tag_ids"}, by_alias=True)
            body["datasetIds"] = body.pop("datasets")
            response = self._http_post(f"{self.host}/api/models", json=body)
            return api.Model.parse_raw(response.content)

        if isinstance(obj, api.Report):
            body = obj.dict(include={"name", "parent_id", "parent_type", "report_type_id",
                                     "model_ids", "settings", "dataset_ids", "tag_ids"}, by_alias=True)
            body["modelIds"] = body.pop("models")
            body["submitToCloud"] = kwargs.get("submitToCloud", False)

            response = self._http_post(f"{self.host}/api/reports", json=body)
            return self.get_report(response.json()["id"])

        raise NotImplementedError(f"create not implemented for type {type(obj)}")

    def update(self, obj, master=False):
        """
        Push the local state of an object to the platform

        Args:
            obj: dataset, tag, model type or report type to update
            master: must be True to update model types and report types

        Raises:
            PermissionError: if a model type or report type is updated without master mode
            NotImplementedError: if obj is of an unsupported type
        """
        # The branches are mutually exclusive; with independent `if`s the final
        # `else` would also fire after a successful dataset/tag/model type update
        if isinstance(obj, Dataset):
            payload = obj.dict(include={"name", "reference", "notes", "locked"})
            payload["tagIds"] = [tag.id for tag in obj.tags]
            self._http_post(f"{self.host}/api/data/{obj.id}", json=payload)
        elif isinstance(obj, Tag):
            payload = obj.dict(include={"name", "parent_id"}, by_alias=True, exclude_none=True)
            self._http_post(f"{self.host}/api/resources/tags/{obj.id}", json=payload)
        elif isinstance(obj, api.ModelType):
            if not master:
                raise PermissionError("Not authorized")
            self._http_post(f"{self.host}/api/master/modeltypes/update", json=obj.dict(by_alias=True))
        elif isinstance(obj, api.ReportType):
            if not master:
                raise PermissionError("Not authorized")
            self._http_post(f"{self.host}/api/master/reporttypes/update", json=obj.dict(by_alias=True))
        else:
            raise NotImplementedError(f"update not implemented for type {type(obj)}")

    def delete(self, obj: Union[Dataset, Tag, api.Model, api.Report, api.SftpUser]):
        """
        Delete an object from the platform

        Args:
            obj: dataset, tag, model, report or sftp user to delete

        Raises:
            NotImplementedError: if obj is of an unsupported type
        """
        # Resolve the resource url first, then issue a single DELETE request
        if isinstance(obj, Dataset):
            target = f"{self.host}/api/data/{obj.id}"
        elif isinstance(obj, Tag):
            target = f"{self.host}/api/resources/tags/{obj.id}"
        elif isinstance(obj, api.Model):
            target = f"{self.host}/api/models/{obj.id}"
        elif isinstance(obj, api.Report):
            target = f"{self.host}/api/reports/{obj.id}"
        elif isinstance(obj, api.SftpUser):
            target = f"{self.host}/api/data/{obj.folder}/sftp/{obj.user_name}"
        else:
            raise NotImplementedError(f"delete not implemented for type {type(obj)}")

        self._http_delete(target)

    def update_dataset_permission(self, id, user_id, group_id=None, permission_type="Editor"):
        """
        Post a permission entry for a dataset

        Args:
            id: dataset id, used in the url and sent as `resourceId`
            user_id: sent as `userId`
            group_id: sent as `groupId`
            permission_type: sent as `permissionType`

        Returns:
            The response of the POST request
        """
        endpoint = self.host + "/api/admin/datasets/" + id + "/permissions"
        body = {
            "groupId": group_id,
            "userId": user_id,
            "resourceId": id,
            "objectType": 0,
            "permissionType": permission_type,
        }
        return self._http_post(endpoint, json=body)

    def get_dataset_sts_assume_role_response(self, guid):
        """
        Fetch the security credentials response for a dataset

        Args:
            guid: dataset id

        Returns:
            Response body as text
        """
        credentials_url = f"{self.host}/api/data/{guid}/securitycredentials"
        return self._http_get(credentials_url).text

    def get_schema(self, obj: Union[api.ModelType, api.ReportType]):
        """
        Get the settings schema of a model type or report type

        Args:
            obj: model type or report type; its `settings_schema_path` is requested

        Returns:
            Decoded JSON body of the response
        """
        # The request is made with an explicitly empty header dict
        response = self._http_get(obj.settings_schema_path, headers={})
        schema = response.json()
        return schema

    def get_userinfo(self):
        """
        Get info on user

        Returns:
            User parsed from the `/api/manage/index` response
        """
        response = self._http_get(self.host + "/api/manage/index")
        return api.User.parse_raw(response.content)

    def get_sftp_users(self, dataset, **kwargs) -> List[api.SftpUser]:
        """
        List the sftp users of a dataset

        Args:
            dataset: dataset whose sftp users are requested
            **kwargs: forwarded to the GET request

        Returns:
            List of sftp users; users without a folder get the dataset id as folder
        """
        response = self._http_get(f"{self.host}/api/data/{dataset.id}/sftp", **kwargs)
        sftp_users = parse_raw_as(List[api.SftpUser], response.content)
        for sftp_user in sftp_users:
            # Fall back to the dataset id when the platform reports no folder
            sftp_user.folder = sftp_user.folder or dataset.id
        return sftp_users

    def create_sftp_user(self, dataset, **kwargs) -> api.SftpUser:
        """
        Create an sftp user for a dataset

        Args:
            dataset: dataset to create the sftp user for
            **kwargs: forwarded to the POST request

        Returns:
            Sftp user parsed from the response body
        """
        sftp_url = f"{self.host}/api/data/{dataset.id}/sftp"
        response = self._http_post(sftp_url, **kwargs)
        return api.SftpUser.parse_raw(response.content)

    def create_model(self, name, datasets, settings: JobSettings = None,
                     model_type=None, tags=None, application: api.Application = None):
        """
        Create a model on the platform

        Args:
            name: name of the model
            datasets: datasets to attach; their ids are sent
            settings: object with a `json()` method, or a JSON serializable value; defaults to `{}`
            model_type: model type to use; defaults to `self.backend.custom_model_type`
            tags: tags to attach; their ids are sent
            application: application to attach the model to

        Returns:
            Model after its creation on the platform
        """
        tags = tags or []
        settings = settings or {}
        # Settings are sent as a JSON string; objects without `json()` are dumped directly
        try:
            settings = settings.json()
        except AttributeError:
            settings = json.dumps(settings)

        model_type = model_type or self.backend.custom_model_type

        application_id = None if application is None else application.id
        # id, api_key and created are placeholders for fields not known before creation
        model = api.Model(name=name, dataset_ids=[x.id for x in datasets], model_type_id=model_type.id,
                          model_type_status=model_type.status, settings=settings, application_id=application_id,
                          tag_ids=[x.id for x in tags], id="<unknown>", api_key="<unknown>", created="<unknown>")
        return self.create(model)

    def create_testreport(self, name, model, datasets, report_type, settings, tags=None, submitToCloud=False):
        """
        Create a test report on the platform

        Args:
            name: name of the report
            model: model to test; used as parent of the report
            datasets: datasets to test on; their ids are sent
            report_type: report type; its id is sent
            settings: object with a `json()` method, or a JSON serializable value
            tags: tags to attach; their ids are sent
            submitToCloud: start test report in the cloud

        Returns:
            Test report after its creation on the platform
        """
        attached_tags = tags or []
        # Settings are sent as a JSON string; objects without `json()` are dumped directly
        try:
            serialized_settings = settings.json()
        except AttributeError:
            serialized_settings = json.dumps(settings)

        # id, api_key and created are placeholders for fields not known before creation
        draft = api.Report(name=name, model_ids=[model.id], dataset_ids=[d.id for d in datasets],
                           id="<unknown>", api_key="<unknown>", created="<unknown>",
                           parent_id=model.id, parent_name=model.name, parent_type="model",
                           settings=serialized_settings, tag_ids=[t.id for t in attached_tags],
                           report_type_id=report_type.id)
        return self.create(draft, submitToCloud=submitToCloud)

    def initialize_training(self, model: Union[str, api.Model], job_type: Type[Job] = None, submitToCloud=False) -> Union[Job, None]:
        """
        Start training flow of a model

        Args:
            model: model or model id
            job_type: type selector passed on to `Job.init`; defaults to `Job`
            submitToCloud: start model in the cloud

        Returns:
            Job if submitToCloud is false and the model is not completed, otherwise None
        """
        if isinstance(model, str):
            model = self.get_model(model)

        if model.completed:
            print("Model already completed")
            return None

        query = {"submitToCloud": "true" if submitToCloud else "false"}
        response = self._http_post(f"{self.host}/api/models/{model.id}/start", params=query)
        if submitToCloud:
            return None

        # The start response holds the url of the job configuration
        job_config = self.download_url(response.json()).json()
        # NOTE(review): `backend` is the module level import, not `self.backend` - confirm this is intended
        return Job.init(type_selector=job_type or Job, job_config=job_config, backend=backend)

    def initialize_report(self, report: Union[str, api.Report], job_type: Type[Job] = None) -> Union[Job, None]:
        """
        Start the job flow of a test report

        Args:
            report: report or report id
            job_type: Job subclass, or another type selector passed on to `Job.init` unchanged;
                defaults to `Job`

        Returns:
            Job, or None if the report is already completed
        """

        if isinstance(report, str):
            # Resolve the id to a report object
            report = self.get_report(report)

        if report.completed:
            print("Report already completed")
            return None

        job_type = Job if job_type is None else job_type
        # issubclass raises TypeError for non-classes, so check that first;
        # Job classes are wrapped in a list, anything else is passed through
        is_job_class = isinstance(job_type, type) and issubclass(job_type, Job)
        type_selector = [job_type] if is_job_class else job_type

        job = Job.init(job_id=report.id, api_key=report.api_key,
                       type_selector=type_selector, backend=backend)
        return job

    def stop_model_training(self, model):
        """
        Stop training of model

        Args:
            model: model to stop; its id is used in the request url

        Returns:
            Model parsed from the response body
        """
        stop_url = f"{self.host}/api/models/{model.id}/stop"
        response = self._http_post(stop_url)
        return api.Model.parse_raw(response.content)

    def download_url(self, url, dst=None, headers=None):
        """
        Download the content of a url, optionally to a file

        Args:
            url: url to download; urls starting with "/api/" are prefixed with the platform host
            dst: destination file path, or an existing directory in which case the file name is
                taken from the content-disposition header, falling back to the url path
            headers: headers forwarded to the GET request

        Returns:
            The response if dst is None and the status is OK, otherwise dst
            (the path written to when a file was saved)
        """
        if url.startswith("/api/"):
            url = self.host + url

        r = self._http_get(url, stream=True, headers=headers)
        if r.status_code != requests.codes.ok:
            return dst
        if dst is None:
            return r

        if os.path.isdir(dst):
            try:
                d = r.headers['content-disposition']
                fname = re.findall('filename="(.+)"', d)[0]
            except (KeyError, IndexError):
                # No usable header: use the last segment of the url path instead
                fname = os.path.basename(unquote(urlparse(url).path))
            # Compare by value; identity checks against a literal are implementation dependent
            assert fname != "", "Could not determine a file name for the download"
            dst = os.path.join(dst, fname)
        elif os.path.dirname(dst):
            os.makedirs(os.path.dirname(dst), exist_ok=True)

        with open(dst, 'wb') as f:
            for data in r:
                f.write(data)
        return dst

Instance variables

var antiforgery_headers

Get anti forgery headers from platform

:return:

Expand source code
@property
def antiforgery_headers(self):
    """
    Get anti forgery headers from platform

    :return: content type and bearer authorization headers extended with
        the `X-XSRF-TOKEN` returned by the antiforgery endpoint
    """
    base_headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer " + self.user['token']
    }
    response = self.session.get(self.host + "/api/account/antiforgery", headers=base_headers)
    # Return a new dict so the headers used for the request stay untouched
    combined = dict(base_headers)
    combined['X-XSRF-TOKEN'] = response.json()['requestToken']
    return combined
var backend
Expand source code
@property
def backend(self):
    """
    Platform backend this api was set up with

    Raises:
        AttributeError: if the api was set up with something other than a `PlatformBackend`
    """
    if not isinstance(self._host, PlatformBackend):
        raise AttributeError("Backend unknown")
    return self._host
var host
Expand source code
@property
def host(self):
    """
    Host address: the `host` of the backend if one was given, otherwise the stored value itself
    """
    if isinstance(self._host, PlatformBackend):
        return self._host.host
    return self._host
var io
Expand source code
@property
def io(self):
    """Read-only access to the object stored in the private `_io` attribute"""
    return self._io

Methods

def create(self, obj: Union[Dataset, Tag, Model, Report], **kwargs)
Expand source code
def create(self, obj: Union[Dataset, Tag, api.Model, api.Report], **kwargs):
    """
    Create an object on the platform

    Args:
        obj: dataset, tag, model or report describing what to create
        **kwargs: for datasets, forwarded to `get_dataset`;
            for reports, `submitToCloud` is sent along with the request

    Returns:
        The created object as fetched or parsed from the platform

    Raises:
        NotImplementedError: if obj is of an unsupported type
    """
    if isinstance(obj, Dataset):
        body = obj.dict(include={"name", "reference", "notes", "locked"}, by_alias=True)
        body["tagIds"] = [t.id for t in obj.tags]
        response = self._http_post(f"{self.host}/api/data/", json=body)
        return self.get_dataset(response.json()["datasetId"], **kwargs)

    if isinstance(obj, Tag):
        body = obj.dict(include={"name", "parent_id"}, by_alias=True, exclude_none=True)
        self._http_post(f"{self.host}/api/resources/tags/", json=body)

        # TODO: return tag id on api
        # Until then the new tag is looked up by name among its siblings
        parent = self.get_tag(obj.parent_id)
        siblings = parent.children if isinstance(parent, Tag) else parent
        return next(filter(lambda x: x.name == obj.name, siblings))

    if isinstance(obj, api.Model):
        body = obj.dict(include={"name", "model_type_id", "application_id",
                                 "settings", "dataset_ids", "tag_ids"}, by_alias=True)
        body["datasetIds"] = body.pop("datasets")
        response = self._http_post(f"{self.host}/api/models", json=body)
        return api.Model.parse_raw(response.content)

    if isinstance(obj, api.Report):
        body = obj.dict(include={"name", "parent_id", "parent_type", "report_type_id",
                                 "model_ids", "settings", "dataset_ids", "tag_ids"}, by_alias=True)
        body["modelIds"] = body.pop("models")
        body["submitToCloud"] = kwargs.get("submitToCloud", False)

        response = self._http_post(f"{self.host}/api/reports", json=body)
        return self.get_report(response.json()["id"])

    raise NotImplementedError(f"create not implemented for type {type(obj)}")
def create_model(self, name, datasets, settings: JobSettings = None, model_type=None, tags=None, application: Application = None)

Create a model on the platform

Args

name: datasets: settings: model_type: tags: application:

Returns

Model after its creation on the platform

Expand source code
def create_model(self, name, datasets, settings: JobSettings = None,
                 model_type=None, tags=None, application: api.Application = None):
    """
    Create a model on the platform

    Args:
        name: name of the model
        datasets: datasets to attach; their ids are sent
        settings: object with a `json()` method, or a JSON serializable value; defaults to `{}`
        model_type: model type to use; defaults to `self.backend.custom_model_type`
        tags: tags to attach; their ids are sent
        application: application to attach the model to

    Returns:
        Model after its creation on the platform
    """
    tags = tags or []
    settings = settings or {}
    # Settings are sent as a JSON string; objects without `json()` are dumped directly
    try:
        settings = settings.json()
    except AttributeError:
        settings = json.dumps(settings)

    model_type = model_type or self.backend.custom_model_type

    application_id = None if application is None else application.id
    # id, api_key and created are placeholders for fields not known before creation
    model = api.Model(name=name, dataset_ids=[x.id for x in datasets], model_type_id=model_type.id,
                      model_type_status=model_type.status, settings=settings, application_id=application_id,
                      tag_ids=[x.id for x in tags], id="<unknown>", api_key="<unknown>", created="<unknown>")
    return self.create(model)
def create_sftp_user(self, dataset, **kwargs) ‑> SftpUser
Expand source code
def create_sftp_user(self, dataset, **kwargs) -> api.SftpUser:
    """
    Create an sftp user for a dataset

    Args:
        dataset: dataset to create the sftp user for
        **kwargs: forwarded to the POST request

    Returns:
        Sftp user parsed from the response body
    """
    sftp_url = f"{self.host}/api/data/{dataset.id}/sftp"
    response = self._http_post(sftp_url, **kwargs)
    return api.SftpUser.parse_raw(response.content)
def create_testreport(self, name, model, datasets, report_type, settings, tags=None, submitToCloud=False)

Create a test report on the platform

Args

name:
model:
datasets:
report_type:
settings:
tags:
submitToCloud
start test report in the cloud

Returns

Test report after its creation on the platform

Expand source code
def create_testreport(self, name, model, datasets, report_type, settings, tags=None, submitToCloud=False):
    """
    Create a test report on the platform

    Args:
        name: name of the report
        model: model to test; used as parent of the report
        datasets: datasets to test on; their ids are sent
        report_type: report type; its id is sent
        settings: object with a `json()` method, or a JSON serializable value
        tags: tags to attach; their ids are sent
        submitToCloud: start test report in the cloud

    Returns:
        Test report after its creation on the platform
    """
    attached_tags = tags or []
    # Settings are sent as a JSON string; objects without `json()` are dumped directly
    try:
        serialized_settings = settings.json()
    except AttributeError:
        serialized_settings = json.dumps(settings)

    # id, api_key and created are placeholders for fields not known before creation
    draft = api.Report(name=name, model_ids=[model.id], dataset_ids=[d.id for d in datasets],
                       id="<unknown>", api_key="<unknown>", created="<unknown>",
                       parent_id=model.id, parent_name=model.name, parent_type="model",
                       settings=serialized_settings, tag_ids=[t.id for t in attached_tags],
                       report_type_id=report_type.id)
    return self.create(draft, submitToCloud=submitToCloud)
def delete(self, obj: Union[Dataset, Tag, Model, Report, SftpUser])
Expand source code
def delete(self, obj: Union[Dataset, Tag, api.Model, api.Report, api.SftpUser]):
    """
    Delete an object from the platform

    Args:
        obj: dataset, tag, model, report or sftp user to delete

    Raises:
        NotImplementedError: if obj is of an unsupported type
    """
    # Resolve the resource url first, then issue a single DELETE request
    if isinstance(obj, Dataset):
        target = f"{self.host}/api/data/{obj.id}"
    elif isinstance(obj, Tag):
        target = f"{self.host}/api/resources/tags/{obj.id}"
    elif isinstance(obj, api.Model):
        target = f"{self.host}/api/models/{obj.id}"
    elif isinstance(obj, api.Report):
        target = f"{self.host}/api/reports/{obj.id}"
    elif isinstance(obj, api.SftpUser):
        target = f"{self.host}/api/data/{obj.folder}/sftp/{obj.user_name}"
    else:
        raise NotImplementedError(f"delete not implemented for type {type(obj)}")

    self._http_delete(target)
def download_url(self, url, dst=None, headers=None)
Expand source code
def download_url(self, url, dst=None, headers=None):
    """
    Download the content of a url, optionally to a file

    Args:
        url: url to download; urls starting with "/api/" are prefixed with the platform host
        dst: destination file path, or an existing directory in which case the file name is
            taken from the content-disposition header, falling back to the url path
        headers: headers forwarded to the GET request

    Returns:
        The response if dst is None and the status is OK, otherwise dst
        (the path written to when a file was saved)
    """
    if url.startswith("/api/"):
        url = self.host + url

    r = self._http_get(url, stream=True, headers=headers)
    if r.status_code != requests.codes.ok:
        return dst
    if dst is None:
        return r

    if os.path.isdir(dst):
        try:
            d = r.headers['content-disposition']
            fname = re.findall('filename="(.+)"', d)[0]
        except (KeyError, IndexError):
            # No usable header: use the last segment of the url path instead
            fname = os.path.basename(unquote(urlparse(url).path))
        # Compare by value; identity checks against a literal are implementation dependent
        assert fname != "", "Could not determine a file name for the download"
        dst = os.path.join(dst, fname)
    elif os.path.dirname(dst):
        os.makedirs(os.path.dirname(dst), exist_ok=True)

    with open(dst, 'wb') as f:
        for data in r:
            f.write(data)
    return dst
def get_application(self, id=None) ‑> Union[Application, List[Application]]

Get application by id

Args

id:

Returns

application if id is given, otherwise a list of applications

Expand source code
def get_application(self, id=None) -> Union[api.Application, List[api.Application]]:
    """
    Get applications across all projects, optionally filtered by a related id

    Args:
        id: id that must be contained in the `related_ids` of an application

    Returns:
        All applications if id is None. Otherwise the single matching application,
        or the list of matches when there is not exactly one (possibly empty).
    """
    every_application = []
    for project in self.get_project():
        every_application.extend(project.applications)

    if id is None:
        return every_application

    matches = [app for app in every_application if id in app.related_ids]
    # A unique match is unwrapped; zero or several matches are returned as a list
    return matches[0] if len(matches) == 1 else matches
def get_artifacts(self, obj: Union[Model, Report], prefix: str = '') ‑> List[FileEntry]

Get artifacts for model or test report

Args

obj
model/test report object
prefix
object prefix (folder)

Returns

List of files

Expand source code
def get_artifacts(self, obj: Union[api.Model, api.Report], prefix: str = '') -> List[api.FileEntry]:
    """
    List the artifact files stored for a model or a test report

    Args:
        obj: model or test report whose artifacts are requested
        prefix: object prefix (folder), inserted verbatim as the `prefix` query parameter

    Returns:
        List of file entries parsed from the response body

    Raises:
        NotImplementedError: if obj is neither a model nor a report
    """
    # Pick the endpoint from the object type; models are checked first
    if isinstance(obj, api.Model):
        artifacts_url = f"{self.host}/api/models/{obj.id}/artifacts?prefix={prefix}"
    elif isinstance(obj, api.Report):
        artifacts_url = f"{self.host}/api/reports/{obj.id}/artifacts?prefix={prefix}"
    else:
        raise NotImplementedError("Artifacts not available for type")

    response = self._http_get(artifacts_url)
    return parse_raw_as(List[api.FileEntry], response.content)
def get_available_model_types(self)

List all available model types

Expand source code
def get_available_model_types(self):
    """
    List all available model types

    Returns:
        List of model types parsed from the `/api/models/availabletypes` response
    """
    types_url = f"{self.host}/api/models/availabletypes"
    response = self._http_get(types_url)
    return parse_raw_as(List[api.ModelType], response.content)
def get_dataset(self, id: str = None, write_access=False, **kwargs) ‑> Union[Dataset, List[Dataset]]

Get dataset, or list of all datasets

Args

id
dataset id
write_access
resolve accessrights to the dataset
**kwargs
Extended search criteria: use ('name', 'reference' 'locked', …)

Returns

dataset if id is given, otherwise list of datasets

Expand source code
def get_dataset(self, id: str = None, write_access=False, **kwargs) -> Union[Dataset, List[Dataset]]:
    """
    Get dataset, or list of all datasets

    Args:
        id: dataset id
        write_access: passed to each dataset as `resolve_access_rights`
        **kwargs: Extended search criteria, only applied when listing: a dataset is kept
            when every given key equals the value in its raw JSON ('name', 'reference', 'locked', ...)

    Returns:
        dataset if id is given, otherwise list of datasets
    """
    endpoint = self.host + "/api/data"
    if id is not None:
        endpoint = endpoint + "/" + id
    response = self._http_get(endpoint)

    def build(fields):
        # Every dataset is bound to this api's backend and io
        return Dataset(**fields, backend=self.backend, io=self._io, resolve_access_rights=write_access)

    if id is not None:
        return build(response.json())

    datasets = []
    for entry in response.json():
        if all(entry.get(key) == value for key, value in kwargs.items()):
            datasets.append(build(entry))
    return datasets
def get_dataset_sts_assume_role_response(self, guid)
Expand source code
def get_dataset_sts_assume_role_response(self, guid):
    """
    Fetch the security credentials response for a dataset

    Args:
        guid: dataset id

    Returns:
        Response body as text
    """
    credentials_url = f"{self.host}/api/data/{guid}/securitycredentials"
    return self._http_get(credentials_url).text
def get_device(self, id=None)
Expand source code
def get_device(self, id=None):
    """
    Get devices from the platform

    Args:
        id: device id; when given it is appended to the devices endpoint

    Returns:
        List of devices parsed from the response body
    """
    endpoint = self.host + "/api/devices"
    if id is not None:
        endpoint = endpoint + "/" + id
    response = self._http_get(endpoint)
    # NOTE(review): the body is parsed as a list even when an id is given -
    # confirm the endpoint returns a list in that case
    return parse_raw_as(List[api.Device], response.content)
def get_model(self, id=None, **kwargs) ‑> Union[Model, List[Model]]

Get model or list of all models

Args

id
model id
**kwargs
Extended search criteria: use ('name', 'reference' 'locked', …)

Returns

model if id is given, otherwise a list of models

Expand source code
def get_model(self, id=None, **kwargs) -> Union[api.Model, List[api.Model]]:
    """
    Get model or list of all models

    Args:
        id: model id
        **kwargs: Extended search criteria, only applied when listing: a model is kept
            when every given key equals its attribute of that name (missing attributes count as None)

    Returns:
        model if id is given, otherwise a list of models
    """
    endpoint = self.host + "/api/models"
    if id is not None:
        endpoint = endpoint + "/" + id
    response = self._http_get(endpoint)

    if id is not None:
        return parse_raw_as(api.Model, response.content)

    def matches(model):
        return all(getattr(model, key, None) == value for key, value in kwargs.items())

    return list(filter(matches, parse_raw_as(List[api.Model], response.content)))
def get_modeltype(self, id=None, master=False) ‑> Union[ModelType, List[ModelType]]

Get model type

Args

id:
master
use master mode

Returns:

Expand source code
def get_modeltype(self, id=None, master=False) -> Union[api.ModelType, List[api.ModelType]]:
    """
    Get a model type, or the list of model types

    Args:
        id: model type id; a falsy value requests the collection endpoint
        master: use the `master/` endpoint instead of the `resources/` endpoint

    Returns:
        Model type or list of model types, depending on what the response body parses as
    """
    scope = "master/" if master else "resources/"
    suffix = id if id else ""
    response = self._http_get(f"{self.host}/api/{scope}modeltypes/{suffix}")
    return parse_raw_as(Union[api.ModelType, List[api.ModelType]], response.content)
def get_project(self, id=None) ‑> Union[Project, List[Project]]
Expand source code
def get_project(self, id=None) -> Union[api.Project, List[api.Project]]:
    """
    Get a project, or the list of projects

    Args:
        id: project id; when given it is appended to the projects endpoint

    Returns:
        Project or list of projects, depending on what the response body parses as
    """
    endpoint = self.host + "/api/resources/projects"
    if id is not None:
        endpoint = endpoint + "/" + id
    response = self._http_get(endpoint)
    return parse_raw_as(Union[api.Project, List[api.Project]], response.content)
def get_report(self, id=None, **kwargs) ‑> Union[Report, List[Report]]

Get test report, or list of all reports

Args

id
report id
**kwargs
Extended search criteria: use ('name', 'reference' 'locked', …)

Returns

report if id is given, otherwise a list of reports

Expand source code
def get_report(self, id=None, **kwargs) -> Union[api.Report, List[api.Report]]:
    """
    Get test report, or list of all reports

    Args:
        id: report id
        **kwargs: Extended search criteria, only applied when listing: a report is kept
            when every given key equals its attribute of that name (missing attributes count as None)

    Returns:
        report if id is given, otherwise a list of reports
    """
    endpoint = self.host + "/api/reports"
    if id is not None:
        endpoint = endpoint + "/" + id
    response = self._http_get(endpoint)

    if id is not None:
        return parse_raw_as(api.Report, response.content)

    def matches(report):
        return all(getattr(report, key, None) == value for key, value in kwargs.items())

    return list(filter(matches, parse_raw_as(List[api.Report], response.content)))
def get_reporttype(self, id=None, master=False) ‑> Union[ReportType, List[ReportType]]

Get report type

Args

id:
master
use master mode

Returns:

Expand source code
def get_reporttype(self, id=None, master=False) -> Union[api.ReportType, List[api.ReportType]]:
    """
    Get a report type, or the list of report types

    Args:
        id: report type id; a falsy value requests the collection endpoint
        master: use the `master/` endpoint instead of the `resources/` endpoint

    Returns:
        Report type or list of report types, depending on what the response body parses as
    """
    scope = "master/" if master else "resources/"
    suffix = id if id else ""
    response = self._http_get(f"{self.host}/api/{scope}reporttypes/{suffix}")
    return parse_raw_as(Union[api.ReportType, List[api.ReportType]], response.content)
def get_schema(self, obj: Union[ModelType, ReportType])

Get schema for a certain model type

Args

obj
modeltype or report type

Returns:

Expand source code
def get_schema(self, obj: Union[api.ModelType, api.ReportType]):
    """
    Get the settings schema of a model type or report type

    Args:
        obj: model type or report type; its `settings_schema_path` is requested

    Returns:
        Decoded JSON body of the response
    """
    # The request is made with an explicitly empty header dict
    response = self._http_get(obj.settings_schema_path, headers={})
    schema = response.json()
    return schema
def get_sftp_users(self, dataset, **kwargs) ‑> List[SftpUser]
Expand source code
def get_sftp_users(self, dataset, **kwargs) -> List[api.SftpUser]:
    """
    List the sftp users of a dataset

    Args:
        dataset: dataset whose sftp users are requested
        **kwargs: forwarded to the GET request

    Returns:
        List of sftp users; users without a folder get the dataset id as folder
    """
    response = self._http_get(f"{self.host}/api/data/{dataset.id}/sftp", **kwargs)
    sftp_users = parse_raw_as(List[api.SftpUser], response.content)
    for sftp_user in sftp_users:
        # Fall back to the dataset id when the platform reports no folder
        sftp_user.folder = sftp_user.folder or dataset.id
    return sftp_users
def get_tag(self, id=None) ‑> Union[Tag, List[Tag]]

Get tag or list of all tags

Args

id
Tag id

Returns

tag if id is given, otherwise a list of tags

Expand source code
def get_tag(self, id=None) -> Union[Tag, List[Tag]]:
    """
    Get tag or list of all tags

    Args:
        id: Tag id; when given it is appended to the tags endpoint

    Returns:
        tag if id is given, otherwise a list of tags
    """
    endpoint = self.host + "/api/resources/tags"
    if id is not None:
        endpoint = endpoint + "/" + id
    response = self._http_get(endpoint)
    return parse_raw_as(Union[Tag, List[Tag]], response.content)
def get_userinfo(self)

Get info on user

Expand source code
def get_userinfo(self):
    """
    Get info on user

    Returns:
        User parsed from the `/api/manage/index` response
    """
    response = self._http_get(self.host + "/api/manage/index")
    return api.User.parse_raw(response.content)
def initialize_report(self, report: Union[str, Report], job_type: Type[Job] = None) ‑> Optional[Job]

Start the job flow of a test report

Args

report
report or report id

job_type:

Returns

Job

Expand source code
def initialize_report(self, report: Union[str, api.Report], job_type: Type[Job] = None) -> Union[Job, None]:
    """
    Start the job flow of a test report

    Args:
        report: report or report id
        job_type: Job subclass, or another type selector passed on to `Job.init` unchanged;
            defaults to `Job`

    Returns:
        Job, or None if the report is already completed
    """

    if isinstance(report, str):
        # Resolve the id to a report object
        report = self.get_report(report)

    if report.completed:
        print("Report already completed")
        return None

    job_type = Job if job_type is None else job_type
    # issubclass raises TypeError for non-classes, so check that first;
    # Job classes are wrapped in a list, anything else is passed through
    is_job_class = isinstance(job_type, type) and issubclass(job_type, Job)
    type_selector = [job_type] if is_job_class else job_type

    job = Job.init(job_id=report.id, api_key=report.api_key,
                   type_selector=type_selector, backend=backend)
    return job
def initialize_training(self, model: Union[str, Model], job_type: Type[Job] = None, submitToCloud=False) ‑> Optional[Job]

Start training flow of a model

Args

model
model or model id
job_type:
submitToCloud
start model in the cloud

Returns

Job if submitToCloud is false, otherwise None

Expand source code
def initialize_training(self, model: Union[str, api.Model], job_type: Type[Job] = None, submitToCloud=False) -> Union[Job, None]:
    """
    Start training flow of a model

    Args:
        model: model or model id
        job_type: type selector passed on to `Job.init`; defaults to `Job`
        submitToCloud: start model in the cloud

    Returns:
        Job if submitToCloud is false and the model is not completed, otherwise None
    """
    if isinstance(model, str):
        model = self.get_model(model)

    if model.completed:
        print("Model already completed")
        return None

    query = {"submitToCloud": "true" if submitToCloud else "false"}
    response = self._http_post(f"{self.host}/api/models/{model.id}/start", params=query)
    if submitToCloud:
        return None

    # The start response holds the url of the job configuration
    job_config = self.download_url(response.json()).json()
    # NOTE(review): `backend` is the module level import, not `self.backend` - confirm this is intended
    return Job.init(type_selector=job_type or Job, job_config=job_config, backend=backend)
def login(self, username, password, remember_me=False)
Expand source code
def login(self, username, password, remember_me=False):
    """
    Log in to the platform and return the token response

    Args:
        username: user name; if it or the password is falsy, stored credentials are used
        password: password
        remember_me: save the configuration if the credentials were modified

    Returns:
        Decoded JSON body of the token response

    Raises:
        PermissionError: if the token request is not successful
    """
    try:
        if not (username and password):
            username, password = self.config.get_credentials()
        else:
            self.config.set_credentials(username, password)
    except ValueError:
        # No usable stored credentials: ask for whatever was not supplied
        if username is None:
            username = input(f"{self.host} - username: ")
        if password is None:
            password = prompt_for_password()
        self.config.set_credentials(username, password)

    token_url = self.host + "/api/account/token"
    res = self.session.post(token_url, data=dict(userName=username, password=password))
    if not res.ok:
        raise PermissionError(f"Could not log in: {res.reason}")

    token_data = res.json()
    if remember_me and self.config.is_modified:
        self.config.save()
    return token_data
def stop_model_training(self, model)

Stop training of model

Args

model: Returns:

Expand source code
def stop_model_training(self, model):
    """
    Stop training of model

    Args:
        model: model to stop; its id is used in the request url

    Returns:
        Model parsed from the response body
    """
    stop_url = f"{self.host}/api/models/{model.id}/stop"
    response = self._http_post(stop_url)
    return api.Model.parse_raw(response.content)
def update(self, obj, master=False)
Expand source code
def update(self, obj, master=False):
    """
    Push the local state of an object to the platform

    Args:
        obj: dataset, tag, model type or report type to update
        master: must be True to update model types and report types

    Raises:
        PermissionError: if a model type or report type is updated without master mode
        NotImplementedError: if obj is of an unsupported type
    """
    # The branches are mutually exclusive; with independent `if`s the final
    # `else` would also fire after a successful dataset/tag/model type update
    if isinstance(obj, Dataset):
        payload = obj.dict(include={"name", "reference", "notes", "locked"})
        payload["tagIds"] = [tag.id for tag in obj.tags]
        self._http_post(f"{self.host}/api/data/{obj.id}", json=payload)
    elif isinstance(obj, Tag):
        payload = obj.dict(include={"name", "parent_id"}, by_alias=True, exclude_none=True)
        self._http_post(f"{self.host}/api/resources/tags/{obj.id}", json=payload)
    elif isinstance(obj, api.ModelType):
        if not master:
            raise PermissionError("Not authorized")
        self._http_post(f"{self.host}/api/master/modeltypes/update", json=obj.dict(by_alias=True))
    elif isinstance(obj, api.ReportType):
        if not master:
            raise PermissionError("Not authorized")
        self._http_post(f"{self.host}/api/master/reporttypes/update", json=obj.dict(by_alias=True))
    else:
        raise NotImplementedError(f"update not implemented for type {type(obj)}")
def update_dataset_permission(self, id, user_id, group_id=None, permission_type='Editor')

Update dataset permissions for user

Args

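id: id of the dataset
user_id: id of the user
group_id: id of the group; defaults to None
permission_type: permission type to set; defaults to 'Editor'

Returns

The result of the POST request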

Expand source code
def update_dataset_permission(self, id, user_id, group_id=None, permission_type="Editor"):
    """
    Post a permission entry for a dataset to the admin API

    Args:
        id: dataset id; concatenated into the request URL and sent as "resourceId"
        user_id: sent as "userId"
        group_id: sent as "groupId"
        permission_type: sent as "permissionType"

    Returns:
        whatever `self._http_post` returns for the request
    """
    body = dict(
        groupId=group_id,
        userId=user_id,
        resourceId=id,
        objectType=0,
        permissionType=permission_type,
    )
    endpoint = self.host + "/api/admin/datasets/" + id + "/permissions"
    return self._http_post(endpoint, json=body)
class WebApiConfig (**data: Any)

Configuration object for PlatformAPI

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class WebApiConfig(BaseModel):
    """
    Configuration object for `PlatformAPI`

    Keeps the platform login as an encrypted `secret` and can persist itself as JSON
    in `_config_file`.
    """
    # Fernet token of "username:password"; empty until credentials have been set
    secret: bytes = b''
    # Location of the persisted configuration: <home>/.brevetti/webapi
    _config_file: ClassVar[str] = os.path.join(os.path.expanduser("~"), ".brevetti", "webapi")
    # Switched to True by `set_credentials`; reported through `is_modified`
    _modified: bool = PrivateAttr(default=False)

    @staticmethod
    def _get_fernet():
        """Build the Fernet instance used to encrypt and decrypt `secret`

        The 32 byte key is the 6 little-endian bytes of `uuid.getnode()` followed by a
        fixed 26 byte constant, so a secret can only be decrypted where `uuid.getnode()`
        yields the same value as when it was encrypted.
        """
        node = uuid.getnode()
        key = base64.urlsafe_b64encode(node.to_bytes(6, 'little') +
                                       b'Q\x19$v>8Lx\xbaQ\x86T\x06$\x91\x04x\x1a\xc7\xa5/\x83~\xe6+m')
        return Fernet(key)

    def set_credentials(self, username: str, password: str):
        """Set credentials for later retrieval

        The credentials are stored encrypted as "username:password", and the configuration
        is marked as modified. The username must not contain ":", as the first ":" is used
        to separate it from the password on retrieval.
        """
        self.secret = self._get_fernet().encrypt(f"{username}:{password}".encode())
        self._modified = True

    def get_credentials(self):
        """Get Username and password for platform login

        Returns:
            username, password

        Raises:
            ValueError: if `secret` cannot be decrypted, or does not contain a ":" separator
        """
        try:
            plaintext = self._get_fernet().decrypt(self.secret)
        except InvalidToken as ex:
            raise ValueError("Invalid secret") from ex
        # Split on the first ":" only, so passwords containing ":" survive the round trip
        username, separator, password = plaintext.decode().partition(":")
        if not separator:
            raise ValueError("Invalid secret")
        return username, password

    @property
    def is_modified(self):
        """Is the configuration modified?"""
        return self._modified

    @staticmethod
    def load():
        """Load WebApiConfig from config_file"""
        return WebApiConfig.parse_file(WebApiConfig._config_file)

    def save(self):
        """Save WebApiConfig to config_file, creating its directory if needed"""
        os.makedirs(os.path.dirname(WebApiConfig._config_file), exist_ok=True)
        with open(WebApiConfig._config_file, "w") as fp:
            fp.write(self.json())

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var secret : bytes

Static methods

def load()

Load WebApiConfig from config_file

Expand source code
@staticmethod
def load():
    """Read the configuration stored at `WebApiConfig._config_file`

    Returns:
        the `WebApiConfig` that `parse_file` builds from that file
    """
    config_path = WebApiConfig._config_file
    return WebApiConfig.parse_file(config_path)

Instance variables

var is_modified

Is the configuration modified?

Expand source code
@property
def is_modified(self):
    """Tell whether this configuration has been modified

    Returns:
        the current value of the private `_modified` flag
    """
    return self._modified

Methods

def get_credentials(self)

Get Username and password for platform login

Returns

username, password

Expand source code
def get_credentials(self):
    """Get Username and password for platform login

    Returns:
        username, password

    Raises:
        ValueError: if `secret` cannot be decrypted, or does not contain a ":" separator
    """
    try:
        plaintext = self._get_fernet().decrypt(self.secret)
    except InvalidToken as ex:
        raise ValueError("Invalid secret") from ex
    # Split on the first ":" only, so passwords containing ":" survive the round trip
    username, separator, password = plaintext.decode().partition(":")
    if not separator:
        raise ValueError("Invalid secret")
    return username, password
def save(self)

Save WebApiConfig to config_file

Expand source code
def save(self):
    """Write this configuration as JSON to `WebApiConfig._config_file`

    The directory holding the file is created first when it does not exist yet.
    """
    config_dir = os.path.dirname(WebApiConfig._config_file)
    os.makedirs(config_dir, exist_ok=True)
    with open(WebApiConfig._config_file, "w") as config_fp:
        config_fp.write(self.json())
def set_credentials(self, username: str, password: str)

Set credentials for later retrieval

Expand source code
def set_credentials(self, username: str, password: str):
    """Store a login on this configuration

    The pair is joined as "username:password", encrypted with the Fernet instance
    from `_get_fernet` and kept in `secret`; the configuration is then flagged as modified.
    """
    fernet = self._get_fernet()
    plaintext = f"{username}:{password}".encode()
    self.secret = fernet.encrypt(plaintext)
    self._modified = True