diff --git a/alembic/versions/2025_10_02_2256-cce87185dbb5_add_code_gen_column_to_workflow_runs.py b/alembic/versions/2025_10_02_2256-cce87185dbb5_add_code_gen_column_to_workflow_runs.py new file mode 100644 index 00000000..59532cb0 --- /dev/null +++ b/alembic/versions/2025_10_02_2256-cce87185dbb5_add_code_gen_column_to_workflow_runs.py @@ -0,0 +1,31 @@ +"""add code_gen column to workflow_runs + +Revision ID: cce87185dbb5 +Revises: d36daac4941e +Create Date: 2025-10-02 22:56:11.649948+00:00 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "cce87185dbb5" +down_revision: Union[str, None] = "d36daac4941e" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.add_column("workflow_runs", sa.Column("code_gen", sa.Boolean(), nullable=True)) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column("workflow_runs", "code_gen") + # ### end Alembic commands ### diff --git a/skyvern/forge/agent_functions.py b/skyvern/forge/agent_functions.py index d4c2c88f..9a6f62c4 100644 --- a/skyvern/forge/agent_functions.py +++ b/skyvern/forge/agent_functions.py @@ -626,7 +626,7 @@ class AgentFunction: not context or not context.root_workflow_run_id or not context.organization_id - or context.published_workflow_script_id + or not context.generate_script ): return root_workflow_run_id = context.root_workflow_run_id diff --git a/skyvern/forge/sdk/core/skyvern_context.py b/skyvern/forge/sdk/core/skyvern_context.py index 7cf3ed3c..754c1f49 100644 --- a/skyvern/forge/sdk/core/skyvern_context.py +++ b/skyvern/forge/sdk/core/skyvern_context.py @@ -48,7 +48,7 @@ class SkyvernContext: Example output value: {"loop_value": "str", "output_parameter": "the key of the parameter", "output_value": Any} """ - published_workflow_script_id: str | None = None + generate_script: bool = True def __repr__(self) -> str: return f"SkyvernContext(request_id={self.request_id}, organization_id={self.organization_id}, task_id={self.task_id}, step_id={self.step_id}, workflow_id={self.workflow_id}, workflow_run_id={self.workflow_run_id}, task_v2_id={self.task_v2_id}, max_steps_override={self.max_steps_override}, run_id={self.run_id})" diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 2c0da470..1e78416c 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -1700,6 +1700,7 @@ class AgentDB: run_with: str | None = None, debug_session_id: str | None = None, ai_fallback: bool | None = None, + code_gen: bool | None = None, ) -> WorkflowRun: try: async with self.Session() as session: @@ -1721,6 +1722,7 @@ class AgentDB: run_with=run_with, debug_session_id=debug_session_id, ai_fallback=ai_fallback, + code_gen=code_gen, ) session.add(workflow_run) await session.commit() diff --git a/skyvern/forge/sdk/db/models.py b/skyvern/forge/sdk/db/models.py index dd10fc91..3ea31ce0 100644 --- a/skyvern/forge/sdk/db/models.py +++ b/skyvern/forge/sdk/db/models.py @@ -292,6 +292,7 @@ class WorkflowRunModel(Base): run_with = Column(String, nullable=True) # 'agent' or 'code' debug_session_id: Column = Column(String, nullable=True) ai_fallback = Column(Boolean, nullable=True) + code_gen = Column(Boolean, nullable=True) queued_at = Column(DateTime, nullable=True) started_at = Column(DateTime, nullable=True) diff --git a/skyvern/forge/sdk/db/utils.py b/skyvern/forge/sdk/db/utils.py index 0e78a171..d4d31c97 100644 --- a/skyvern/forge/sdk/db/utils.py +++ b/skyvern/forge/sdk/db/utils.py @@ -328,6 +328,7 @@ def convert_to_workflow_run( if workflow_run_model.script_run else None, run_with=workflow_run_model.run_with, + code_gen=workflow_run_model.code_gen, ) diff --git a/skyvern/forge/sdk/executor/async_executor.py b/skyvern/forge/sdk/executor/async_executor.py index d3a92a1d..4c66aab1 100644 --- a/skyvern/forge/sdk/executor/async_executor.py +++ b/skyvern/forge/sdk/executor/async_executor.py @@ -48,7 +48,6 @@ class AsyncExecutor(abc.ABC): browser_session_id: str | None, block_labels: list[str] | None, block_outputs: dict[str, Any] | None, - code_gen: bool | None = None, **kwargs: dict, ) -> None: pass @@ -154,7 +153,6 @@ class BackgroundTaskExecutor(AsyncExecutor): browser_session_id: str | None, block_labels: list[str] | None, block_outputs: dict[str, Any] | None, - code_gen: bool | None = None, **kwargs: dict, ) -> None: if background_tasks: @@ -175,7 +173,6 @@ class BackgroundTaskExecutor(AsyncExecutor): browser_session_id=browser_session_id, block_labels=block_labels, block_outputs=block_outputs, - code_gen=code_gen, ) else: LOG.warning("Background tasks not enabled, skipping workflow execution") diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index 411fef0d..5a2b1137 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -1009,7 +1009,6 @@ async def run_block( user_id=user_id, browser_session_id=browser_session_id, block_outputs=block_run_request.block_outputs, - code_gen=block_run_request.code_gen, ) return BlockRunResponse( diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index d5927973..8beee786 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -238,7 +238,6 @@ class Block(BaseModel, abc.ABC): workflow_run_block_id: str, organization_id: str | None = None, browser_session_id: str | None = None, - code_gen: bool | None = None, **kwargs: dict, ) -> BlockResult: pass @@ -296,7 +295,6 @@ class Block(BaseModel, abc.ABC): parent_workflow_run_block_id: str | None = None, organization_id: str | None = None, browser_session_id: str | None = None, - code_gen: bool | None = None, **kwargs: dict, ) -> BlockResult: workflow_run_block_id = None @@ -346,7 +344,6 @@ class Block(BaseModel, abc.ABC): workflow_run_block_id, organization_id=organization_id, browser_session_id=browser_session_id, - code_gen=code_gen, **kwargs, ) except Exception as e: @@ -503,7 +500,6 @@ class BaseTaskBlock(Block): workflow_run_block_id: str, organization_id: str | None = None, browser_session_id: str | None = None, - code_gen: bool | None = None, **kwargs: dict, ) -> BlockResult: workflow_run_context = self.get_workflow_run_context(workflow_run_id) @@ -1311,7 +1307,6 @@ class ForLoopBlock(Block): workflow_run_block_id: str, organization_id: str | None = None, browser_session_id: str | None = None, - code_gen: bool | None = None, **kwargs: dict, ) -> BlockResult: workflow_run_context = self.get_workflow_run_context(workflow_run_id) @@ -1489,7 +1484,6 @@ async def wrapper(): workflow_run_block_id: str, organization_id: str | None = None, browser_session_id: str | None = None, - code_gen: bool | None = None, **kwargs: dict, ) -> BlockResult: await app.AGENT_FUNCTION.validate_code_block(organization_id=organization_id) @@ -1656,7 +1650,6 @@ class TextPromptBlock(Block): workflow_run_block_id: str, organization_id: str | None = None, browser_session_id: str | None = None, - code_gen: bool | None = None, **kwargs: dict, ) -> BlockResult: # Validate block execution @@ -1741,7 +1734,6 @@ class DownloadToS3Block(Block): workflow_run_block_id: str, organization_id: str | None = None, browser_session_id: str | None = None, - code_gen: bool | None = None, **kwargs: dict, ) -> BlockResult: # get workflow run context @@ -1830,7 +1822,6 @@ class UploadToS3Block(Block): workflow_run_block_id: str, organization_id: str | None = None, browser_session_id: str | None = None, - code_gen: bool | None = None, **kwargs: dict, ) -> BlockResult: # get workflow run context @@ -2008,7 +1999,6 @@ class FileUploadBlock(Block): workflow_run_block_id: str, organization_id: str | None = None, browser_session_id: str | None = None, - code_gen: bool | None = None, **kwargs: dict, ) -> BlockResult: # get workflow run context @@ -2426,7 +2416,6 @@ class SendEmailBlock(Block): workflow_run_block_id: str, organization_id: str | None = None, browser_session_id: str | None = None, - code_gen: bool | None = None, **kwargs: dict, ) -> BlockResult: workflow_run_context = self.get_workflow_run_context(workflow_run_id) @@ -2662,7 +2651,6 @@ class FileParserBlock(Block): workflow_run_block_id: str, organization_id: str | None = None, browser_session_id: str | None = None, - code_gen: bool | None = None, **kwargs: dict, ) -> BlockResult: workflow_run_context = self.get_workflow_run_context(workflow_run_id) @@ -2802,7 +2790,6 @@ class PDFParserBlock(Block): workflow_run_block_id: str, organization_id: str | None = None, browser_session_id: str | None = None, - code_gen: bool | None = None, **kwargs: dict, ) -> BlockResult: workflow_run_context = self.get_workflow_run_context(workflow_run_id) @@ -2903,7 +2890,6 @@ class WaitBlock(Block): workflow_run_block_id: str, organization_id: str | None = None, browser_session_id: str | None = None, - code_gen: bool | None = None, **kwargs: dict, ) -> BlockResult: # TODO: we need to support to interrupt the sleep when the workflow run failed/cancelled/terminated @@ -2948,7 +2934,6 @@ class ValidationBlock(BaseTaskBlock): workflow_run_block_id: str, organization_id: str | None = None, browser_session_id: str | None = None, - code_gen: bool | None = None, **kwargs: dict, ) -> BlockResult: task_order, _ = await self.get_task_order(workflow_run_id, 0) @@ -3051,7 +3036,6 @@ class TaskV2Block(Block): workflow_run_block_id: str, organization_id: str | None = None, browser_session_id: str | None = None, - code_gen: bool | None = None, **kwargs: dict, ) -> BlockResult: from skyvern.forge.sdk.workflow.models.workflow import WorkflowRunStatus # noqa: PLC0415 @@ -3123,7 +3107,6 @@ class TaskV2Block(Block): request_id=None, max_steps_override=self.max_steps, browser_session_id=browser_session_id, - code_gen=code_gen, ) finally: context: skyvern_context.SkyvernContext | None = skyvern_context.current() @@ -3241,7 +3224,6 @@ class HttpRequestBlock(Block): workflow_run_block_id: str, organization_id: str | None = None, browser_session_id: str | None = None, - code_gen: bool | None = None, **kwargs: dict, ) -> BlockResult: """Execute the HTTP request and return the response""" diff --git a/skyvern/forge/sdk/workflow/models/workflow.py b/skyvern/forge/sdk/workflow/models/workflow.py index 6759d819..f773b101 100644 --- a/skyvern/forge/sdk/workflow/models/workflow.py +++ b/skyvern/forge/sdk/workflow/models/workflow.py @@ -145,6 +145,7 @@ class WorkflowRun(BaseModel): job_id: str | None = None sequential_key: str | None = None ai_fallback: bool | None = None + code_gen: bool | None = None queued_at: datetime | None = None started_at: datetime | None = None diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index c9180976..ec29cde4 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -197,6 +197,7 @@ class WorkflowService: max_steps_override: int | None = None, parent_workflow_run_id: str | None = None, debug_session_id: str | None = None, + code_gen: bool | None = None, ) -> WorkflowRun: """ Create a workflow run and its parameters. Validate the workflow and the organization. If there are missing @@ -231,6 +232,7 @@ class WorkflowService: parent_workflow_run_id=parent_workflow_run_id, sequential_key=workflow.sequential_key, debug_session_id=debug_session_id, + code_gen=code_gen, ) LOG.info( f"Created workflow run {workflow_run.workflow_run_id} for workflow {workflow.workflow_id}", @@ -243,6 +245,7 @@ class WorkflowService: max_screenshot_scrolling_times=workflow_request.max_screenshot_scrolls, ai_fallback=workflow_request.ai_fallback, run_with=workflow_request.run_with, + code_gen=code_gen, ) context: skyvern_context.SkyvernContext | None = skyvern_context.current() current_run_id = context.run_id if context and context.run_id else workflow_run.workflow_run_id @@ -322,7 +325,6 @@ class WorkflowService: block_labels: list[str] | None = None, block_outputs: dict[str, Any] | None = None, browser_session_id: str | None = None, - code_gen: bool | None = None, ) -> WorkflowRun: """Execute a workflow.""" organization_id = organization.organization_id @@ -407,8 +409,11 @@ class WorkflowService: # Check if there's a related workflow script that should be used instead workflow_script, _ = await workflow_script_service.get_workflow_script(workflow, workflow_run, block_labels) current_context = skyvern_context.current() - if workflow_script and current_context: - current_context.published_workflow_script_id = workflow_script.script_id + if current_context: + if workflow_script: + current_context.generate_script = False + if workflow_run.code_gen: + current_context.generate_script = True is_script_run = self.should_run_script(workflow, workflow_run) if workflow_script and is_script_run: LOG.info( @@ -452,7 +457,6 @@ class WorkflowService: workflow=workflow, workflow_run=workflow_run, block_labels=block_labels, - code_gen=code_gen, ) else: LOG.info( @@ -1053,6 +1057,7 @@ class WorkflowService: parent_workflow_run_id: str | None = None, sequential_key: str | None = None, debug_session_id: str | None = None, + code_gen: bool | None = None, ) -> WorkflowRun: # validate the browser session id if workflow_request.browser_session_id: @@ -1080,6 +1085,7 @@ class WorkflowService: run_with=workflow_request.run_with, debug_session_id=debug_session_id, ai_fallback=workflow_request.ai_fallback, + code_gen=code_gen, ) async def _update_workflow_run_status( @@ -2684,8 +2690,8 @@ class WorkflowService: workflow: Workflow, workflow_run: WorkflowRun, block_labels: list[str] | None = None, - code_gen: bool | None = None, ) -> None: + code_gen = workflow_run.code_gen LOG.info( "Generate script?", block_labels=block_labels, diff --git a/skyvern/services/block_service.py b/skyvern/services/block_service.py index 5e6171b5..6ee78f9d 100644 --- a/skyvern/services/block_service.py +++ b/skyvern/services/block_service.py @@ -45,6 +45,7 @@ async def ensure_workflow_run( max_steps=x_max_steps_override, request_id=context.request_id, debug_session_id=block_run_request.debug_session_id, + code_gen=block_run_request.code_gen, ) return workflow_run @@ -62,7 +63,6 @@ async def execute_blocks( user_id: str, browser_session_id: str | None = None, block_outputs: dict[str, t.Any] | None = None, - code_gen: bool | None = None, ) -> None: """ Runs one or more blocks of a workflow. @@ -115,5 +115,4 @@ async def execute_blocks( api_key=api_key, block_labels=block_labels, block_outputs=block_outputs, - code_gen=code_gen, ) diff --git a/skyvern/services/task_v2_service.py b/skyvern/services/task_v2_service.py index 6f260263..6c6fe03b 100644 --- a/skyvern/services/task_v2_service.py +++ b/skyvern/services/task_v2_service.py @@ -306,7 +306,6 @@ async def run_task_v2( request_id: str | None = None, max_steps_override: str | int | None = None, browser_session_id: str | None = None, - code_gen: bool | None = None, ) -> TaskV2: organization_id = organization.organization_id try: @@ -335,7 +334,6 @@ async def run_task_v2( request_id=request_id, max_steps_override=max_steps_override, browser_session_id=browser_session_id, - code_gen=code_gen, ) except TaskTerminationError as e: task_v2 = await mark_task_v2_as_terminated( @@ -392,7 +390,6 @@ async def run_task_v2_helper( request_id: str | None = None, max_steps_override: str | int | None = None, browser_session_id: str | None = None, - code_gen: bool | None = None, ) -> tuple[Workflow, WorkflowRun, TaskV2] | tuple[None, None, TaskV2]: organization_id = organization.organization_id task_v2_id = task_v2.observer_cruise_id @@ -800,7 +797,6 @@ async def run_task_v2_helper( block_result = await block.execute_safe( workflow_run_id=workflow_run_id, organization_id=organization_id, - code_gen=code_gen, ) task_history_record["status"] = str(block_result.status) if block_result.failure_reason: @@ -904,11 +900,10 @@ async def run_task_v2_helper( context=context, screenshots=completion_screenshots, ) - if task_v2.run_with == "code" or code_gen: # TODO(jdo): not sure about this one... + if task_v2.run_with == "code": await app.WORKFLOW_SERVICE.generate_script_if_needed( workflow=workflow, workflow_run=workflow_run, - code_gen=code_gen, ) break diff --git a/skyvern/services/workflow_service.py b/skyvern/services/workflow_service.py index 09cd904d..787ba931 100644 --- a/skyvern/services/workflow_service.py +++ b/skyvern/services/workflow_service.py @@ -23,6 +23,7 @@ async def prepare_workflow( max_steps: int | None = None, request_id: str | None = None, debug_session_id: str | None = None, + code_gen: bool | None = None, ) -> WorkflowRun: """ Prepare a workflow to be run. @@ -40,6 +41,7 @@ async def prepare_workflow( max_steps_override=max_steps, is_template_workflow=template, debug_session_id=debug_session_id, + code_gen=code_gen, ) workflow = await app.WORKFLOW_SERVICE.get_workflow_by_permanent_id(