From 43cab04454f8940acc2d4f130770acaf0c9576ac Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Fri, 27 Jun 2025 00:27:48 +0900 Subject: [PATCH] Add Run id as a parameter to the artifacts table (#2799) Co-authored-by: Suchintan Singh --- ...45_add_run_id_column_to_artifacts_table.py | 38 +++++++++++++++++++ .../editor/panels/WorkflowParametersPanel.tsx | 4 +- skyvern/forge/sdk/artifact/manager.py | 4 ++ skyvern/forge/sdk/core/skyvern_context.py | 3 +- skyvern/forge/sdk/db/client.py | 17 ++------- skyvern/forge/sdk/db/models.py | 1 + skyvern/forge/sdk/executor/async_executor.py | 1 + skyvern/forge/sdk/forge_log.py | 2 + skyvern/forge/sdk/workflow/models/block.py | 3 ++ skyvern/forge/sdk/workflow/service.py | 4 ++ skyvern/library/skyvern.py | 3 ++ skyvern/services/task_v2_service.py | 4 ++ 12 files changed, 69 insertions(+), 15 deletions(-) create mode 100644 alembic/versions/2025_06_26_1455-760ae45a1345_add_run_id_column_to_artifacts_table.py diff --git a/alembic/versions/2025_06_26_1455-760ae45a1345_add_run_id_column_to_artifacts_table.py b/alembic/versions/2025_06_26_1455-760ae45a1345_add_run_id_column_to_artifacts_table.py new file mode 100644 index 00000000..73f13aed --- /dev/null +++ b/alembic/versions/2025_06_26_1455-760ae45a1345_add_run_id_column_to_artifacts_table.py @@ -0,0 +1,38 @@ +"""add_run_id_column_to_artifacts_table + +Revision ID: 760ae45a1345 +Revises: afeed80576cb +Create Date: 2025-06-26 14:55:09.740481+00:00 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "760ae45a1345" +down_revision: Union[str, None] = "afeed80576cb" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column("artifacts", sa.Column("run_id", sa.String(), nullable=True)) + with op.get_context().autocommit_block(): + op.execute("SET statement_timeout = '3h';") + op.execute(""" + CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_artifacts_run_id + ON artifacts (run_id) + """) + op.execute("RESET statement_timeout;") + + +def downgrade() -> None: + op.drop_column("artifacts", "run_id") + with op.get_context().autocommit_block(): + op.execute("SET statement_timeout = '3h';") + op.execute("DROP INDEX CONCURRENTLY IF EXISTS ix_artifacts_run_id") + op.execute("RESET statement_timeout;") diff --git a/skyvern-frontend/src/routes/workflows/editor/panels/WorkflowParametersPanel.tsx b/skyvern-frontend/src/routes/workflows/editor/panels/WorkflowParametersPanel.tsx index 2f6c86cc..a292ac34 100644 --- a/skyvern-frontend/src/routes/workflows/editor/panels/WorkflowParametersPanel.tsx +++ b/skyvern-frontend/src/routes/workflows/editor/panels/WorkflowParametersPanel.tsx @@ -141,7 +141,9 @@ function WorkflowParametersPanel() { ) : ( - {parameter.parameterType} + {parameter.parameterType === "onepassword" + ? "credential" + : parameter.parameterType} )} diff --git a/skyvern/forge/sdk/artifact/manager.py b/skyvern/forge/sdk/artifact/manager.py index 95662778..2cd0bc85 100644 --- a/skyvern/forge/sdk/artifact/manager.py +++ b/skyvern/forge/sdk/artifact/manager.py @@ -33,6 +33,7 @@ class ArtifactManager: workflow_run_block_id: str | None = None, thought_id: str | None = None, task_v2_id: str | None = None, + run_id: str | None = None, ai_suggestion_id: str | None = None, data: bytes | None = None, path: str | None = None, @@ -49,6 +50,8 @@ class ArtifactManager: task_v2_id = context.task_v2_id if not task_id and context: task_id = context.task_id + if not run_id and context: + run_id = context.run_id artifact = await app.DATABASE.create_artifact( artifact_id, @@ -60,6 +63,7 @@ class ArtifactManager: workflow_run_block_id=workflow_run_block_id, thought_id=thought_id, task_v2_id=task_v2_id, + run_id=run_id, organization_id=organization_id, ai_suggestion_id=ai_suggestion_id, ) diff --git a/skyvern/forge/sdk/core/skyvern_context.py b/skyvern/forge/sdk/core/skyvern_context.py index d51b9fa8..63e4c120 100644 --- a/skyvern/forge/sdk/core/skyvern_context.py +++ b/skyvern/forge/sdk/core/skyvern_context.py @@ -18,6 +18,7 @@ class SkyvernContext: max_steps_override: int | None = None browser_session_id: str | None = None tz_info: ZoneInfo | None = None + run_id: str | None = None totp_codes: dict[str, str | None] = field(default_factory=dict) log: list[dict] = field(default_factory=list) hashed_href_map: dict[str, str] = field(default_factory=dict) @@ -26,7 +27,7 @@ class SkyvernContext: max_screenshot_scrolling_times: int | None = None def __repr__(self) -> str: - return f"SkyvernContext(request_id={self.request_id}, organization_id={self.organization_id}, task_id={self.task_id}, workflow_id={self.workflow_id}, workflow_run_id={self.workflow_run_id}, task_v2_id={self.task_v2_id}, max_steps_override={self.max_steps_override})" + return f"SkyvernContext(request_id={self.request_id}, organization_id={self.organization_id}, task_id={self.task_id}, workflow_id={self.workflow_id}, workflow_run_id={self.workflow_run_id}, task_v2_id={self.task_v2_id}, max_steps_override={self.max_steps_override}, run_id={self.run_id})" def __str__(self) -> str: return self.__repr__() diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 53e2a8b2..dc35ae4e 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -224,14 +224,15 @@ class AgentDB: artifact_id: str, artifact_type: str, uri: str, + organization_id: str, step_id: str | None = None, task_id: str | None = None, workflow_run_id: str | None = None, workflow_run_block_id: str | None = None, task_v2_id: str | None = None, + run_id: str | None = None, thought_id: str | None = None, ai_suggestion_id: str | None = None, - organization_id: str | None = None, ) -> Artifact: try: async with self.Session() as session: @@ -245,6 +246,7 @@ class AgentDB: workflow_run_block_id=workflow_run_block_id, observer_cruise_id=task_v2_id, observer_thought_id=thought_id, + run_id=run_id, ai_suggestion_id=ai_suggestion_id, organization_id=organization_id, ) @@ -1024,18 +1026,7 @@ class AgentDB: async with self.Session() as session: query = select(ArtifactModel).filter_by(organization_id=organization_id) - if run.task_run_type in [ - RunType.task_v1, - RunType.openai_cua, - RunType.anthropic_cua, - ]: - query = query.filter_by(task_id=run.run_id) - elif run.task_run_type == RunType.task_v2: - query = query.filter_by(observer_cruise_id=run.run_id) - elif run.task_run_type == RunType.workflow_run: - query = query.filter_by(workflow_run_id=run.run_id) - else: - return [] + query = query.filter_by(run_id=run.run_id) if artifact_types: query = query.filter(ArtifactModel.artifact_type.in_(artifact_types)) diff --git a/skyvern/forge/sdk/db/models.py b/skyvern/forge/sdk/db/models.py index 825318c8..9fb8e848 100644 --- a/skyvern/forge/sdk/db/models.py +++ b/skyvern/forge/sdk/db/models.py @@ -193,6 +193,7 @@ class ArtifactModel(Base): step_id = Column(String, index=True) artifact_type = Column(String) uri = Column(String) + run_id = Column(String, nullable=True, index=True) created_at = Column(DateTime, default=datetime.datetime.utcnow, nullable=False) modified_at = Column( DateTime, diff --git a/skyvern/forge/sdk/executor/async_executor.py b/skyvern/forge/sdk/executor/async_executor.py index b579fafd..e2766245 100644 --- a/skyvern/forge/sdk/executor/async_executor.py +++ b/skyvern/forge/sdk/executor/async_executor.py @@ -105,6 +105,7 @@ class BackgroundTaskExecutor(AsyncExecutor): context: SkyvernContext = skyvern_context.ensure_context() context.task_id = task.task_id + context.run_id = context.run_id or task.task_id context.organization_id = organization_id context.max_steps_override = max_steps_override context.max_screenshot_scrolling_times = task.max_screenshot_scrolling_times diff --git a/skyvern/forge/sdk/forge_log.py b/skyvern/forge/sdk/forge_log.py index 01f8c37f..0a907d32 100644 --- a/skyvern/forge/sdk/forge_log.py +++ b/skyvern/forge/sdk/forge_log.py @@ -31,6 +31,8 @@ def add_kv_pairs_to_msg(logger: logging.Logger, method_name: str, event_dict: Ev event_dict["organization_name"] = context.organization_name if context.task_id: event_dict["task_id"] = context.task_id + if context.run_id: + event_dict["run_id"] = context.run_id if context.workflow_id: event_dict["workflow_id"] = context.workflow_id if context.workflow_run_id: diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index 57f24c32..022b724e 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -2544,6 +2544,8 @@ class TaskV2Block(Block): browser_session_id=browser_session_id, ) finally: + context: skyvern_context.SkyvernContext | None = skyvern_context.current() + current_run_id = context.run_id if context and context.run_id else workflow_run_id skyvern_context.set( skyvern_context.SkyvernContext( organization_id=organization_id, @@ -2551,6 +2553,7 @@ class TaskV2Block(Block): workflow_id=workflow_run.workflow_id, workflow_permanent_id=workflow_run.workflow_permanent_id, workflow_run_id=workflow_run_id, + run_id=current_run_id, browser_session_id=browser_session_id, max_screenshot_scrolling_times=workflow_run.max_screenshot_scrolling_times, ) diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index b630a261..6a22d7ac 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -177,6 +177,8 @@ class WorkflowService: webhook_callback_url=workflow_request.webhook_callback_url, max_screenshot_scrolling_times=workflow_request.max_screenshot_scrolling_times, ) + context: skyvern_context.SkyvernContext | None = skyvern_context.current() + current_run_id = context.run_id if context and context.run_id else workflow_run.workflow_run_id skyvern_context.set( SkyvernContext( organization_id=organization.organization_id, @@ -184,6 +186,8 @@ class WorkflowService: request_id=request_id, workflow_id=workflow_id, workflow_run_id=workflow_run.workflow_run_id, + run_id=current_run_id, + workflow_permanent_id=workflow_run.workflow_permanent_id, max_steps_override=max_steps_override, max_screenshot_scrolling_times=workflow_request.max_screenshot_scrolling_times, ) diff --git a/skyvern/library/skyvern.py b/skyvern/library/skyvern.py index 1196b382..bff3a856 100644 --- a/skyvern/library/skyvern.py +++ b/skyvern/library/skyvern.py @@ -136,11 +136,14 @@ class Skyvern(AsyncSkyvern): organization_id=organization.organization_id, ) try: + context: skyvern_context.SkyvernContext | None = skyvern_context.current() + current_run_id = context.run_id if context and context.run_id else task.task_id skyvern_context.set( SkyvernContext( organization_id=organization.organization_id, organization_name=organization.organization_name, task_id=task.task_id, + run_id=current_run_id, max_steps_override=max_steps, ) ) diff --git a/skyvern/services/task_v2_service.py b/skyvern/services/task_v2_service.py index 73e01c0b..4582a33c 100644 --- a/skyvern/services/task_v2_service.py +++ b/skyvern/services/task_v2_service.py @@ -186,6 +186,7 @@ async def initialize_task_v2( context = skyvern_context.current() if context: context.task_v2_id = task_v2.observer_cruise_id + context.run_id = context.run_id or task_v2.observer_cruise_id context.max_screenshot_scrolling_times = max_screenshot_scrolling_times thought = await app.DATABASE.create_thought( @@ -458,6 +459,8 @@ async def run_task_v2_helper( ###################### run task v2 ###################### + context: skyvern_context.SkyvernContext | None = skyvern_context.current() + current_run_id = context.run_id if context and context.run_id else task_v2.task_id skyvern_context.set( SkyvernContext( organization_id=organization_id, @@ -465,6 +468,7 @@ async def run_task_v2_helper( workflow_run_id=workflow_run_id, request_id=request_id, task_v2_id=task_v2_id, + run_id=current_run_id, browser_session_id=browser_session_id, max_screenshot_scrolling_times=task_v2.max_screenshot_scrolling_times, )