diff --git a/skyvern/forge/sdk/db/agent_db.py b/skyvern/forge/sdk/db/agent_db.py index 4e4fd684..02796482 100644 --- a/skyvern/forge/sdk/db/agent_db.py +++ b/skyvern/forge/sdk/db/agent_db.py @@ -1325,6 +1325,30 @@ class AgentDB(BaseAlchemyDB): LOG.exception("UnexpectedError") raise + async def get_artifacts_by_ids( + self, + artifact_ids: list[str], + organization_id: str, + ) -> list[Artifact]: + if not artifact_ids: + return [] + try: + async with self.Session() as session: + artifacts = ( + await session.scalars( + select(ArtifactModel) + .filter(ArtifactModel.artifact_id.in_(artifact_ids)) + .filter_by(organization_id=organization_id) + ) + ).all() + return [convert_to_artifact(artifact, self.debug_enabled) for artifact in artifacts] + except SQLAlchemyError: + LOG.exception("SQLAlchemyError") + raise + except Exception: + LOG.exception("UnexpectedError") + raise + async def get_artifacts_by_entity_id( self, *, diff --git a/skyvern/forge/sdk/schemas/tasks.py b/skyvern/forge/sdk/schemas/tasks.py index b7b98895..0f60a7fa 100644 --- a/skyvern/forge/sdk/schemas/tasks.py +++ b/skyvern/forge/sdk/schemas/tasks.py @@ -404,13 +404,15 @@ class TaskOutput(BaseModel): downloaded_file_urls: list[str] | None = None # For backward compatibility task_screenshots: list[str] | None = None workflow_screenshots: list[str] | None = None + task_screenshot_artifact_ids: list[str] | None = None + workflow_screenshot_artifact_ids: list[str] | None = None @staticmethod def from_task( task: Task, downloaded_files: list[FileInfo] | None = None, - task_screenshots: list[str] | None = None, - workflow_screenshots: list[str] | None = None, + task_screenshot_artifact_ids: list[str] | None = None, + workflow_screenshot_artifact_ids: list[str] | None = None, ) -> TaskOutput: # For backward compatibility, extract just the URLs from FileInfo objects downloaded_file_urls = [file_info.url for file_info in downloaded_files] if downloaded_files else None @@ -423,8 +425,8 @@ class TaskOutput(BaseModel): errors=task.errors, downloaded_files=downloaded_files, downloaded_file_urls=downloaded_file_urls, - task_screenshots=task_screenshots, - workflow_screenshots=workflow_screenshots, + task_screenshot_artifact_ids=task_screenshot_artifact_ids, + workflow_screenshot_artifact_ids=workflow_screenshot_artifact_ids, ) diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index 7a54bd51..db619c8c 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -944,11 +944,11 @@ class BaseTaskBlock(Block): except asyncio.TimeoutError: LOG.warning("Timeout getting downloaded files", task_id=updated_task.task_id) - task_screenshots = await app.WORKFLOW_SERVICE.get_recent_task_screenshot_urls( + task_screenshot_artifacts = await app.WORKFLOW_SERVICE.get_recent_task_screenshot_artifacts( organization_id=workflow_run.organization_id, task_id=updated_task.task_id, ) - workflow_screenshots = await app.WORKFLOW_SERVICE.get_recent_workflow_screenshot_urls( + workflow_screenshot_artifacts = await app.WORKFLOW_SERVICE.get_recent_workflow_screenshot_artifacts( workflow_run_id=workflow_run_id, organization_id=workflow_run.organization_id, ) @@ -956,8 +956,8 @@ class BaseTaskBlock(Block): task_output = TaskOutput.from_task( updated_task, downloaded_files, - task_screenshots=task_screenshots, - workflow_screenshots=workflow_screenshots, + task_screenshot_artifact_ids=[a.artifact_id for a in task_screenshot_artifacts], + workflow_screenshot_artifact_ids=[a.artifact_id for a in workflow_screenshot_artifacts], ) output_parameter_value = task_output.model_dump() await self.record_output_parameter_value(workflow_run_context, workflow_run_id, output_parameter_value) @@ -1020,11 +1020,11 @@ class BaseTaskBlock(Block): except asyncio.TimeoutError: LOG.warning("Timeout getting downloaded files", task_id=updated_task.task_id) - task_screenshots = await app.WORKFLOW_SERVICE.get_recent_task_screenshot_urls( + task_screenshot_artifacts = await app.WORKFLOW_SERVICE.get_recent_task_screenshot_artifacts( organization_id=workflow_run.organization_id, task_id=updated_task.task_id, ) - workflow_screenshots = await app.WORKFLOW_SERVICE.get_recent_workflow_screenshot_urls( + workflow_screenshot_artifacts = await app.WORKFLOW_SERVICE.get_recent_workflow_screenshot_artifacts( workflow_run_id=workflow_run_id, organization_id=workflow_run.organization_id, ) @@ -1032,8 +1032,8 @@ class BaseTaskBlock(Block): task_output = TaskOutput.from_task( updated_task, downloaded_files, - task_screenshots=task_screenshots, - workflow_screenshots=workflow_screenshots, + task_screenshot_artifact_ids=[a.artifact_id for a in task_screenshot_artifacts], + workflow_screenshot_artifact_ids=[a.artifact_id for a in workflow_screenshot_artifacts], ) LOG.warning( f"Task failed with status {updated_task.status}{retry_message}", @@ -3965,11 +3965,11 @@ class TaskV2Block(Block): # If continue_on_failure is True, we treat the block as successful even if the task failed # This allows the workflow to continue execution despite this block's failure - task_screenshots = await app.WORKFLOW_SERVICE.get_recent_task_screenshot_urls( + task_screenshot_artifacts = await app.WORKFLOW_SERVICE.get_recent_task_screenshot_artifacts( organization_id=organization_id, task_v2_id=task_v2.observer_cruise_id, ) - workflow_screenshots = await app.WORKFLOW_SERVICE.get_recent_workflow_screenshot_urls( + workflow_screenshot_artifacts = await app.WORKFLOW_SERVICE.get_recent_workflow_screenshot_artifacts( workflow_run_id=workflow_run_id, organization_id=organization_id, ) @@ -3980,8 +3980,8 @@ class TaskV2Block(Block): "summary": task_v2.summary, "extracted_information": result_dict, "failure_reason": failure_reason, - "task_screenshots": task_screenshots, - "workflow_screenshots": workflow_screenshots, + "task_screenshot_artifact_ids": [a.artifact_id for a in task_screenshot_artifacts], + "workflow_screenshot_artifact_ids": [a.artifact_id for a in workflow_screenshot_artifacts], } await self.record_output_parameter_value(workflow_run_context, workflow_run_id, task_v2_output) return await self.build_block_result( diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 90abe741..7f315f53 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -354,6 +354,78 @@ class WorkflowService: results.extend(WorkflowService._collect_extracted_information(item)) return results + async def _generate_urls_from_artifact_ids( + self, + artifact_ids: list[str], + organization_id: str | None, + ) -> list[str]: + """Generate presigned URLs from artifact IDs.""" + if not artifact_ids or not organization_id: + return [] + + artifacts = await app.DATABASE.get_artifacts_by_ids(artifact_ids, organization_id) + if not artifacts: + return [] + + return await app.ARTIFACT_MANAGER.get_share_links(artifacts) or [] + + async def _refresh_output_screenshot_urls( + self, + value: Any, + organization_id: str | None, + workflow_run_id: str, + ) -> Any: + """ + Recursively walk through output values and generate presigned URLs for screenshots. + + TaskOutput dicts stored in workflow_run_output_parameters contain artifact IDs. + This method finds any TaskOutput-like dicts and generates fresh presigned URLs + from the stored artifact IDs. + + For backwards compatibility with old data that stored URLs directly (now expired), + we also check for task_id and regenerate URLs using the task_id lookup. + """ + if isinstance(value, dict): + # Check if this looks like a TaskOutput with screenshot artifact IDs (new format) + has_artifact_ids = "task_screenshot_artifact_ids" in value or "workflow_screenshot_artifact_ids" in value + # Also check for old format (URLs stored directly) for backwards compat + has_old_format = "task_id" in value and ("task_screenshots" in value or "workflow_screenshots" in value) + + if has_artifact_ids: + # New format: generate URLs from artifact IDs + if value.get("task_screenshot_artifact_ids"): + value["task_screenshots"] = await self._generate_urls_from_artifact_ids( + value["task_screenshot_artifact_ids"], + organization_id, + ) + if value.get("workflow_screenshot_artifact_ids"): + value["workflow_screenshots"] = await self._generate_urls_from_artifact_ids( + value["workflow_screenshot_artifact_ids"], + organization_id, + ) + elif has_old_format: + # Old format (backwards compat): regenerate URLs using task_id lookup + task_id = value.get("task_id") + if value.get("task_screenshots"): + value["task_screenshots"] = await self.get_recent_task_screenshot_urls( + organization_id=organization_id, + task_id=task_id, + ) + if value.get("workflow_screenshots"): + value["workflow_screenshots"] = await self.get_recent_workflow_screenshot_urls( + workflow_run_id=workflow_run_id, + organization_id=organization_id, + ) + else: + # Recurse into nested dicts + for k, v in value.items(): + value[k] = await self._refresh_output_screenshot_urls(v, organization_id, workflow_run_id) + elif isinstance(value, list): + # Recurse into list items + for i, item in enumerate(value): + value[i] = await self._refresh_output_screenshot_urls(item, organization_id, workflow_run_id) + return value + async def _validate_credential_id(self, credential_id: str, organization: Organization) -> None: credential = await app.DATABASE.get_credential(credential_id, organization_id=organization.organization_id) if credential is None: @@ -2445,15 +2517,15 @@ class WorkflowService: async def get_tasks_by_workflow_run_id(self, workflow_run_id: str) -> list[Task]: return await app.DATABASE.get_tasks_by_workflow_run_id(workflow_run_id=workflow_run_id) - async def get_recent_task_screenshot_urls( + async def get_recent_task_screenshot_artifacts( self, *, organization_id: str | None, task_id: str | None = None, task_v2_id: str | None = None, limit: int = 3, - ) -> list[str]: - """Return the latest action/final screenshot URLs for a task (v1 or v2).""" + ) -> list[Artifact]: + """Return the latest action/final screenshot artifacts for a task (v1 or v2).""" artifact_types = [ArtifactType.SCREENSHOT_ACTION, ArtifactType.SCREENSHOT_FINAL] @@ -2487,19 +2559,35 @@ class WorkflowService: reverse=True, )[:limit] + return artifacts + + async def get_recent_task_screenshot_urls( + self, + *, + organization_id: str | None, + task_id: str | None = None, + task_v2_id: str | None = None, + limit: int = 3, + ) -> list[str]: + """Return the latest action/final screenshot URLs for a task (v1 or v2).""" + artifacts = await self.get_recent_task_screenshot_artifacts( + organization_id=organization_id, + task_id=task_id, + task_v2_id=task_v2_id, + limit=limit, + ) if not artifacts: return [] - return await app.ARTIFACT_MANAGER.get_share_links(artifacts) or [] - async def get_recent_workflow_screenshot_urls( + async def get_recent_workflow_screenshot_artifacts( self, workflow_run_id: str, organization_id: str | None = None, limit: int = 3, workflow_run_tasks: list[Task] | None = None, - ) -> list[str]: - """Return latest screenshots across recent tasks in a workflow run.""" + ) -> list[Artifact]: + """Return latest screenshot artifacts across recent tasks in a workflow run.""" screenshot_artifacts: list[Artifact] = [] seen_artifact_ids: set[str] = set() @@ -2545,10 +2633,25 @@ class WorkflowService: if len(screenshot_artifacts) >= limit: break - if not screenshot_artifacts: - return [] + return screenshot_artifacts - return await app.ARTIFACT_MANAGER.get_share_links(screenshot_artifacts) or [] + async def get_recent_workflow_screenshot_urls( + self, + workflow_run_id: str, + organization_id: str | None = None, + limit: int = 3, + workflow_run_tasks: list[Task] | None = None, + ) -> list[str]: + """Return latest screenshot URLs across recent tasks in a workflow run.""" + artifacts = await self.get_recent_workflow_screenshot_artifacts( + workflow_run_id=workflow_run_id, + organization_id=organization_id, + limit=limit, + workflow_run_tasks=workflow_run_tasks, + ) + if not artifacts: + return [] + return await app.ARTIFACT_MANAGER.get_share_links(artifacts) or [] async def build_workflow_run_status_response_by_workflow_id( self, @@ -2668,6 +2771,10 @@ class WorkflowService: if output.value is not None: extracted_information.extend(WorkflowService._collect_extracted_information(output.value)) outputs[EXTRACTED_INFORMATION_KEY] = extracted_information + # Refresh any expired presigned screenshot URLs in the outputs + outputs = await self._refresh_output_screenshot_urls( + outputs, organization_id=organization_id, workflow_run_id=workflow_run_id + ) errors: list[dict[str, Any]] = [] for task in workflow_run_tasks: diff --git a/skyvern/services/script_service.py b/skyvern/services/script_service.py index 0ce4cd4d..98e3019d 100644 --- a/skyvern/services/script_service.py +++ b/skyvern/services/script_service.py @@ -638,11 +638,11 @@ async def _update_workflow_block( except asyncio.TimeoutError: LOG.warning("Timeout getting downloaded files", task_id=task_id) - task_screenshots = await app.WORKFLOW_SERVICE.get_recent_task_screenshot_urls( + task_screenshot_artifacts = await app.WORKFLOW_SERVICE.get_recent_task_screenshot_artifacts( organization_id=context.organization_id, task_id=task_id, ) - workflow_screenshots = await app.WORKFLOW_SERVICE.get_recent_workflow_screenshot_urls( + workflow_screenshot_artifacts = await app.WORKFLOW_SERVICE.get_recent_workflow_screenshot_artifacts( workflow_run_id=context.workflow_run_id, organization_id=context.organization_id, ) @@ -650,8 +650,8 @@ async def _update_workflow_block( task_output = TaskOutput.from_task( updated_task, downloaded_files, - task_screenshots=task_screenshots, - workflow_screenshots=workflow_screenshots, + task_screenshot_artifact_ids=[a.artifact_id for a in task_screenshot_artifacts], + workflow_screenshot_artifact_ids=[a.artifact_id for a in workflow_screenshot_artifacts], ) final_output = task_output.model_dump() step_for_billing: Step | None = None