Module brevettiai.io.minio_io

Expand source code
import hashlib
import mimetypes
import os
from functools import partial
from io import BytesIO, SEEK_CUR

import backoff
import certifi
import urllib3
from minio import Minio
from minio.commonconfig import CopySource
from minio.error import S3Error

from . import path as io_path


def token_error_fallback(f, set_client):
    def _token_error_fallback(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except S3Error as ex:
            if ex.code == "ExpiredToken":
                client = set_client()

                # If put_object call and data is BytesIO seekable stream.
                # Move stream pointer back relatively to the content length before retrying
                if f.__name__ == "put_object":
                    data = kwargs.get("data", args[2])
                    length = kwargs.get("length", args[3])
                    if hasattr(data, "seek"):
                        data.seek(-length, SEEK_CUR)

                return getattr(client, f.__name__)(*args, **kwargs)
            raise ex

    return _token_error_fallback


class TSPoolManager(urllib3.PoolManager):
    """
    Fixed pool manager: https://github.com/urllib3/urllib3/issues/1252
    """
    def _new_pool(self, scheme, host, port, request_context=None):
        result = super()._new_pool(scheme, host, port, request_context)

        class PoolProxy:
            def __getattr__(self, item):
                return getattr(result, item)

            def close(self):
                pass

            def __del__(self):
                result.close()

        return PoolProxy()


class MinioIO:
    http_pool = TSPoolManager(
        num_pools=20,
        #timeout=5,  #urllib3.Timeout.DEFAULT_TIMEOUT,
        maxsize=10,
        #block=False,
        cert_reqs='CERT_REQUIRED',
        ca_certs=certifi.where(),
        retries=urllib3.Retry(
            total=0,
            backoff_factor=0.2,
            status_forcelist=[500, 502, 503, 504]
        )
    )

    def __init__(self, cache_files: bool = True, credentials=None):
        self.routes = {}
        self.cache_files = cache_files

        if credentials is None:
            self.credentials = credentials

    def client_factory(self, prefix, credentials_func):
        def _update_client():
            client = Minio(**credentials_func(), secure=True, http_client=self.http_pool)

            # Decorate all functions on client with token error fallback to recursively create new client
            for name in dir(client):
                func = getattr(client, name)
                if not name.startswith("_") and callable(func):
                    setattr(client, name, token_error_fallback(func, _update_client))

            # Update routes
            self.routes[prefix] = client
            return client

        return _update_client()

    def resolve_access_rights(self, path, *args, **kwargs):
        self.set_route(path, *args, **kwargs)

    def set_route(self, prefix, resource_id, resource_type, mode='r'):
        credentials_func = partial(self.credentials.get_credentials,
                                   resource_id, resource_type=resource_type, mode=mode)
        client = self.client_factory(prefix=prefix,
                                     credentials_func=credentials_func)
        return client

    def get_client(self, path):
        try:
            return next(v for k, v in self.routes.items() if path.startswith(k))
        except StopIteration:
            raise KeyError(f"Not able to match path '{path}' to storage route")

    @backoff.on_exception(backoff.expo, (S3Error, urllib3.exceptions.MaxRetryError), max_tries=5)
    def read(self, path, *, client=None):
        try:
            file_path = path[5:] if path.startswith("s3://") else path
            bucket, obj = file_path.split("/", 1)
            client = client or self.get_client(path)
            return client.get_object(bucket, obj).data
        except S3Error as err:
            if err.code == "NoSuchKey":
                raise KeyError(err)
            else:
                raise err

    @backoff.on_exception(backoff.expo, (S3Error, urllib3.exceptions.MaxRetryError), max_tries=5)
    def write(self, path, content, *, client=None):
        file_path = path[5:] if path.startswith("s3://") else path
        bucket = file_path.split("/", 1)
        bucket, obj = tuple(bucket) if len(bucket) == 2 else ("", bucket[0])
        content_type = mimetypes.guess_type(obj)[0] or "application/octet"
        data = content.encode() if isinstance(content, str) else content
        data = BytesIO(data) if isinstance(data, bytes) else data
        length = data.seek(0, os.SEEK_END)
        client = client or self.get_client(path)
        data.seek(0)
        digest = hashlib.md5(data.getbuffer())
        return client.put_object(bucket, obj, data, length, content_type=content_type,
                                 metadata={"md5": digest.hexdigest()})

    def copy(self, src, dst, *args, **kwargs):
        srcclient = self.get_client(src)
        dstclient = self.get_client(dst)

        if srcclient == dstclient:
            src = src[5:] if src.startswith("s3://") else src
            dst = dst[5:] if dst.startswith("s3://") else dst

            bucket = dst.split("/", 1)
            bucketdst, dst = tuple(bucket) if len(bucket) == 2 else ("", bucket[0])

            bucket = src.split("/", 1)
            bucketsrc, src = tuple(bucket) if len(bucket) == 2 else ("", bucket[0])

            return srcclient.copy_object(bucketdst, dst, source=CopySource(bucketsrc, src), *args, **kwargs)
        else:
            return self.write(dst, client=dstclient, content=self.read(src, client=srcclient))

    def remove(self, path):
        client = self.get_client(path)
        file_path = path[5:] if path.startswith("s3://") else path
        bucket, obj = file_path.split("/", 1)
        return client.remove_object(bucket, obj)

    def move(self, src, dst, *args, **kwargs):
        self.copy(src, dst, *args, **kwargs)
        self.remove(src)

    def make_dirs(self, path, exist_ok=True):
        pass

    def isfile(self, path):
        try:
            self.stat_object(path)
            return True
        except S3Error as err:
            if err.code == "NoSuchKey":
                return False
            else:
                raise err

    def stat_object(self, path):
        obj_path = path[5:] if path.startswith("s3://") else path
        bucket = obj_path.split("/", 1)
        bucket, obj = tuple(bucket) if len(bucket) == 2 else ("", bucket[0])
        client = self.get_client(path)
        return client.stat_object(bucket, obj)

    def walk(self, path, prefix=None, recursive=True, include_object=False, exclude_hidden=False, **kwargs):
        folder_path = path.rstrip("/") + "/"
        folder_path = folder_path[5:] if folder_path.startswith("s3://") else folder_path
        bucket = folder_path.split("/", 1)
        prefix = bucket[1] if prefix is None else "/".join((bucket[1], prefix)) if len(bucket) == 2 else prefix
        bucket = bucket[0]

        if exclude_hidden:
            kwargs["start_after"] = prefix + ".\uFFFD"

        client = self.get_client(path)
        objects = client.list_objects(bucket, prefix=prefix,
                                      recursive=recursive, **kwargs)

        name_start = len(prefix) if len(prefix) else 0
        s3_path = "s3://" + folder_path
        yield from self._walk_objects(objects, name_start, s3_path, include_object)

    @staticmethod
    def _walk_objects(objects, name_start, s3_path, include_object):
        last, base = None, None
        files = []
        for p in objects:
            if not p.is_dir:
                _obj = p.object_name[name_start:].rsplit("/", 1)
                try:
                    base, file = _obj
                except ValueError:
                    base, file = (None, _obj[0])
                out = (file, p) if include_object else file
                if base == last:
                    files.append(out)
                else:
                    # Yield folder
                    yield io_path.safe_join(s3_path, last), [], files
                    # Clean state for next yield
                    last = base
                    files = [out]
        else:
            yield io_path.safe_join(s3_path, last), [], files

    def get_md5(self, path):
        fobj = self.stat_object(path)
        try:
            return fobj.metadata["x-amz-meta-md5"]
        except KeyError:
            md5 = self.calculate_md5(path)
            self.copy(src=path, dst=path, metadata={**fobj.metadata, "md5": md5})
            return md5

    def calculate_md5(self, path):
        digest = hashlib.md5(self.read(path)).hexdigest()
        return digest

Functions

def token_error_fallback(f, set_client)
Expand source code
def token_error_fallback(f, set_client):
    def _token_error_fallback(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except S3Error as ex:
            if ex.code == "ExpiredToken":
                client = set_client()

                # If put_object call and data is BytesIO seekable stream.
                # Move stream pointer back relatively to the content length before retrying
                if f.__name__ == "put_object":
                    data = kwargs.get("data", args[2])
                    length = kwargs.get("length", args[3])
                    if hasattr(data, "seek"):
                        data.seek(-length, SEEK_CUR)

                return getattr(client, f.__name__)(*args, **kwargs)
            raise ex

    return _token_error_fallback

Classes

class MinioIO (cache_files: bool = True, credentials=None)
Expand source code
class MinioIO:
    http_pool = TSPoolManager(
        num_pools=20,
        #timeout=5,  #urllib3.Timeout.DEFAULT_TIMEOUT,
        maxsize=10,
        #block=False,
        cert_reqs='CERT_REQUIRED',
        ca_certs=certifi.where(),
        retries=urllib3.Retry(
            total=0,
            backoff_factor=0.2,
            status_forcelist=[500, 502, 503, 504]
        )
    )

    def __init__(self, cache_files: bool = True, credentials=None):
        self.routes = {}
        self.cache_files = cache_files

        if credentials is None:
            self.credentials = credentials

    def client_factory(self, prefix, credentials_func):
        def _update_client():
            client = Minio(**credentials_func(), secure=True, http_client=self.http_pool)

            # Decorate all functions on client with token error fallback to recursively create new client
            for name in dir(client):
                func = getattr(client, name)
                if not name.startswith("_") and callable(func):
                    setattr(client, name, token_error_fallback(func, _update_client))

            # Update routes
            self.routes[prefix] = client
            return client

        return _update_client()

    def resolve_access_rights(self, path, *args, **kwargs):
        self.set_route(path, *args, **kwargs)

    def set_route(self, prefix, resource_id, resource_type, mode='r'):
        credentials_func = partial(self.credentials.get_credentials,
                                   resource_id, resource_type=resource_type, mode=mode)
        client = self.client_factory(prefix=prefix,
                                     credentials_func=credentials_func)
        return client

    def get_client(self, path):
        try:
            return next(v for k, v in self.routes.items() if path.startswith(k))
        except StopIteration:
            raise KeyError(f"Not able to match path '{path}' to storage route")

    @backoff.on_exception(backoff.expo, (S3Error, urllib3.exceptions.MaxRetryError), max_tries=5)
    def read(self, path, *, client=None):
        try:
            file_path = path[5:] if path.startswith("s3://") else path
            bucket, obj = file_path.split("/", 1)
            client = client or self.get_client(path)
            return client.get_object(bucket, obj).data
        except S3Error as err:
            if err.code == "NoSuchKey":
                raise KeyError(err)
            else:
                raise err

    @backoff.on_exception(backoff.expo, (S3Error, urllib3.exceptions.MaxRetryError), max_tries=5)
    def write(self, path, content, *, client=None):
        file_path = path[5:] if path.startswith("s3://") else path
        bucket = file_path.split("/", 1)
        bucket, obj = tuple(bucket) if len(bucket) == 2 else ("", bucket[0])
        content_type = mimetypes.guess_type(obj)[0] or "application/octet"
        data = content.encode() if isinstance(content, str) else content
        data = BytesIO(data) if isinstance(data, bytes) else data
        length = data.seek(0, os.SEEK_END)
        client = client or self.get_client(path)
        data.seek(0)
        digest = hashlib.md5(data.getbuffer())
        return client.put_object(bucket, obj, data, length, content_type=content_type,
                                 metadata={"md5": digest.hexdigest()})

    def copy(self, src, dst, *args, **kwargs):
        srcclient = self.get_client(src)
        dstclient = self.get_client(dst)

        if srcclient == dstclient:
            src = src[5:] if src.startswith("s3://") else src
            dst = dst[5:] if dst.startswith("s3://") else dst

            bucket = dst.split("/", 1)
            bucketdst, dst = tuple(bucket) if len(bucket) == 2 else ("", bucket[0])

            bucket = src.split("/", 1)
            bucketsrc, src = tuple(bucket) if len(bucket) == 2 else ("", bucket[0])

            return srcclient.copy_object(bucketdst, dst, source=CopySource(bucketsrc, src), *args, **kwargs)
        else:
            return self.write(dst, client=dstclient, content=self.read(src, client=srcclient))

    def remove(self, path):
        client = self.get_client(path)
        file_path = path[5:] if path.startswith("s3://") else path
        bucket, obj = file_path.split("/", 1)
        return client.remove_object(bucket, obj)

    def move(self, src, dst, *args, **kwargs):
        self.copy(src, dst, *args, **kwargs)
        self.remove(src)

    def make_dirs(self, path, exist_ok=True):
        pass

    def isfile(self, path):
        try:
            self.stat_object(path)
            return True
        except S3Error as err:
            if err.code == "NoSuchKey":
                return False
            else:
                raise err

    def stat_object(self, path):
        obj_path = path[5:] if path.startswith("s3://") else path
        bucket = obj_path.split("/", 1)
        bucket, obj = tuple(bucket) if len(bucket) == 2 else ("", bucket[0])
        client = self.get_client(path)
        return client.stat_object(bucket, obj)

    def walk(self, path, prefix=None, recursive=True, include_object=False, exclude_hidden=False, **kwargs):
        folder_path = path.rstrip("/") + "/"
        folder_path = folder_path[5:] if folder_path.startswith("s3://") else folder_path
        bucket = folder_path.split("/", 1)
        prefix = bucket[1] if prefix is None else "/".join((bucket[1], prefix)) if len(bucket) == 2 else prefix
        bucket = bucket[0]

        if exclude_hidden:
            kwargs["start_after"] = prefix + ".\uFFFD"

        client = self.get_client(path)
        objects = client.list_objects(bucket, prefix=prefix,
                                      recursive=recursive, **kwargs)

        name_start = len(prefix) if len(prefix) else 0
        s3_path = "s3://" + folder_path
        yield from self._walk_objects(objects, name_start, s3_path, include_object)

    @staticmethod
    def _walk_objects(objects, name_start, s3_path, include_object):
        last, base = None, None
        files = []
        for p in objects:
            if not p.is_dir:
                _obj = p.object_name[name_start:].rsplit("/", 1)
                try:
                    base, file = _obj
                except ValueError:
                    base, file = (None, _obj[0])
                out = (file, p) if include_object else file
                if base == last:
                    files.append(out)
                else:
                    # Yield folder
                    yield io_path.safe_join(s3_path, last), [], files
                    # Clean state for next yield
                    last = base
                    files = [out]
        else:
            yield io_path.safe_join(s3_path, last), [], files

    def get_md5(self, path):
        fobj = self.stat_object(path)
        try:
            return fobj.metadata["x-amz-meta-md5"]
        except KeyError:
            md5 = self.calculate_md5(path)
            self.copy(src=path, dst=path, metadata={**fobj.metadata, "md5": md5})
            return md5

    def calculate_md5(self, path):
        digest = hashlib.md5(self.read(path)).hexdigest()
        return digest

Class variables

var http_pool

Methods

def calculate_md5(self, path)
Expand source code
def calculate_md5(self, path):
    digest = hashlib.md5(self.read(path)).hexdigest()
    return digest
def client_factory(self, prefix, credentials_func)
Expand source code
def client_factory(self, prefix, credentials_func):
    def _update_client():
        client = Minio(**credentials_func(), secure=True, http_client=self.http_pool)

        # Decorate all functions on client with token error fallback to recursively create new client
        for name in dir(client):
            func = getattr(client, name)
            if not name.startswith("_") and callable(func):
                setattr(client, name, token_error_fallback(func, _update_client))

        # Update routes
        self.routes[prefix] = client
        return client

    return _update_client()
def copy(self, src, dst, *args, **kwargs)
Expand source code
def copy(self, src, dst, *args, **kwargs):
    srcclient = self.get_client(src)
    dstclient = self.get_client(dst)

    if srcclient == dstclient:
        src = src[5:] if src.startswith("s3://") else src
        dst = dst[5:] if dst.startswith("s3://") else dst

        bucket = dst.split("/", 1)
        bucketdst, dst = tuple(bucket) if len(bucket) == 2 else ("", bucket[0])

        bucket = src.split("/", 1)
        bucketsrc, src = tuple(bucket) if len(bucket) == 2 else ("", bucket[0])

        return srcclient.copy_object(bucketdst, dst, source=CopySource(bucketsrc, src), *args, **kwargs)
    else:
        return self.write(dst, client=dstclient, content=self.read(src, client=srcclient))
def get_client(self, path)
Expand source code
def get_client(self, path):
    try:
        return next(v for k, v in self.routes.items() if path.startswith(k))
    except StopIteration:
        raise KeyError(f"Not able to match path '{path}' to storage route")
def get_md5(self, path)
Expand source code
def get_md5(self, path):
    fobj = self.stat_object(path)
    try:
        return fobj.metadata["x-amz-meta-md5"]
    except KeyError:
        md5 = self.calculate_md5(path)
        self.copy(src=path, dst=path, metadata={**fobj.metadata, "md5": md5})
        return md5
def isfile(self, path)
Expand source code
def isfile(self, path):
    try:
        self.stat_object(path)
        return True
    except S3Error as err:
        if err.code == "NoSuchKey":
            return False
        else:
            raise err
def make_dirs(self, path, exist_ok=True)
Expand source code
def make_dirs(self, path, exist_ok=True):
    pass
def move(self, src, dst, *args, **kwargs)
Expand source code
def move(self, src, dst, *args, **kwargs):
    self.copy(src, dst, *args, **kwargs)
    self.remove(src)
def read(self, path, *, client=None)
Expand source code
@backoff.on_exception(backoff.expo, (S3Error, urllib3.exceptions.MaxRetryError), max_tries=5)
def read(self, path, *, client=None):
    try:
        file_path = path[5:] if path.startswith("s3://") else path
        bucket, obj = file_path.split("/", 1)
        client = client or self.get_client(path)
        return client.get_object(bucket, obj).data
    except S3Error as err:
        if err.code == "NoSuchKey":
            raise KeyError(err)
        else:
            raise err
def remove(self, path)
Expand source code
def remove(self, path):
    client = self.get_client(path)
    file_path = path[5:] if path.startswith("s3://") else path
    bucket, obj = file_path.split("/", 1)
    return client.remove_object(bucket, obj)
def resolve_access_rights(self, path, *args, **kwargs)
Expand source code
def resolve_access_rights(self, path, *args, **kwargs):
    self.set_route(path, *args, **kwargs)
def set_route(self, prefix, resource_id, resource_type, mode='r')
Expand source code
def set_route(self, prefix, resource_id, resource_type, mode='r'):
    credentials_func = partial(self.credentials.get_credentials,
                               resource_id, resource_type=resource_type, mode=mode)
    client = self.client_factory(prefix=prefix,
                                 credentials_func=credentials_func)
    return client
def stat_object(self, path)
Expand source code
def stat_object(self, path):
    obj_path = path[5:] if path.startswith("s3://") else path
    bucket = obj_path.split("/", 1)
    bucket, obj = tuple(bucket) if len(bucket) == 2 else ("", bucket[0])
    client = self.get_client(path)
    return client.stat_object(bucket, obj)
def walk(self, path, prefix=None, recursive=True, include_object=False, exclude_hidden=False, **kwargs)
Expand source code
def walk(self, path, prefix=None, recursive=True, include_object=False, exclude_hidden=False, **kwargs):
    folder_path = path.rstrip("/") + "/"
    folder_path = folder_path[5:] if folder_path.startswith("s3://") else folder_path
    bucket = folder_path.split("/", 1)
    prefix = bucket[1] if prefix is None else "/".join((bucket[1], prefix)) if len(bucket) == 2 else prefix
    bucket = bucket[0]

    if exclude_hidden:
        kwargs["start_after"] = prefix + ".\uFFFD"

    client = self.get_client(path)
    objects = client.list_objects(bucket, prefix=prefix,
                                  recursive=recursive, **kwargs)

    name_start = len(prefix) if len(prefix) else 0
    s3_path = "s3://" + folder_path
    yield from self._walk_objects(objects, name_start, s3_path, include_object)
def write(self, path, content, *, client=None)
Expand source code
@backoff.on_exception(backoff.expo, (S3Error, urllib3.exceptions.MaxRetryError), max_tries=5)
def write(self, path, content, *, client=None):
    file_path = path[5:] if path.startswith("s3://") else path
    bucket = file_path.split("/", 1)
    bucket, obj = tuple(bucket) if len(bucket) == 2 else ("", bucket[0])
    content_type = mimetypes.guess_type(obj)[0] or "application/octet"
    data = content.encode() if isinstance(content, str) else content
    data = BytesIO(data) if isinstance(data, bytes) else data
    length = data.seek(0, os.SEEK_END)
    client = client or self.get_client(path)
    data.seek(0)
    digest = hashlib.md5(data.getbuffer())
    return client.put_object(bucket, obj, data, length, content_type=content_type,
                             metadata={"md5": digest.hexdigest()})
class TSPoolManager (num_pools=10, headers=None, **connection_pool_kw)
Expand source code
class TSPoolManager(urllib3.PoolManager):
    """
    Fixed pool manager: https://github.com/urllib3/urllib3/issues/1252
    """
    def _new_pool(self, scheme, host, port, request_context=None):
        result = super()._new_pool(scheme, host, port, request_context)

        class PoolProxy:
            def __getattr__(self, item):
                return getattr(result, item)

            def close(self):
                pass

            def __del__(self):
                result.close()

        return PoolProxy()

Ancestors

  • urllib3.poolmanager.PoolManager
  • urllib3.request.RequestMethods