#
# LSST Data Management System
# Copyright 2008, 2009, 2010 LSST Corporation.
#
# This product includes software developed by the
# LSST Project (http://www.lsst.org/).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the LSST License Statement and
# the GNU General Public License along with this program. If not,
# see <http://www.lsstcorp.org/LegalNotices/>.
#
from collections import OrderedDict
import os
import re
from lsst.daf.base import PropertySet
from lsst.daf.persistence import ButlerLocation, NoResults
from lsst.daf.persistence.policy import Policy
import lsst.pex.policy as pexPolicy
__all__ = ["Mapping", "ImageMapping", "ExposureMapping", "CalibrationMapping", "DatasetMapping"]
class Mapping(object):
"""Mapping is a base class for all mappings. Mappings are used by
the Mapper to map (determine a path to some data given some
identifiers) and standardize (convert data into some standard
format or type) data, and to query the associated registry to see
what data is available.
    Subclasses must specify self.storage or else override self.map().

    Public methods: lookup, have, need, keys, map
Mappings are specified mainly by policy. A Mapping policy should
consist of:
template (string): a Python string providing the filename for that
particular dataset type based on some data identifiers. In the
case of redundancy in the path (e.g., file uniquely specified by
the exposure number, but filter in the path), the
redundant/dependent identifiers can be looked up in the registry.
python (string): the Python type for the retrieved data (e.g.
lsst.afw.image.ExposureF)
persistable (string): the Persistable registration for the on-disk data
(e.g. ImageU)
storage (string, optional): Storage type for this dataset type (e.g.
"BoostStorage")
level (string, optional): the level in the camera hierarchy at which the
data is stored (Amp, Ccd or skyTile), if relevant
tables (string, optional): a whitespace-delimited list of tables in the
registry that can be NATURAL JOIN-ed to look up additional
information.
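    For example, a hypothetical image-like dataset type might be described by a
    policy fragment equivalent to the following `dict` (the template, storage and
    persistable values are purely illustrative)::

        {"template": "raw/v%(visit)07d/c%(ccd)03d.fits",
         "python": "lsst.afw.image.ExposureF",
         "persistable": "ExposureF",
         "storage": "FitsStorage",
         "level": "Ccd",
         "tables": "raw"}
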
Parameters
----------
datasetType : `str`
Butler dataset type to be mapped.
policy : `daf_persistence.Policy` or `pexPolicy.Policy`
Mapping Policy. (pexPolicy only for backward compatibility)
registry : `lsst.obs.base.Registry`
Registry for metadata lookups.
rootStorage : Storage subclass instance
Interface to persisted repository data.
provided : `list` of `str`
Keys provided by the mapper.
"""
def __init__(self, datasetType, policy, registry, rootStorage, provided=None):
if policy is None:
raise RuntimeError("No policy provided for mapping")
if isinstance(policy, pexPolicy.Policy):
policy = Policy(policy)
self.datasetType = datasetType
self.registry = registry
self.rootStorage = rootStorage
self._template = policy['template'] # Template path
        # In most cases an empty template cannot be used: it is accessed via a property that raises
        # if it is used while `not self._template`. Here we *do* allow it to be empty so that the key
        # dict can still be fetched and the mapping constructed; the property will raise later if the
        # template is actually used. It is a little odd, but it allows this template check to be
        # introduced without a major refactor.
if self._template:
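            # Extract each substitution key and its Python type from the template; e.g. a
            # "%(visit)07d" field contributes {"visit": int} via its conversion character.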
self.keyDict = dict([
(k, _formatMap(v, k, datasetType))
for k, v in
re.findall(r'\%\((\w+)\).*?([diouxXeEfFgGcrs])', self.template)
])
else:
self.keyDict = {}
if provided is not None:
for p in provided:
if p in self.keyDict:
del self.keyDict[p]
self.python = policy['python'] # Python type
self.persistable = policy['persistable'] # Persistable type
self.storage = policy['storage']
if 'level' in policy:
self.level = policy['level'] # Level in camera hierarchy
if 'tables' in policy:
self.tables = policy.asArray('tables')
else:
self.tables = None
self.range = None
self.columns = None
self.obsTimeName = policy['obsTimeName'] if 'obsTimeName' in policy else None
self.recipe = policy['recipe'] if 'recipe' in policy else 'default'
@property
def template(self):
if self._template: # template must not be an empty string or None
return self._template
else:
raise RuntimeError("Template is not defined for the {} dataset type, ".format(self.datasetType) +
"it must be set before it can be used.")
    def keys(self):
"""Return the dict of keys and value types required for this mapping."""
return self.keyDict
    def map(self, mapper, dataId, write=False):
"""Standard implementation of map function.
        Parameters
        ----------
        mapper : `lsst.daf.persistence.Mapper`
            Mapper to use for constructing the path and any additional data.
        dataId : `dict`
            Dataset identifier.
        write : `bool`, optional
            If True, the mapping is for an output (write) dataset, so no search
            for an existing (possibly compressed) file is performed.
Returns
-------
lsst.daf.persistence.ButlerLocation
Location of object that was mapped.
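
        Notes
        -----
        In essence, the file path is obtained by expanding the mapping's template
        with the (possibly augmented) data ID; for example (template and values
        purely illustrative)::

            >>> "raw/v%(visit)07d/c%(ccd)03d.fits" % {"visit": 12345, "ccd": 1}
            'raw/v0012345/c001.fits'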
"""
actualId = self.need(iter(self.keyDict.keys()), dataId)
usedDataId = {key: actualId[key] for key in self.keyDict.keys()}
path = mapper._mapActualToPath(self.template, actualId)
if os.path.isabs(path):
raise RuntimeError("Mapped path should not be absolute.")
if not write:
# This allows mapped files to be compressed, ending in .gz or .fz, without any indication from the
# policy that the file should be compressed, easily allowing repositories to contain a combination
            # of compressed and uncompressed files.
# If needed we can add a policy flag to allow compressed files or not, and perhaps a list of
# allowed extensions that may exist at the end of the template.
for ext in (None, '.gz', '.fz'):
if ext and path.endswith(ext):
continue # if the path already ends with the extension
extPath = path + ext if ext else path
newPath = self.rootStorage.instanceSearch(extPath)
if newPath:
path = newPath
break
assert path, "Fully-qualified filename is empty."
addFunc = "add_" + self.datasetType # Name of method for additionalData
if hasattr(mapper, addFunc):
addFunc = getattr(mapper, addFunc)
additionalData = addFunc(self.datasetType, actualId)
assert isinstance(additionalData, PropertySet), \
"Bad type for returned data: %s" (type(additionalData),)
else:
additionalData = None
return ButlerLocation(pythonType=self.python, cppType=self.persistable, storageName=self.storage,
locationList=path, dataId=actualId.copy(), mapper=mapper,
storage=self.rootStorage, usedDataId=usedDataId, datasetType=self.datasetType,
additionalData=additionalData)
    def lookup(self, properties, dataId):
"""Look up properties for in a metadata registry given a partial
dataset identifier.
Parameters
----------
properties : `list` of `str`
What to look up.
dataId : `dict`
Dataset identifier
Returns
-------
`list` of `tuple`
Values of properties.
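
        Examples
        --------
        A sketch of a typical call, where ``mapping`` is a constructed `Mapping`
        and the returned values are hypothetical::

            >>> mapping.lookup(["expTime", "taiObs"], {"visit": 12345})  # doctest: +SKIP
            [(30.0, '2015-01-01T00:00:00.00')]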
"""
if self.registry is None:
raise RuntimeError("No registry for lookup")
skyMapKeys = ("tract", "patch")
where = []
values = []
# Prepare to remove skymap entries from properties list. These must
# be in the data ID, so we store which ones we're removing and create
# an OrderedDict that tells us where to re-insert them. That maps the
# name of the property to either its index in the properties list
# *after* the skymap ones have been removed (for entries that aren't
# skymap ones) or the value from the data ID (for those that are).
removed = set()
substitutions = OrderedDict()
index = 0
properties = list(properties) # don't modify the original list
for p in properties:
if p in skyMapKeys:
try:
substitutions[p] = dataId[p]
removed.add(p)
except KeyError:
raise RuntimeError(
"Cannot look up skymap key '%s'; it must be explicitly included in the data ID" % p
)
else:
substitutions[p] = index
index += 1
# Can't actually remove while iterating above, so we do it here.
for p in removed:
properties.remove(p)
fastPath = True
for p in properties:
if p not in ('filter', 'expTime', 'taiObs'):
fastPath = False
break
if fastPath and 'visit' in dataId and "raw" in self.tables:
lookupDataId = {'visit': dataId['visit']}
result = self.registry.lookup(properties, 'raw_visit', lookupDataId, template=self.template)
else:
if dataId is not None:
for k, v in dataId.items():
if self.columns and k not in self.columns:
continue
if k == self.obsTimeName:
continue
if k in skyMapKeys:
continue
where.append((k, '?'))
values.append(v)
lookupDataId = {k[0]: v for k, v in zip(where, values)}
if self.range:
# format of self.range is ('?', isBetween-lowKey, isBetween-highKey)
# here we transform that to {(lowKey, highKey): value}
lookupDataId[(self.range[1], self.range[2])] = dataId[self.obsTimeName]
result = self.registry.lookup(properties, self.tables, lookupDataId, template=self.template)
if not removed:
return result
# Iterate over the query results, re-inserting the skymap entries.
result = [tuple(v if k in removed else item[v] for k, v in substitutions.items())
for item in result]
return result
    def have(self, properties, dataId):
"""Returns whether the provided data identifier has all
the properties in the provided list.
Parameters
----------
        properties : `list` of `str`
Properties required.
dataId : `dict`
Dataset identifier.
Returns
-------
bool
True if all properties are present.
"""
for prop in properties:
if prop not in dataId:
return False
return True
    def need(self, properties, dataId):
"""Ensures all properties in the provided list are present in
the data identifier, looking them up as needed. This is only
possible for the case where the data identifies a single
exposure.
Parameters
----------
properties : `list` of `str`
Properties required.
dataId : `dict`
Partial dataset identifier
Returns
-------
`dict`
Copy of dataset identifier with enhanced values.
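
        Examples
        --------
        A sketch of how a missing key is filled in, where ``mapping`` is a
        constructed `Mapping` and the looked-up value is hypothetical::

            >>> mapping.need(["filter"], {"visit": 12345})  # doctest: +SKIP
            {'visit': 12345, 'filter': 'r'}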
"""
newId = dataId.copy()
newProps = [] # Properties we don't already have
for prop in properties:
if prop not in newId:
newProps.append(prop)
if len(newProps) == 0:
return newId
lookups = self.lookup(newProps, newId)
if len(lookups) != 1:
raise NoResults("No unique lookup for %s from %s: %d matches" %
(newProps, newId, len(lookups)),
self.datasetType, dataId)
for i, prop in enumerate(newProps):
newId[prop] = lookups[0][i]
return newId
def _formatMap(ch, k, datasetType):
"""Convert a format character into a Python type."""
if ch in "diouxX":
return int
elif ch in "eEfFgG":
return float
elif ch in "crs":
return str
else:
raise RuntimeError("Unexpected format specifier %s"
" for field %s in template for dataset %s" %
(ch, k, datasetType))
class ImageMapping(Mapping):
"""ImageMapping is a Mapping subclass for non-camera images.
Parameters
----------
datasetType : `str`
Butler dataset type to be mapped.
    policy : `daf_persistence.Policy` or `pexPolicy.Policy`
Mapping Policy. (pexPolicy only for backward compatibility)
registry : `lsst.obs.base.Registry`
Registry for metadata lookups
root : `str`
Path of root directory
"""
def __init__(self, datasetType, policy, registry, root, **kwargs):
if isinstance(policy, pexPolicy.Policy):
policy = Policy(policy)
Mapping.__init__(self, datasetType, policy, registry, root, **kwargs)
self.columns = policy.asArray('columns') if 'columns' in policy else None
class ExposureMapping(Mapping):
"""ExposureMapping is a Mapping subclass for normal exposures.
Parameters
----------
datasetType : `str`
Butler dataset type to be mapped.
policy : `daf_persistence.Policy` or `pexPolicy.Policy`
Mapping Policy (pexPolicy only for backward compatibility)
registry : `lsst.obs.base.Registry`
Registry for metadata lookups
root : `str`
Path of root directory
"""
def __init__(self, datasetType, policy, registry, root, **kwargs):
if isinstance(policy, pexPolicy.Policy):
policy = Policy(policy)
Mapping.__init__(self, datasetType, policy, registry, root, **kwargs)
self.columns = policy.asArray('columns') if 'columns' in policy else None
    def standardize(self, mapper, item, dataId):
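        """Standardize an exposure by delegating to the mapper's
        ``_standardizeExposure`` method.
        """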
return mapper._standardizeExposure(self, item, dataId)
class CalibrationMapping(Mapping):
"""CalibrationMapping is a Mapping subclass for calibration-type products.
The difference is that data properties in the query or template
can be looked up using a reference Mapping in addition to this one.
CalibrationMapping Policies can contain the following:
reference (string, optional)
a list of tables for finding missing dataset
identifier components (including the observation time, if a validity range
is required) in the exposure registry; note that the "tables" entry refers
to the calibration registry
refCols (string, optional)
a list of dataset properties required from the
reference tables for lookups in the calibration registry
    validRange (bool)
        true if the calibration dataset has a validity range
        specified by a column in the tables of the reference dataset (in the
        exposure registry) and two columns in the tables of this calibration
        dataset (in the calibration registry)
obsTimeName (string, optional)
the name of the column in the reference
dataset tables containing the observation time (default "taiObs")
validStartName (string, optional)
the name of the column in the
calibration dataset tables containing the start of the validity range
(default "validStart")
validEndName (string, optional)
the name of the column in the
calibration dataset tables containing the end of the validity range
(default "validEnd")
Parameters
----------
datasetType : `str`
Butler dataset type to be mapped.
policy : `daf_persistence.Policy` or `pexPolicy.Policy`
Mapping Policy (pexPolicy only for backward compatibility)
registry : `lsst.obs.base.Registry`
Registry for metadata lookups
calibRegistry : `lsst.obs.base.Registry`
Registry for calibration metadata lookups.
calibRoot : `str`
Path of calibration root directory.
dataRoot : `str`
Path of data root directory; used for outputs only.
"""
def __init__(self, datasetType, policy, registry, calibRegistry, calibRoot, dataRoot=None, **kwargs):
if isinstance(policy, pexPolicy.Policy):
policy = Policy(policy)
Mapping.__init__(self, datasetType, policy, calibRegistry, calibRoot, **kwargs)
self.reference = policy.asArray("reference") if "reference" in policy else None
self.refCols = policy.asArray("refCols") if "refCols" in policy else None
self.refRegistry = registry
self.dataRoot = dataRoot
if "validRange" in policy and policy["validRange"]:
self.range = ("?", policy["validStartName"], policy["validEndName"])
if "columns" in policy:
self.columns = policy.asArray("columns")
if "filter" in policy:
self.setFilter = policy["filter"]
self.metadataKeys = None
if "metadataKey" in policy:
self.metadataKeys = policy.asArray("metadataKey")
    def map(self, mapper, dataId, write=False):
location = Mapping.map(self, mapper, dataId, write=write)
# Want outputs to be in the output directory
if write and self.dataRoot:
location.storage = self.dataRoot
return location
    def lookup(self, properties, dataId):
"""Look up properties for in a metadata registry given a partial
dataset identifier.
Parameters
----------
properties : `list` of `str`
Properties to look up.
dataId : `dict`
Dataset identifier.
Returns
-------
`list` of `tuple`
Values of properties.
"""
# Either look up taiObs in reference and then all in calibRegistry
# Or look up all in registry
newId = dataId.copy()
if self.reference is not None:
where = []
values = []
for k, v in dataId.items():
if self.refCols and k not in self.refCols:
continue
where.append(k)
values.append(v)
# Columns we need from the regular registry
if self.columns is not None:
columns = set(self.columns)
for k in dataId.keys():
columns.discard(k)
else:
columns = set(properties)
if not columns:
# Nothing to lookup in reference registry; continue with calib registry
return Mapping.lookup(self, properties, newId)
lookupDataId = dict(zip(where, values))
lookups = self.refRegistry.lookup(columns, self.reference, lookupDataId)
if len(lookups) != 1:
raise RuntimeError("No unique lookup for %s from %s: %d matches" %
(columns, dataId, len(lookups)))
if columns == set(properties):
# Have everything we need
return lookups
for i, prop in enumerate(columns):
newId[prop] = lookups[0][i]
return Mapping.lookup(self, properties, newId)
    def standardize(self, mapper, item, dataId):
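        """Standardize a calibration exposure by delegating to the mapper's
        ``_standardizeExposure`` method, applying this mapping's filter override.
        """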
return mapper._standardizeExposure(self, item, dataId, filter=self.setFilter)
class DatasetMapping(Mapping):
"""DatasetMapping is a Mapping subclass for non-Exposure datasets that can
be retrieved by the standard daf_persistence mechanism.
The differences are that the Storage type must be specified and no
Exposure standardization is performed.
The "storage" entry in the Policy is mandatory; the "tables" entry is
optional; no "level" entry is allowed.
Parameters
----------
datasetType : `str`
Butler dataset type to be mapped.
    policy : `daf_persistence.Policy` or `pexPolicy.Policy`
Mapping Policy. (pexPolicy only for backward compatibility)
registry : `lsst.obs.base.Registry`
Registry for metadata lookups
root : `str`
Path of root directory
"""
def __init__(self, datasetType, policy, registry, root, **kwargs):
if isinstance(policy, pexPolicy.Policy):
policy = Policy(policy)
Mapping.__init__(self, datasetType, policy, registry, root, **kwargs)
self.storage = policy["storage"] # Storage type