scystream.sdk.database_handling package#
Submodules#
scystream.sdk.database_handling.postgres_manager module#
- class scystream.sdk.database_handling.database_manager.BaseDatabaseOperations[source]#
Bases: ABC
- MAX_TABLE_NAME_LENGTH = 63#
- class scystream.sdk.database_handling.database_manager.PandasDatabaseOperations[source]#
Bases: BaseDatabaseOperations
Database operations using Pandas and SQLAlchemy.
This class provides a simple interface to read from and write to any SQLAlchemy-compatible database using Pandas DataFrames. The connection is established via a DSN (Data Source Name), making this implementation backend-agnostic.
Supported databases include (but are not limited to):
- PostgreSQL
- MySQL
- SQLite
- Snowflake
- Oracle
This implementation is best suited for local or small-to-medium sized datasets where distributed processing (e.g., Spark) is not required.
- __init__(dsn: str, schema: str | None = None)[source]#
Initialize the PandasDatabaseOperations instance.
- Parameters:
dsn – A SQLAlchemy-compatible database connection string (DSN), e.g.:
- postgresql://user:pass@host:5432/db
- mysql+pymysql://user:pass@host/db
- sqlite:///local.db
schema – An optional schema to use, relevant for PostgreSQL databases. (optional)
- Raises:
ValueError – If the DSN is invalid or connection fails.
- Note:
Uses SQLAlchemy’s connection pooling with pool_pre_ping=True to ensure stale connections are automatically refreshed.
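The pooling behavior described in the note can be sketched directly with SQLAlchemy. This is an illustrative, minimal example (an in-memory SQLite DSN stands in for a real database; it is not the SDK's internal implementation):

```python
from sqlalchemy import create_engine, text

# pool_pre_ping=True issues a lightweight "ping" on each connection
# checkout, so stale pooled connections are transparently replaced
# instead of raising on first use.
engine = create_engine("sqlite://", pool_pre_ping=True)

with engine.connect() as conn:
    value = conn.execute(text("SELECT 1")).scalar()

print(value)
```

The same `pool_pre_ping=True` flag works with any SQLAlchemy-compatible DSN, which is what makes the implementation backend-agnostic.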
- read(table: str | None = None, query: str | None = None) pandas.DataFrame[source]#
Read data from the database into a Pandas DataFrame.
This method supports two modes of operation:
- Reading all rows from a specified table
- Executing a custom SQL query
- Parameters:
table – The name of the table to read from. Must be provided if query is not supplied. (optional)
query – A custom SQL query to execute. If provided, this overrides the table parameter. (optional)
- Raises:
ValueError – If neither table nor query is provided.
ValueError – If the table name exceeds the allowed length.
- Returns:
A Pandas DataFrame containing the query result.
- Return type:
pandas.DataFrame
- Example:
>>> db.read(table="users")
>>> db.read(query="SELECT id, name FROM users WHERE active = true")
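As a rough, self-contained illustration of the two read modes, the following sketch uses `pandas.read_sql` against the standard-library sqlite3 module rather than the SDK itself (the `users` table and its columns are invented for the example):

```python
import sqlite3
import pandas as pd

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT, active INTEGER)")
conn.executemany("INSERT INTO users VALUES (?, ?, ?)",
                 [(1, "ada", 1), (2, "bob", 0)])

# Table mode: read all rows from one table.
all_rows = pd.read_sql("SELECT * FROM users", conn)

# Query mode: a custom SQL query overrides the table argument.
active = pd.read_sql("SELECT id, name FROM users WHERE active = 1", conn)

print(len(all_rows), len(active))  # 2 1
```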
- write(table: str, data: pandas.DataFrame, mode: str = 'overwrite')[source]#
Write a Pandas DataFrame to the database.
This method writes the provided DataFrame to the specified table using SQLAlchemy. The behavior when the table already exists is controlled via the mode parameter.
- Parameters:
table – The name of the target table.
data – The Pandas DataFrame to write.
mode – The write mode. Supported options are:
- ‘overwrite’: Replace the table if it exists.
- ‘append’: Append data to the existing table.
Defaults to ‘overwrite’. (optional)
- Raises:
ValueError – If the table name exceeds the allowed length.
ValueError – If an unsupported mode is provided.
- Note:
The DataFrame index is not written to the database.
Ensure schema compatibility when using mode=’append’.
- Example:
>>> db.write("users", df)
>>> db.write("users", df, mode="append")
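The two write modes can be mirrored with `pandas.DataFrame.to_sql`, where ‘overwrite’ plausibly corresponds to `if_exists="replace"` and ‘append’ to `if_exists="append"`. A hedged sketch against an in-memory SQLite database (this mapping is an assumption about the implementation, not taken from the SDK source):

```python
import sqlite3
import pandas as pd

conn = sqlite3.connect(":memory:")
df = pd.DataFrame({"id": [1, 2], "name": ["ada", "bob"]})

# mode='overwrite' -> replace the table if it exists;
# index=False matches the note that the index is not written.
df.to_sql("users", conn, if_exists="replace", index=False)

# mode='append' -> add rows to the existing table
# (the appended frame must be schema-compatible).
df.to_sql("users", conn, if_exists="append", index=False)

out = pd.read_sql("SELECT COUNT(*) AS n FROM users", conn)
print(int(out["n"][0]))  # 4
```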
- class scystream.sdk.database_handling.database_manager.SparkDatabaseOperations[source]#
Bases: BaseDatabaseOperations
Class to perform PostgreSQL operations using Apache Spark.
This class provides methods to read from and write to a PostgreSQL database using JDBC and Spark’s DataFrame API. For database connectivity it requires a SparkSession and either a PostgresConfig object or the PostgresSettings from an input or output.
- read(table: str | None = None, query: str | None = None) pyspark.sql.DataFrame[source]#
Reads data from a PostgreSQL database into a Spark DataFrame.
This method can either read data from a specified table or execute a custom SQL query to retrieve data from the database.
- Parameters:
table – The name of the table to read data from. Must be provided if query is not supplied. (optional)
query – A custom SQL query to run. If provided, this overrides the table parameter. (optional)
- Returns:
A Spark DataFrame containing the result of the query or table data.
- Return type:
DataFrame
- write(table: str, dataframe, mode='overwrite', schema: str | None = None)[source]#
Writes a Spark DataFrame to a specified table in a PostgreSQL database using JDBC.
This method writes the provided DataFrame to the target PostgreSQL table, with the option to specify the write mode (overwrite, append, etc.).
- Parameters:
table – The name of the table where data will be written.
dataframe – The Spark DataFrame containing the data to write.
mode – The write mode. Valid options are ‘overwrite’, ‘append’, ‘ignore’, and ‘error’. Defaults to ‘overwrite’. (optional)
schema – An optional schema in which the target table resides. (optional)
- Note:
Ensure that the schema of the DataFrame matches the schema of the target table if the table exists.
- Note:
The mode parameter controls the behavior when the table already exists.
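Under the hood, Spark's JDBC writer is driven by a small set of options: a JDBC URL, the target table, credentials, and the driver class. A hypothetical plain-Python sketch of the options such a writer would be configured with (host, database, credentials, and the schema-qualification rule are all invented for illustration):

```python
from typing import Optional


def jdbc_write_options(table: str, schema: Optional[str] = None) -> dict:
    """Build the option dict a Spark JDBC writer would receive.

    Hypothetical helper: connection details are placeholders, and
    schema qualification via 'schema.table' is an assumption.
    """
    target = f"{schema}.{table}" if schema else table
    return {
        "url": "jdbc:postgresql://localhost:5432/mydb",
        "dbtable": target,
        "user": "user",
        "password": "pass",
        "driver": "org.postgresql.Driver",
    }


opts = jdbc_write_options("users", schema="analytics")
print(opts["dbtable"])  # analytics.users
```

In actual Spark code these options would be passed via `DataFrameWriter.format("jdbc").options(**opts).mode("overwrite").save()`, with the `mode` argument controlling the existing-table behavior exactly as described above.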