From 01e9678d27e07cd6476b37d663024027d8d3ad60 Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Thu, 5 Dec 2024 17:14:05 -0800 Subject: [PATCH] workflow run block (#1332) --- ...254717601_introduce_workflow_run_blocks.py | 64 +++++++++++++++++++ skyvern/forge/sdk/db/id.py | 6 ++ skyvern/forge/sdk/db/models.py | 22 +++++++ 3 files changed, 92 insertions(+) create mode 100644 alembic/versions/2024_12_06_0113-de0254717601_introduce_workflow_run_blocks.py diff --git a/alembic/versions/2024_12_06_0113-de0254717601_introduce_workflow_run_blocks.py b/alembic/versions/2024_12_06_0113-de0254717601_introduce_workflow_run_blocks.py new file mode 100644 index 00000000..ec730ec9 --- /dev/null +++ b/alembic/versions/2024_12_06_0113-de0254717601_introduce_workflow_run_blocks.py @@ -0,0 +1,64 @@ +"""Introduce workflow_run_blocks + +Revision ID: de0254717601 +Revises: db41106b9f1a +Create Date: 2024-12-06 01:13:07.932965+00:00 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "de0254717601" +down_revision: Union[str, None] = "db41106b9f1a" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.create_table( + "workflow_run_blocks", + sa.Column("workflow_run_block_id", sa.String(), nullable=False), + sa.Column("workflow_run_id", sa.String(), nullable=False), + sa.Column("parent_workflow_run_block_id", sa.String(), nullable=True), + sa.Column("organization_id", sa.String(), nullable=True), + sa.Column("task_id", sa.String(), nullable=True), + sa.Column("label", sa.String(), nullable=True), + sa.Column("block_type", sa.String(), nullable=False), + sa.Column("status", sa.String(), nullable=False), + sa.Column("output", sa.JSON(), nullable=True), + sa.Column("continue_on_failure", sa.Boolean(), nullable=False), + sa.Column("created_at", sa.DateTime(), nullable=False), + sa.Column("modified_at", sa.DateTime(), nullable=False), + sa.ForeignKeyConstraint( + ["organization_id"], + ["organizations.organization_id"], + ), + sa.ForeignKeyConstraint( + ["parent_workflow_run_block_id"], + ["workflow_run_blocks.workflow_run_block_id"], + ), + sa.ForeignKeyConstraint( + ["task_id"], + ["tasks.task_id"], + ), + sa.ForeignKeyConstraint( + ["workflow_run_id"], + ["workflow_runs.workflow_run_id"], + ), + sa.PrimaryKeyConstraint("workflow_run_block_id"), + ) + op.create_index("wfrb_org_wfr_index", "workflow_run_blocks", ["organization_id", "workflow_run_id"], unique=False) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index("wfrb_org_wfr_index", table_name="workflow_run_blocks") + op.drop_table("workflow_run_blocks") + # ### end Alembic commands ### diff --git a/skyvern/forge/sdk/db/id.py b/skyvern/forge/sdk/db/id.py index 36efab2a..da992cf5 100644 --- a/skyvern/forge/sdk/db/id.py +++ b/skyvern/forge/sdk/db/id.py @@ -36,6 +36,7 @@ ARTIFACT_PREFIX = "a" WORKFLOW_PREFIX = "w" WORKFLOW_PERMANENT_ID_PREFIX = "wpid" WORKFLOW_RUN_PREFIX = "wr" +WORKFLOW_RUN_BLOCK_PREFIX = "wrb" WORKFLOW_PARAMETER_PREFIX = "wp" AWS_SECRET_PARAMETER_PREFIX = "asp" OUTPUT_PARAMETER_PREFIX = "op" @@ -55,6 +56,11 @@ def generate_workflow_permanent_id() -> str: return f"{WORKFLOW_PERMANENT_ID_PREFIX}_{int_id}" +def generate_workflow_run_block_id() -> str: + int_id = generate_id() + return f"{WORKFLOW_RUN_BLOCK_PREFIX}_{int_id}" + + def generate_workflow_run_id() -> str: int_id = generate_id() return f"{WORKFLOW_RUN_PREFIX}_{int_id}" diff --git a/skyvern/forge/sdk/db/models.py b/skyvern/forge/sdk/db/models.py index 1c1d3b68..6c61db95 100644 --- a/skyvern/forge/sdk/db/models.py +++ b/skyvern/forge/sdk/db/models.py @@ -35,6 +35,7 @@ from skyvern.forge.sdk.db.id import ( generate_workflow_id, generate_workflow_parameter_id, generate_workflow_permanent_id, + generate_workflow_run_block_id, generate_workflow_run_id, ) from skyvern.forge.sdk.schemas.tasks import ProxyLocation @@ -473,3 +474,24 @@ class ActionModel(Base): created_at = Column(DateTime, default=datetime.datetime.utcnow, nullable=False) modified_at = Column(DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False) + + +class WorkflowRunBlockModel(Base): + __tablename__ = "workflow_run_blocks" + __table_args__ = (Index("wfrb_org_wfr_index", "organization_id", "workflow_run_id"),) + + workflow_run_block_id = Column(String, primary_key=True, default=generate_workflow_run_block_id) + workflow_run_id = Column(String, ForeignKey("workflow_runs.workflow_run_id"), nullable=False) + parent_workflow_run_block_id = Column( + String, ForeignKey("workflow_run_blocks.workflow_run_block_id"), nullable=True + ) + organization_id = Column(String, ForeignKey("organizations.organization_id"), nullable=True) + task_id = Column(String, ForeignKey("tasks.task_id"), nullable=True) + label = Column(String, nullable=True) + block_type = Column(String, nullable=False) + status = Column(String, nullable=False) + output = Column(JSON, nullable=True) + continue_on_failure = Column(Boolean, nullable=False, default=False) + + created_at = Column(DateTime, default=datetime.datetime.utcnow, nullable=False) + modified_at = Column(DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False)