From f6a0ccd32b895cbf5fd8ac41b88f086b79aebac9 Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Wed, 18 Jun 2025 00:44:46 -0700 Subject: [PATCH] Workflow CodeGen (#2740) --- skyvern/__init__.py | 22 +- skyvern/core/__init__.py | 0 skyvern/core/code_generations/__init__.py | 0 .../code_run_context_manager.py | 22 ++ .../core/code_generations/generate_code.py | 191 +++++++++++----- .../core/code_generations/run_initializer.py | 23 ++ skyvern/core/code_generations/skyvern_page.py | 206 ++++++++++++++++++ .../transform_workflow_run.py | 81 +++++++ .../code_generations/workflow_wrappers.py | 59 +++++ skyvern/services/run_code_service.py | 12 + 10 files changed, 565 insertions(+), 51 deletions(-) create mode 100644 skyvern/core/__init__.py create mode 100644 skyvern/core/code_generations/__init__.py create mode 100644 skyvern/core/code_generations/code_run_context_manager.py create mode 100644 skyvern/core/code_generations/run_initializer.py create mode 100644 skyvern/core/code_generations/skyvern_page.py create mode 100644 skyvern/core/code_generations/transform_workflow_run.py create mode 100644 skyvern/core/code_generations/workflow_wrappers.py create mode 100644 skyvern/services/run_code_service.py diff --git a/skyvern/__init__.py b/skyvern/__init__.py index b374bba5..51dacac6 100644 --- a/skyvern/__init__.py +++ b/skyvern/__init__.py @@ -23,5 +23,25 @@ setup_logger() from skyvern.forge import app # noqa: E402, F401 from skyvern.library import Skyvern # noqa: E402 +from skyvern.core.code_generations.skyvern_page import RunContext, SkyvernPage # noqa: E402 +from skyvern.core.code_generations.run_initializer import setup # noqa: E402 +from skyvern.core.code_generations.workflow_wrappers import ( # noqa: E402 + workflow, # noqa: E402 + task_block, # noqa: E402 + file_download_block, # noqa: E402 + email_block, # noqa: E402 + wait_block, # noqa: E402 +) # noqa: E402 -__all__ = ["Skyvern"] + +__all__ = [ + "Skyvern", + "SkyvernPage", + "RunContext", + "setup", + "workflow", + "task_block", + "file_download_block", + "email_block", + "wait_block", +] diff --git a/skyvern/core/__init__.py b/skyvern/core/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/skyvern/core/code_generations/__init__.py b/skyvern/core/code_generations/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/skyvern/core/code_generations/code_run_context_manager.py b/skyvern/core/code_generations/code_run_context_manager.py new file mode 100644 index 00000000..12087978 --- /dev/null +++ b/skyvern/core/code_generations/code_run_context_manager.py @@ -0,0 +1,22 @@ +from skyvern.core.code_generations.skyvern_page import RunContext + + +class CodeRunContextManager: + """ + Manages the run context for code runs. + """ + + def __init__(self) -> None: + self.run_contexts: dict[str, RunContext] = {} + """ + run_id -> RunContext + """ + + def get_run_context(self, run_id: str) -> RunContext | None: + return self.run_contexts.get(run_id) + + def set_run_context(self, run_id: str, run_context: RunContext) -> None: + self.run_contexts[run_id] = run_context + + def delete_run_context(self, run_id: str) -> None: + self.run_contexts.pop(run_id, None) diff --git a/skyvern/core/code_generations/generate_code.py b/skyvern/core/code_generations/generate_code.py index af2f4dc7..3a02fab3 100644 --- a/skyvern/core/code_generations/generate_code.py +++ b/skyvern/core/code_generations/generate_code.py @@ -1,18 +1,15 @@ # skyvern_codegen_cst.py """ -Generate a runnable Skyvern workflow script **with LibCST**. +Generate a runnable Skyvern workflow script. Example ------- -from skyvern_codegen_cst import generate_workflow_script - -src = generate_workflow_script( - workflow=workflow_dict, - tasks=[task1, task2, ...], - actions_by_task={ - task1["task_id"]: task1_actions, - task2["task_id"]: task2_actions, - }, +generated_code = generate_workflow_script( + file_name="workflow.py", + workflow_run_request=workflow_run_request, + workflow=workflow, + tasks=tasks, + actions_by_task=actions_by_task, ) Path("workflow.py").write_text(src) """ @@ -20,11 +17,14 @@ Path("workflow.py").write_text(src) from __future__ import annotations import keyword -from typing import Any, Iterable, Mapping +from enum import StrEnum +from typing import Any import libcst as cst from libcst import Attribute, Call, Dict, DictElement, FunctionDef, Name, Param +from skyvern.webeye.actions.action_types import ActionType + # --------------------------------------------------------------------- # # 1. helpers # # --------------------------------------------------------------------- # @@ -42,6 +42,8 @@ ACTION_MAP = { "drag": "drag", "solve_captcha": "solve_captcha", "verification_code": "verification_code", + "wait": "wait", + "extract": "extract", } INDENT = " " * 4 @@ -86,12 +88,47 @@ def _value(value: Any) -> cst.BaseExpression: # --------------------------------------------------------------------- # -def _make_decorator(block: Mapping[str, Any]) -> cst.Decorator: +def _workflow_decorator(wf_req: dict[str, Any]) -> cst.Decorator: + """ + Build @skyvern.workflow( + title="...", totp_url=..., totp_identifier=..., webhook_callback_url=..., max_steps=... + ) + """ + + # helper that skips “None” so the output is concise + def kw(key: str, value: Any) -> cst.Arg | None: + if value is None: + return None + return cst.Arg(keyword=cst.Name(key), value=_value(value)) + + args: list = list( + filter( + None, + [ + kw("title", wf_req.get("title", "")), + kw("totp_url", wf_req.get("totp_url")), + kw("totp_identifier", wf_req.get("totp_identifier")), + kw("webhook_url", wf_req.get("webhook_url")), + kw("max_steps", wf_req.get("max_steps")), + ], + ) + ) + + return cst.Decorator( + decorator=cst.Call( + func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("workflow")), + args=args, + ) + ) + + +def _make_decorator(block: dict[str, Any]) -> cst.Decorator: bt = block["block_type"] deco_name = { "task": "task_block", "file_download": "file_download_block", "send_email": "email_block", + "wait": "wait_block", }[bt] kwargs = [] @@ -104,12 +141,18 @@ def _make_decorator(block: Mapping[str, Any]) -> cst.Decorator: "totp_identifier": "totp_identifier", "webhook_callback_url": "webhook_callback_url", "max_steps_per_run": "max_steps", + "wait_sec": "seconds", } for src_key, kw in field_map.items(): v = block.get(src_key) if v not in (None, "", [], {}): - kwargs.append(cst.Arg(value=_value(v), keyword=Name(kw))) + if isinstance(v, StrEnum): + v = v.value + try: + kwargs.append(cst.Arg(value=_value(v), keyword=Name(kw))) + except Exception: + raise # booleans if block.get("complete_on_download"): @@ -125,7 +168,7 @@ def _make_decorator(block: Mapping[str, Any]) -> cst.Decorator: ) -def _action_to_stmt(act: Mapping[str, Any]) -> cst.BaseStatement: +def _action_to_stmt(act: dict[str, Any]) -> cst.BaseStatement: """ Turn one Action dict into: @@ -157,14 +200,16 @@ def _action_to_stmt(act: Mapping[str, Any]) -> cst.BaseStatement: return cst.SimpleStatementLine([cst.Expr(await_expr)]) -def _build_block_fn(block: Mapping[str, Any], actions: Iterable[Mapping[str, Any]]) -> FunctionDef: - name = _safe_name(block["title"]) +def _build_block_fn(block: dict[str, Any], actions: list[dict[str, Any]]) -> FunctionDef: + name = _safe_name(block.get("title") or block.get("label") or f"block_{block.get('workflow_run_block_id')}") body_stmts: list[cst.BaseStatement] = [] if block.get("url"): body_stmts.append(cst.parse_statement(f"await page.goto({repr(block['url'])})")) for act in actions: + if act["action_type"] in [ActionType.COMPLETE]: + continue body_stmts.append(_action_to_stmt(act)) if not body_stmts: @@ -174,8 +219,8 @@ def _build_block_fn(block: Mapping[str, Any], actions: Iterable[Mapping[str, Any name=Name(name), params=cst.Parameters( params=[ - Param(name=Name("page")), - Param(name=Name("context")), + Param(name=Name("page"), annotation=cst.Annotation(cst.Name("SkyvernPage"))), + Param(name=Name("context"), annotation=cst.Annotation(cst.Name("RunContext"))), ] ), decorators=[_make_decorator(block)], @@ -185,7 +230,7 @@ def _build_block_fn(block: Mapping[str, Any], actions: Iterable[Mapping[str, Any ) -def _build_model(workflow: Mapping[str, Any]) -> cst.ClassDef: +def _build_model(workflow: dict[str, Any]) -> cst.ClassDef: """ class WorkflowParameters(BaseModel): ein_info: str @@ -216,31 +261,65 @@ def _build_model(workflow: Mapping[str, Any]) -> cst.ClassDef: ) -def _build_cached_params() -> cst.SimpleStatementLine: - src = "cached_parameters = WorkflowParameters(**{k: f'<{k}>' for k in WorkflowParameters.model_fields})" - return cst.parse_statement(src) +def _build_cached_params(values: dict[str, Any]) -> cst.SimpleStatementLine: + """ + Make a CST for: + cached_parameters = WorkflowParameters(ein_info="...", ...) + """ + call = cst.Call( + func=cst.Name("WorkflowParameters"), + args=[cst.Arg(keyword=cst.Name(k), value=_value(v)) for k, v in values.items()], + ) + + assign = cst.Assign( + targets=[cst.AssignTarget(cst.Name("cached_parameters"))], + value=call, + ) + return cst.SimpleStatementLine([assign]) -def _build_run_fn(task_fns: list[str]) -> FunctionDef: - body = [cst.parse_statement("page, context = await skyvern.setup(parameters.model_dump())")] + [ - cst.parse_statement(f"await {_safe_name(t)}(page, context)") for t in task_fns +def _build_run_fn(task_titles: list[str], wf_req: dict[str, Any]) -> FunctionDef: + body = [ + cst.parse_statement("page, context = await skyvern.setup(parameters.model_dump())"), + *[cst.parse_statement(f"await {_safe_name(t)}(page, context)") for t in task_titles], ] + params = cst.Parameters( + params=[ + Param( + name=cst.Name("parameters"), + annotation=cst.Annotation(cst.Name("WorkflowParameters")), + default=cst.Name("cached_parameters"), + ), + Param( + name=cst.Name("title"), + annotation=cst.Annotation(cst.Name("str")), + default=_value(wf_req.get("title", "")), + ), + Param( + name=cst.Name("webhook_url"), + annotation=cst.Annotation(cst.parse_expression("str | None")), + default=_value(wf_req.get("webhook_url")), + ), + Param( + name=cst.Name("totp_url"), + annotation=cst.Annotation(cst.parse_expression("str | None")), + default=_value(wf_req.get("totp_url")), + ), + Param( + name=cst.Name("totp_identifier"), + annotation=cst.Annotation(cst.parse_expression("str | None")), + default=_value(wf_req.get("totp_identifier")), + ), + ] + ) + return FunctionDef( - name=Name("run_workflow"), - decorators=[cst.Decorator(Attribute(value=Name("skyvern"), attr=Name("workflow")))], - params=cst.Parameters( - params=[ - Param( - name=Name("parameters"), - default=Name("cached_parameters"), - annotation=cst.Annotation(Name("WorkflowParameters")), - ) - ] - ), - body=cst.IndentedBlock(body), - returns=None, + name=cst.Name("run_workflow"), asynchronous=cst.Asynchronous(), + decorators=[_workflow_decorator(wf_req)], + params=params, + body=cst.IndentedBlock(body), ) @@ -251,9 +330,11 @@ def _build_run_fn(task_fns: list[str]) -> FunctionDef: def generate_workflow_script( *, - workflow: Mapping[str, Any], - tasks: Iterable[Mapping[str, Any]], - actions_by_task: Mapping[str, Iterable[Mapping[str, Any]]], + file_name: str, + workflow_run_request: dict[str, Any], + workflow: dict[str, Any], + tasks: list[dict[str, Any]], + actions_by_task: dict[str, list[dict[str, Any]]], ) -> str: """ Build a LibCST Module and emit .code (PEP-8-formatted source). @@ -285,31 +366,41 @@ def generate_workflow_script( # --- class + cached params ----------------------------------------- model_cls = _build_model(workflow) - cached_params_stmt = _build_cached_params() + cached_params_stmt = _build_cached_params(workflow_run_request.get("parameters", {})) # --- blocks --------------------------------------------------------- - block_fns: list[FunctionDef] = [] - task_titles = [] - for t in tasks: - fn = _build_block_fn(t, actions_by_task.get(t["task_id"], [])) - block_fns.append(fn) - task_titles.append(t["title"]) + block_fns = [] + length_of_tasks = len(tasks) + for idx, task in enumerate(tasks): + block_fns.append(_build_block_fn(task, actions_by_task.get(task.get("task_id", ""), []))) + if idx < length_of_tasks - 1: + block_fns.append(cst.EmptyLine()) + block_fns.append(cst.EmptyLine()) + + task_titles: list[str] = [ + t.get("title") or t.get("label") or t.get("task_id") or f"unknown_title_{idx}" for idx, t in enumerate(tasks) + ] # --- runner --------------------------------------------------------- - run_fn = _build_run_fn(task_titles) + run_fn = _build_run_fn(task_titles, workflow_run_request) module = cst.Module( body=[ *imports, cst.EmptyLine(), + cst.EmptyLine(), model_cls, cst.EmptyLine(), + cst.EmptyLine(), cached_params_stmt, cst.EmptyLine(), + cst.EmptyLine(), *block_fns, cst.EmptyLine(), - run_fn, cst.EmptyLine(), + run_fn, ] ) + with open(file_name, "w") as f: + f.write(module.code) return module.code diff --git a/skyvern/core/code_generations/run_initializer.py b/skyvern/core/code_generations/run_initializer.py new file mode 100644 index 00000000..bfdc040e --- /dev/null +++ b/skyvern/core/code_generations/run_initializer.py @@ -0,0 +1,23 @@ +from typing import Any + +from playwright.async_api import async_playwright + +from skyvern.core.code_generations.skyvern_page import RunContext, SkyvernPage +from skyvern.forge.sdk.core import skyvern_context +from skyvern.webeye.browser_factory import BrowserContextFactory + + +# TODO: find a better name for this function +async def setup(parameters: dict[str, Any]) -> tuple[SkyvernPage, RunContext]: + # set up skyvern context + skyvern_context.set(skyvern_context.SkyvernContext()) + # start playwright + pw = await async_playwright().start() + ( + browser_context, + _, + _, + ) = await BrowserContextFactory.create_browser_context(playwright=pw) + new_page = await browser_context.new_page() + skyvern_page = SkyvernPage(page=new_page) + return skyvern_page, RunContext(parameters=parameters, page=skyvern_page) diff --git a/skyvern/core/code_generations/skyvern_page.py b/skyvern/core/code_generations/skyvern_page.py new file mode 100644 index 00000000..74ee7abc --- /dev/null +++ b/skyvern/core/code_generations/skyvern_page.py @@ -0,0 +1,206 @@ +from __future__ import annotations + +from dataclasses import dataclass +from enum import StrEnum +from typing import Any, Callable + +from playwright.async_api import Page + +from skyvern.config import settings +from skyvern.forge.sdk.api.files import download_file +from skyvern.webeye.actions import handler_utils +from skyvern.webeye.actions.action_types import ActionType + + +class Driver(StrEnum): + PLAYWRIGHT = "playwright" + + +@dataclass +class ActionMetadata: + intention: str = "" + data: dict[str, Any] | str | None = None + timestamp: float | None = None # filled in by recorder + screenshot_path: str | None = None # if enabled + + +@dataclass +class ActionCall: + name: ActionType + args: tuple[Any, ...] + kwargs: dict[str, Any] + meta: ActionMetadata + result: Any | None = None # populated after execution + error: Exception | None = None # populated if failed + + +class SkyvernPage: + """ + A minimal adapter around the chosen driver that: + 1. Executes real browser commands + 2. Records ActionCallobjects into RunContext.trace + 3. Adds retry / fallback hooks + """ + + def __init__( + self, + page: Page, + driver: Driver = Driver.PLAYWRIGHT, + *, + recorder: Callable[[ActionCall], None] | None = None, + ): + self.driver = driver + self.page = page # e.g. Playwright's Page + self._record = recorder or (lambda ac: None) + + @staticmethod + def action_wrap( + action: ActionType, + ) -> Callable: + """ + Decorator to record the action call. + + TODOs: + - generate action record in db pre action + - generate screenshot post action + """ + + def decorator(fn: Callable) -> Callable: + async def wrapper( + skyvern_page: SkyvernPage, + *args: Any, + intention: str = "", + data: str | dict[str, Any] = "", + **kwargs: Any, + ) -> Any: + meta = ActionMetadata(intention, data) + call = ActionCall(action, args, kwargs, meta) + try: + call.result = await fn(skyvern_page, *args, **kwargs) # real driver call + return call.result + except Exception as e: + call.error = e + # LLM fallback hook could go here ... + raise + finally: + skyvern_page._record(call) + + return wrapper + + return decorator + + async def goto(self, url: str) -> None: + await self.page.goto(url) + + ######### Public Interfaces ######### + @action_wrap(ActionType.CLICK) + async def click(self, xpath: str, intention: str | None = None, data: str | dict[str, Any] | None = None) -> None: + locator = self.page.locator(xpath) + await locator.click(timeout=5000) + + @action_wrap(ActionType.INPUT_TEXT) + async def input_text( + self, + xpath: str, + text: str, + intention: str | None = None, + data: str | dict[str, Any] | None = None, + timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS, + ) -> None: + locator = self.page.locator(xpath) + await handler_utils.input_sequentially(locator, text, timeout=timeout) + + @action_wrap(ActionType.UPLOAD_FILE) + async def upload_file( + self, xpath: str, file_path: str, intention: str | None = None, data: str | dict[str, Any] | None = None + ) -> None: + file = await download_file(file_path) + await self.page.set_input_files(xpath, file) + + @action_wrap(ActionType.SELECT_OPTION) + async def select_option( + self, xpath: str, option: str, intention: str | None = None, data: str | dict[str, Any] | None = None + ) -> None: + locator = self.page.locator(xpath) + await locator.select_option(option, timeout=5000) + + @action_wrap(ActionType.WAIT) + async def wait( + self, seconds: float, intention: str | None = None, data: str | dict[str, Any] | None = None + ) -> None: ... + + @action_wrap(ActionType.NULL_ACTION) + async def null_action(self, intention: str | None = None, data: str | dict[str, Any] | None = None) -> None: ... + + @action_wrap(ActionType.SOLVE_CAPTCHA) + async def solve_captcha( + self, xpath: str, intention: str | None = None, data: str | dict[str, Any] | None = None + ) -> None: ... + + @action_wrap(ActionType.TERMINATE) + async def terminate( + self, errors: list[str], intention: str | None = None, data: str | dict[str, Any] | None = None + ) -> None: ... + + @action_wrap(ActionType.COMPLETE) + async def complete( + self, data_extraction_goal: str, intention: str | None = None, data: str | dict[str, Any] | None = None + ) -> None: ... + + @action_wrap(ActionType.RELOAD_PAGE) + async def reload_page(self, intention: str | None = None, data: str | dict[str, Any] | None = None) -> None: ... + + @action_wrap(ActionType.EXTRACT) + async def extract( + self, data_extraction_goal: str, intention: str | None = None, data: str | dict[str, Any] | None = None + ) -> None: ... + + @action_wrap(ActionType.VERIFICATION_CODE) + async def verification_code( + self, xpath: str, intention: str | None = None, data: str | dict[str, Any] | None = None + ) -> None: ... + + @action_wrap(ActionType.SCROLL) + async def scroll( + self, amount: int, intention: str | None = None, data: str | dict[str, Any] | None = None + ) -> None: ... + + @action_wrap(ActionType.KEYPRESS) + async def keypress( + self, key: str, intention: str | None = None, data: str | dict[str, Any] | None = None + ) -> None: ... + + @action_wrap(ActionType.TYPE) + async def type(self, text: str, intention: str | None = None, data: str | dict[str, Any] | None = None) -> None: ... + + @action_wrap(ActionType.MOVE) + async def move( + self, x: int, y: int, intention: str | None = None, data: str | dict[str, Any] | None = None + ) -> None: ... + + @action_wrap(ActionType.DRAG) + async def drag( + self, + start_x: int, + start_y: int, + end_x: int, + end_y: int, + intention: str | None = None, + data: str | dict[str, Any] | None = None, + ) -> None: ... + + @action_wrap(ActionType.LEFT_MOUSE) + async def left_mouse( + self, x: int, y: int, intention: str | None = None, data: str | dict[str, Any] | None = None + ) -> None: ... + + +class RunContext: + """ + Lives for one workflow run. + """ + + def __init__(self, parameters: dict[str, Any], page: SkyvernPage) -> None: + self.parameters = parameters + self.page = page + self.trace: list[ActionCall] = [] diff --git a/skyvern/core/code_generations/transform_workflow_run.py b/skyvern/core/code_generations/transform_workflow_run.py new file mode 100644 index 00000000..b09907fa --- /dev/null +++ b/skyvern/core/code_generations/transform_workflow_run.py @@ -0,0 +1,81 @@ +from dataclasses import dataclass +from typing import Any, Iterable, Mapping + +import structlog + +from skyvern.forge import app +from skyvern.forge.sdk.workflow.models.block import BlockType +from skyvern.services import workflow_service + +LOG = structlog.get_logger(__name__) + + +@dataclass +class CodeGenInput: + file_name: str + workflow_run: Mapping[str, Any] + workflow: Mapping[str, Any] + workflow_blocks: Iterable[Mapping[str, Any]] + actions_by_task: Mapping[str, Iterable[Mapping[str, Any]]] + + +async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organization_id: str) -> CodeGenInput: + # get the workflow run request + workflow_run_resp = await workflow_service.get_workflow_run_response( + workflow_run_id=workflow_run_id, organization_id=organization_id + ) + if not workflow_run_resp: + raise ValueError(f"Workflow run {workflow_run_id} not found") + run_request = workflow_run_resp.run_request + if not run_request: + raise ValueError(f"Workflow run {workflow_run_id} has no run request") + workflow_run_request_json = run_request.model_dump() + + # get the workflow + workflow = await app.WORKFLOW_SERVICE.get_workflow_by_permanent_id( + workflow_permanent_id=run_request.workflow_id, organization_id=organization_id + ) + if not workflow: + raise ValueError(f"Workflow {run_request.workflow_id} not found") + workflow_json = workflow.model_dump() + + # get the tasks + ## first, get all the workflow run blocks + workflow_run_blocks = await app.DATABASE.get_workflow_run_blocks( + workflow_run_id=workflow_run_id, organization_id=organization_id + ) + workflow_run_blocks.sort(key=lambda x: x.created_at) + workflow_block_dump = [] + # Hydrate blocks with task data + # TODO: support task v2 + actions_by_task = {} + for block in workflow_run_blocks: + block_dump = block.model_dump() + if block.block_type == BlockType.TaskV2: + raise ValueError("TaskV2 blocks are not supported yet") + if ( + block.block_type + in [BlockType.TASK, BlockType.ACTION, BlockType.EXTRACTION, BlockType.LOGIN, BlockType.NAVIGATION] + and block.task_id + ): + task = await app.DATABASE.get_task(task_id=block.task_id, organization_id=organization_id) + if not task: + LOG.warning(f"Task {block.task_id} not found") + continue + block_dump.update(task.model_dump()) + actions = await app.DATABASE.get_task_actions(task_id=block.task_id, organization_id=organization_id) + action_dumps = [] + for action in actions: + action_dump = action.model_dump() + action_dump["xpath"] = action.get_xpath() + action_dumps.append(action_dump) + actions_by_task[block.task_id] = action_dumps + workflow_block_dump.append(block_dump) + + return CodeGenInput( + file_name=f"{workflow_run_id}.py", + workflow_run=workflow_run_request_json, + workflow=workflow_json, + workflow_blocks=workflow_block_dump, + actions_by_task=actions_by_task, + ) diff --git a/skyvern/core/code_generations/workflow_wrappers.py b/skyvern/core/code_generations/workflow_wrappers.py new file mode 100644 index 00000000..c366c284 --- /dev/null +++ b/skyvern/core/code_generations/workflow_wrappers.py @@ -0,0 +1,59 @@ +from typing import Any, Callable + + +# Build a dummy workflow decorator +def workflow( + title: str | None = None, + totp_url: str | None = None, + totp_identifier: str | None = None, + webhook_url: str | None = None, + max_steps: int | None = None, +) -> Callable: + def wrapper(func: Callable) -> Callable: + return func + + return wrapper + + +def task_block( + prompt: str | None = None, + title: str | None = None, + url: str | None = None, + engine: str | None = None, + model: dict[str, Any] | None = None, + totp_url: str | None = None, + totp_identifier: str | None = None, + max_steps: int | None = None, + navigation_payload: str | None = None, + webhook_url: str | None = None, +) -> Callable: + def decorator(func: Callable) -> Callable: + return func + + return decorator + + +def file_download_block( + prompt: str | None = None, + title: str | None = None, + url: str | None = None, + max_steps: int | None = None, +) -> Callable: + def decorator(func: Callable) -> Callable: + return func + + return decorator + + +def email_block(prompt: str | None = None, title: str | None = None, url: str | None = None) -> Callable: + def decorator(func: Callable) -> Callable: + return func + + return decorator + + +def wait_block(seconds: int) -> Callable: + def decorator(func: Callable) -> Callable: + return func + + return decorator diff --git a/skyvern/services/run_code_service.py b/skyvern/services/run_code_service.py new file mode 100644 index 00000000..2cf3ec29 --- /dev/null +++ b/skyvern/services/run_code_service.py @@ -0,0 +1,12 @@ +from pathlib import Path + + +async def run_code(path: Path) -> None: + # initialize the SkyvernPage object + + # initialize workflow run context + + # load the python file and do validation + + # run the preloaded workflow + return