use common logic when building task response (#1311)

This commit is contained in:
LawyZheng
2024-12-03 12:31:55 +08:00
committed by GitHub
parent 51acf4204c
commit a206f51cb1
2 changed files with 110 additions and 134 deletions

View File

@@ -51,7 +51,7 @@ from skyvern.forge.sdk.core.security import generate_skyvern_signature
from skyvern.forge.sdk.core.validators import validate_url
from skyvern.forge.sdk.db.enums import TaskType
from skyvern.forge.sdk.models import Organization, Step, StepStatus
from skyvern.forge.sdk.schemas.tasks import Task, TaskRequest, TaskStatus
from skyvern.forge.sdk.schemas.tasks import Task, TaskRequest, TaskResponse, TaskStatus
from skyvern.forge.sdk.workflow.context_manager import WorkflowRunContext
from skyvern.forge.sdk.workflow.models.block import ActionBlock, BaseTaskBlock, ValidationBlock
from skyvern.forge.sdk.workflow.models.workflow import Workflow, WorkflowRun, WorkflowRunStatus
@@ -1547,81 +1547,7 @@ class ForgeAgent:
)
return
screenshot_url = None
recording_url = None
latest_action_screenshot_urls: list[str] | None = None
downloaded_file_urls: list[str] | None = None
# get the artifact of the screenshot and get the screenshot_url
screenshot_artifact = await app.DATABASE.get_artifact(
task_id=task.task_id,
step_id=last_step.step_id,
artifact_type=ArtifactType.SCREENSHOT_FINAL,
organization_id=task.organization_id,
)
if screenshot_artifact:
screenshot_url = await app.ARTIFACT_MANAGER.get_share_link(screenshot_artifact)
recording_artifact = await app.DATABASE.get_artifact(
task_id=task.task_id,
step_id=last_step.step_id,
artifact_type=ArtifactType.RECORDING,
organization_id=task.organization_id,
)
if recording_artifact:
recording_url = await app.ARTIFACT_MANAGER.get_share_link(recording_artifact)
# get the artifact of the last TASK_RESPONSE_ACTION_SCREENSHOT_COUNT screenshots and get the screenshot_url
latest_action_screenshot_artifacts = await app.DATABASE.get_latest_n_artifacts(
task_id=task.task_id,
organization_id=task.organization_id,
artifact_types=[ArtifactType.SCREENSHOT_ACTION],
n=settings.TASK_RESPONSE_ACTION_SCREENSHOT_COUNT,
)
if latest_action_screenshot_artifacts:
latest_action_screenshot_urls = await app.ARTIFACT_MANAGER.get_share_links(
latest_action_screenshot_artifacts
)
else:
LOG.error("Failed to get latest action screenshots")
if task.organization_id:
try:
async with asyncio.timeout(GET_DOWNLOADED_FILES_TIMEOUT):
downloaded_file_urls = await app.STORAGE.get_downloaded_files(
organization_id=task.organization_id, task_id=task.task_id, workflow_run_id=task.workflow_run_id
)
except asyncio.TimeoutError:
LOG.warning(
"Timeout to get downloaded files",
task_id=task.task_id,
workflow_run_id=task.workflow_run_id,
)
except Exception:
LOG.warning(
"Failed to get downloaded files",
exc_info=True,
task_id=task.task_id,
workflow_run_id=task.workflow_run_id,
)
# get the latest task from the db to get the latest status, extracted_information, and failure_reason
task_from_db = await app.DATABASE.get_task(task_id=task.task_id, organization_id=task.organization_id)
if not task_from_db:
LOG.error("Failed to get task from db when sending task response")
raise TaskNotFound(task_id=task.task_id)
task = task_from_db
task_response = task.to_task_response(
action_screenshot_urls=latest_action_screenshot_urls,
screenshot_url=screenshot_url,
recording_url=recording_url,
downloaded_file_urls=downloaded_file_urls,
)
if not task.webhook_callback_url:
LOG.info("Task has no webhook callback url. Not sending task response")
return
task_response = await self.build_task_response(task=task, last_step=last_step)
# send task_response to the webhook callback url
timestamp = str(int(datetime.utcnow().timestamp()))
@@ -1664,6 +1590,107 @@ class ForgeAgent:
except Exception as e:
raise FailedToSendWebhook(task_id=task.task_id) from e
async def build_task_response(
self,
task: Task,
last_step: Step | None = None,
failure_reason: str | None = None,
need_browser_log: bool = False,
) -> TaskResponse:
# no last step means the task didn't start, so we don't have any other artifacts
if last_step is None:
return task.to_task_response(
failure_reason=failure_reason,
)
screenshot_url = None
recording_url = None
browser_console_log_url: str | None = None
latest_action_screenshot_urls: list[str] | None = None
downloaded_file_urls: list[str] | None = None
# get the artifact of the screenshot and get the screenshot_url
screenshot_artifact = await app.DATABASE.get_artifact(
task_id=task.task_id,
step_id=last_step.step_id,
artifact_type=ArtifactType.SCREENSHOT_FINAL,
organization_id=task.organization_id,
)
if screenshot_artifact:
screenshot_url = await app.ARTIFACT_MANAGER.get_share_link(screenshot_artifact)
recording_artifact = await app.DATABASE.get_artifact(
task_id=task.task_id,
step_id=last_step.step_id,
artifact_type=ArtifactType.RECORDING,
organization_id=task.organization_id,
)
if recording_artifact:
recording_url = await app.ARTIFACT_MANAGER.get_share_link(recording_artifact)
# get the artifact of the last TASK_RESPONSE_ACTION_SCREENSHOT_COUNT screenshots and get the screenshot_url
latest_action_screenshot_artifacts = await app.DATABASE.get_latest_n_artifacts(
task_id=task.task_id,
organization_id=task.organization_id,
artifact_types=[ArtifactType.SCREENSHOT_ACTION],
n=settings.TASK_RESPONSE_ACTION_SCREENSHOT_COUNT,
)
if latest_action_screenshot_artifacts:
latest_action_screenshot_urls = await app.ARTIFACT_MANAGER.get_share_links(
latest_action_screenshot_artifacts
)
else:
LOG.error(
"Failed to get latest action screenshots",
task_id=task.task_id,
task_status=task.status,
)
if task.organization_id:
try:
async with asyncio.timeout(GET_DOWNLOADED_FILES_TIMEOUT):
downloaded_file_urls = await app.STORAGE.get_downloaded_files(
organization_id=task.organization_id, task_id=task.task_id, workflow_run_id=task.workflow_run_id
)
except asyncio.TimeoutError:
LOG.warning(
"Timeout to get downloaded files",
task_id=task.task_id,
workflow_run_id=task.workflow_run_id,
)
except Exception:
LOG.warning(
"Failed to get downloaded files",
exc_info=True,
task_id=task.task_id,
workflow_run_id=task.workflow_run_id,
)
if need_browser_log:
browser_console_log = await app.DATABASE.get_latest_artifact(
task_id=task.task_id,
artifact_types=[ArtifactType.BROWSER_CONSOLE_LOG],
organization_id=task.organization_id,
)
if browser_console_log:
browser_console_log_url = await app.ARTIFACT_MANAGER.get_share_link(browser_console_log)
# get the latest task from the db to get the latest status, extracted_information, and failure_reason
task_from_db = await app.DATABASE.get_task(task_id=task.task_id, organization_id=task.organization_id)
if not task_from_db:
LOG.error("Failed to get task from db when sending task response")
raise TaskNotFound(task_id=task.task_id)
task = task_from_db
return task.to_task_response(
action_screenshot_urls=latest_action_screenshot_urls,
screenshot_url=screenshot_url,
recording_url=recording_url,
downloaded_file_urls=downloaded_file_urls,
browser_console_log_url=browser_console_log_url,
failure_reason=failure_reason,
)
async def cleanup_browser_and_create_artifacts(
self, close_browser_on_completion: bool, last_step: Step, task: Task
) -> None:

View File

@@ -28,7 +28,7 @@ from skyvern.forge import app
from skyvern.forge.prompts import prompt_engine
from skyvern.forge.sdk.api.aws import aws_client
from skyvern.forge.sdk.api.llm.exceptions import LLMProviderError
from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType
from skyvern.forge.sdk.artifact.models import Artifact
from skyvern.forge.sdk.core import skyvern_context
from skyvern.forge.sdk.core.permissions.permission_checker_factory import PermissionCheckerFactory
from skyvern.forge.sdk.core.security import generate_skyvern_signature
@@ -255,53 +255,7 @@ async def get_task(
# get latest step
latest_step = await app.DATABASE.get_latest_step(task_id, organization_id=current_org.organization_id)
if not latest_step:
return task_obj.to_task_response()
screenshot_url = None
# todo (kerem): only access artifacts through the artifact manager instead of db
screenshot_artifact = await app.DATABASE.get_latest_artifact(
task_id=task_obj.task_id,
step_id=latest_step.step_id,
artifact_types=[ArtifactType.SCREENSHOT_ACTION, ArtifactType.SCREENSHOT_FINAL],
organization_id=current_org.organization_id,
)
if screenshot_artifact:
screenshot_url = await app.ARTIFACT_MANAGER.get_share_link(screenshot_artifact)
recording_artifact = await app.DATABASE.get_latest_artifact(
task_id=task_obj.task_id,
artifact_types=[ArtifactType.RECORDING],
organization_id=current_org.organization_id,
)
recording_url = None
if recording_artifact:
recording_url = await app.ARTIFACT_MANAGER.get_share_link(recording_artifact)
browser_console_log = await app.DATABASE.get_latest_artifact(
task_id=task_obj.task_id,
artifact_types=[ArtifactType.BROWSER_CONSOLE_LOG],
organization_id=current_org.organization_id,
)
browser_console_log_url = None
if browser_console_log:
browser_console_log_url = await app.ARTIFACT_MANAGER.get_share_link(browser_console_log)
# get the artifact of the last screenshot and get the screenshot_url
latest_action_screenshot_artifacts = await app.DATABASE.get_latest_n_artifacts(
task_id=task_obj.task_id,
organization_id=task_obj.organization_id,
artifact_types=[ArtifactType.SCREENSHOT_ACTION],
n=settings.TASK_RESPONSE_ACTION_SCREENSHOT_COUNT,
)
latest_action_screenshot_urls: list[str] | None = None
if latest_action_screenshot_artifacts:
latest_action_screenshot_urls = await app.ARTIFACT_MANAGER.get_share_links(latest_action_screenshot_artifacts)
elif task_obj.status in [TaskStatus.terminated, TaskStatus.completed]:
LOG.error(
"Failed to get latest action screenshots in task response",
task_id=task_id,
task_status=task_obj.status,
)
return await app.agent.build_task_response(task=task_obj)
failure_reason: str | None = None
if task_obj.status == TaskStatus.failed and (latest_step.output or task_obj.failure_reason):
@@ -319,13 +273,8 @@ async def get_task(
if len(action_results_string) > 0:
failure_reason += "(Exceptions: " + str(action_results_string) + ")"
return task_obj.to_task_response(
action_screenshot_urls=latest_action_screenshot_urls,
screenshot_url=screenshot_url,
recording_url=recording_url,
browser_console_log_url=browser_console_log_url,
failure_reason=failure_reason,
return await app.agent.build_task_response(
task=task_obj, last_step=latest_step, failure_reason=failure_reason, need_browser_log=True
)
@@ -381,12 +330,12 @@ async def retry_webhook(
# get latest step
latest_step = await app.DATABASE.get_latest_step(task_id, organization_id=current_org.organization_id)
if not latest_step:
return task_obj.to_task_response()
return await app.agent.build_task_response(task=task_obj)
# retry the webhook
await app.agent.execute_task_webhook(task=task_obj, last_step=latest_step, api_key=x_api_key)
return task_obj.to_task_response()
return await app.agent.build_task_response(task=task_obj, last_step=latest_step)
@base_router.get("/internal/tasks/{task_id}", response_model=list[Task])
@@ -454,7 +403,7 @@ async def get_agent_tasks(
order_by_column=sort,
application=application,
)
return ORJSONResponse([task.to_task_response().model_dump() for task in tasks])
return ORJSONResponse([(await app.agent.build_task_response(task=task)).model_dump() for task in tasks])
@base_router.get("/internal/tasks", tags=["agent"], response_model=list[Task])