PostgresqlDatabase

class lsst.daf.butler.registry.databases.postgresql.PostgresqlDatabase(*, engine: Engine, origin: int, namespace: str | None = None, writeable: bool = True)

Bases: Database

An implementation of the Database interface for PostgreSQL.

Parameters:
engine : sqlalchemy.engine.Engine

Engine to use for this connection.

origin : int

An integer ID that should be used as the default for any datasets, quanta, or other entities that use an (autoincrement, origin) compound primary key.

namespace : str, optional

The namespace (schema) this database is associated with. If None, the default schema for the connection is used (which may be None).

writeable : bool, optional

If True, allow write operations on the database, including CREATE TABLE.

Notes

This currently requires the psycopg2 driver to be used as the backend for SQLAlchemy. Running the tests for this class requires the testing.postgresql package to be installed, which we assume indicates that a PostgreSQL server is installed and can be run locally in userspace.

Some functionality provided by this class (and used by Registry) requires the btree_gist PostgreSQL server extension to be installed and enabled on the database being connected to; this is checked at connection time.

Attributes Summary

dialect

The SQLAlchemy dialect for this database engine (sqlalchemy.engine.Dialect).

has_any_aggregate

Whether this database supports the ANY_VALUE aggregate function or something equivalent.

has_distinct_on

Whether this database supports the DISTINCT ON SQL construct.

Methods Summary

apply_any_aggregate(column)

Wrap the given SQLAlchemy column in the ANY_VALUE aggregate function or its equivalent.

assertTableWriteable(table, msg)

Raise if the given table is not writeable. A table is writeable if the database connection is read-write or the table is a temporary table.

clone()

Make an independent copy of this Database object.

constant_rows(fields, *rows[, name])

Return a SQLAlchemy object that represents a small number of constant-valued rows.

declareStaticTables(*, create)

Return a context manager in which the database's static DDL schema can be declared.

delete(table, columns, *rows)

Delete one or more rows from a table.

deleteWhere(table, where)

Delete rows from a table with a pre-constructed WHERE clause.

ensure(table, *rows[, primary_key_only])

Insert one or more rows into a table, skipping any rows for which insertion would violate a unique constraint.

ensureTableExists(name, spec)

Ensure that a table with the given name and specification exists, creating it if necessary.

expandDatabaseEntityName(shrunk)

Retrieve the original name for a database entity that was too long to fit within the database engine's limits.

fromEngine(engine, *, origin[, namespace, ...])

Create a new Database from an existing sqlalchemy.engine.Engine.

fromUri(uri, *, origin[, namespace, writeable])

Construct a database from a SQLAlchemy URI.

getExistingTable(name, spec)

Obtain an existing table with the given name and specification.

getTimespanRepresentation()

Return a type that encapsulates the way Timespan objects are stored in this database.

get_constant_rows_max()

Return the maximum number of rows that should be passed to constant_rows for this backend.

insert(table, *rows[, returnIds, select, names])

Insert one or more rows into a table, optionally returning autoincrement primary key values.

isTableWriteable(table)

Check whether a table is writeable, either because the database connection is read-write or the table is a temporary table.

isWriteable()

Return True if this database can be modified by this client.

makeDefaultUri(root)

Create a default connection URI appropriate for the given root directory, or None if there can be no such default.

makeEngine(uri, *[, writeable])

Create a sqlalchemy.engine.Engine from a SQLAlchemy URI.

query(sql, *args, **kwargs)

Run a SELECT query against the database.

replace(table, *rows)

Insert one or more rows into a table, replacing any existing rows for which insertion of a new row would violate the primary key constraint.

session()

Return a context manager that represents a session (persistent connection to a database).

shrinkDatabaseEntityName(original)

Return a version of the given name that fits within this database engine's length limits for table, constraint, index, and sequence names.

sync(table, *, keys[, compared, extra, ...])

Insert into a table as necessary to ensure database contains values equivalent to the given ones.

temporary_table(spec[, name])

Return a context manager that creates and then drops a temporary table.

transaction(*[, interrupting, savepoint, ...])

Return a context manager that represents a transaction.

update(table, where, *rows)

Update one or more rows in a table.

Attributes Documentation

dialect

The SQLAlchemy dialect for this database engine (sqlalchemy.engine.Dialect).

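has_any_aggregate

Whether this database supports the ANY_VALUE aggregate function or something equivalent.

has_distinct_on

Whether this database supports the DISTINCT ON SQL construct.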

Methods Documentation

apply_any_aggregate(column: ColumnElement[Any]) -> ColumnElement[Any]

Wrap the given SQLAlchemy column in the ANY_VALUE aggregate function or its equivalent.

Parameters:
column : sqlalchemy.ColumnElement

Original column to wrap.

Returns:
wrapped : sqlalchemy.ColumnElement

A column element of the same SQL type that can appear in the SELECT clause even when this column does not appear in the GROUP BY clause.

Notes

This method’s behavior is unspecified when has_any_aggregate is False; the caller is responsible for checking that property first.

assertTableWriteable(table: Table, msg: str) -> None

Raise if the given table is not writeable. A table is writeable if the database connection is read-write or the table is a temporary table.

Parameters:
table : sqlalchemy.schema.Table

SQLAlchemy table object to check.

msg : str

Message for the ReadOnlyDatabaseError that is raised if the table is not writeable.

clone() -> PostgresqlDatabase

Make an independent copy of this Database object.

Returns:
db : Database

A new Database instance with the same configuration as this instance.

constant_rows(fields: NamedValueAbstractSet[FieldSpec], *rows: dict, name: str | None = None) -> FromClause

Return a SQLAlchemy object that represents a small number of constant-valued rows.

Parameters:
fields : NamedValueAbstractSet [ ddl.FieldSpec ]

The columns of the rows. Unique and foreign key constraints are ignored.

*rows : dict

Values for the rows.

name : str, optional

If provided, the name of the SQL construct. If not provided, an opaque but unique identifier is generated.

Returns:
from_clause : sqlalchemy.sql.FromClause

SQLAlchemy object representing the given rows. This is guaranteed to be something that can be directly joined into a SELECT query’s FROM clause, and will not involve a temporary table that needs to be cleaned up later.

Notes

The default implementation uses the SQL-standard VALUES construct, but support for that construct is varied enough across popular RDBMSs that the method is still marked abstract to force explicit opt-in via delegation to super.
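As a rough illustration of the VALUES construct mentioned in the note, the sketch below uses the standard-library sqlite3 module rather than PostgresqlDatabase or SQLAlchemy. The helper constant_rows_sql and the detector table are hypothetical names introduced only for this example; it shows constant rows being joined into a SELECT without any temporary table.

```python
import sqlite3


def constant_rows_sql(name, fields, rows):
    """Build a named VALUES construct; return (sql_fragment, parameters).

    Hypothetical helper for illustration, not part of the Database API.
    """
    one_row = "(" + ", ".join("?" for _ in fields) + ")"
    placeholders = ", ".join(one_row for _ in rows)
    sql = f"{name}({', '.join(fields)}) AS (VALUES {placeholders})"
    params = [row[field] for row in rows for field in fields]
    return sql, params


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE detector (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO detector VALUES (?, ?)", [(1, "R00"), (2, "R01"), (3, "R02")]
)

cte, params = constant_rows_sql(
    "wanted", ["id", "label"], [{"id": 1, "label": "a"}, {"id": 3, "label": "b"}]
)
# The constant rows behave like a table in the FROM clause.
joined = conn.execute(
    f"WITH {cte} SELECT detector.name, wanted.label "
    "FROM detector JOIN wanted ON detector.id = wanted.id "
    "ORDER BY detector.id",
    params,
).fetchall()
```

Nothing needs to be cleaned up afterwards, which is the property the Returns description guarantees for the real method.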

declareStaticTables(*, create: bool) -> Iterator[StaticTablesContext]

Return a context manager in which the database’s static DDL schema can be declared.

Parameters:
create : bool

If True, attempt to create all tables at the end of the context. If False, they will be assumed to already exist.

Returns:
schema : StaticTablesContext

A helper object that is used to add new tables.

Raises:
ReadOnlyDatabaseError

Raised if create is True, Database.isWriteable is False, and one or more declared tables do not already exist.

Notes

A database’s static DDL schema must be declared before any dynamic tables are managed via calls to ensureTableExists or getExistingTable. The order in which static schema tables are added inside the context block is unimportant; they will automatically be sorted and added in an order consistent with their foreign key relationships.

Examples

Given a Database instance db:

with db.declareStaticTables(create=True) as schema:
    schema.addTable("table1", TableSpec(...))
    schema.addTable("table2", TableSpec(...))

delete(table: Table, columns: Iterable[str], *rows: dict) -> int

Delete one or more rows from a table.

Parameters:
table : sqlalchemy.schema.Table

Table that rows should be deleted from.

columns : Iterable of str

The names of columns that will be used to constrain the rows to be deleted; these will be combined via AND to form the WHERE clause of the delete query.

*rows

Positional arguments are the keys of rows to be deleted, as dictionaries mapping column name to value. The keys in all dictionaries must be exactly the names in columns.

Returns:
count : int

Number of rows deleted.

Raises:
ReadOnlyDatabaseError

Raised if isWriteable returns False when this method is called.

Notes

May be used inside transaction contexts, so implementations may not perform operations that interrupt transactions.

The default implementation should be sufficient for most derived classes.
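The way columns and rows combine can be sketched with the standard-library sqlite3 module. This is only an illustration of the semantics described above under the assumption of a plain SQL backend; delete_rows is a hypothetical helper, not the library's implementation.

```python
import sqlite3


def delete_rows(conn, table, columns, *rows):
    """Delete rows matching every named column (ANDed); return the count."""
    columns = list(columns)
    where = " AND ".join(f"{name} = :{name}" for name in columns)
    count = 0
    for row in rows:
        # Each row dict must use exactly the names in ``columns`` as keys.
        if set(row) != set(columns):
            raise ValueError("row keys must be exactly the names in columns")
        count += conn.execute(f"DELETE FROM {table} WHERE {where}", row).rowcount
    return count


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (a INTEGER, b TEXT)")
conn.executemany("INSERT INTO t VALUES (?, ?)", [(1, "x"), (1, "y"), (2, "x")])
deleted = delete_rows(conn, "t", ["a", "b"], {"a": 1, "b": "x"}, {"a": 2, "b": "x"})
remaining = conn.execute("SELECT a, b FROM t").fetchall()
```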

deleteWhere(table: Table, where: ColumnElement) -> int

Delete rows from a table with a pre-constructed WHERE clause.

Parameters:
table : sqlalchemy.schema.Table

Table that rows should be deleted from.

where : sqlalchemy.sql.ClauseElement

A pre-constructed boolean expression that is used as the WHERE clause of the delete query.

Returns:
count : int

Number of rows deleted.

Raises:
ReadOnlyDatabaseError

Raised if isWriteable returns False when this method is called.

Notes

May be used inside transaction contexts, so implementations may not perform operations that interrupt transactions.

The default implementation should be sufficient for most derived classes.

ensure(table: Table, *rows: dict, primary_key_only: bool = False) -> int

Insert one or more rows into a table, skipping any rows for which insertion would violate a unique constraint.

Parameters:
table : sqlalchemy.schema.Table

Table rows should be inserted into.

*rows

Positional arguments are the rows to be inserted, as dictionaries mapping column name to value. The keys in all dictionaries must be the same.

primary_key_only : bool, optional

If True (False is default), only skip rows that violate the primary key constraint, and raise an exception (and rollback transactions) for other constraint violations.

Returns:
count : int

The number of rows actually inserted.

Raises:
ReadOnlyDatabaseError

Raised if isWriteable returns False when this method is called. This is raised even if the operation would do nothing even on a writeable database.

Notes

May be used inside transaction contexts, so implementations may not perform operations that interrupt transactions.

Implementations are not required to support ensure on tables with autoincrement keys.
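The skip-on-conflict behavior can be sketched with the standard-library sqlite3 module and the ON CONFLICT DO NOTHING clause (this assumes SQLite 3.24 or later). The helper ensure_rows is hypothetical and only mirrors the semantics described above; it is not how PostgresqlDatabase is implemented.

```python
import sqlite3


def ensure_rows(conn, table, *rows):
    """Insert rows, skipping any that would violate a unique constraint.

    Returns the number of rows actually inserted.
    """
    inserted = 0
    for row in rows:
        columns = ", ".join(row)
        placeholders = ", ".join(f":{name}" for name in row)
        cursor = conn.execute(
            f"INSERT INTO {table} ({columns}) VALUES ({placeholders}) "
            "ON CONFLICT DO NOTHING",
            row,
        )
        inserted += cursor.rowcount  # 0 when the row was skipped
    return inserted


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("INSERT INTO t VALUES (1, 'a')")
count = ensure_rows(conn, "t", {"id": 1, "name": "b"}, {"id": 2, "name": "c"})
rows = conn.execute("SELECT id, name FROM t ORDER BY id").fetchall()
```

The existing row with id 1 keeps its original name; only the row with id 2 is added.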

ensureTableExists(name: str, spec: TableSpec) -> Table

Ensure that a table with the given name and specification exists, creating it if necessary.

Parameters:
name : str

Name of the table (not including namespace qualifiers).

spec : TableSpec

Specification for the table. This will be used when creating the table, and may be used when obtaining an existing table to check for consistency, but no such check is guaranteed.

Returns:
table : sqlalchemy.schema.Table

SQLAlchemy representation of the table.

Raises:
ReadOnlyDatabaseError

Raised if isWriteable returns False, and the table does not already exist.

DatabaseConflictError

Raised if the table exists but spec is inconsistent with its definition.

Notes

This method may not be called within transactions. It may be called on read-only databases if and only if the table does in fact already exist.

Subclasses may override this method, but usually should not need to.

expandDatabaseEntityName(shrunk: str) -> str

Retrieve the original name for a database entity that was too long to fit within the database engine’s limits.

Parameters:
shrunk : str

The possibly shortened name, as returned by shrinkDatabaseEntityName.

Returns:
original : str

The original name.

classmethod fromEngine(engine: Engine, *, origin: int, namespace: str | None = None, writeable: bool = True) -> Database

Create a new Database from an existing sqlalchemy.engine.Engine.

Parameters:
engine : sqlalchemy.engine.Engine

The engine for the database. May be shared between Database instances.

origin : int

An integer ID that should be used as the default for any datasets, quanta, or other entities that use an (autoincrement, origin) compound primary key.

namespace : str, optional

A different database namespace (i.e. schema) the new instance should be associated with. If None (default), the namespace (if any) is inferred from the connection.

writeable : bool, optional

If True, allow write operations on the database, including CREATE TABLE.

Returns:
db : Database

A new Database instance.

Notes

This method allows different Database instances to share the same engine, which is desirable when they represent different namespaces that can be queried together.

classmethod fromUri(uri: str | URL, *, origin: int, namespace: str | None = None, writeable: bool = True) -> Database

Construct a database from a SQLAlchemy URI.

Parameters:
uri : str or sqlalchemy.engine.URL

A SQLAlchemy URI connection string.

origin : int

An integer ID that should be used as the default for any datasets, quanta, or other entities that use an (autoincrement, origin) compound primary key.

namespace : str, optional

A database namespace (i.e. schema) the new instance should be associated with. If None (default), the namespace (if any) is inferred from the URI.

writeable : bool, optional

If True, allow write operations on the database, including CREATE TABLE.

Returns:
db : Database

A new Database instance.

getExistingTable(name: str, spec: TableSpec) -> Table | None

Obtain an existing table with the given name and specification.

Parameters:
name : str

Name of the table (not including namespace qualifiers).

spec : TableSpec

Specification for the table. This will be used when creating the SQLAlchemy representation of the table, and it is used to check that the actual table in the database is consistent.

Returns:
table : sqlalchemy.schema.Table or None

SQLAlchemy representation of the table, or None if it does not exist.

Raises:
DatabaseConflictError

Raised if the table exists but spec is inconsistent with its definition.

Notes

This method can be called within transactions and never modifies the database.

Subclasses may override this method, but usually should not need to.

classmethod getTimespanRepresentation() -> type[lsst.daf.butler.timespan_database_representation.TimespanDatabaseRepresentation]

Return a type that encapsulates the way Timespan objects are stored in this database.

Database does not automatically use the return type of this method anywhere else; calling code is responsible for making sure that DDL and queries are consistent with it.

Returns:
TimespanReprClass : type (TimespanDatabaseRepresentation subclass)

A type that encapsulates the way Timespan objects should be stored in this database.

Notes

There are two big reasons we’ve decided to keep timespan-mangling logic outside the Database implementations, even though the choice of representation is ultimately up to a Database implementation:

  • Timespans appear in relatively few tables and queries in our typical usage, and the code that operates on them is already aware that it is working with timespans. In contrast, a timespan-representation-aware implementation of, say, insert, would need to have extra logic to identify when timespan-mangling needed to occur, which would usually be useless overhead.

  • SQLAlchemy’s rich SELECT query expression system has no way to wrap multiple columns in a single expression object (the ORM does, but we are not using the ORM). So we would have to wrap _much_ more of that code in our own interfaces to encapsulate timespan representations there.

get_constant_rows_max() -> int

Return the maximum number of rows that should be passed to constant_rows for this backend.

Returns:
max : int

Maximum number of rows.

Notes

This should reflect typical performance profiles (or a guess at these), not just hard database engine limits.

insert(table: Table, *rows: dict, returnIds: bool = False, select: SelectBase | None = None, names: Iterable[str] | None = None) -> list[int] | None

Insert one or more rows into a table, optionally returning autoincrement primary key values.

Parameters:
table : sqlalchemy.schema.Table

Table rows should be inserted into.

*rows : dict

Positional arguments are the rows to be inserted, as dictionaries mapping column name to value. The keys in all dictionaries must be the same.

returnIds : bool, optional

If True (False is default), return the values of the table’s autoincrement primary key field (which must exist).

select : sqlalchemy.sql.SelectBase, optional

A SELECT query expression to insert rows from. Cannot be provided with either rows or returnIds=True.

names : Iterable [ str ], optional

Names of columns in table to be populated, ordered to match the columns returned by select. Ignored if select is None. If not provided, the columns returned by select must be named to match the desired columns of table.

Returns:
ids : None, or list of int

If returnIds is True, a list containing the inserted values for the table’s autoincrement primary key.

Raises:
ReadOnlyDatabaseError

Raised if isWriteable returns False when this method is called.

Notes

The default implementation uses bulk insert syntax when returnIds is False, and a loop over single-row insert operations when it is True.

Derived classes should reimplement when they can provide a more efficient implementation (especially for the latter case).

May be used inside transaction contexts, so implementations may not perform operations that interrupt transactions.

isTableWriteable(table: Table) -> bool

Check whether a table is writeable, either because the database connection is read-write or the table is a temporary table.

Parameters:
table : sqlalchemy.schema.Table

SQLAlchemy table object to check.

Returns:
writeable : bool

Whether this table is writeable.

isWriteable() -> bool

Return True if this database can be modified by this client.

classmethod makeDefaultUri(root: str) -> str | None

Create a default connection URI appropriate for the given root directory, or None if there can be no such default.

Parameters:
root : str

Root string to use to build connection URI.

Returns:
uri : str or None

The URI string or None.

classmethod makeEngine(uri: str | URL, *, writeable: bool = True) -> Engine

Create a sqlalchemy.engine.Engine from a SQLAlchemy URI.

Parameters:
uri : str or sqlalchemy.engine.URL

A SQLAlchemy URI connection string.

writeable : bool, optional

If True, allow write operations on the database, including CREATE TABLE.

Returns:
engine : sqlalchemy.engine.Engine

A database engine.

Notes

Subclasses that support other ways to connect to a database are encouraged to add optional arguments to their implementation of this method, as long as they maintain compatibility with the base class call signature.

query(sql: Executable | SelectBase, *args: Any, **kwargs: Any) -> Iterator[CursorResult]

Run a SELECT query against the database.

Parameters:
sql : sqlalchemy.sql.expression.SelectBase

A SQLAlchemy representation of a SELECT query.

*args

Additional positional arguments are forwarded to sqlalchemy.engine.Connection.execute.

**kwargs

Additional keyword arguments are forwarded to sqlalchemy.engine.Connection.execute.

Returns:
result_context : sqlalchemy.engine.CursorResult

Context manager that returns the query result object when entered. These results are invalidated when the context is exited.

replace(table: Table, *rows: dict) -> None

Insert one or more rows into a table, replacing any existing rows for which insertion of a new row would violate the primary key constraint.

Parameters:
table : sqlalchemy.schema.Table

Table rows should be inserted into.

*rows

Positional arguments are the rows to be inserted, as dictionaries mapping column name to value. The keys in all dictionaries must be the same.

Raises:
ReadOnlyDatabaseError

Raised if isWriteable returns False when this method is called.

Notes

May be used inside transaction contexts, so implementations may not perform operations that interrupt transactions.

Implementations should raise a sqlalchemy.exc.IntegrityError exception when a constraint other than the primary key would be violated.

Implementations are not required to support replace on tables with autoincrement keys.
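A minimal sketch of these replace semantics, using the standard-library sqlite3 module and its upsert syntax (SQLite 3.24 or later is assumed). The helper replace_rows and its explicit primary_key argument are hypothetical; the sketch also assumes every row has at least one non-key column.

```python
import sqlite3


def replace_rows(conn, table, primary_key, *rows):
    """Insert rows, overwriting existing rows that share the primary key."""
    for row in rows:
        columns = ", ".join(row)
        placeholders = ", ".join(f":{name}" for name in row)
        # On a primary key conflict, copy the new non-key values over the old row.
        updates = ", ".join(
            f"{name} = excluded.{name}" for name in row if name not in primary_key
        )
        conn.execute(
            f"INSERT INTO {table} ({columns}) VALUES ({placeholders}) "
            f"ON CONFLICT ({', '.join(primary_key)}) DO UPDATE SET {updates}",
            row,
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("INSERT INTO t VALUES (1, 'a')")
replace_rows(conn, "t", ["id"], {"id": 1, "name": "b"}, {"id": 2, "name": "c"})
rows = conn.execute("SELECT id, name FROM t ORDER BY id").fetchall()
```

Because the conflict target names only the primary key, a violation of any other constraint still surfaces as an error rather than being silently resolved.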

session() -> Iterator[None]

Return a context manager that represents a session (persistent connection to a database).

Returns:
context : AbstractContextManager [ None ]

A context manager that does not return a value when entered.

Notes

This method should be used when a sequence of read-only SQL operations will be performed in rapid succession without a requirement that they yield consistent results in the presence of concurrent writes (or, more rarely, when conflicting concurrent writes are rare/impossible and the session will be open long enough that a transaction is inadvisable).

shrinkDatabaseEntityName(original: str) -> str

Return a version of the given name that fits within this database engine’s length limits for table, constraint, index, and sequence names.

Implementations should not assume that simple truncation is safe, because multiple long names often begin with the same prefix.

The default implementation simply returns the given name.

Parameters:
original : str

The original name.

Returns:
shrunk : str

The new, possibly shortened name.
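One way to shorten names without relying on plain truncation is to append a short hash of the full name and remember the mapping for later expansion. The sketch below illustrates that idea only; shrink_name, expand_name, and the 63-character limit (PostgreSQL's default identifier length, assuming ASCII names) are assumptions for this example and do not describe how PostgresqlDatabase actually shortens names.

```python
import hashlib

MAX_NAME_LENGTH = 63  # assumed limit: PostgreSQL's default identifier length
_shrunk_to_original = {}  # remembers originals so names can be expanded again


def shrink_name(original):
    """Return a name no longer than MAX_NAME_LENGTH, unique per original."""
    if len(original) <= MAX_NAME_LENGTH:
        return original
    # A hash suffix keeps names distinct even when they share a long prefix.
    digest = hashlib.sha1(original.encode("utf-8")).hexdigest()[:8]
    shrunk = f"{original[:MAX_NAME_LENGTH - 9]}_{digest}"
    _shrunk_to_original[shrunk] = original
    return shrunk


def expand_name(shrunk):
    """Return the original name for a shrunk one (or the name itself)."""
    return _shrunk_to_original.get(shrunk, shrunk)


long_a = "x" * 70 + "_a"
long_b = "x" * 70 + "_b"
short_a = shrink_name(long_a)
short_b = shrink_name(long_b)
```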

sync(table: Table, *, keys: dict[str, Any], compared: dict[str, Any] | None = None, extra: dict[str, Any] | None = None, returning: Sequence[str] | None = None, update: bool = False) -> tuple[dict[str, Any] | None, bool | dict[str, Any]]

Insert into a table as necessary to ensure database contains values equivalent to the given ones.

Parameters:
table : sqlalchemy.schema.Table

Table to be queried and possibly inserted into.

keys : dict

Column name-value pairs used to search for an existing row; must be a combination that can be used to select a single row if one exists. If such a row does not exist, these values are used in the insert.

compared : dict, optional

Column name-value pairs that are compared to those in any existing row. If such a row does not exist, these values are used in the insert.

extra : dict, optional

Column name-value pairs that are ignored if a matching row exists, but used in an insert if one is necessary.

returning : Sequence of str, optional

The names of columns whose values should be returned.

update : bool, optional

If True (False is default), update the existing row with the values in compared instead of raising DatabaseConflictError.

Returns:
row : dict, optional

The value of the fields indicated by returning, or None if returning is None.

inserted_or_updated : bool or dict

If True, a new row was inserted; if False, a matching row already existed. If a dict (only possible if update=True), then an existing row was updated, and the dict maps the names of the updated columns to their old values (new values can be obtained from compared).

Raises:
DatabaseConflictError

Raised if the values in compared do not match the values in the database.

ReadOnlyDatabaseError

Raised if isWriteable returns False, and no matching record already exists.

Notes

May be used inside transaction contexts, so implementations may not perform operations that interrupt transactions.

It may be called on read-only databases if and only if the matching row does in fact already exist.
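The interplay of keys, compared, extra, returning, and update can be sketched as follows with the standard-library sqlite3 module. This is an illustration of the documented semantics only, not the library's code: the sync function here, the ConflictError class (a stand-in for DatabaseConflictError), and the instrument table are all hypothetical, and the sketch ignores read-only handling.

```python
import sqlite3


class ConflictError(RuntimeError):
    """Stand-in for DatabaseConflictError in this sketch."""


def sync(conn, table, *, keys, compared=None, extra=None, returning=None, update=False):
    """Return (row, inserted_or_updated) following the semantics described above."""
    compared = dict(compared or {})
    extra = dict(extra or {})
    where = " AND ".join(f"{k} = :{k}" for k in keys)
    select_cols = ", ".join(compared) or "1"
    found = conn.execute(
        f"SELECT {select_cols} FROM {table} WHERE {where}", keys
    ).fetchone()
    if found is None:
        # No matching row: insert using keys, compared, and extra together.
        row = {**keys, **compared, **extra}
        conn.execute(
            f"INSERT INTO {table} ({', '.join(row)}) "
            f"VALUES ({', '.join(':' + c for c in row)})",
            row,
        )
        result = True
    else:
        old = dict(zip(compared, found))
        changed = {k: old[k] for k in compared if old[k] != compared[k]}
        if not changed:
            result = False
        elif update:
            assignments = ", ".join(f"{k} = :{k}" for k in changed)
            conn.execute(
                f"UPDATE {table} SET {assignments} WHERE {where}",
                {**keys, **compared},
            )
            result = changed  # maps updated column names to their old values
        else:
            raise ConflictError(f"values differ for columns {sorted(changed)}")
    if returning is None:
        return None, result
    values = conn.execute(
        f"SELECT {', '.join(returning)} FROM {table} WHERE {where}", keys
    ).fetchone()
    return dict(zip(returning, values)), result


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE instrument (name TEXT PRIMARY KEY, detector_max INTEGER, note TEXT)"
)
args = dict(keys={"name": "cam"}, compared={"detector_max": 10}, extra={"note": "n"})
first = sync(conn, "instrument", returning=["detector_max"], **args)
second = sync(conn, "instrument", returning=["detector_max"], **args)
third = sync(
    conn, "instrument", keys={"name": "cam"}, compared={"detector_max": 20}, update=True
)
```

The first call inserts, the second finds an equivalent row, and the third updates the row and reports the old value.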

temporary_table(spec: TableSpec, name: str | None = None) -> Iterator[Table]

Return a context manager that creates and then drops a temporary table.

Parameters:
spec : ddl.TableSpec

Specification for the columns. Unique and foreign key constraints may be ignored.

name : str, optional

If provided, the name of the SQL construct. If not provided, an opaque but unique identifier is generated.

Returns:
context : AbstractContextManager [ sqlalchemy.schema.Table ]

A context manager that returns a SQLAlchemy representation of the temporary table when entered.

Notes

Temporary tables may be created, dropped, and written to even in read-only databases - at least according to the Python-level protections in the Database classes. Server permissions may say otherwise, but in that case they probably need to be modified to support the full range of expected read-only butler behavior.
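The create-then-drop pattern can be sketched with a small context manager over the standard-library sqlite3 module. The function below is a hypothetical stand-in that takes column definitions as strings instead of a TableSpec; it only illustrates the lifecycle described above.

```python
import sqlite3
import uuid
from contextlib import contextmanager


@contextmanager
def temporary_table(conn, columns, name=None):
    """Create a temporary table, yield its name, and drop it on exit."""
    if name is None:
        # Generate an opaque but unique identifier when no name is given.
        name = f"tmp_{uuid.uuid4().hex}"
    conn.execute(f"CREATE TEMPORARY TABLE {name} ({', '.join(columns)})")
    try:
        yield name
    finally:
        conn.execute(f"DROP TABLE {name}")


conn = sqlite3.connect(":memory:")
with temporary_table(conn, ["id INTEGER", "label TEXT"]) as tmp:
    conn.execute(f"INSERT INTO {tmp} VALUES (1, 'a')")
    rows_inside = conn.execute(f"SELECT id, label FROM {tmp}").fetchall()
# After the block exits, no temporary tables remain.
tables_after = conn.execute(
    "SELECT name FROM sqlite_temp_master WHERE type = 'table'"
).fetchall()
```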

transaction(*, interrupting: bool = False, savepoint: bool = False, lock: Iterable[Table] = (), for_temp_tables: bool = False) -> Iterator[None]

Return a context manager that represents a transaction.

Parameters:
interrupting : bool, optional

If True (False is default), this transaction block may not be nested within an outer one, and attempting to do so is a logic (i.e. assertion) error.

savepoint : bool, optional

If True (False is default), create a SAVEPOINT, allowing exceptions raised by the database (e.g. due to constraint violations) during this transaction’s context to be caught outside it without also rolling back all operations in an outer transaction block. If False, transactions may still be nested, but a rollback may be generated at any level and affects all levels, and commits are deferred until the outermost block completes. If any outer transaction block was created with savepoint=True, all inner blocks will be as well (regardless of the actual value passed). This has no effect if this is the outermost transaction.

lock : Iterable [ sqlalchemy.schema.Table ], optional

A list of tables to lock for the duration of this transaction. These locks are guaranteed to prevent concurrent writes and allow this transaction (only) to acquire the same locks (others should block), but only prevent concurrent reads if the database engine requires that in order to block concurrent writes.

for_temp_tables : bool, optional

If True, this transaction may involve creating temporary tables.

Returns:
context : AbstractContextManager [ None ]

A context manager that commits the transaction when it is exited without error and rolls back the transaction when it is exited via an exception.

Notes

All transactions on a connection managed by one or more Database instances _must_ go through this method, or transaction state will not be correctly managed.
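The effect of savepoint=True on nested blocks can be sketched with the standard-library sqlite3 module. TransactionManager is a hypothetical class written for this example; it omits locking, the interrupting and for_temp_tables options, and the rule that inner blocks inherit savepoint=True from outer ones.

```python
import sqlite3
from contextlib import contextmanager


class TransactionManager:
    """Hypothetical nestable-transaction helper; not the Database API."""

    def __init__(self, conn):
        self.conn = conn
        self.depth = 0

    @contextmanager
    def transaction(self, *, savepoint=False):
        outermost = self.depth == 0
        name = f"sp_{self.depth}"
        if outermost:
            self.conn.execute("BEGIN")
        elif savepoint:
            self.conn.execute(f"SAVEPOINT {name}")
        self.depth += 1
        try:
            yield
        except BaseException:
            if outermost:
                self.conn.execute("ROLLBACK")
            elif savepoint:
                # Undo only this block's work; the outer transaction survives.
                self.conn.execute(f"ROLLBACK TO {name}")
                self.conn.execute(f"RELEASE {name}")
            raise
        else:
            if outermost:
                self.conn.execute("COMMIT")  # commit only at the outermost level
            elif savepoint:
                self.conn.execute(f"RELEASE {name}")
        finally:
            self.depth -= 1


# isolation_level=None lets this sketch issue BEGIN/COMMIT explicitly.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY)")
db = TransactionManager(conn)
with db.transaction():
    conn.execute("INSERT INTO t VALUES (1)")
    try:
        with db.transaction(savepoint=True):
            conn.execute("INSERT INTO t VALUES (2)")
            conn.execute("INSERT INTO t VALUES (1)")  # violates the primary key
    except sqlite3.IntegrityError:
        pass  # caught outside the inner block; the outer insert is kept
ids = [row[0] for row in conn.execute("SELECT id FROM t ORDER BY id")]
```

Without savepoint=True on the inner block, the exception would have to propagate to the outermost block, which would roll back everything.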

update(table: Table, where: dict[str, str], *rows: dict) -> int

Update one or more rows in a table.

Parameters:
table : sqlalchemy.schema.Table

Table containing the rows to be updated.

where : dict [str, str]

A mapping from the names of columns that will be used to search for existing rows to the keys that will hold these values in the rows dictionaries. Note that these may not be the same due to SQLAlchemy limitations.

*rows

Positional arguments are the rows to be updated. The keys in all dictionaries must be the same, and may correspond to either a value in the where dictionary or the name of a column to be updated.

Returns:
count : int

Number of rows matched (regardless of whether the update actually modified them).

Raises:
ReadOnlyDatabaseError

Raised if isWriteable returns False when this method is called.

Notes

May be used inside transaction contexts, so implementations may not perform operations that interrupt transactions.

The default implementation should be sufficient for most derived classes.
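The role of the where mapping is easiest to see in a small sketch. The example below uses the standard-library sqlite3 module; update_rows is a hypothetical helper that mirrors the described semantics, in which where maps a column name to the key that holds its search value in each row dictionary, and every other key in a row names a column to update.

```python
import sqlite3


def update_rows(conn, table, where, *rows):
    """Update rows selected via ``where``; return the number of rows matched."""
    search_keys = set(where.values())
    condition = " AND ".join(f"{column} = :{key}" for column, key in where.items())
    count = 0
    for row in rows:
        # Keys that are not search keys are the columns to be updated.
        assignments = ", ".join(
            f"{key} = :{key}" for key in row if key not in search_keys
        )
        cursor = conn.execute(
            f"UPDATE {table} SET {assignments} WHERE {condition}", row
        )
        count += cursor.rowcount
    return count


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
conn.executemany("INSERT INTO t VALUES (?, ?)", [(1, "a"), (2, "b")])
matched = update_rows(
    conn,
    "t",
    {"id": "id_key"},  # search column "id" using each row's "id_key" entry
    {"id_key": 1, "value": "z"},
    {"id_key": 3, "value": "q"},  # matches nothing
)
rows = conn.execute("SELECT id, value FROM t ORDER BY id").fetchall()
```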