# Source code for scystream.sdk.config.compute_block_utils
from typing import Union
from pydantic_core import PydanticUndefinedType
from scystream.sdk.config.models import (
ComputeBlock,
Entrypoint,
InputOutputModel,
FILE_TYPE_IDENTIFIER,
DATABASE_TABLE_TYPE_IDENTIFIER,
CUSTOM_TYPE_IDENTIFIER,
)
from scystream.sdk.env.settings import (
DatabaseSettings,
InputSettings,
OutputSettings,
FileSettings,
)
from scystream.sdk.config.entrypoints import get_registered_functions
def _get_pydantic_default_value_or_none(value):
    """
    Returns the default stored on a field description, or None if unset.

    :param value: An object exposing a ``default`` attribute; within this
        module it is always an entry of a settings class' ``model_fields``.
    :return: ``value.default``, or None when that default is pydantic's
        "undefined" marker (i.e. no default was declared).
    """
    declared_default = value.default
    # Pydantic marks "no default given" with a dedicated sentinel type;
    # translate that into a plain None.
    return (
        None
        if isinstance(declared_default, PydanticUndefinedType)
        else declared_default
    )
def _get_io_type(subj: Union[InputSettings, OutputSettings]) -> str:
    """
    Maps a settings class to the identifier of its input/output type.

    :param subj: The settings class to classify. It is passed to
        ``issubclass``, so it must be a class, not an instance.
    :return: FILE_TYPE_IDENTIFIER for subclasses of FileSettings,
        DATABASE_TABLE_TYPE_IDENTIFIER for subclasses of DatabaseSettings,
        CUSTOM_TYPE_IDENTIFIER for everything else.
    """
    # Checked in order: file settings take precedence over database settings.
    known_types = (
        (FileSettings, FILE_TYPE_IDENTIFIER),
        (DatabaseSettings, DATABASE_TABLE_TYPE_IDENTIFIER),
    )
    for settings_base, type_identifier in known_types:
        if issubclass(subj, settings_base):
            return type_identifier
    return CUSTOM_TYPE_IDENTIFIER
def _build_input_output_dict_from_class(
    subject: Union[InputSettings, OutputSettings],
):
    """
    Builds an InputOutputModel describing the given settings class.

    Every entry of ``subject.model_fields`` becomes one config entry whose
    value is the field's declared default (None if it has none). If the
    class defines a truthy ``__identifier__`` attribute, each config key is
    prefixed with ``"<identifier>_"``.

    :param subject: The input/output settings class to describe.
    :return: An InputOutputModel with the detected type, a placeholder
        description and the collected config.
    """
    prefix = getattr(subject, "__identifier__", None)
    collected_config = {
        (f"{prefix}_{field_name}" if prefix else field_name):
            _get_pydantic_default_value_or_none(field_info)
        for field_name, field_info in subject.model_fields.items()
    }
    io_type = _get_io_type(subject)
    return InputOutputModel(
        type=io_type,
        description="<to-be-set>",
        config=collected_config,
    )
# [docs]
def get_compute_block() -> ComputeBlock:
    """
    Builds a ComputeBlock from the registered entrypoints and their settings.

    For every registered entrypoint, the fields of its settings class (if
    any) are sorted into three groups:

    * fields whose ``default_factory`` is a subclass of InputSettings
      become inputs,
    * fields whose ``default_factory`` is a subclass of OutputSettings
      become outputs,
    * all remaining fields become envs, mapped to their declared default.

    Groups that end up empty are passed to the Entrypoint as None. All
    descriptive metadata (names, descriptions, author, docker image) is
    filled with placeholder strings.

    :return: The assembled ComputeBlock instance.
    """
    collected_entrypoints = {}

    for entrypoint_name, registration in get_registered_functions().items():
        env_fields, input_fields, output_fields = {}, {}, {}

        settings_cls = registration["settings"]
        # Entrypoints without a settings class simply contribute no fields.
        settings_fields = settings_cls.model_fields if settings_cls else {}

        for field_name, field_info in settings_fields.items():
            factory = field_info.default_factory
            # issubclass() requires a class, so guard against factories that
            # are plain callables or missing altogether.
            factory_is_class = isinstance(factory, type)

            if factory_is_class and issubclass(factory, InputSettings):
                input_fields[field_name] = (
                    _build_input_output_dict_from_class(factory)
                )
            elif factory_is_class and issubclass(factory, OutputSettings):
                output_fields[field_name] = (
                    _build_input_output_dict_from_class(factory)
                )
            else:
                env_fields[field_name] = (
                    _get_pydantic_default_value_or_none(field_info)
                )

        collected_entrypoints[entrypoint_name] = Entrypoint(
            description="<tbd>",
            envs=env_fields or None,
            inputs=input_fields or None,
            outputs=output_fields or None,
        )

    return ComputeBlock(
        name="<tbs>",
        description="<tbs>",
        author="<tbs>",
        entrypoints=collected_entrypoints,
        docker_image="<tbs>",
    )