Store screenshot artifacts, generate URLs when needed (#4506)

This commit is contained in:
Marc Kelechava
2026-01-20 22:49:33 -08:00
committed by GitHub
parent 5d7814a925
commit d5e3894198
5 changed files with 163 additions and 30 deletions

View File

@@ -944,11 +944,11 @@ class BaseTaskBlock(Block):
except asyncio.TimeoutError:
LOG.warning("Timeout getting downloaded files", task_id=updated_task.task_id)
task_screenshots = await app.WORKFLOW_SERVICE.get_recent_task_screenshot_urls(
task_screenshot_artifacts = await app.WORKFLOW_SERVICE.get_recent_task_screenshot_artifacts(
organization_id=workflow_run.organization_id,
task_id=updated_task.task_id,
)
workflow_screenshots = await app.WORKFLOW_SERVICE.get_recent_workflow_screenshot_urls(
workflow_screenshot_artifacts = await app.WORKFLOW_SERVICE.get_recent_workflow_screenshot_artifacts(
workflow_run_id=workflow_run_id,
organization_id=workflow_run.organization_id,
)
@@ -956,8 +956,8 @@ class BaseTaskBlock(Block):
task_output = TaskOutput.from_task(
updated_task,
downloaded_files,
task_screenshots=task_screenshots,
workflow_screenshots=workflow_screenshots,
task_screenshot_artifact_ids=[a.artifact_id for a in task_screenshot_artifacts],
workflow_screenshot_artifact_ids=[a.artifact_id for a in workflow_screenshot_artifacts],
)
output_parameter_value = task_output.model_dump()
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, output_parameter_value)
@@ -1020,11 +1020,11 @@ class BaseTaskBlock(Block):
except asyncio.TimeoutError:
LOG.warning("Timeout getting downloaded files", task_id=updated_task.task_id)
task_screenshots = await app.WORKFLOW_SERVICE.get_recent_task_screenshot_urls(
task_screenshot_artifacts = await app.WORKFLOW_SERVICE.get_recent_task_screenshot_artifacts(
organization_id=workflow_run.organization_id,
task_id=updated_task.task_id,
)
workflow_screenshots = await app.WORKFLOW_SERVICE.get_recent_workflow_screenshot_urls(
workflow_screenshot_artifacts = await app.WORKFLOW_SERVICE.get_recent_workflow_screenshot_artifacts(
workflow_run_id=workflow_run_id,
organization_id=workflow_run.organization_id,
)
@@ -1032,8 +1032,8 @@ class BaseTaskBlock(Block):
task_output = TaskOutput.from_task(
updated_task,
downloaded_files,
task_screenshots=task_screenshots,
workflow_screenshots=workflow_screenshots,
task_screenshot_artifact_ids=[a.artifact_id for a in task_screenshot_artifacts],
workflow_screenshot_artifact_ids=[a.artifact_id for a in workflow_screenshot_artifacts],
)
LOG.warning(
f"Task failed with status {updated_task.status}{retry_message}",
@@ -3965,11 +3965,11 @@ class TaskV2Block(Block):
# If continue_on_failure is True, we treat the block as successful even if the task failed
# This allows the workflow to continue execution despite this block's failure
task_screenshots = await app.WORKFLOW_SERVICE.get_recent_task_screenshot_urls(
task_screenshot_artifacts = await app.WORKFLOW_SERVICE.get_recent_task_screenshot_artifacts(
organization_id=organization_id,
task_v2_id=task_v2.observer_cruise_id,
)
workflow_screenshots = await app.WORKFLOW_SERVICE.get_recent_workflow_screenshot_urls(
workflow_screenshot_artifacts = await app.WORKFLOW_SERVICE.get_recent_workflow_screenshot_artifacts(
workflow_run_id=workflow_run_id,
organization_id=organization_id,
)
@@ -3980,8 +3980,8 @@ class TaskV2Block(Block):
"summary": task_v2.summary,
"extracted_information": result_dict,
"failure_reason": failure_reason,
"task_screenshots": task_screenshots,
"workflow_screenshots": workflow_screenshots,
"task_screenshot_artifact_ids": [a.artifact_id for a in task_screenshot_artifacts],
"workflow_screenshot_artifact_ids": [a.artifact_id for a in workflow_screenshot_artifacts],
}
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, task_v2_output)
return await self.build_block_result(

View File

@@ -354,6 +354,78 @@ class WorkflowService:
results.extend(WorkflowService._collect_extracted_information(item))
return results
async def _generate_urls_from_artifact_ids(
self,
artifact_ids: list[str],
organization_id: str | None,
) -> list[str]:
"""Generate presigned URLs from artifact IDs."""
if not artifact_ids or not organization_id:
return []
artifacts = await app.DATABASE.get_artifacts_by_ids(artifact_ids, organization_id)
if not artifacts:
return []
return await app.ARTIFACT_MANAGER.get_share_links(artifacts) or []
async def _refresh_output_screenshot_urls(
self,
value: Any,
organization_id: str | None,
workflow_run_id: str,
) -> Any:
"""
Recursively walk through output values and generate presigned URLs for screenshots.
TaskOutput dicts stored in workflow_run_output_parameters contain artifact IDs.
This method finds any TaskOutput-like dicts and generates fresh presigned URLs
from the stored artifact IDs.
For backwards compatibility with old data that stored URLs directly (now expired),
we also check for task_id and regenerate URLs using the task_id lookup.
"""
if isinstance(value, dict):
# Check if this looks like a TaskOutput with screenshot artifact IDs (new format)
has_artifact_ids = "task_screenshot_artifact_ids" in value or "workflow_screenshot_artifact_ids" in value
# Also check for old format (URLs stored directly) for backwards compat
has_old_format = "task_id" in value and ("task_screenshots" in value or "workflow_screenshots" in value)
if has_artifact_ids:
# New format: generate URLs from artifact IDs
if value.get("task_screenshot_artifact_ids"):
value["task_screenshots"] = await self._generate_urls_from_artifact_ids(
value["task_screenshot_artifact_ids"],
organization_id,
)
if value.get("workflow_screenshot_artifact_ids"):
value["workflow_screenshots"] = await self._generate_urls_from_artifact_ids(
value["workflow_screenshot_artifact_ids"],
organization_id,
)
elif has_old_format:
# Old format (backwards compat): regenerate URLs using task_id lookup
task_id = value.get("task_id")
if value.get("task_screenshots"):
value["task_screenshots"] = await self.get_recent_task_screenshot_urls(
organization_id=organization_id,
task_id=task_id,
)
if value.get("workflow_screenshots"):
value["workflow_screenshots"] = await self.get_recent_workflow_screenshot_urls(
workflow_run_id=workflow_run_id,
organization_id=organization_id,
)
else:
# Recurse into nested dicts
for k, v in value.items():
value[k] = await self._refresh_output_screenshot_urls(v, organization_id, workflow_run_id)
elif isinstance(value, list):
# Recurse into list items
for i, item in enumerate(value):
value[i] = await self._refresh_output_screenshot_urls(item, organization_id, workflow_run_id)
return value
async def _validate_credential_id(self, credential_id: str, organization: Organization) -> None:
credential = await app.DATABASE.get_credential(credential_id, organization_id=organization.organization_id)
if credential is None:
@@ -2445,15 +2517,15 @@ class WorkflowService:
async def get_tasks_by_workflow_run_id(self, workflow_run_id: str) -> list[Task]:
return await app.DATABASE.get_tasks_by_workflow_run_id(workflow_run_id=workflow_run_id)
async def get_recent_task_screenshot_urls(
async def get_recent_task_screenshot_artifacts(
self,
*,
organization_id: str | None,
task_id: str | None = None,
task_v2_id: str | None = None,
limit: int = 3,
) -> list[str]:
"""Return the latest action/final screenshot URLs for a task (v1 or v2)."""
) -> list[Artifact]:
"""Return the latest action/final screenshot artifacts for a task (v1 or v2)."""
artifact_types = [ArtifactType.SCREENSHOT_ACTION, ArtifactType.SCREENSHOT_FINAL]
@@ -2487,19 +2559,35 @@ class WorkflowService:
reverse=True,
)[:limit]
return artifacts
async def get_recent_task_screenshot_urls(
self,
*,
organization_id: str | None,
task_id: str | None = None,
task_v2_id: str | None = None,
limit: int = 3,
) -> list[str]:
"""Return the latest action/final screenshot URLs for a task (v1 or v2)."""
artifacts = await self.get_recent_task_screenshot_artifacts(
organization_id=organization_id,
task_id=task_id,
task_v2_id=task_v2_id,
limit=limit,
)
if not artifacts:
return []
return await app.ARTIFACT_MANAGER.get_share_links(artifacts) or []
async def get_recent_workflow_screenshot_urls(
async def get_recent_workflow_screenshot_artifacts(
self,
workflow_run_id: str,
organization_id: str | None = None,
limit: int = 3,
workflow_run_tasks: list[Task] | None = None,
) -> list[str]:
"""Return latest screenshots across recent tasks in a workflow run."""
) -> list[Artifact]:
"""Return latest screenshot artifacts across recent tasks in a workflow run."""
screenshot_artifacts: list[Artifact] = []
seen_artifact_ids: set[str] = set()
@@ -2545,10 +2633,25 @@ class WorkflowService:
if len(screenshot_artifacts) >= limit:
break
if not screenshot_artifacts:
return []
return screenshot_artifacts
return await app.ARTIFACT_MANAGER.get_share_links(screenshot_artifacts) or []
async def get_recent_workflow_screenshot_urls(
self,
workflow_run_id: str,
organization_id: str | None = None,
limit: int = 3,
workflow_run_tasks: list[Task] | None = None,
) -> list[str]:
"""Return latest screenshot URLs across recent tasks in a workflow run."""
artifacts = await self.get_recent_workflow_screenshot_artifacts(
workflow_run_id=workflow_run_id,
organization_id=organization_id,
limit=limit,
workflow_run_tasks=workflow_run_tasks,
)
if not artifacts:
return []
return await app.ARTIFACT_MANAGER.get_share_links(artifacts) or []
async def build_workflow_run_status_response_by_workflow_id(
self,
@@ -2668,6 +2771,10 @@ class WorkflowService:
if output.value is not None:
extracted_information.extend(WorkflowService._collect_extracted_information(output.value))
outputs[EXTRACTED_INFORMATION_KEY] = extracted_information
# Refresh any expired presigned screenshot URLs in the outputs
outputs = await self._refresh_output_screenshot_urls(
outputs, organization_id=organization_id, workflow_run_id=workflow_run_id
)
errors: list[dict[str, Any]] = []
for task in workflow_run_tasks: