From 1f6309c405e4c2882e1ce0a564a16883f007293b Mon Sep 17 00:00:00 2001 From: Marc Kelechava Date: Fri, 5 Dec 2025 12:30:05 -0800 Subject: [PATCH] Pass screenshots from one block to another block (#4212) --- skyvern/forge/sdk/db/client.py | 5 + skyvern/forge/sdk/schemas/tasks.py | 11 +- skyvern/forge/sdk/workflow/models/block.py | 43 ++++++- skyvern/forge/sdk/workflow/service.py | 131 ++++++++++++++++++--- skyvern/services/script_service.py | 16 ++- 5 files changed, 183 insertions(+), 23 deletions(-) diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index c74a2dd3..b96ed039 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -1227,6 +1227,7 @@ class AgentDB: workflow_run_block_id: str | None = None, thought_id: str | None = None, task_v2_id: str | None = None, + limit: int | None = None, ) -> list[Artifact]: try: async with self.Session() as session: @@ -1255,6 +1256,9 @@ class AgentDB: query = query.order_by(ArtifactModel.created_at.desc()) + if limit is not None: + query = query.limit(limit) + artifacts = (await session.scalars(query)).all() LOG.debug("Artifacts fetched", count=len(artifacts)) return [convert_to_artifact(a, self.debug_enabled) for a in artifacts] @@ -1286,6 +1290,7 @@ class AgentDB: workflow_run_block_id=workflow_run_block_id, thought_id=thought_id, task_v2_id=task_v2_id, + limit=1, ) return artifacts[0] if artifacts else None diff --git a/skyvern/forge/sdk/schemas/tasks.py b/skyvern/forge/sdk/schemas/tasks.py index fa467431..2151881c 100644 --- a/skyvern/forge/sdk/schemas/tasks.py +++ b/skyvern/forge/sdk/schemas/tasks.py @@ -397,9 +397,16 @@ class TaskOutput(BaseModel): errors: list[dict[str, Any]] = [] downloaded_files: list[FileInfo] | None = None downloaded_file_urls: list[str] | None = None # For backward compatibility + task_screenshots: list[str] | None = None + workflow_screenshots: list[str] | None = None @staticmethod - def from_task(task: Task, downloaded_files: list[FileInfo] | None = None) -> TaskOutput: + def from_task( + task: Task, + downloaded_files: list[FileInfo] | None = None, + task_screenshots: list[str] | None = None, + workflow_screenshots: list[str] | None = None, + ) -> TaskOutput: # For backward compatibility, extract just the URLs from FileInfo objects downloaded_file_urls = [file_info.url for file_info in downloaded_files] if downloaded_files else None @@ -411,6 +418,8 @@ class TaskOutput(BaseModel): errors=task.errors, downloaded_files=downloaded_files, downloaded_file_urls=downloaded_file_urls, + task_screenshots=task_screenshots, + workflow_screenshots=workflow_screenshots, ) diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index 2cb05704..da1786f7 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -810,7 +810,21 @@ class BaseTaskBlock(Block): except asyncio.TimeoutError: LOG.warning("Timeout getting downloaded files", task_id=updated_task.task_id) - task_output = TaskOutput.from_task(updated_task, downloaded_files) + task_screenshots = await app.WORKFLOW_SERVICE.get_recent_task_screenshot_urls( + organization_id=workflow_run.organization_id, + task_id=updated_task.task_id, + ) + workflow_screenshots = await app.WORKFLOW_SERVICE.get_recent_workflow_screenshot_urls( + workflow_run_id=workflow_run_id, + organization_id=workflow_run.organization_id, + ) + + task_output = TaskOutput.from_task( + updated_task, + downloaded_files, + task_screenshots=task_screenshots, + workflow_screenshots=workflow_screenshots, + ) output_parameter_value = task_output.model_dump() await self.record_output_parameter_value(workflow_run_context, workflow_run_id, output_parameter_value) return await self.build_block_result( @@ -871,7 +885,21 @@ class BaseTaskBlock(Block): except asyncio.TimeoutError: LOG.warning("Timeout getting downloaded files", task_id=updated_task.task_id) - task_output = TaskOutput.from_task(updated_task, downloaded_files) + task_screenshots = await app.WORKFLOW_SERVICE.get_recent_task_screenshot_urls( + organization_id=workflow_run.organization_id, + task_id=updated_task.task_id, + ) + workflow_screenshots = await app.WORKFLOW_SERVICE.get_recent_workflow_screenshot_urls( + workflow_run_id=workflow_run_id, + organization_id=workflow_run.organization_id, + ) + + task_output = TaskOutput.from_task( + updated_task, + downloaded_files, + task_screenshots=task_screenshots, + workflow_screenshots=workflow_screenshots, + ) LOG.warning( f"Task failed with status {updated_task.status}{retry_message}", task_id=updated_task.task_id, @@ -3668,12 +3696,23 @@ class TaskV2Block(Block): # If continue_on_failure is True, we treat the block as successful even if the task failed # This allows the workflow to continue execution despite this block's failure + task_screenshots = await app.WORKFLOW_SERVICE.get_recent_task_screenshot_urls( + organization_id=organization_id, + task_v2_id=task_v2.observer_cruise_id, + ) + workflow_screenshots = await app.WORKFLOW_SERVICE.get_recent_workflow_screenshot_urls( + workflow_run_id=workflow_run_id, + organization_id=organization_id, + ) + task_v2_output = { "task_id": task_v2.observer_cruise_id, "status": task_v2.status, "summary": task_v2.summary, "extracted_information": result_dict, "failure_reason": failure_reason, + "task_screenshots": task_screenshots, + "workflow_screenshots": workflow_screenshots, } await self.record_output_parameter_value(workflow_run_context, workflow_run_id, task_v2_output) return await self.build_block_result( diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 7c4d766d..1ce712ef 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -37,7 +37,7 @@ from skyvern.exceptions import ( ) from skyvern.forge import app from skyvern.forge.prompts import prompt_engine -from skyvern.forge.sdk.artifact.models import ArtifactType +from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType from skyvern.forge.sdk.core import skyvern_context from skyvern.forge.sdk.core.security import generate_skyvern_webhook_signature from skyvern.forge.sdk.core.skyvern_context import SkyvernContext @@ -2162,6 +2162,111 @@ class WorkflowService: async def get_tasks_by_workflow_run_id(self, workflow_run_id: str) -> list[Task]: return await app.DATABASE.get_tasks_by_workflow_run_id(workflow_run_id=workflow_run_id) + async def get_recent_task_screenshot_urls( + self, + *, + organization_id: str | None, + task_id: str | None = None, + task_v2_id: str | None = None, + limit: int = 3, + ) -> list[str]: + """Return the latest action/final screenshot URLs for a task (v1 or v2).""" + + artifact_types = [ArtifactType.SCREENSHOT_ACTION, ArtifactType.SCREENSHOT_FINAL] + + artifacts: list[Artifact] = [] + if task_id: + artifacts = ( + await app.DATABASE.get_latest_n_artifacts( + task_id=task_id, + artifact_types=artifact_types, + organization_id=organization_id, + n=limit, + ) + or [] + ) + elif task_v2_id: + action_artifacts = await app.DATABASE.get_artifacts_by_entity_id( + organization_id=organization_id, + artifact_type=ArtifactType.SCREENSHOT_ACTION, + task_v2_id=task_v2_id, + limit=limit, + ) + final_artifacts = await app.DATABASE.get_artifacts_by_entity_id( + organization_id=organization_id, + artifact_type=ArtifactType.SCREENSHOT_FINAL, + task_v2_id=task_v2_id, + limit=limit, + ) + artifacts = sorted( + (action_artifacts or []) + (final_artifacts or []), + key=lambda artifact: artifact.created_at, + reverse=True, + )[:limit] + + if not artifacts: + return [] + + return await app.ARTIFACT_MANAGER.get_share_links(artifacts) or [] + + async def get_recent_workflow_screenshot_urls( + self, + workflow_run_id: str, + organization_id: str | None = None, + limit: int = 3, + workflow_run_tasks: list[Task] | None = None, + ) -> list[str]: + """Return latest screenshots across recent tasks in a workflow run.""" + + screenshot_artifacts: list[Artifact] = [] + seen_artifact_ids: set[str] = set() + + if workflow_run_tasks is None: + workflow_run_tasks = await app.DATABASE.get_tasks_by_workflow_run_id(workflow_run_id=workflow_run_id) + + for task in workflow_run_tasks[::-1]: + artifact = await app.DATABASE.get_latest_artifact( + task_id=task.task_id, + artifact_types=[ArtifactType.SCREENSHOT_ACTION, ArtifactType.SCREENSHOT_FINAL], + organization_id=organization_id, + ) + if artifact: + screenshot_artifacts.append(artifact) + seen_artifact_ids.add(artifact.artifact_id) + if len(screenshot_artifacts) >= limit: + break + + if len(screenshot_artifacts) < limit: + action_artifacts = await app.DATABASE.get_artifacts_by_entity_id( + organization_id=organization_id, + artifact_type=ArtifactType.SCREENSHOT_ACTION, + workflow_run_id=workflow_run_id, + limit=limit, + ) + final_artifacts = await app.DATABASE.get_artifacts_by_entity_id( + organization_id=organization_id, + artifact_type=ArtifactType.SCREENSHOT_FINAL, + workflow_run_id=workflow_run_id, + limit=limit, + ) + # Support runs that may not have Task rows (e.g., task_v2-only executions) + for artifact in sorted( + (action_artifacts or []) + (final_artifacts or []), + key=lambda artifact: artifact.created_at, + reverse=True, + ): + if artifact.artifact_id in seen_artifact_ids: + continue + screenshot_artifacts.append(artifact) + seen_artifact_ids.add(artifact.artifact_id) + if len(screenshot_artifacts) >= limit: + break + + if not screenshot_artifacts: + return [] + + return await app.ARTIFACT_MANAGER.get_share_links(screenshot_artifacts) or [] + async def build_workflow_run_status_response_by_workflow_id( self, workflow_run_id: str, @@ -2199,24 +2304,12 @@ class WorkflowService: organization_id=organization_id, ) workflow_run_tasks = await app.DATABASE.get_tasks_by_workflow_run_id(workflow_run_id=workflow_run_id) - screenshot_artifacts = [] - screenshot_urls: list[str] | None = None - # get the last screenshot for the last 3 tasks of the workflow run - for task in workflow_run_tasks[::-1]: - screenshot_artifact = await app.DATABASE.get_latest_artifact( - task_id=task.task_id, - artifact_types=[ - ArtifactType.SCREENSHOT_ACTION, - ArtifactType.SCREENSHOT_FINAL, - ], - organization_id=organization_id, - ) - if screenshot_artifact: - screenshot_artifacts.append(screenshot_artifact) - if len(screenshot_artifacts) >= 3: - break - if screenshot_artifacts: - screenshot_urls = await app.ARTIFACT_MANAGER.get_share_links(screenshot_artifacts) + screenshot_urls: list[str] | None = await self.get_recent_workflow_screenshot_urls( + workflow_run_id=workflow_run_id, + organization_id=organization_id, + workflow_run_tasks=workflow_run_tasks, + ) + screenshot_urls = screenshot_urls or None recording_url = None # Get recording url from browser session first, diff --git a/skyvern/services/script_service.py b/skyvern/services/script_service.py index 8669331c..f63c6072 100644 --- a/skyvern/services/script_service.py +++ b/skyvern/services/script_service.py @@ -612,7 +612,21 @@ async def _update_workflow_block( except asyncio.TimeoutError: LOG.warning("Timeout getting downloaded files", task_id=task_id) - task_output = TaskOutput.from_task(updated_task, downloaded_files) + task_screenshots = await app.WORKFLOW_SERVICE.get_recent_task_screenshot_urls( + organization_id=context.organization_id, + task_id=task_id, + ) + workflow_screenshots = await app.WORKFLOW_SERVICE.get_recent_workflow_screenshot_urls( + workflow_run_id=context.workflow_run_id, + organization_id=context.organization_id, + ) + + task_output = TaskOutput.from_task( + updated_task, + downloaded_files, + task_screenshots=task_screenshots, + workflow_screenshots=workflow_screenshots, + ) final_output = task_output.model_dump() step_for_billing: Step | None = None if step_id: