Multipage workflow improvements (#875)
This commit is contained in:
@@ -1264,7 +1264,7 @@ class AgentDB:
|
|||||||
LOG.error("SQLAlchemyError", exc_info=True)
|
LOG.error("SQLAlchemyError", exc_info=True)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
async def create_workflow_run_output_parameter(
|
async def create_or_update_workflow_run_output_parameter(
|
||||||
self,
|
self,
|
||||||
workflow_run_id: str,
|
workflow_run_id: str,
|
||||||
output_parameter_id: str,
|
output_parameter_id: str,
|
||||||
@@ -1272,6 +1272,24 @@ class AgentDB:
|
|||||||
) -> WorkflowRunOutputParameter:
|
) -> WorkflowRunOutputParameter:
|
||||||
try:
|
try:
|
||||||
async with self.Session() as session:
|
async with self.Session() as session:
|
||||||
|
# check if the workflow run output parameter already exists
|
||||||
|
# if it does, update the value
|
||||||
|
if workflow_run_output_parameter := (
|
||||||
|
await session.scalars(
|
||||||
|
select(WorkflowRunOutputParameterModel)
|
||||||
|
.filter_by(workflow_run_id=workflow_run_id)
|
||||||
|
.filter_by(output_parameter_id=output_parameter_id)
|
||||||
|
)
|
||||||
|
).first():
|
||||||
|
LOG.info(
|
||||||
|
f"Updating existing workflow run output parameter with {workflow_run_output_parameter.workflow_run_id} - {workflow_run_output_parameter.output_parameter_id}"
|
||||||
|
)
|
||||||
|
workflow_run_output_parameter.value = value
|
||||||
|
await session.commit()
|
||||||
|
await session.refresh(workflow_run_output_parameter)
|
||||||
|
return convert_to_workflow_run_output_parameter(workflow_run_output_parameter, self.debug_enabled)
|
||||||
|
|
||||||
|
# if it does not exist, create a new one
|
||||||
workflow_run_output_parameter = WorkflowRunOutputParameterModel(
|
workflow_run_output_parameter = WorkflowRunOutputParameterModel(
|
||||||
workflow_run_id=workflow_run_id,
|
workflow_run_id=workflow_run_id,
|
||||||
output_parameter_id=output_parameter_id,
|
output_parameter_id=output_parameter_id,
|
||||||
@@ -1286,6 +1304,33 @@ class AgentDB:
|
|||||||
LOG.error("SQLAlchemyError", exc_info=True)
|
LOG.error("SQLAlchemyError", exc_info=True)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
async def update_workflow_run_output_parameter(
|
||||||
|
self,
|
||||||
|
workflow_run_id: str,
|
||||||
|
output_parameter_id: str,
|
||||||
|
value: dict[str, Any] | list | str | None,
|
||||||
|
) -> WorkflowRunOutputParameter:
|
||||||
|
try:
|
||||||
|
async with self.Session() as session:
|
||||||
|
workflow_run_output_parameter = (
|
||||||
|
await session.scalars(
|
||||||
|
select(WorkflowRunOutputParameterModel)
|
||||||
|
.filter_by(workflow_run_id=workflow_run_id)
|
||||||
|
.filter_by(output_parameter_id=output_parameter_id)
|
||||||
|
)
|
||||||
|
).first()
|
||||||
|
if not workflow_run_output_parameter:
|
||||||
|
raise NotFoundError(
|
||||||
|
f"WorkflowRunOutputParameter not found for {workflow_run_id} and {output_parameter_id}"
|
||||||
|
)
|
||||||
|
workflow_run_output_parameter.value = value
|
||||||
|
await session.commit()
|
||||||
|
await session.refresh(workflow_run_output_parameter)
|
||||||
|
return convert_to_workflow_run_output_parameter(workflow_run_output_parameter, self.debug_enabled)
|
||||||
|
except SQLAlchemyError:
|
||||||
|
LOG.error("SQLAlchemyError", exc_info=True)
|
||||||
|
raise
|
||||||
|
|
||||||
async def get_workflow_parameters(self, workflow_id: str) -> list[WorkflowParameter]:
|
async def get_workflow_parameters(self, workflow_id: str) -> list[WorkflowParameter]:
|
||||||
try:
|
try:
|
||||||
async with self.Session() as session:
|
async with self.Session() as session:
|
||||||
|
|||||||
@@ -276,8 +276,7 @@ class WorkflowRunContext:
|
|||||||
self, parameter: OutputParameter, value: dict[str, Any] | list | str | None
|
self, parameter: OutputParameter, value: dict[str, Any] | list | str | None
|
||||||
) -> None:
|
) -> None:
|
||||||
if parameter.key in self.values:
|
if parameter.key in self.values:
|
||||||
LOG.warning(f"Output parameter {parameter.output_parameter_id} already has a registered value")
|
LOG.warning(f"Output parameter {parameter.output_parameter_id} already has a registered value, overwriting")
|
||||||
return
|
|
||||||
|
|
||||||
self.values[parameter.key] = value
|
self.values[parameter.key] = value
|
||||||
await self.set_parameter_values_for_output_parameter_dependent_blocks(parameter, value)
|
await self.set_parameter_values_for_output_parameter_dependent_blocks(parameter, value)
|
||||||
|
|||||||
@@ -87,19 +87,11 @@ class Block(BaseModel, abc.ABC):
|
|||||||
workflow_run_id: str,
|
workflow_run_id: str,
|
||||||
value: dict[str, Any] | list | str | None = None,
|
value: dict[str, Any] | list | str | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
if workflow_run_context.has_value(self.output_parameter.key):
|
|
||||||
LOG.warning(
|
|
||||||
"Output parameter value already recorded",
|
|
||||||
output_parameter_id=self.output_parameter.output_parameter_id,
|
|
||||||
workflow_run_id=workflow_run_id,
|
|
||||||
)
|
|
||||||
return
|
|
||||||
|
|
||||||
await workflow_run_context.register_output_parameter_value_post_execution(
|
await workflow_run_context.register_output_parameter_value_post_execution(
|
||||||
parameter=self.output_parameter,
|
parameter=self.output_parameter,
|
||||||
value=value,
|
value=value,
|
||||||
)
|
)
|
||||||
await app.DATABASE.create_workflow_run_output_parameter(
|
await app.DATABASE.create_or_update_workflow_run_output_parameter(
|
||||||
workflow_run_id=workflow_run_id,
|
workflow_run_id=workflow_run_id,
|
||||||
output_parameter_id=self.output_parameter.output_parameter_id,
|
output_parameter_id=self.output_parameter.output_parameter_id,
|
||||||
value=value,
|
value=value,
|
||||||
|
|||||||
Reference in New Issue
Block a user