From a1ec9cc633b9ea73a5c095e1a1ddb244a254a605 Mon Sep 17 00:00:00 2001 From: LawyZheng Date: Tue, 16 Dec 2025 23:00:51 +0800 Subject: [PATCH] Batch artifact insert processing (#4306) --- skyvern/forge/sdk/artifact/manager.py | 411 ++++++++++++++++++++++++-- skyvern/forge/sdk/db/agent_db.py | 33 +++ 2 files changed, 416 insertions(+), 28 deletions(-) diff --git a/skyvern/forge/sdk/artifact/manager.py b/skyvern/forge/sdk/artifact/manager.py index acd6f46a..36548ba9 100644 --- a/skyvern/forge/sdk/artifact/manager.py +++ b/skyvern/forge/sdk/artifact/manager.py @@ -1,6 +1,7 @@ import asyncio import time from collections import defaultdict +from dataclasses import dataclass import structlog @@ -8,6 +9,7 @@ from skyvern.forge import app from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType, LogEntityType from skyvern.forge.sdk.core import skyvern_context from skyvern.forge.sdk.db.id import generate_artifact_id +from skyvern.forge.sdk.db.models import ArtifactModel from skyvern.forge.sdk.models import Step from skyvern.forge.sdk.schemas.ai_suggestions import AISuggestion from skyvern.forge.sdk.schemas.task_v2 import TaskV2, Thought @@ -16,10 +18,95 @@ from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock LOG = structlog.get_logger(__name__) +@dataclass +class ArtifactBatchData: + """ + Data class for batch artifact creation. + + Attributes: + artifact_model: The ArtifactModel instance to insert + data: Optional bytes data to upload + path: Optional file path to upload from + """ + + artifact_model: ArtifactModel + data: bytes | None = None + path: str | None = None + + def __post_init__(self) -> None: + """Validate that exactly one of data or path is provided.""" + if self.data is not None and self.path is not None: + raise ValueError("Cannot specify both data and path for artifact upload") + + +@dataclass +class BulkArtifactCreationRequest: + """ + Request data for bulk artifact creation. + + Attributes: + artifacts: List of artifact batch data to create + primary_key: Primary key for tracking upload tasks (e.g., task_id, cruise_id) + """ + + artifacts: list[ArtifactBatchData] + primary_key: str + + class ArtifactManager: # task_id -> list of aio_tasks for uploading artifacts upload_aiotasks_map: dict[str, list[asyncio.Task[None]]] = defaultdict(list) + @staticmethod + def _build_artifact_model( + artifact_id: str, + artifact_type: ArtifactType, + uri: str, + organization_id: str, + step_id: str | None = None, + task_id: str | None = None, + workflow_run_id: str | None = None, + workflow_run_block_id: str | None = None, + thought_id: str | None = None, + task_v2_id: str | None = None, + run_id: str | None = None, + ai_suggestion_id: str | None = None, + ) -> ArtifactModel: + """ + Helper function to build an ArtifactModel instance. + + Args: + artifact_id: Unique artifact identifier + artifact_type: Type of the artifact + uri: Storage URI for the artifact + organization_id: Organization ID + step_id: Optional step ID + task_id: Optional task ID + workflow_run_id: Optional workflow run ID + workflow_run_block_id: Optional workflow run block ID + thought_id: Optional thought ID (stored as observer_thought_id) + task_v2_id: Optional task v2 ID (stored as observer_cruise_id) + run_id: Optional run ID + ai_suggestion_id: Optional AI suggestion ID + + Returns: + ArtifactModel instance ready for database insertion + """ + return ArtifactModel( + artifact_id=artifact_id, + artifact_type=artifact_type, + uri=uri, + organization_id=organization_id, + task_id=task_id, + step_id=step_id, + workflow_run_id=workflow_run_id, + workflow_run_block_id=workflow_run_block_id, + observer_cruise_id=task_v2_id, + observer_thought_id=thought_id, + run_id=run_id, + ai_suggestion_id=ai_suggestion_id, + ) + async def _create_artifact( self, aio_task_primary_key: str, @@ -280,6 +367,275 @@ class ArtifactManager: data=data, ) + async def _bulk_create_artifacts( + self, + request: BulkArtifactCreationRequest, + ) -> list[str]: + """ + Bulk create multiple artifacts in a single database transaction. + + Args: + request: BulkArtifactCreationRequest containing artifacts and primary key + + Returns: + List of artifact IDs + """ + if not request.artifacts: + return [] + + # Extract models for bulk insert + artifact_models = [artifact_data.artifact_model for artifact_data in request.artifacts] + + # Bulk insert artifacts + artifacts = await app.DATABASE.bulk_create_artifacts(artifact_models) + + # Fire and forget upload tasks + for artifact, artifact_data in zip(artifacts, request.artifacts): + if artifact_data.data is not None: + aio_task = asyncio.create_task(app.STORAGE.store_artifact(artifact, artifact_data.data)) + self.upload_aiotasks_map[request.primary_key].append(aio_task) + elif artifact_data.path is not None: + aio_task = asyncio.create_task(app.STORAGE.store_artifact_from_path(artifact, artifact_data.path)) + self.upload_aiotasks_map[request.primary_key].append(aio_task) + + return [model.artifact_id for model in artifact_models] + + def _prepare_step_artifacts( + self, + step: Step, + artifact_type: ArtifactType, + data: bytes, + screenshots: list[bytes] | None = None, + ) -> BulkArtifactCreationRequest: + """Helper to prepare artifact batch request for Step-based artifacts.""" + artifacts = [] + + # Main artifact + artifact_id = generate_artifact_id() + uri = app.STORAGE.build_uri( + organization_id=step.organization_id, + artifact_id=artifact_id, + step=step, + artifact_type=artifact_type, + ) + artifacts.append( + ArtifactBatchData( + artifact_model=self._build_artifact_model( + artifact_id=artifact_id, + artifact_type=artifact_type, + uri=uri, + organization_id=step.organization_id, + step_id=step.step_id, + task_id=step.task_id, + ), + data=data, + ) + ) + + # Screenshot artifacts + for screenshot in screenshots or []: + screenshot_id = generate_artifact_id() + screenshot_uri = app.STORAGE.build_uri( + organization_id=step.organization_id, + artifact_id=screenshot_id, + step=step, + artifact_type=ArtifactType.SCREENSHOT_LLM, + ) + artifacts.append( + ArtifactBatchData( + artifact_model=self._build_artifact_model( + artifact_id=screenshot_id, + artifact_type=ArtifactType.SCREENSHOT_LLM, + uri=screenshot_uri, + organization_id=step.organization_id, + step_id=step.step_id, + task_id=step.task_id, + ), + data=screenshot, + ) + ) + + return BulkArtifactCreationRequest(artifacts=artifacts, primary_key=step.task_id) + + def _prepare_task_v2_artifacts( + self, + task_v2: TaskV2, + artifact_type: ArtifactType, + data: bytes, + screenshots: list[bytes] | None = None, + ) -> BulkArtifactCreationRequest: + """Helper to prepare artifact batch request for TaskV2-based artifacts.""" + context = skyvern_context.current() + workflow_run_id = context.workflow_run_id if context else task_v2.workflow_run_id + workflow_run_block_id = context.parent_workflow_run_block_id if context else None + + artifacts = [] + + # Main artifact + artifact_id = generate_artifact_id() + uri = app.STORAGE.build_task_v2_uri( + organization_id=task_v2.organization_id, + artifact_id=artifact_id, + task_v2=task_v2, + artifact_type=artifact_type, + ) + artifacts.append( + ArtifactBatchData( + artifact_model=self._build_artifact_model( + artifact_id=artifact_id, + artifact_type=artifact_type, + uri=uri, + organization_id=task_v2.organization_id, + task_v2_id=task_v2.observer_cruise_id, + workflow_run_id=workflow_run_id, + workflow_run_block_id=workflow_run_block_id, + ), + data=data, + ) + ) + + # Screenshot artifacts + for screenshot in screenshots or []: + screenshot_id = generate_artifact_id() + screenshot_uri = app.STORAGE.build_task_v2_uri( + organization_id=task_v2.organization_id, + artifact_id=screenshot_id, + task_v2=task_v2, + artifact_type=ArtifactType.SCREENSHOT_LLM, + ) + artifacts.append( + ArtifactBatchData( + artifact_model=self._build_artifact_model( + artifact_id=screenshot_id, + artifact_type=ArtifactType.SCREENSHOT_LLM, + uri=screenshot_uri, + organization_id=task_v2.organization_id, + task_v2_id=task_v2.observer_cruise_id, + workflow_run_id=workflow_run_id, + workflow_run_block_id=workflow_run_block_id, + ), + data=screenshot, + ) + ) + + return BulkArtifactCreationRequest(artifacts=artifacts, primary_key=task_v2.observer_cruise_id) + + def _prepare_thought_artifacts( + self, + thought: Thought, + artifact_type: ArtifactType, + data: bytes, + screenshots: list[bytes] | None = None, + ) -> BulkArtifactCreationRequest: + """Helper to prepare artifact batch request for Thought-based artifacts.""" + artifacts = [] + + # Main artifact + artifact_id = generate_artifact_id() + uri = app.STORAGE.build_thought_uri( + organization_id=thought.organization_id, + artifact_id=artifact_id, + thought=thought, + artifact_type=artifact_type, + ) + artifacts.append( + ArtifactBatchData( + artifact_model=self._build_artifact_model( + artifact_id=artifact_id, + artifact_type=artifact_type, + uri=uri, + organization_id=thought.organization_id, + thought_id=thought.observer_thought_id, + task_v2_id=thought.observer_cruise_id, + workflow_run_id=thought.workflow_run_id, + workflow_run_block_id=thought.workflow_run_block_id, + ), + data=data, + ) + ) + + # Screenshot artifacts + for screenshot in screenshots or []: + screenshot_id = generate_artifact_id() + screenshot_uri = app.STORAGE.build_thought_uri( + organization_id=thought.organization_id, + artifact_id=screenshot_id, + thought=thought, + artifact_type=ArtifactType.SCREENSHOT_LLM, + ) + artifacts.append( + ArtifactBatchData( + artifact_model=self._build_artifact_model( + artifact_id=screenshot_id, + artifact_type=ArtifactType.SCREENSHOT_LLM, + uri=screenshot_uri, + organization_id=thought.organization_id, + thought_id=thought.observer_thought_id, + task_v2_id=thought.observer_cruise_id, + workflow_run_id=thought.workflow_run_id, + workflow_run_block_id=thought.workflow_run_block_id, + ), + data=screenshot, + ) + ) + + return BulkArtifactCreationRequest(artifacts=artifacts, primary_key=thought.observer_cruise_id) + + def _prepare_ai_suggestion_artifacts( + self, + ai_suggestion: AISuggestion, + artifact_type: ArtifactType, + data: bytes, + screenshots: list[bytes] | None = None, + ) -> BulkArtifactCreationRequest: + """Helper to prepare artifact batch request for AISuggestion-based artifacts.""" + artifacts = [] + + # Main artifact + artifact_id = generate_artifact_id() + uri = app.STORAGE.build_ai_suggestion_uri( + organization_id=ai_suggestion.organization_id, + artifact_id=artifact_id, + ai_suggestion=ai_suggestion, + artifact_type=artifact_type, + ) + artifacts.append( + ArtifactBatchData( + artifact_model=self._build_artifact_model( + artifact_id=artifact_id, + artifact_type=artifact_type, + uri=uri, + organization_id=ai_suggestion.organization_id, + ai_suggestion_id=ai_suggestion.ai_suggestion_id, + ), + data=data, + ) + ) + + # Screenshot artifacts + for screenshot in screenshots or []: + screenshot_id = generate_artifact_id() + screenshot_uri = app.STORAGE.build_ai_suggestion_uri( + organization_id=ai_suggestion.organization_id, + artifact_id=screenshot_id, + ai_suggestion=ai_suggestion, + artifact_type=ArtifactType.SCREENSHOT_LLM, + ) + artifacts.append( + ArtifactBatchData( + artifact_model=self._build_artifact_model( + artifact_id=screenshot_id, + artifact_type=ArtifactType.SCREENSHOT_LLM, + uri=screenshot_uri, + organization_id=ai_suggestion.organization_id, + ai_suggestion_id=ai_suggestion.ai_suggestion_id, + ), + data=screenshot, + ) + ) + + return BulkArtifactCreationRequest(artifacts=artifacts, primary_key=ai_suggestion.ai_suggestion_id) + async def create_llm_artifact( self, data: bytes, @@ -290,54 +646,53 @@ class ArtifactManager: task_v2: TaskV2 | None = None, ai_suggestion: AISuggestion | None = None, ) -> None: + """ + Create LLM artifact with optional screenshots using bulk insert. + + Args: + data: Main artifact data + artifact_type: Type of the main artifact + screenshots: Optional list of screenshot data + step: Optional Step entity + thought: Optional Thought entity + task_v2: Optional TaskV2 entity + ai_suggestion: Optional AISuggestion entity + """ if step: - await self.create_artifact( + request = self._prepare_step_artifacts( step=step, artifact_type=artifact_type, data=data, + screenshots=screenshots, ) - for screenshot in screenshots or []: - await self.create_artifact( - step=step, - artifact_type=ArtifactType.SCREENSHOT_LLM, - data=screenshot, - ) + await self._bulk_create_artifacts(request) + elif task_v2: - await self.create_task_v2_artifact( + request = self._prepare_task_v2_artifacts( task_v2=task_v2, artifact_type=artifact_type, data=data, + screenshots=screenshots, ) - for screenshot in screenshots or []: - await self.create_task_v2_artifact( - task_v2=task_v2, - artifact_type=ArtifactType.SCREENSHOT_LLM, - data=screenshot, - ) + await self._bulk_create_artifacts(request) + elif thought: - await self.create_thought_artifact( + request = self._prepare_thought_artifacts( thought=thought, artifact_type=artifact_type, data=data, + screenshots=screenshots, ) - for screenshot in screenshots or []: - await self.create_thought_artifact( - thought=thought, - artifact_type=ArtifactType.SCREENSHOT_LLM, - data=screenshot, - ) + await self._bulk_create_artifacts(request) + elif ai_suggestion: - await self.create_ai_suggestion_artifact( + request = self._prepare_ai_suggestion_artifacts( ai_suggestion=ai_suggestion, artifact_type=artifact_type, data=data, + screenshots=screenshots, ) - for screenshot in screenshots or []: - await self.create_ai_suggestion_artifact( - ai_suggestion=ai_suggestion, - artifact_type=ArtifactType.SCREENSHOT_LLM, - data=screenshot, - ) + await self._bulk_create_artifacts(request) async def update_artifact_data( self, diff --git a/skyvern/forge/sdk/db/agent_db.py b/skyvern/forge/sdk/db/agent_db.py index 48729f25..380a34d0 100644 --- a/skyvern/forge/sdk/db/agent_db.py +++ b/skyvern/forge/sdk/db/agent_db.py @@ -371,6 +371,39 @@ class AgentDB: LOG.exception("UnexpectedError") raise + async def bulk_create_artifacts( + self, + artifact_models: list[ArtifactModel], + ) -> list[Artifact]: + """ + Bulk create multiple artifacts in a single database transaction. + + Args: + artifact_models: List of ArtifactModel instances to insert + + Returns: + List of created Artifact objects + """ + if not artifact_models: + return [] + + try: + async with self.Session() as session: + session.add_all(artifact_models) + await session.commit() + + # Refresh all artifacts to get their created_at and modified_at values + for artifact in artifact_models: + await session.refresh(artifact) + + return [convert_to_artifact(artifact, self.debug_enabled) for artifact in artifact_models] + except SQLAlchemyError: + LOG.exception("SQLAlchemyError during bulk artifact creation") + raise + except Exception: + LOG.exception("UnexpectedError during bulk artifact creation") + raise + async def get_task(self, task_id: str, organization_id: str | None = None) -> Task | None: """Get a task by its id""" try: