script gen: extract action support (#3238)
This commit is contained in:
@@ -4,17 +4,19 @@ import hashlib
|
||||
import importlib.util
|
||||
import os
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
from typing import Any, cast
|
||||
|
||||
import structlog
|
||||
from fastapi import BackgroundTasks, HTTPException
|
||||
|
||||
from skyvern.constants import GET_DOWNLOADED_FILES_TIMEOUT
|
||||
from skyvern.core.script_generations.constants import SCRIPT_TASK_BLOCKS
|
||||
from skyvern.core.script_generations.script_run_context_manager import script_run_context_manager
|
||||
from skyvern.exceptions import ScriptNotFound, WorkflowRunNotFound
|
||||
from skyvern.forge import app
|
||||
from skyvern.forge.sdk.core import skyvern_context
|
||||
from skyvern.forge.sdk.schemas.tasks import TaskStatus
|
||||
from skyvern.forge.sdk.schemas.files import FileInfo
|
||||
from skyvern.forge.sdk.schemas.tasks import TaskOutput, TaskStatus
|
||||
from skyvern.forge.sdk.workflow.models.block import BlockStatus, BlockType
|
||||
from skyvern.schemas.scripts import CreateScriptResponse, FileNode, ScriptFileCreate
|
||||
|
||||
@@ -314,31 +316,75 @@ async def _create_workflow_block_run_and_task(
|
||||
return None, None
|
||||
|
||||
|
||||
async def _update_workflow_block_status(
|
||||
async def _record_output_parameter_value(
|
||||
workflow_run_id: str,
|
||||
output: dict[str, Any] | list | str | None,
|
||||
) -> None:
|
||||
# TODO support this in the future
|
||||
# workflow_run_context = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(workflow_run_id)
|
||||
# await workflow_run_context.register_output_parameter_value_post_execution(
|
||||
# parameter=self.output_parameter,
|
||||
# value=value,
|
||||
# )
|
||||
# await app.DATABASE.create_or_update_workflow_run_output_parameter(
|
||||
# workflow_run_id=workflow_run_id,
|
||||
# output_parameter_id=self.output_parameter.output_parameter_id,
|
||||
# value=value,
|
||||
# )
|
||||
return
|
||||
|
||||
|
||||
async def _update_workflow_block(
|
||||
workflow_run_block_id: str,
|
||||
status: BlockStatus,
|
||||
task_id: str | None = None,
|
||||
task_status: TaskStatus = TaskStatus.completed,
|
||||
failure_reason: str | None = None,
|
||||
output: dict[str, Any] | list | str | None = None,
|
||||
) -> None:
|
||||
"""Update the status of a workflow run block."""
|
||||
try:
|
||||
context = skyvern_context.current()
|
||||
if not context or not context.organization_id:
|
||||
if not context or not context.organization_id or not context.workflow_run_id:
|
||||
return
|
||||
await app.DATABASE.update_workflow_run_block(
|
||||
workflow_run_block_id=workflow_run_block_id,
|
||||
organization_id=context.organization_id if context else None,
|
||||
status=status,
|
||||
failure_reason=failure_reason,
|
||||
)
|
||||
final_output = output
|
||||
if task_id:
|
||||
await app.DATABASE.update_task(
|
||||
updated_task = await app.DATABASE.update_task(
|
||||
task_id=task_id,
|
||||
organization_id=context.organization_id,
|
||||
status=task_status,
|
||||
failure_reason=failure_reason,
|
||||
extracted_information=output,
|
||||
)
|
||||
downloaded_files: list[FileInfo] = []
|
||||
try:
|
||||
async with asyncio.timeout(GET_DOWNLOADED_FILES_TIMEOUT):
|
||||
downloaded_files = await app.STORAGE.get_downloaded_files(
|
||||
organization_id=context.organization_id,
|
||||
run_id=context.workflow_run_id,
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
LOG.warning("Timeout getting downloaded files", task_id=task_id)
|
||||
|
||||
task_output = TaskOutput.from_task(updated_task, downloaded_files)
|
||||
final_output = task_output.model_dump()
|
||||
await app.DATABASE.update_workflow_run_block(
|
||||
workflow_run_block_id=workflow_run_block_id,
|
||||
organization_id=context.organization_id if context else None,
|
||||
status=status,
|
||||
failure_reason=failure_reason,
|
||||
output=final_output,
|
||||
)
|
||||
else:
|
||||
final_output = None
|
||||
await app.DATABASE.update_workflow_run_block(
|
||||
workflow_run_block_id=workflow_run_block_id,
|
||||
organization_id=context.organization_id if context else None,
|
||||
status=status,
|
||||
failure_reason=failure_reason,
|
||||
)
|
||||
await _record_output_parameter_value(context.workflow_run_id, final_output)
|
||||
|
||||
except Exception as e:
|
||||
LOG.warning(
|
||||
"Failed to update workflow block status",
|
||||
@@ -349,12 +395,12 @@ async def _update_workflow_block_status(
|
||||
)
|
||||
|
||||
|
||||
async def _run_cached_function(cache_key: str) -> None:
|
||||
async def _run_cached_function(cache_key: str) -> Any:
|
||||
cached_fn = script_run_context_manager.get_cached_fn(cache_key)
|
||||
if cached_fn:
|
||||
# TODO: handle exceptions here and fall back to AI run in case of error
|
||||
run_context = script_run_context_manager.ensure_run_context()
|
||||
await cached_fn(page=run_context.page, context=run_context)
|
||||
return await cached_fn(page=run_context.page, context=run_context)
|
||||
else:
|
||||
raise Exception(f"Cache key {cache_key} not found")
|
||||
|
||||
@@ -378,12 +424,12 @@ async def run_task(
|
||||
|
||||
# Update block status to completed if workflow block was created
|
||||
if workflow_run_block_id:
|
||||
await _update_workflow_block_status(workflow_run_block_id, BlockStatus.completed, task_id=task_id)
|
||||
await _update_workflow_block(workflow_run_block_id, BlockStatus.completed, task_id=task_id)
|
||||
|
||||
except Exception as e:
|
||||
# Update block status to failed if workflow block was created
|
||||
if workflow_run_block_id:
|
||||
await _update_workflow_block_status(
|
||||
await _update_workflow_block(
|
||||
workflow_run_block_id,
|
||||
BlockStatus.failed,
|
||||
task_id=task_id,
|
||||
@@ -393,7 +439,7 @@ async def run_task(
|
||||
raise
|
||||
else:
|
||||
if workflow_run_block_id:
|
||||
await _update_workflow_block_status(
|
||||
await _update_workflow_block(
|
||||
workflow_run_block_id,
|
||||
BlockStatus.failed,
|
||||
task_id=task_id,
|
||||
@@ -422,12 +468,12 @@ async def download(
|
||||
|
||||
# Update block status to completed if workflow block was created
|
||||
if workflow_run_block_id:
|
||||
await _update_workflow_block_status(workflow_run_block_id, BlockStatus.completed, task_id=task_id)
|
||||
await _update_workflow_block(workflow_run_block_id, BlockStatus.completed, task_id=task_id)
|
||||
|
||||
except Exception as e:
|
||||
# Update block status to failed if workflow block was created
|
||||
if workflow_run_block_id:
|
||||
await _update_workflow_block_status(
|
||||
await _update_workflow_block(
|
||||
workflow_run_block_id,
|
||||
BlockStatus.failed,
|
||||
task_id=task_id,
|
||||
@@ -437,7 +483,7 @@ async def download(
|
||||
raise
|
||||
else:
|
||||
if workflow_run_block_id:
|
||||
await _update_workflow_block_status(
|
||||
await _update_workflow_block(
|
||||
workflow_run_block_id,
|
||||
BlockStatus.failed,
|
||||
task_id=task_id,
|
||||
@@ -466,12 +512,12 @@ async def action(
|
||||
|
||||
# Update block status to completed if workflow block was created
|
||||
if workflow_run_block_id:
|
||||
await _update_workflow_block_status(workflow_run_block_id, BlockStatus.completed, task_id=task_id)
|
||||
await _update_workflow_block(workflow_run_block_id, BlockStatus.completed, task_id=task_id)
|
||||
|
||||
except Exception as e:
|
||||
# Update block status to failed if workflow block was created
|
||||
if workflow_run_block_id:
|
||||
await _update_workflow_block_status(
|
||||
await _update_workflow_block(
|
||||
workflow_run_block_id,
|
||||
BlockStatus.failed,
|
||||
task_id=task_id,
|
||||
@@ -481,7 +527,7 @@ async def action(
|
||||
raise
|
||||
else:
|
||||
if workflow_run_block_id:
|
||||
await _update_workflow_block_status(
|
||||
await _update_workflow_block(
|
||||
workflow_run_block_id,
|
||||
BlockStatus.failed,
|
||||
task_id=task_id,
|
||||
@@ -510,12 +556,12 @@ async def login(
|
||||
|
||||
# Update block status to completed if workflow block was created
|
||||
if workflow_run_block_id:
|
||||
await _update_workflow_block_status(workflow_run_block_id, BlockStatus.completed, task_id=task_id)
|
||||
await _update_workflow_block(workflow_run_block_id, BlockStatus.completed, task_id=task_id)
|
||||
|
||||
except Exception as e:
|
||||
# Update block status to failed if workflow block was created
|
||||
if workflow_run_block_id:
|
||||
await _update_workflow_block_status(
|
||||
await _update_workflow_block(
|
||||
workflow_run_block_id,
|
||||
BlockStatus.failed,
|
||||
task_id=task_id,
|
||||
@@ -525,7 +571,7 @@ async def login(
|
||||
raise
|
||||
else:
|
||||
if workflow_run_block_id:
|
||||
await _update_workflow_block_status(
|
||||
await _update_workflow_block(
|
||||
workflow_run_block_id,
|
||||
BlockStatus.failed,
|
||||
task_id=task_id,
|
||||
@@ -540,36 +586,44 @@ async def extract(
|
||||
url: str | None = None,
|
||||
max_steps: int | None = None,
|
||||
cache_key: str | None = None,
|
||||
) -> None:
|
||||
) -> dict[str, Any] | list | str | None:
|
||||
# Auto-create workflow block run and task if workflow_run_id is available
|
||||
workflow_run_block_id, task_id = await _create_workflow_block_run_and_task(
|
||||
block_type=BlockType.EXTRACTION,
|
||||
prompt=prompt,
|
||||
url=url,
|
||||
)
|
||||
output: dict[str, Any] | list | str | None = None
|
||||
|
||||
if cache_key:
|
||||
try:
|
||||
await _run_cached_function(cache_key)
|
||||
output = cast(dict[str, Any] | list | str | None, await _run_cached_function(cache_key))
|
||||
|
||||
# Update block status to completed if workflow block was created
|
||||
if workflow_run_block_id:
|
||||
await _update_workflow_block_status(workflow_run_block_id, BlockStatus.completed, task_id=task_id)
|
||||
await _update_workflow_block(
|
||||
workflow_run_block_id,
|
||||
BlockStatus.completed,
|
||||
task_id=task_id,
|
||||
output=output,
|
||||
)
|
||||
return output
|
||||
|
||||
except Exception as e:
|
||||
# Update block status to failed if workflow block was created
|
||||
if workflow_run_block_id:
|
||||
await _update_workflow_block_status(
|
||||
await _update_workflow_block(
|
||||
workflow_run_block_id,
|
||||
BlockStatus.failed,
|
||||
task_id=task_id,
|
||||
task_status=TaskStatus.failed,
|
||||
failure_reason=str(e),
|
||||
output=output,
|
||||
)
|
||||
raise
|
||||
else:
|
||||
if workflow_run_block_id:
|
||||
await _update_workflow_block_status(
|
||||
await _update_workflow_block(
|
||||
workflow_run_block_id,
|
||||
BlockStatus.failed,
|
||||
task_id=task_id,
|
||||
@@ -588,12 +642,12 @@ async def wait(seconds: int) -> None:
|
||||
|
||||
# Update block status to completed if workflow block was created
|
||||
if workflow_run_block_id:
|
||||
await _update_workflow_block_status(workflow_run_block_id, BlockStatus.completed)
|
||||
await _update_workflow_block(workflow_run_block_id, BlockStatus.completed)
|
||||
|
||||
except Exception as e:
|
||||
# Update block status to failed if workflow block was created
|
||||
if workflow_run_block_id:
|
||||
await _update_workflow_block_status(workflow_run_block_id, BlockStatus.failed, failure_reason=str(e))
|
||||
await _update_workflow_block(workflow_run_block_id, BlockStatus.failed, failure_reason=str(e))
|
||||
raise
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user