Module brevettiai.io.utils

Expand source code
import os
import getpass
import warnings
from . import path
from .local_io import LocalIO
from .minio_io import MinioIO
import urllib


class IoTools:
    """Dispatch file operations to a storage backend selected from the path.

    Paths starting with ``s3://`` go to the ``minio`` backend; every other
    path goes to the ``localio`` backend (see ``get_backend``).
    """

    # NOTE: the default ``localio``/``minio`` objects are created once, when
    # this ``def`` is evaluated, so every IoTools constructed without explicit
    # backends shares them. Use ``IoTools.factory`` to get fresh backends.
    def __init__(self, cache_root=None, localio=LocalIO(), minio=MinioIO(),
                 max_cache_usage_fraction=0.8, path=path):
        """
        :param cache_root: common cache path
        :param localio: local file storage backend
        :param minio: minio backend
        :param max_cache_usage_fraction: stop caching when exceeding this usage fraction
        :param path: path helper exposed as ``self.path`` (defaults to the sibling ``path`` module)
        """
        self.minio = minio
        self.localio = localio
        self.cache_root = cache_root
        self.max_cache_usage_fraction = max_cache_usage_fraction
        self.path = path

    @staticmethod
    def factory(**kwargs):
        """
        Build IoTools with new backends

        :param kwargs: keyword arguments forwarded to the IoTools constructor; ``minio`` and
            ``localio`` are filled in with freshly created backends when not supplied
        :return: the new IoTools instance
        """
        # Only build a backend when the caller did not supply one, instead of
        # always constructing an instance that may be thrown away.
        if "minio" not in kwargs:
            kwargs["minio"] = MinioIO()
        if "localio" not in kwargs:
            kwargs["localio"] = LocalIO()
        return IoTools(**kwargs)

    def set_cache_root(self, root):
        """
        Set the common cache directory

        :param root: path to an existing directory; stored normalised with os.path.normpath
        :raises AssertionError: if root is not a directory
        """
        root = os.path.normpath(root)
        # Raised explicitly (rather than via ``assert``) so the check is not
        # stripped under ``python -O``; the exception type is kept for callers.
        if not os.path.isdir(root):
            raise AssertionError("Cache root must be a path to a directory")
        self.cache_root = root

    def get_backend(self, path):
        """
        Select the backend responsible for a path

        :param path: path string
        :return: ``self.minio`` for paths starting with "s3://", otherwise ``self.localio``
        """
        if path.startswith("s3://"):
            return self.minio
        return self.localio

    def resolve_access_rights(self, path, *args, **kwargs):
        """
        Forward to ``resolve_access_rights`` of the backend responsible for path

        The backend's return value is not propagated; this method returns None.
        """
        backend = self.get_backend(path)
        backend.resolve_access_rights(path, *args, **kwargs)

    def read_file(self, path, cache=None, errors="raise"):
        """
        Read a file through the backend matching the path, optionally via the local file cache

        :param path: file location as str, or a bytes-like object that is decoded as utf-8
        :param cache: cache root for this call; falls back to ``self.cache_root`` when falsy
        :param errors: "raise" re-raises any exception; any other value returns None on failure
        :return: result of the backend read / ``localio.file_cache``, or None on a suppressed error
        """
        try:
            path = path if isinstance(path, str) else str(path, "utf-8")
            backend = self.get_backend(path)

            cache_root = cache or self.cache_root
            if cache_root and backend.cache_files:
                return self.localio.file_cache(path, cache_root, backend.read, self.max_cache_usage_fraction)
            return backend.read(path)
        except Exception:
            if errors == "raise":
                # Bare raise re-raises the active exception unchanged.
                raise
            return None

    def write_file(self, path, content):
        """Write content to path using the backend responsible for path."""
        return self.get_backend(path).write(path, content)

    def remove(self, path):
        """Remove path using the backend responsible for path."""
        return self.get_backend(path).remove(path)

    def copy(self, src, dst, *args, **kwargs):
        """
        Copy src to dst

        When both paths resolve to the same backend its ``copy`` is used (extra
        arguments are forwarded); otherwise the source is read and written to the
        destination backend, and the extra arguments are not used.
        """
        src_backend = self.get_backend(src)
        dst_backend = self.get_backend(dst)
        if src_backend == dst_backend:
            return src_backend.copy(src, dst, *args, **kwargs)
        return dst_backend.write(dst, src_backend.read(src))

    def move(self, src, dst, *args, **kwargs):
        """
        Move src to dst within a single backend

        :return: None when src equals dst, otherwise the result of the backend ``move``
        :raises NotImplementedError: when src and dst resolve to different backends
        """
        if src == dst:
            return
        src_backend = self.get_backend(src)
        dst_backend = self.get_backend(dst)
        if src_backend == dst_backend:
            return src_backend.move(src, dst, *args, **kwargs)
        raise NotImplementedError("Cross origin move")

    def make_dirs(self, path):
        """Forward to ``make_dirs`` of the backend responsible for path."""
        backend = self.get_backend(path)
        return backend.make_dirs(path)

    def walk(self, path, exclude_hidden=False, **kwargs):
        """Forward to ``walk`` of the backend responsible for path, passing exclude_hidden and kwargs."""
        backend = self.get_backend(path)
        return backend.walk(path, exclude_hidden=exclude_hidden, **kwargs)

    def isfile(self, path):
        """Forward to ``isfile`` of the backend responsible for path."""
        backend = self.get_backend(path)
        return backend.isfile(path)

    @staticmethod
    def get_uri(path):
        """
        Percent-encode a path (including "/") after stripping a leading storage prefix

        :param path: path string, optionally starting with "gs://" or "s3://"
        :return: the remaining path quoted with no safe characters
        """
        # ``import urllib`` alone does not guarantee that the ``urllib.parse``
        # submodule is loaded, so import it explicitly here.
        import urllib.parse

        if path.startswith("gs://"):
            path = path[5:]
        elif path.startswith("s3://"):
            # NOTE(review): path[:5] is always "s3://" in this branch, so this
            # drops the first 8 characters (the scheme plus three more). If the
            # intent was to drop the bucket name the slice looks wrong -
            # confirm with callers before changing; behaviour is kept as-is.
            path = path[len(path[:5].split("/", 1)[0]) + 5:]
        return urllib.parse.quote(path, safe='')

    def get_md5(self, path):
        """Forward to ``get_md5`` of the backend responsible for path."""
        backend = self.get_backend(path)
        return backend.get_md5(path)


# Shared module-level IoTools instance built with the constructor defaults;
# it is the default ``io`` argument of load_file_safe.
io_tools = IoTools()


def load_file_safe(x, cache_dir=None, io=io_tools):
    """
    Load a file safely with and without tensorflow

    Any failure (including a falsy ``x``) results in empty bytes instead of an exception.

    :param x: path to file; objects exposing ``.numpy()`` are converted with it first
    :param cache_dir: cache location forwarded to ``io.read_file``
    :param io: object providing ``read_file(path, cache)``
    :return: the file content, or b'' when nothing could be read
    """
    try:
        assert x
        try:
            source = x.numpy()
        except AttributeError:
            # No .numpy() available: use the value as given.
            source = x
        content = io.read_file(source, cache_dir)
        if content:
            return content
        return b''
    except Exception:
        # Deliberate best effort: every error is reported as empty content.
        return b''


def prompt_for_password(prompt="Password:"):
    """
    Ask for a password with getpass and warn when the input looks accidentally short

    :param prompt: text passed to getpass.getpass
    :return: the entered password, returned unchanged even when a warning was issued
    """
    entered = getpass.getpass(prompt)
    n_chars = len(entered)
    if n_chars > 1:
        return entered

    warnings.warn(f"Did you mean to enter a password with length {n_chars}")
    if os.name != "nt":
        return entered

    # On Windows an (almost) empty password is often caused by a failed paste.
    windows_paste_hint = """ Beware of windows terminal bug, when pasting secrets into a Windows terminal using Ctrl+V
                In addition to Ctrl+V, Shift+Insert also doesn't work. This behavior is the same Command Prompt and PowerShell on Windows 10.

                Workarounds include:

                - Type the password
                - Clicking `Edit > Paste` from the window menu
                - Enabling `Properties > Options > Use Ctrl+Shift+C/V as Copy/Paste` from the menu
                - Use right-click to paste
                - Using the new Windows Terminal: https://www.microsoft.com/en-us/p/windows-terminal/9n0dx20hk701
                - provide the password as an environment variable - which is currently supported by web_api / PlatformAPI
            """
    warnings.warn(windows_paste_hint)
    return entered

Functions

def load_file_safe(x, cache_dir=None, io=<brevettiai.io.utils.IoTools object>)

Load a file safely with and without tensorflow :param x: path to file :param cache_dir: :param io: :return:

Expand source code
def load_file_safe(x, cache_dir=None, io=io_tools):
    """
    Load a file safely with and without tensorflow

    Any failure (including a falsy ``x``) results in empty bytes instead of an exception.

    :param x: path to file; objects exposing ``.numpy()`` are converted with it first
    :param cache_dir: cache location forwarded to ``io.read_file``
    :param io: object providing ``read_file(path, cache)``
    :return: the file content, or b'' when nothing could be read
    """
    try:
        assert x
        try:
            source = x.numpy()
        except AttributeError:
            # No .numpy() available: use the value as given.
            source = x
        content = io.read_file(source, cache_dir)
        if content:
            return content
        return b''
    except Exception:
        # Deliberate best effort: every error is reported as empty content.
        return b''
def prompt_for_password(prompt='Password:')
Expand source code
def prompt_for_password(prompt="Password:"):
    """
    Ask for a password with getpass and warn when the input looks accidentally short

    :param prompt: text passed to getpass.getpass
    :return: the entered password, returned unchanged even when a warning was issued
    """
    entered = getpass.getpass(prompt)
    n_chars = len(entered)
    if n_chars > 1:
        return entered

    warnings.warn(f"Did you mean to enter a password with length {n_chars}")
    if os.name != "nt":
        return entered

    # On Windows an (almost) empty password is often caused by a failed paste.
    windows_paste_hint = """ Beware of windows terminal bug, when pasting secrets into a Windows terminal using Ctrl+V
                In addition to Ctrl+V, Shift+Insert also doesn't work. This behavior is the same Command Prompt and PowerShell on Windows 10.

                Workarounds include:

                - Type the password
                - Clicking `Edit > Paste` from the window menu
                - Enabling `Properties > Options > Use Ctrl+Shift+C/V as Copy/Paste` from the menu
                - Use right-click to paste
                - Using the new Windows Terminal: https://www.microsoft.com/en-us/p/windows-terminal/9n0dx20hk701
                - provide the password as an environment variable - which is currently supported by web_api / PlatformAPI
            """
    warnings.warn(windows_paste_hint)
    return entered

Classes

class IoTools (cache_root=None, localio=<brevettiai.io.local_io.LocalIO object>, minio=<brevettiai.io.minio_io.MinioIO object>, max_cache_usage_fraction=0.8, path=<module 'brevettiai.io.path' from '/opt/atlassian/pipelines/agent/build/brevettiai/io/path.py'>)

:param cache_root: common cache path :param localio: path to local file storage backend :param minio: path to minio backend :param max_cache_usage_fraction: stop caching when exceeding this usage fraction

Expand source code
class IoTools:
    """Dispatch file operations to a storage backend selected from the path.

    Paths starting with ``s3://`` go to the ``minio`` backend; every other
    path goes to the ``localio`` backend (see ``get_backend``).
    """

    # NOTE: the default ``localio``/``minio`` objects are created once, when
    # this ``def`` is evaluated, so every IoTools constructed without explicit
    # backends shares them. Use ``IoTools.factory`` to get fresh backends.
    def __init__(self, cache_root=None, localio=LocalIO(), minio=MinioIO(),
                 max_cache_usage_fraction=0.8, path=path):
        """
        :param cache_root: common cache path
        :param localio: local file storage backend
        :param minio: minio backend
        :param max_cache_usage_fraction: stop caching when exceeding this usage fraction
        :param path: path helper exposed as ``self.path`` (defaults to the sibling ``path`` module)
        """
        self.minio = minio
        self.localio = localio
        self.cache_root = cache_root
        self.max_cache_usage_fraction = max_cache_usage_fraction
        self.path = path

    @staticmethod
    def factory(**kwargs):
        """
        Build IoTools with new backends

        :param kwargs: keyword arguments forwarded to the IoTools constructor; ``minio`` and
            ``localio`` are filled in with freshly created backends when not supplied
        :return: the new IoTools instance
        """
        # Only build a backend when the caller did not supply one, instead of
        # always constructing an instance that may be thrown away.
        if "minio" not in kwargs:
            kwargs["minio"] = MinioIO()
        if "localio" not in kwargs:
            kwargs["localio"] = LocalIO()
        return IoTools(**kwargs)

    def set_cache_root(self, root):
        """
        Set the common cache directory

        :param root: path to an existing directory; stored normalised with os.path.normpath
        :raises AssertionError: if root is not a directory
        """
        root = os.path.normpath(root)
        # Raised explicitly (rather than via ``assert``) so the check is not
        # stripped under ``python -O``; the exception type is kept for callers.
        if not os.path.isdir(root):
            raise AssertionError("Cache root must be a path to a directory")
        self.cache_root = root

    def get_backend(self, path):
        """
        Select the backend responsible for a path

        :param path: path string
        :return: ``self.minio`` for paths starting with "s3://", otherwise ``self.localio``
        """
        if path.startswith("s3://"):
            return self.minio
        return self.localio

    def resolve_access_rights(self, path, *args, **kwargs):
        """
        Forward to ``resolve_access_rights`` of the backend responsible for path

        The backend's return value is not propagated; this method returns None.
        """
        backend = self.get_backend(path)
        backend.resolve_access_rights(path, *args, **kwargs)

    def read_file(self, path, cache=None, errors="raise"):
        """
        Read a file through the backend matching the path, optionally via the local file cache

        :param path: file location as str, or a bytes-like object that is decoded as utf-8
        :param cache: cache root for this call; falls back to ``self.cache_root`` when falsy
        :param errors: "raise" re-raises any exception; any other value returns None on failure
        :return: result of the backend read / ``localio.file_cache``, or None on a suppressed error
        """
        try:
            path = path if isinstance(path, str) else str(path, "utf-8")
            backend = self.get_backend(path)

            cache_root = cache or self.cache_root
            if cache_root and backend.cache_files:
                return self.localio.file_cache(path, cache_root, backend.read, self.max_cache_usage_fraction)
            return backend.read(path)
        except Exception:
            if errors == "raise":
                # Bare raise re-raises the active exception unchanged.
                raise
            return None

    def write_file(self, path, content):
        """Write content to path using the backend responsible for path."""
        return self.get_backend(path).write(path, content)

    def remove(self, path):
        """Remove path using the backend responsible for path."""
        return self.get_backend(path).remove(path)

    def copy(self, src, dst, *args, **kwargs):
        """
        Copy src to dst

        When both paths resolve to the same backend its ``copy`` is used (extra
        arguments are forwarded); otherwise the source is read and written to the
        destination backend, and the extra arguments are not used.
        """
        src_backend = self.get_backend(src)
        dst_backend = self.get_backend(dst)
        if src_backend == dst_backend:
            return src_backend.copy(src, dst, *args, **kwargs)
        return dst_backend.write(dst, src_backend.read(src))

    def move(self, src, dst, *args, **kwargs):
        """
        Move src to dst within a single backend

        :return: None when src equals dst, otherwise the result of the backend ``move``
        :raises NotImplementedError: when src and dst resolve to different backends
        """
        if src == dst:
            return
        src_backend = self.get_backend(src)
        dst_backend = self.get_backend(dst)
        if src_backend == dst_backend:
            return src_backend.move(src, dst, *args, **kwargs)
        raise NotImplementedError("Cross origin move")

    def make_dirs(self, path):
        """Forward to ``make_dirs`` of the backend responsible for path."""
        backend = self.get_backend(path)
        return backend.make_dirs(path)

    def walk(self, path, exclude_hidden=False, **kwargs):
        """Forward to ``walk`` of the backend responsible for path, passing exclude_hidden and kwargs."""
        backend = self.get_backend(path)
        return backend.walk(path, exclude_hidden=exclude_hidden, **kwargs)

    def isfile(self, path):
        """Forward to ``isfile`` of the backend responsible for path."""
        backend = self.get_backend(path)
        return backend.isfile(path)

    @staticmethod
    def get_uri(path):
        """
        Percent-encode a path (including "/") after stripping a leading storage prefix

        :param path: path string, optionally starting with "gs://" or "s3://"
        :return: the remaining path quoted with no safe characters
        """
        # ``import urllib`` alone does not guarantee that the ``urllib.parse``
        # submodule is loaded, so import it explicitly here.
        import urllib.parse

        if path.startswith("gs://"):
            path = path[5:]
        elif path.startswith("s3://"):
            # NOTE(review): path[:5] is always "s3://" in this branch, so this
            # drops the first 8 characters (the scheme plus three more). If the
            # intent was to drop the bucket name the slice looks wrong -
            # confirm with callers before changing; behaviour is kept as-is.
            path = path[len(path[:5].split("/", 1)[0]) + 5:]
        return urllib.parse.quote(path, safe='')

    def get_md5(self, path):
        """Forward to ``get_md5`` of the backend responsible for path."""
        backend = self.get_backend(path)
        return backend.get_md5(path)

Static methods

def factory(**kwargs)

Build IoTools with new backends :param args: :param kwargs: :return:

Expand source code
@staticmethod
def factory(**kwargs):
    """
    Build IoTools with new backends

    :param kwargs: keyword arguments forwarded to the IoTools constructor; ``minio`` and
        ``localio`` are filled in with freshly created backends when not supplied
    :return: the new IoTools instance
    """
    # Only build a backend when the caller did not supply one, instead of
    # always constructing an instance that may be thrown away.
    if "minio" not in kwargs:
        kwargs["minio"] = MinioIO()
    if "localio" not in kwargs:
        kwargs["localio"] = LocalIO()
    return IoTools(**kwargs)
def get_uri(path)
Expand source code
@staticmethod
def get_uri(path):
    """
    Percent-encode a path (including "/") after stripping a leading storage prefix

    :param path: path string, optionally starting with "gs://" or "s3://"
    :return: the remaining path quoted with no safe characters
    """
    # ``import urllib`` alone does not guarantee that the ``urllib.parse``
    # submodule is loaded, so import it explicitly here.
    import urllib.parse

    if path.startswith("gs://"):
        path = path[5:]
    elif path.startswith("s3://"):
        # NOTE(review): path[:5] is always "s3://" in this branch, so this
        # drops the first 8 characters (the scheme plus three more). If the
        # intent was to drop the bucket name the slice looks wrong -
        # confirm with callers before changing; behaviour is kept as-is.
        path = path[len(path[:5].split("/", 1)[0]) + 5:]
    return urllib.parse.quote(path, safe='')

Methods

def copy(self, src, dst, *args, **kwargs)
Expand source code
def copy(self, src, dst, *args, **kwargs):
    """
    Copy src to dst

    When both paths resolve to the same backend its ``copy`` is used (extra
    arguments are forwarded); otherwise the source is read and written to the
    destination backend, and the extra arguments are not used.
    """
    source_io = self.get_backend(src)
    target_io = self.get_backend(dst)
    if source_io == target_io:
        return source_io.copy(src, dst, *args, **kwargs)
    # Different backends: transfer the content through memory.
    payload = source_io.read(src)
    return target_io.write(dst, payload)
def get_backend(self, path)
Expand source code
def get_backend(self, path):
    """
    Select the backend responsible for a path

    :param path: path string
    :return: ``self.minio`` for paths starting with "s3://", otherwise ``self.localio``
    """
    return self.minio if path.startswith("s3://") else self.localio
def get_md5(self, path)
Expand source code
def get_md5(self, path):
    """Forward to ``get_md5`` of the backend responsible for path and return its result."""
    return self.get_backend(path).get_md5(path)
def isfile(self, path)
Expand source code
def isfile(self, path):
    """Forward to ``isfile`` of the backend responsible for path and return its result."""
    return self.get_backend(path).isfile(path)
def make_dirs(self, path)
Expand source code
def make_dirs(self, path):
    """Forward to ``make_dirs`` of the backend responsible for path and return its result."""
    return self.get_backend(path).make_dirs(path)
def move(self, src, dst, *args, **kwargs)
Expand source code
def move(self, src, dst, *args, **kwargs):
    """
    Move src to dst within a single backend

    :return: None when src equals dst, otherwise the result of the backend ``move``
    :raises NotImplementedError: when src and dst resolve to different backends
    """
    if src == dst:
        # Nothing to do when source and destination are the same path.
        return
    source_io = self.get_backend(src)
    target_io = self.get_backend(dst)
    if source_io == target_io:
        return source_io.move(src, dst, *args, **kwargs)
    raise NotImplementedError("Cross origin move")
def read_file(self, path, cache=None, errors='raise')
Expand source code
def read_file(self, path, cache=None, errors="raise"):
    """
    Read a file through the backend matching the path, optionally via the local file cache

    :param path: file location as str, or a bytes-like object that is decoded as utf-8
    :param cache: cache root for this call; falls back to ``self.cache_root`` when falsy
    :param errors: "raise" re-raises any exception; any other value returns None on failure
    :return: result of the backend read / ``localio.file_cache``, or None on a suppressed error
    """
    try:
        path = path if isinstance(path, str) else str(path, "utf-8")
        backend = self.get_backend(path)

        cache_root = cache or self.cache_root
        if cache_root and backend.cache_files:
            return self.localio.file_cache(path, cache_root, backend.read, self.max_cache_usage_fraction)
        return backend.read(path)
    except Exception:
        if errors == "raise":
            # Bare raise re-raises the active exception unchanged.
            raise
        return None
def remove(self, path)
Expand source code
def remove(self, path):
    """Forward to ``remove`` of the backend responsible for path and return its result."""
    backend = self.get_backend(path)
    return backend.remove(path)
def resolve_access_rights(self, path, *args, **kwargs)
Expand source code
def resolve_access_rights(self, path, *args, **kwargs):
    """
    Forward to ``resolve_access_rights`` of the backend responsible for path

    The backend's return value is not propagated; this method returns None.
    """
    self.get_backend(path).resolve_access_rights(path, *args, **kwargs)
def set_cache_root(self, root)
Expand source code
def set_cache_root(self, root):
    """
    Set the common cache directory

    :param root: path to an existing directory; stored normalised with os.path.normpath
    :raises AssertionError: if root is not a directory
    """
    root = os.path.normpath(root)
    # Raised explicitly (rather than via ``assert``) so the check is not
    # stripped under ``python -O``; the exception type is kept for callers.
    if not os.path.isdir(root):
        raise AssertionError("Cache root must be a path to a directory")
    self.cache_root = root
def walk(self, path, exclude_hidden=False, **kwargs)
Expand source code
def walk(self, path, exclude_hidden=False, **kwargs):
    """Forward to ``walk`` of the backend responsible for path, passing exclude_hidden and kwargs."""
    return self.get_backend(path).walk(path, exclude_hidden=exclude_hidden, **kwargs)
def write_file(self, path, content)
Expand source code
def write_file(self, path, content):
    """Forward to ``write`` of the backend responsible for path and return its result."""
    backend = self.get_backend(path)
    return backend.write(path, content)