script skyvern fallback (#3285)

This commit is contained in:
Shuchang Zheng
2025-08-24 13:45:00 -07:00
committed by GitHub
parent d119c0ac92
commit 53d8c69e08
6 changed files with 246 additions and 82 deletions

View File

@@ -18,8 +18,11 @@ from skyvern.exceptions import ScriptNotFound, WorkflowRunNotFound
from skyvern.forge import app
from skyvern.forge.prompts import prompt_engine
from skyvern.forge.sdk.core import skyvern_context
from skyvern.forge.sdk.models import StepStatus
from skyvern.forge.sdk.schemas.files import FileInfo
from skyvern.forge.sdk.schemas.tasks import TaskOutput, TaskStatus
from skyvern.forge.sdk.workflow.models.block import TaskBlock
from skyvern.schemas.runs import RunEngine
from skyvern.schemas.scripts import CreateScriptResponse, FileNode, ScriptFileCreate
from skyvern.schemas.workflows import BlockStatus, BlockType
@@ -245,14 +248,14 @@ async def _create_workflow_block_run_and_task(
block_type: BlockType,
prompt: str | None = None,
url: str | None = None,
) -> tuple[str | None, str | None]:
) -> tuple[str | None, str | None, str | None]:
"""
Create a workflow block run and optionally a task if workflow_run_id is available in context.
Returns (workflow_run_block_id, task_id) tuple.
"""
context = skyvern_context.current()
if not context or not context.workflow_run_id or not context.organization_id:
return None, None
return None, None, None
workflow_run_id = context.workflow_run_id
organization_id = context.organization_id
@@ -294,6 +297,7 @@ async def _create_workflow_block_run_and_task(
order=0,
retry_index=0,
organization_id=organization_id,
status=StepStatus.running,
)
step_id = step.step_id
@@ -307,7 +311,7 @@ async def _create_workflow_block_run_and_task(
context.step_id = step_id
context.task_id = task_id
return workflow_run_block_id, task_id
return workflow_run_block_id, task_id, step_id
except Exception as e:
LOG.warning(
@@ -317,7 +321,7 @@ async def _create_workflow_block_run_and_task(
workflow_run_id=context.workflow_run_id,
exc_info=True,
)
return None, None
return None, None, None
async def _record_output_parameter_value(
@@ -357,6 +361,9 @@ async def _update_workflow_block(
status: BlockStatus,
task_id: str | None = None,
task_status: TaskStatus = TaskStatus.completed,
step_id: str | None = None,
step_status: StepStatus = StepStatus.completed,
is_last: bool | None = True,
label: str | None = None,
failure_reason: str | None = None,
output: dict[str, Any] | list | str | None = None,
@@ -368,6 +375,14 @@ async def _update_workflow_block(
return
final_output = output
if task_id:
if step_id:
await app.DATABASE.update_step(
step_id=step_id,
task_id=task_id,
organization_id=context.organization_id,
status=step_status,
is_last=is_last,
)
updated_task = await app.DATABASE.update_task(
task_id=task_id,
organization_id=context.organization_id,
@@ -430,6 +445,129 @@ async def _run_cached_function(cache_key: str) -> Any:
raise Exception(f"Cache key {cache_key} not found")
async def _fallback_to_ai_run(
cache_key: str,
prompt: str | None = None,
url: str | None = None,
engine: RunEngine = RunEngine.skyvern_v1,
complete_criterion: str | None = None,
terminate_criterion: str | None = None,
data_extraction_goal: str | None = None,
schema: dict[str, Any] | list | str | None = None,
error_code_mapping: dict[str, str] | None = None,
max_steps: int | None = None,
complete_on_download: bool = False,
download_suffix: str | None = None,
totp_verification_url: str | None = None,
totp_identifier: str | None = None,
complete_verification: bool = True,
include_action_history_in_verification: bool = False,
error: Exception | None = None,
workflow_run_block_id: str | None = None,
) -> None:
context = skyvern_context.current()
if not (
context
and context.organization_id
and context.workflow_run_id
and context.workflow_id
and context.task_id
and context.step_id
):
return
try:
organization_id = context.organization_id
LOG.info(
"Script fallback to AI run",
cache_key=cache_key,
organization_id=organization_id,
workflow_id=context.workflow_id,
workflow_run_id=context.workflow_run_id,
task_id=context.task_id,
step_id=context.step_id,
)
# 1. fail the previous step
previous_step = await app.DATABASE.update_step(
step_id=context.step_id,
task_id=context.task_id,
organization_id=organization_id,
status=StepStatus.failed,
)
# 2. create a new step for ai run
ai_step = await app.DATABASE.create_step(
task_id=context.task_id,
organization_id=organization_id,
order=previous_step.order + 1,
retry_index=0,
)
context.step_id = ai_step.step_id
# 3. build the task block
# 4. run execute_step
organization = await app.DATABASE.get_organization(organization_id=organization_id)
if not organization:
raise Exception(f"Organization is missing organization_id={organization_id}")
task = await app.DATABASE.get_task(task_id=context.task_id, organization_id=organization_id)
if not task:
raise Exception(f"Task is missing task_id={context.task_id}")
workflow = await app.DATABASE.get_workflow(workflow_id=context.workflow_id, organization_id=organization_id)
if not workflow:
return
# get the output_paramter
output_parameter = workflow.get_output_parameter(cache_key)
if not output_parameter:
return
task_block = TaskBlock(
label=cache_key,
url=task.url,
navigation_goal=prompt,
output_parameter=output_parameter,
title=cache_key,
engine=engine,
complete_criterion=complete_criterion,
terminate_criterion=terminate_criterion,
data_extraction_goal=data_extraction_goal,
data_schema=schema,
error_code_mapping=error_code_mapping,
max_steps_per_run=max_steps,
complete_on_download=complete_on_download,
download_suffix=download_suffix,
totp_verification_url=totp_verification_url,
totp_identifier=totp_identifier,
complete_verification=complete_verification,
include_action_history_in_verification=include_action_history_in_verification,
)
await app.agent.execute_step(
organization=organization,
task=task,
step=ai_step,
task_block=task_block,
)
# Update block status to completed if workflow block was created
if workflow_run_block_id:
await _update_workflow_block(
workflow_run_block_id,
BlockStatus.completed,
task_id=context.task_id,
step_id=context.step_id,
label=cache_key,
)
except Exception as e:
LOG.warning("Failed to fallback to AI run", cache_key=cache_key, exc_info=True)
# Update block status to failed if workflow block was created
if workflow_run_block_id:
await _update_workflow_block(
workflow_run_block_id,
BlockStatus.failed,
task_id=context.task_id,
task_status=TaskStatus.failed,
label=cache_key,
failure_reason=str(e),
)
raise e
async def run_task(
prompt: str,
url: str | None = None,
@@ -437,7 +575,7 @@ async def run_task(
cache_key: str | None = None,
) -> None:
# Auto-create workflow block run and task if workflow_run_id is available
workflow_run_block_id, task_id = await _create_workflow_block_run_and_task(
workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task(
block_type=BlockType.TASK,
prompt=prompt,
url=url,
@@ -453,22 +591,22 @@ async def run_task(
# Update block status to completed if workflow block was created
if workflow_run_block_id:
await _update_workflow_block(
workflow_run_block_id, BlockStatus.completed, task_id=task_id, label=cache_key
workflow_run_block_id,
BlockStatus.completed,
task_id=task_id,
step_id=step_id,
label=cache_key,
)
except Exception as e:
# TODO: fallback to AI run in case of error
# Update block status to failed if workflow block was created
if workflow_run_block_id:
await _update_workflow_block(
workflow_run_block_id,
BlockStatus.failed,
task_id=task_id,
task_status=TaskStatus.failed,
label=cache_key,
failure_reason=str(e),
)
raise
await _fallback_to_ai_run(
cache_key=cache_key,
prompt=prompt,
url=url,
max_steps=max_steps,
error=e,
workflow_run_block_id=workflow_run_block_id,
)
finally:
# clear the prompt in the RunContext
run_context.prompt = None
@@ -479,6 +617,8 @@ async def run_task(
BlockStatus.failed,
task_id=task_id,
task_status=TaskStatus.failed,
step_id=step_id,
step_status=StepStatus.failed,
failure_reason="Cache key is required",
)
run_context.prompt = None
@@ -492,7 +632,7 @@ async def download(
cache_key: str | None = None,
) -> None:
# Auto-create workflow block run and task if workflow_run_id is available
workflow_run_block_id, task_id = await _create_workflow_block_run_and_task(
workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task(
block_type=BlockType.FILE_DOWNLOAD,
prompt=prompt,
url=url,
@@ -508,21 +648,23 @@ async def download(
# Update block status to completed if workflow block was created
if workflow_run_block_id:
await _update_workflow_block(
workflow_run_block_id, BlockStatus.completed, task_id=task_id, label=cache_key
workflow_run_block_id,
BlockStatus.completed,
task_id=task_id,
step_id=step_id,
label=cache_key,
)
except Exception as e:
# Update block status to failed if workflow block was created
if workflow_run_block_id:
await _update_workflow_block(
workflow_run_block_id,
BlockStatus.failed,
task_id=task_id,
task_status=TaskStatus.failed,
label=cache_key,
failure_reason=str(e),
)
raise
await _fallback_to_ai_run(
cache_key=cache_key,
prompt=prompt,
url=url,
max_steps=max_steps,
complete_on_download=True,
error=e,
workflow_run_block_id=workflow_run_block_id,
)
finally:
run_context.prompt = None
else:
@@ -532,6 +674,8 @@ async def download(
BlockStatus.failed,
task_id=task_id,
task_status=TaskStatus.failed,
step_id=step_id,
step_status=StepStatus.failed,
failure_reason="Cache key is required",
)
run_context.prompt = None
@@ -545,7 +689,7 @@ async def action(
cache_key: str | None = None,
) -> None:
# Auto-create workflow block run and task if workflow_run_id is available
workflow_run_block_id, task_id = await _create_workflow_block_run_and_task(
workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task(
block_type=BlockType.ACTION,
prompt=prompt,
url=url,
@@ -561,21 +705,22 @@ async def action(
# Update block status to completed if workflow block was created
if workflow_run_block_id:
await _update_workflow_block(
workflow_run_block_id, BlockStatus.completed, task_id=task_id, label=cache_key
workflow_run_block_id,
BlockStatus.completed,
task_id=task_id,
step_id=step_id,
label=cache_key,
)
except Exception as e:
# Update block status to failed if workflow block was created
if workflow_run_block_id:
await _update_workflow_block(
workflow_run_block_id,
BlockStatus.failed,
task_id=task_id,
task_status=TaskStatus.failed,
label=cache_key,
failure_reason=str(e),
)
raise
await _fallback_to_ai_run(
cache_key=cache_key,
prompt=prompt,
url=url,
max_steps=max_steps,
error=e,
workflow_run_block_id=workflow_run_block_id,
)
finally:
run_context.prompt = None
else:
@@ -585,6 +730,8 @@ async def action(
BlockStatus.failed,
task_id=task_id,
task_status=TaskStatus.failed,
step_id=step_id,
step_status=StepStatus.failed,
failure_reason="Cache key is required",
)
run_context.prompt = None
@@ -598,7 +745,7 @@ async def login(
cache_key: str | None = None,
) -> None:
# Auto-create workflow block run and task if workflow_run_id is available
workflow_run_block_id, task_id = await _create_workflow_block_run_and_task(
workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task(
block_type=BlockType.LOGIN,
prompt=prompt,
url=url,
@@ -614,21 +761,22 @@ async def login(
# Update block status to completed if workflow block was created
if workflow_run_block_id:
await _update_workflow_block(
workflow_run_block_id, BlockStatus.completed, task_id=task_id, label=cache_key
workflow_run_block_id,
BlockStatus.completed,
task_id=task_id,
step_id=step_id,
label=cache_key,
)
except Exception as e:
# Update block status to failed if workflow block was created
if workflow_run_block_id:
await _update_workflow_block(
workflow_run_block_id,
BlockStatus.failed,
task_id=task_id,
task_status=TaskStatus.failed,
label=cache_key,
failure_reason=str(e),
)
raise
await _fallback_to_ai_run(
cache_key=cache_key,
prompt=prompt,
url=url,
max_steps=max_steps,
error=e,
workflow_run_block_id=workflow_run_block_id,
)
finally:
run_context.prompt = None
else:
@@ -638,6 +786,8 @@ async def login(
BlockStatus.failed,
task_id=task_id,
task_status=TaskStatus.failed,
step_id=step_id,
step_status=StepStatus.failed,
failure_reason="Cache key is required",
)
run_context.prompt = None
@@ -651,7 +801,7 @@ async def extract(
cache_key: str | None = None,
) -> dict[str, Any] | list | str | None:
# Auto-create workflow block run and task if workflow_run_id is available
workflow_run_block_id, task_id = await _create_workflow_block_run_and_task(
workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task(
block_type=BlockType.EXTRACTION,
prompt=prompt,
url=url,
@@ -671,6 +821,7 @@ async def extract(
workflow_run_block_id,
BlockStatus.completed,
task_id=task_id,
step_id=step_id,
output=output,
label=cache_key,
)
@@ -683,6 +834,8 @@ async def extract(
BlockStatus.failed,
task_id=task_id,
task_status=TaskStatus.failed,
step_id=step_id,
step_status=StepStatus.failed,
failure_reason=str(e),
output=output,
label=cache_key,
@@ -697,6 +850,8 @@ async def extract(
BlockStatus.failed,
task_id=task_id,
task_status=TaskStatus.failed,
step_id=step_id,
step_status=StepStatus.failed,
failure_reason="Cache key is required",
)
run_context.prompt = None
@@ -705,7 +860,7 @@ async def extract(
async def wait(seconds: int) -> None:
# Auto-create workflow block run if workflow_run_id is available (wait block doesn't create tasks)
workflow_run_block_id, _ = await _create_workflow_block_run_and_task(block_type=BlockType.WAIT)
workflow_run_block_id, _, _ = await _create_workflow_block_run_and_task(block_type=BlockType.WAIT)
try:
await asyncio.sleep(seconds)