Module brevettiai.io.local_io
Expand source code
import hashlib
import os
import shutil
import tempfile
import hashlib
class safe_open:
    def __init__(self, path, mode="w+b"):
        """
        Temporary file backed storage for safely writing to cache
        :param path:
        :param mode:
        """
        self.path = path
        self.mode = mode
    def __enter__(self):
        self._file = tempfile.NamedTemporaryFile(self.mode, delete=False)
        return self._file
    def __exit__(self, exc_type, exc_val, exc_tb):
        self._file.close()
        try:
            if exc_type is None:
                os.rename(self._file.name, self.path)
        except OSError:
            os.remove(self._file.name)
class LocalIO:
    def __init__(self):
        self.cache_files = False
    def resolve_access_rights(self, *args, **kwargs):
        pass
    @staticmethod
    def file_cache(path, cache_root, data_getter, max_cache_usage_fraction):
        """
        Cache file data to local file
        :param path: source path
        :param cache_root: root of cache location
        :param data_getter: function getting the data from th
        :param max_cache_usage_fraction: do not fill disk more than this fraction
        :return:
        """
        # Target
        cp = os.path.join(cache_root,
                          os.path.normpath(path.replace(":", "", 1)))
        if len(cp) > 260 and os.name == "nt":
            cp = os.path.join(cache_root, "file_path_hash", hashlib.sha1(path.replace(":", "", 1).encode()).hexdigest() + "." + path.split(".")[-1])
        # Cache hit
        if os.path.isfile(cp):
            with open(cp, 'rb') as fp:
                output = fp.read()
            return output
        # Load data
        output = data_getter(path)
        # Cache Update
        total, used, free = shutil.disk_usage(cache_root)
        if used / total < max_cache_usage_fraction:
            os.makedirs(os.path.dirname(cp), exist_ok=True)
            with safe_open(cp, 'wb') as fp:
                fp.write(output)
        return output
    @staticmethod
    def read(path):
        with open(path, 'rb') as fp:
            return fp.read()
    @staticmethod
    def write(path, content):
        if isinstance(content, (bytes, bytearray)):
            with open(path, 'wb') as fp:
                return fp.write(content)
        elif isinstance(content, str):
            with open(path, 'w') as fp:
                return fp.write(content)
        else:
            with open(path, 'wb') as fp:
                return fp.write(content.read())
    @staticmethod
    def remove(path):
        if not os.path.exists(path):
            raise IOError("Path: '%s' does not exist" % path)
        if os.path.isfile(path):
            os.remove(path)
        else:
            shutil.rmtree(path)
    @staticmethod
    def copy(src, dst, *args, **kwargs):
        return shutil.copyfile(src, dst, *args, **kwargs)
    @staticmethod
    def move(src, dst, *args, **kwargs):
        return shutil.move(src, dst, *args, **kwargs)
    @staticmethod
    def make_dirs(path, exist_ok=True):
        return os.makedirs(path, exist_ok=exist_ok)
    def walk(self, path, exclude_hidden=False, **kwargs):
        if exclude_hidden:
            yield from self.walk_visible(path)
        else:
            yield from os.walk(path)
    @staticmethod
    def walk_visible(path):
        for r, d, f in os.walk(path):
            d[:] = (x for x in d if not x.startswith("."))
            f[:] = (x for x in f if not x.startswith("."))
            yield r, d, f
    @staticmethod
    def isfile(path):
        return os.path.isfile(path)
    @staticmethod
    def get_md5(path):
        with open(path, "rb") as f:
            file_hash = hashlib.md5()
            buf = f.read(8192)
            while buf:
                file_hash.update(buf)
                buf = f.read(8192)
            return file_hash.hexdigest()
Classes
class LocalIO- 
Expand source code
class LocalIO: def __init__(self): self.cache_files = False def resolve_access_rights(self, *args, **kwargs): pass @staticmethod def file_cache(path, cache_root, data_getter, max_cache_usage_fraction): """ Cache file data to local file :param path: source path :param cache_root: root of cache location :param data_getter: function getting the data from th :param max_cache_usage_fraction: do not fill disk more than this fraction :return: """ # Target cp = os.path.join(cache_root, os.path.normpath(path.replace(":", "", 1))) if len(cp) > 260 and os.name == "nt": cp = os.path.join(cache_root, "file_path_hash", hashlib.sha1(path.replace(":", "", 1).encode()).hexdigest() + "." + path.split(".")[-1]) # Cache hit if os.path.isfile(cp): with open(cp, 'rb') as fp: output = fp.read() return output # Load data output = data_getter(path) # Cache Update total, used, free = shutil.disk_usage(cache_root) if used / total < max_cache_usage_fraction: os.makedirs(os.path.dirname(cp), exist_ok=True) with safe_open(cp, 'wb') as fp: fp.write(output) return output @staticmethod def read(path): with open(path, 'rb') as fp: return fp.read() @staticmethod def write(path, content): if isinstance(content, (bytes, bytearray)): with open(path, 'wb') as fp: return fp.write(content) elif isinstance(content, str): with open(path, 'w') as fp: return fp.write(content) else: with open(path, 'wb') as fp: return fp.write(content.read()) @staticmethod def remove(path): if not os.path.exists(path): raise IOError("Path: '%s' does not exist" % path) if os.path.isfile(path): os.remove(path) else: shutil.rmtree(path) @staticmethod def copy(src, dst, *args, **kwargs): return shutil.copyfile(src, dst, *args, **kwargs) @staticmethod def move(src, dst, *args, **kwargs): return shutil.move(src, dst, *args, **kwargs) @staticmethod def make_dirs(path, exist_ok=True): return os.makedirs(path, exist_ok=exist_ok) def walk(self, path, exclude_hidden=False, **kwargs): if exclude_hidden: yield from self.walk_visible(path) else: yield from os.walk(path) @staticmethod def walk_visible(path): for r, d, f in os.walk(path): d[:] = (x for x in d if not x.startswith(".")) f[:] = (x for x in f if not x.startswith(".")) yield r, d, f @staticmethod def isfile(path): return os.path.isfile(path) @staticmethod def get_md5(path): with open(path, "rb") as f: file_hash = hashlib.md5() buf = f.read(8192) while buf: file_hash.update(buf) buf = f.read(8192) return file_hash.hexdigest()Static methods
def copy(src, dst, *args, **kwargs)- 
Expand source code
@staticmethod def copy(src, dst, *args, **kwargs): return shutil.copyfile(src, dst, *args, **kwargs) def file_cache(path, cache_root, data_getter, max_cache_usage_fraction)- 
Cache file data to local file :param path: source path :param cache_root: root of cache location :param data_getter: function getting the data from th :param max_cache_usage_fraction: do not fill disk more than this fraction :return:
Expand source code
@staticmethod def file_cache(path, cache_root, data_getter, max_cache_usage_fraction): """ Cache file data to local file :param path: source path :param cache_root: root of cache location :param data_getter: function getting the data from th :param max_cache_usage_fraction: do not fill disk more than this fraction :return: """ # Target cp = os.path.join(cache_root, os.path.normpath(path.replace(":", "", 1))) if len(cp) > 260 and os.name == "nt": cp = os.path.join(cache_root, "file_path_hash", hashlib.sha1(path.replace(":", "", 1).encode()).hexdigest() + "." + path.split(".")[-1]) # Cache hit if os.path.isfile(cp): with open(cp, 'rb') as fp: output = fp.read() return output # Load data output = data_getter(path) # Cache Update total, used, free = shutil.disk_usage(cache_root) if used / total < max_cache_usage_fraction: os.makedirs(os.path.dirname(cp), exist_ok=True) with safe_open(cp, 'wb') as fp: fp.write(output) return output def get_md5(path)- 
Expand source code
@staticmethod def get_md5(path): with open(path, "rb") as f: file_hash = hashlib.md5() buf = f.read(8192) while buf: file_hash.update(buf) buf = f.read(8192) return file_hash.hexdigest() def isfile(path)- 
Expand source code
@staticmethod def isfile(path): return os.path.isfile(path) def make_dirs(path, exist_ok=True)- 
Expand source code
@staticmethod def make_dirs(path, exist_ok=True): return os.makedirs(path, exist_ok=exist_ok) def move(src, dst, *args, **kwargs)- 
Expand source code
@staticmethod def move(src, dst, *args, **kwargs): return shutil.move(src, dst, *args, **kwargs) def read(path)- 
Expand source code
@staticmethod def read(path): with open(path, 'rb') as fp: return fp.read() def remove(path)- 
Expand source code
@staticmethod def remove(path): if not os.path.exists(path): raise IOError("Path: '%s' does not exist" % path) if os.path.isfile(path): os.remove(path) else: shutil.rmtree(path) def walk_visible(path)- 
Expand source code
@staticmethod def walk_visible(path): for r, d, f in os.walk(path): d[:] = (x for x in d if not x.startswith(".")) f[:] = (x for x in f if not x.startswith(".")) yield r, d, f def write(path, content)- 
Expand source code
@staticmethod def write(path, content): if isinstance(content, (bytes, bytearray)): with open(path, 'wb') as fp: return fp.write(content) elif isinstance(content, str): with open(path, 'w') as fp: return fp.write(content) else: with open(path, 'wb') as fp: return fp.write(content.read()) 
Methods
def resolve_access_rights(self, *args, **kwargs)- 
Expand source code
def resolve_access_rights(self, *args, **kwargs): pass def walk(self, path, exclude_hidden=False, **kwargs)- 
Expand source code
def walk(self, path, exclude_hidden=False, **kwargs): if exclude_hidden: yield from self.walk_visible(path) else: yield from os.walk(path) 
 class safe_open (path, mode='w+b')- 
Temporary file backed storage for safely writing to cache :param path: :param mode:
Expand source code
class safe_open: def __init__(self, path, mode="w+b"): """ Temporary file backed storage for safely writing to cache :param path: :param mode: """ self.path = path self.mode = mode def __enter__(self): self._file = tempfile.NamedTemporaryFile(self.mode, delete=False) return self._file def __exit__(self, exc_type, exc_val, exc_tb): self._file.close() try: if exc_type is None: os.rename(self._file.name, self.path) except OSError: os.remove(self._file.name)