Script - credential / secret integration (#3279)
This commit is contained in:
@@ -24,6 +24,7 @@ import libcst as cst
|
||||
import structlog
|
||||
from libcst import Attribute, Call, Dict, DictElement, FunctionDef, Name, Param
|
||||
|
||||
from skyvern.config import settings
|
||||
from skyvern.core.script_generations.constants import SCRIPT_TASK_BLOCKS
|
||||
from skyvern.core.script_generations.generate_workflow_parameters import (
|
||||
generate_workflow_parameters_schema,
|
||||
@@ -102,7 +103,7 @@ def _value(value: Any) -> cst.BaseExpression:
|
||||
return cst.SimpleString(repr(str(value)))
|
||||
|
||||
|
||||
def _prompt_value(prompt_text: str) -> cst.BaseExpression:
|
||||
def _render_value(prompt_text: str) -> cst.BaseExpression:
|
||||
"""Create a prompt value with template rendering logic if needed."""
|
||||
if "{{" in prompt_text and "}}" in prompt_text:
|
||||
# Generate code for: render_template(prompt_text)
|
||||
@@ -443,33 +444,7 @@ def _build_generated_model_from_schema(schema_code: str) -> cst.ClassDef | None:
|
||||
|
||||
def _build_run_task_statement(block_title: str, block: dict[str, Any]) -> cst.SimpleStatementLine:
|
||||
"""Build a skyvern.run_task statement."""
|
||||
args = [
|
||||
cst.Arg(
|
||||
keyword=cst.Name("prompt"),
|
||||
value=_prompt_value(block.get("navigation_goal", "")),
|
||||
whitespace_after_arg=cst.ParenthesizedWhitespace(
|
||||
indent=True,
|
||||
last_line=cst.SimpleWhitespace(INDENT),
|
||||
),
|
||||
),
|
||||
cst.Arg(
|
||||
keyword=cst.Name("max_steps"),
|
||||
value=_value(block.get("max_steps_per_run", 30)),
|
||||
whitespace_after_arg=cst.ParenthesizedWhitespace(
|
||||
indent=True,
|
||||
last_line=cst.SimpleWhitespace(INDENT),
|
||||
),
|
||||
),
|
||||
cst.Arg(
|
||||
keyword=cst.Name("cache_key"),
|
||||
value=_value(block_title),
|
||||
whitespace_after_arg=cst.ParenthesizedWhitespace(
|
||||
indent=True,
|
||||
),
|
||||
comma=cst.Comma(),
|
||||
),
|
||||
]
|
||||
|
||||
args = __build_base_task_statement(block_title, block)
|
||||
call = cst.Call(
|
||||
func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("run_task")),
|
||||
args=args,
|
||||
@@ -487,7 +462,7 @@ def _build_download_statement(block_title: str, block: dict[str, Any]) -> cst.Si
|
||||
args = [
|
||||
cst.Arg(
|
||||
keyword=cst.Name("prompt"),
|
||||
value=_prompt_value(block.get("navigation_goal", "")),
|
||||
value=_render_value(block.get("navigation_goal", "")),
|
||||
whitespace_after_arg=cst.ParenthesizedWhitespace(
|
||||
indent=True,
|
||||
last_line=cst.SimpleWhitespace(INDENT),
|
||||
@@ -503,7 +478,7 @@ def _build_download_statement(block_title: str, block: dict[str, Any]) -> cst.Si
|
||||
),
|
||||
cst.Arg(
|
||||
keyword=cst.Name("download_suffix"),
|
||||
value=_value(block.get("download_suffix", "")),
|
||||
value=_render_value(block.get("download_suffix", "")),
|
||||
whitespace_after_arg=cst.ParenthesizedWhitespace(
|
||||
indent=True,
|
||||
last_line=cst.SimpleWhitespace(INDENT),
|
||||
@@ -536,15 +511,7 @@ def _build_action_statement(block_title: str, block: dict[str, Any]) -> cst.Simp
|
||||
args = [
|
||||
cst.Arg(
|
||||
keyword=cst.Name("prompt"),
|
||||
value=_prompt_value(block.get("navigation_goal", "")),
|
||||
whitespace_after_arg=cst.ParenthesizedWhitespace(
|
||||
indent=True,
|
||||
last_line=cst.SimpleWhitespace(INDENT),
|
||||
),
|
||||
),
|
||||
cst.Arg(
|
||||
keyword=cst.Name("max_steps"),
|
||||
value=_value(block.get("max_steps_per_run", 30)),
|
||||
value=_render_value(block.get("navigation_goal", "")),
|
||||
whitespace_after_arg=cst.ParenthesizedWhitespace(
|
||||
indent=True,
|
||||
last_line=cst.SimpleWhitespace(INDENT),
|
||||
@@ -574,49 +541,7 @@ def _build_action_statement(block_title: str, block: dict[str, Any]) -> cst.Simp
|
||||
|
||||
def _build_login_statement(block_title: str, block: dict[str, Any]) -> cst.SimpleStatementLine:
|
||||
"""Build a skyvern.login statement."""
|
||||
args = [
|
||||
cst.Arg(
|
||||
keyword=cst.Name("title"),
|
||||
value=_value(block_title),
|
||||
whitespace_after_arg=cst.ParenthesizedWhitespace(
|
||||
indent=True,
|
||||
last_line=cst.SimpleWhitespace(INDENT),
|
||||
),
|
||||
),
|
||||
cst.Arg(
|
||||
keyword=cst.Name("prompt"),
|
||||
value=_prompt_value(block.get("navigation_goal", "")),
|
||||
whitespace_after_arg=cst.ParenthesizedWhitespace(
|
||||
indent=True,
|
||||
last_line=cst.SimpleWhitespace(INDENT),
|
||||
),
|
||||
),
|
||||
cst.Arg(
|
||||
keyword=cst.Name("totp_identifier"),
|
||||
value=_value(block.get("totp_identifier", "")),
|
||||
whitespace_after_arg=cst.ParenthesizedWhitespace(
|
||||
indent=True,
|
||||
last_line=cst.SimpleWhitespace(INDENT),
|
||||
),
|
||||
),
|
||||
cst.Arg(
|
||||
keyword=cst.Name("webhook_callback_url"),
|
||||
value=_value(block.get("webhook_callback_url", "")),
|
||||
whitespace_after_arg=cst.ParenthesizedWhitespace(
|
||||
indent=True,
|
||||
last_line=cst.SimpleWhitespace(INDENT),
|
||||
),
|
||||
),
|
||||
cst.Arg(
|
||||
keyword=cst.Name("cache_key"),
|
||||
value=_value(block_title),
|
||||
whitespace_after_arg=cst.ParenthesizedWhitespace(
|
||||
indent=True,
|
||||
),
|
||||
comma=cst.Comma(),
|
||||
),
|
||||
]
|
||||
|
||||
args = __build_base_task_statement(block_title, block)
|
||||
call = cst.Call(
|
||||
func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("login")),
|
||||
args=args,
|
||||
@@ -634,7 +559,7 @@ def _build_extract_statement(block_title: str, block: dict[str, Any]) -> cst.Sim
|
||||
args = [
|
||||
cst.Arg(
|
||||
keyword=cst.Name("prompt"),
|
||||
value=_value(block.get("data_extraction_goal", "")),
|
||||
value=_render_value(block.get("data_extraction_goal", "")),
|
||||
whitespace_after_arg=cst.ParenthesizedWhitespace(
|
||||
indent=True,
|
||||
last_line=cst.SimpleWhitespace(INDENT),
|
||||
@@ -667,7 +592,7 @@ def _build_navigate_statement(block_title: str, block: dict[str, Any]) -> cst.Si
|
||||
args = [
|
||||
cst.Arg(
|
||||
keyword=cst.Name("prompt"),
|
||||
value=_prompt_value(block.get("navigation_goal", "")),
|
||||
value=_render_value(block.get("navigation_goal", "")),
|
||||
whitespace_after_arg=cst.ParenthesizedWhitespace(
|
||||
indent=True,
|
||||
last_line=cst.SimpleWhitespace(INDENT),
|
||||
@@ -683,7 +608,7 @@ def _build_navigate_statement(block_title: str, block: dict[str, Any]) -> cst.Si
|
||||
),
|
||||
cst.Arg(
|
||||
keyword=cst.Name("max_steps"),
|
||||
value=_value(block.get("max_steps_per_run", 30)),
|
||||
value=_value(block.get("max_steps_per_run", settings.MAX_STEPS_PER_RUN)),
|
||||
whitespace_after_arg=cst.ParenthesizedWhitespace(
|
||||
indent=True,
|
||||
last_line=cst.SimpleWhitespace(INDENT),
|
||||
@@ -773,7 +698,7 @@ def _build_validate_statement(block: dict[str, Any]) -> cst.SimpleStatementLine:
|
||||
args = [
|
||||
cst.Arg(
|
||||
keyword=cst.Name("prompt"),
|
||||
value=_prompt_value(block.get("navigation_goal", "")),
|
||||
value=_render_value(block.get("navigation_goal", "")),
|
||||
whitespace_after_arg=cst.ParenthesizedWhitespace(
|
||||
indent=True,
|
||||
),
|
||||
@@ -823,7 +748,7 @@ def _build_for_loop_statement(block_title: str, block: dict[str, Any]) -> cst.Si
|
||||
args = [
|
||||
cst.Arg(
|
||||
keyword=cst.Name("prompt"),
|
||||
value=_prompt_value(block.get("navigation_goal", "")),
|
||||
value=_render_value(block.get("navigation_goal", "")),
|
||||
whitespace_after_arg=cst.ParenthesizedWhitespace(
|
||||
indent=True,
|
||||
last_line=cst.SimpleWhitespace(INDENT),
|
||||
@@ -831,7 +756,7 @@ def _build_for_loop_statement(block_title: str, block: dict[str, Any]) -> cst.Si
|
||||
),
|
||||
cst.Arg(
|
||||
keyword=cst.Name("max_steps"),
|
||||
value=_value(block.get("max_steps_per_run", 30)),
|
||||
value=_value(block.get("max_steps_per_run", settings.MAX_STEPS_PER_RUN)),
|
||||
whitespace_after_arg=cst.ParenthesizedWhitespace(
|
||||
indent=True,
|
||||
),
|
||||
@@ -876,6 +801,74 @@ def _build_goto_statement(block: dict[str, Any]) -> cst.SimpleStatementLine:
|
||||
return cst.SimpleStatementLine([cst.Expr(cst.Await(call))])
|
||||
|
||||
|
||||
def __build_base_task_statement(block_title: str, block: dict[str, Any]) -> list[cst.Arg]:
|
||||
args = [
|
||||
cst.Arg(
|
||||
keyword=cst.Name("prompt"),
|
||||
value=_render_value(block.get("navigation_goal", "")),
|
||||
whitespace_after_arg=cst.ParenthesizedWhitespace(
|
||||
indent=True,
|
||||
last_line=cst.SimpleWhitespace(INDENT),
|
||||
),
|
||||
),
|
||||
]
|
||||
if block.get("url"):
|
||||
args.append(
|
||||
cst.Arg(
|
||||
keyword=cst.Name("url"),
|
||||
value=_render_value(block.get("url", "")),
|
||||
whitespace_after_arg=cst.ParenthesizedWhitespace(
|
||||
indent=True,
|
||||
last_line=cst.SimpleWhitespace(INDENT),
|
||||
),
|
||||
)
|
||||
)
|
||||
if block.get("max_steps_per_run"):
|
||||
args.append(
|
||||
cst.Arg(
|
||||
keyword=cst.Name("max_steps"),
|
||||
value=_render_value(block.get("max_steps_per_run", settings.MAX_STEPS_PER_RUN)),
|
||||
whitespace_after_arg=cst.ParenthesizedWhitespace(
|
||||
indent=True,
|
||||
last_line=cst.SimpleWhitespace(INDENT),
|
||||
),
|
||||
)
|
||||
)
|
||||
if block.get("totp_identifier"):
|
||||
args.append(
|
||||
cst.Arg(
|
||||
keyword=cst.Name("totp_identifier"),
|
||||
value=_render_value(block.get("totp_identifier", "")),
|
||||
whitespace_after_arg=cst.ParenthesizedWhitespace(
|
||||
indent=True,
|
||||
last_line=cst.SimpleWhitespace(INDENT),
|
||||
),
|
||||
)
|
||||
)
|
||||
if block.get("totp_verification_url"):
|
||||
args.append(
|
||||
cst.Arg(
|
||||
keyword=cst.Name("totp_url"),
|
||||
value=_render_value(block.get("totp_verification_url", "")),
|
||||
whitespace_after_arg=cst.ParenthesizedWhitespace(
|
||||
indent=True,
|
||||
last_line=cst.SimpleWhitespace(INDENT),
|
||||
),
|
||||
)
|
||||
)
|
||||
args.append(
|
||||
cst.Arg(
|
||||
keyword=cst.Name("cache_key"),
|
||||
value=_value(block_title),
|
||||
whitespace_after_arg=cst.ParenthesizedWhitespace(
|
||||
indent=True,
|
||||
),
|
||||
comma=cst.Comma(),
|
||||
)
|
||||
)
|
||||
return args
|
||||
|
||||
|
||||
# --------------------------------------------------------------------- #
|
||||
# 4. function builders #
|
||||
# --------------------------------------------------------------------- #
|
||||
|
||||
@@ -4,11 +4,26 @@ from pydantic import BaseModel
|
||||
|
||||
from skyvern.core.script_generations.script_run_context_manager import script_run_context_manager
|
||||
from skyvern.core.script_generations.skyvern_page import RunContext, SkyvernPage
|
||||
from skyvern.forge import app
|
||||
from skyvern.forge.sdk.core import skyvern_context
|
||||
from skyvern.forge.sdk.workflow.models.parameter import WorkflowParameterType
|
||||
|
||||
|
||||
async def setup(
|
||||
parameters: dict[str, Any], generated_parameter_cls: type[BaseModel] | None = None
|
||||
) -> tuple[SkyvernPage, RunContext]:
|
||||
# transform any secrets/credential parameters. For example, if there's only one credential in the parameters: {"cred_12345": "cred_12345"},
|
||||
# it should be transformed to {"cred_12345": {"username": "secret_5fBoa_username", "password": "secret_5fBoa_password"}}
|
||||
# context comes from app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(workflow_run_id)
|
||||
context = skyvern_context.current()
|
||||
if context and context.organization_id and context.workflow_run_id:
|
||||
workflow_run_context = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(context.workflow_run_id)
|
||||
parameters_in_workflow_context = workflow_run_context.parameters
|
||||
for key in parameters:
|
||||
if key in parameters_in_workflow_context:
|
||||
parameter = parameters_in_workflow_context[key]
|
||||
if parameter.workflow_parameter_type == WorkflowParameterType.CREDENTIAL_ID:
|
||||
parameters[key] = workflow_run_context.values[key]
|
||||
skyvern_page = await SkyvernPage.create()
|
||||
run_context = RunContext(
|
||||
parameters=parameters,
|
||||
|
||||
@@ -329,6 +329,11 @@ class SkyvernPage:
|
||||
If the prompt generation or parsing fails for any reason we fall back to
|
||||
inputting the originally supplied ``text``.
|
||||
"""
|
||||
# format the text with the actual value of the parameter if it's a secret when running a workflow
|
||||
context = skyvern_context.current()
|
||||
if context and context.workflow_run_id:
|
||||
text = await _get_actual_value_of_parameter_if_secret(context.workflow_run_id, text)
|
||||
|
||||
locator = self.page.locator(f"xpath={xpath}")
|
||||
await handler_utils.input_sequentially(locator, text, timeout=timeout)
|
||||
|
||||
@@ -500,3 +505,16 @@ class RunContext:
|
||||
self.page = page
|
||||
self.trace: list[ActionCall] = []
|
||||
self.prompt: str | None = None
|
||||
|
||||
|
||||
async def _get_actual_value_of_parameter_if_secret(workflow_run_id: str, parameter: str) -> Any:
|
||||
"""
|
||||
Get the actual value of a parameter if it's a secret. If it's not a secret, return the parameter value as is.
|
||||
|
||||
Just return the parameter value if the task isn't a workflow's task.
|
||||
|
||||
This is only used for InputTextAction, UploadFileAction, and ClickAction (if it has a file_url).
|
||||
"""
|
||||
workflow_run_context = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(workflow_run_id)
|
||||
secret_value = workflow_run_context.get_original_secret_value_or_none(parameter)
|
||||
return secret_value if secret_value is not None else parameter
|
||||
|
||||
Reference in New Issue
Block a user