# Source code for scystream.sdk.spark_manager
from importlib import resources

import pkg_resources
from pyspark.sql import SparkSession

from scystream.sdk.config import SDKConfig
from scystream.sdk.database_handling.database_manager import (
    SparkDatabaseOperations,
)
class SparkManager:
    """Own the lifecycle of a SparkSession configured for the SDK.

    Creates a session connected to the configured Spark master with the
    bundled PostgreSQL JDBC driver on the classpath, hands out database
    operation helpers, and stops the session on demand.
    """

    def __init__(self):
        self.config: SDKConfig = SDKConfig()
        # Locate the PostgreSQL JDBC driver shipped inside the package.
        # importlib.resources replaces the deprecated pkg_resources API;
        # str() yields a filesystem path for a regular (non-zip) install,
        # which is how this package is distributed.
        psql_jar_path = str(
            resources.files("scystream.sdk").joinpath(
                "spark_jars/postgresql-42.7.4.jar"
            )
        )
        # NOTE: When starting the ComputeBlock using Apache Spark's
        # DockerOperator, ensure that the container runs in the same network
        # as the spark-master and spark-worker nodes. Otherwise, Spark jobs
        # may fail.
        self.session = (
            SparkSession.builder.master(self.config.cb_spark_master)
            .appName(self.config.app_name)
            .config("spark.jars", psql_jar_path)
            .getOrCreate()
        )

    def setup_pg(self, dsn: str) -> SparkDatabaseOperations:
        """Return database operations bound to this session for a DSN.

        Args:
            dsn: Connection string; must use the ``postgresql://`` scheme.

        Raises:
            ValueError: If ``dsn`` is not a PostgreSQL DSN.
        """
        if not dsn.startswith("postgresql://"):
            raise ValueError(
                "Spark integration currently only supports PostgreSQL DSNs."
            )
        return SparkDatabaseOperations(self.session, dsn)

    def stop_session(self):
        """Stop the underlying SparkSession, if any. Safe to call twice."""
        if self.session:
            self.session.stop()
            # Drop the reference so a repeated call is a no-op.
            self.session = None