# Source code for utils.hdf

# -*- coding: utf-8 -*-
# utils/hdf.py

"""
Helper functions for HDF5 functionality
"""

from builtins import str
from builtins import object
import logging
import inspect
import os.path
import h5py
import numpy as np
from utils import isCallable, isString, isList, isNumber, isInteger, classname

# Text type used when string values are stored as HDF attributes:
# ``unicode`` where that builtin exists (Python 2), plain ``str`` otherwise.
try:
    HDFString = unicode
except NameError:
    # Python 3: there is no separate ``unicode`` builtin
    HDFString = str

def getCallerInfo(referenceType = None, stackOffset = 0):
    """Return a short ``"(<file name> l.<line number>)"`` tag for a frame of
    the current call stack, or an empty string if no suitable frame exists.

    *referenceType*: if a type (other than ``NoneType``) is given, the stack
        is searched outwards, starting at index *stackOffset*, for the first
        frame which has a local ``self`` that is **not** an instance of this
        type. Frames without a local ``self`` and frames whose ``self`` is
        an instance of *referenceType* are skipped.
    *stackOffset*: index into the stack where the search starts (index 0 is
        the frame of this function itself). Without a usable
        *referenceType*, no search happens and the frame at
        ``stackOffset + 2`` is reported instead.
    """
    stack = inspect.stack()
    try:
        if (isinstance(referenceType, type)
                and referenceType is not type(None)):
            # search for the appropriate frame in user code; the bound check
            # keeps the search from running past the outermost frame
            while stackOffset < len(stack):
                frameLocals = stack[stackOffset][0].f_locals
                if ('self' in frameLocals and
                        not isinstance(frameLocals['self'], referenceType)):
                    break
                stackOffset += 1
        else:
            stackOffset += 2
        if len(stack) <= stackOffset:
            # nothing found or offset beyond the stack
            return ""
        frame = stack[stackOffset][0]
        fn = os.path.basename(frame.f_code.co_filename)
        return u"({} l.{})".format(fn, frame.f_lineno)
    finally:
        # the stack list contains this function's own frame; drop it to
        # avoid keeping a reference cycle alive
        del stack
def HDFCleanup(infile):
    """Reclaims unused space of an HDF5 file by copying its contents into a
    fresh file which is then moved in place of the original.

    *infile*: either a file name or an open HDF5 file object. A file object
        is closed in the process.

    Returns nothing if a file name was supplied. Otherwise the rewritten
    file is reopened and the new HDF5 file object is returned.

    If copying or replacing fails with an exception, the failure is logged
    and a backup copy of the original file, made beforehand, is moved back
    in place.
    """
    # imported locally: the module header does not provide these names
    import shutil
    import uuid

    def hdfDeepCopy(source, outfilename):
        """Copies the top-level items and the root attributes of an open
        HDF5 file object (source) to a second file."""
        source.flush()
        # "w": create file, truncate if exists; closed even if copying fails
        with h5py.File(outfilename, "w") as outfile:
            for item in source:
                outfile.copy(source[item], item)
            for attrName in source.attrs:
                outfile.attrs[attrName] = source.attrs[attrName]

    inputIsFilename = isString(infile)
    if inputIsFilename: # a filename was supplied
        infile = h5py.File(infile)

    origfname = infile.filename
    # temporary file for the backup of the original
    tempfname = "{}.hdf5".format(uuid.uuid4().hex)
    # temporary output file receiving the compacted copy
    tempofname = "{}.hdf5".format(uuid.uuid4().hex)
    shutil.copy(origfname, tempfname) # backup copy
    try:
        try:
            hdfDeepCopy(infile, tempofname) # copy internals
        finally:
            # the old file has to be released in any case before it gets
            # replaced or restored
            infile.close()
        # shutil.move is more flexible than os.rename,
        # and can move across disks.
        shutil.move(tempofname, origfname) # replace with new
    except Exception:
        logging.exception(u"HDFCleanup of '{}' failed, restoring the backup"
                          .format(origfname))
        shutil.move(tempfname, origfname) # move back the backup

    # cleanup: whichever temporary file was not moved is still around
    for filename in (tempfname, tempofname):
        try:
            os.remove(filename)
        except OSError:
            pass # already moved away or never created

    if inputIsFilename:
        # the caller holds no handle, nothing to reopen (and leak)
        return None
    # NOTE(review): reopened with h5py's default access mode, which depends
    # on the installed h5py version - confirm callers get the mode they need
    return h5py.File(origfname)
class HDFWriter(object):
    """Wraps an open HDF file handle and keeps track of the current
    address/name (the *location*) attributes and datasets are written to.

    Usable as a *with* statement context manager: entering and leaving is
    delegated to the wrapped handle."""
    _handle = None   # the wrapped HDF file (or group) object
    _location = None # path of the group currently written to

    def __init__(self, hdfHandle, rootLocation = None):
        """*hdfHandle*: the open HDF object to write to.
        *rootLocation*: initial location, defaults to the name of
        *hdfHandle*."""
        self._handle = hdfHandle
        self._location = hdfHandle.name
        if rootLocation is not None:
            self._location = rootLocation

    def __enter__(self):
        """Implements a *with* statement context manager."""
        self._handle.__enter__()
        return self

    def __exit__(self, *args, **kwargs):
        """Implements a *with* statement context manager."""
        return self._handle.__exit__(*args, **kwargs)

    @classmethod
    def open(cls, filename, rootLocation = None):
        """Opens *filename* with the in-memory 'core' driver and a backing
        store and returns a writer of this class for it."""
        # explicit append mode: the default mode of h5py.File is not
        # writable in all h5py versions
        handle = h5py.File(filename, 'a', driver = 'core',
                           backing_store = True)
        # cls instead of a fixed class name lets sub classes reuse open()
        return cls(handle, rootLocation)

    @property
    def location(self):
        """The location (group path) currently written to."""
        return self._location

    def log(self, msg):
        """Emits *msg* as debug log message, prefixed by the class name."""
        logging.debug(u"[{}] {}".format(classname(self), msg))

    def _writeLocation(self):
        """Returns the object at the current location, creating the group
        first if the location does not exist yet."""
        if self.location not in self._handle:
            self._handle.require_group(self.location)
        return self._handle[self.location]

    def writeAttributes(self, **kwargs):
        """Writes each keyword argument as attribute at the current
        location, see writeAttribute()."""
        for key, value in kwargs.items():
            # leading _ from keys are removed, they come from __getstate__()
            # which gathers member variables (usually private with leading _)
            self.writeAttribute(key.lstrip('_'), value)

    def writeAttribute(self, key, value):
        """Writes *value* as attribute *key* at the current location.
        None is skipped, strings are converted to HDFString and booleans
        to numpy.int8."""
        if value is None:
            return
        if isString(value):
            value = HDFString(value)
        elif isinstance(value, bool): # filter boolean
            value = np.int8(value)
        self.log("attribute '{loc}/{k}': '{v}'"
                 .format(loc = self.location.rstrip('/'), k = key, v = value))
        self._writeLocation().attrs[key] = value

    def writeDataset(self, name, data):
        """Writes *data* as dataset *name* at the current location,
        replacing an existing entry of that name. Data which is not of
        a list type is logged and skipped."""
        if not isList(data):
            self.log("dataset {} is not of a list type! (={})"
                     .format(name, classname(data)))
            return
        shape = getattr(data, 'shape', "(len: {})".format(len(data)))
        self.log("dataset '{loc}/{name}' {shape}"
                 .format(loc = self.location.rstrip('/'),
                         name = name, shape = shape))
        writeLocation = self._writeLocation()
        if name in writeLocation:
            del writeLocation[name]
        try:
            writeLocation.create_dataset(name, data = data,
                                         compression = "gzip")
        except TypeError:
            # a TypeError is raised for non-chunkable data (such as string)
            writeLocation.create_dataset(name, data = data)

    def writeMembers(self, obj, *members):
        """Writes each of the given *members* of *obj*, see writeMember().
        At least one member name is required."""
        assert(len(members))
        for member in members:
            self.writeMember(obj, member)

    def writeMember(self, obj, memberName):
        """Writes a single member of *obj* at the current location.

        *memberName*: attribute name, or an integer index if *obj* is of
        a list type.

        Members providing *hdfWrite* store themselves in a sub group named
        after the member, list types become datasets, strings and numbers
        become attributes. Other callables are called first and their
        result is stored. Anything else is logged and skipped."""
        if isString(obj):
            logging.warning(u"String as object provided! "
                            + self._warningPrefix(obj, memberName))
        if isInteger(memberName) and isList(obj):
            member = obj[memberName]
            memberName = str(memberName)
        else:
            member = getattr(obj, memberName, None)
        if member is None:
            self.log(u"skipped " + self._warningPrefix(obj, memberName)
                     + u"It is empty or does not exist (=None).")
            return
        if isCallable(member) and not hasattr(member, "hdfWrite"):
            member = member()
        if hasattr(member, "hdfWrite"): # support instances and types
            # store the member in a group of its own
            oldLocation = self.location
            self._location = "/".join((oldLocation.rstrip('/'), memberName))
            try:
                member.hdfWrite(self) # recursion entry, mind the loops!
            finally:
                # restore the location even if the member fails to write,
                # otherwise later writes end up in the wrong group
                self._location = oldLocation
        elif isList(member):
            self.writeDataset(memberName, member)
        elif isString(member) or isNumber(member):
            self.writeAttribute(memberName, member)
        else:
            self.log(u"skipped " + self._warningPrefix(obj, memberName)
                     + "(={}) It is not a compatible value type!"
                     .format(classname(member)))

    def _warningPrefix(self, obj, memberName):
        """Returns a '<class>.<member> <caller info> ' prefix for log
        messages."""
        return (u"{cls}.{mem} {cal} ".format(cal = getCallerInfo(type(self)),
                cls = classname(obj), mem = memberName))
class HDFMixin(object):
    """Mixin for classes which store themselves by means of a HDFWriter."""

    def hdfStore(self, filename, rootLocation = None):
        """Writes itself to an HDF file at the given position or group."""
        with HDFWriter.open(filename, rootLocation) as hdf:
            self._hdfWrite(hdf)

    def _hdfWrite(self, hdf):
        """A private wrapper for custom hdfWrite() methods in sub classes.
        Allows to handle common preprocessing without requiring to call the
        method in the parent class."""
        assert isinstance(hdf, HDFWriter)
        self.hdfWrite(hdf)

    def hdfWrite(self, hdf):
        """To be overridden by sub classes to store themselves in an HDF
        structure. This default only logs a warning that nothing is
        defined.

        *hdf*: a HDFWriter instance."""
        logging.warning(u"Not defined: "
                        + hdf._warningPrefix(self, "hdfWrite()"))

    @classmethod
    def hdfLoad(cls):
        """Restores an instance of this type from a given HDF file location
        or group. Placeholder: does nothing and returns None."""
        return None
# vim: set ts=4 sts=4 sw=4 tw=0: