From 68dae6dddd4637744319ba317fbaa783b761b3bb Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Mon, 23 Dec 2024 01:13:25 -0800 Subject: [PATCH] loop_over, current_value and current_index in the workflow_run_blocks table (#1425) --- ...0_add_current_value_current_index_loop_.py | 35 +++++++++++++++++++ skyvern/forge/sdk/db/client.py | 9 +++++ skyvern/forge/sdk/db/models.py | 3 ++ skyvern/forge/sdk/schemas/workflow_runs.py | 2 +- skyvern/forge/sdk/workflow/models/block.py | 23 ++++++++++++ 5 files changed, 71 insertions(+), 1 deletion(-) create mode 100644 alembic/versions/2024_12_23_0907-cf3cd8d666b0_add_current_value_current_index_loop_.py diff --git a/alembic/versions/2024_12_23_0907-cf3cd8d666b0_add_current_value_current_index_loop_.py b/alembic/versions/2024_12_23_0907-cf3cd8d666b0_add_current_value_current_index_loop_.py new file mode 100644 index 00000000..dcb968a8 --- /dev/null +++ b/alembic/versions/2024_12_23_0907-cf3cd8d666b0_add_current_value_current_index_loop_.py @@ -0,0 +1,35 @@ +"""add current_value, current_index, loop_values to workflow_run_blocks table + +Revision ID: cf3cd8d666b0 +Revises: 5be249d8dc96 +Create Date: 2024-12-23 09:07:57.592369+00:00 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "cf3cd8d666b0" +down_revision: Union[str, None] = "5be249d8dc96" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.add_column("workflow_run_blocks", sa.Column("loop_values", sa.JSON(), nullable=True)) + op.add_column("workflow_run_blocks", sa.Column("current_value", sa.String(), nullable=True)) + op.add_column("workflow_run_blocks", sa.Column("current_index", sa.Integer(), nullable=True)) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column("workflow_run_blocks", "current_index") + op.drop_column("workflow_run_blocks", "current_value") + op.drop_column("workflow_run_blocks", "loop_values") + # ### end Alembic commands ### diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 74f1f38d..3a954d0c 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -2084,6 +2084,9 @@ class AgentDB: failure_reason: str | None = None, task_id: str | None = None, organization_id: str | None = None, + loop_values: list | None = None, + current_value: str | None = None, + current_index: int | None = None, ) -> WorkflowRunBlock: async with self.Session() as session: workflow_run_block = ( @@ -2102,6 +2105,12 @@ class AgentDB: workflow_run_block.task_id = task_id if failure_reason: workflow_run_block.failure_reason = failure_reason + if loop_values: + workflow_run_block.loop_values = loop_values + if current_value: + workflow_run_block.current_value = current_value + if current_index: + workflow_run_block.current_index = current_index await session.commit() await session.refresh(workflow_run_block) else: diff --git a/skyvern/forge/sdk/db/models.py b/skyvern/forge/sdk/db/models.py index b5f50de3..9a04e385 100644 --- a/skyvern/forge/sdk/db/models.py +++ b/skyvern/forge/sdk/db/models.py @@ -504,6 +504,9 @@ class WorkflowRunBlockModel(Base): output = Column(JSON, nullable=True) continue_on_failure = Column(Boolean, nullable=False, default=False) failure_reason = Column(String, nullable=True) + loop_values = Column(JSON, nullable=True) + current_value = Column(String, nullable=True) + current_index = Column(Integer, nullable=True) created_at = Column(DateTime, default=datetime.datetime.utcnow, nullable=False) modified_at = Column(DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False) diff --git a/skyvern/forge/sdk/schemas/workflow_runs.py b/skyvern/forge/sdk/schemas/workflow_runs.py index c5679bf3..73225b21 100644 --- a/skyvern/forge/sdk/schemas/workflow_runs.py +++ b/skyvern/forge/sdk/schemas/workflow_runs.py @@ -36,7 +36,7 @@ class WorkflowRunBlock(BaseModel): loop_values: list[Any] | None = None # block inside a loop block - current_item: Any | None = None + current_value: str | None = None current_index: int | None = None diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index 0dba39be..00fee77a 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -98,6 +98,7 @@ class BlockResult: output_parameter_value: dict[str, Any] | list | str | None = None status: BlockStatus | None = None failure_reason: str | None = None + workflow_run_block_id: str | None = None class Block(BaseModel, abc.ABC): @@ -155,6 +156,7 @@ class Block(BaseModel, abc.ABC): output_parameter=self.output_parameter, output_parameter_value=output_parameter_value, status=status, + workflow_run_block_id=workflow_run_block_id, ) def format_block_parameter_template_from_workflow_run_context( @@ -777,6 +779,21 @@ class ForLoopBlock(Block): "output_value": workflow_run_context.get_value(block_output.output_parameter.key), } ) + try: + if block_output.workflow_run_block_id: + await app.DATABASE.update_workflow_run_block( + workflow_run_block_id=block_output.workflow_run_block_id, + organization_id=organization_id, + current_value=str(loop_over_value), + current_index=loop_idx, + ) + except Exception: + LOG.warning( + "Failed to update workflow run block", + workflow_run_block_id=block_output.workflow_run_block_id, + loop_over_value=loop_over_value, + loop_idx=loop_idx, + ) loop_block = original_loop_block block_outputs.append(block_output) if block_output.status == BlockStatus.canceled: @@ -833,6 +850,12 @@ class ForLoopBlock(Block): organization_id=organization_id, ) + await app.DATABASE.update_workflow_run_block( + workflow_run_block_id=workflow_run_block_id, + organization_id=organization_id, + loop_values=loop_over_values, + ) + LOG.info( f"Number of loop_over values: {len(loop_over_values)}", block_type=self.block_type,