Observer artifact creation (#1345)

This commit is contained in:
Shuchang Zheng
2024-12-07 12:22:11 -08:00
committed by GitHub
parent 7591873546
commit 127f25c663
9 changed files with 199 additions and 63 deletions

View File

@@ -1,6 +1,7 @@
import asyncio
import time
from collections import defaultdict
from typing import Literal
import structlog
@@ -8,6 +9,7 @@ from skyvern.forge import app
from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType
from skyvern.forge.sdk.db.id import generate_artifact_id
from skyvern.forge.sdk.models import Step
from skyvern.forge.sdk.schemas.observers import ObserverThought
LOG = structlog.get_logger(__name__)
@@ -16,6 +18,49 @@ class ArtifactManager:
# task_id -> list of aio_tasks for uploading artifacts
upload_aiotasks_map: dict[str, list[asyncio.Task[None]]] = defaultdict(list)
async def _create_artifact(
self,
aio_task_primary_key: str,
artifact_id: str,
artifact_type: ArtifactType,
uri: str,
step_id: str | None = None,
task_id: str | None = None,
workflow_run_id: str | None = None,
workflow_run_block_id: str | None = None,
observer_thought_id: str | None = None,
observer_cruise_id: str | None = None,
organization_id: str | None = None,
data: bytes | None = None,
path: str | None = None,
) -> str:
if data is None and path is None:
raise ValueError("Either data or path must be provided to create an artifact.")
if data and path:
raise ValueError("Both data and path cannot be provided to create an artifact.")
artifact = await app.DATABASE.create_artifact(
artifact_id,
artifact_type,
uri,
step_id=step_id,
task_id=task_id,
workflow_run_id=workflow_run_id,
workflow_run_block_id=workflow_run_block_id,
observer_thought_id=observer_thought_id,
observer_cruise_id=observer_cruise_id,
organization_id=organization_id,
)
if data:
# Fire and forget
aio_task = asyncio.create_task(app.STORAGE.store_artifact(artifact, data))
self.upload_aiotasks_map[aio_task_primary_key].append(aio_task)
elif path:
# Fire and forget
aio_task = asyncio.create_task(app.STORAGE.store_artifact_from_path(artifact, path))
self.upload_aiotasks_map[aio_task_primary_key].append(aio_task)
return artifact_id
async def create_artifact(
self,
step: Step,
@@ -23,43 +68,48 @@ class ArtifactManager:
data: bytes | None = None,
path: str | None = None,
) -> str:
# TODO (kerem): Which is better?
# current: (disadvantage: we create the artifact_id UUID here)
# 1. generate artifact_id UUID here
# 2. build uri with artifact_id, step_id, task_id, artifact_type
# 3. create artifact in db using artifact_id, step_id, task_id, artifact_type, uri
# 4. store artifact in storage
# alternative: (disadvantage: two db calls)
# 1. create artifact in db without the URI
# 2. build uri with artifact_id, step_id, task_id, artifact_type
# 3. update artifact in db with the URI
# 4. store artifact in storage
if data is None and path is None:
raise ValueError("Either data or path must be provided to create an artifact.")
if data and path:
raise ValueError("Both data and path cannot be provided to create an artifact.")
artifact_id = generate_artifact_id()
uri = app.STORAGE.build_uri(artifact_id, step, artifact_type)
artifact = await app.DATABASE.create_artifact(
artifact_id,
step.step_id,
step.task_id,
artifact_type,
uri,
return await self._create_artifact(
aio_task_primary_key=step.task_id,
artifact_id=artifact_id,
artifact_type=artifact_type,
uri=uri,
step_id=step.step_id,
task_id=step.task_id,
organization_id=step.organization_id,
data=data,
path=path,
)
if data:
# Fire and forget
aio_task = asyncio.create_task(app.STORAGE.store_artifact(artifact, data))
self.upload_aiotasks_map[step.task_id].append(aio_task)
elif path:
# Fire and forget
aio_task = asyncio.create_task(app.STORAGE.store_artifact_from_path(artifact, path))
self.upload_aiotasks_map[step.task_id].append(aio_task)
return artifact_id
async def create_observer_thought_artifact(
self,
observer_thought: ObserverThought,
artifact_type: ArtifactType,
data: bytes | None = None,
path: str | None = None,
) -> str:
artifact_id = generate_artifact_id()
uri = app.STORAGE.build_observer_thought_uri(artifact_id, observer_thought, artifact_type)
return await self._create_artifact(
aio_task_primary_key=observer_thought.observer_cruise_id,
artifact_id=artifact_id,
artifact_type=artifact_type,
uri=uri,
observer_thought_id=observer_thought.observer_thought_id,
observer_cruise_id=observer_thought.observer_cruise_id,
organization_id=observer_thought.organization_id,
data=data,
path=path,
)
async def update_artifact_data(self, artifact_id: str | None, organization_id: str | None, data: bytes) -> None:
async def update_artifact_data(
self,
artifact_id: str | None,
organization_id: str | None,
data: bytes,
primary_key: Literal["task_id", "observer_thought_id"] = "task_id",
) -> None:
if not artifact_id or not organization_id:
return None
artifact = await app.DATABASE.get_artifact_by_id(artifact_id, organization_id)
@@ -67,7 +117,14 @@ class ArtifactManager:
return
# Fire and forget
aio_task = asyncio.create_task(app.STORAGE.store_artifact(artifact, data))
self.upload_aiotasks_map[artifact.task_id].append(aio_task)
if primary_key == "task_id":
if not artifact.task_id:
raise ValueError("Task ID is required to update artifact data.")
self.upload_aiotasks_map[artifact.task_id].append(aio_task)
elif primary_key == "observer_thought_id":
if not artifact.observer_thought_id:
raise ValueError("Observer Thought ID is required to update artifact data.")
self.upload_aiotasks_map[artifact.observer_thought_id].append(aio_task)
async def retrieve_artifact(self, artifact: Artifact) -> bytes | None:
return await app.STORAGE.retrieve_artifact(artifact)