loop_over, current_value and current_index in the workflow_run_blocks table (#1425)

This commit is contained in:
Shuchang Zheng
2024-12-23 01:13:25 -08:00
committed by GitHub
parent 2029c0c41f
commit 68dae6dddd
5 changed files with 71 additions and 1 deletions

View File

@@ -0,0 +1,35 @@
"""add current_value, current_index, loop_values to workflow_run_blocks table
Revision ID: cf3cd8d666b0
Revises: 5be249d8dc96
Create Date: 2024-12-23 09:07:57.592369+00:00
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "cf3cd8d666b0"
down_revision: Union[str, None] = "5be249d8dc96"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column("workflow_run_blocks", sa.Column("loop_values", sa.JSON(), nullable=True))
op.add_column("workflow_run_blocks", sa.Column("current_value", sa.String(), nullable=True))
op.add_column("workflow_run_blocks", sa.Column("current_index", sa.Integer(), nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("workflow_run_blocks", "current_index")
op.drop_column("workflow_run_blocks", "current_value")
op.drop_column("workflow_run_blocks", "loop_values")
# ### end Alembic commands ###

View File

@@ -2084,6 +2084,9 @@ class AgentDB:
failure_reason: str | None = None,
task_id: str | None = None,
organization_id: str | None = None,
loop_values: list | None = None,
current_value: str | None = None,
current_index: int | None = None,
) -> WorkflowRunBlock:
async with self.Session() as session:
workflow_run_block = (
@@ -2102,6 +2105,12 @@ class AgentDB:
workflow_run_block.task_id = task_id
if failure_reason:
workflow_run_block.failure_reason = failure_reason
if loop_values:
workflow_run_block.loop_values = loop_values
if current_value:
workflow_run_block.current_value = current_value
if current_index:
workflow_run_block.current_index = current_index
await session.commit()
await session.refresh(workflow_run_block)
else:

View File

@@ -504,6 +504,9 @@ class WorkflowRunBlockModel(Base):
output = Column(JSON, nullable=True)
continue_on_failure = Column(Boolean, nullable=False, default=False)
failure_reason = Column(String, nullable=True)
loop_values = Column(JSON, nullable=True)
current_value = Column(String, nullable=True)
current_index = Column(Integer, nullable=True)
created_at = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
modified_at = Column(DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False)

View File

@@ -36,7 +36,7 @@ class WorkflowRunBlock(BaseModel):
loop_values: list[Any] | None = None
# block inside a loop block
current_item: Any | None = None
current_value: str | None = None
current_index: int | None = None

View File

@@ -98,6 +98,7 @@ class BlockResult:
output_parameter_value: dict[str, Any] | list | str | None = None
status: BlockStatus | None = None
failure_reason: str | None = None
workflow_run_block_id: str | None = None
class Block(BaseModel, abc.ABC):
@@ -155,6 +156,7 @@ class Block(BaseModel, abc.ABC):
output_parameter=self.output_parameter,
output_parameter_value=output_parameter_value,
status=status,
workflow_run_block_id=workflow_run_block_id,
)
def format_block_parameter_template_from_workflow_run_context(
@@ -777,6 +779,21 @@ class ForLoopBlock(Block):
"output_value": workflow_run_context.get_value(block_output.output_parameter.key),
}
)
try:
if block_output.workflow_run_block_id:
await app.DATABASE.update_workflow_run_block(
workflow_run_block_id=block_output.workflow_run_block_id,
organization_id=organization_id,
current_value=str(loop_over_value),
current_index=loop_idx,
)
except Exception:
LOG.warning(
"Failed to update workflow run block",
workflow_run_block_id=block_output.workflow_run_block_id,
loop_over_value=loop_over_value,
loop_idx=loop_idx,
)
loop_block = original_loop_block
block_outputs.append(block_output)
if block_output.status == BlockStatus.canceled:
@@ -833,6 +850,12 @@ class ForLoopBlock(Block):
organization_id=organization_id,
)
await app.DATABASE.update_workflow_run_block(
workflow_run_block_id=workflow_run_block_id,
organization_id=organization_id,
loop_values=loop_over_values,
)
LOG.info(
f"Number of loop_over values: {len(loop_over_values)}",
block_type=self.block_type,