Module brevettiai.io.utils
Expand source code
import os
import getpass
import warnings
from . import path
from .local_io import LocalIO
from .minio_io import MinioIO
import urllib
class IoTools:
def __init__(self, cache_root=None, localio=LocalIO(), minio=MinioIO(),
max_cache_usage_fraction=0.8, path=path):
"""
:param cache_path: common cache path
:param localio: path to local file storage backend
:param minio: path to minio backend
:param max_cache_usage_fraction: stop caching when exeeding this usage fraction
"""
self.minio = minio
self.localio = localio
self.cache_root = cache_root
self.max_cache_usage_fraction = max_cache_usage_fraction
self.path = path
@staticmethod
def factory(**kwargs):
"""
Build IoTools with new backends
:param args:
:param kwargs:
:return:
"""
kwargs["minio"] = kwargs.get("minio", MinioIO())
kwargs["localio"] = kwargs.get("localio", LocalIO())
return IoTools(**kwargs)
def set_cache_root(self, root):
root = os.path.normpath(root)
assert os.path.isdir(root), "Cache root must be a path to a directory"
self.cache_root = root
def get_backend(self, path):
if path.startswith("s3://"):
return self.minio
else:
return self.localio
def resolve_access_rights(self, path, *args, **kwargs):
backend = self.get_backend(path)
backend.resolve_access_rights(path, *args, **kwargs)
def read_file(self, path, cache=None, errors="raise"):
try:
path = path if isinstance(path, str) else str(path, "utf-8")
backend = self.get_backend(path)
cache_root = cache or self.cache_root
if cache_root and backend.cache_files:
return self.localio.file_cache(path, cache_root, backend.read, self.max_cache_usage_fraction)
else:
return backend.read(path)
except Exception as ex:
if errors == "raise":
raise ex
return None
def write_file(self, path, content):
return self.get_backend(path).write(path, content)
def remove(self, path):
return self.get_backend(path).remove(path)
def copy(self, src, dst, *args, **kwargs):
src_backend = self.get_backend(src)
dst_backend = self.get_backend(dst)
if src_backend == dst_backend:
return src_backend.copy(src, dst, *args, **kwargs)
else:
return dst_backend.write(dst, src_backend.read(src))
def move(self, src, dst, *args, **kwargs):
if src == dst:
return
src_backend = self.get_backend(src)
dst_backend = self.get_backend(dst)
if src_backend == dst_backend:
return src_backend.move(src, dst, *args, **kwargs)
else:
raise NotImplementedError("Cross origin move")
def make_dirs(self, path):
backend = self.get_backend(path)
return backend.make_dirs(path)
def walk(self, path, exclude_hidden=False, **kwargs):
backend = self.get_backend(path)
return backend.walk(path, exclude_hidden=exclude_hidden, **kwargs)
def isfile(self, path):
backend = self.get_backend(path)
return backend.isfile(path)
@staticmethod
def get_uri(path):
if path.startswith("gs://"):
path = path[5:]
elif path.startswith("s3://"):
path = path[len(path[:5].split("/", 1)[0]) + 5:]
return urllib.parse.quote(path, safe='')
def get_md5(self, path):
backend = self.get_backend(path)
return backend.get_md5(path)
io_tools = IoTools()
def load_file_safe(x, cache_dir=None, io=io_tools):
"""
Load a file safely with and without tensorflow
:param x: path to file
:param cache_dir:
:param io:
:return:
"""
try:
assert x
try:
x = x.numpy()
except AttributeError:
pass
buf = io.read_file(x, cache_dir)
return buf or b''
except Exception as ex:
#log.error(str(np.array(x), "utf8"), exc_info=ex)
return b''
def prompt_for_password(prompt="Password:"):
password = getpass.getpass(prompt)
if len(password) <= 1:
warnings.warn(f"Did you mean to enter a password with length {len(password)}")
if os.name == "nt":
warnings.warn(f""" Beware of windows terminal bug, when pasting secrets into a Windows terminal using Ctrl+V
In addition to Ctrl+V, Shift+Insert also doesn't work. This behavior is the same Command Prompt and PowerShell on Windows 10.
Workarounds include:
- Type the password
- Clicking `Edit > Paste` from the window menu
- Enabling `Properties > Options > Use Ctrl+Shift+C/V as Copy/Paste` from the menu
- Use right-click to paste
- Using the new Windows Terminal: https://www.microsoft.com/en-us/p/windows-terminal/9n0dx20hk701
- provide the password as an environment variable - which is currently supported by web_api / PlatformAPI
""")
return password
Functions
def load_file_safe(x, cache_dir=None, io=<brevettiai.io.utils.IoTools object>)
-
Load a file safely with and without tensorflow :param x: path to file :param cache_dir: :param io: :return:
Expand source code
def load_file_safe(x, cache_dir=None, io=io_tools): """ Load a file safely with and without tensorflow :param x: path to file :param cache_dir: :param io: :return: """ try: assert x try: x = x.numpy() except AttributeError: pass buf = io.read_file(x, cache_dir) return buf or b'' except Exception as ex: #log.error(str(np.array(x), "utf8"), exc_info=ex) return b''
def prompt_for_password(prompt='Password:')
-
Expand source code
def prompt_for_password(prompt="Password:"): password = getpass.getpass(prompt) if len(password) <= 1: warnings.warn(f"Did you mean to enter a password with length {len(password)}") if os.name == "nt": warnings.warn(f""" Beware of windows terminal bug, when pasting secrets into a Windows terminal using Ctrl+V In addition to Ctrl+V, Shift+Insert also doesn't work. This behavior is the same Command Prompt and PowerShell on Windows 10. Workarounds include: - Type the password - Clicking `Edit > Paste` from the window menu - Enabling `Properties > Options > Use Ctrl+Shift+C/V as Copy/Paste` from the menu - Use right-click to paste - Using the new Windows Terminal: https://www.microsoft.com/en-us/p/windows-terminal/9n0dx20hk701 - provide the password as an environment variable - which is currently supported by web_api / PlatformAPI """) return password
Classes
class IoTools (cache_root=None, localio=<brevettiai.io.local_io.LocalIO object>, minio=<brevettiai.io.minio_io.MinioIO object>, max_cache_usage_fraction=0.8, path=<module 'brevettiai.io.path' from '/opt/atlassian/pipelines/agent/build/brevettiai/io/path.py'>)
-
:param cache_path: common cache path :param localio: path to local file storage backend :param minio: path to minio backend :param max_cache_usage_fraction: stop caching when exeeding this usage fraction
Expand source code
class IoTools: def __init__(self, cache_root=None, localio=LocalIO(), minio=MinioIO(), max_cache_usage_fraction=0.8, path=path): """ :param cache_path: common cache path :param localio: path to local file storage backend :param minio: path to minio backend :param max_cache_usage_fraction: stop caching when exeeding this usage fraction """ self.minio = minio self.localio = localio self.cache_root = cache_root self.max_cache_usage_fraction = max_cache_usage_fraction self.path = path @staticmethod def factory(**kwargs): """ Build IoTools with new backends :param args: :param kwargs: :return: """ kwargs["minio"] = kwargs.get("minio", MinioIO()) kwargs["localio"] = kwargs.get("localio", LocalIO()) return IoTools(**kwargs) def set_cache_root(self, root): root = os.path.normpath(root) assert os.path.isdir(root), "Cache root must be a path to a directory" self.cache_root = root def get_backend(self, path): if path.startswith("s3://"): return self.minio else: return self.localio def resolve_access_rights(self, path, *args, **kwargs): backend = self.get_backend(path) backend.resolve_access_rights(path, *args, **kwargs) def read_file(self, path, cache=None, errors="raise"): try: path = path if isinstance(path, str) else str(path, "utf-8") backend = self.get_backend(path) cache_root = cache or self.cache_root if cache_root and backend.cache_files: return self.localio.file_cache(path, cache_root, backend.read, self.max_cache_usage_fraction) else: return backend.read(path) except Exception as ex: if errors == "raise": raise ex return None def write_file(self, path, content): return self.get_backend(path).write(path, content) def remove(self, path): return self.get_backend(path).remove(path) def copy(self, src, dst, *args, **kwargs): src_backend = self.get_backend(src) dst_backend = self.get_backend(dst) if src_backend == dst_backend: return src_backend.copy(src, dst, *args, **kwargs) else: return dst_backend.write(dst, src_backend.read(src)) def move(self, src, dst, *args, **kwargs): if src == dst: return src_backend = self.get_backend(src) dst_backend = self.get_backend(dst) if src_backend == dst_backend: return src_backend.move(src, dst, *args, **kwargs) else: raise NotImplementedError("Cross origin move") def make_dirs(self, path): backend = self.get_backend(path) return backend.make_dirs(path) def walk(self, path, exclude_hidden=False, **kwargs): backend = self.get_backend(path) return backend.walk(path, exclude_hidden=exclude_hidden, **kwargs) def isfile(self, path): backend = self.get_backend(path) return backend.isfile(path) @staticmethod def get_uri(path): if path.startswith("gs://"): path = path[5:] elif path.startswith("s3://"): path = path[len(path[:5].split("/", 1)[0]) + 5:] return urllib.parse.quote(path, safe='') def get_md5(self, path): backend = self.get_backend(path) return backend.get_md5(path)
Static methods
def factory(**kwargs)
-
Build IoTools with new backends :param args: :param kwargs: :return:
Expand source code
@staticmethod def factory(**kwargs): """ Build IoTools with new backends :param args: :param kwargs: :return: """ kwargs["minio"] = kwargs.get("minio", MinioIO()) kwargs["localio"] = kwargs.get("localio", LocalIO()) return IoTools(**kwargs)
def get_uri(path)
-
Expand source code
@staticmethod def get_uri(path): if path.startswith("gs://"): path = path[5:] elif path.startswith("s3://"): path = path[len(path[:5].split("/", 1)[0]) + 5:] return urllib.parse.quote(path, safe='')
Methods
def copy(self, src, dst, *args, **kwargs)
-
Expand source code
def copy(self, src, dst, *args, **kwargs): src_backend = self.get_backend(src) dst_backend = self.get_backend(dst) if src_backend == dst_backend: return src_backend.copy(src, dst, *args, **kwargs) else: return dst_backend.write(dst, src_backend.read(src))
def get_backend(self, path)
-
Expand source code
def get_backend(self, path): if path.startswith("s3://"): return self.minio else: return self.localio
def get_md5(self, path)
-
Expand source code
def get_md5(self, path): backend = self.get_backend(path) return backend.get_md5(path)
def isfile(self, path)
-
Expand source code
def isfile(self, path): backend = self.get_backend(path) return backend.isfile(path)
def make_dirs(self, path)
-
Expand source code
def make_dirs(self, path): backend = self.get_backend(path) return backend.make_dirs(path)
def move(self, src, dst, *args, **kwargs)
-
Expand source code
def move(self, src, dst, *args, **kwargs): if src == dst: return src_backend = self.get_backend(src) dst_backend = self.get_backend(dst) if src_backend == dst_backend: return src_backend.move(src, dst, *args, **kwargs) else: raise NotImplementedError("Cross origin move")
def read_file(self, path, cache=None, errors='raise')
-
Expand source code
def read_file(self, path, cache=None, errors="raise"): try: path = path if isinstance(path, str) else str(path, "utf-8") backend = self.get_backend(path) cache_root = cache or self.cache_root if cache_root and backend.cache_files: return self.localio.file_cache(path, cache_root, backend.read, self.max_cache_usage_fraction) else: return backend.read(path) except Exception as ex: if errors == "raise": raise ex return None
def remove(self, path)
-
Expand source code
def remove(self, path): return self.get_backend(path).remove(path)
def resolve_access_rights(self, path, *args, **kwargs)
-
Expand source code
def resolve_access_rights(self, path, *args, **kwargs): backend = self.get_backend(path) backend.resolve_access_rights(path, *args, **kwargs)
def set_cache_root(self, root)
-
Expand source code
def set_cache_root(self, root): root = os.path.normpath(root) assert os.path.isdir(root), "Cache root must be a path to a directory" self.cache_root = root
def walk(self, path, exclude_hidden=False, **kwargs)
-
Expand source code
def walk(self, path, exclude_hidden=False, **kwargs): backend = self.get_backend(path) return backend.walk(path, exclude_hidden=exclude_hidden, **kwargs)
def write_file(self, path, content)
-
Expand source code
def write_file(self, path, content): return self.get_backend(path).write(path, content)