# -*- coding: utf-8 -*-
# utils/parameter.py
from builtins import str
from builtins import range
from builtins import object
import logging
import numpy as np
from utils import mixedmethod, isList, testfor, isInteger, classproperty
from bases.algorithm.parameter import ValueRangeError
from bases.algorithm import (ParameterBase, ParameterFloat,
ParameterNumerical, ParameterBoolean,
ParameterLog, ParameterString)
from bases.algorithm import Parameter
from bases.dataset import DataSet, DisplayMixin
def _makeProperty(varName):
def getter(selforcls):
return getattr(selforcls, varName)
return property(getter)
[docs]class Moments(object):
_intensity = None # partial intensity
_total = None
_mean = None # 1st moment
_variance = None # 2nd moment
_skew = None # 3rd moment
_kurtosis = None # 4th moment
# intermediate valid contribs indices for the given parameter range
_validRange = None
# create getter properties for scalar attributes
for varName in ("_intensity", "_total", "_mean", "_variance", "_skew",
"_kurtosis"):
locals()[varName[1:]] = _makeProperty(varName)
@staticmethod
[docs] def fieldNames():
"""Returns the field names in the same order as str()"""
return (
"totalValue", "totalValueStd",
"mean", "meanStd",
"variance", "varianceStd",
"skew", "skewStd",
"kurtosis", "kurtosisStd"
)
@property
def fields(self):
"""Tuple of member data incl. uncertainty for export."""
return (self._total + self._mean + self._variance
+ self._skew + self._kurtosis)
def __str__(self):
return str(self.fields)
def __repr__(self):
return str(self)
def __init__(self, contribs, paramIndex, valueRange, fraction, algo = None):
self._setValidRange(contribs[:, paramIndex, :], valueRange)
self._calcMoments(contribs[:, paramIndex, :], fraction)
if algo is not None:
scalingFactors = algo.result[paramIndex]['scalingFactors']
self._calcPartialIntensities(contribs, scalingFactors, algo)
def _setValidRange(self, contribs, valueRange):
"""Calculate contributions mask to be within the given value range.
- contribs: parameter value sets of shape
<number of contributions x number of repetitions>
"""
testfor(contribs.ndim == 2, ValueError)
numContribs, numReps = contribs.shape
self._validRange = np.zeros_like(contribs.T, dtype = bool)
for ri in range(numReps):
# the single set of R for this calculation
rset = contribs[:, ri]
self._validRange[ri] = ((rset > min(valueRange))
* (rset < max(valueRange)))
def _calcMoments(self, contribs, fraction):
"""Calculates the moments of the distribution of the current
particular (implied) parameter.
- *contribs*: parameter value sets of shape
<number of contributions x number of repetitions>
- *fraction*: number or volume fraction
"""
numContribs, numReps = contribs.shape
val = np.zeros(numReps)
mu = np.zeros(numReps)
var = np.zeros(numReps)
skw = np.zeros(numReps)
krt = np.zeros(numReps)
# loop over each repetition
for ri in range(numReps):
# the single set of R for this calculation
if not any(self._validRange[ri]):
continue # what to do if validRange is empty?
rset = contribs[self._validRange[ri], ri]
frac = fraction[self._validRange[ri], ri]
val[ri] = sum(frac)
mu[ri] = sum(rset * frac)
if 0 != sum(frac):
mu[ri] /= sum(frac)
var[ri] = sum( (rset-mu[ri])**2 * frac )/sum(frac)
sigma = np.sqrt(abs(var[ri]))
skw[ri] = ( sum( (rset-mu[ri])**3 * frac )
/ (sum(frac) * sigma**3))
krt[ri] = ( sum( (rset-mu[ri])**4 * frac )
/ (sum(frac) * sigma**4))
DDoF = 0
if numReps > 1: # prevent division by zero in numpy.std()
DDoF = 1
self._total = (val.mean(), val.std(ddof = DDoF))
self._mean = ( mu.mean(), mu.std(ddof = DDoF))
self._variance = (var.mean(), var.std(ddof = DDoF))
self._skew = (skw.mean(), skw.std(ddof = DDoF))
self._kurtosis = (krt.mean(), krt.std(ddof = DDoF))
#partial intensities not parameter-specific. Move to model/histogram?
def _calcPartialIntensities(self, contribs, scalingFactors, algo):
"""scalingFactors: scaling and background for each repetition.
contribs: all contributions also of other parameters; not required
once the results/contribs are stored in the model/parameters then
algo.calcModel() does not need them provided externally
"""
# now we can calculate the intensity contribution by the subset of
# spheres highlighted by the range:
numContribs, numParams, numReps = contribs.shape
data = algo.data
# loop over each repetition
partialIntensities = np.zeros((numReps, data.count))
# Intensity scaling factors for matching to the experimental
# scattering pattern (Amplitude A and flat background term b,
# defined in the paper)
for ri in range(numReps):
if not any(self._validRange[ri]):
continue # what to do if validRange is empty?
rset = contribs[self._validRange[ri], :, ri]
# compensated volume for each sphere in the set
it, vset = algo.calcModel(data, rset)
it = algo.model.smear(it)
# a set of volume fractions
partialIntensities[ri, :] = it.flatten() * scalingFactors[0, ri]
ddof = 0
if len(partialIntensities) > 1:
ddof = 1
self._intensity = (partialIntensities.mean(axis = 0),
partialIntensities.std(axis = 0, ddof = ddof))
[docs]class VectorResult(object):
"""Stores multiple populations of a single result data vector.
Calculates statistics at initialization."""
_mean = None
_std = None
_full = None
# some read-only attributes
@property
def mean(self):
return self._mean
@property
def std(self):
return self._std
@property
def full(self):
return self._full
def __init__(self, vecResult):
assert vecResult.ndim == 2 # 2 dim input
self._full = vecResult
self._mean = self._full.mean(axis = 1)
ddof = 0
if len(self._full) > 1:
ddof = 1
self._std = self._full.std(axis = 1, ddof = ddof)
# put this sketch here, for the moment can be placed in a separate file later
[docs]class Histogram(DataSet, DisplayMixin):
"""Stores histogram related settings of a parameter.
The results too, eventually(?). yes, please.
Stores&calculates rangeInfo() results for all available weighting options.
"""
_param = None # the FitParameter this histogram belongs to
_binCount = None # list of bin counts
_autoFollow = None # follows model parameter setting or not.
_xscale = None # list of scalings
_yweight = None # list of weightings
_xrange = None # list of tuples/pairs
# results: class HistogramResult?
_xLowerEdge = None
_xMean = None
_xWidth = None
_bins = None
_cdf = None
_observability = None
_moments = None
@property
def param(self):
return self._param
@param.setter
def param(self, newParam):
self._param = newParam
@property
def paramName(self):
return self.param.displayName()
@property
def binCount(self):
return self._binCount
@binCount.setter
def binCount(self, binCount):
testfor(isInteger(binCount) and binCount > 0,
ValueError,
"Histogram bin count has to be an integer larger 0!")
self._binCount = max(0, int(binCount))
@property
def autoFollow(self):
return self._autoFollow
@autoFollow.setter
def autoFollow(self, value):
try:
self._autoFollow = bool(value)
except:
raise
@property
def xscale(self):
return self._xscale
@xscale.setter
def xscale(self, kind):
"""Sets it to the first available option by default."""
self._xscale = str(kind).strip() # remove whitespace eventually
if self._xscale not in self.xscaling():
self._xscale = self.xscaling(1)
@property
def yweight(self):
return self._yweight
@yweight.setter
def yweight(self, kind):
self._yweight = str(kind).strip() # remove whitespace eventually
if self._yweight not in self.yweighting():
self._yweight = self.yweighting(0)
@property
def xrange(self):
return self._xrange
@xrange.setter
def xrange(self, valueRange):
lo, hi = min(valueRange), max(valueRange)
# restrict the provided range to the current range of the parameter
lo, hi = max(self.param.min(), lo), min(self.param.max(), hi)
self._xrange = lo, hi
@property
def lower(self):
return self._xrange[0]
@property
def upper(self):
return self._xrange[1]
@property
def lowerDisplay(self):
"""Lower limit in display units including the unit text."""
return "{0:g} ({1})".format(self._param.toDisplay(self.lower),
self._param.displayMagnitudeName())
@property
def upperDisplay(self):
"""Upper limit in display units including the unit text."""
return "{0:g} ({1})".format(self._param.toDisplay(self.upper),
self._param.displayMagnitudeName())
[docs] def updateRange(self):
"""Restricts histogram range according to changed parameter range
if needed. Checks histogram range against parameter limits."""
if self.autoFollow:
self.xrange = self.param.activeRange()
self.xrange = self.xrange # call getter & setter again to verify limits
@classproperty
@classmethod
[docs] def displayDataDescr(cls):
"""Descriptive text of fields for UI display."""
return (
"Parameter",
"Auto range",
"Lower",
"Upper",
"Number of bins",
"X-axis scaling",
"Y-axis weighting"
)
@classproperty
@classmethod
[docs] def displayData(cls):
"""Properties used for UI display."""
return (
"paramName",
"autoFollow",
"lowerDisplay",
"upperDisplay",
"binCount",
"xscale",
"yweight"
)
@staticmethod
[docs] def xscaling(index = None):
avail = ('lin', 'log')
try:
return avail[index]
except:
return avail
@staticmethod
[docs] def yweighting(index = None):
avail = ('vol', 'num', 'volsqr', 'surf')
try:
return avail[index]
except:
return avail
@property
def xLowerEdge(self):
return self._xLowerEdge
def _setXLowerEdge(self):
# Now bin whilst keeping track of which contribution ends up in
# which bin: set bin edge locations
if 'lin' in self.xscale:
# histogramXLowerEdge contains #histogramBins+1 bin edges,
# or class limits.
self._xLowerEdge = np.linspace(
self.lower, self.upper, self.binCount + 1)
else:
self._xLowerEdge = np.logspace(
np.log10(self.lower), np.log10(self.upper),
self.binCount + 1)
self._setXWidth()
self._setXMean()
@property
def xMean(self):
return self._xMean
def _setXMean(self):
if self.xLowerEdge is None:
return
# NOTE: isn't this the same as:
# self.xWidth * .5 + self.xLowerEdge
self._xMean = np.zeros(self.binCount)
for i in range(self.binCount):
self._xMean[i] = self.xLowerEdge[i:i+2].mean()
@property
def xWidth(self):
return self._xWidth
def _setXWidth(self):
if self.xLowerEdge is None:
return
self._xWidth = np.diff(self.xLowerEdge)
@property
def bins(self):
return self._bins
@property
def cdf(self):
return self._cdf
@property
def observability(self):
return self._observability
def _setObservability(self, allObservability):
self._observability = np.zeros(self.binCount)
if allObservability is None:
return
testfor(allObservability.shape[0] == self.binCount, ValueError)
for bi in range(self.binCount):
# for observabilities over all repetitions select the largest
obs = allObservability[bi, :]
try:
self._observability[bi] = obs[obs < np.inf].max()
except ValueError: # obs[obs < np.inf] can be empty
pass
@property
def moments(self):
return self._moments
def _binMask(self, bini, parValues):
# indexing which contributions fall into the radius bin
return ( (parValues >= self.xLowerEdge[bini])
* (parValues < self.xLowerEdge[bini + 1]))
[docs] def calc(self, contribs, paramIndex, fractions):
self._setXLowerEdge()
self._calcRepetitions(contribs, paramIndex, fractions)
def _calcRepetitions(self, contribs, paramIndex, fractions):
numContribs, dummy, numReps = contribs.shape
binLst, obsLst, cdfLst = [], [], []
fractions, minReq = fractions[self.yweight]
for ri in range(numReps):
parValues = contribs[:, paramIndex, ri]
bins, binObs, cdf = self._calcBins(
contribs, parValues, fractions[:, ri], minReq[:, ri])
binLst.append(bins)
obsLst.append(binObs)
cdfLst.append(cdf)
# set final result: y values, CDF and observability of all bins
self._bins = VectorResult(np.vstack(binLst).T)
self._cdf = VectorResult(np.vstack(cdfLst).T)
self._setObservability(np.vstack(obsLst).T)
self._moments = Moments(contribs, paramIndex, self.xrange, fractions)
def _calcBins(self, contribs, parValues, fraction, minReq):
"""Returns np arrays for the bin values, observability and the CDF
based on the bin values."""
# single set of R for this calculation
bins = np.zeros(self.binCount)
binObs = np.zeros(self.binCount)
for bi in range(self.binCount):
val, obs = self._calcBin(
self._binMask(bi, parValues),
fraction, minReq)
bins[bi] = val
binObs[bi] = obs
cdf = self._calcCDF(bins)
return bins, binObs, cdf
def _calcBin(self, binMask, fraction, minReq):
"""
*fraction*: overall fraction (number or volume, weighting depending)
"""
# y contains the volume fraction for that radius bin
binValue = sum(fraction[binMask])
if np.isnan(binValue):
binValue = 0.
# observability below
if not any(binMask):
binMinReq = 0.
else:
binMinReq = minReq[binMask].mean()
return binValue, binMinReq
def _calcCDF(self, bins):
cdf = np.zeros_like(bins)
cdf[0] = bins[0]
for i in range(1, len(cdf)):
cdf[i] = cdf[i - 1] + bins[i]
if cdf.max() == 0.0:
return np.zeros_like(bins)
cdf /= cdf.max() # normalized to max == 1
return cdf
def __str__(self):
out = ["hist"]
for attr in self.displayData:
val = getattr(self, attr)
out.append(str(val).replace(" ", ""))
idx = self.displayData.index("paramName")
# replace the parameter display name by its internal name
# (short, no spaces)
out[idx+1] = self.param.name()
return "-".join(out)
__repr__ = __str__
[docs] def hdfWrite(self, hdf):
props = list(self.integralProps())
props.remove("param") # we need its name only
hdf.writeAttribute("param", self.param.name())
hdf.writeMembers(self, *props)
def __hash__(self):
value = 0
if self.param is not None:
value = hash(id(self.param))
for attr in "lower", "upper", "binCount", "xscale", "yweight":
value ^= hash(getattr(self, attr)) # XORed
return value
def __eq__(self, other):
"""Compares with another Histogram or tuple."""
return hash(self) == hash(other)
def __neq__(self, other):
return not self.__eq__(other)
@classmethod
[docs] def integralProps(self):
"""All properties needed to properly serialize and restore this
histogram. Same order expected by the constructor."""
return ("param", "lower", "upper", "binCount", "xscale", "yweight",
"autoFollow")
def __init__(self, param, lower, upper, binCount = 50,
xscale = None, yweight = None, autoFollow = True):
"""Creates an histogram with default bin count, will be updated later."""
logging.debug('Hist init: {} [{}, {}]'
.format(param.name(), lower, upper))
super(Histogram, self).__init__(title = "({0}, {1})".format(lower, upper))
# add it to the parameter here
if isinstance(param, ParameterBase):
self.param = param # parameter we belong to is mandatory
self.binCount = int(binCount) # bin count is mandatory
self.xrange = (float(lower), float(upper))
# setter chose the first option available for invalid options
self.xscale = xscale
self.yweight = yweight
if not isinstance(autoFollow, bool):
autoFollow = (autoFollow.title() == "True")
self.autoFollow = autoFollow
# TODO: Function toString() or toJson() or similar which makes it
# serializable and also a classmethod which constructs it from serial data
[docs]class Histograms(list):
"""Manages a set of user configured histograms for evaluation after
monte-carlo run."""
[docs] def append(self, value):
if value in self:
return
super(Histograms, self).append(value)
[docs] def clear(self):
if not len(self):
return
self[:] = [] # delete all histograms
[docs] def updateRanges(self):
"""Updates ranges of all histograms."""
# work directly on the histograms, no copy
for i in range(len(self)):
self[i].updateRange()
[docs] def calc(self, *args):
# work directly on the histograms, no copy
for i in range(len(self)):
self[i].calc(*args)
[docs] def hdfWrite(self, hdf):
hdf.writeMembers(self, *list(range(len(self))))
[docs]def isFitParam(param):
return isinstance(param, FitParameterBase)
[docs]def isActiveFitParam(param):
"""Checks any type of parameter for activeness.
Shorter than that below or a try/except clause."""
return isFitParam(param) and param.isActive()
[docs]class FitParameterBase(ParameterBase):
"""Deriving parameters for curve fitting from
bases.algorithm.parameter to introduce more specific fit
related attributes."""
ParameterBase.addAttributes(locals(), isActive = False,
activeRange = None, histograms = Histograms(),
activeValues = list())
[docs] def hdfStoreAsMember(self):
return (super(FitParameterBase, self).hdfStoreAsMember()
+ ['histograms', 'activeValues'])
def __init__(self):
super(FitParameterBase, self).__init__()
# create a copy of histograms
# otherwise all instances share the class attribute
oldHists = self.histograms()
if isList(oldHists):
self.setHistograms(Histograms())
for h in oldHists:
# point the parameter reference to this instance now
# (instead of the type previously)
h.param = self
self.histograms().append(h)
# copy activeValues as well
oldActiveValues = self.activeValues()
if isList(oldActiveValues):
self.setActiveValues(oldActiveValues[:])
@mixedmethod
def setValueRange(selforcls, newRange):
super(FitParameterBase, selforcls).setValueRange(newRange)
selforcls.histograms().updateRanges()
@mixedmethod
def setIsActive(selforcls, isActive):
selforcls._isActive = bool(isActive)
setActive = setIsActive # alias
@mixedmethod
def setActiveRange(selforcls, newRange):
# tests nicked from above
testfor(len(newRange) == 2, ValueRangeError,
"Active ranges have to consist of two values!")
# always clip range settings to min/max values:
newRange = selforcls.clip(newRange)
# sets range for active fitting parameter limits
selforcls._activeRange = (min(newRange), max(newRange))
selforcls.histograms().updateRanges()
@mixedmethod
def activeRange(selforcls):
if selforcls._activeRange is None:
return selforcls.valueRange() # fallback
else:
return selforcls._activeRange
@mixedmethod
def displayActiveRange(selforcls):
"""value bounds in display units used for parameter generator"""
vRange = selforcls.activeRange()
try:
vRange = (selforcls.toDisplay(min(vRange)),
selforcls.toDisplay(max(vRange)))
except AttributeError:
pass # toDisplay() only in ParameterFloat
return vRange
@mixedmethod
def setDisplayActiveRange(selforcls, newRange):
"""sets value range after converting input from display to SI units"""
newRange = (selforcls.toSi(min(newRange)),
selforcls.toSi(max(newRange)))
selforcls.setActiveRange(newRange)
@mixedmethod
def activeVal(selforcls, val, index = None):
"""
activeVal is set after a successful MC run. It is a list of arrays,
with each array storing the parameter values of a successful run.
If index is supplied, only the array at that list index is returned,
otherwise the entire list is returned.
Used to store the fit results in each parameter.
"""
if index is None:
return selforcls.activeValues()
else:
return selforcls.activeValues()[
index%len(selforcls.activeValues())]
@mixedmethod
def setActiveVal(selforcls, val, index = None):
"""
Sets or appends to the list of activeVal. There *could* be a race
condition if two mcFit instances try to append at the same time.
Therefore, an index can be supplied to identify the list index to set
or change. Each mcFit instance should be assigned a serial (index)
number.
If the list is not long enough to accommodate the value, it will be
extended.
Used to store the fit results in each parameter.
"""
if not selforcls.isActive():
logging.error("Value of parameter cannot be set, "
"parameter not active!")
return
tempVal = selforcls.activeValues()
if index is None:
index = len(tempVal) # append to end by default
elif index < 0:
# index == -1 means the last, not yet existing entry
index = len(tempVal) + index + 1
while len(tempVal) <= index:
# expand list to allow storage of value
tempVal.append(None)
tempVal[index] = val
selforcls.setActiveValues(tempVal)
[docs]class FitParameterString(FitParameterBase, ParameterString):
pass
[docs]class FitParameterBoolean(FitParameterBase, ParameterBoolean):
pass
[docs]class FitParameterNumerical(FitParameterBase, ParameterNumerical):
def __str__(self):
lo, hi = self.displayActiveRange()
displayValue = u', active range: [{0}, {1}]'.format(lo, hi)
return (super(FitParameterNumerical, self).__str__()
+ u"{0} ({1})".format(displayValue, self.suffix()))
# ParameterNumerical has a generator, using the active fit range here
[docs] def generate(self, lower = None, upper = None, count = 1):
# self.activeRange() is always within self.valueRange() per definition
# therefore, using the parent implementation
if lower is None:
lower = min(self.activeRange())
if upper is None:
upper = max(self.activeRange())
return super(FitParameterNumerical, self).generate(
lower = lower, upper = upper,
# alternatively ignoring lower/upper args directly:
# lower = min(self.activeRange()),
# upper = max(self.activeRange()),
count = count)
[docs]class FitParameterFloat(FitParameterNumerical, ParameterFloat):
pass
[docs]class FitParameterLog(FitParameterBase, ParameterLog):
pass
[docs]def FitParameter(*args, **kwargs):
return Parameter(*args, paramTypes = (
FitParameterBoolean,
FitParameterFloat,
FitParameterNumerical,
FitParameterString,
FitParameterBase
), **kwargs)
# vim: set ts=4 sts=4 sw=4 tw=0: