Database#
- class lsst.daf.butler.registry.interfaces.Database(*, origin: int, engine: Engine, namespace: str | None = None, metadata: DatabaseMetadata | None = None, allow_temporary_tables: bool = True)#
Bases: ABC
An abstract interface that represents a particular database engine’s representation of a single schema/namespace/database.
Parameters#
- origin
int An integer ID that should be used as the default for any datasets, quanta, or other entities that use an (autoincrement, origin) compound primary key.
- engine
sqlalchemy.engine.Engine The SQLAlchemy engine for this Database.
- namespace
str, optional Name of the schema or namespace this instance is associated with. This is passed as the schema argument when constructing a sqlalchemy.schema.MetaData instance. We use namespace instead to avoid confusion between “schema means namespace” and “schema means table definitions”.
- metadata
sqlalchemy.schema.MetaData, optional Object representing the tables and other schema entities. If not provided, will be generated during the next call to declareStaticTables.
- allow_temporary_tables
bool, optional If True, database operations will be allowed to use temporary tables. If False, other SQL constructs will be used instead of temporary tables when possible.
Notes#
Database requires all write operations to go through its special named methods. Our write patterns are sufficiently simple that we don’t really need the full flexibility of SQL insert/update/delete syntax, and we need non-standard (but common) functionality in these operations sufficiently often that it seems worthwhile to provide our own generic API.
In contrast, Database.query allows arbitrary SELECT queries (via their SQLAlchemy representation) to be run, as we expect these to require significantly more sophistication while still being limited to standard SQL.
Database itself has several underscore-prefixed attributes:
- _engine: SQLAlchemy object representing its engine.
- _connection: method returning a context manager for a sqlalchemy.engine.Connection object.
- _metadata: the sqlalchemy.schema.MetaData object representing the tables and other schema entities.
These are considered protected (derived classes may access them, but other code should not), and read-only, aside from executing SQL via _connection.
Attributes Summary
- dialect: The SQLAlchemy dialect for this database engine (sqlalchemy.engine.Dialect).
- has_any_aggregate: Whether this database supports the ANY_VALUE aggregate function or something equivalent.
- has_distinct_on: Whether this database supports the DISTINCT ON SQL construct.
Methods Summary
- apply_any_aggregate(column): Wrap the given SQLAlchemy column in the ANY_VALUE aggregate function or its equivalent.
- assertTableWriteable(table, msg): Raise if the given table is not writeable. A table is writeable if the database connection is read-write or the table is a temporary table.
- clone(): Make an independent copy of this Database object.
- constant_rows(fields, *rows[, name]): Return a SQLAlchemy object that represents a small number of constant-valued rows.
- declareStaticTables(*, create): Return a context manager in which the database's static DDL schema can be declared.
- delete(table, columns, *rows): Delete one or more rows from a table.
- deleteWhere(table, where): Delete rows from a table with pre-constructed WHERE clause.
- dispose(): Close all open database connections held by this Database instance.
- ensure(table, *rows[, primary_key_only]): Insert one or more rows into a table, skipping any rows for which insertion would violate a unique constraint.
- ensureTableExists(name, spec): Ensure that a table with the given name and specification exists, creating it if necessary.
- expandDatabaseEntityName(shrunk): Retrieve the original name for a database entity that was too long to fit within the database engine's limits.
- fromEngine(engine, *, origin[, namespace, ...]): Create a new Database from an existing sqlalchemy.engine.Engine.
- fromUri(uri, *, origin[, namespace, ...]): Construct a database from a SQLAlchemy URI.
- getExistingTable(name, spec): Obtain an existing table with the given name and specification.
- getTimespanRepresentation(): Return a type that encapsulates the way Timespan objects are stored in this database.
- get_constant_rows_max(): Return the maximum number of rows that should be passed to constant_rows for this backend.
- glob_expression(expression, pattern): Produce a boolean expression that matches an expression against a glob-like pattern.
- insert(table, *rows[, returnIds, select, names]): Insert one or more rows into a table, optionally returning autoincrement primary key values.
- isInTransaction(): Return True if there is currently a database connection open with an active transaction; False otherwise.
- isTableWriteable(table): Check whether a table is writeable, either because the database connection is read-write or the table is a temporary table.
- isWriteable(): Return True if this database can be modified by this client.
- makeDefaultUri(root): Create a default connection URI appropriate for the given root directory, or None if there can be no such default.
- makeEngine(uri, *[, writeable]): Create a sqlalchemy.engine.Engine from a SQLAlchemy URI.
- query(sql, *args, **kwargs): Run a SELECT query against the database.
- replace(table, *rows): Insert one or more rows into a table, replacing any existing rows for which insertion of a new row would violate the primary key constraint.
- session(): Return a context manager that represents a session (persistent connection to a database).
- shrinkDatabaseEntityName(original): Return a version of the given name that fits within this database engine's length limits for table, constraint, index, and sequence names.
- sync(table, *, keys[, compared, extra, ...]): Insert into a table as necessary to ensure database contains values equivalent to the given ones.
- temporary_table(spec[, name]): Return a context manager that creates and then drops a temporary table.
- transaction(*[, interrupting, savepoint, ...]): Return a context manager that represents a transaction.
- update(table, where, *rows): Update one or more rows in a table.
Attributes Documentation
- dialect#
The SQLAlchemy dialect for this database engine (sqlalchemy.engine.Dialect).
- has_any_aggregate#
Whether this database supports the ANY_VALUE aggregate function or something equivalent.
- has_distinct_on#
Whether this database supports the DISTINCT ON SQL construct.
- supports_temporary_tables#
Methods Documentation
- abstract apply_any_aggregate(column: ColumnElement[Any]) ColumnElement[Any]#
Wrap the given SQLAlchemy column in the ANY_VALUE aggregate function or its equivalent.
Parameters#
- column
sqlalchemy.ColumnElement Original column to wrap.
Returns#
- wrapped
sqlalchemy.ColumnElement A column element of the same SQL type that can appear in the SELECT clause even when this column does not appear in the GROUP BY clause.
Notes#
This method’s behavior is unspecified when has_any_aggregate is False; the caller is responsible for checking that property first.
- assertTableWriteable(table: Table, msg: str) None#
Raise if the given table is not writeable. A table is writeable if the database connection is read-write or the table is a temporary table.
Parameters#
- table
sqlalchemy.schema.Table SQLAlchemy table object to check.
- msg
str Message for the ReadOnlyDatabaseError that is raised when the table is not writeable.
- abstract constant_rows(fields: NamedValueAbstractSet[FieldSpec], *rows: dict, name: str | None = None) FromClause#
Return a SQLAlchemy object that represents a small number of constant-valued rows.
Parameters#
- fields
NamedValueAbstractSet[ddl.FieldSpec] The columns of the rows. Unique and foreign key constraints are ignored.
- *rows
dict Values for the rows.
- name
str, optional If provided, the name of the SQL construct. If not provided, an opaque but unique identifier is generated.
Returns#
- from_clause
sqlalchemy.sql.FromClause SQLAlchemy object representing the given rows. This is guaranteed to be something that can be directly joined into a SELECT query’s FROM clause, and will not involve a temporary table that needs to be cleaned up later.
Notes#
The default implementation uses the SQL-standard VALUES construct, but support for that construct is varied enough across popular RDBMSs that the method is still marked abstract to force explicit opt-in via delegation to super.
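As a rough illustration of the VALUES construct mentioned above, the following sketch joins a small set of constant rows into a SELECT using only the standard-library sqlite3 module. It does not use Database.constant_rows; the table and column names (detector, wanted, band) are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE detector (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO detector VALUES (?, ?)", [(1, "a"), (2, "b"), (3, "c")]
)

# A named VALUES construct behaves like a table in the FROM clause, so no
# temporary table has to be created or cleaned up afterwards.
sql = """
WITH wanted(id, band) AS (VALUES (?, ?), (?, ?))
SELECT detector.name, wanted.band
FROM detector JOIN wanted ON detector.id = wanted.id
ORDER BY detector.id
"""
result = conn.execute(sql, (1, "g", 3, "r")).fetchall()
```

Every constant value is a bound parameter here, which is one reason a backend may want to cap the number of rows passed this way.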
- declareStaticTables(*, create: bool) Iterator[StaticTablesContext]#
Return a context manager in which the database’s static DDL schema can be declared.
Parameters#
- create
bool If True, attempt to create all tables at the end of the context. If False, they will be assumed to already exist.
Returns#
- schema
StaticTablesContext A helper object that is used to add new tables.
Raises#
- ReadOnlyDatabaseError
Raised if create is True, Database.isWriteable is False, and one or more declared tables do not already exist.
Examples#
Given a Database instance db:
    with db.declareStaticTables(create=True) as schema:
        schema.addTable("table1", TableSpec(...))
        schema.addTable("table2", TableSpec(...))
Notes#
A database’s static DDL schema must be declared before any dynamic tables are managed via calls to ensureTableExists or getExistingTable. The order in which static schema tables are added inside the context block is unimportant; they will automatically be sorted and added in an order consistent with their foreign key relationships.
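The ordering by foreign key relationships can be pictured as a topological sort. The sketch below is only an illustration using the standard-library graphlib module (Python 3.9+), not the sorting code used by Database; the function creation_order and the table names are hypothetical.

```python
from graphlib import TopologicalSorter


def creation_order(foreign_keys):
    """Return table names ordered so that referenced tables come first.

    ``foreign_keys`` maps each table name to the set of tables it references.
    """
    return list(TopologicalSorter(foreign_keys).static_order())


# Hypothetical schema: "dataset" references both "run" and "dataset_type".
# The declaration order is deliberately "wrong" to show that it is irrelevant.
declared = {
    "dataset": {"run", "dataset_type"},
    "run": set(),
    "dataset_type": set(),
}
order = creation_order(declared)
```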
- delete(table: Table, columns: Iterable[str], *rows: dict) int#
Delete one or more rows from a table.
Parameters#
- table
sqlalchemy.schema.Table Table that rows should be deleted from.
- columns
Iterable of str The names of columns that will be used to constrain the rows to be deleted; these will be combined via AND to form the WHERE clause of the delete query.
- *rows
Positional arguments are the keys of rows to be deleted, as dictionaries mapping column name to value. The keys in all dictionaries must be exactly the names in columns.
Returns#
- count
int Number of rows deleted.
Raises#
- ReadOnlyDatabaseError
Raised if isWriteable returns False when this method is called.
Notes#
May be used inside transaction contexts, so implementations may not perform operations that interrupt transactions.
The default implementation should be sufficient for most derived classes.
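To make the relationship between columns and rows concrete, here is a minimal sketch with the standard-library sqlite3 module. It is an assumption-laden stand-in, not the default implementation: delete_rows is a hypothetical helper, and table and column names are interpolated directly, so they must come from trusted code.

```python
import sqlite3


def delete_rows(conn, table, columns, *rows):
    """Delete the rows matching each key dictionary; return the number deleted."""
    columns = list(columns)
    for row in rows:
        if set(row) != set(columns):
            raise ValueError(f"Row keys {sorted(row)} do not match {sorted(columns)}")
    # One equality test per column, combined with AND; values are bound by name.
    where = " AND ".join(f"{c} = :{c}" for c in columns)
    cursor = conn.executemany(f"DELETE FROM {table} WHERE {where}", rows)
    return cursor.rowcount


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (a INTEGER, b TEXT)")
conn.executemany("INSERT INTO t VALUES (?, ?)", [(1, "x"), (1, "y"), (2, "x")])
deleted = delete_rows(conn, "t", ["a", "b"], {"a": 1, "b": "x"}, {"a": 2, "b": "x"})
remaining = conn.execute("SELECT a, b FROM t").fetchall()
```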
- deleteWhere(table: Table, where: ColumnElement) int#
Delete rows from a table with pre-constructed WHERE clause.
Parameters#
- table
sqlalchemy.schema.Table Table that rows should be deleted from.
- where
sqlalchemy.sql.ClauseElement Pre-constructed boolean expression that selects the rows to be deleted; it is used as the WHERE clause of the delete query.
Returns#
- count
int Number of rows deleted.
Raises#
- ReadOnlyDatabaseError
Raised if isWriteable returns False when this method is called.
Notes#
May be used inside transaction contexts, so implementations may not perform operations that interrupt transactions.
The default implementation should be sufficient for most derived classes.
- abstract ensure(table: Table, *rows: dict, primary_key_only: bool = False) int#
Insert one or more rows into a table, skipping any rows for which insertion would violate a unique constraint.
Parameters#
- table
sqlalchemy.schema.Table Table rows should be inserted into.
- *rows
Positional arguments are the rows to be inserted, as dictionaries mapping column name to value. The keys in all dictionaries must be the same.
- primary_key_only
bool, optional If True (False is default), only skip rows that violate the primary key constraint, and raise an exception (and rollback transactions) for other constraint violations.
Returns#
- count
int The number of rows actually inserted.
Raises#
- ReadOnlyDatabaseError
Raised if isWriteable returns False when this method is called. This is raised even if the operation would do nothing even on a writeable database.
Notes#
May be used inside transaction contexts, so implementations may not perform operations that interrupt transactions.
Implementations are not required to support ensure on tables with autoincrement keys.
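The "insert unless it would conflict" behavior maps onto constructs such as ON CONFLICT DO NOTHING in engines that support them. The sketch below shows the idea with the standard-library sqlite3 module (assuming SQLite 3.24 or later); ensure_rows is a hypothetical helper, not the API of any Database subclass.

```python
import sqlite3


def ensure_rows(conn, table, *rows):
    """Insert rows, skipping uniqueness conflicts; return the number inserted."""
    if not rows:
        return 0
    names = list(rows[0])
    columns = ", ".join(names)
    placeholders = ", ".join(f":{n}" for n in names)
    sql = (
        f"INSERT INTO {table} ({columns}) VALUES ({placeholders}) "
        "ON CONFLICT DO NOTHING"
    )
    # total_changes only counts rows that were really written, so the
    # difference is the number of rows that did not conflict.
    before = conn.total_changes
    conn.executemany(sql, rows)
    return conn.total_changes - before


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT UNIQUE)")
conn.execute("INSERT INTO t VALUES (1, 'a')")
inserted = ensure_rows(conn, "t", {"id": 1, "name": "a"}, {"id": 2, "name": "b"})
```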
- ensureTableExists(name: str, spec: TableSpec) Table#
Ensure that a table with the given name and specification exists, creating it if necessary.
Parameters#
- name
str Name of the table (not including namespace qualifiers).
- spec
TableSpec Specification for the table. This will be used when creating the table, and may be used when obtaining an existing table to check for consistency, but no such check is guaranteed.
Returns#
- table
sqlalchemy.schema.Table SQLAlchemy representation of the table.
Raises#
- ReadOnlyDatabaseError
Raised if isWriteable returns False, and the table does not already exist.
- DatabaseConflictError
Raised if the table exists but spec is inconsistent with its definition.
Notes#
This method may not be called within transactions. It may be called on read-only databases if and only if the table does in fact already exist.
Subclasses may override this method, but usually should not need to.
- expandDatabaseEntityName(shrunk: str) str#
Retrieve the original name for a database entity that was too long to fit within the database engine’s limits.
Parameters#
- shrunk
str The possibly shortened name.
Returns#
- original
str The original name.
- abstract classmethod fromEngine(engine: Engine, *, origin: int, namespace: str | None = None, writeable: bool = True) Database#
Create a new Database from an existing sqlalchemy.engine.Engine.
Parameters#
- engine
sqlalchemy.engine.Engine The engine for the database. May be shared between Database instances.
- origin
int An integer ID that should be used as the default for any datasets, quanta, or other entities that use an (autoincrement, origin) compound primary key.
- namespace
str, optional A different database namespace (i.e. schema) the new instance should be associated with. If None (default), the namespace (if any) is inferred from the connection.
- writeable
bool, optional If True, allow write operations on the database, including CREATE TABLE.
Returns#
Notes#
This method allows different Database instances to share the same engine, which is desirable when they represent different namespaces that can be queried together.
- classmethod fromUri(uri: str | URL, *, origin: int, namespace: str | None = None, writeable: bool = True, allow_temporary_tables: bool = True) Database#
Construct a database from a SQLAlchemy URI.
Parameters#
- uri
str or sqlalchemy.engine.URL A SQLAlchemy URI connection string.
- origin
int An integer ID that should be used as the default for any datasets, quanta, or other entities that use an (autoincrement, origin) compound primary key.
- namespace
str, optional A database namespace (i.e. schema) the new instance should be associated with. If None (default), the namespace (if any) is inferred from the URI.
- writeable
bool, optional If True, allow write operations on the database, including CREATE TABLE.
- allow_temporary_tables
bool, optional If True, database operations will be allowed to use temporary tables. If False, other SQL constructs will be used instead of temporary tables when possible.
Returns#
- getExistingTable(name: str, spec: TableSpec) Table | None#
Obtain an existing table with the given name and specification.
Parameters#
- name
str Name of the table (not including namespace qualifiers).
- spec
TableSpec Specification for the table. This will be used when creating the SQLAlchemy representation of the table, and it is used to check that the actual table in the database is consistent.
Returns#
- table
sqlalchemy.schema.Table or None SQLAlchemy representation of the table, or None if it does not exist.
Raises#
- DatabaseConflictError
Raised if the table exists but spec is inconsistent with its definition.
Notes#
This method can be called within transactions and never modifies the database.
Subclasses may override this method, but usually should not need to.
- classmethod getTimespanRepresentation() type[TimespanDatabaseRepresentation]#
Return a type that encapsulates the way Timespan objects are stored in this database.
Database does not automatically use the return type of this method anywhere else; calling code is responsible for making sure that DDL and queries are consistent with it.
Returns#
- TimespanReprClass
type (TimespanDatabaseRepresentation subclass) A type that encapsulates the way Timespan objects should be stored in this database.
Notes#
There are two big reasons we’ve decided to keep timespan-mangling logic outside the Database implementations, even though the choice of representation is ultimately up to a Database implementation:
- Timespans appear in relatively few tables and queries in our typical usage, and the code that operates on them is already aware that it is working with timespans. In contrast, a timespan-representation-aware implementation of, say, insert, would need to have extra logic to identify when timespan-mangling needed to occur, which would usually be useless overhead.
- SQLAlchemy’s rich SELECT query expression system has no way to wrap multiple columns in a single expression object (the ORM does, but we are not using the ORM). So we would have to wrap much more of that code in our own interfaces to encapsulate timespan representations there.
- get_constant_rows_max() int#
Return the maximum number of rows that should be passed to constant_rows for this backend.
Returns#
- max
int Maximum number of rows.
Notes#
This should reflect typical performance profiles (or a guess at these), not just hard database engine limits.
- abstract glob_expression(expression: ColumnElement[Any], pattern: str) ColumnElement[bool]#
Produce a boolean expression that matches an expression against a glob-like pattern.
Parameters#
- expression
sqlalchemy.ColumnElement Column expression that evaluates to string.
- pattern
str Glob pattern string.
Returns#
- glob
sqlalchemy.ColumnElement Boolean expression that matches expression against pattern.
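One plausible way for a backend without a native glob operator to implement this is to translate the glob into a LIKE pattern. The sketch below is an assumption about such a translation, not the behavior of any particular Database subclass; glob_to_like is a hypothetical helper, and it handles only the * and ? wildcards.

```python
import sqlite3


def glob_to_like(pattern, escape="\\"):
    """Translate ``*`` and ``?`` wildcards into SQL LIKE syntax."""
    out = []
    for ch in pattern:
        if ch == "*":
            out.append("%")
        elif ch == "?":
            out.append("_")
        elif ch in ("%", "_", escape):
            # Characters that are special to LIKE must match literally.
            out.append(escape + ch)
        else:
            out.append(ch)
    return "".join(out)


conn = sqlite3.connect(":memory:")
like = glob_to_like("raw_*")
query = "SELECT ? LIKE ? ESCAPE '\\'"
hit = conn.execute(query, ("raw_1", like)).fetchall()[0][0]
miss = conn.execute(query, ("rawX1", like)).fetchall()[0][0]
```

Note that LIKE in SQLite is case-insensitive for ASCII characters by default, so a real implementation would need to decide how to handle case.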
- insert(table: Table, *rows: dict, returnIds: bool = False, select: SelectBase | None = None, names: Iterable[str] | None = None) list[int] | None#
Insert one or more rows into a table, optionally returning autoincrement primary key values.
Parameters#
- table
sqlalchemy.schema.Table Table rows should be inserted into.
- *rows
dict Positional arguments are the rows to be inserted, as dictionaries mapping column name to value. The keys in all dictionaries must be the same.
- returnIds
bool, optional If True (False is default), return the values of the table’s autoincrement primary key field (which must exist).
- select
sqlalchemy.sql.SelectBase, optional A SELECT query expression to insert rows from. Cannot be provided with either rows or returnIds=True.
- names
Iterable[str], optional Names of columns in table to be populated, ordered to match the columns returned by select. Ignored if select is None. If not provided, the columns returned by select must be named to match the desired columns of table.
Returns#
- ids
None, or list of int If returnIds is True, a list containing the inserted values for the table’s autoincrement primary key.
Raises#
- ReadOnlyDatabaseError
Raised if isWriteable returns False when this method is called.
Notes#
The default implementation uses bulk insert syntax when returnIds is False, and a loop over single-row insert operations when it is True.
Derived classes should reimplement when they can provide a more efficient implementation (especially for the latter case).
May be used inside transaction contexts, so implementations may not perform operations that interrupt transactions.
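The two code paths described in the notes (one bulk statement versus a loop of single-row inserts) can be sketched with the standard-library sqlite3 module. This is an illustration of the pattern only; insert_rows is a hypothetical helper and does not cover the select and names arguments.

```python
import sqlite3


def insert_rows(conn, table, *rows, return_ids=False):
    """Insert rows; optionally return the autoincrement key of each one."""
    if not rows:
        return [] if return_ids else None
    names = list(rows[0])
    columns = ", ".join(names)
    placeholders = ", ".join(f":{n}" for n in names)
    sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
    if not return_ids:
        conn.executemany(sql, rows)  # single bulk call
        return None
    # Generated keys are read back one row at a time.
    return [conn.execute(sql, row).lastrowid for row in rows]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT)")
ids = insert_rows(conn, "t", {"name": "a"}, {"name": "b"}, return_ids=True)
bulk = insert_rows(conn, "t", {"name": "c"}, {"name": "d"})
total = conn.execute("SELECT COUNT(*) FROM t").fetchall()[0][0]
```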
- isInTransaction() bool#
Return True if there is currently a database connection open with an active transaction; False otherwise.
- isTableWriteable(table: Table) bool#
Check whether a table is writeable, either because the database connection is read-write or the table is a temporary table.
Parameters#
- table
sqlalchemy.schema.Table SQLAlchemy table object to check.
Returns#
- writeable
bool Whether this table is writeable.
- abstract isWriteable() bool#
Return True if this database can be modified by this client.
- classmethod makeDefaultUri(root: str) str | None#
Create a default connection URI appropriate for the given root directory, or None if there can be no such default.
Parameters#
- root
str Root string to use to build connection URI.
Returns#
- uri
str or None The URI string or None.
- abstract classmethod makeEngine(uri: str | URL, *, writeable: bool = True) Engine#
Create a sqlalchemy.engine.Engine from a SQLAlchemy URI.
Parameters#
- uri
str or sqlalchemy.engine.URL A SQLAlchemy URI connection string.
- writeable
bool, optional If True, allow write operations on the database, including CREATE TABLE.
Returns#
- engine
sqlalchemy.engine.Engine A database engine.
Notes#
Subclasses that support other ways to connect to a database are encouraged to add optional arguments to their implementation of this method, as long as they maintain compatibility with the base class call signature.
- query(sql: Executable | SelectBase, *args: Any, **kwargs: Any) Iterator[CursorResult]#
Run a SELECT query against the database.
Parameters#
Returns#
- result_context
sqlalchemy.engine.CursorResult Context manager that returns the query result object when entered. These results are invalidated when the context is exited.
- abstract replace(table: Table, *rows: dict) None#
Insert one or more rows into a table, replacing any existing rows for which insertion of a new row would violate the primary key constraint.
Parameters#
- table
sqlalchemy.schema.Table Table rows should be inserted into.
- *rows
Positional arguments are the rows to be inserted, as dictionaries mapping column name to value. The keys in all dictionaries must be the same.
Raises#
- ReadOnlyDatabaseError
Raised if isWriteable returns False when this method is called.
Notes#
May be used inside transaction contexts, so implementations may not perform operations that interrupt transactions.
Implementations should raise a sqlalchemy.exc.IntegrityError exception when a constraint other than the primary key would be violated.
Implementations are not required to support replace on tables with autoincrement keys.
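The distinction between primary key conflicts (which replace the row) and other constraint violations (which raise) can be seen in a small upsert sketch. It uses the standard-library sqlite3 module (assuming SQLite 3.24 or later), where sqlite3.IntegrityError plays the role of sqlalchemy.exc.IntegrityError; it is not the implementation of any Database subclass.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT UNIQUE)")
conn.execute("INSERT INTO t VALUES (1, 'a'), (2, 'b')")

# Only conflicts on the primary key are turned into an update.
upsert = (
    "INSERT INTO t (id, name) VALUES (:id, :name) "
    "ON CONFLICT (id) DO UPDATE SET name = excluded.name"
)
conn.execute(upsert, {"id": 1, "name": "c"})  # replaces the existing row 1

try:
    # No primary key conflict, but 'b' violates the UNIQUE constraint on name.
    conn.execute(upsert, {"id": 3, "name": "b"})
    other_violation = False
except sqlite3.IntegrityError:
    other_violation = True

rows = conn.execute("SELECT id, name FROM t ORDER BY id").fetchall()
```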
- final session() Iterator[None]#
Return a context manager that represents a session (persistent connection to a database).
Returns#
- context
AbstractContextManager[None] A context manager that does not return a value when entered.
Notes#
This method should be used when a sequence of read-only SQL operations will be performed in rapid succession without a requirement that they yield consistent results in the presence of concurrent writes (or, more rarely, when conflicting concurrent writes are rare/impossible and the session will be open long enough that a transaction is inadvisable).
- shrinkDatabaseEntityName(original: str) str#
Return a version of the given name that fits within this database engine’s length limits for table, constraint, index, and sequence names.
Implementations should not assume that simple truncation is safe, because multiple long names often begin with the same prefix.
The default implementation simply returns the given name.
Parameters#
- original
str The original name.
Returns#
- shrunk
str The new, possibly shortened name.
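To see why simple truncation is unsafe, consider the following sketch, which shortens long names by appending a digest of the full name. It is one possible scheme under stated assumptions, not the algorithm used by any Database subclass; shrink_name is hypothetical, and the default limit of 63 characters is PostgreSQL's identifier limit.

```python
import hashlib


def shrink_name(original, max_length=63):
    """Shorten ``original`` to at most ``max_length`` characters."""
    if len(original) <= max_length:
        return original
    # An 8-character digest of the full name keeps names distinct even when
    # they share a long common prefix.
    digest = hashlib.sha1(original.encode("utf-8")).hexdigest()[:8]
    return f"{original[: max_length - 9]}_{digest}"


prefix = "collection_summary_governor_dimension_values_for_instrument_"
first = shrink_name(prefix + "alpha_fkey")
second = shrink_name(prefix + "beta_fkey")
truncated_equal = (prefix + "alpha_fkey")[:63] == (prefix + "beta_fkey")[:63]
```

Because a digest cannot be reversed, a scheme like this needs a lookup table from shortened to original names to support the reverse operation.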
- sync(table: Table, *, keys: dict[str, Any], compared: dict[str, Any] | None = None, extra: dict[str, Any] | None = None, returning: Sequence[str] | None = None, update: bool = False) tuple[dict[str, Any] | None, bool | dict[str, Any]]#
Insert into a table as necessary to ensure database contains values equivalent to the given ones.
Parameters#
- table
sqlalchemy.schema.Table Table to be queried and possibly inserted into.
- keys
dict Column name-value pairs used to search for an existing row; must be a combination that can be used to select a single row if one exists. If such a row does not exist, these values are used in the insert.
- compared
dict, optional Column name-value pairs that are compared to those in any existing row. If such a row does not exist, these values are used in the insert.
- extra
dict, optional Column name-value pairs that are ignored if a matching row exists, but used in an insert if one is necessary.
- returning
Sequence of str, optional The names of columns whose values should be returned.
- update
bool, optional If True (False is default), update the existing row with the values in compared instead of raising DatabaseConflictError.
Returns#
- row
dict, optional The value of the fields indicated by returning, or None if returning is None.
- inserted_or_updated
bool or dict If True, a new row was inserted; if False, a matching row already existed. If a dict (only possible if update=True), then an existing row was updated, and the dict maps the names of the updated columns to their old values (new values can be obtained from compared).
Raises#
- DatabaseConflictError
Raised if the values in compared do not match the values in the database.
- ReadOnlyDatabaseError
Raised if isWriteable returns False, and no matching record already exists.
Notes#
May be used inside transaction contexts, so implementations may not perform operations that interrupt transactions.
It may be called on read-only databases if and only if the matching row does in fact already exist.
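The interplay of keys, compared, extra, and update can be sketched with the standard-library sqlite3 module. This is a simplified illustration rather than the real implementation: sync_row and ConflictError are hypothetical stand-ins, the returning argument is omitted, and only the second element of the documented return tuple is produced.

```python
import sqlite3


class ConflictError(RuntimeError):
    """Hypothetical stand-in for DatabaseConflictError."""


def sync_row(conn, table, *, keys, compared=None, extra=None, update=False):
    """Return True if inserted, False if matched, or a dict of old values."""
    compared = compared or {}
    extra = extra or {}
    where = " AND ".join(f"{k} = :{k}" for k in keys)
    cols = list(compared)
    select_list = ", ".join(cols) if cols else "1"
    found = conn.execute(
        f"SELECT {select_list} FROM {table} WHERE {where}", keys
    ).fetchone()
    if found is None:
        # No existing row: keys, compared and extra all go into the insert.
        row = {**keys, **compared, **extra}
        columns = ", ".join(row)
        placeholders = ", ".join(f":{n}" for n in row)
        conn.execute(f"INSERT INTO {table} ({columns}) VALUES ({placeholders})", row)
        return True
    old = dict(zip(cols, found))
    changed = {k: old[k] for k in cols if old[k] != compared[k]}
    if not changed:
        return False
    if not update:
        raise ConflictError(f"Conflict in {table} for {keys}: {changed}")
    assignments = ", ".join(f"{k} = :new_{k}" for k in changed)
    params = {**keys, **{f"new_{k}": compared[k] for k in changed}}
    conn.execute(f"UPDATE {table} SET {assignments} WHERE {where}", params)
    return changed


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (name TEXT PRIMARY KEY, value INTEGER, note TEXT)")
first = sync_row(conn, "t", keys={"name": "a"}, compared={"value": 1}, extra={"note": "n"})
second = sync_row(conn, "t", keys={"name": "a"}, compared={"value": 1})
updated = sync_row(conn, "t", keys={"name": "a"}, compared={"value": 2}, update=True)
```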
- temporary_table(spec: TableSpec, name: str | None = None) Iterator[Table]#
Return a context manager that creates and then drops a temporary table.
Parameters#
- spec
ddl.TableSpec Specification for the columns. Unique and foreign key constraints may be ignored.
- name
str, optional If provided, the name of the SQL construct. If not provided, an opaque but unique identifier is generated.
Returns#
- context
AbstractContextManager[sqlalchemy.schema.Table] A context manager that returns a SQLAlchemy representation of the temporary table when entered.
Notes#
Temporary tables may be created, dropped, and written to even in read-only databases - at least according to the Python-level protections in the Database classes. Server permissions may say otherwise, but in that case they probably need to be modified to support the full range of expected read-only butler behavior.
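The create-then-drop lifecycle can be sketched as a context manager over the standard-library sqlite3 module. This is an illustration of the pattern, not the Database implementation: this temporary_table function takes a raw column definition string (a hypothetical simplification of TableSpec) and yields the table name rather than a SQLAlchemy table.

```python
import sqlite3
import uuid
from contextlib import contextmanager


@contextmanager
def temporary_table(conn, columns_sql, name=None):
    """Create a temporary table, yield its name, and drop it on exit."""
    if name is None:
        # Opaque but unique identifier when the caller does not supply one.
        name = f"tmp_{uuid.uuid4().hex}"
    conn.execute(f"CREATE TEMPORARY TABLE {name} ({columns_sql})")
    try:
        yield name
    finally:
        conn.execute(f"DROP TABLE {name}")


conn = sqlite3.connect(":memory:")
with temporary_table(conn, "id INTEGER") as name:
    conn.executemany(f"INSERT INTO {name} VALUES (?)", [(1,), (2,)])
    inside = conn.execute(f"SELECT COUNT(*) FROM {name}").fetchall()[0][0]
leftover = conn.execute(
    "SELECT name FROM sqlite_temp_master WHERE name = ?", (name,)
).fetchall()
```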
- final transaction(*, interrupting: bool = False, savepoint: bool = False, lock: Iterable[Table] = (), for_temp_tables: bool = False) Iterator[None]#
Return a context manager that represents a transaction.
Parameters#
- interrupting
bool, optional If True (False is default), this transaction block may not be nested within an outer one, and attempting to do so is a logic (i.e. assertion) error.
- savepoint
bool, optional If True (False is default), create a SAVEPOINT, allowing exceptions raised by the database (e.g. due to constraint violations) during this transaction’s context to be caught outside it without also rolling back all operations in an outer transaction block. If False, transactions may still be nested, but a rollback may be generated at any level and affects all levels, and commits are deferred until the outermost block completes. If any outer transaction block was created with savepoint=True, all inner blocks will be as well (regardless of the actual value passed). This has no effect if this is the outermost transaction.
- lock
Iterable[sqlalchemy.schema.Table], optional A list of tables to lock for the duration of this transaction. These locks are guaranteed to prevent concurrent writes and allow this transaction (only) to acquire the same locks (others should block), but only prevent concurrent reads if the database engine requires that in order to block concurrent writes.
- for_temp_tables
bool, optional If True, this transaction may involve creating temporary tables.
Returns#
- context
AbstractContextManager[None] A context manager that commits the transaction when it is exited without error and rolls back the transaction when it is exited via an exception.
Notes#
All transactions on a connection managed by one or more Database instances must go through this method, or transaction state will not be correctly managed.
- update(table: Table, where: dict[str, str], *rows: dict) int#
Update one or more rows in a table.
Parameters#
- table
sqlalchemy.schema.Table Table containing the rows to be updated.
- where
dict[str, str] A mapping from the names of columns that will be used to search for existing rows to the keys that will hold these values in the rows dictionaries. Note that these may not be the same due to SQLAlchemy limitations.
- *rows
Positional arguments are the rows to be updated. The keys in all dictionaries must be the same, and may correspond to either a value in the where dictionary or the name of a column to be updated.
Returns#
- count
int Number of rows matched (regardless of whether the update actually modified them).
Raises#
- ReadOnlyDatabaseError
Raised if isWriteable returns False when this method is called.
Notes#
May be used inside transaction contexts, so implementations may not perform operations that interrupt transactions.
The default implementation should be sufficient for most derived classes.
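The role of the where mapping (column name to the key used in each row dictionary) is easiest to see in a small example. The sketch below uses the standard-library sqlite3 module; update_rows and the key name id_key are hypothetical, and this is not the default implementation.

```python
import sqlite3


def update_rows(conn, table, where, *rows):
    """Update rows; ``where`` maps column names to keys in each row dict."""
    if not rows:
        return 0
    where_keys = set(where.values())
    # Any key that is not used for searching names a column to be updated.
    set_columns = [k for k in rows[0] if k not in where_keys]
    assignments = ", ".join(f"{c} = :{c}" for c in set_columns)
    conditions = " AND ".join(f"{col} = :{key}" for col, key in where.items())
    cursor = conn.executemany(
        f"UPDATE {table} SET {assignments} WHERE {conditions}", rows
    )
    return cursor.rowcount


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO t VALUES (?, ?)", [(1, "a"), (2, "b")])
count = update_rows(
    conn,
    "t",
    {"id": "id_key"},  # search column "id" using the value stored under "id_key"
    {"id_key": 1, "name": "z"},
    {"id_key": 2, "name": "b"},  # matches, although the value does not change
)
rows_after = conn.execute("SELECT id, name FROM t ORDER BY id").fetchall()
```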