diff --git a/alembic/versions/2025_08_23_0151-1bba8a38ddc7_remove_unique_constraints_for_workflow_.py b/alembic/versions/2025_08_23_0151-1bba8a38ddc7_remove_unique_constraints_for_workflow_.py new file mode 100644 index 00000000..3ad59b20 --- /dev/null +++ b/alembic/versions/2025_08_23_0151-1bba8a38ddc7_remove_unique_constraints_for_workflow_.py @@ -0,0 +1,34 @@ +"""remove unique constraints for workflow and script cache_key_value + +Revision ID: 1bba8a38ddc7 +Revises: f148f36edc09 +Create Date: 2025-08-23 01:51:56.114060+00:00 + +""" + +from typing import Sequence, Union + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "1bba8a38ddc7" +down_revision: Union[str, None] = "f148f36edc09" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_constraint(op.f("uc_workflow_permanent_id_cache_key_value"), "workflow_scripts", type_="unique") + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.create_unique_constraint( + op.f("uc_workflow_permanent_id_cache_key_value"), + "workflow_scripts", + ["workflow_permanent_id", "cache_key_value"], + postgresql_nulls_not_distinct=False, + ) + # ### end Alembic commands ### diff --git a/skyvern/core/script_generations/generate_script.py b/skyvern/core/script_generations/generate_script.py index 171043f0..d3d8e898 100644 --- a/skyvern/core/script_generations/generate_script.py +++ b/skyvern/core/script_generations/generate_script.py @@ -24,6 +24,7 @@ import libcst as cst import structlog from libcst import Attribute, Call, Dict, DictElement, FunctionDef, Name, Param +from skyvern.config import settings from skyvern.core.script_generations.constants import SCRIPT_TASK_BLOCKS from skyvern.core.script_generations.generate_workflow_parameters import ( generate_workflow_parameters_schema, @@ -102,7 +103,7 @@ def _value(value: Any) -> cst.BaseExpression: return cst.SimpleString(repr(str(value))) -def _prompt_value(prompt_text: str) -> cst.BaseExpression: +def _render_value(prompt_text: str) -> cst.BaseExpression: """Create a prompt value with template rendering logic if needed.""" if "{{" in prompt_text and "}}" in prompt_text: # Generate code for: render_template(prompt_text) @@ -443,33 +444,7 @@ def _build_generated_model_from_schema(schema_code: str) -> cst.ClassDef | None: def _build_run_task_statement(block_title: str, block: dict[str, Any]) -> cst.SimpleStatementLine: """Build a skyvern.run_task statement.""" - args = [ - cst.Arg( - keyword=cst.Name("prompt"), - value=_prompt_value(block.get("navigation_goal", "")), - whitespace_after_arg=cst.ParenthesizedWhitespace( - indent=True, - last_line=cst.SimpleWhitespace(INDENT), - ), - ), - cst.Arg( - keyword=cst.Name("max_steps"), - value=_value(block.get("max_steps_per_run", 30)), - whitespace_after_arg=cst.ParenthesizedWhitespace( - indent=True, - last_line=cst.SimpleWhitespace(INDENT), - ), - ), - cst.Arg( - keyword=cst.Name("cache_key"), - value=_value(block_title), - whitespace_after_arg=cst.ParenthesizedWhitespace( - indent=True, - ), - comma=cst.Comma(), - ), - ] - + args = __build_base_task_statement(block_title, block) call = cst.Call( func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("run_task")), args=args, @@ -487,7 +462,7 @@ def _build_download_statement(block_title: str, block: dict[str, Any]) -> cst.Si args = [ cst.Arg( keyword=cst.Name("prompt"), - value=_prompt_value(block.get("navigation_goal", "")), + value=_render_value(block.get("navigation_goal", "")), whitespace_after_arg=cst.ParenthesizedWhitespace( indent=True, last_line=cst.SimpleWhitespace(INDENT), @@ -503,7 +478,7 @@ def _build_download_statement(block_title: str, block: dict[str, Any]) -> cst.Si ), cst.Arg( keyword=cst.Name("download_suffix"), - value=_value(block.get("download_suffix", "")), + value=_render_value(block.get("download_suffix", "")), whitespace_after_arg=cst.ParenthesizedWhitespace( indent=True, last_line=cst.SimpleWhitespace(INDENT), @@ -536,15 +511,7 @@ def _build_action_statement(block_title: str, block: dict[str, Any]) -> cst.Simp args = [ cst.Arg( keyword=cst.Name("prompt"), - value=_prompt_value(block.get("navigation_goal", "")), - whitespace_after_arg=cst.ParenthesizedWhitespace( - indent=True, - last_line=cst.SimpleWhitespace(INDENT), - ), - ), - cst.Arg( - keyword=cst.Name("max_steps"), - value=_value(block.get("max_steps_per_run", 30)), + value=_render_value(block.get("navigation_goal", "")), whitespace_after_arg=cst.ParenthesizedWhitespace( indent=True, last_line=cst.SimpleWhitespace(INDENT), @@ -574,49 +541,7 @@ def _build_action_statement(block_title: str, block: dict[str, Any]) -> cst.Simp def _build_login_statement(block_title: str, block: dict[str, Any]) -> cst.SimpleStatementLine: """Build a skyvern.login statement.""" - args = [ - cst.Arg( - keyword=cst.Name("title"), - value=_value(block_title), - whitespace_after_arg=cst.ParenthesizedWhitespace( - indent=True, - last_line=cst.SimpleWhitespace(INDENT), - ), - ), - cst.Arg( - keyword=cst.Name("prompt"), - value=_prompt_value(block.get("navigation_goal", "")), - whitespace_after_arg=cst.ParenthesizedWhitespace( - indent=True, - last_line=cst.SimpleWhitespace(INDENT), - ), - ), - cst.Arg( - keyword=cst.Name("totp_identifier"), - value=_value(block.get("totp_identifier", "")), - whitespace_after_arg=cst.ParenthesizedWhitespace( - indent=True, - last_line=cst.SimpleWhitespace(INDENT), - ), - ), - cst.Arg( - keyword=cst.Name("webhook_callback_url"), - value=_value(block.get("webhook_callback_url", "")), - whitespace_after_arg=cst.ParenthesizedWhitespace( - indent=True, - last_line=cst.SimpleWhitespace(INDENT), - ), - ), - cst.Arg( - keyword=cst.Name("cache_key"), - value=_value(block_title), - whitespace_after_arg=cst.ParenthesizedWhitespace( - indent=True, - ), - comma=cst.Comma(), - ), - ] - + args = __build_base_task_statement(block_title, block) call = cst.Call( func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("login")), args=args, @@ -634,7 +559,7 @@ def _build_extract_statement(block_title: str, block: dict[str, Any]) -> cst.Sim args = [ cst.Arg( keyword=cst.Name("prompt"), - value=_value(block.get("data_extraction_goal", "")), + value=_render_value(block.get("data_extraction_goal", "")), whitespace_after_arg=cst.ParenthesizedWhitespace( indent=True, last_line=cst.SimpleWhitespace(INDENT), @@ -667,7 +592,7 @@ def _build_navigate_statement(block_title: str, block: dict[str, Any]) -> cst.Si args = [ cst.Arg( keyword=cst.Name("prompt"), - value=_prompt_value(block.get("navigation_goal", "")), + value=_render_value(block.get("navigation_goal", "")), whitespace_after_arg=cst.ParenthesizedWhitespace( indent=True, last_line=cst.SimpleWhitespace(INDENT), @@ -683,7 +608,7 @@ def _build_navigate_statement(block_title: str, block: dict[str, Any]) -> cst.Si ), cst.Arg( keyword=cst.Name("max_steps"), - value=_value(block.get("max_steps_per_run", 30)), + value=_value(block.get("max_steps_per_run", settings.MAX_STEPS_PER_RUN)), whitespace_after_arg=cst.ParenthesizedWhitespace( indent=True, last_line=cst.SimpleWhitespace(INDENT), @@ -773,7 +698,7 @@ def _build_validate_statement(block: dict[str, Any]) -> cst.SimpleStatementLine: args = [ cst.Arg( keyword=cst.Name("prompt"), - value=_prompt_value(block.get("navigation_goal", "")), + value=_render_value(block.get("navigation_goal", "")), whitespace_after_arg=cst.ParenthesizedWhitespace( indent=True, ), @@ -823,7 +748,7 @@ def _build_for_loop_statement(block_title: str, block: dict[str, Any]) -> cst.Si args = [ cst.Arg( keyword=cst.Name("prompt"), - value=_prompt_value(block.get("navigation_goal", "")), + value=_render_value(block.get("navigation_goal", "")), whitespace_after_arg=cst.ParenthesizedWhitespace( indent=True, last_line=cst.SimpleWhitespace(INDENT), @@ -831,7 +756,7 @@ def _build_for_loop_statement(block_title: str, block: dict[str, Any]) -> cst.Si ), cst.Arg( keyword=cst.Name("max_steps"), - value=_value(block.get("max_steps_per_run", 30)), + value=_value(block.get("max_steps_per_run", settings.MAX_STEPS_PER_RUN)), whitespace_after_arg=cst.ParenthesizedWhitespace( indent=True, ), @@ -876,6 +801,74 @@ def _build_goto_statement(block: dict[str, Any]) -> cst.SimpleStatementLine: return cst.SimpleStatementLine([cst.Expr(cst.Await(call))]) +def __build_base_task_statement(block_title: str, block: dict[str, Any]) -> list[cst.Arg]: + args = [ + cst.Arg( + keyword=cst.Name("prompt"), + value=_render_value(block.get("navigation_goal", "")), + whitespace_after_arg=cst.ParenthesizedWhitespace( + indent=True, + last_line=cst.SimpleWhitespace(INDENT), + ), + ), + ] + if block.get("url"): + args.append( + cst.Arg( + keyword=cst.Name("url"), + value=_render_value(block.get("url", "")), + whitespace_after_arg=cst.ParenthesizedWhitespace( + indent=True, + last_line=cst.SimpleWhitespace(INDENT), + ), + ) + ) + if block.get("max_steps_per_run"): + args.append( + cst.Arg( + keyword=cst.Name("max_steps"), + value=_render_value(block.get("max_steps_per_run", settings.MAX_STEPS_PER_RUN)), + whitespace_after_arg=cst.ParenthesizedWhitespace( + indent=True, + last_line=cst.SimpleWhitespace(INDENT), + ), + ) + ) + if block.get("totp_identifier"): + args.append( + cst.Arg( + keyword=cst.Name("totp_identifier"), + value=_render_value(block.get("totp_identifier", "")), + whitespace_after_arg=cst.ParenthesizedWhitespace( + indent=True, + last_line=cst.SimpleWhitespace(INDENT), + ), + ) + ) + if block.get("totp_verification_url"): + args.append( + cst.Arg( + keyword=cst.Name("totp_url"), + value=_render_value(block.get("totp_verification_url", "")), + whitespace_after_arg=cst.ParenthesizedWhitespace( + indent=True, + last_line=cst.SimpleWhitespace(INDENT), + ), + ) + ) + args.append( + cst.Arg( + keyword=cst.Name("cache_key"), + value=_value(block_title), + whitespace_after_arg=cst.ParenthesizedWhitespace( + indent=True, + ), + comma=cst.Comma(), + ) + ) + return args + + # --------------------------------------------------------------------- # # 4. function builders # # --------------------------------------------------------------------- # diff --git a/skyvern/core/script_generations/run_initializer.py b/skyvern/core/script_generations/run_initializer.py index 80b44e9c..724d6a2f 100644 --- a/skyvern/core/script_generations/run_initializer.py +++ b/skyvern/core/script_generations/run_initializer.py @@ -4,11 +4,26 @@ from pydantic import BaseModel from skyvern.core.script_generations.script_run_context_manager import script_run_context_manager from skyvern.core.script_generations.skyvern_page import RunContext, SkyvernPage +from skyvern.forge import app +from skyvern.forge.sdk.core import skyvern_context +from skyvern.forge.sdk.workflow.models.parameter import WorkflowParameterType async def setup( parameters: dict[str, Any], generated_parameter_cls: type[BaseModel] | None = None ) -> tuple[SkyvernPage, RunContext]: + # transform any secrets/credential parameters. For example, if there's only one credential in the parameters: {"cred_12345": "cred_12345"}, + # it should be transformed to {"cred_12345": {"username": "secret_5fBoa_username", "password": "secret_5fBoa_password"}} + # context comes from app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(workflow_run_id) + context = skyvern_context.current() + if context and context.organization_id and context.workflow_run_id: + workflow_run_context = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(context.workflow_run_id) + parameters_in_workflow_context = workflow_run_context.parameters + for key in parameters: + if key in parameters_in_workflow_context: + parameter = parameters_in_workflow_context[key] + if parameter.workflow_parameter_type == WorkflowParameterType.CREDENTIAL_ID: + parameters[key] = workflow_run_context.values[key] skyvern_page = await SkyvernPage.create() run_context = RunContext( parameters=parameters, diff --git a/skyvern/core/script_generations/skyvern_page.py b/skyvern/core/script_generations/skyvern_page.py index 6488732c..023dbf81 100644 --- a/skyvern/core/script_generations/skyvern_page.py +++ b/skyvern/core/script_generations/skyvern_page.py @@ -329,6 +329,11 @@ class SkyvernPage: If the prompt generation or parsing fails for any reason we fall back to inputting the originally supplied ``text``. """ + # format the text with the actual value of the parameter if it's a secret when running a workflow + context = skyvern_context.current() + if context and context.workflow_run_id: + text = await _get_actual_value_of_parameter_if_secret(context.workflow_run_id, text) + locator = self.page.locator(f"xpath={xpath}") await handler_utils.input_sequentially(locator, text, timeout=timeout) @@ -500,3 +505,16 @@ class RunContext: self.page = page self.trace: list[ActionCall] = [] self.prompt: str | None = None + + +async def _get_actual_value_of_parameter_if_secret(workflow_run_id: str, parameter: str) -> Any: + """ + Get the actual value of a parameter if it's a secret. If it's not a secret, return the parameter value as is. + + Just return the parameter value if the task isn't a workflow's task. + + This is only used for InputTextAction, UploadFileAction, and ClickAction (if it has a file_url). + """ + workflow_run_context = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(workflow_run_id) + secret_value = workflow_run_context.get_original_secret_value_or_none(parameter) + return secret_value if secret_value is not None else parameter diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index ec1fe4e2..7cb1a6f4 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -3930,7 +3930,7 @@ class AgentDB: .where(WorkflowScriptModel.deleted_at.is_(None)) ) - if cache_key: + if cache_key is not None: ws_script_ids_subquery = ws_script_ids_subquery.where(WorkflowScriptModel.cache_key == cache_key) # Latest version per script_id within the org and not deleted diff --git a/skyvern/forge/sdk/db/models.py b/skyvern/forge/sdk/db/models.py index 33ea68d3..e2885217 100644 --- a/skyvern/forge/sdk/db/models.py +++ b/skyvern/forge/sdk/db/models.py @@ -840,11 +840,6 @@ class ScriptFileModel(Base): class WorkflowScriptModel(Base): __tablename__ = "workflow_scripts" __table_args__ = ( - UniqueConstraint( - "workflow_permanent_id", - "cache_key_value", - name="uc_workflow_permanent_id_cache_key_value", - ), Index("idx_workflow_scripts_org_created", "organization_id", "created_at"), Index("idx_workflow_scripts_workflow_permanent_id", "workflow_permanent_id"), ) diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index f9337ccc..470e0f5d 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -617,6 +617,10 @@ class WorkflowService: WorkflowRunStatus.timed_out, ): workflow_run = await self.mark_workflow_run_as_completed(workflow_run_id=workflow_run.workflow_run_id) + # generate script for workflow if the workflow.generate_script is True AND there's no script cached for the workflow + # only generate script if the workflow run is completed + if workflow.generate_script: + await self.generate_script_for_workflow(workflow=workflow, workflow_run=workflow_run) else: LOG.info( "Workflow run is already timed_out, canceled, failed, or terminated, not marking as completed", @@ -642,16 +646,6 @@ class WorkflowService: organization_id=organization_id, ) - # generate script for workflow if the workflow.generate_script is True AND there's no script cached for the workflow - if workflow.generate_script: - LOG.info( - "Generating script for workflow", - workflow_run_id=workflow_run_id, - workflow_id=workflow.workflow_id, - workflow_name=workflow.title, - ) - await self.generate_script_for_workflow(workflow=workflow, workflow_run=workflow_run) - return workflow_run async def create_workflow( @@ -2349,6 +2343,7 @@ class WorkflowService: """ Execute the related workflow script instead of running the workflow blocks. """ + LOG.info("Start to execute workflow script", workflow_run_id=workflow_run.workflow_run_id) try: # Render the cache_key_value to find the right script @@ -2400,7 +2395,7 @@ class WorkflowService: workflow_run: WorkflowRun, ) -> None: cache_key = workflow.cache_key - rendered_cache_key_value = "default" + rendered_cache_key_value = "" # 1) Build cache_key_value from workflow run parameters via jinja if cache_key: parameter_tuples = await app.DATABASE.get_workflow_run_parameters( diff --git a/skyvern/services/script_service.py b/skyvern/services/script_service.py index 63ebb99f..da206eaf 100644 --- a/skyvern/services/script_service.py +++ b/skyvern/services/script_service.py @@ -250,11 +250,11 @@ async def _create_workflow_block_run_and_task( Create a workflow block run and optionally a task if workflow_run_id is available in context. Returns (workflow_run_block_id, task_id) tuple. """ - context = skyvern_context.ensure_context() + context = skyvern_context.current() + if not context or not context.workflow_run_id or not context.organization_id: + return None, None workflow_run_id = context.workflow_run_id organization_id = context.organization_id - if not context or not workflow_run_id or not organization_id: - return None, None try: # Create workflow run block with appropriate parameters based on block type