PostgresqlDatabase
class lsst.daf.butler.registry.databases.postgresql.PostgresqlDatabase(*, engine: sqlalchemy.engine.base.Engine, origin: int, namespace: Optional[str, None] = None, writeable: bool = True)
Bases: lsst.daf.butler.registry.interfaces.Database
An implementation of the Database interface for PostgreSQL.
Parameters:
- engine : sqlalchemy.engine.Engine
An existing SQLAlchemy engine for the database, for example one returned by makeEngine.
- origin : int
An integer ID that should be used as the default for any datasets, quanta, or other entities that use an (autoincrement, origin) compound primary key.
- namespace : str, optional
The namespace (schema) this database is associated with. If None, the default schema for the connection is used (which may be None).
- writeable : bool, optional
If True, allow write operations on the database, including CREATE TABLE.
Notes
This currently requires the psycopg2 driver to be used as the backend for SQLAlchemy. Running the tests for this class requires the testing.postgresql package to be installed, which we assume indicates that a PostgreSQL server is installed and can be run locally in userspace.
Some functionality provided by this class (and used by Registry) requires the btree_gist PostgreSQL server extension to be installed and enabled on the database being connected to; this is checked at connection time.
Attributes Summary
dialect: The SQLAlchemy dialect for this database engine (sqlalchemy.engine.Dialect).
Methods Summary
assertTableWriteable(table, msg): Raise if the given table is not writeable; a table is writeable if the database connection is read-write or the table is a temporary table.
declareStaticTables(*, create): Return a context manager in which the database’s static DDL schema can be declared.
delete(table, columns, *rows): Delete one or more rows from a table.
deleteWhere(table, where): Delete rows from a table with a pre-constructed WHERE clause.
ensure(table, *rows, primary_key_only): Insert one or more rows into a table, skipping any rows for which insertion would violate a unique constraint.
ensureTableExists(name, spec): Ensure that a table with the given name and specification exists, creating it if necessary.
expandDatabaseEntityName(shrunk): Retrieve the original name for a database entity that was too long to fit within the database engine’s limits.
fromEngine(engine, *, origin, namespace, …): Create a new Database from an existing sqlalchemy.engine.Engine.
fromUri(uri, *, origin, namespace, …): Construct a database from a SQLAlchemy URI.
getExistingTable(name, spec): Obtain an existing table with the given name and specification.
getSpatialRegionRepresentation(): Return a type that encapsulates the way lsst.sphgeom.Region objects are stored in this database.
getTimespanRepresentation(): Return a type that encapsulates the way Timespan objects are stored in this database.
insert(table, *rows, returnIds, select, …): Insert one or more rows into a table, optionally returning autoincrement primary key values.
isTableWriteable(table): Check whether a table is writeable, either because the database connection is read-write or the table is a temporary table.
isWriteable(): Return True if this database can be modified by this client.
makeDefaultUri(root): Create a default connection URI appropriate for the given root directory, or None if there can be no such default.
makeEngine(uri, *, writeable): Create a sqlalchemy.engine.Engine from a SQLAlchemy URI.
query(sql, *args, **kwargs): Run a SELECT query against the database.
replace(table, *rows): Insert one or more rows into a table, replacing any existing rows for which insertion of a new row would violate the primary key constraint.
session(): Return a context manager that represents a session (persistent connection to a database).
shrinkDatabaseEntityName(original): Return a version of the given name that fits within this database engine’s length limits for table, constraint, index, and sequence names.
sync(table, *, keys, compared, …): Insert into a table as necessary to ensure the database contains values equivalent to the given ones.
transaction(*, interrupting, savepoint, lock): Return a context manager that represents a transaction.
update(table, where, *rows): Update one or more rows in a table.
Attributes Documentation
dialect
The SQLAlchemy dialect for this database engine (sqlalchemy.engine.Dialect).
Methods Documentation
assertTableWriteable(table: sqlalchemy.sql.schema.Table, msg: str) → None
Raise if the given table is not writeable. A table is writeable if the database connection is read-write or the table is a temporary table.
Parameters:
declareStaticTables(*, create: bool) → Iterator[lsst.daf.butler.registry.interfaces._database.StaticTablesContext]
Return a context manager in which the database’s static DDL schema can be declared.
Parameters:
Returns:
- schema : StaticTablesContext
A helper object that is used to add new tables.
Raises:
Notes
A database’s static DDL schema must be declared before any dynamic tables are managed via calls to ensureTableExists or getExistingTable. The order in which static schema tables are added inside the context block is unimportant; they will automatically be sorted and added in an order consistent with their foreign key relationships.
Examples
Given a Database instance db:
with db.declareStaticTables(create=True) as schema:
    schema.addTable("table1", TableSpec(...))
    schema.addTable("table2", TableSpec(...))
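The ordering guarantee described in the Notes for declareStaticTables can be illustrated with a topological sort. The following is a minimal sketch using only the Python standard library; the table names and the foreign_keys mapping are hypothetical, and this is not necessarily how StaticTablesContext orders tables internally.

```python
from graphlib import TopologicalSorter  # available in Python 3.9 and later

# Hypothetical static tables, each mapped to the set of tables that its
# foreign keys reference. The declaration order here is deliberately "wrong".
foreign_keys = {
    "dataset": {"dataset_type", "run"},
    "dataset_type": set(),
    "run": {"collection"},
    "collection": set(),
}

# static_order() yields every referenced table before the tables that
# reference it, which is an order in which CREATE TABLE statements with
# foreign key constraints can be issued.
creation_order = list(TopologicalSorter(foreign_keys).static_order())
print(creation_order)
```

Any order that places collection before run, and both run and dataset_type before dataset, satisfies the foreign key relationships; the exact order among unrelated tables is not significant.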
delete(table: sqlalchemy.sql.schema.Table, columns: Iterable[str], *rows) → int
Delete one or more rows from a table.
Parameters:
- table : sqlalchemy.schema.Table
Table that rows should be deleted from.
- columns : Iterable of str
The names of columns that will be used to constrain the rows to be deleted; these will be combined via AND to form the WHERE clause of the delete query.
- *rows
Positional arguments are the keys of rows to be deleted, as dictionaries mapping column name to value. The keys in all dictionaries must be exactly the names in columns.
Returns:
- count : int
Number of rows deleted.
Raises:
- ReadOnlyDatabaseError
Raised if isWriteable returns False when this method is called.
Notes
May be used inside transaction contexts, so implementations may not perform operations that interrupt transactions.
The default implementation should be sufficient for most derived classes.
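To make the relationship between columns and rows in delete concrete, here is a minimal sketch that uses Python's built-in sqlite3 module as a stand-in for the real backend. The function delete_rows and the visit table are hypothetical; this illustrates the documented semantics and is not the Database implementation.

```python
import sqlite3
from typing import Any, Dict, Iterable


def delete_rows(conn: sqlite3.Connection, table: str,
                columns: Iterable[str], *rows: Dict[str, Any]) -> int:
    """Delete the rows identified by each key dictionary; return the count."""
    columns = list(columns)
    if not rows:
        return 0
    for row in rows:
        if set(row) != set(columns):
            raise ValueError("row keys must be exactly the names in columns")
    # One equality test per column, combined via AND to form the WHERE clause.
    # Identifiers are interpolated directly, so they must be trusted names.
    where = " AND ".join(f"{name} = :{name}" for name in columns)
    before = conn.total_changes
    conn.executemany(f"DELETE FROM {table} WHERE {where}", rows)
    return conn.total_changes - before


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE visit (instrument TEXT, id INTEGER, name TEXT)")
conn.executemany(
    "INSERT INTO visit VALUES (?, ?, ?)",
    [("cam", 1, "a"), ("cam", 2, "b"), ("other", 1, "c")],
)
deleted = delete_rows(
    conn, "visit", ["instrument", "id"],
    {"instrument": "cam", "id": 1},
    {"instrument": "cam", "id": 2},
)
remaining = conn.execute("SELECT instrument, id FROM visit").fetchall()
```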
deleteWhere(table: sqlalchemy.sql.schema.Table, where: sqlalchemy.sql.elements.ClauseElement) → int
Delete rows from a table with a pre-constructed WHERE clause.
Parameters:
- table : sqlalchemy.schema.Table
Table that rows should be deleted from.
- where : sqlalchemy.sql.ClauseElement
A pre-constructed SQLAlchemy expression that will be used as the WHERE clause of the delete query.
Returns:
- count : int
Number of rows deleted.
Raises:
- ReadOnlyDatabaseError
Raised if isWriteable returns False when this method is called.
Notes
May be used inside transaction contexts, so implementations may not perform operations that interrupt transactions.
The default implementation should be sufficient for most derived classes.
ensure(table: sqlalchemy.sql.schema.Table, *rows, primary_key_only: bool = False) → int
Insert one or more rows into a table, skipping any rows for which insertion would violate a unique constraint.
Parameters:
- table : sqlalchemy.schema.Table
Table rows should be inserted into.
- *rows
Positional arguments are the rows to be inserted, as dictionaries mapping column name to value. The keys in all dictionaries must be the same.
- primary_key_only : bool, optional
If True (False is default), only skip rows that violate the primary key constraint, and raise an exception (and roll back transactions) for other constraint violations.
Returns:
- count : int
The number of rows actually inserted.
Raises:
- ReadOnlyDatabaseError
Raised if isWriteable returns False when this method is called. This is raised even if the operation would do nothing even on a writeable database.
Notes
May be used inside transaction contexts, so implementations may not perform operations that interrupt transactions.
Implementations are not required to support ensure on tables with autoincrement keys.
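The "insert, but skip conflicting rows" behavior of ensure corresponds to the INSERT ... ON CONFLICT DO NOTHING statement that both PostgreSQL and SQLite (3.24 or later) provide. The sketch below uses the standard-library sqlite3 module as a stand-in; ensure_rows and the instrument table are hypothetical names, and nothing here is taken from the actual PostgresqlDatabase code.

```python
import sqlite3
from typing import Any, Dict


def ensure_rows(conn: sqlite3.Connection, table: str,
                *rows: Dict[str, Any]) -> int:
    """Insert rows, skipping those that violate a unique constraint.

    Returns the number of rows actually inserted.
    """
    if not rows:
        return 0
    names = list(rows[0])
    sql = (
        f"INSERT INTO {table} ({', '.join(names)}) "
        f"VALUES ({', '.join(':' + n for n in names)}) "
        "ON CONFLICT DO NOTHING"
    )
    before = conn.total_changes
    conn.executemany(sql, rows)
    # Skipped rows do not count as changes, so the difference is the
    # number of rows that were really inserted.
    return conn.total_changes - before


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE instrument (name TEXT PRIMARY KEY, detectors INTEGER)")
conn.execute("INSERT INTO instrument VALUES ('cam', 4)")
inserted = ensure_rows(
    conn, "instrument",
    {"name": "cam", "detectors": 9},   # conflicts with the existing row
    {"name": "spec", "detectors": 1},  # new row
)
contents = conn.execute(
    "SELECT name, detectors FROM instrument ORDER BY name"
).fetchall()
```

Note that the existing row is left untouched: the conflicting values are dropped rather than merged.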
ensureTableExists(name: str, spec: lsst.daf.butler.core.ddl.TableSpec) → sqlalchemy.sql.schema.Table
Ensure that a table with the given name and specification exists, creating it if necessary.
Parameters:
- name : str
Name of the table (not including namespace qualifiers).
- spec : TableSpec
Specification for the table. This will be used when creating the table, and may be used when obtaining an existing table to check for consistency, but no such check is guaranteed.
Returns:
- table : sqlalchemy.schema.Table
SQLAlchemy representation of the table.
Raises:
- ReadOnlyDatabaseError
Raised if isWriteable returns False, and the table does not already exist.
- DatabaseConflictError
Raised if the table exists but spec is inconsistent with its definition.
Notes
This method may not be called within transactions. It may be called on read-only databases if and only if the table does in fact already exist.
Subclasses may override this method, but usually should not need to.
expandDatabaseEntityName(shrunk: str) → str
Retrieve the original name for a database entity that was too long to fit within the database engine’s limits.
Parameters:
- shrunk : str
The possibly shortened name, as returned by shrinkDatabaseEntityName.
Returns:
- original : str
The original name.
classmethod fromEngine(engine: sqlalchemy.engine.base.Engine, *, origin: int, namespace: Optional[str, None] = None, writeable: bool = True) → lsst.daf.butler.registry.interfaces._database.Database
Create a new Database from an existing sqlalchemy.engine.Engine.
Parameters:
- engine : sqlalchemy.engine.Engine
The engine for the database. May be shared between Database instances.
- origin : int
An integer ID that should be used as the default for any datasets, quanta, or other entities that use an (autoincrement, origin) compound primary key.
- namespace : str, optional
A different database namespace (i.e. schema) the new instance should be associated with. If None (default), the namespace (if any) is inferred from the connection.
- writeable : bool, optional
If True, allow write operations on the database, including CREATE TABLE.
Returns:
- db : Database
A new Database instance.
Notes
This method allows different Database instances to share the same engine, which is desirable when they represent different namespaces that can be queried together.
classmethod fromUri(uri: str, *, origin: int, namespace: Optional[str, None] = None, writeable: bool = True) → lsst.daf.butler.registry.interfaces._database.Database
Construct a database from a SQLAlchemy URI.
Parameters:
- uri : str
A SQLAlchemy URI connection string.
- origin : int
An integer ID that should be used as the default for any datasets, quanta, or other entities that use an (autoincrement, origin) compound primary key.
- namespace : str, optional
A database namespace (i.e. schema) the new instance should be associated with. If None (default), the namespace (if any) is inferred from the URI.
- writeable : bool, optional
If True, allow write operations on the database, including CREATE TABLE.
Returns:
- db : Database
A new Database instance.
getExistingTable(name: str, spec: lsst.daf.butler.core.ddl.TableSpec) → Optional[sqlalchemy.sql.schema.Table, None]
Obtain an existing table with the given name and specification.
Parameters:
- name : str
Name of the table (not including namespace qualifiers).
- spec : TableSpec
Specification for the table. This will be used when creating the SQLAlchemy representation of the table, and it is used to check that the actual table in the database is consistent.
Returns:
Raises:
- DatabaseConflictError
Raised if the table exists but spec is inconsistent with its definition.
Notes
This method can be called within transactions and never modifies the database.
Subclasses may override this method, but usually should not need to.
classmethod getSpatialRegionRepresentation() → Type[lsst.daf.butler.core._topology.SpatialRegionDatabaseRepresentation]
Return a type that encapsulates the way lsst.sphgeom.Region objects are stored in this database.
Database does not automatically use the return type of this method anywhere else; calling code is responsible for making sure that DDL and queries are consistent with it.
Returns:
- RegionReprClass : type (SpatialRegionDatabaseRepresentation subclass)
A type that encapsulates the way lsst.sphgeom.Region objects should be stored in this database.
Notes
See getTimespanRepresentation for comments on why this method is not more tightly integrated with the rest of the Database interface.
classmethod getTimespanRepresentation() → Type[lsst.daf.butler.core.timespan.TimespanDatabaseRepresentation]
Return a type that encapsulates the way Timespan objects are stored in this database.
Database does not automatically use the return type of this method anywhere else; calling code is responsible for making sure that DDL and queries are consistent with it.
Returns:
- TimespanReprClass : type (TimespanDatabaseRepresentation subclass)
A type that encapsulates the way Timespan objects should be stored in this database.
Notes
There are two big reasons we’ve decided to keep timespan-mangling logic outside the Database implementations, even though the choice of representation is ultimately up to a Database implementation:
- Timespans appear in relatively few tables and queries in our typical usage, and the code that operates on them is already aware that it is working with timespans. In contrast, a timespan-representation-aware implementation of, say, insert, would need to have extra logic to identify when timespan-mangling needed to occur, which would usually be useless overhead.
- SQLAlchemy’s rich SELECT query expression system has no way to wrap multiple columns in a single expression object (the ORM does, but we are not using the ORM). So we would have to wrap _much_ more of that code in our own interfaces to encapsulate timespan representations there.
insert(table: sqlalchemy.sql.schema.Table, *rows, returnIds: bool = False, select: Optional[sqlalchemy.sql.selectable.Select, None] = None, names: Optional[Iterable[str], None] = None) → Optional[List[int], None]
Insert one or more rows into a table, optionally returning autoincrement primary key values.
Parameters:
- table : sqlalchemy.schema.Table
Table rows should be inserted into.
- returnIds : bool
If True (False is default), return the values of the table’s autoincrement primary key field (which must exist).
- select : sqlalchemy.sql.Select, optional
A SELECT query expression to insert rows from. Cannot be provided with either rows or returnIds=True.
- names : Iterable [str], optional
Names of columns in table to be populated, ordered to match the columns returned by select. Ignored if select is None. If not provided, the columns returned by select must be named to match the desired columns of table.
- *rows
Positional arguments are the rows to be inserted, as dictionaries mapping column name to value. The keys in all dictionaries must be the same.
Returns:
Raises:
- ReadOnlyDatabaseError
Raised if isWriteable returns False when this method is called.
Notes
The default implementation uses bulk insert syntax when returnIds is False, and a loop over single-row insert operations when it is True.
Derived classes should reimplement when they can provide a more efficient implementation (especially for the latter case).
May be used inside transaction contexts, so implementations may not perform operations that interrupt transactions.
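The two strategies mentioned in the Notes for insert (a bulk insert when no IDs are requested, a row-by-row loop when they are) can be sketched with the standard-library sqlite3 module. The function insert_rows and the dataset table are hypothetical stand-ins used only to illustrate the trade-off; they are not the default implementation itself.

```python
import sqlite3
from typing import Any, Dict, List, Optional


def insert_rows(conn: sqlite3.Connection, table: str, *rows: Dict[str, Any],
                return_ids: bool = False) -> Optional[List[int]]:
    """Insert rows, optionally returning their autoincrement key values."""
    if not rows:
        return [] if return_ids else None
    names = list(rows[0])
    sql = (
        f"INSERT INTO {table} ({', '.join(names)}) "
        f"VALUES ({', '.join(':' + n for n in names)})"
    )
    if not return_ids:
        # Bulk path: one executemany call covers every row.
        conn.executemany(sql, rows)
        return None
    # ID-returning path: one statement per row, so that the generated key
    # can be read back after each insert.
    ids = []
    for row in rows:
        ids.append(conn.execute(sql, row).lastrowid)
    return ids


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dataset (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT)")
no_ids = insert_rows(conn, "dataset", {"name": "a"}, {"name": "b"})
new_ids = insert_rows(conn, "dataset", {"name": "c"}, {"name": "d"}, return_ids=True)
```

PostgreSQL additionally supports INSERT ... RETURNING, which is one way a backend could return generated keys without a per-row loop.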
isTableWriteable(table: sqlalchemy.sql.schema.Table) → bool
Check whether a table is writeable, either because the database connection is read-write or the table is a temporary table.
Parameters:
- table : sqlalchemy.schema.Table
SQLAlchemy table object to check.
Returns:
- writeable : bool
Whether this table is writeable.
classmethod makeDefaultUri(root: str) → Optional[str, None]
Create a default connection URI appropriate for the given root directory, or None if there can be no such default.
classmethod makeEngine(uri: str, *, writeable: bool = True) → sqlalchemy.engine.base.Engine
Create a sqlalchemy.engine.Engine from a SQLAlchemy URI.
Parameters:
Returns:
- engine : sqlalchemy.engine.Engine
A database engine.
Notes
Subclasses that support other ways to connect to a database are encouraged to add optional arguments to their implementation of this method, as long as they maintain compatibility with the base class call signature.
query(sql: sqlalchemy.sql.selectable.Selectable, *args, **kwargs) → sqlalchemy.engine.cursor.LegacyCursorResult
Run a SELECT query against the database.
Parameters:
- sql : sqlalchemy.sql.Selectable
A SQLAlchemy representation of a SELECT query.
- *args
Additional positional arguments are forwarded to sqlalchemy.engine.Connection.execute.
- **kwargs
Additional keyword arguments are forwarded to sqlalchemy.engine.Connection.execute.
Returns:
- result : sqlalchemy.engine.ResultProxy
Query results.
Notes
The default implementation should be sufficient for most derived classes.
replace(table: sqlalchemy.sql.schema.Table, *rows) → None
Insert one or more rows into a table, replacing any existing rows for which insertion of a new row would violate the primary key constraint.
Parameters:
- table : sqlalchemy.schema.Table
Table rows should be inserted into.
- *rows
Positional arguments are the rows to be inserted, as dictionaries mapping column name to value. The keys in all dictionaries must be the same.
Raises:
- ReadOnlyDatabaseError
Raised if isWriteable returns False when this method is called.
Notes
May be used inside transaction contexts, so implementations may not perform operations that interrupt transactions.
Implementations should raise a sqlalchemy.exc.IntegrityError exception when a constraint other than the primary key would be violated.
Implementations are not required to support replace on tables with autoincrement keys.
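The distinction that replace draws between primary key conflicts (replace the row) and other constraint violations (raise) matches an "upsert" whose conflict target is the primary key. The following sketch shows that pattern with the standard-library sqlite3 module (SQLite 3.24 or later); PostgreSQL offers the same INSERT ... ON CONFLICT (...) DO UPDATE syntax. The function replace_rows, its primary_key argument, and the detector table are hypothetical, and sqlite3.IntegrityError stands in for sqlalchemy.exc.IntegrityError.

```python
import sqlite3
from typing import Any, Dict, Sequence


def replace_rows(conn: sqlite3.Connection, table: str,
                 primary_key: Sequence[str], *rows: Dict[str, Any]) -> None:
    """Insert rows, overwriting existing rows that share a primary key."""
    if not rows:
        return
    names = list(rows[0])
    updates = ", ".join(
        f"{n} = excluded.{n}" for n in names if n not in primary_key
    )
    # With no non-key columns there is nothing to overwrite.
    action = f"DO UPDATE SET {updates}" if updates else "DO NOTHING"
    sql = (
        f"INSERT INTO {table} ({', '.join(names)}) "
        f"VALUES ({', '.join(':' + n for n in names)}) "
        f"ON CONFLICT ({', '.join(primary_key)}) {action}"
    )
    conn.executemany(sql, rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE detector (id INTEGER PRIMARY KEY, name TEXT UNIQUE)")
conn.executemany("INSERT INTO detector VALUES (?, ?)", [(1, "a"), (2, "b")])

# Primary key conflict on id=1: the existing row is replaced.
replace_rows(conn, "detector", ["id"], {"id": 1, "name": "c"})

# Conflict on the UNIQUE(name) constraint instead: an error is raised.
try:
    replace_rows(conn, "detector", ["id"], {"id": 3, "name": "b"})
    raised = False
except sqlite3.IntegrityError:
    raised = True

contents = conn.execute("SELECT id, name FROM detector ORDER BY id").fetchall()
```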
session() → Iterator
Return a context manager that represents a session (persistent connection to a database).
shrinkDatabaseEntityName(original: str) → str
Return a version of the given name that fits within this database engine’s length limits for table, constraint, index, and sequence names.
Implementations should not assume that simple truncation is safe, because multiple long names often begin with the same prefix.
The default implementation simply returns the given name.
Parameters:
- original : str
The original name.
Returns:
- shrunk : str
The new, possibly shortened name.
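One way to satisfy the "truncation is not safe" requirement is to replace the tail of a long name with a short hash of the full name, and to remember the mapping so that the original can be recovered later (the role of expandDatabaseEntityName). The class below is a hypothetical sketch of that idea, not the scheme PostgresqlDatabase actually uses. The 63-character limit reflects PostgreSQL's default identifier length, and the code assumes ASCII names so that characters and bytes coincide.

```python
import hashlib
from typing import Dict

MAX_LENGTH = 63  # PostgreSQL's default maximum identifier length


class HashingNameShrinker:
    """Shorten long names with a hash suffix and remember the originals."""

    def __init__(self, max_length: int = MAX_LENGTH) -> None:
        self._max_length = max_length
        self._originals: Dict[str, str] = {}

    def shrink(self, original: str) -> str:
        if len(original) <= self._max_length:
            return original
        # An 8-character hex digest of the full name keeps names that share
        # a long prefix distinct after shortening.
        digest = hashlib.blake2b(original.encode(), digest_size=4).hexdigest()
        prefix = original[: self._max_length - len(digest) - 1]
        shrunk = f"{prefix}_{digest}"
        self._originals[shrunk] = original
        return shrunk

    def expand(self, shrunk: str) -> str:
        # Names that were never shortened map to themselves.
        return self._originals.get(shrunk, shrunk)


shrinker = HashingNameShrinker()
long_a = "fkey_" + "x" * 70 + "_visit_id"
long_b = "fkey_" + "x" * 70 + "_exposure_id"
short_a = shrinker.shrink(long_a)
short_b = shrinker.shrink(long_b)
```

Here long_a and long_b agree on their first 75 characters, so plain truncation to 63 characters would map both to the same name; the hash suffix is what separates them.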
sync(table: sqlalchemy.sql.schema.Table, *, keys: Dict[str, Any], compared: Optional[Dict[str, Any], None] = None, extra: Optional[Dict[str, Any], None] = None, returning: Optional[Sequence[str], None] = None, update: bool = False) → Tuple[Optional[Dict[str, Any], None], Union[bool, Dict[str, Any]]]
Insert into a table as necessary to ensure the database contains values equivalent to the given ones.
Parameters:
- table : sqlalchemy.schema.Table
Table to be queried and possibly inserted into.
- keys : dict
Column name-value pairs used to search for an existing row; must be a combination that can be used to select a single row if one exists. If such a row does not exist, these values are used in the insert.
- compared : dict, optional
Column name-value pairs that are compared to those in any existing row. If such a row does not exist, these values are used in the insert.
- extra : dict, optional
Column name-value pairs that are ignored if a matching row exists, but used in an insert if one is necessary.
- returning : Sequence of str, optional
The names of columns whose values should be returned.
- update : bool, optional
If True (False is default), update the existing row with the values in compared instead of raising DatabaseConflictError.
Returns:
- row : dict, optional
The value of the fields indicated by returning, or None if returning is None.
- inserted_or_updated : bool or dict
If True, a new row was inserted; if False, a matching row already existed. If a dict (only possible if update=True), then an existing row was updated, and the dict maps the names of the updated columns to their old values (new values can be obtained from compared).
Raises:
- DatabaseConflictError
Raised if the values in compared do not match the values in the database.
- ReadOnlyDatabaseError
Raised if isWriteable returns False, and no matching record already exists.
Notes
May be used inside transaction contexts, so implementations may not perform operations that interrupt transactions.
It may be called on read-only databases if and only if the matching row does in fact already exist.
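The interplay of keys, compared, extra, returning, and update is easier to follow in code. The sketch below reproduces the documented decision logic on top of the standard-library sqlite3 module. The names sync_row, ConflictError (a stand-in for DatabaseConflictError), and the collection table are hypothetical; read-only handling is omitted, and this is not the Database implementation.

```python
import sqlite3
from typing import Any, Dict, Optional, Sequence, Tuple, Union


class ConflictError(RuntimeError):
    """Stand-in for DatabaseConflictError in this sketch."""


def sync_row(
    conn: sqlite3.Connection, table: str, *, keys: Dict[str, Any],
    compared: Optional[Dict[str, Any]] = None,
    extra: Optional[Dict[str, Any]] = None,
    returning: Optional[Sequence[str]] = None, update: bool = False,
) -> Tuple[Optional[Dict[str, Any]], Union[bool, Dict[str, Any]]]:
    compared = compared or {}
    extra = extra or {}
    where = " AND ".join(f"{k} = :{k}" for k in keys)
    cursor = conn.execute(f"SELECT * FROM {table} WHERE {where}", keys)
    found = cursor.fetchone()
    result: Union[bool, Dict[str, Any]]
    if found is None:
        # No matching row: insert keys, compared, and extra together.
        row = {**keys, **compared, **extra}
        conn.execute(
            f"INSERT INTO {table} ({', '.join(row)}) "
            f"VALUES ({', '.join(':' + n for n in row)})", row)
        result = True
    else:
        existing = dict(zip((d[0] for d in cursor.description), found))
        # Old values of the compared columns that differ from the new ones.
        old = {n: existing[n] for n, v in compared.items() if existing[n] != v}
        if not old:
            result = False
        elif update:
            assignments = ", ".join(f"{n} = :{n}" for n in old)
            params = {**keys, **{n: compared[n] for n in old}}
            conn.execute(f"UPDATE {table} SET {assignments} WHERE {where}", params)
            result = old
        else:
            raise ConflictError(f"conflict in columns {sorted(old)}")
    if returning is None:
        return None, result
    values = conn.execute(
        f"SELECT {', '.join(returning)} FROM {table} WHERE {where}", keys
    ).fetchone()
    return dict(zip(returning, values)), result


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE collection (name TEXT PRIMARY KEY, type INTEGER, doc TEXT)")
k = {"name": "run1"}
first = sync_row(conn, "collection", keys=k, compared={"type": 1},
                 extra={"doc": "first"}, returning=["doc"])
second = sync_row(conn, "collection", keys=k, compared={"type": 1},
                  extra={"doc": "ignored"}, returning=["doc"])
third = sync_row(conn, "collection", keys=k, compared={"type": 2}, update=True)
try:
    sync_row(conn, "collection", keys=k, compared={"type": 3})
    conflict = False
except ConflictError:
    conflict = True
```

The first call inserts, the second finds an equivalent row and ignores extra, the third updates the row and reports the old value, and the last raises because update is False.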
transaction(*, interrupting: bool = False, savepoint: bool = False, lock: Iterable[sqlalchemy.sql.schema.Table] = ()) → Iterator[None]
Return a context manager that represents a transaction.
Parameters:
- interrupting : bool, optional
If True (False is default), this transaction block may not be nested within an outer one, and attempting to do so is a logic (i.e. assertion) error.
- savepoint : bool, optional
If True (False is default), create a SAVEPOINT, allowing exceptions raised by the database (e.g. due to constraint violations) during this transaction’s context to be caught outside it without also rolling back all operations in an outer transaction block. If False, transactions may still be nested, but a rollback may be generated at any level and affects all levels, and commits are deferred until the outermost block completes. If any outer transaction block was created with savepoint=True, all inner blocks will be as well (regardless of the actual value passed). This has no effect if this is the outermost transaction.
- lock : Iterable [sqlalchemy.schema.Table], optional
A list of tables to lock for the duration of this transaction. These locks are guaranteed to prevent concurrent writes and allow this transaction (only) to acquire the same locks (others should block), but only prevent concurrent reads if the database engine requires that in order to block concurrent writes.
Notes
All transactions on a connection managed by one or more Database instances _must_ go through this method, or transaction state will not be correctly managed.
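The effect of savepoint=True can be demonstrated at the SQL level: a failure inside the inner block is rolled back to the savepoint and caught outside it, while work done earlier in the outer transaction survives. The sketch below uses the standard-library sqlite3 module with explicit BEGIN/COMMIT; the savepoint helper and the table t are hypothetical, and this is an illustration of SAVEPOINT semantics rather than of how transaction is implemented.

```python
import sqlite3
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def savepoint(conn: sqlite3.Connection, name: str) -> Iterator[None]:
    """Run a block inside a named SAVEPOINT, undoing it if the block fails."""
    conn.execute(f"SAVEPOINT {name}")
    try:
        yield
    except BaseException:
        # Undo only the work done since the savepoint, then re-raise so the
        # caller can decide how to handle the error.
        conn.execute(f"ROLLBACK TO SAVEPOINT {name}")
        conn.execute(f"RELEASE SAVEPOINT {name}")
        raise
    else:
        conn.execute(f"RELEASE SAVEPOINT {name}")


# isolation_level=None disables implicit transaction handling, so the
# BEGIN and COMMIT statements below are the only transaction boundaries.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY)")

conn.execute("BEGIN")
conn.execute("INSERT INTO t VALUES (1)")      # outer transaction work
try:
    with savepoint(conn, "inner_block"):
        conn.execute("INSERT INTO t VALUES (2)")
        conn.execute("INSERT INTO t VALUES (1)")  # violates the primary key
except sqlite3.IntegrityError:
    pass  # caught outside the inner block; the outer transaction continues
conn.execute("COMMIT")

ids = [row[0] for row in conn.execute("SELECT id FROM t ORDER BY id")]
```

After the commit only id 1 remains: the insert of id 2 belonged to the inner block and was undone with it.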
update(table: sqlalchemy.sql.schema.Table, where: Dict[str, str], *rows) → int
Update one or more rows in a table.
Parameters:
- table : sqlalchemy.schema.Table
Table containing the rows to be updated.
- where : dict [str, str]
A mapping from the names of columns that will be used to search for existing rows to the keys that will hold these values in the rows dictionaries. Note that these may not be the same due to SQLAlchemy limitations.
- *rows
Positional arguments are the rows to be updated. The keys in all dictionaries must be the same, and may correspond to either a value in the where dictionary or the name of a column to be updated.
Returns:
- count : int
Number of rows matched (regardless of whether the update actually modified them).
Raises:
- ReadOnlyDatabaseError
Raised if isWriteable returns False when this method is called.
Notes
May be used inside transaction contexts, so implementations may not perform operations that interrupt transactions.
The default implementation should be sufficient for most derived classes.
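The where argument of update maps each search column to the key under which its value appears in the row dictionaries; every other key in a row names a column to overwrite. A minimal sketch of that convention, using the standard-library sqlite3 module as a stand-in, is shown below. The function update_rows and the exposure table are hypothetical.

```python
import sqlite3
from typing import Any, Dict


def update_rows(conn: sqlite3.Connection, table: str,
                where: Dict[str, str], *rows: Dict[str, Any]) -> int:
    """Update rows and return the number of rows matched."""
    if not rows:
        return 0
    search_keys = set(where.values())
    # Row keys that are not search keys name the columns to be updated.
    assignments = ", ".join(
        f"{name} = :{name}" for name in rows[0] if name not in search_keys
    )
    # Each search column is compared against the row key it is mapped to.
    condition = " AND ".join(
        f"{column} = :{key}" for column, key in where.items()
    )
    before = conn.total_changes
    conn.executemany(f"UPDATE {table} SET {assignments} WHERE {condition}", rows)
    return conn.total_changes - before


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE exposure (id INTEGER PRIMARY KEY, state TEXT)")
conn.executemany("INSERT INTO exposure VALUES (?, ?)", [(1, "raw"), (2, "raw")])
matched = update_rows(
    conn, "exposure", {"id": "id_key"},
    {"id_key": 1, "state": "done"},   # matched and modified
    {"id_key": 2, "state": "raw"},    # matched, value unchanged
    {"id_key": 99, "state": "done"},  # no such row
)
states = conn.execute("SELECT id, state FROM exposure ORDER BY id").fetchall()
```

SQLite counts a row as changed whenever an UPDATE touches it, even if the new value equals the old one, which mirrors the "rows matched" return value documented above.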