Pass screenshots from one block to another block (#4212)
This commit is contained in:
@@ -1227,6 +1227,7 @@ class AgentDB:
|
|||||||
workflow_run_block_id: str | None = None,
|
workflow_run_block_id: str | None = None,
|
||||||
thought_id: str | None = None,
|
thought_id: str | None = None,
|
||||||
task_v2_id: str | None = None,
|
task_v2_id: str | None = None,
|
||||||
|
limit: int | None = None,
|
||||||
) -> list[Artifact]:
|
) -> list[Artifact]:
|
||||||
try:
|
try:
|
||||||
async with self.Session() as session:
|
async with self.Session() as session:
|
||||||
@@ -1255,6 +1256,9 @@ class AgentDB:
|
|||||||
|
|
||||||
query = query.order_by(ArtifactModel.created_at.desc())
|
query = query.order_by(ArtifactModel.created_at.desc())
|
||||||
|
|
||||||
|
if limit is not None:
|
||||||
|
query = query.limit(limit)
|
||||||
|
|
||||||
artifacts = (await session.scalars(query)).all()
|
artifacts = (await session.scalars(query)).all()
|
||||||
LOG.debug("Artifacts fetched", count=len(artifacts))
|
LOG.debug("Artifacts fetched", count=len(artifacts))
|
||||||
return [convert_to_artifact(a, self.debug_enabled) for a in artifacts]
|
return [convert_to_artifact(a, self.debug_enabled) for a in artifacts]
|
||||||
@@ -1286,6 +1290,7 @@ class AgentDB:
|
|||||||
workflow_run_block_id=workflow_run_block_id,
|
workflow_run_block_id=workflow_run_block_id,
|
||||||
thought_id=thought_id,
|
thought_id=thought_id,
|
||||||
task_v2_id=task_v2_id,
|
task_v2_id=task_v2_id,
|
||||||
|
limit=1,
|
||||||
)
|
)
|
||||||
return artifacts[0] if artifacts else None
|
return artifacts[0] if artifacts else None
|
||||||
|
|
||||||
|
|||||||
@@ -397,9 +397,16 @@ class TaskOutput(BaseModel):
|
|||||||
errors: list[dict[str, Any]] = []
|
errors: list[dict[str, Any]] = []
|
||||||
downloaded_files: list[FileInfo] | None = None
|
downloaded_files: list[FileInfo] | None = None
|
||||||
downloaded_file_urls: list[str] | None = None # For backward compatibility
|
downloaded_file_urls: list[str] | None = None # For backward compatibility
|
||||||
|
task_screenshots: list[str] | None = None
|
||||||
|
workflow_screenshots: list[str] | None = None
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def from_task(task: Task, downloaded_files: list[FileInfo] | None = None) -> TaskOutput:
|
def from_task(
|
||||||
|
task: Task,
|
||||||
|
downloaded_files: list[FileInfo] | None = None,
|
||||||
|
task_screenshots: list[str] | None = None,
|
||||||
|
workflow_screenshots: list[str] | None = None,
|
||||||
|
) -> TaskOutput:
|
||||||
# For backward compatibility, extract just the URLs from FileInfo objects
|
# For backward compatibility, extract just the URLs from FileInfo objects
|
||||||
downloaded_file_urls = [file_info.url for file_info in downloaded_files] if downloaded_files else None
|
downloaded_file_urls = [file_info.url for file_info in downloaded_files] if downloaded_files else None
|
||||||
|
|
||||||
@@ -411,6 +418,8 @@ class TaskOutput(BaseModel):
|
|||||||
errors=task.errors,
|
errors=task.errors,
|
||||||
downloaded_files=downloaded_files,
|
downloaded_files=downloaded_files,
|
||||||
downloaded_file_urls=downloaded_file_urls,
|
downloaded_file_urls=downloaded_file_urls,
|
||||||
|
task_screenshots=task_screenshots,
|
||||||
|
workflow_screenshots=workflow_screenshots,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -810,7 +810,21 @@ class BaseTaskBlock(Block):
|
|||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
LOG.warning("Timeout getting downloaded files", task_id=updated_task.task_id)
|
LOG.warning("Timeout getting downloaded files", task_id=updated_task.task_id)
|
||||||
|
|
||||||
task_output = TaskOutput.from_task(updated_task, downloaded_files)
|
task_screenshots = await app.WORKFLOW_SERVICE.get_recent_task_screenshot_urls(
|
||||||
|
organization_id=workflow_run.organization_id,
|
||||||
|
task_id=updated_task.task_id,
|
||||||
|
)
|
||||||
|
workflow_screenshots = await app.WORKFLOW_SERVICE.get_recent_workflow_screenshot_urls(
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
organization_id=workflow_run.organization_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
task_output = TaskOutput.from_task(
|
||||||
|
updated_task,
|
||||||
|
downloaded_files,
|
||||||
|
task_screenshots=task_screenshots,
|
||||||
|
workflow_screenshots=workflow_screenshots,
|
||||||
|
)
|
||||||
output_parameter_value = task_output.model_dump()
|
output_parameter_value = task_output.model_dump()
|
||||||
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, output_parameter_value)
|
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, output_parameter_value)
|
||||||
return await self.build_block_result(
|
return await self.build_block_result(
|
||||||
@@ -871,7 +885,21 @@ class BaseTaskBlock(Block):
|
|||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
LOG.warning("Timeout getting downloaded files", task_id=updated_task.task_id)
|
LOG.warning("Timeout getting downloaded files", task_id=updated_task.task_id)
|
||||||
|
|
||||||
task_output = TaskOutput.from_task(updated_task, downloaded_files)
|
task_screenshots = await app.WORKFLOW_SERVICE.get_recent_task_screenshot_urls(
|
||||||
|
organization_id=workflow_run.organization_id,
|
||||||
|
task_id=updated_task.task_id,
|
||||||
|
)
|
||||||
|
workflow_screenshots = await app.WORKFLOW_SERVICE.get_recent_workflow_screenshot_urls(
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
organization_id=workflow_run.organization_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
task_output = TaskOutput.from_task(
|
||||||
|
updated_task,
|
||||||
|
downloaded_files,
|
||||||
|
task_screenshots=task_screenshots,
|
||||||
|
workflow_screenshots=workflow_screenshots,
|
||||||
|
)
|
||||||
LOG.warning(
|
LOG.warning(
|
||||||
f"Task failed with status {updated_task.status}{retry_message}",
|
f"Task failed with status {updated_task.status}{retry_message}",
|
||||||
task_id=updated_task.task_id,
|
task_id=updated_task.task_id,
|
||||||
@@ -3668,12 +3696,23 @@ class TaskV2Block(Block):
|
|||||||
|
|
||||||
# If continue_on_failure is True, we treat the block as successful even if the task failed
|
# If continue_on_failure is True, we treat the block as successful even if the task failed
|
||||||
# This allows the workflow to continue execution despite this block's failure
|
# This allows the workflow to continue execution despite this block's failure
|
||||||
|
task_screenshots = await app.WORKFLOW_SERVICE.get_recent_task_screenshot_urls(
|
||||||
|
organization_id=organization_id,
|
||||||
|
task_v2_id=task_v2.observer_cruise_id,
|
||||||
|
)
|
||||||
|
workflow_screenshots = await app.WORKFLOW_SERVICE.get_recent_workflow_screenshot_urls(
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
organization_id=organization_id,
|
||||||
|
)
|
||||||
|
|
||||||
task_v2_output = {
|
task_v2_output = {
|
||||||
"task_id": task_v2.observer_cruise_id,
|
"task_id": task_v2.observer_cruise_id,
|
||||||
"status": task_v2.status,
|
"status": task_v2.status,
|
||||||
"summary": task_v2.summary,
|
"summary": task_v2.summary,
|
||||||
"extracted_information": result_dict,
|
"extracted_information": result_dict,
|
||||||
"failure_reason": failure_reason,
|
"failure_reason": failure_reason,
|
||||||
|
"task_screenshots": task_screenshots,
|
||||||
|
"workflow_screenshots": workflow_screenshots,
|
||||||
}
|
}
|
||||||
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, task_v2_output)
|
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, task_v2_output)
|
||||||
return await self.build_block_result(
|
return await self.build_block_result(
|
||||||
|
|||||||
@@ -37,7 +37,7 @@ from skyvern.exceptions import (
|
|||||||
)
|
)
|
||||||
from skyvern.forge import app
|
from skyvern.forge import app
|
||||||
from skyvern.forge.prompts import prompt_engine
|
from skyvern.forge.prompts import prompt_engine
|
||||||
from skyvern.forge.sdk.artifact.models import ArtifactType
|
from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType
|
||||||
from skyvern.forge.sdk.core import skyvern_context
|
from skyvern.forge.sdk.core import skyvern_context
|
||||||
from skyvern.forge.sdk.core.security import generate_skyvern_webhook_signature
|
from skyvern.forge.sdk.core.security import generate_skyvern_webhook_signature
|
||||||
from skyvern.forge.sdk.core.skyvern_context import SkyvernContext
|
from skyvern.forge.sdk.core.skyvern_context import SkyvernContext
|
||||||
@@ -2162,6 +2162,111 @@ class WorkflowService:
|
|||||||
async def get_tasks_by_workflow_run_id(self, workflow_run_id: str) -> list[Task]:
|
async def get_tasks_by_workflow_run_id(self, workflow_run_id: str) -> list[Task]:
|
||||||
return await app.DATABASE.get_tasks_by_workflow_run_id(workflow_run_id=workflow_run_id)
|
return await app.DATABASE.get_tasks_by_workflow_run_id(workflow_run_id=workflow_run_id)
|
||||||
|
|
||||||
|
async def get_recent_task_screenshot_urls(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
organization_id: str | None,
|
||||||
|
task_id: str | None = None,
|
||||||
|
task_v2_id: str | None = None,
|
||||||
|
limit: int = 3,
|
||||||
|
) -> list[str]:
|
||||||
|
"""Return the latest action/final screenshot URLs for a task (v1 or v2)."""
|
||||||
|
|
||||||
|
artifact_types = [ArtifactType.SCREENSHOT_ACTION, ArtifactType.SCREENSHOT_FINAL]
|
||||||
|
|
||||||
|
artifacts: list[Artifact] = []
|
||||||
|
if task_id:
|
||||||
|
artifacts = (
|
||||||
|
await app.DATABASE.get_latest_n_artifacts(
|
||||||
|
task_id=task_id,
|
||||||
|
artifact_types=artifact_types,
|
||||||
|
organization_id=organization_id,
|
||||||
|
n=limit,
|
||||||
|
)
|
||||||
|
or []
|
||||||
|
)
|
||||||
|
elif task_v2_id:
|
||||||
|
action_artifacts = await app.DATABASE.get_artifacts_by_entity_id(
|
||||||
|
organization_id=organization_id,
|
||||||
|
artifact_type=ArtifactType.SCREENSHOT_ACTION,
|
||||||
|
task_v2_id=task_v2_id,
|
||||||
|
limit=limit,
|
||||||
|
)
|
||||||
|
final_artifacts = await app.DATABASE.get_artifacts_by_entity_id(
|
||||||
|
organization_id=organization_id,
|
||||||
|
artifact_type=ArtifactType.SCREENSHOT_FINAL,
|
||||||
|
task_v2_id=task_v2_id,
|
||||||
|
limit=limit,
|
||||||
|
)
|
||||||
|
artifacts = sorted(
|
||||||
|
(action_artifacts or []) + (final_artifacts or []),
|
||||||
|
key=lambda artifact: artifact.created_at,
|
||||||
|
reverse=True,
|
||||||
|
)[:limit]
|
||||||
|
|
||||||
|
if not artifacts:
|
||||||
|
return []
|
||||||
|
|
||||||
|
return await app.ARTIFACT_MANAGER.get_share_links(artifacts) or []
|
||||||
|
|
||||||
|
async def get_recent_workflow_screenshot_urls(
|
||||||
|
self,
|
||||||
|
workflow_run_id: str,
|
||||||
|
organization_id: str | None = None,
|
||||||
|
limit: int = 3,
|
||||||
|
workflow_run_tasks: list[Task] | None = None,
|
||||||
|
) -> list[str]:
|
||||||
|
"""Return latest screenshots across recent tasks in a workflow run."""
|
||||||
|
|
||||||
|
screenshot_artifacts: list[Artifact] = []
|
||||||
|
seen_artifact_ids: set[str] = set()
|
||||||
|
|
||||||
|
if workflow_run_tasks is None:
|
||||||
|
workflow_run_tasks = await app.DATABASE.get_tasks_by_workflow_run_id(workflow_run_id=workflow_run_id)
|
||||||
|
|
||||||
|
for task in workflow_run_tasks[::-1]:
|
||||||
|
artifact = await app.DATABASE.get_latest_artifact(
|
||||||
|
task_id=task.task_id,
|
||||||
|
artifact_types=[ArtifactType.SCREENSHOT_ACTION, ArtifactType.SCREENSHOT_FINAL],
|
||||||
|
organization_id=organization_id,
|
||||||
|
)
|
||||||
|
if artifact:
|
||||||
|
screenshot_artifacts.append(artifact)
|
||||||
|
seen_artifact_ids.add(artifact.artifact_id)
|
||||||
|
if len(screenshot_artifacts) >= limit:
|
||||||
|
break
|
||||||
|
|
||||||
|
if len(screenshot_artifacts) < limit:
|
||||||
|
action_artifacts = await app.DATABASE.get_artifacts_by_entity_id(
|
||||||
|
organization_id=organization_id,
|
||||||
|
artifact_type=ArtifactType.SCREENSHOT_ACTION,
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
limit=limit,
|
||||||
|
)
|
||||||
|
final_artifacts = await app.DATABASE.get_artifacts_by_entity_id(
|
||||||
|
organization_id=organization_id,
|
||||||
|
artifact_type=ArtifactType.SCREENSHOT_FINAL,
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
limit=limit,
|
||||||
|
)
|
||||||
|
# Support runs that may not have Task rows (e.g., task_v2-only executions)
|
||||||
|
for artifact in sorted(
|
||||||
|
(action_artifacts or []) + (final_artifacts or []),
|
||||||
|
key=lambda artifact: artifact.created_at,
|
||||||
|
reverse=True,
|
||||||
|
):
|
||||||
|
if artifact.artifact_id in seen_artifact_ids:
|
||||||
|
continue
|
||||||
|
screenshot_artifacts.append(artifact)
|
||||||
|
seen_artifact_ids.add(artifact.artifact_id)
|
||||||
|
if len(screenshot_artifacts) >= limit:
|
||||||
|
break
|
||||||
|
|
||||||
|
if not screenshot_artifacts:
|
||||||
|
return []
|
||||||
|
|
||||||
|
return await app.ARTIFACT_MANAGER.get_share_links(screenshot_artifacts) or []
|
||||||
|
|
||||||
async def build_workflow_run_status_response_by_workflow_id(
|
async def build_workflow_run_status_response_by_workflow_id(
|
||||||
self,
|
self,
|
||||||
workflow_run_id: str,
|
workflow_run_id: str,
|
||||||
@@ -2199,24 +2304,12 @@ class WorkflowService:
|
|||||||
organization_id=organization_id,
|
organization_id=organization_id,
|
||||||
)
|
)
|
||||||
workflow_run_tasks = await app.DATABASE.get_tasks_by_workflow_run_id(workflow_run_id=workflow_run_id)
|
workflow_run_tasks = await app.DATABASE.get_tasks_by_workflow_run_id(workflow_run_id=workflow_run_id)
|
||||||
screenshot_artifacts = []
|
screenshot_urls: list[str] | None = await self.get_recent_workflow_screenshot_urls(
|
||||||
screenshot_urls: list[str] | None = None
|
workflow_run_id=workflow_run_id,
|
||||||
# get the last screenshot for the last 3 tasks of the workflow run
|
organization_id=organization_id,
|
||||||
for task in workflow_run_tasks[::-1]:
|
workflow_run_tasks=workflow_run_tasks,
|
||||||
screenshot_artifact = await app.DATABASE.get_latest_artifact(
|
)
|
||||||
task_id=task.task_id,
|
screenshot_urls = screenshot_urls or None
|
||||||
artifact_types=[
|
|
||||||
ArtifactType.SCREENSHOT_ACTION,
|
|
||||||
ArtifactType.SCREENSHOT_FINAL,
|
|
||||||
],
|
|
||||||
organization_id=organization_id,
|
|
||||||
)
|
|
||||||
if screenshot_artifact:
|
|
||||||
screenshot_artifacts.append(screenshot_artifact)
|
|
||||||
if len(screenshot_artifacts) >= 3:
|
|
||||||
break
|
|
||||||
if screenshot_artifacts:
|
|
||||||
screenshot_urls = await app.ARTIFACT_MANAGER.get_share_links(screenshot_artifacts)
|
|
||||||
|
|
||||||
recording_url = None
|
recording_url = None
|
||||||
# Get recording url from browser session first,
|
# Get recording url from browser session first,
|
||||||
|
|||||||
@@ -612,7 +612,21 @@ async def _update_workflow_block(
|
|||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
LOG.warning("Timeout getting downloaded files", task_id=task_id)
|
LOG.warning("Timeout getting downloaded files", task_id=task_id)
|
||||||
|
|
||||||
task_output = TaskOutput.from_task(updated_task, downloaded_files)
|
task_screenshots = await app.WORKFLOW_SERVICE.get_recent_task_screenshot_urls(
|
||||||
|
organization_id=context.organization_id,
|
||||||
|
task_id=task_id,
|
||||||
|
)
|
||||||
|
workflow_screenshots = await app.WORKFLOW_SERVICE.get_recent_workflow_screenshot_urls(
|
||||||
|
workflow_run_id=context.workflow_run_id,
|
||||||
|
organization_id=context.organization_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
task_output = TaskOutput.from_task(
|
||||||
|
updated_task,
|
||||||
|
downloaded_files,
|
||||||
|
task_screenshots=task_screenshots,
|
||||||
|
workflow_screenshots=workflow_screenshots,
|
||||||
|
)
|
||||||
final_output = task_output.model_dump()
|
final_output = task_output.model_dump()
|
||||||
step_for_billing: Step | None = None
|
step_for_billing: Step | None = None
|
||||||
if step_id:
|
if step_id:
|
||||||
|
|||||||
Reference in New Issue
Block a user