Batch artifact insert processing (#4306)

This commit is contained in:
LawyZheng
2025-12-16 23:00:51 +08:00
committed by GitHub
parent 90e7f7b3cf
commit a1ec9cc633
2 changed files with 416 additions and 28 deletions

View File

@@ -1,6 +1,7 @@
import asyncio
import time
from collections import defaultdict
from dataclasses import dataclass
import structlog
@@ -8,6 +9,7 @@ from skyvern.forge import app
from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType, LogEntityType
from skyvern.forge.sdk.core import skyvern_context
from skyvern.forge.sdk.db.id import generate_artifact_id
from skyvern.forge.sdk.db.models import ArtifactModel
from skyvern.forge.sdk.models import Step
from skyvern.forge.sdk.schemas.ai_suggestions import AISuggestion
from skyvern.forge.sdk.schemas.task_v2 import TaskV2, Thought
@@ -16,10 +18,95 @@ from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock
LOG = structlog.get_logger(__name__)
@dataclass
class ArtifactBatchData:
"""
Data class for batch artifact creation.
Attributes:
artifact_model: The ArtifactModel instance to insert
data: Optional bytes data to upload
path: Optional file path to upload from
"""
artifact_model: ArtifactModel
data: bytes | None = None
path: str | None = None
def __post_init__(self) -> None:
"""Validate that exactly one of data or path is provided."""
if self.data is not None and self.path is not None:
raise ValueError("Cannot specify both data and path for artifact upload")
@dataclass
class BulkArtifactCreationRequest:
"""
Request data for bulk artifact creation.
Attributes:
artifacts: List of artifact batch data to create
primary_key: Primary key for tracking upload tasks (e.g., task_id, cruise_id)
"""
artifacts: list[ArtifactBatchData]
primary_key: str
class ArtifactManager:
# task_id -> list of aio_tasks for uploading artifacts
upload_aiotasks_map: dict[str, list[asyncio.Task[None]]] = defaultdict(list)
@staticmethod
def _build_artifact_model(
artifact_id: str,
artifact_type: ArtifactType,
uri: str,
organization_id: str,
step_id: str | None = None,
task_id: str | None = None,
workflow_run_id: str | None = None,
workflow_run_block_id: str | None = None,
thought_id: str | None = None,
task_v2_id: str | None = None,
run_id: str | None = None,
ai_suggestion_id: str | None = None,
) -> ArtifactModel:
"""
Helper function to build an ArtifactModel instance.
Args:
artifact_id: Unique artifact identifier
artifact_type: Type of the artifact
uri: Storage URI for the artifact
organization_id: Organization ID
step_id: Optional step ID
task_id: Optional task ID
workflow_run_id: Optional workflow run ID
workflow_run_block_id: Optional workflow run block ID
thought_id: Optional thought ID (stored as observer_thought_id)
task_v2_id: Optional task v2 ID (stored as observer_cruise_id)
run_id: Optional run ID
ai_suggestion_id: Optional AI suggestion ID
Returns:
ArtifactModel instance ready for database insertion
"""
return ArtifactModel(
artifact_id=artifact_id,
artifact_type=artifact_type,
uri=uri,
organization_id=organization_id,
task_id=task_id,
step_id=step_id,
workflow_run_id=workflow_run_id,
workflow_run_block_id=workflow_run_block_id,
observer_cruise_id=task_v2_id,
observer_thought_id=thought_id,
run_id=run_id,
ai_suggestion_id=ai_suggestion_id,
)
async def _create_artifact(
self,
aio_task_primary_key: str,
@@ -280,6 +367,275 @@ class ArtifactManager:
data=data,
)
async def _bulk_create_artifacts(
self,
request: BulkArtifactCreationRequest,
) -> list[str]:
"""
Bulk create multiple artifacts in a single database transaction.
Args:
request: BulkArtifactCreationRequest containing artifacts and primary key
Returns:
List of artifact IDs
"""
if not request.artifacts:
return []
# Extract models for bulk insert
artifact_models = [artifact_data.artifact_model for artifact_data in request.artifacts]
# Bulk insert artifacts
artifacts = await app.DATABASE.bulk_create_artifacts(artifact_models)
# Fire and forget upload tasks
for artifact, artifact_data in zip(artifacts, request.artifacts):
if artifact_data.data is not None:
aio_task = asyncio.create_task(app.STORAGE.store_artifact(artifact, artifact_data.data))
self.upload_aiotasks_map[request.primary_key].append(aio_task)
elif artifact_data.path is not None:
aio_task = asyncio.create_task(app.STORAGE.store_artifact_from_path(artifact, artifact_data.path))
self.upload_aiotasks_map[request.primary_key].append(aio_task)
return [model.artifact_id for model in artifact_models]
def _prepare_step_artifacts(
self,
step: Step,
artifact_type: ArtifactType,
data: bytes,
screenshots: list[bytes] | None = None,
) -> BulkArtifactCreationRequest:
"""Helper to prepare artifact batch request for Step-based artifacts."""
artifacts = []
# Main artifact
artifact_id = generate_artifact_id()
uri = app.STORAGE.build_uri(
organization_id=step.organization_id,
artifact_id=artifact_id,
step=step,
artifact_type=artifact_type,
)
artifacts.append(
ArtifactBatchData(
artifact_model=self._build_artifact_model(
artifact_id=artifact_id,
artifact_type=artifact_type,
uri=uri,
organization_id=step.organization_id,
step_id=step.step_id,
task_id=step.task_id,
),
data=data,
)
)
# Screenshot artifacts
for screenshot in screenshots or []:
screenshot_id = generate_artifact_id()
screenshot_uri = app.STORAGE.build_uri(
organization_id=step.organization_id,
artifact_id=screenshot_id,
step=step,
artifact_type=ArtifactType.SCREENSHOT_LLM,
)
artifacts.append(
ArtifactBatchData(
artifact_model=self._build_artifact_model(
artifact_id=screenshot_id,
artifact_type=ArtifactType.SCREENSHOT_LLM,
uri=screenshot_uri,
organization_id=step.organization_id,
step_id=step.step_id,
task_id=step.task_id,
),
data=screenshot,
)
)
return BulkArtifactCreationRequest(artifacts=artifacts, primary_key=step.task_id)
def _prepare_task_v2_artifacts(
self,
task_v2: TaskV2,
artifact_type: ArtifactType,
data: bytes,
screenshots: list[bytes] | None = None,
) -> BulkArtifactCreationRequest:
"""Helper to prepare artifact batch request for TaskV2-based artifacts."""
context = skyvern_context.current()
workflow_run_id = context.workflow_run_id if context else task_v2.workflow_run_id
workflow_run_block_id = context.parent_workflow_run_block_id if context else None
artifacts = []
# Main artifact
artifact_id = generate_artifact_id()
uri = app.STORAGE.build_task_v2_uri(
organization_id=task_v2.organization_id,
artifact_id=artifact_id,
task_v2=task_v2,
artifact_type=artifact_type,
)
artifacts.append(
ArtifactBatchData(
artifact_model=self._build_artifact_model(
artifact_id=artifact_id,
artifact_type=artifact_type,
uri=uri,
organization_id=task_v2.organization_id,
task_v2_id=task_v2.observer_cruise_id,
workflow_run_id=workflow_run_id,
workflow_run_block_id=workflow_run_block_id,
),
data=data,
)
)
# Screenshot artifacts
for screenshot in screenshots or []:
screenshot_id = generate_artifact_id()
screenshot_uri = app.STORAGE.build_task_v2_uri(
organization_id=task_v2.organization_id,
artifact_id=screenshot_id,
task_v2=task_v2,
artifact_type=ArtifactType.SCREENSHOT_LLM,
)
artifacts.append(
ArtifactBatchData(
artifact_model=self._build_artifact_model(
artifact_id=screenshot_id,
artifact_type=ArtifactType.SCREENSHOT_LLM,
uri=screenshot_uri,
organization_id=task_v2.organization_id,
task_v2_id=task_v2.observer_cruise_id,
workflow_run_id=workflow_run_id,
workflow_run_block_id=workflow_run_block_id,
),
data=screenshot,
)
)
return BulkArtifactCreationRequest(artifacts=artifacts, primary_key=task_v2.observer_cruise_id)
def _prepare_thought_artifacts(
self,
thought: Thought,
artifact_type: ArtifactType,
data: bytes,
screenshots: list[bytes] | None = None,
) -> BulkArtifactCreationRequest:
"""Helper to prepare artifact batch request for Thought-based artifacts."""
artifacts = []
# Main artifact
artifact_id = generate_artifact_id()
uri = app.STORAGE.build_thought_uri(
organization_id=thought.organization_id,
artifact_id=artifact_id,
thought=thought,
artifact_type=artifact_type,
)
artifacts.append(
ArtifactBatchData(
artifact_model=self._build_artifact_model(
artifact_id=artifact_id,
artifact_type=artifact_type,
uri=uri,
organization_id=thought.organization_id,
thought_id=thought.observer_thought_id,
task_v2_id=thought.observer_cruise_id,
workflow_run_id=thought.workflow_run_id,
workflow_run_block_id=thought.workflow_run_block_id,
),
data=data,
)
)
# Screenshot artifacts
for screenshot in screenshots or []:
screenshot_id = generate_artifact_id()
screenshot_uri = app.STORAGE.build_thought_uri(
organization_id=thought.organization_id,
artifact_id=screenshot_id,
thought=thought,
artifact_type=ArtifactType.SCREENSHOT_LLM,
)
artifacts.append(
ArtifactBatchData(
artifact_model=self._build_artifact_model(
artifact_id=screenshot_id,
artifact_type=ArtifactType.SCREENSHOT_LLM,
uri=screenshot_uri,
organization_id=thought.organization_id,
thought_id=thought.observer_thought_id,
task_v2_id=thought.observer_cruise_id,
workflow_run_id=thought.workflow_run_id,
workflow_run_block_id=thought.workflow_run_block_id,
),
data=screenshot,
)
)
return BulkArtifactCreationRequest(artifacts=artifacts, primary_key=thought.observer_cruise_id)
def _prepare_ai_suggestion_artifacts(
self,
ai_suggestion: AISuggestion,
artifact_type: ArtifactType,
data: bytes,
screenshots: list[bytes] | None = None,
) -> BulkArtifactCreationRequest:
"""Helper to prepare artifact batch request for AISuggestion-based artifacts."""
artifacts = []
# Main artifact
artifact_id = generate_artifact_id()
uri = app.STORAGE.build_ai_suggestion_uri(
organization_id=ai_suggestion.organization_id,
artifact_id=artifact_id,
ai_suggestion=ai_suggestion,
artifact_type=artifact_type,
)
artifacts.append(
ArtifactBatchData(
artifact_model=self._build_artifact_model(
artifact_id=artifact_id,
artifact_type=artifact_type,
uri=uri,
organization_id=ai_suggestion.organization_id,
ai_suggestion_id=ai_suggestion.ai_suggestion_id,
),
data=data,
)
)
# Screenshot artifacts
for screenshot in screenshots or []:
screenshot_id = generate_artifact_id()
screenshot_uri = app.STORAGE.build_ai_suggestion_uri(
organization_id=ai_suggestion.organization_id,
artifact_id=screenshot_id,
ai_suggestion=ai_suggestion,
artifact_type=ArtifactType.SCREENSHOT_LLM,
)
artifacts.append(
ArtifactBatchData(
artifact_model=self._build_artifact_model(
artifact_id=screenshot_id,
artifact_type=ArtifactType.SCREENSHOT_LLM,
uri=screenshot_uri,
organization_id=ai_suggestion.organization_id,
ai_suggestion_id=ai_suggestion.ai_suggestion_id,
),
data=screenshot,
)
)
return BulkArtifactCreationRequest(artifacts=artifacts, primary_key=ai_suggestion.ai_suggestion_id)
async def create_llm_artifact(
self,
data: bytes,
@@ -290,54 +646,53 @@ class ArtifactManager:
task_v2: TaskV2 | None = None,
ai_suggestion: AISuggestion | None = None,
) -> None:
"""
Create LLM artifact with optional screenshots using bulk insert.
Args:
data: Main artifact data
artifact_type: Type of the main artifact
screenshots: Optional list of screenshot data
step: Optional Step entity
thought: Optional Thought entity
task_v2: Optional TaskV2 entity
ai_suggestion: Optional AISuggestion entity
"""
if step:
await self.create_artifact(
request = self._prepare_step_artifacts(
step=step,
artifact_type=artifact_type,
data=data,
screenshots=screenshots,
)
for screenshot in screenshots or []:
await self.create_artifact(
step=step,
artifact_type=ArtifactType.SCREENSHOT_LLM,
data=screenshot,
)
await self._bulk_create_artifacts(request)
elif task_v2:
await self.create_task_v2_artifact(
request = self._prepare_task_v2_artifacts(
task_v2=task_v2,
artifact_type=artifact_type,
data=data,
screenshots=screenshots,
)
for screenshot in screenshots or []:
await self.create_task_v2_artifact(
task_v2=task_v2,
artifact_type=ArtifactType.SCREENSHOT_LLM,
data=screenshot,
)
await self._bulk_create_artifacts(request)
elif thought:
await self.create_thought_artifact(
request = self._prepare_thought_artifacts(
thought=thought,
artifact_type=artifact_type,
data=data,
screenshots=screenshots,
)
for screenshot in screenshots or []:
await self.create_thought_artifact(
thought=thought,
artifact_type=ArtifactType.SCREENSHOT_LLM,
data=screenshot,
)
await self._bulk_create_artifacts(request)
elif ai_suggestion:
await self.create_ai_suggestion_artifact(
request = self._prepare_ai_suggestion_artifacts(
ai_suggestion=ai_suggestion,
artifact_type=artifact_type,
data=data,
screenshots=screenshots,
)
for screenshot in screenshots or []:
await self.create_ai_suggestion_artifact(
ai_suggestion=ai_suggestion,
artifact_type=ArtifactType.SCREENSHOT_LLM,
data=screenshot,
)
await self._bulk_create_artifacts(request)
async def update_artifact_data(
self,

View File

@@ -371,6 +371,39 @@ class AgentDB:
LOG.exception("UnexpectedError")
raise
async def bulk_create_artifacts(
self,
artifact_models: list[ArtifactModel],
) -> list[Artifact]:
"""
Bulk create multiple artifacts in a single database transaction.
Args:
artifact_models: List of ArtifactModel instances to insert
Returns:
List of created Artifact objects
"""
if not artifact_models:
return []
try:
async with self.Session() as session:
session.add_all(artifact_models)
await session.commit()
# Refresh all artifacts to get their created_at and modified_at values
for artifact in artifact_models:
await session.refresh(artifact)
return [convert_to_artifact(artifact, self.debug_enabled) for artifact in artifact_models]
except SQLAlchemyError:
LOG.exception("SQLAlchemyError during bulk artifact creation")
raise
except Exception:
LOG.exception("UnexpectedError during bulk artifact creation")
raise
async def get_task(self, task_id: str, organization_id: str | None = None) -> Task | None:
"""Get a task by its id"""
try: