Module brevettiai.io.local_io
Expand source code
import hashlib
import os
import shutil
import tempfile
import hashlib
class safe_open:
    """
    Context manager that writes through a temporary file.

    The ``with`` body receives a ``tempfile.NamedTemporaryFile`` handle. When
    the body finishes without an exception the temporary file is renamed to
    ``path``; in every other case the temporary file is deleted, so a failed
    write does not leave a half-written file at ``path``.
    """

    def __init__(self, path, mode="w+b"):
        """
        Temporary file backed storage for safely writing to cache
        :param path: destination the temporary file is renamed to on success
        :param mode: mode used to open the temporary file
        """
        self.path = path
        self.mode = mode

    def __enter__(self):
        # delete=False: the file has to survive close() so that __exit__ can
        # rename it to the destination.
        self._file = tempfile.NamedTemporaryFile(self.mode, delete=False)
        return self._file

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._file.close()
        tmp_name = self._file.name

        if exc_type is not None:
            # The body failed: discard the partial temporary file instead of
            # leaking it. Cleanup errors are suppressed so that the original
            # exception from the body is the one that propagates.
            try:
                os.remove(tmp_name)
            except OSError:
                pass
            return

        try:
            os.rename(tmp_name, self.path)
        except OSError:
            # Best effort: the destination could not be written (e.g. it is
            # on another file system), so drop the temporary file.
            os.remove(tmp_name)
class LocalIO:
    """
    IO backend for the local file system.

    Wraps ``os``/``shutil`` calls behind read/write/remove/copy/move/walk
    helpers and provides a simple file cache.
    """

    def __init__(self):
        # Not read by any method of this class.
        self.cache_files = False

    def resolve_access_rights(self, *args, **kwargs):
        """No-op: accepts and ignores any arguments."""
        pass

    @staticmethod
    def file_cache(path, cache_root, data_getter, max_cache_usage_fraction):
        """
        Cache file data to local file
        :param path: source path
        :param cache_root: root of cache location
        :param data_getter: function called with ``path`` to get the data on a cache miss
        :param max_cache_usage_fraction: do not fill disk more than this fraction
        :return: the data, read from the cache when present, otherwise from ``data_getter``
        """
        # Target: drop the first ":" (drive letter / url scheme separator) and
        # strip leading separators. os.path.join discards ``cache_root`` when
        # the second argument is absolute, which would place the "cache" file
        # at the source location itself.
        stripped = path.replace(":", "", 1)
        separators = os.sep + (os.path.altsep or "")
        relative = os.path.normpath(stripped).lstrip(separators)
        cp = os.path.join(cache_root, relative)
        if len(cp) > 260 and os.name == "nt":
            # Too long for a classic Windows path: use a hash of the path as
            # file name. splitext only looks at the last path component, so a
            # dot in a directory name is not mistaken for an extension.
            extension = os.path.splitext(path)[1]
            cp = os.path.join(cache_root, "file_path_hash",
                              hashlib.sha1(stripped.encode()).hexdigest() + extension)

        # Cache hit
        if os.path.isfile(cp):
            with open(cp, 'rb') as fp:
                return fp.read()

        # Load data
        output = data_getter(path)

        # Cache Update
        # shutil.disk_usage needs an existing path; create the root so the
        # first use of a fresh cache location does not fail after the data
        # has already been fetched.
        os.makedirs(cache_root, exist_ok=True)
        total, used, _free = shutil.disk_usage(cache_root)
        if used / total < max_cache_usage_fraction:
            os.makedirs(os.path.dirname(cp), exist_ok=True)
            with safe_open(cp, 'wb') as fp:
                fp.write(output)
        return output

    @staticmethod
    def read(path):
        """
        Read a file in binary mode
        :param path: file to read
        :return: file content as bytes
        """
        with open(path, 'rb') as fp:
            return fp.read()

    @staticmethod
    def write(path, content):
        """
        Write content to a file, replacing existing content
        :param path: file to write
        :param content: bytes/bytearray (written in binary mode), str (written
            in text mode) or any other object with a ``read()`` method whose
            result is written in binary mode
        :return: return value of the underlying ``write`` call
        """
        if isinstance(content, (bytes, bytearray)):
            with open(path, 'wb') as fp:
                return fp.write(content)
        elif isinstance(content, str):
            with open(path, 'w') as fp:
                return fp.write(content)
        else:
            with open(path, 'wb') as fp:
                return fp.write(content.read())

    @staticmethod
    def remove(path):
        """
        Remove a file, or a directory with everything below it
        :param path: file or directory to remove
        :raises IOError: if the path does not exist
        """
        if not os.path.exists(path):
            raise IOError("Path: '%s' does not exist" % path)
        if os.path.isfile(path):
            os.remove(path)
        else:
            shutil.rmtree(path)

    @staticmethod
    def copy(src, dst, *args, **kwargs):
        """Copy file ``src`` to ``dst``; extra arguments go to ``shutil.copyfile``."""
        return shutil.copyfile(src, dst, *args, **kwargs)

    @staticmethod
    def move(src, dst, *args, **kwargs):
        """Move ``src`` to ``dst``; extra arguments go to ``shutil.move``."""
        return shutil.move(src, dst, *args, **kwargs)

    @staticmethod
    def make_dirs(path, exist_ok=True):
        """Create directory ``path`` including missing parents."""
        return os.makedirs(path, exist_ok=exist_ok)

    def walk(self, path, exclude_hidden=False, **kwargs):
        """
        Walk a directory tree like ``os.walk``
        :param path: root of the walk
        :param exclude_hidden: skip files and directories starting with "."
        :param kwargs: accepted but not used
        :return: generator of (root, dirs, files) tuples
        """
        if exclude_hidden:
            yield from self.walk_visible(path)
        else:
            yield from os.walk(path)

    @staticmethod
    def walk_visible(path):
        """
        Walk a directory tree, skipping entries whose name starts with "."
        :param path: root of the walk
        :return: generator of (root, dirs, files) tuples
        """
        for r, d, f in os.walk(path):
            # Slice assignment edits the lists in place; os.walk reads ``d``
            # afterwards, so hidden directories are not descended into.
            d[:] = [x for x in d if not x.startswith(".")]
            f[:] = [x for x in f if not x.startswith(".")]
            yield r, d, f

    @staticmethod
    def isfile(path):
        """Return True if ``path`` is an existing regular file."""
        return os.path.isfile(path)

    @staticmethod
    def get_md5(path):
        """
        Calculate the md5 checksum of a file
        :param path: file to hash
        :return: hex digest string
        """
        file_hash = hashlib.md5()
        with open(path, "rb") as f:
            # Read in 8 KiB chunks until read() returns b"" at end of file.
            for buf in iter(lambda: f.read(8192), b""):
                file_hash.update(buf)
        return file_hash.hexdigest()
Classes
class LocalIO
-
Expand source code
class LocalIO: def __init__(self): self.cache_files = False def resolve_access_rights(self, *args, **kwargs): pass @staticmethod def file_cache(path, cache_root, data_getter, max_cache_usage_fraction): """ Cache file data to local file :param path: source path :param cache_root: root of cache location :param data_getter: function getting the data from th :param max_cache_usage_fraction: do not fill disk more than this fraction :return: """ # Target cp = os.path.join(cache_root, os.path.normpath(path.replace(":", "", 1))) if len(cp) > 260 and os.name == "nt": cp = os.path.join(cache_root, "file_path_hash", hashlib.sha1(path.replace(":", "", 1).encode()).hexdigest() + "." + path.split(".")[-1]) # Cache hit if os.path.isfile(cp): with open(cp, 'rb') as fp: output = fp.read() return output # Load data output = data_getter(path) # Cache Update total, used, free = shutil.disk_usage(cache_root) if used / total < max_cache_usage_fraction: os.makedirs(os.path.dirname(cp), exist_ok=True) with safe_open(cp, 'wb') as fp: fp.write(output) return output @staticmethod def read(path): with open(path, 'rb') as fp: return fp.read() @staticmethod def write(path, content): if isinstance(content, (bytes, bytearray)): with open(path, 'wb') as fp: return fp.write(content) elif isinstance(content, str): with open(path, 'w') as fp: return fp.write(content) else: with open(path, 'wb') as fp: return fp.write(content.read()) @staticmethod def remove(path): if not os.path.exists(path): raise IOError("Path: '%s' does not exist" % path) if os.path.isfile(path): os.remove(path) else: shutil.rmtree(path) @staticmethod def copy(src, dst, *args, **kwargs): return shutil.copyfile(src, dst, *args, **kwargs) @staticmethod def move(src, dst, *args, **kwargs): return shutil.move(src, dst, *args, **kwargs) @staticmethod def make_dirs(path, exist_ok=True): return os.makedirs(path, exist_ok=exist_ok) def walk(self, path, exclude_hidden=False, **kwargs): if exclude_hidden: yield from self.walk_visible(path) 
else: yield from os.walk(path) @staticmethod def walk_visible(path): for r, d, f in os.walk(path): d[:] = (x for x in d if not x.startswith(".")) f[:] = (x for x in f if not x.startswith(".")) yield r, d, f @staticmethod def isfile(path): return os.path.isfile(path) @staticmethod def get_md5(path): with open(path, "rb") as f: file_hash = hashlib.md5() buf = f.read(8192) while buf: file_hash.update(buf) buf = f.read(8192) return file_hash.hexdigest()
Static methods
def copy(src, dst, *args, **kwargs)
-
Expand source code
@staticmethod def copy(src, dst, *args, **kwargs): return shutil.copyfile(src, dst, *args, **kwargs)
def file_cache(path, cache_root, data_getter, max_cache_usage_fraction)
-
Cache file data to local file :param path: source path :param cache_root: root of cache location :param data_getter: function getting the data from the source path :param max_cache_usage_fraction: do not fill disk more than this fraction :return:
Expand source code
@staticmethod def file_cache(path, cache_root, data_getter, max_cache_usage_fraction): """ Cache file data to local file :param path: source path :param cache_root: root of cache location :param data_getter: function getting the data from th :param max_cache_usage_fraction: do not fill disk more than this fraction :return: """ # Target cp = os.path.join(cache_root, os.path.normpath(path.replace(":", "", 1))) if len(cp) > 260 and os.name == "nt": cp = os.path.join(cache_root, "file_path_hash", hashlib.sha1(path.replace(":", "", 1).encode()).hexdigest() + "." + path.split(".")[-1]) # Cache hit if os.path.isfile(cp): with open(cp, 'rb') as fp: output = fp.read() return output # Load data output = data_getter(path) # Cache Update total, used, free = shutil.disk_usage(cache_root) if used / total < max_cache_usage_fraction: os.makedirs(os.path.dirname(cp), exist_ok=True) with safe_open(cp, 'wb') as fp: fp.write(output) return output
def get_md5(path)
-
Expand source code
@staticmethod def get_md5(path): with open(path, "rb") as f: file_hash = hashlib.md5() buf = f.read(8192) while buf: file_hash.update(buf) buf = f.read(8192) return file_hash.hexdigest()
def isfile(path)
-
Expand source code
@staticmethod def isfile(path): return os.path.isfile(path)
def make_dirs(path, exist_ok=True)
-
Expand source code
@staticmethod def make_dirs(path, exist_ok=True): return os.makedirs(path, exist_ok=exist_ok)
def move(src, dst, *args, **kwargs)
-
Expand source code
@staticmethod def move(src, dst, *args, **kwargs): return shutil.move(src, dst, *args, **kwargs)
def read(path)
-
Expand source code
@staticmethod def read(path): with open(path, 'rb') as fp: return fp.read()
def remove(path)
-
Expand source code
@staticmethod def remove(path): if not os.path.exists(path): raise IOError("Path: '%s' does not exist" % path) if os.path.isfile(path): os.remove(path) else: shutil.rmtree(path)
def walk_visible(path)
-
Expand source code
@staticmethod def walk_visible(path): for r, d, f in os.walk(path): d[:] = (x for x in d if not x.startswith(".")) f[:] = (x for x in f if not x.startswith(".")) yield r, d, f
def write(path, content)
-
Expand source code
@staticmethod def write(path, content): if isinstance(content, (bytes, bytearray)): with open(path, 'wb') as fp: return fp.write(content) elif isinstance(content, str): with open(path, 'w') as fp: return fp.write(content) else: with open(path, 'wb') as fp: return fp.write(content.read())
Methods
def resolve_access_rights(self, *args, **kwargs)
-
Expand source code
def resolve_access_rights(self, *args, **kwargs): pass
def walk(self, path, exclude_hidden=False, **kwargs)
-
Expand source code
def walk(self, path, exclude_hidden=False, **kwargs): if exclude_hidden: yield from self.walk_visible(path) else: yield from os.walk(path)
class safe_open (path, mode='w+b')
-
Temporary file backed storage for safely writing to cache :param path: :param mode:
Expand source code
class safe_open: def __init__(self, path, mode="w+b"): """ Temporary file backed storage for safely writing to cache :param path: :param mode: """ self.path = path self.mode = mode def __enter__(self): self._file = tempfile.NamedTemporaryFile(self.mode, delete=False) return self._file def __exit__(self, exc_type, exc_val, exc_tb): self._file.close() try: if exc_type is None: os.rename(self._file.name, self.path) except OSError: os.remove(self._file.name)