scystream.sdk.database_handling package

Submodules

scystream.sdk.database_handling.postgres_manager module

class scystream.sdk.database_handling.database_manager.BaseDatabaseOperations

Bases: ABC

MAX_TABLE_NAME_LENGTH = 63

__init__(dsn: str, schema: str | None = None)

abstractmethod read(table: str | None = None, query: str | None = None)

abstractmethod write(table: str, data, mode: str = 'overwrite')
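To illustrate the contract these abstract methods define, here is a minimal sketch. `BaseOps` is a stand-in that mirrors the documented signatures and `InMemoryOperations` is a hypothetical toy backend; neither is part of the SDK, and the real base class may do more in `__init__`.

```python
from abc import ABC, abstractmethod
from typing import Optional


class BaseOps(ABC):
    """Stand-in mirroring the documented interface (not the SDK class)."""

    MAX_TABLE_NAME_LENGTH = 63

    def __init__(self, dsn: str, schema: Optional[str] = None):
        self.dsn = dsn
        self.schema = schema

    @abstractmethod
    def read(self, table: Optional[str] = None, query: Optional[str] = None):
        ...

    @abstractmethod
    def write(self, table: str, data, mode: str = "overwrite"):
        ...


class InMemoryOperations(BaseOps):
    """Hypothetical backend that keeps rows in a dict, for illustration only."""

    def __init__(self, dsn: str, schema: Optional[str] = None):
        super().__init__(dsn, schema)
        self._tables = {}

    def read(self, table=None, query=None):
        if table is None:
            raise ValueError("This toy backend only supports table reads.")
        return list(self._tables.get(table, []))

    def write(self, table, data, mode="overwrite"):
        if mode == "overwrite":
            self._tables[table] = list(data)
        elif mode == "append":
            self._tables.setdefault(table, []).extend(data)
        else:
            raise ValueError(f"Unsupported mode: {mode!r}")


ops = InMemoryOperations("memory://")
ops.write("users", [{"id": 1}])
ops.write("users", [{"id": 2}], mode="append")
rows = ops.read(table="users")
```

Because `read` and `write` are abstract, instantiating the base directly raises `TypeError`; only subclasses that implement both can be constructed.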

class scystream.sdk.database_handling.database_manager.PandasDatabaseOperations

Bases: BaseDatabaseOperations

Database operations using Pandas and SQLAlchemy.

This class provides a simple interface to read from and write to any SQLAlchemy-compatible database using Pandas DataFrames. The connection is established via a DSN (Data Source Name), making this implementation backend-agnostic.

Supported databases include (but are not limited to):
  • PostgreSQL

  • MySQL

  • SQLite

  • Snowflake

  • Oracle

This implementation is best suited for local or small-to-medium sized datasets where distributed processing (e.g., Spark) is not required.

__init__(dsn: str, schema: str | None = None)

Initialize the PandasDatabaseOperations instance.

Parameters:
  • dsn – A SQLAlchemy-compatible database connection string (DSN), e.g. postgresql://user:pass@host:5432/db, mysql+pymysql://user:pass@host/db, or sqlite:///local.db.

  • schema – The name of the schema to use, as supported by PostgreSQL databases. (optional)

Raises:

ValueError – If the DSN is invalid or the connection fails.

Note:

Uses SQLAlchemy’s connection pooling with pool_pre_ping=True to ensure stale connections are automatically refreshed.

read(table: str | None = None, query: str | None = None) → pandas.DataFrame

Read data from the database into a Pandas DataFrame.

This method supports two modes of operation:
  • Reading all rows from a specified table

  • Executing a custom SQL query

Parameters:
  • table – The name of the table to read from. Must be provided if query is not supplied. (optional)

  • query – A custom SQL query to execute. If provided, this overrides the table parameter. (optional)

Raises:
  • ValueError – If neither table nor query is provided.

  • ValueError – If the table name exceeds the allowed length.

Returns:

A Pandas DataFrame containing the query result.

Return type:

pandas.DataFrame

Example:
>>> db.read(table="users")
>>> db.read(query="SELECT id, name FROM users WHERE active = true")
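The parameter rules above (a query overrides the table, at least one must be given, and table names are length-checked) can be sketched as a small helper. `resolve_read_sql` is a hypothetical name, and the generated `SELECT *` statement and the choice to skip the length check when a query is supplied are assumptions; the SDK's internals may differ.

```python
from typing import Optional

MAX_TABLE_NAME_LENGTH = 63  # value documented on the base class


def resolve_read_sql(table: Optional[str] = None,
                     query: Optional[str] = None) -> str:
    """Hypothetical helper: decide which SQL a read() call would run."""
    if query is not None:
        # A custom query overrides the table parameter.
        return query
    if table is None:
        raise ValueError("Either 'table' or 'query' must be provided.")
    if len(table) > MAX_TABLE_NAME_LENGTH:
        raise ValueError(
            f"Table name exceeds {MAX_TABLE_NAME_LENGTH} characters."
        )
    # Assumption: reading a table means selecting all of its rows.
    return f'SELECT * FROM "{table}"'


sql_from_table = resolve_read_sql(table="users")
sql_from_query = resolve_read_sql(table="users", query="SELECT id FROM users")
```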

write(table: str, data: pandas.DataFrame, mode: str = 'overwrite')

Write a Pandas DataFrame to the database.

This method writes the provided DataFrame to the specified table using SQLAlchemy. The behavior when the table already exists is controlled via the mode parameter.

Parameters:
  • table – The name of the target table.

  • data – The Pandas DataFrame to write.

  • mode – The write mode. Supported options are ‘overwrite’ (replace the table if it exists) and ‘append’ (append data to the existing table). Defaults to ‘overwrite’. (optional)

Raises:
  • ValueError – If the table name exceeds the allowed length.

  • ValueError – If an unsupported mode is provided.

Note:
  • The DataFrame index is not written to the database.

  • When using mode='append', ensure the DataFrame's columns and types are compatible with the existing table.

Example:
>>> db.write("users", df)
>>> db.write("users", df, mode="append")
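For readers who want to see roughly what such a write and read round trip looks like with plain pandas and SQLAlchemy, here is a minimal sketch against an in-memory SQLite database. `write_frame` and the mapping of the documented modes onto pandas' `if_exists` values are assumptions for illustration, not the SDK's confirmed implementation; it requires `pandas` and `sqlalchemy` to be installed.

```python
import pandas as pd
from sqlalchemy import create_engine

# Assumption: the documented modes map onto pandas' `if_exists` values.
MODE_TO_IF_EXISTS = {"overwrite": "replace", "append": "append"}


def write_frame(engine, table: str, data: pd.DataFrame,
                mode: str = "overwrite") -> None:
    """Hypothetical helper mirroring the documented write() behaviour."""
    if mode not in MODE_TO_IF_EXISTS:
        raise ValueError(f"Unsupported mode: {mode!r}")
    # index=False: the DataFrame index is not written to the database.
    data.to_sql(table, engine, if_exists=MODE_TO_IF_EXISTS[mode], index=False)


# pool_pre_ping=True makes the pool test connections before handing them out.
engine = create_engine("sqlite:///:memory:", pool_pre_ping=True)

df = pd.DataFrame({"id": [1, 2], "name": ["ada", "bob"]})
write_frame(engine, "users", df)                 # creates or replaces the table
write_frame(engine, "users", df, mode="append")  # adds the same rows again

result = pd.read_sql("SELECT id, name FROM users", engine)
```

After the overwrite followed by the append, `result` holds the two original rows twice, and only the `id` and `name` columns because the index was not written.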

class scystream.sdk.database_handling.database_manager.SparkDatabaseOperations

Bases: BaseDatabaseOperations

Class to perform PostgreSQL operations using Apache Spark.

This class provides methods to read from and write to a PostgreSQL database using JDBC and Spark’s DataFrame API. It requires a SparkSession and a PostgresConfig object or the PostgresSettings from an input or output for database connectivity.

__init__(spark: pyspark.sql.SparkSession, dsn: str, schema: str | None)

read(table: str | None = None, query: str | None = None) → pyspark.sql.DataFrame

Reads data from a PostgreSQL database into a Spark DataFrame.

This method can either read data from a specified table or execute a custom SQL query to retrieve data from the database.

Parameters:
  • table – The name of the table to read data from. Must be provided if query is not supplied. (optional)

  • query – A custom SQL query to run. If provided, this overrides the table parameter. (optional)

Returns:

A Spark DataFrame containing the result of the query or table data.

Return type:

DataFrame
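As a rough sketch of how a DSN plus a table or query could be turned into Spark JDBC reader options, the helper below uses only the standard library. `jdbc_read_options` is a hypothetical name, and the DSN-to-JDBC-URL conversion and the default port are assumptions; the `url`, `dbtable`, `query`, `user`, `password`, and `driver` keys are standard Spark JDBC options, and Spark does not accept `dbtable` and `query` together.

```python
from typing import Optional
from urllib.parse import urlsplit


def jdbc_read_options(dsn: str,
                      table: Optional[str] = None,
                      query: Optional[str] = None) -> dict:
    """Hypothetical helper: build Spark JDBC options from a PostgreSQL DSN."""
    parts = urlsplit(dsn)
    # Assumption: fall back to PostgreSQL's default port when none is given.
    url = f"jdbc:postgresql://{parts.hostname}:{parts.port or 5432}{parts.path}"
    options = {
        "url": url,
        "user": parts.username or "",
        "password": parts.password or "",
        "driver": "org.postgresql.Driver",
    }
    if query is not None:
        options["query"] = query    # a custom query overrides the table
    elif table is not None:
        options["dbtable"] = table  # read the whole table
    else:
        raise ValueError("Either 'table' or 'query' must be provided.")
    return options


table_opts = jdbc_read_options("postgresql://user:pass@host:5432/db", table="users")
query_opts = jdbc_read_options(
    "postgresql://user:pass@host:5432/db",
    table="users",
    query="SELECT id FROM users",
)
# With a SparkSession available, the options could be used as:
#   spark.read.format("jdbc").options(**table_opts).load()
```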

write(table: str, dataframe, mode='overwrite', schema: str | None = None)

Writes a Spark DataFrame to a specified table in a PostgreSQL database using JDBC.

This method writes the provided DataFrame to the target PostgreSQL table, with the option to specify the write mode (overwrite, append, etc.).

Parameters:
  • table – The name of the table where data will be written.

  • dataframe – The Spark DataFrame containing the data to write.

  • mode – The write mode. Valid options are ‘overwrite’, ‘append’, ‘ignore’, and ‘error’. Defaults to ‘overwrite’. (optional)

Note:
  • Ensure that the schema of the DataFrame matches the schema of the target table if the table exists.

  • The mode parameter controls the behavior when the table already exists.