Source code for sas.sascalc.dataloader.data_info

"""
    Module that contains classes to hold information read from
    reduced data files.

    A good description of the data members can be found in
    the CanSAS 1D XML data format:

    http://www.smallangles.net/wgwiki/index.php/cansas1d_documentation
"""
#####################################################################
# This software was developed by the University of Tennessee as part of the
# Distributed Data Analysis of Neutron Scattering Experiments (DANSE)
# project funded by the US National Science Foundation.
# See the license text in license.txt
# copyright 2008, University of Tennessee
######################################################################

# TODO: Keep track of data manipulation in the 'process' data structure.
# TODO: This module should be independent of plottables. We should write
#        an adapter class for plottables when needed.

from __future__ import print_function

import math
from math import fabs
import copy

import numpy as np

from sas.sascalc.data_util.uncertainty import Uncertainty


[docs]class plottable_1D(object): """ Data1D is a place holder for 1D plottables. """ # The presence of these should be mutually # exclusive with the presence of Qdev (dx) x = None y = None dx = None dy = None # Slit smearing length dxl = None # Slit smearing width dxw = None # SESANS specific params (wavelengths for spin echo length calculation) lam = None dlam = None # Units _xaxis = '' _xunit = '' _yaxis = '' _yunit = ''
[docs] def __init__(self, x, y, dx=None, dy=None, dxl=None, dxw=None, lam=None, dlam=None): self.x = np.asarray(x) self.y = np.asarray(y) if dx is not None: self.dx = np.asarray(dx) if dy is not None: self.dy = np.asarray(dy) if dxl is not None: self.dxl = np.asarray(dxl) if dxw is not None: self.dxw = np.asarray(dxw) if lam is not None: self.lam = np.asarray(lam) if dlam is not None: self.dlam = np.asarray(dlam)
[docs] def xaxis(self, label, unit): """ set the x axis label and unit """ self._xaxis = label self._xunit = unit
[docs] def yaxis(self, label, unit): """ set the y axis label and unit """ self._yaxis = label self._yunit = unit
[docs]class plottable_2D(object): """ Data2D is a place holder for 2D plottables. """ xmin = None xmax = None ymin = None ymax = None data = None qx_data = None qy_data = None q_data = None err_data = None dqx_data = None dqy_data = None mask = None x_bins = None y_bins = None # Units _xaxis = '' _xunit = '' _yaxis = '' _yunit = '' _zaxis = '' _zunit = ''
[docs] def __init__(self, data=None, err_data=None, qx_data=None, qy_data=None, q_data=None, mask=None, dqx_data=None, dqy_data=None, xmin=None, xmax=None, ymin=None, ymax=None, zmin=None, zmax=None, x_bins=None, y_bins=None): self.data = np.asarray(data) self.qx_data = np.asarray(qx_data) self.qy_data = np.asarray(qy_data) self.q_data = np.asarray(q_data) if mask is not None: self.mask = np.asarray(mask) else: self.mask = np.ones(self.data.shape, dtype=bool) if err_data is not None: self.err_data = np.asarray(err_data) if dqx_data is not None: self.dqx_data = np.asarray(dqx_data) if dqy_data is not None: self.dqy_data = np.asarray(dqy_data) # plot limits self.xmin = xmin self.xmax = xmax self.ymin = ymin self.ymax = ymax self.zmin = zmin self.zmax = zmax self.y_bins = x_bins if x_bins else [] self.x_bins = y_bins if y_bins else []
[docs] def xaxis(self, label, unit): """ set the x axis label and unit """ self._xaxis = label self._xunit = unit
[docs] def yaxis(self, label, unit): """ set the y axis label and unit """ self._yaxis = label self._yunit = unit
[docs] def zaxis(self, label, unit): """ set the z axis label and unit """ self._zaxis = label self._zunit = unit
[docs]class Vector(object): """ Vector class to hold multi-dimensional objects """ # x component x = None # y component y = None # z component z = None
[docs] def __init__(self, x=None, y=None, z=None): """ Initialization. Components that are not set a set to None by default. :param x: x component :param y: y component :param z: z component """ self.x = x self.y = y self.z = z
[docs] def __str__(self): msg = "x = %s\ty = %s\tz = %s" % (str(self.x), str(self.y), str(self.z)) return msg
[docs]class Detector(object): """ Class to hold detector information """ # Name of the instrument [string] name = None # Sample to detector distance [float] [mm] distance = None distance_unit = 'mm' # Offset of this detector position in X, Y, # (and Z if necessary) [Vector] [mm] offset = None offset_unit = 'm' # Orientation (rotation) of this detector in roll, # pitch, and yaw [Vector] [degrees] orientation = None orientation_unit = 'degree' # Center of the beam on the detector in X and Y # (and Z if necessary) [Vector] [mm] beam_center = None beam_center_unit = 'mm' # Pixel size in X, Y, (and Z if necessary) [Vector] [mm] pixel_size = None pixel_size_unit = 'mm' # Slit length of the instrument for this detector.[float] [mm] slit_length = None slit_length_unit = 'mm'
[docs] def __init__(self): """ Initialize class attribute that are objects... """ self.offset = Vector() self.orientation = Vector() self.beam_center = Vector() self.pixel_size = Vector()
[docs] def __str__(self): _str = "Detector:\n" _str += " Name: %s\n" % self.name _str += " Distance: %s [%s]\n" % \ (str(self.distance), str(self.distance_unit)) _str += " Offset: %s [%s]\n" % \ (str(self.offset), str(self.offset_unit)) _str += " Orientation: %s [%s]\n" % \ (str(self.orientation), str(self.orientation_unit)) _str += " Beam center: %s [%s]\n" % \ (str(self.beam_center), str(self.beam_center_unit)) _str += " Pixel size: %s [%s]\n" % \ (str(self.pixel_size), str(self.pixel_size_unit)) _str += " Slit length: %s [%s]\n" % \ (str(self.slit_length), str(self.slit_length_unit)) return _str
[docs]class Aperture(object): # Name name = None # Type type = None # Size name size_name = None # Aperture size [Vector] size = None size_unit = 'mm' # Aperture distance [float] distance = None distance_unit = 'mm'
[docs] def __init__(self): self.size = Vector()
[docs]class Collimation(object): """ Class to hold collimation information """ # Name name = None # Length [float] [mm] length = None length_unit = 'mm' # Aperture aperture = None
[docs] def __init__(self): self.aperture = []
[docs] def __str__(self): _str = "Collimation:\n" _str += " Length: %s [%s]\n" % \ (str(self.length), str(self.length_unit)) for item in self.aperture: _str += " Aperture size:%s [%s]\n" % \ (str(item.size), str(item.size_unit)) _str += " Aperture_dist:%s [%s]\n" % \ (str(item.distance), str(item.distance_unit)) return _str
[docs]class Source(object): """ Class to hold source information """ # Name name = None # Generic radiation type (Type and probe give more specific info) [string] radiation = None # Type and probe are only written to by the NXcanSAS reader # Specific radiation type (Synchotron X-ray, Reactor neutron, etc) [string] type = None # Radiation probe (generic probe such as neutron, x-ray, muon, etc) [string] probe = None # Beam size name beam_size_name = None # Beam size [Vector] [mm] beam_size = None beam_size_unit = 'mm' # Beam shape [string] beam_shape = None # Wavelength [float] [Angstrom] wavelength = None wavelength_unit = 'A' # Minimum wavelength [float] [Angstrom] wavelength_min = None wavelength_min_unit = 'nm' # Maximum wavelength [float] [Angstrom] wavelength_max = None wavelength_max_unit = 'nm' # Wavelength spread [float] [Angstrom] wavelength_spread = None wavelength_spread_unit = 'percent'
[docs] def __init__(self): self.beam_size = Vector()
[docs] def __str__(self): _str = "Source:\n" radiation = self.radiation if self.radiation is None and self.type and self.probe: radiation = self.type + " " + self.probe _str += " Radiation: %s\n" % str(radiation) _str += " Shape: %s\n" % str(self.beam_shape) _str += " Wavelength: %s [%s]\n" % \ (str(self.wavelength), str(self.wavelength_unit)) _str += " Waveln_min: %s [%s]\n" % \ (str(self.wavelength_min), str(self.wavelength_min_unit)) _str += " Waveln_max: %s [%s]\n" % \ (str(self.wavelength_max), str(self.wavelength_max_unit)) _str += " Waveln_spread:%s [%s]\n" % \ (str(self.wavelength_spread), str(self.wavelength_spread_unit)) _str += " Beam_size: %s [%s]\n" % \ (str(self.beam_size), str(self.beam_size_unit)) return _str
""" Definitions of radiation types """ NEUTRON = 'neutron' XRAY = 'x-ray' MUON = 'muon' ELECTRON = 'electron'
[docs]class Sample(object): """ Class to hold the sample description """ # Short name for sample name = '' # ID ID = '' # Thickness [float] [mm] thickness = None thickness_unit = 'mm' # Transmission [float] [fraction] transmission = None # Temperature [float] [No Default] temperature = None temperature_unit = None # Position [Vector] [mm] position = None position_unit = 'mm' # Orientation [Vector] [degrees] orientation = None orientation_unit = 'degree' # Details details = None # SESANS zacceptance zacceptance = (0,"") yacceptance = (0,"")
[docs] def __init__(self): self.position = Vector() self.orientation = Vector() self.details = []
[docs] def __str__(self): _str = "Sample:\n" _str += " ID: %s\n" % str(self.ID) _str += " Transmission: %s\n" % str(self.transmission) _str += " Thickness: %s [%s]\n" % \ (str(self.thickness), str(self.thickness_unit)) _str += " Temperature: %s [%s]\n" % \ (str(self.temperature), str(self.temperature_unit)) _str += " Position: %s [%s]\n" % \ (str(self.position), str(self.position_unit)) _str += " Orientation: %s [%s]\n" % \ (str(self.orientation), str(self.orientation_unit)) _str += " Details:\n" for item in self.details: _str += " %s\n" % item return _str
[docs]class Process(object): """ Class that holds information about the processes performed on the data. """ name = '' date = '' description = '' term = None notes = None
[docs] def __init__(self): self.term = [] self.notes = []
[docs] def is_empty(self): """ Return True if the object is empty """ return (len(self.name) == 0 and len(self.date) == 0 and len(self.description) == 0 and len(self.term) == 0 and len(self.notes) == 0)
[docs] def single_line_desc(self): """ Return a single line string representing the process """ return "%s %s %s" % (self.name, self.date, self.description)
[docs] def __str__(self): _str = "Process:\n" _str += " Name: %s\n" % self.name _str += " Date: %s\n" % self.date _str += " Description: %s\n" % self.description for item in self.term: _str += " Term: %s\n" % item for item in self.notes: _str += " Note: %s\n" % item return _str
[docs]class TransmissionSpectrum(object): """ Class that holds information about transmission spectrum for white beams and spallation sources. """ name = '' timestamp = '' # Wavelength (float) [A] wavelength = None wavelength_unit = 'A' # Transmission (float) [unit less] transmission = None transmission_unit = '' # Transmission Deviation (float) [unit less] transmission_deviation = None transmission_deviation_unit = ''
[docs] def __init__(self): self.wavelength = [] self.transmission = [] self.transmission_deviation = []
[docs] def __str__(self): _str = "Transmission Spectrum:\n" _str += " Name: \t{0}\n".format(self.name) _str += " Timestamp: \t{0}\n".format(self.timestamp) _str += " Wavelength unit: \t{0}\n".format(self.wavelength_unit) _str += " Transmission unit:\t{0}\n".format(self.transmission_unit) _str += " Trans. Dev. unit: \t{0}\n".format( self.transmission_deviation_unit) length_list = [len(self.wavelength), len(self.transmission), len(self.transmission_deviation)] _str += " Number of Pts: \t{0}\n".format(max(length_list)) return _str
[docs]class DataInfo(object): """ Class to hold the data read from a file. It includes four blocks of data for the instrument description, the sample description, the data itself and any other meta data. """ # Title title = '' # Run number run = None # Run name run_name = None # File name filename = '' # Notes notes = None # Processes (Action on the data) process = None # Instrument name instrument = '' # Detector information detector = None # Sample information sample = None # Source information source = None # Collimation information collimation = None # Transmission Spectrum INfo trans_spectrum = None # Additional meta-data meta_data = None # Loading errors errors = None # SESANS data check isSesans = None
[docs] def __init__(self): """ Initialization """ # Title self.title = '' # Run number self.run = [] self.run_name = {} # File name self.filename = '' # Notes self.notes = [] # Processes (Action on the data) self.process = [] # Instrument name self.instrument = '' # Detector information self.detector = [] # Sample information self.sample = Sample() # Source information self.source = Source() # Collimation information self.collimation = [] # Transmission Spectrum self.trans_spectrum = [] # Additional meta-data self.meta_data = {} # Loading errors self.errors = [] # SESANS data check self.isSesans = False
[docs] def append_empty_process(self): """ """ self.process.append(Process())
[docs] def add_notes(self, message=""): """ Add notes to datainfo """ self.notes.append(message)
[docs] def __str__(self): """ Nice printout """ _str = f"File: {self.filename}\n" _str += f"Title: {self.title}\n" _str += f"Run: {self.run}\n" _str += f"SESANS: {self.isSesans}\n" _str += f"Instrument: {self.instrument}\n" _str += f"{str(self.sample)}\n" _str += f"{str(self.source)}\n" for item in self.detector: _str += f"{str(item)}\n" for item in self.collimation: _str += f"{str(item)}\n" for item in self.process: _str += f"{str(item)}\n" for item in self.notes: _str += f"{str(item)}\n" for item in self.trans_spectrum: _str += f"{str(item)}\n" return _str
# Private method to perform operation. Not implemented for DataInfo, # but should be implemented for each data class inherited from DataInfo # that holds actual data (ex.: Data1D)
[docs] def _perform_operation(self, other, operation): """ Private method to perform operation. Not implemented for DataInfo, but should be implemented for each data class inherited from DataInfo that holds actual data (ex.: Data1D) """ return NotImplemented
[docs] def _perform_union(self, other): """ Private method to perform union operation. Not implemented for DataInfo, but should be implemented for each data class inherited from DataInfo that holds actual data (ex.: Data1D) """ return NotImplemented
[docs] def __add__(self, other): """ Add two data sets :param other: data set to add to the current one :return: new data set :raise ValueError: raised when two data sets are incompatible """ def operation(a, b): return a + b return self._perform_operation(other, operation)
[docs] def __radd__(self, other): """ Add two data sets :param other: data set to add to the current one :return: new data set :raise ValueError: raised when two data sets are incompatible """ def operation(a, b): return b + a return self._perform_operation(other, operation)
[docs] def __sub__(self, other): """ Subtract two data sets :param other: data set to subtract from the current one :return: new data set :raise ValueError: raised when two data sets are incompatible """ def operation(a, b): return a - b return self._perform_operation(other, operation)
[docs] def __rsub__(self, other): """ Subtract two data sets :param other: data set to subtract from the current one :return: new data set :raise ValueError: raised when two data sets are incompatible """ def operation(a, b): return b - a return self._perform_operation(other, operation)
[docs] def __mul__(self, other): """ Multiply two data sets :param other: data set to subtract from the current one :return: new data set :raise ValueError: raised when two data sets are incompatible """ def operation(a, b): return a * b return self._perform_operation(other, operation)
[docs] def __rmul__(self, other): """ Multiply two data sets :param other: data set to subtract from the current one :return: new data set :raise ValueError: raised when two data sets are incompatible """ def operation(a, b): return b * a return self._perform_operation(other, operation)
[docs] def __truediv__(self, other): """ Divided a data set by another :param other: data set that the current one is divided by :return: new data set :raise ValueError: raised when two data sets are incompatible """ def operation(a, b): return a/b return self._perform_operation(other, operation)
__div__ = __truediv__
[docs] def __rtruediv__(self, other): """ Divided a data set by another :param other: data set that the current one is divided by :return: new data set :raise ValueError: raised when two data sets are incompatible """ def operation(a, b): return b/a return self._perform_operation(other, operation)
__rdiv__ = __rtruediv__
[docs] def __or__(self, other): """ Union a data set with another :param other: data set to be unified :return: new data set :raise ValueError: raised when two data sets are incompatible """ return self._perform_union(other)
[docs] def __ror__(self, other): """ Union a data set with another :param other: data set to be unified :return: new data set :raise ValueError: raised when two data sets are incompatible """ return self._perform_union(other)
[docs]class Data1D(plottable_1D, DataInfo): """ 1D data class """
[docs] def __init__(self, x=None, y=None, dx=None, dy=None, lam=None, dlam=None, isSesans=False): DataInfo.__init__(self) plottable_1D.__init__(self, x, y, dx, dy, None, None, lam, dlam) self.isSesans = isSesans try: if self.isSesans: # the data is SESANS self.x_unit = 'A' self.y_unit = 'pol' elif not self.isSesans: # the data is SANS self.x_unit = '1/A' self.y_unit = '1/cm' except Exception: # the data is not recognized, notifying user raise TypeError('Check documentation for supported 1D data formats')
[docs] def __str__(self): """ Nice printout """ _str = "%s\n" % DataInfo.__str__(self) _str += "Data:\n" _str += " Type: %s\n" % self.__class__.__name__ _str += " X-axis: %s\t[%s]\n" % (self._xaxis, self._xunit) _str += " Y-axis: %s\t[%s]\n" % (self._yaxis, self._yunit) _str += " Length: %g\n" % len(self.x) return _str
[docs] def is_slit_smeared(self): """ Check whether the data has slit smearing information :return: True is slit smearing info is present, False otherwise """ def _check(v): return ((v.__class__ == list or v.__class__ == np.ndarray) and len(v) > 0 and min(v) > 0) return _check(self.dxl) or _check(self.dxw)
[docs] def clone_without_data(self, length=0, clone=None): """ Clone the current object, without copying the data (which will be filled out by a subsequent operation). The data arrays will be initialized to zero. :param length: length of the data array to be initialized :param clone: if provided, the data will be copied to clone """ from copy import deepcopy if clone is None or not issubclass(clone.__class__, Data1D): x = np.zeros(length) dx = np.zeros(length) y = np.zeros(length) dy = np.zeros(length) lam = np.zeros(length) dlam = np.zeros(length) clone = Data1D(x, y, lam=lam, dx=dx, dy=dy, dlam=dlam) clone.title = self.title clone.run = self.run clone.filename = self.filename clone.instrument = self.instrument clone.notes = deepcopy(self.notes) clone.process = deepcopy(self.process) clone.detector = deepcopy(self.detector) clone.sample = deepcopy(self.sample) clone.source = deepcopy(self.source) clone.collimation = deepcopy(self.collimation) clone.trans_spectrum = deepcopy(self.trans_spectrum) clone.meta_data = deepcopy(self.meta_data) clone.errors = deepcopy(self.errors) return clone
[docs] def copy_from_datainfo(self, data1d): """ copy values of Data1D of type DataLaoder.Data_info """ self.x = copy.deepcopy(data1d.x) self.y = copy.deepcopy(data1d.y) self.dy = copy.deepcopy(data1d.dy) if hasattr(data1d, "dx"): self.dx = copy.deepcopy(data1d.dx) if hasattr(data1d, "dxl"): self.dxl = copy.deepcopy(data1d.dxl) if hasattr(data1d, "dxw"): self.dxw = copy.deepcopy(data1d.dxw) self.xaxis(data1d._xaxis, data1d._xunit) self.yaxis(data1d._yaxis, data1d._yunit) self.title = data1d.title
[docs] def _validity_check(self, other): """ Checks that the data lengths are compatible. Checks that the x vectors are compatible. Returns errors vectors equal to original errors vectors if they were present or vectors of zeros when none was found. :param other: other data set for operation :return: dy for self, dy for other [numpy arrays] :raise ValueError: when lengths are not compatible """ dy_other = None if isinstance(other, Data1D): # Check that data lengths are the same if len(self.x) != len(other.x) or len(self.y) != len(other.y): msg = "Unable to perform operation: data length are not equal" raise ValueError(msg) # Here we could also extrapolate between data points TOLERANCE = 0.01 for i in range(len(self.x)): if fabs(self.x[i] - other.x[i]) > self.x[i]*TOLERANCE: msg = "Incompatible data sets: x-values do not match" raise ValueError(msg) # Check that the other data set has errors, otherwise # create zero vector dy_other = other.dy if other.dy is None or (len(other.dy) != len(other.y)): dy_other = np.zeros(len(other.y)) # Check that we have errors, otherwise create zero vector dy = self.dy if self.dy is None or (len(self.dy) != len(self.y)): dy = np.zeros(len(self.y)) return dy, dy_other
[docs] def _perform_operation(self, other, operation): """ """ # First, check the data compatibility dy, dy_other = self._validity_check(other) result = self.clone_without_data(len(self.x)) if self.dxw is None: result.dxw = None else: result.dxw = np.zeros(len(self.x)) if self.dxl is None: result.dxl = None else: result.dxl = np.zeros(len(self.x)) for i in range(len(self.x)): result.x[i] = self.x[i] if self.dx is not None and len(self.x) == len(self.dx): result.dx[i] = self.dx[i] if self.dxw is not None and len(self.x) == len(self.dxw): result.dxw[i] = self.dxw[i] if self.dxl is not None and len(self.x) == len(self.dxl): result.dxl[i] = self.dxl[i] a = Uncertainty(self.y[i], dy[i]**2) if isinstance(other, Data1D): b = Uncertainty(other.y[i], dy_other[i]**2) if other.dx is not None: result.dx[i] *= self.dx[i] result.dx[i] += (other.dx[i]**2) result.dx[i] /= 2 result.dx[i] = math.sqrt(result.dx[i]) if result.dxl is not None and other.dxl is not None: result.dxl[i] *= self.dxl[i] result.dxl[i] += (other.dxl[i]**2) result.dxl[i] /= 2 result.dxl[i] = math.sqrt(result.dxl[i]) else: b = other output = operation(a, b) result.y[i] = output.x result.dy[i] = math.sqrt(math.fabs(output.variance)) return result
[docs] def _validity_check_union(self, other): """ Checks that the data lengths are compatible. Checks that the x vectors are compatible. Returns errors vectors equal to original errors vectors if they were present or vectors of zeros when none was found. :param other: other data set for operation :return: bool :raise ValueError: when data types are not compatible """ if not isinstance(other, Data1D): msg = "Unable to perform operation: different types of data set" raise ValueError(msg) return True
[docs] def _perform_union(self, other): """ """ # First, check the data compatibility self._validity_check_union(other) result = self.clone_without_data(len(self.x) + len(other.x)) if self.dy is None or other.dy is None: result.dy = None else: result.dy = np.zeros(len(self.x) + len(other.x)) if self.dx is None or other.dx is None: result.dx = None else: result.dx = np.zeros(len(self.x) + len(other.x)) if self.dxw is None or other.dxw is None: result.dxw = None else: result.dxw = np.zeros(len(self.x) + len(other.x)) if self.dxl is None or other.dxl is None: result.dxl = None else: result.dxl = np.zeros(len(self.x) + len(other.x)) result.x = np.append(self.x, other.x) # argsorting ind = np.argsort(result.x) result.x = result.x[ind] result.y = np.append(self.y, other.y) result.y = result.y[ind] if result.dy is not None: result.dy = np.append(self.dy, other.dy) result.dy = result.dy[ind] if result.dx is not None: result.dx = np.append(self.dx, other.dx) result.dx = result.dx[ind] if result.dxw is not None: result.dxw = np.append(self.dxw, other.dxw) result.dxw = result.dxw[ind] if result.dxl is not None: result.dxl = np.append(self.dxl, other.dxl) result.dxl = result.dxl[ind] return result
[docs]class Data2D(plottable_2D, DataInfo): """ 2D data class """ # Units for Q-values Q_unit = '1/A' # Units for I(Q) values I_unit = '1/cm' # No 2D SESANS data as of yet. Always set it to False isSesans = False
[docs] def __init__(self, data=None, err_data=None, qx_data=None, qy_data=None, q_data=None, mask=None, dqx_data=None, dqy_data=None, xmin=None, xmax=None, ymin=None, ymax=None, zmin=None, zmax=None): DataInfo.__init__(self) plottable_2D.__init__(self, data=data, err_data=err_data, qx_data=qx_data, qy_data=qy_data, dqx_data=dqx_data, dqy_data=dqy_data, q_data=q_data, mask=mask, xmin=xmin, xmax=xmax, ymin=ymin, ymax=ymax, zmin=zmin, zmax=zmax) if len(self.detector) > 0: raise RuntimeError("Data2D: Detector bank already filled at init")
[docs] def __str__(self): _str = "%s\n" % DataInfo.__str__(self) _str += "Data:\n" _str += " Type: %s\n" % self.__class__.__name__ _str += " X-axis: %s\t[%s]\n" % (self._xaxis, self._xunit) _str += " Y-axis: %s\t[%s]\n" % (self._yaxis, self._yunit) _str += " Z-axis: %s\t[%s]\n" % (self._zaxis, self._zunit) _str += " Length: %g \n" % (len(self.data)) _str += " Shape: (%d, %d)\n" % (len(self.y_bins), len(self.x_bins)) return _str
[docs] def clone_without_data(self, length=0, clone=None): """ Clone the current object, without copying the data (which will be filled out by a subsequent operation). The data arrays will be initialized to zero. :param length: length of the data array to be initialized :param clone: if provided, the data will be copied to clone """ from copy import deepcopy if clone is None or not issubclass(clone.__class__, Data2D): data = np.zeros(length) err_data = np.zeros(length) qx_data = np.zeros(length) qy_data = np.zeros(length) q_data = np.zeros(length) mask = np.zeros(length) clone = Data2D(data=data, err_data=err_data, qx_data=qx_data, qy_data=qy_data, q_data=q_data, mask=mask) clone._xaxis = self._xaxis clone._yaxis = self._yaxis clone._zaxis = self._zaxis clone._xunit = self._xunit clone._yunit = self._yunit clone._zunit = self._zunit clone.x_bins = self.x_bins clone.y_bins = self.y_bins clone.title = self.title clone.run = self.run clone.filename = self.filename clone.instrument = self.instrument clone.notes = deepcopy(self.notes) clone.process = deepcopy(self.process) clone.detector = deepcopy(self.detector) clone.sample = deepcopy(self.sample) clone.source = deepcopy(self.source) clone.collimation = deepcopy(self.collimation) clone.trans_spectrum = deepcopy(self.trans_spectrum) clone.meta_data = deepcopy(self.meta_data) clone.errors = deepcopy(self.errors) return clone
[docs] def copy_from_datainfo(self, data2d): """ copy value of Data2D of type DataLoader.data_info """ self.data = copy.deepcopy(data2d.data) self.qx_data = copy.deepcopy(data2d.qx_data) self.qy_data = copy.deepcopy(data2d.qy_data) self.q_data = copy.deepcopy(data2d.q_data) self.mask = copy.deepcopy(data2d.mask) self.err_data = copy.deepcopy(data2d.err_data) self.x_bins = copy.deepcopy(data2d.x_bins) self.y_bins = copy.deepcopy(data2d.y_bins) if data2d.dqx_data is not None: self.dqx_data = copy.deepcopy(data2d.dqx_data) if data2d.dqy_data is not None: self.dqy_data = copy.deepcopy(data2d.dqy_data) self.xmin = data2d.xmin self.xmax = data2d.xmax self.ymin = data2d.ymin self.ymax = data2d.ymax if hasattr(data2d, "zmin"): self.zmin = data2d.zmin if hasattr(data2d, "zmax"): self.zmax = data2d.zmax self.xaxis(data2d._xaxis, data2d._xunit) self.yaxis(data2d._yaxis, data2d._yunit) self.title = data2d.title
[docs] def _validity_check(self, other): """ Checks that the data lengths are compatible. Checks that the x vectors are compatible. Returns errors vectors equal to original errors vectors if they were present or vectors of zeros when none was found. :param other: other data set for operation :return: dy for self, dy for other [numpy arrays] :raise ValueError: when lengths are not compatible """ err_other = None TOLERANCE = 0.01 msg_base = "Incompatible data sets: q-values do not match: " if isinstance(other, Data2D): # Check that data lengths are the same if (len(self.data) != len(other.data) or len(self.qx_data) != len(other.qx_data) or len(self.qy_data) != len(other.qy_data)): msg = "Unable to perform operation: data length are not equal" raise ValueError(msg) for ind in range(len(self.data)): if (fabs(self.qx_data[ind] - other.qx_data[ind]) > fabs(self.qx_data[ind])*TOLERANCE): msg = f"{msg_base}{self.qx_data[ind]} {other.qx_data[ind]}" raise ValueError(msg) if (fabs(self.qy_data[ind] - other.qy_data[ind]) > fabs(self.qy_data[ind])*TOLERANCE): msg = f"{msg_base}{self.qy_data[ind]} {other.qy_data[ind]}" raise ValueError(msg) # Check that the scales match err_other = other.err_data if (other.err_data is None or (len(other.err_data) != len(other.data))): err_other = np.zeros(len(other.data)) # Check that we have errors, otherwise create zero vector err = self.err_data if self.err_data is None or (len(self.err_data) != len(self.data)): err = np.zeros(len(other.data)) return err, err_other
[docs] def _perform_operation(self, other, operation): """ Perform 2D operations between data sets :param other: other data set :param operation: function defining the operation """ # First, check the data compatibility dy, dy_other = self._validity_check(other) result = self.clone_without_data(np.size(self.data)) if self.dqx_data is None or self.dqy_data is None: result.dqx_data = None result.dqy_data = None else: result.dqx_data = np.zeros(len(self.data)) result.dqy_data = np.zeros(len(self.data)) for i in range(np.size(self.data)): result.data[i] = self.data[i] if (self.err_data is not None and np.size(self.data) == np.size(self.err_data)): result.err_data[i] = self.err_data[i] if self.dqx_data is not None: result.dqx_data[i] = self.dqx_data[i] if self.dqy_data is not None: result.dqy_data[i] = self.dqy_data[i] result.qx_data[i] = self.qx_data[i] result.qy_data[i] = self.qy_data[i] result.q_data[i] = self.q_data[i] result.mask[i] = self.mask[i] a = Uncertainty(self.data[i], dy[i]**2) if isinstance(other, Data2D): b = Uncertainty(other.data[i], dy_other[i]**2) if other.dqx_data is not None and result.dqx_data is not None: result.dqx_data[i] *= self.dqx_data[i] result.dqx_data[i] += (other.dqx_data[i]**2) result.dqx_data[i] /= 2 result.dqx_data[i] = math.sqrt(result.dqx_data[i]) if other.dqy_data is not None and result.dqy_data is not None: result.dqy_data[i] *= self.dqy_data[i] result.dqy_data[i] += (other.dqy_data[i]**2) result.dqy_data[i] /= 2 result.dqy_data[i] = math.sqrt(result.dqy_data[i]) else: b = other output = operation(a, b) result.data[i] = output.x result.err_data[i] = math.sqrt(math.fabs(output.variance)) return result
[docs] @staticmethod def _validity_check_union(self, other): """ Checks that the data lengths are compatible. Checks that the x vectors are compatible. Returns errors vectors equal to original errors vectors if they were present or vectors of zeros when none was found. :param other: other data set for operation :return: bool :raise ValueError: when data types are not compatible """ if not isinstance(other, Data2D): msg = "Unable to perform operation: different types of data set" raise ValueError(msg) return True
[docs] def _perform_union(self, other): """ Perform 2D operations between data sets :param other: other data set :param operation: function defining the operation """ # First, check the data compatibility self._validity_check_union(other) result = self.clone_without_data(np.size(self.data) + np.size(other.data)) result.xmin = self.xmin result.xmax = self.xmax result.ymin = self.ymin result.ymax = self.ymax if (self.dqx_data is None or self.dqy_data is None or other.dqx_data is None or other.dqy_data is None): result.dqx_data = None result.dqy_data = None else: result.dqx_data = np.zeros(len(self.data) + np.size(other.data)) result.dqy_data = np.zeros(len(self.data) + np.size(other.data)) result.data = np.append(self.data, other.data) result.qx_data = np.append(self.qx_data, other.qx_data) result.qy_data = np.append(self.qy_data, other.qy_data) result.q_data = np.append(self.q_data, other.q_data) result.mask = np.append(self.mask, other.mask) if result.err_data is not None: result.err_data = np.append(self.err_data, other.err_data) if self.dqx_data is not None: result.dqx_data = np.append(self.dqx_data, other.dqx_data) if self.dqy_data is not None: result.dqy_data = np.append(self.dqy_data, other.dqy_data) return result
[docs]def combine_data_info_with_plottable(data, datainfo): """ A function that combines the DataInfo data in self.current_datainto with a plottable_1D or 2D data object. :param data: A plottable_1D or plottable_2D data object :param datainfo: A DataInfo object to be combined with the plottable :return: A fully specified Data1D or Data2D object """ if isinstance(data, plottable_1D): final_dataset = Data1D(data.x, data.y, isSesans=datainfo.isSesans) final_dataset.dx = data.dx final_dataset.dy = data.dy final_dataset.dxl = data.dxl final_dataset.dxw = data.dxw final_dataset.x_unit = data._xunit final_dataset.y_unit = data._yunit final_dataset.xaxis(data._xaxis, data._xunit) final_dataset.yaxis(data._yaxis, data._yunit) elif isinstance(data, plottable_2D): final_dataset = Data2D(data.data, data.err_data, data.qx_data, data.qy_data, data.q_data, data.mask, data.dqx_data, data.dqy_data) final_dataset.xaxis(data._xaxis, data._xunit) final_dataset.yaxis(data._yaxis, data._yunit) final_dataset.zaxis(data._zaxis, data._zunit) final_dataset.x_bins = data.x_bins final_dataset.y_bins = data.y_bins else: return_string = ("Should Never Happen: _combine_data_info_with_plottabl" "e input is not a plottable1d or plottable2d data " "object") return return_string if hasattr(data, "xmax"): final_dataset.xmax = data.xmax if hasattr(data, "ymax"): final_dataset.ymax = data.ymax if hasattr(data, "xmin"): final_dataset.xmin = data.xmin if hasattr(data, "ymin"): final_dataset.ymin = data.ymin final_dataset.isSesans = datainfo.isSesans final_dataset.title = datainfo.title final_dataset.run = datainfo.run final_dataset.run_name = datainfo.run_name final_dataset.filename = datainfo.filename final_dataset.notes = datainfo.notes final_dataset.process = datainfo.process final_dataset.instrument = datainfo.instrument final_dataset.detector = datainfo.detector final_dataset.sample = datainfo.sample final_dataset.source = datainfo.source final_dataset.collimation = datainfo.collimation final_dataset.trans_spectrum = datainfo.trans_spectrum final_dataset.meta_data = datainfo.meta_data final_dataset.errors = datainfo.errors return final_dataset