diff --git a/skyvern/forge/sdk/artifact/manager.py b/skyvern/forge/sdk/artifact/manager.py
index 68ed10e5..6dc01ad9 100644
--- a/skyvern/forge/sdk/artifact/manager.py
+++ b/skyvern/forge/sdk/artifact/manager.py
@@ -9,6 +9,7 @@ from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType, LogEntityT
 from skyvern.forge.sdk.db.id import generate_artifact_id
 from skyvern.forge.sdk.models import Step
 from skyvern.forge.sdk.schemas.observers import ObserverCruise, ObserverThought
+from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock
 
 LOG = structlog.get_logger(__name__)
 
@@ -151,6 +152,32 @@ class ArtifactManager:
             path=path,
         )
 
+    async def create_workflow_run_block_artifact(
+        self,
+        workflow_run_block: WorkflowRunBlock,
+        artifact_type: ArtifactType,
+        data: bytes | None = None,
+        path: str | None = None,
+    ) -> str:
+        """Create an artifact that belongs to a workflow run block.
+
+        A fresh artifact id is generated, the URI is built by the configured storage, and the
+        creation is delegated to `_create_artifact` keyed by the block id.
+        """
+        artifact_id = generate_artifact_id()
+        uri = app.STORAGE.build_workflow_run_block_uri(artifact_id, workflow_run_block, artifact_type)
+        return await self._create_artifact(
+            aio_task_primary_key=workflow_run_block.workflow_run_block_id,
+            artifact_id=artifact_id,
+            artifact_type=artifact_type,
+            uri=uri,
+            workflow_run_block_id=workflow_run_block.workflow_run_block_id,
+            workflow_run_id=workflow_run_block.workflow_run_id,
+            organization_id=workflow_run_block.organization_id,
+            data=data,
+            path=path,
+        )
+
     async def create_llm_artifact(
         self,
         data: bytes,
diff --git a/skyvern/forge/sdk/artifact/storage/base.py b/skyvern/forge/sdk/artifact/storage/base.py
index 1de90610..bc9a59e4 100644
--- a/skyvern/forge/sdk/artifact/storage/base.py
+++ b/skyvern/forge/sdk/artifact/storage/base.py
@@ -3,6 +3,7 @@ from abc import ABC, abstractmethod
 from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType, LogEntityType
 from skyvern.forge.sdk.models import Step
 from skyvern.forge.sdk.schemas.observers import ObserverCruise, ObserverThought
+from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock
 
 # TODO: This should be a part of the ArtifactType model
 FILE_EXTENTSION_MAP: dict[ArtifactType, str] = {
@@ -52,6 +53,13 @@ class BaseStorage(ABC):
     ) -> str:
         pass
 
+    @abstractmethod
+    def build_workflow_run_block_uri(
+        self, artifact_id: str, workflow_run_block: WorkflowRunBlock, artifact_type: ArtifactType
+    ) -> str:
+        """Return the storage URI for an artifact that belongs to the given workflow run block."""
+        pass
+
     @abstractmethod
     async def store_artifact(self, artifact: Artifact, data: bytes) -> None:
         pass
diff --git a/skyvern/forge/sdk/artifact/storage/local.py b/skyvern/forge/sdk/artifact/storage/local.py
index 506ed57d..a12e966b 100644
--- a/skyvern/forge/sdk/artifact/storage/local.py
+++ b/skyvern/forge/sdk/artifact/storage/local.py
@@ -12,6 +12,7 @@ from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType, LogEntityT
 from skyvern.forge.sdk.artifact.storage.base import FILE_EXTENTSION_MAP, BaseStorage
 from skyvern.forge.sdk.models import Step
 from skyvern.forge.sdk.schemas.observers import ObserverCruise, ObserverThought
+from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock
 
 LOG = structlog.get_logger()
 
@@ -40,6 +41,12 @@ class LocalStorage(BaseStorage):
         file_ext = FILE_EXTENTSION_MAP[artifact_type]
         return f"file://{self.artifact_path}/{settings.ENV}/observers/{observer_cruise.observer_cruise_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}"
 
+    def build_workflow_run_block_uri(
+        self, artifact_id: str, workflow_run_block: WorkflowRunBlock, artifact_type: ArtifactType
+    ) -> str:
+        file_ext = FILE_EXTENTSION_MAP[artifact_type]
+        return f"file://{self.artifact_path}/{settings.ENV}/workflow_runs/{workflow_run_block.workflow_run_id}/{workflow_run_block.workflow_run_block_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}"
+
     async def store_artifact(self, artifact: Artifact, data: bytes) -> None:
         file_path = None
         try:
diff --git a/skyvern/forge/sdk/artifact/storage/s3.py b/skyvern/forge/sdk/artifact/storage/s3.py
index 59dded16..e5cb5f6e 100644
--- a/skyvern/forge/sdk/artifact/storage/s3.py
+++ b/skyvern/forge/sdk/artifact/storage/s3.py
@@ -16,6 +16,7 @@ from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType, LogEntityT
 from skyvern.forge.sdk.artifact.storage.base import FILE_EXTENTSION_MAP, BaseStorage
 from skyvern.forge.sdk.models import Step
 from skyvern.forge.sdk.schemas.observers import ObserverCruise, ObserverThought
+from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock
 
 
 class S3Storage(BaseStorage):
@@ -43,6 +44,12 @@ class S3Storage(BaseStorage):
         file_ext = FILE_EXTENTSION_MAP[artifact_type]
         return f"s3://{self.bucket}/{settings.ENV}/observers/{observer_cruise.observer_cruise_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}"
 
+    def build_workflow_run_block_uri(
+        self, artifact_id: str, workflow_run_block: WorkflowRunBlock, artifact_type: ArtifactType
+    ) -> str:
+        file_ext = FILE_EXTENTSION_MAP[artifact_type]
+        return f"s3://{self.bucket}/{settings.ENV}/workflow_runs/{workflow_run_block.workflow_run_id}/{workflow_run_block.workflow_run_block_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}"
+
     async def store_artifact(self, artifact: Artifact, data: bytes) -> None:
         await self.async_client.upload_file(artifact.uri, data)
 
diff --git a/skyvern/forge/sdk/db/utils.py b/skyvern/forge/sdk/db/utils.py
index ac1c208e..6be5efc8 100644
--- a/skyvern/forge/sdk/db/utils.py
+++ b/skyvern/forge/sdk/db/utils.py
@@ -380,6 +380,7 @@ def convert_to_workflow_run_block(
     block = WorkflowRunBlock(
         workflow_run_block_id=workflow_run_block_model.workflow_run_block_id,
         workflow_run_id=workflow_run_block_model.workflow_run_id,
+        organization_id=workflow_run_block_model.organization_id,
         parent_workflow_run_block_id=workflow_run_block_model.parent_workflow_run_block_id,
         block_type=BlockType(workflow_run_block_model.block_type),
         label=workflow_run_block_model.label,
diff --git a/skyvern/forge/sdk/schemas/workflow_runs.py b/skyvern/forge/sdk/schemas/workflow_runs.py
index 8199a271..531b7114 100644
--- a/skyvern/forge/sdk/schemas/workflow_runs.py
+++ b/skyvern/forge/sdk/schemas/workflow_runs.py
@@ -14,6 +14,7 @@ from skyvern.webeye.actions.actions import Action
 class WorkflowRunBlock(BaseModel):
     workflow_run_block_id: str
     workflow_run_id: str
+    organization_id: str | None = None
     parent_workflow_run_block_id: str | None = None
     block_type: BlockType
     label: str | None = None
diff --git a/skyvern/forge/sdk/services/observer_service.py b/skyvern/forge/sdk/services/observer_service.py
index b4f3c18e..bd39a596 100644
--- a/skyvern/forge/sdk/services/observer_service.py
+++ b/skyvern/forge/sdk/services/observer_service.py
@@ -17,6 +17,7 @@ from skyvern.forge.sdk.schemas.observers import (
     ObserverCruise,
     ObserverCruiseStatus,
     ObserverMetadata,
+    ObserverThought,
     ObserverThoughtScenario,
     ObserverThoughtType,
 )
@@ -91,7 +92,7 @@ async def initialize_observer_cruise(
     )
 
     metadata_prompt = prompt_engine.load_prompt("observer_generate_metadata", user_goal=user_prompt, user_url=user_url)
-    metadata_response = await app.SECONDARY_LLM_API_HANDLER(prompt=metadata_prompt, observer_cruise=observer_cruise)
+    metadata_response = await app.SECONDARY_LLM_API_HANDLER(prompt=metadata_prompt, observer_thought=observer_thought)
     # validate
     LOG.info(f"Initialized observer initial response: {metadata_response}")
     url: str = metadata_response.get("url", "")
@@ -423,6 +424,7 @@ async def run_observer_cruise(
                 prompt=observer_completion_prompt,
-                observer_cruise=observer_thought,
+                observer_thought=observer_thought,
             )
+            await _record_thought_screenshot(observer_thought=observer_thought, workflow_run_id=workflow_run_id)
             LOG.info(
                 "Observer completion check response",
                 completion_resp=completion_resp,
@@ -895,3 +897,31 @@ async def get_observer_thought_timelines(
         )
         for thought in observer_thoughts
     ]
+
+
+async def _record_thought_screenshot(observer_thought: ObserverThought, workflow_run_id: str) -> None:
+    """Attach a full-page screenshot of the workflow run's browser to the observer thought.
+
+    This is best-effort: a missing browser state, an empty screenshot, or any error while
+    capturing/storing it is logged and otherwise ignored.
+    """
+    browser_state = app.BROWSER_MANAGER.get_for_workflow_run(workflow_run_id=workflow_run_id)
+    if not browser_state:
+        LOG.warning("No browser state found for the workflow run", workflow_run_id=workflow_run_id)
+        return
+    try:
+        screenshot = await browser_state.take_screenshot(full_page=True)
+        # nothing captured, nothing to store
+        if not screenshot:
+            return
+        await app.ARTIFACT_MANAGER.create_observer_thought_artifact(
+            observer_thought=observer_thought,
+            artifact_type=ArtifactType.SCREENSHOT_LLM,
+            data=screenshot,
+        )
+    except Exception:
+        LOG.warning(
+            "Failed to record screenshot for the observer thought",
+            workflow_run_id=workflow_run_id,
+            exc_info=True,
+        )
diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py
index 89767589..b156c6fd 100644
--- a/skyvern/forge/sdk/workflow/models/block.py
+++ b/skyvern/forge/sdk/workflow/models/block.py
@@ -43,6 +43,7 @@ from skyvern.forge.sdk.api.files import (
     get_path_for_workflow_download_directory,
 )
 from skyvern.forge.sdk.api.llm.api_handler_factory import LLMAPIHandlerFactory
+from skyvern.forge.sdk.artifact.models import ArtifactType
 from skyvern.forge.sdk.db.enums import TaskType
 from skyvern.forge.sdk.schemas.tasks import Task, TaskOutput, TaskStatus
 from skyvern.forge.sdk.workflow.context_manager import BlockMetadata, WorkflowRunContext
@@ -205,6 +206,27 @@ class Block(BaseModel, abc.ABC):
                 block_type=self.block_type,
                 continue_on_failure=self.continue_on_failure,
             )
+            # Capture a best-effort screenshot of the browser before the block runs.
+            # A failure here must not prevent the block from executing, so it is logged and swallowed.
+            browser_state = app.BROWSER_MANAGER.get_for_workflow_run(workflow_run_id)
+            if not browser_state:
+                LOG.warning("No browser state found when creating workflow_run_block", workflow_run_id=workflow_run_id)
+            else:
+                try:
+                    screenshot = await browser_state.take_screenshot(full_page=True)
+                    if screenshot:
+                        await app.ARTIFACT_MANAGER.create_workflow_run_block_artifact(
+                            workflow_run_block=workflow_run_block,
+                            artifact_type=ArtifactType.SCREENSHOT_LLM,
+                            data=screenshot,
+                        )
+                except Exception:
+                    LOG.warning(
+                        "Failed to record screenshot for workflow_run_block",
+                        workflow_run_id=workflow_run_id,
+                        workflow_run_block_id=workflow_run_block.workflow_run_block_id,
+                        exc_info=True,
+                    )
             workflow_run_block_id = workflow_run_block.workflow_run_block_id
             return await self.execute(workflow_run_id, workflow_run_block_id, organization_id=organization_id, **kwargs)
         except Exception as e: