Module brevettiai.platform.web_api
Expand source code
import base64
import json
import os
import re
import requests
import uuid
from urllib.parse import urlparse, unquote
from pydantic import BaseModel, PrivateAttr, parse_raw_as
from cryptography.fernet import Fernet, InvalidToken
from typing import ClassVar, List, Union, Type
from brevettiai.platform.models import Job, JobSettings
from brevettiai.io.utils import IoTools, prompt_for_password
from brevettiai.platform.models import backend, PlatformBackend
from brevettiai.platform.platform_credentials import PlatformDatasetCredentials
from brevettiai.platform.models import web_api_types as api
from brevettiai.platform.models import Dataset, Tag
_ENV_BREVETTI_AI_USER = "BREVETTI_AI_USER"
_ENV_BREVETTI_AI_PW = "BREVETTI_AI_PW"
class WebApiConfig(BaseModel):
"""
Configuration object for `PlatformAPI`
"""
secret: bytes = b''
_config_file: ClassVar[str] = os.path.join(os.path.expanduser("~"), ".brevetti", "webapi")
_modified: bool = PrivateAttr(default=False)
@staticmethod
def _get_fernet():
"""Retrieve Fernet module"""
node = uuid.getnode()
key = base64.urlsafe_b64encode(node.to_bytes(6, 'little') +
b'Q\x19$v>8Lx\xbaQ\x86T\x06$\x91\x04x\x1a\xc7\xa5/\x83~\xe6+m')
return Fernet(key)
def set_credentials(self, username: str, password: str):
"""Set credentials for later retrieval"""
self.secret = self._get_fernet().encrypt(f"{username}:{password}".encode())
self._modified = True
def get_credentials(self):
"""Get Username and password for platform login
Returns:
username, password
"""
try:
return tuple(self._get_fernet().decrypt(self.secret).decode().split(":"))
except InvalidToken as ex:
raise ValueError("Invalid secret") from ex
@property
def is_modified(self):
"""Is the configuration modified?"""
return self._modified
@staticmethod
def load():
"""Load WebApiConfig from config_file"""
return WebApiConfig.parse_file(WebApiConfig._config_file)
def save(self):
"""Save WebApiConfig to config_file"""
os.makedirs(os.path.dirname(WebApiConfig._config_file), exist_ok=True)
with open(WebApiConfig._config_file, "w") as fp:
fp.write(self.json())
class PlatformAPI:
def __init__(self, username=None, password=None, host=None, remember_me=False):
self.host = host or backend
self.session = requests.session()
username = username or os.getenv(_ENV_BREVETTI_AI_USER)
password = password or os.getenv(_ENV_BREVETTI_AI_PW)
try:
self.config = WebApiConfig.load()
except IOError:
self.config = WebApiConfig()
self.user = self.login(username, password, remember_me=remember_me)
self._io = IoTools().factory()
self._io.minio.credentials = PlatformDatasetCredentials(self)
@property
def host(self):
return self._host.host if isinstance(self._host, PlatformBackend) else self._host
@host.setter
def host(self, host):
self._host = host
@property
def io(self):
return self._io
@property
def backend(self):
if isinstance(self._host, PlatformBackend):
return self._host
else:
raise AttributeError("Backend unknown")
def login(self, username, password, remember_me=False):
try:
if username and password:
self.config.set_credentials(username, password)
else:
username, password = self.config.get_credentials()
except ValueError:
if username is None:
username = input(f"{self.host} - username: ")
if password is None:
password = prompt_for_password()
self.config.set_credentials(username, password)
res = self.session.post(self.host + "/api/account/token", data=dict(userName=username, password=password))
if not res.ok:
raise PermissionError(f"Could not log in: {res.reason}")
data = res.json()
if remember_me and self.config.is_modified:
self.config.save()
return data
def _http_get(self, url, headers=None, **kwargs):
if headers is None and url.startswith(self.host):
headers = self.antiforgery_headers
r = self.session.get(url, headers=headers, **kwargs)
if not r.ok:
if r.status_code == 401:
raise PermissionError("Not authorized")
raise requests.HTTPError(r.reason)
return r
def _http_post(self, url, headers=None, **kwargs):
if headers is None and url.startswith(self.host):
headers = self.antiforgery_headers
r = self.session.post(url, headers=headers, **kwargs)
if not r.ok:
if r.status_code == 401:
raise PermissionError("Not authorized")
raise requests.HTTPError(r.reason)
return r
def _http_delete(self, url, headers=None, **kwargs):
if headers is None and url.startswith(self.host):
headers = self.antiforgery_headers
r = self.session.delete(url, headers=headers, **kwargs)
if not r.ok:
if r.status_code == 401:
raise PermissionError("Not authorized")
raise requests.HTTPError(r.reason)
return r
@property
def antiforgery_headers(self):
"""
Get anti forgery headers from platform
:return:
"""
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + self.user['token']
}
r = self.session.get(self.host + "/api/account/antiforgery", headers=headers)
return {**headers, 'X-XSRF-TOKEN': r.json()['requestToken']}
def get_dataset(self, id: str = None, write_access=False, **kwargs) -> Union[Dataset, List[Dataset]]:
"""
Get dataset, or list of all datasets
Args:
id: dataset id
write_access: resolve accessrights to the dataset
**kwargs: Extended search criteria: use ('name', 'reference' 'locked', ...)
Returns:
dataset if id is given, otherwise list of datasets
"""
url = self.host + "/api/data"
url = url if id is None else url + "/" + id
r = self._http_get(url)
if id is None:
return [Dataset(**x, backend=self.backend, io=self._io, resolve_access_rights=write_access)
for x in r.json() if all(x.get(k) == v for k, v in kwargs.items())]
else:
return Dataset(**r.json(), backend=self.backend, io=self._io, resolve_access_rights=write_access)
def get_tag(self, id=None) -> Union[Tag, List[Tag]]:
"""
Get tag or list of all tags
Args:
id: Tag id
Returns:
tag if id is given, otherwise a list of tags
"""
url = self.host + "/api/resources/tags"
url = url if id is None else url + "/" + id
r = self._http_get(url)
return parse_raw_as(Union[Tag, List[Tag]], r.content)
def get_model(self, id=None, **kwargs) -> Union[api.Model, List[api.Model]]:
"""
Get model or list of all models
Args:
id: model id
**kwargs: Extended search criteria: use ('name', 'reference' 'locked', ...)
Returns:
model if id is given, otherwise a list of models
"""
url = self.host + "/api/models"
url = url if id is None else url + "/" + id
r = self._http_get(url)
if id is None:
models = parse_raw_as(List[api.Model], r.content)
return [x for x in models if all(getattr(x, k, None) == v for k, v in kwargs.items())]
else:
return parse_raw_as(api.Model, r.content)
def get_report(self, id=None, **kwargs) -> Union[api.Report, List[api.Report]]:
"""
Get test report, or list of all reports
Args:
id: report id
**kwargs: Extended search criteria: use ('name', 'reference' 'locked', ...)
Returns:
report if id is given, otherwise a list of reports
"""
url = self.host + "/api/reports"
url = url if id is None else url + "/" + id
r = self._http_get(url)
if id is None:
reports = parse_raw_as(List[api.Report], r.content)
return [x for x in reports if all(getattr(x, k, None) == v for k, v in kwargs.items())]
else:
return parse_raw_as(api.Report, r.content)
def get_artifacts(self, obj: Union[api.Model, api.Report], prefix: str = '') -> List[api.FileEntry]:
"""
Get artifacts for model or test report
Args:
obj: model/test report object
prefix: object prefix (folder)
Returns:
List of files
"""
if isinstance(obj, api.Model):
r = self._http_get(f"{self.host}/api/models/{obj.id}/artifacts?prefix={prefix}")
elif isinstance(obj, api.Report):
r = self._http_get(f"{self.host}/api/reports/{obj.id}/artifacts?prefix={prefix}")
else:
raise NotImplementedError("Artifacts not available for type")
return parse_raw_as(List[api.FileEntry], r.content)
def get_application(self, id=None) -> Union[api.Application, List[api.Application]]:
"""
Get application by id
Args:
id:
Returns:
application if id is given, otherwise a list of applications
"""
projects = self.get_project()
applications = [a for p in projects for a in p.applications]
if id is not None:
applications = [a for a in applications if id in a.related_ids]
if len(applications) == 1:
return applications[0]
else:
return applications
return applications
def get_device(self, id=None):
url = self.host + "/api/devices"
url = url if id is None else url + "/" + id
r = self._http_get(url)
return parse_raw_as(List[api.Device], r.content)
def get_project(self, id=None) -> Union[api.Project, List[api.Project]]:
url = self.host + "/api/resources/projects"
url = url if id is None else url + "/" + id
r = self._http_get(url)
return parse_raw_as(Union[api.Project, List[api.Project]], r.content)
def get_modeltype(self, id=None, master=False) -> Union[api.ModelType, List[api.ModelType]]:
"""
Get model type
Args:
id:
master: use master mode
Returns:
"""
url = f"{self.host}/api/{'master/' if master else 'resources/'}modeltypes/{id if id else ''}"
r = self._http_get(url)
return parse_raw_as(Union[api.ModelType, List[api.ModelType]], r.content)
def get_reporttype(self, id=None, master=False) -> Union[api.ReportType, List[api.ReportType]]:
"""
Get report type
Args:
id:
master: use master mode
Returns:
"""
url = f"{self.host}/api/{'master/' if master else 'resources/'}reporttypes/{id if id else ''}"
r = self._http_get(url)
return parse_raw_as(Union[api.ReportType, List[api.ReportType]], r.content)
def get_available_model_types(self):
"""
List all available model types
"""
r = self._http_get(f"{self.host}/api/models/availabletypes")
return parse_raw_as(List[api.ModelType], r.content)
def create(self, obj: Union[Dataset, Tag, api.Model, api.Report], **kwargs):
if isinstance(obj, Dataset):
payload = obj.dict(include={"name", "reference", "notes", "locked"}, by_alias=True)
payload["tagIds"] = [tag.id for tag in obj.tags]
r = self._http_post(f"{self.host}/api/data/", json=payload)
return self.get_dataset(r.json()["datasetId"], **kwargs)
elif isinstance(obj, Tag):
payload = obj.dict(include={"name", "parent_id"}, by_alias=True, exclude_none=True)
self._http_post(f"{self.host}/api/resources/tags/", json=payload)
# TODO: return tag id on api
parent = self.get_tag(obj.parent_id)
tag = next(filter(lambda x: x.name == obj.name, (parent.children if isinstance(parent, Tag) else parent)))
return tag
elif isinstance(obj, api.Model):
payload = obj.dict(include={"name", "model_type_id", "application_id",
"settings", "dataset_ids", "tag_ids"}, by_alias=True)
payload["datasetIds"] = payload.pop("datasets")
r = self._http_post(f"{self.host}/api/models", json=payload)
return api.Model.parse_raw(r.content)
elif isinstance(obj, api.Report):
payload = obj.dict(include={"name", "parent_id", "parent_type", "report_type_id",
"model_ids", "settings", "dataset_ids", "tag_ids"}, by_alias=True)
payload["modelIds"] = payload.pop("models")
payload["submitToCloud"] = ("submitToCloud" in kwargs and kwargs["submitToCloud"])
r = self._http_post(f"{self.host}/api/reports", json=payload)
report = self.get_report(r.json()["id"])
return report
else:
raise NotImplementedError(f"create not implemented for type {type(obj)}")
def update(self, obj, master=False):
if isinstance(obj, Dataset):
payload = obj.dict(include={"name", "reference", "notes", "locked"})
payload["tagIds"] = [tag.id for tag in obj.tags]
self._http_post(f"{self.host}/api/data/{obj.id}", json=payload)
if isinstance(obj, Tag):
payload = obj.dict(include={"name", "parent_id"}, by_alias=True, exclude_none=True)
self._http_post(f"{self.host}/api/resources/tags/{obj.id}", json=payload)
if isinstance(obj, api.ModelType):
if not master:
raise PermissionError("Not authorized")
self._http_post(f"{self.host}/api/master/modeltypes/update", json=obj.dict(by_alias=True))
if isinstance(obj, api.ReportType):
if not master:
raise PermissionError("Not authorized")
self._http_post(f"{self.host}/api/master/reporttypes/update", json=obj.dict(by_alias=True))
else:
raise NotImplementedError(f"create not implemented for type {type(obj)}")
def delete(self, obj: Union[Dataset, Tag, api.Model, api.Report, api.SftpUser]):
if isinstance(obj, Dataset):
self._http_delete(f"{self.host}/api/data/{obj.id}")
elif isinstance(obj, Tag):
self._http_delete(f"{self.host}/api/resources/tags/{obj.id}")
elif isinstance(obj, api.Model):
self._http_delete(f"{self.host}/api/models/{obj.id}")
elif isinstance(obj, api.Report):
self._http_delete(f"{self.host}/api/reports/{obj.id}")
elif isinstance(obj, api.SftpUser):
self._http_delete(f"{self.host}/api/data/{obj.folder}/sftp/{obj.user_name}")
else:
raise NotImplementedError(f"delete not implemented for type {type(obj)}")
def update_dataset_permission(self, id, user_id, group_id=None, permission_type="Editor"):
"""
Update dataset permissions for user
Args:
id:
user_id:
group_id:
permission_type:
Returns:
"""
payload = {"groupId": group_id, "userId": user_id, "resourceId": id, "objectType": 0,
"permissionType": permission_type}
r = self._http_post(self.host + "/api/admin/datasets/" + id + "/permissions", json=payload)
return r
def get_dataset_sts_assume_role_response(self, guid):
cred = self._http_get(f"{self.host}/api/data/{guid}/securitycredentials")
return cred.text
def get_schema(self, obj: Union[api.ModelType, api.ReportType]):
"""
Get schema for a certain model type
Args:
obj: modeltype or report type
Returns:
"""
r = self._http_get(obj.settings_schema_path, headers={})
return r.json()
def get_userinfo(self):
"""
Get info on user
"""
url = self.host + "/api/manage/index"
r = self._http_get(url)
return api.User.parse_raw(r.content)
def get_sftp_users(self, dataset, **kwargs) -> List[api.SftpUser]:
r = self._http_get(f"{self.host}/api/data/{dataset.id}/sftp", **kwargs)
users = parse_raw_as(List[api.SftpUser], r.content)
for user in users:
user.folder = user.folder or dataset.id
return users
def create_sftp_user(self, dataset, **kwargs) -> api.SftpUser:
r = self._http_post(f"{self.host}/api/data/{dataset.id}/sftp", **kwargs)
return api.SftpUser.parse_raw(r.content)
def create_model(self, name, datasets, settings: JobSettings = None,
model_type=None, tags=None, application: api.Application = None):
"""
Create a model on the platform
Args:
name:
datasets:
settings:
model_type:
tags:
application:
Returns:
Model after its creation on the platform
"""
tags = tags or []
settings = settings or {}
try:
settings = settings.json()
except AttributeError:
settings = json.dumps(settings)
model_type = model_type or self.backend.custom_model_type
settings = settings or {}
application_id = None if application is None else application.id
model = api.Model(name=name, dataset_ids=[x.id for x in datasets], model_type_id=model_type.id,
model_type_status=model_type.status, settings=settings, application_id=application_id,
tag_ids=[x.id for x in tags], id="<unknown>", api_key="<unknown>", created="<unknown>")
return self.create(model)
def create_testreport(self, name, model, datasets, report_type, settings, tags=None, submitToCloud=False):
"""
Create a test report on the platform
Args:
name:
model:
datasets:
report_type:
settings:
tags:
submitToCloud: start test report in the cloud
Returns:
Test report after its creation on the platform
"""
tags = tags or []
try:
settings = settings.json()
except AttributeError:
settings = json.dumps(settings)
report = api.Report(name=name, model_ids=[model.id], dataset_ids=[x.id for x in datasets],
id="<unknown>", api_key="<unknown>", created="<unknown>",
parent_id=model.id, parent_name=model.name, parent_type="model",
settings=settings, tag_ids=[x.id for x in tags], report_type_id=report_type.id)
return self.create(report, submitToCloud=submitToCloud)
def initialize_training(self, model: Union[str, api.Model], job_type: Type[Job] = None, submitToCloud=False) -> Union[Job, None]:
"""
Start training flow of a model
Args:
model: model or model id
job_type:
submitToCloud: start model in the cloud
Returns:
Job if submitToCloud is false, otherwise None
"""
payload = {"submitToCloud": "true" if submitToCloud else "false"}
if isinstance(model, str):
model = self.get_model(model)
if model.completed:
print("Model already completed")
return None
r = self._http_post(f"{self.host}/api/models/{model.id}/start", params=payload)
if submitToCloud:
return None
job_config = self.download_url(r.json()).json()
type_selector = job_type or Job
job = Job.init(type_selector=type_selector, job_config=job_config, backend=backend)
return job
def initialize_report(self, report: Union[str, api.Report], job_type: Type[Job] = None) -> Union[Job, None]:
"""
Start training flow of a model
Args:
report: model or model id
job_type:
Returns:
Job
"""
if isinstance(report, str):
report = self.get_report(model)
if report.completed:
print("Model already completed")
return None
type_selector = [job_type] if issubclass(job_type, Job) else job_type
job = Job.init(job_id=report.id, api_key=report.api_key,
type_selector=type_selector, backend=backend)
return job
def stop_model_training(self, model):
"""
Stop training of model
Args:
model:
Returns:
"""
r = self._http_post(f"{self.host}/api/models/{model.id}/stop")
return api.Model.parse_raw(r.content)
def download_url(self, url, dst=None, headers=None):
if url.startswith("/api/"):
url = self.host + url
r = self._http_get(url, stream=True, headers=headers)
if r.status_code == requests.codes.ok:
if dst is not None:
if os.path.isdir(dst):
try:
d = r.headers['content-disposition']
fname = re.findall('filename="(.+)"', d)[0]
except Exception:
fname = os.path.basename(unquote(urlparse(url).path))
assert fname is not ""
dst = os.path.join(dst, fname)
else:
if os.path.dirname(dst):
os.makedirs(os.path.dirname(dst), exist_ok=True)
with open(dst, 'wb') as f:
for data in r:
f.write(data)
else:
return r
return dst
if __name__ == "__main__":
# Imports and setup
from brevettiai.platform import BrevettiAI
from image_segmentation.image_segmentation import ImageSegmentationJobSettings, ImageSegmentationJob
web: PlatformAPI = BrevettiAI()
ds = web.get_dataset()[:2]
model = web.create_model("test api model", settings=ImageSegmentationJobSettings(), datasets=ds)
job = web.initialize_training(model, job_type=ImageSegmentationJob)
job.start()
web.delete(model)
ds = web.get_dataset('8098021d-79ed-43b5-956a-adecadfde66b')
users_ = web.get_sftp_users(ds)
for user_ in users_:
web.delete(user_)
web.get_userinfo()
schema = web.get_schema(web.get_reporttype()[0])
mt = web.get_available_model_types()
web.get_model()
web.get_report()
artifacts = web.get_artifacts(web.get_model()[0])
dev = web.get_device()
app = web.get_application("a1c111ec-2043-41ef-b2f1-f0912b55bd3b")
web.get_model('000863da-f567-4a74-b48b-5d63da061555').get_datasets(web)
mm = web.get_report()
Classes
class PlatformAPI (username=None, password=None, host=None, remember_me=False)
-
Expand source code
class PlatformAPI: def __init__(self, username=None, password=None, host=None, remember_me=False): self.host = host or backend self.session = requests.session() username = username or os.getenv(_ENV_BREVETTI_AI_USER) password = password or os.getenv(_ENV_BREVETTI_AI_PW) try: self.config = WebApiConfig.load() except IOError: self.config = WebApiConfig() self.user = self.login(username, password, remember_me=remember_me) self._io = IoTools().factory() self._io.minio.credentials = PlatformDatasetCredentials(self) @property def host(self): return self._host.host if isinstance(self._host, PlatformBackend) else self._host @host.setter def host(self, host): self._host = host @property def io(self): return self._io @property def backend(self): if isinstance(self._host, PlatformBackend): return self._host else: raise AttributeError("Backend unknown") def login(self, username, password, remember_me=False): try: if username and password: self.config.set_credentials(username, password) else: username, password = self.config.get_credentials() except ValueError: if username is None: username = input(f"{self.host} - username: ") if password is None: password = prompt_for_password() self.config.set_credentials(username, password) res = self.session.post(self.host + "/api/account/token", data=dict(userName=username, password=password)) if not res.ok: raise PermissionError(f"Could not log in: {res.reason}") data = res.json() if remember_me and self.config.is_modified: self.config.save() return data def _http_get(self, url, headers=None, **kwargs): if headers is None and url.startswith(self.host): headers = self.antiforgery_headers r = self.session.get(url, headers=headers, **kwargs) if not r.ok: if r.status_code == 401: raise PermissionError("Not authorized") raise requests.HTTPError(r.reason) return r def _http_post(self, url, headers=None, **kwargs): if headers is None and url.startswith(self.host): headers = self.antiforgery_headers r = self.session.post(url, headers=headers, **kwargs) if not r.ok: if r.status_code == 401: raise PermissionError("Not authorized") raise requests.HTTPError(r.reason) return r def _http_delete(self, url, headers=None, **kwargs): if headers is None and url.startswith(self.host): headers = self.antiforgery_headers r = self.session.delete(url, headers=headers, **kwargs) if not r.ok: if r.status_code == 401: raise PermissionError("Not authorized") raise requests.HTTPError(r.reason) return r @property def antiforgery_headers(self): """ Get anti forgery headers from platform :return: """ headers = { "Content-Type": "application/json", "Authorization": "Bearer " + self.user['token'] } r = self.session.get(self.host + "/api/account/antiforgery", headers=headers) return {**headers, 'X-XSRF-TOKEN': r.json()['requestToken']} def get_dataset(self, id: str = None, write_access=False, **kwargs) -> Union[Dataset, List[Dataset]]: """ Get dataset, or list of all datasets Args: id: dataset id write_access: resolve accessrights to the dataset **kwargs: Extended search criteria: use ('name', 'reference' 'locked', ...) Returns: dataset if id is given, otherwise list of datasets """ url = self.host + "/api/data" url = url if id is None else url + "/" + id r = self._http_get(url) if id is None: return [Dataset(**x, backend=self.backend, io=self._io, resolve_access_rights=write_access) for x in r.json() if all(x.get(k) == v for k, v in kwargs.items())] else: return Dataset(**r.json(), backend=self.backend, io=self._io, resolve_access_rights=write_access) def get_tag(self, id=None) -> Union[Tag, List[Tag]]: """ Get tag or list of all tags Args: id: Tag id Returns: tag if id is given, otherwise a list of tags """ url = self.host + "/api/resources/tags" url = url if id is None else url + "/" + id r = self._http_get(url) return parse_raw_as(Union[Tag, List[Tag]], r.content) def get_model(self, id=None, **kwargs) -> Union[api.Model, List[api.Model]]: """ Get model or list of all models Args: id: model id **kwargs: Extended search criteria: use ('name', 'reference' 'locked', ...) Returns: model if id is given, otherwise a list of models """ url = self.host + "/api/models" url = url if id is None else url + "/" + id r = self._http_get(url) if id is None: models = parse_raw_as(List[api.Model], r.content) return [x for x in models if all(getattr(x, k, None) == v for k, v in kwargs.items())] else: return parse_raw_as(api.Model, r.content) def get_report(self, id=None, **kwargs) -> Union[api.Report, List[api.Report]]: """ Get test report, or list of all reports Args: id: report id **kwargs: Extended search criteria: use ('name', 'reference' 'locked', ...) Returns: report if id is given, otherwise a list of reports """ url = self.host + "/api/reports" url = url if id is None else url + "/" + id r = self._http_get(url) if id is None: reports = parse_raw_as(List[api.Report], r.content) return [x for x in reports if all(getattr(x, k, None) == v for k, v in kwargs.items())] else: return parse_raw_as(api.Report, r.content) def get_artifacts(self, obj: Union[api.Model, api.Report], prefix: str = '') -> List[api.FileEntry]: """ Get artifacts for model or test report Args: obj: model/test report object prefix: object prefix (folder) Returns: List of files """ if isinstance(obj, api.Model): r = self._http_get(f"{self.host}/api/models/{obj.id}/artifacts?prefix={prefix}") elif isinstance(obj, api.Report): r = self._http_get(f"{self.host}/api/reports/{obj.id}/artifacts?prefix={prefix}") else: raise NotImplementedError("Artifacts not available for type") return parse_raw_as(List[api.FileEntry], r.content) def get_application(self, id=None) -> Union[api.Application, List[api.Application]]: """ Get application by id Args: id: Returns: application if id is given, otherwise a list of applications """ projects = self.get_project() applications = [a for p in projects for a in p.applications] if id is not None: applications = [a for a in applications if id in a.related_ids] if len(applications) == 1: return applications[0] else: return applications return applications def get_device(self, id=None): url = self.host + "/api/devices" url = url if id is None else url + "/" + id r = self._http_get(url) return parse_raw_as(List[api.Device], r.content) def get_project(self, id=None) -> Union[api.Project, List[api.Project]]: url = self.host + "/api/resources/projects" url = url if id is None else url + "/" + id r = self._http_get(url) return parse_raw_as(Union[api.Project, List[api.Project]], r.content) def get_modeltype(self, id=None, master=False) -> Union[api.ModelType, List[api.ModelType]]: """ Get model type Args: id: master: use master mode Returns: """ url = f"{self.host}/api/{'master/' if master else 'resources/'}modeltypes/{id if id else ''}" r = self._http_get(url) return parse_raw_as(Union[api.ModelType, List[api.ModelType]], r.content) def get_reporttype(self, id=None, master=False) -> Union[api.ReportType, List[api.ReportType]]: """ Get report type Args: id: master: use master mode Returns: """ url = f"{self.host}/api/{'master/' if master else 'resources/'}reporttypes/{id if id else ''}" r = self._http_get(url) return parse_raw_as(Union[api.ReportType, List[api.ReportType]], r.content) def get_available_model_types(self): """ List all available model types """ r = self._http_get(f"{self.host}/api/models/availabletypes") return parse_raw_as(List[api.ModelType], r.content) def create(self, obj: Union[Dataset, Tag, api.Model, api.Report], **kwargs): if isinstance(obj, Dataset): payload = obj.dict(include={"name", "reference", "notes", "locked"}, by_alias=True) payload["tagIds"] = [tag.id for tag in obj.tags] r = self._http_post(f"{self.host}/api/data/", json=payload) return self.get_dataset(r.json()["datasetId"], **kwargs) elif isinstance(obj, Tag): payload = obj.dict(include={"name", "parent_id"}, by_alias=True, exclude_none=True) self._http_post(f"{self.host}/api/resources/tags/", json=payload) # TODO: return tag id on api parent = self.get_tag(obj.parent_id) tag = next(filter(lambda x: x.name == obj.name, (parent.children if isinstance(parent, Tag) else parent))) return tag elif isinstance(obj, api.Model): payload = obj.dict(include={"name", "model_type_id", "application_id", "settings", "dataset_ids", "tag_ids"}, by_alias=True) payload["datasetIds"] = payload.pop("datasets") r = self._http_post(f"{self.host}/api/models", json=payload) return api.Model.parse_raw(r.content) elif isinstance(obj, api.Report): payload = obj.dict(include={"name", "parent_id", "parent_type", "report_type_id", "model_ids", "settings", "dataset_ids", "tag_ids"}, by_alias=True) payload["modelIds"] = payload.pop("models") payload["submitToCloud"] = ("submitToCloud" in kwargs and kwargs["submitToCloud"]) r = self._http_post(f"{self.host}/api/reports", json=payload) report = self.get_report(r.json()["id"]) return report else: raise NotImplementedError(f"create not implemented for type {type(obj)}") def update(self, obj, master=False): if isinstance(obj, Dataset): payload = obj.dict(include={"name", "reference", "notes", "locked"}) payload["tagIds"] = [tag.id for tag in obj.tags] self._http_post(f"{self.host}/api/data/{obj.id}", json=payload) if isinstance(obj, Tag): payload = obj.dict(include={"name", "parent_id"}, by_alias=True, exclude_none=True) self._http_post(f"{self.host}/api/resources/tags/{obj.id}", json=payload) if isinstance(obj, api.ModelType): if not master: raise PermissionError("Not authorized") self._http_post(f"{self.host}/api/master/modeltypes/update", json=obj.dict(by_alias=True)) if isinstance(obj, api.ReportType): if not master: raise PermissionError("Not authorized") self._http_post(f"{self.host}/api/master/reporttypes/update", json=obj.dict(by_alias=True)) else: raise NotImplementedError(f"create not implemented for type {type(obj)}") def delete(self, obj: Union[Dataset, Tag, api.Model, api.Report, api.SftpUser]): if isinstance(obj, Dataset): self._http_delete(f"{self.host}/api/data/{obj.id}") elif isinstance(obj, Tag): self._http_delete(f"{self.host}/api/resources/tags/{obj.id}") elif isinstance(obj, api.Model): self._http_delete(f"{self.host}/api/models/{obj.id}") elif isinstance(obj, api.Report): self._http_delete(f"{self.host}/api/reports/{obj.id}") elif isinstance(obj, api.SftpUser): self._http_delete(f"{self.host}/api/data/{obj.folder}/sftp/{obj.user_name}") else: raise NotImplementedError(f"delete not implemented for type {type(obj)}") def update_dataset_permission(self, id, user_id, group_id=None, permission_type="Editor"): """ Update dataset permissions for user Args: id: user_id: group_id: permission_type: Returns: """ payload = {"groupId": group_id, "userId": user_id, "resourceId": id, "objectType": 0, "permissionType": permission_type} r = self._http_post(self.host + "/api/admin/datasets/" + id + "/permissions", json=payload) return r def get_dataset_sts_assume_role_response(self, guid): cred = self._http_get(f"{self.host}/api/data/{guid}/securitycredentials") return cred.text def get_schema(self, obj: Union[api.ModelType, api.ReportType]): """ Get schema for a certain model type Args: obj: modeltype or report type Returns: """ r = self._http_get(obj.settings_schema_path, headers={}) return r.json() def get_userinfo(self): """ Get info on user """ url = self.host + "/api/manage/index" r = self._http_get(url) return api.User.parse_raw(r.content) def get_sftp_users(self, dataset, **kwargs) -> List[api.SftpUser]: r = self._http_get(f"{self.host}/api/data/{dataset.id}/sftp", **kwargs) users = parse_raw_as(List[api.SftpUser], r.content) for user in users: user.folder = user.folder or dataset.id return users def create_sftp_user(self, dataset, **kwargs) -> api.SftpUser: r = self._http_post(f"{self.host}/api/data/{dataset.id}/sftp", **kwargs) return api.SftpUser.parse_raw(r.content) def create_model(self, name, datasets, settings: JobSettings = None, model_type=None, tags=None, application: api.Application = None): """ Create a model on the platform Args: name: datasets: settings: model_type: tags: application: Returns: Model after its creation on the platform """ tags = tags or [] settings = settings or {} try: settings = settings.json() except AttributeError: settings = json.dumps(settings) model_type = model_type or self.backend.custom_model_type settings = settings or {} application_id = None if application is None else application.id model = api.Model(name=name, dataset_ids=[x.id for x in datasets], model_type_id=model_type.id, model_type_status=model_type.status, settings=settings, application_id=application_id, tag_ids=[x.id for x in tags], id="<unknown>", api_key="<unknown>", created="<unknown>") return self.create(model) def create_testreport(self, name, model, datasets, report_type, settings, tags=None, submitToCloud=False): """ Create a test report on the platform Args: name: model: datasets: report_type: settings: tags: submitToCloud: start test report in the cloud Returns: Test report after its creation on the platform """ tags = tags or [] try: settings = settings.json() except AttributeError: settings = json.dumps(settings) report = api.Report(name=name, model_ids=[model.id], dataset_ids=[x.id for x in datasets], id="<unknown>", api_key="<unknown>", created="<unknown>", parent_id=model.id, parent_name=model.name, parent_type="model", settings=settings, tag_ids=[x.id for x in tags], report_type_id=report_type.id) return self.create(report, submitToCloud=submitToCloud) def initialize_training(self, model: Union[str, api.Model], job_type: Type[Job] = None, submitToCloud=False) -> Union[Job, None]: """ Start training flow of a model Args: model: model or model id job_type: submitToCloud: start model in the cloud Returns: Job if submitToCloud is false, otherwise None """ payload = {"submitToCloud": "true" if submitToCloud else "false"} if isinstance(model, str): model = self.get_model(model) if model.completed: print("Model already completed") return None r = self._http_post(f"{self.host}/api/models/{model.id}/start", params=payload) if submitToCloud: return None job_config = self.download_url(r.json()).json() type_selector = job_type or Job job = Job.init(type_selector=type_selector, job_config=job_config, backend=backend) return job def initialize_report(self, report: Union[str, api.Report], job_type: Type[Job] = None) -> Union[Job, None]: """ Start training flow of a model Args: report: model or model id job_type: Returns: Job """ if isinstance(report, str): report = self.get_report(model) if report.completed: print("Model already completed") return None type_selector = [job_type] if issubclass(job_type, Job) else job_type job = Job.init(job_id=report.id, api_key=report.api_key, type_selector=type_selector, backend=backend) return job def stop_model_training(self, model): """ Stop training of model Args: model: Returns: """ r = self._http_post(f"{self.host}/api/models/{model.id}/stop") return api.Model.parse_raw(r.content) def download_url(self, url, dst=None, headers=None): if url.startswith("/api/"): url = self.host + url r = self._http_get(url, stream=True, headers=headers) if r.status_code == requests.codes.ok: if dst is not None: if os.path.isdir(dst): try: d = r.headers['content-disposition'] fname = re.findall('filename="(.+)"', d)[0] except Exception: fname = os.path.basename(unquote(urlparse(url).path)) assert fname is not "" dst = os.path.join(dst, fname) else: if os.path.dirname(dst): os.makedirs(os.path.dirname(dst), exist_ok=True) with open(dst, 'wb') as f: for data in r: f.write(data) else: return r return dst
Instance variables
var antiforgery_headers
-
Get anti forgery headers from platform
:return:
Expand source code
@property def antiforgery_headers(self): """ Get anti forgery headers from platform :return: """ headers = { "Content-Type": "application/json", "Authorization": "Bearer " + self.user['token'] } r = self.session.get(self.host + "/api/account/antiforgery", headers=headers) return {**headers, 'X-XSRF-TOKEN': r.json()['requestToken']}
var backend
-
Expand source code
@property def backend(self): if isinstance(self._host, PlatformBackend): return self._host else: raise AttributeError("Backend unknown")
var host
-
Expand source code
@property def host(self): return self._host.host if isinstance(self._host, PlatformBackend) else self._host
var io
-
Expand source code
@property def io(self): return self._io
Methods
def create(self, obj: Union[Dataset, Tag, Model, Report], **kwargs)
-
Expand source code
def create(self, obj: Union[Dataset, Tag, api.Model, api.Report], **kwargs): if isinstance(obj, Dataset): payload = obj.dict(include={"name", "reference", "notes", "locked"}, by_alias=True) payload["tagIds"] = [tag.id for tag in obj.tags] r = self._http_post(f"{self.host}/api/data/", json=payload) return self.get_dataset(r.json()["datasetId"], **kwargs) elif isinstance(obj, Tag): payload = obj.dict(include={"name", "parent_id"}, by_alias=True, exclude_none=True) self._http_post(f"{self.host}/api/resources/tags/", json=payload) # TODO: return tag id on api parent = self.get_tag(obj.parent_id) tag = next(filter(lambda x: x.name == obj.name, (parent.children if isinstance(parent, Tag) else parent))) return tag elif isinstance(obj, api.Model): payload = obj.dict(include={"name", "model_type_id", "application_id", "settings", "dataset_ids", "tag_ids"}, by_alias=True) payload["datasetIds"] = payload.pop("datasets") r = self._http_post(f"{self.host}/api/models", json=payload) return api.Model.parse_raw(r.content) elif isinstance(obj, api.Report): payload = obj.dict(include={"name", "parent_id", "parent_type", "report_type_id", "model_ids", "settings", "dataset_ids", "tag_ids"}, by_alias=True) payload["modelIds"] = payload.pop("models") payload["submitToCloud"] = ("submitToCloud" in kwargs and kwargs["submitToCloud"]) r = self._http_post(f"{self.host}/api/reports", json=payload) report = self.get_report(r.json()["id"]) return report else: raise NotImplementedError(f"create not implemented for type {type(obj)}")
def create_model(self, name, datasets, settings: JobSettings = None, model_type=None, tags=None, application: Application = None)
-
Create a model on the platform
Args
name: datasets: settings: model_type: tags: application:
Returns
Model after its creation on the platform
Expand source code
def create_model(self, name, datasets, settings: JobSettings = None, model_type=None, tags=None, application: api.Application = None): """ Create a model on the platform Args: name: datasets: settings: model_type: tags: application: Returns: Model after its creation on the platform """ tags = tags or [] settings = settings or {} try: settings = settings.json() except AttributeError: settings = json.dumps(settings) model_type = model_type or self.backend.custom_model_type settings = settings or {} application_id = None if application is None else application.id model = api.Model(name=name, dataset_ids=[x.id for x in datasets], model_type_id=model_type.id, model_type_status=model_type.status, settings=settings, application_id=application_id, tag_ids=[x.id for x in tags], id="<unknown>", api_key="<unknown>", created="<unknown>") return self.create(model)
def create_sftp_user(self, dataset, **kwargs) ‑> SftpUser
-
Expand source code
def create_sftp_user(self, dataset, **kwargs) -> api.SftpUser: r = self._http_post(f"{self.host}/api/data/{dataset.id}/sftp", **kwargs) return api.SftpUser.parse_raw(r.content)
def create_testreport(self, name, model, datasets, report_type, settings, tags=None, submitToCloud=False)
-
Create a test report on the platform
Args
- name:
- model:
- datasets:
- report_type:
- settings:
- tags:
submitToCloud
- start test report in the cloud
Returns
Test report after its creation on the platform
Expand source code
def create_testreport(self, name, model, datasets, report_type, settings, tags=None, submitToCloud=False): """ Create a test report on the platform Args: name: model: datasets: report_type: settings: tags: submitToCloud: start test report in the cloud Returns: Test report after its creation on the platform """ tags = tags or [] try: settings = settings.json() except AttributeError: settings = json.dumps(settings) report = api.Report(name=name, model_ids=[model.id], dataset_ids=[x.id for x in datasets], id="<unknown>", api_key="<unknown>", created="<unknown>", parent_id=model.id, parent_name=model.name, parent_type="model", settings=settings, tag_ids=[x.id for x in tags], report_type_id=report_type.id) return self.create(report, submitToCloud=submitToCloud)
def delete(self, obj: Union[Dataset, Tag, Model, Report, SftpUser])
-
Expand source code
def delete(self, obj: Union[Dataset, Tag, api.Model, api.Report, api.SftpUser]): if isinstance(obj, Dataset): self._http_delete(f"{self.host}/api/data/{obj.id}") elif isinstance(obj, Tag): self._http_delete(f"{self.host}/api/resources/tags/{obj.id}") elif isinstance(obj, api.Model): self._http_delete(f"{self.host}/api/models/{obj.id}") elif isinstance(obj, api.Report): self._http_delete(f"{self.host}/api/reports/{obj.id}") elif isinstance(obj, api.SftpUser): self._http_delete(f"{self.host}/api/data/{obj.folder}/sftp/{obj.user_name}") else: raise NotImplementedError(f"delete not implemented for type {type(obj)}")
def download_url(self, url, dst=None, headers=None)
-
Expand source code
def download_url(self, url, dst=None, headers=None): if url.startswith("/api/"): url = self.host + url r = self._http_get(url, stream=True, headers=headers) if r.status_code == requests.codes.ok: if dst is not None: if os.path.isdir(dst): try: d = r.headers['content-disposition'] fname = re.findall('filename="(.+)"', d)[0] except Exception: fname = os.path.basename(unquote(urlparse(url).path)) assert fname is not "" dst = os.path.join(dst, fname) else: if os.path.dirname(dst): os.makedirs(os.path.dirname(dst), exist_ok=True) with open(dst, 'wb') as f: for data in r: f.write(data) else: return r return dst
def get_application(self, id=None) ‑> Union[Application, List[Application]]
-
Get application by id
Args
id:
Returns
application if id is given, otherwise a list of applications
Expand source code
def get_application(self, id=None) -> Union[api.Application, List[api.Application]]: """ Get application by id Args: id: Returns: application if id is given, otherwise a list of applications """ projects = self.get_project() applications = [a for p in projects for a in p.applications] if id is not None: applications = [a for a in applications if id in a.related_ids] if len(applications) == 1: return applications[0] else: return applications return applications
def get_artifacts(self, obj: Union[Model, Report], prefix: str = '') ‑> List[FileEntry]
-
Get artifacts for model or test report
Args
obj
- model/test report object
prefix
- object prefix (folder)
Returns
List of files
Expand source code
def get_artifacts(self, obj: Union[api.Model, api.Report], prefix: str = '') -> List[api.FileEntry]: """ Get artifacts for model or test report Args: obj: model/test report object prefix: object prefix (folder) Returns: List of files """ if isinstance(obj, api.Model): r = self._http_get(f"{self.host}/api/models/{obj.id}/artifacts?prefix={prefix}") elif isinstance(obj, api.Report): r = self._http_get(f"{self.host}/api/reports/{obj.id}/artifacts?prefix={prefix}") else: raise NotImplementedError("Artifacts not available for type") return parse_raw_as(List[api.FileEntry], r.content)
def get_available_model_types(self)
-
List all available model types
Expand source code
def get_available_model_types(self): """ List all available model types """ r = self._http_get(f"{self.host}/api/models/availabletypes") return parse_raw_as(List[api.ModelType], r.content)
def get_dataset(self, id: str = None, write_access=False, **kwargs) ‑> Union[Dataset, List[Dataset]]
-
Get dataset, or list of all datasets
Args
id
- dataset id
write_access
- resolve accessrights to the dataset
**kwargs
- Extended search criteria: use ('name', 'reference' 'locked', …)
Returns
dataset if id is given, otherwise list of datasets
Expand source code
def get_dataset(self, id: str = None, write_access=False, **kwargs) -> Union[Dataset, List[Dataset]]: """ Get dataset, or list of all datasets Args: id: dataset id write_access: resolve accessrights to the dataset **kwargs: Extended search criteria: use ('name', 'reference' 'locked', ...) Returns: dataset if id is given, otherwise list of datasets """ url = self.host + "/api/data" url = url if id is None else url + "/" + id r = self._http_get(url) if id is None: return [Dataset(**x, backend=self.backend, io=self._io, resolve_access_rights=write_access) for x in r.json() if all(x.get(k) == v for k, v in kwargs.items())] else: return Dataset(**r.json(), backend=self.backend, io=self._io, resolve_access_rights=write_access)
def get_dataset_sts_assume_role_response(self, guid)
-
Expand source code
def get_dataset_sts_assume_role_response(self, guid): cred = self._http_get(f"{self.host}/api/data/{guid}/securitycredentials") return cred.text
def get_device(self, id=None)
-
Expand source code
def get_device(self, id=None): url = self.host + "/api/devices" url = url if id is None else url + "/" + id r = self._http_get(url) return parse_raw_as(List[api.Device], r.content)
def get_model(self, id=None, **kwargs) ‑> Union[Model, List[Model]]
-
Get model or list of all models
Args
id
- model id
**kwargs
- Extended search criteria: use ('name', 'reference' 'locked', …)
Returns
model if id is given, otherwise a list of models
Expand source code
def get_model(self, id=None, **kwargs) -> Union[api.Model, List[api.Model]]: """ Get model or list of all models Args: id: model id **kwargs: Extended search criteria: use ('name', 'reference' 'locked', ...) Returns: model if id is given, otherwise a list of models """ url = self.host + "/api/models" url = url if id is None else url + "/" + id r = self._http_get(url) if id is None: models = parse_raw_as(List[api.Model], r.content) return [x for x in models if all(getattr(x, k, None) == v for k, v in kwargs.items())] else: return parse_raw_as(api.Model, r.content)
def get_modeltype(self, id=None, master=False) ‑> Union[ModelType, List[ModelType]]
-
Get model type
Args
- id:
master
- use master mode
Returns:
Expand source code
def get_modeltype(self, id=None, master=False) -> Union[api.ModelType, List[api.ModelType]]: """ Get model type Args: id: master: use master mode Returns: """ url = f"{self.host}/api/{'master/' if master else 'resources/'}modeltypes/{id if id else ''}" r = self._http_get(url) return parse_raw_as(Union[api.ModelType, List[api.ModelType]], r.content)
def get_project(self, id=None) ‑> Union[Project, List[Project]]
-
Expand source code
def get_project(self, id=None) -> Union[api.Project, List[api.Project]]: url = self.host + "/api/resources/projects" url = url if id is None else url + "/" + id r = self._http_get(url) return parse_raw_as(Union[api.Project, List[api.Project]], r.content)
def get_report(self, id=None, **kwargs) ‑> Union[Report, List[Report]]
-
Get test report, or list of all reports
Args
id
- report id
**kwargs
- Extended search criteria: use ('name', 'reference' 'locked', …)
Returns
report if id is given, otherwise a list of reports
Expand source code
def get_report(self, id=None, **kwargs) -> Union[api.Report, List[api.Report]]: """ Get test report, or list of all reports Args: id: report id **kwargs: Extended search criteria: use ('name', 'reference' 'locked', ...) Returns: report if id is given, otherwise a list of reports """ url = self.host + "/api/reports" url = url if id is None else url + "/" + id r = self._http_get(url) if id is None: reports = parse_raw_as(List[api.Report], r.content) return [x for x in reports if all(getattr(x, k, None) == v for k, v in kwargs.items())] else: return parse_raw_as(api.Report, r.content)
def get_reporttype(self, id=None, master=False) ‑> Union[ReportType, List[ReportType]]
-
Get report type
Args
- id:
master
- use master mode
Returns:
Expand source code
def get_reporttype(self, id=None, master=False) -> Union[api.ReportType, List[api.ReportType]]: """ Get report type Args: id: master: use master mode Returns: """ url = f"{self.host}/api/{'master/' if master else 'resources/'}reporttypes/{id if id else ''}" r = self._http_get(url) return parse_raw_as(Union[api.ReportType, List[api.ReportType]], r.content)
def get_schema(self, obj: Union[ModelType, ReportType])
-
Get schema for a certain model type
Args
obj
- modeltype or report type
Returns:
Expand source code
def get_schema(self, obj: Union[api.ModelType, api.ReportType]): """ Get schema for a certain model type Args: obj: modeltype or report type Returns: """ r = self._http_get(obj.settings_schema_path, headers={}) return r.json()
def get_sftp_users(self, dataset, **kwargs) ‑> List[SftpUser]
-
Expand source code
def get_sftp_users(self, dataset, **kwargs) -> List[api.SftpUser]: r = self._http_get(f"{self.host}/api/data/{dataset.id}/sftp", **kwargs) users = parse_raw_as(List[api.SftpUser], r.content) for user in users: user.folder = user.folder or dataset.id return users
def get_tag(self, id=None) ‑> Union[Tag, List[Tag]]
-
Get tag or list of all tags
Args
id
- Tag id
Returns
tag if id is given, otherwise a list of tags
Expand source code
def get_tag(self, id=None) -> Union[Tag, List[Tag]]: """ Get tag or list of all tags Args: id: Tag id Returns: tag if id is given, otherwise a list of tags """ url = self.host + "/api/resources/tags" url = url if id is None else url + "/" + id r = self._http_get(url) return parse_raw_as(Union[Tag, List[Tag]], r.content)
def get_userinfo(self)
-
Get info on user
Expand source code
def get_userinfo(self): """ Get info on user """ url = self.host + "/api/manage/index" r = self._http_get(url) return api.User.parse_raw(r.content)
def initialize_report(self, report: Union[str, Report], job_type: Type[Job] = None) ‑> Optional[Job]
-
Start training flow of a model
Args
report
- model or model id
job_type:
Returns
Job
Expand source code
def initialize_report(self, report: Union[str, api.Report], job_type: Type[Job] = None) -> Union[Job, None]: """ Start training flow of a model Args: report: model or model id job_type: Returns: Job """ if isinstance(report, str): report = self.get_report(model) if report.completed: print("Model already completed") return None type_selector = [job_type] if issubclass(job_type, Job) else job_type job = Job.init(job_id=report.id, api_key=report.api_key, type_selector=type_selector, backend=backend) return job
def initialize_training(self, model: Union[str, Model], job_type: Type[Job] = None, submitToCloud=False) ‑> Optional[Job]
-
Start training flow of a model
Args
model
- model or model id
- job_type:
submitToCloud
- start model in the cloud
Returns
Job if submitToCloud is false, otherwise None
Expand source code
def initialize_training(self, model: Union[str, api.Model], job_type: Type[Job] = None, submitToCloud=False) -> Union[Job, None]: """ Start training flow of a model Args: model: model or model id job_type: submitToCloud: start model in the cloud Returns: Job if submitToCloud is false, otherwise None """ payload = {"submitToCloud": "true" if submitToCloud else "false"} if isinstance(model, str): model = self.get_model(model) if model.completed: print("Model already completed") return None r = self._http_post(f"{self.host}/api/models/{model.id}/start", params=payload) if submitToCloud: return None job_config = self.download_url(r.json()).json() type_selector = job_type or Job job = Job.init(type_selector=type_selector, job_config=job_config, backend=backend) return job
def login(self, username, password, remember_me=False)
-
Expand source code
def login(self, username, password, remember_me=False): try: if username and password: self.config.set_credentials(username, password) else: username, password = self.config.get_credentials() except ValueError: if username is None: username = input(f"{self.host} - username: ") if password is None: password = prompt_for_password() self.config.set_credentials(username, password) res = self.session.post(self.host + "/api/account/token", data=dict(userName=username, password=password)) if not res.ok: raise PermissionError(f"Could not log in: {res.reason}") data = res.json() if remember_me and self.config.is_modified: self.config.save() return data
def stop_model_training(self, model)
-
Stop training of model
Args
model: Returns:
Expand source code
def stop_model_training(self, model): """ Stop training of model Args: model: Returns: """ r = self._http_post(f"{self.host}/api/models/{model.id}/stop") return api.Model.parse_raw(r.content)
def update(self, obj, master=False)
-
Expand source code
def update(self, obj, master=False): if isinstance(obj, Dataset): payload = obj.dict(include={"name", "reference", "notes", "locked"}) payload["tagIds"] = [tag.id for tag in obj.tags] self._http_post(f"{self.host}/api/data/{obj.id}", json=payload) if isinstance(obj, Tag): payload = obj.dict(include={"name", "parent_id"}, by_alias=True, exclude_none=True) self._http_post(f"{self.host}/api/resources/tags/{obj.id}", json=payload) if isinstance(obj, api.ModelType): if not master: raise PermissionError("Not authorized") self._http_post(f"{self.host}/api/master/modeltypes/update", json=obj.dict(by_alias=True)) if isinstance(obj, api.ReportType): if not master: raise PermissionError("Not authorized") self._http_post(f"{self.host}/api/master/reporttypes/update", json=obj.dict(by_alias=True)) else: raise NotImplementedError(f"create not implemented for type {type(obj)}")
def update_dataset_permission(self, id, user_id, group_id=None, permission_type='Editor')
-
Update dataset permissions for user
Args
id: user_id: group_id: permission_type: Returns:
Expand source code
def update_dataset_permission(self, id, user_id, group_id=None, permission_type="Editor"): """ Update dataset permissions for user Args: id: user_id: group_id: permission_type: Returns: """ payload = {"groupId": group_id, "userId": user_id, "resourceId": id, "objectType": 0, "permissionType": permission_type} r = self._http_post(self.host + "/api/admin/datasets/" + id + "/permissions", json=payload) return r
class WebApiConfig (**data: Any)
-
Configuration object for
PlatformAPI
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class WebApiConfig(BaseModel): """ Configuration object for `PlatformAPI` """ secret: bytes = b'' _config_file: ClassVar[str] = os.path.join(os.path.expanduser("~"), ".brevetti", "webapi") _modified: bool = PrivateAttr(default=False) @staticmethod def _get_fernet(): """Retrieve Fernet module""" node = uuid.getnode() key = base64.urlsafe_b64encode(node.to_bytes(6, 'little') + b'Q\x19$v>8Lx\xbaQ\x86T\x06$\x91\x04x\x1a\xc7\xa5/\x83~\xe6+m') return Fernet(key) def set_credentials(self, username: str, password: str): """Set credentials for later retrieval""" self.secret = self._get_fernet().encrypt(f"{username}:{password}".encode()) self._modified = True def get_credentials(self): """Get Username and password for platform login Returns: username, password """ try: return tuple(self._get_fernet().decrypt(self.secret).decode().split(":")) except InvalidToken as ex: raise ValueError("Invalid secret") from ex @property def is_modified(self): """Is the configuration modified?""" return self._modified @staticmethod def load(): """Load WebApiConfig from config_file""" return WebApiConfig.parse_file(WebApiConfig._config_file) def save(self): """Save WebApiConfig to config_file""" os.makedirs(os.path.dirname(WebApiConfig._config_file), exist_ok=True) with open(WebApiConfig._config_file, "w") as fp: fp.write(self.json())
Ancestors
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var secret : bytes
Static methods
def load()
-
Load WebApiConfig from config_file
Expand source code
@staticmethod def load(): """Load WebApiConfig from config_file""" return WebApiConfig.parse_file(WebApiConfig._config_file)
Instance variables
var is_modified
-
Is the configuration modified?
Expand source code
@property def is_modified(self): """Is the configuration modified?""" return self._modified
Methods
def get_credentials(self)
-
Get Username and password for platform login
Returns
username, password
Expand source code
def get_credentials(self): """Get Username and password for platform login Returns: username, password """ try: return tuple(self._get_fernet().decrypt(self.secret).decode().split(":")) except InvalidToken as ex: raise ValueError("Invalid secret") from ex
def save(self)
-
Save WebApiConfig to config_file
Expand source code
def save(self): """Save WebApiConfig to config_file""" os.makedirs(os.path.dirname(WebApiConfig._config_file), exist_ok=True) with open(WebApiConfig._config_file, "w") as fp: fp.write(self.json())
def set_credentials(self, username: str, password: str)
-
Set credentials for later retrieval
Expand source code
def set_credentials(self, username: str, password: str): """Set credentials for later retrieval""" self.secret = self._get_fernet().encrypt(f"{username}:{password}".encode()) self._modified = True