diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index 6765e3a5..04bb395a 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -2671,6 +2671,59 @@ async def get_task_v2( return task_v2.model_dump(by_alias=True) +async def _flatten_workflow_run_timeline_recursive( + timeline: WorkflowRunTimeline, + organization_id: str, +) -> list[WorkflowRunTimeline]: + """ + Recursively flatten a timeline item and its children, handling TaskV2 blocks. + + TaskV2 blocks are replaced with their internal workflow run blocks. + Other blocks (like ForLoop) are kept with their children recursively processed. + """ + result = [] + + # Check if this is a TaskV2 block that needs to be flattened + if timeline.block and timeline.block.block_type == BlockType.TaskV2: + if timeline.block.block_workflow_run_id: + # Recursively flatten the TaskV2's internal workflow run + nested_timeline = await _flatten_workflow_run_timeline( + organization_id=organization_id, + workflow_run_id=timeline.block.block_workflow_run_id, + ) + result.extend(nested_timeline) + else: + LOG.warning( + "Block workflow run id is not set for task_v2 block", + workflow_run_block_id=timeline.block.workflow_run_block_id if timeline.block else None, + organization_id=organization_id, + ) + result.append(timeline) + else: + # For non-TaskV2 blocks, process children recursively to handle nested TaskV2 blocks + new_children = [] + if timeline.children: + for child in timeline.children: + child_results = await _flatten_workflow_run_timeline_recursive( + timeline=child, + organization_id=organization_id, + ) + new_children.extend(child_results) + + # Create a new timeline with processed children + processed_timeline = WorkflowRunTimeline( + type=timeline.type, + block=timeline.block, + thought=timeline.thought, + children=new_children, + created_at=timeline.created_at, + modified_at=timeline.modified_at, + ) + result.append(processed_timeline) + + return result + + async def _flatten_workflow_run_timeline(organization_id: str, workflow_run_id: str) -> list[WorkflowRunTimeline]: """ Get the timeline workflow runs including the nested workflow runs in a flattened list @@ -2686,29 +2739,18 @@ async def _flatten_workflow_run_timeline(organization_id: str, workflow_run_id: workflow_run_id=workflow_run_id, organization_id=organization_id, ) - # loop through the run block timeline, find the task_v2 blocks, flatten the timeline for task_v2 + + # Recursively flatten the timeline, handling TaskV2 blocks at any nesting level final_workflow_run_block_timeline = [] for timeline in workflow_run_block_timeline: if not timeline.block: continue - if timeline.block.block_type != BlockType.TaskV2: - # flatten the timeline for task_v2 - final_workflow_run_block_timeline.append(timeline) - continue - if not timeline.block.block_workflow_run_id: - LOG.warning( - "Block workflow run id is not set for task_v2 block", - workflow_run_id=workflow_run_id, - organization_id=organization_id, - task_v2_id=task_v2_obj.observer_cruise_id if task_v2_obj else None, - ) - continue - # in the future if we want to nested taskv2 shows up as a nested block, we should not flatten the timeline - workflow_blocks = await _flatten_workflow_run_timeline( + + flattened = await _flatten_workflow_run_timeline_recursive( + timeline=timeline, organization_id=organization_id, - workflow_run_id=timeline.block.block_workflow_run_id, ) - final_workflow_run_block_timeline.extend(workflow_blocks) + final_workflow_run_block_timeline.extend(flattened) if task_v2_obj and task_v2_obj.observer_cruise_id: thought_timeline = await task_v2_service.get_thought_timelines( diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index c04d8637..3776ca15 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -113,6 +113,10 @@ TASKV2_TO_BLOCK_STATUS: dict[TaskV2Status, BlockStatus] = { TaskV2Status.timed_out: BlockStatus.timed_out, } +# ForLoop constants +DEFAULT_MAX_LOOP_ITERATIONS = 100 +DEFAULT_MAX_STEPS_PER_ITERATION = 50 + class Block(BaseModel, abc.ABC): # Must be unique within workflow definition @@ -1263,6 +1267,27 @@ class ForLoopBlock(Block): current_block: BlockTypeVar | None = None for loop_idx, loop_over_value in enumerate(loop_over_values): + # Check max_iterations limit + if loop_idx >= DEFAULT_MAX_LOOP_ITERATIONS: + LOG.info( + f"ForLoopBlock: Reached max_iterations limit ({DEFAULT_MAX_LOOP_ITERATIONS}), stopping loop", + workflow_run_id=workflow_run_id, + loop_idx=loop_idx, + max_iterations=DEFAULT_MAX_LOOP_ITERATIONS, + ) + failure_block_result = await self.build_block_result( + success=False, + status=BlockStatus.failed, + failure_reason=f"Reached max_loop_iterations limit of {DEFAULT_MAX_LOOP_ITERATIONS}", + workflow_run_block_id=workflow_run_block_id, + organization_id=organization_id, + ) + block_outputs.append(failure_block_result) + return LoopBlockExecutedResult( + outputs_with_loop_values=outputs_with_loop_values, + block_outputs=block_outputs, + last_block=current_block, + ) LOG.info("Starting loop iteration", loop_idx=loop_idx, loop_over_value=loop_over_value) # context parameter has been deprecated. However, it's still used by task v2 - we should migrate away from it. context_parameters_with_value = self.get_loop_block_context_parameters(workflow_run_id, loop_over_value) @@ -1270,6 +1295,16 @@ class ForLoopBlock(Block): workflow_run_context.set_value(context_parameter.key, context_parameter.value) each_loop_output_values: list[dict[str, Any]] = [] + + # Track steps for current iteration + iteration_step_count = 0 + LOG.info( + f"ForLoopBlock: Starting iteration {loop_idx} with max_steps_per_iteration={DEFAULT_MAX_STEPS_PER_ITERATION}", + workflow_run_id=workflow_run_id, + loop_idx=loop_idx, + max_steps_per_iteration=DEFAULT_MAX_STEPS_PER_ITERATION, + ) + for block_idx, loop_block in enumerate(self.loop_blocks): metadata: BlockMetadata = { "current_index": loop_idx, @@ -1327,6 +1362,37 @@ class ForLoopBlock(Block): ) loop_block = original_loop_block block_outputs.append(block_output) + + # Check max_steps_per_iteration limit after each block execution + iteration_step_count += 1 # Count each block execution as a step + if iteration_step_count >= DEFAULT_MAX_STEPS_PER_ITERATION: + LOG.info( + f"ForLoopBlock: Reached max_steps_per_iteration limit ({DEFAULT_MAX_STEPS_PER_ITERATION}) in iteration {loop_idx}, stopping iteration", + workflow_run_id=workflow_run_id, + loop_idx=loop_idx, + max_steps_per_iteration=DEFAULT_MAX_STEPS_PER_ITERATION, + iteration_step_count=iteration_step_count, + ) + # Create a failure block result for this iteration + failure_block_result = await self.build_block_result( + success=False, + status=BlockStatus.failed, + failure_reason=f"Reached max_steps_per_iteration limit of {DEFAULT_MAX_STEPS_PER_ITERATION}", + workflow_run_block_id=workflow_run_block_id, + organization_id=organization_id, + ) + block_outputs.append(failure_block_result) + # If continue_on_failure is False, stop the entire loop + if not self.continue_on_failure: + outputs_with_loop_values.append(each_loop_output_values) + return LoopBlockExecutedResult( + outputs_with_loop_values=outputs_with_loop_values, + block_outputs=block_outputs, + last_block=current_block, + ) + # If continue_on_failure is True, break out of the block loop for this iteration + break + if block_output.status == BlockStatus.canceled: LOG.info( f"ForLoopBlock: Block with type {loop_block.block_type} at index {block_idx} during loop {loop_idx} was canceled for workflow run {workflow_run_id}, canceling for loop", @@ -3431,8 +3497,17 @@ class TaskV2Block(Block): from skyvern.services import task_v2_service # noqa: PLC0415 workflow_run_context = self.get_workflow_run_context(workflow_run_id) + + # Simple template resolution - no complex dynamic resolution to prevent recursion try: self.format_potential_template_parameters(workflow_run_context) + + # Use the resolved values directly + resolved_prompt = self.prompt + resolved_url = self.url + resolved_totp_identifier = self.totp_identifier + resolved_totp_verification_url = self.totp_verification_url + except Exception as e: output_reason = f"Failed to format jinja template: {str(e)}" await self.record_output_parameter_value( @@ -3447,14 +3522,14 @@ class TaskV2Block(Block): organization_id=organization_id, ) - if not self.url: + if not resolved_url: browser_state = app.BROWSER_MANAGER.get_for_workflow_run(workflow_run_id) if browser_state: page = await browser_state.get_working_page() if page: current_url = await SkyvernFrame.get_url(frame=page) if current_url != "about:blank": - self.url = current_url + resolved_url = current_url if not organization_id: raise ValueError("Running TaskV2Block requires organization_id") @@ -3468,12 +3543,12 @@ class TaskV2Block(Block): try: task_v2 = await task_v2_service.initialize_task_v2( organization=organization, - user_prompt=self.prompt, - user_url=self.url, + user_prompt=resolved_prompt, + user_url=resolved_url, parent_workflow_run_id=workflow_run_id, proxy_location=workflow_run.proxy_location, - totp_identifier=self.totp_identifier, - totp_verification_url=self.totp_verification_url, + totp_identifier=resolved_totp_identifier, + totp_verification_url=resolved_totp_verification_url, max_screenshot_scrolling_times=workflow_run.max_screenshot_scrolls, ) await app.DATABASE.update_task_v2( diff --git a/skyvern/services/task_v2_service.py b/skyvern/services/task_v2_service.py index 60485c1b..130b00c4 100644 --- a/skyvern/services/task_v2_service.py +++ b/skyvern/services/task_v2_service.py @@ -62,7 +62,9 @@ from skyvern.webeye.utils.page import SkyvernFrame LOG = structlog.get_logger() DEFAULT_WORKFLOW_TITLE = "New Workflow" RANDOM_STRING_POOL = string.ascii_letters + string.digits -DEFAULT_MAX_ITERATIONS = 13 +# Maximum number of planning iterations for TaskV2 +# This limits how many times the LLM can plan and execute actions +DEFAULT_MAX_ITERATIONS = 50 MINI_GOAL_TEMPLATE = """Achieve the following mini goal and once it's achieved, complete: ```{mini_goal}``` @@ -574,6 +576,10 @@ async def run_task_v2_helper( url = str(task_v2.url) max_steps = int_max_steps_override or settings.MAX_STEPS_PER_TASK_V2 + + # When TaskV2 is inside a loop, each loop iteration should get fresh attempts + # This is managed at the ForLoop level by calling run_task_v2 for each iteration + # The DEFAULT_MAX_ITERATIONS limit applies to this single TaskV2 execution for i in range(DEFAULT_MAX_ITERATIONS): # validate the task execution await app.AGENT_FUNCTION.validate_task_execution( @@ -986,17 +992,16 @@ async def run_task_v2_helper( ) return workflow, workflow_run, task_v2 else: + # Loop completed without early exit - task exceeded max iterations LOG.info( - "Task v2 failed - run out of iterations", + "Task v2 failed - exceeded maximum iterations", max_iterations=DEFAULT_MAX_ITERATIONS, - max_steps=max_steps, workflow_run_id=workflow_run_id, ) - failure_reason = await _summarize_max_steps_failure_reason(task_v2, organization_id, browser_state) task_v2 = await mark_task_v2_as_failed( task_v2_id=task_v2_id, workflow_run_id=workflow_run_id, - failure_reason=f"Max iterations reached. Possible failure reasons: {failure_reason}", + failure_reason=f"Task exceeded maximum of {DEFAULT_MAX_ITERATIONS} planning iterations. Consider simplifying the task or breaking it into smaller steps.", organization_id=organization_id, ) @@ -1821,19 +1826,22 @@ async def send_task_v2_webhook(task_v2: TaskV2) -> None: payload_dict = json.loads(payload_json) payload_dict.update(json.loads(task_run_response_json)) signed_data = generate_skyvern_webhook_signature(payload=payload_dict, api_key=api_key.token) + payload = signed_data.signed_payload + headers = signed_data.headers LOG.info( "Sending task v2 response to webhook callback url", task_v2_id=task_v2.observer_cruise_id, webhook_callback_url=task_v2.webhook_callback_url, - payload=signed_data.signed_payload, - headers=signed_data.headers, - ) - resp = await httpx.AsyncClient().post( - task_v2.webhook_callback_url, - data=signed_data.signed_payload, - headers=signed_data.headers, - timeout=httpx.Timeout(30.0), + payload_length=len(payload), + header_keys=sorted(headers.keys()), ) + timeout = httpx.Timeout(30.0) + async with httpx.AsyncClient(timeout=timeout) as client: + resp = await client.post( + task_v2.webhook_callback_url, + data=payload, + headers=headers, + ) if resp.status_code >= 200 and resp.status_code < 300: LOG.info( "Task v2 webhook sent successfully",