Flatten timeline so ForLoop and TaskV2 blocks compose correctly (#3946)

This commit is contained in:
pedrohsdb
2025-11-07 17:32:10 -08:00
committed by GitHub
parent d1d0c9414b
commit ea7361c9f2
3 changed files with 161 additions and 36 deletions

View File

@@ -2671,6 +2671,59 @@ async def get_task_v2(
return task_v2.model_dump(by_alias=True)
async def _flatten_workflow_run_timeline_recursive(
    timeline: WorkflowRunTimeline,
    organization_id: str,
) -> list[WorkflowRunTimeline]:
    """
    Recursively flatten a timeline node and its children.

    A TaskV2 block is replaced in-place by the (already flattened) timeline of
    the internal workflow run it spawned, so nested task_v2 runs surface as
    flat sibling entries. Every other node (e.g. ForLoop blocks, or thought
    entries with no block) is kept, with its children recursively processed so
    that TaskV2 blocks nested at any depth are also expanded.

    Args:
        timeline: The timeline node to flatten.
        organization_id: Organization that owns the workflow runs; used for
            scoped lookups and as log context.

    Returns:
        A list of flattened timeline entries that replace ``timeline``.
    """
    result: list[WorkflowRunTimeline] = []
    if timeline.block and timeline.block.block_type == BlockType.TaskV2:
        if timeline.block.block_workflow_run_id:
            # Substitute the TaskV2 block with the flattened timeline of the
            # workflow run it triggered.
            nested_timeline = await _flatten_workflow_run_timeline(
                organization_id=organization_id,
                workflow_run_id=timeline.block.block_workflow_run_id,
            )
            result.extend(nested_timeline)
        else:
            # No nested run to expand; keep the node so the timeline doesn't
            # silently lose an entry. (``timeline.block`` is guaranteed non-None
            # here by the enclosing condition, so no extra guard is needed.)
            LOG.warning(
                "Block workflow run id is not set for task_v2 block",
                workflow_run_block_id=timeline.block.workflow_run_block_id,
                organization_id=organization_id,
            )
            result.append(timeline)
    else:
        # Non-TaskV2 node: recurse into children so TaskV2 blocks nested
        # inside containers such as ForLoop are expanded too.
        new_children: list[WorkflowRunTimeline] = []
        if timeline.children:
            for child in timeline.children:
                new_children.extend(
                    await _flatten_workflow_run_timeline_recursive(
                        timeline=child,
                        organization_id=organization_id,
                    )
                )
        # Rebuild the node rather than mutating the caller's object in place.
        result.append(
            WorkflowRunTimeline(
                type=timeline.type,
                block=timeline.block,
                thought=timeline.thought,
                children=new_children,
                created_at=timeline.created_at,
                modified_at=timeline.modified_at,
            )
        )
    return result
async def _flatten_workflow_run_timeline(organization_id: str, workflow_run_id: str) -> list[WorkflowRunTimeline]:
"""
Get the timeline workflow runs including the nested workflow runs in a flattened list
@@ -2686,29 +2739,18 @@ async def _flatten_workflow_run_timeline(organization_id: str, workflow_run_id:
workflow_run_id=workflow_run_id,
organization_id=organization_id,
)
# loop through the run block timeline, find the task_v2 blocks, flatten the timeline for task_v2
# Recursively flatten the timeline, handling TaskV2 blocks at any nesting level
final_workflow_run_block_timeline = []
for timeline in workflow_run_block_timeline:
if not timeline.block:
continue
if timeline.block.block_type != BlockType.TaskV2:
# flatten the timeline for task_v2
final_workflow_run_block_timeline.append(timeline)
continue
if not timeline.block.block_workflow_run_id:
LOG.warning(
"Block workflow run id is not set for task_v2 block",
workflow_run_id=workflow_run_id,
organization_id=organization_id,
task_v2_id=task_v2_obj.observer_cruise_id if task_v2_obj else None,
)
continue
# in the future if we want to nested taskv2 shows up as a nested block, we should not flatten the timeline
workflow_blocks = await _flatten_workflow_run_timeline(
flattened = await _flatten_workflow_run_timeline_recursive(
timeline=timeline,
organization_id=organization_id,
workflow_run_id=timeline.block.block_workflow_run_id,
)
final_workflow_run_block_timeline.extend(workflow_blocks)
final_workflow_run_block_timeline.extend(flattened)
if task_v2_obj and task_v2_obj.observer_cruise_id:
thought_timeline = await task_v2_service.get_thought_timelines(

View File

@@ -113,6 +113,10 @@ TASKV2_TO_BLOCK_STATUS: dict[TaskV2Status, BlockStatus] = {
TaskV2Status.timed_out: BlockStatus.timed_out,
}
# ForLoop constants
DEFAULT_MAX_LOOP_ITERATIONS = 100
DEFAULT_MAX_STEPS_PER_ITERATION = 50
class Block(BaseModel, abc.ABC):
# Must be unique within workflow definition
@@ -1263,6 +1267,27 @@ class ForLoopBlock(Block):
current_block: BlockTypeVar | None = None
for loop_idx, loop_over_value in enumerate(loop_over_values):
# Check max_iterations limit
if loop_idx >= DEFAULT_MAX_LOOP_ITERATIONS:
LOG.info(
f"ForLoopBlock: Reached max_iterations limit ({DEFAULT_MAX_LOOP_ITERATIONS}), stopping loop",
workflow_run_id=workflow_run_id,
loop_idx=loop_idx,
max_iterations=DEFAULT_MAX_LOOP_ITERATIONS,
)
failure_block_result = await self.build_block_result(
success=False,
status=BlockStatus.failed,
failure_reason=f"Reached max_loop_iterations limit of {DEFAULT_MAX_LOOP_ITERATIONS}",
workflow_run_block_id=workflow_run_block_id,
organization_id=organization_id,
)
block_outputs.append(failure_block_result)
return LoopBlockExecutedResult(
outputs_with_loop_values=outputs_with_loop_values,
block_outputs=block_outputs,
last_block=current_block,
)
LOG.info("Starting loop iteration", loop_idx=loop_idx, loop_over_value=loop_over_value)
# context parameter has been deprecated. However, it's still used by task v2 - we should migrate away from it.
context_parameters_with_value = self.get_loop_block_context_parameters(workflow_run_id, loop_over_value)
@@ -1270,6 +1295,16 @@ class ForLoopBlock(Block):
workflow_run_context.set_value(context_parameter.key, context_parameter.value)
each_loop_output_values: list[dict[str, Any]] = []
# Track steps for current iteration
iteration_step_count = 0
LOG.info(
f"ForLoopBlock: Starting iteration {loop_idx} with max_steps_per_iteration={DEFAULT_MAX_STEPS_PER_ITERATION}",
workflow_run_id=workflow_run_id,
loop_idx=loop_idx,
max_steps_per_iteration=DEFAULT_MAX_STEPS_PER_ITERATION,
)
for block_idx, loop_block in enumerate(self.loop_blocks):
metadata: BlockMetadata = {
"current_index": loop_idx,
@@ -1327,6 +1362,37 @@ class ForLoopBlock(Block):
)
loop_block = original_loop_block
block_outputs.append(block_output)
# Check max_steps_per_iteration limit after each block execution
iteration_step_count += 1 # Count each block execution as a step
if iteration_step_count >= DEFAULT_MAX_STEPS_PER_ITERATION:
LOG.info(
f"ForLoopBlock: Reached max_steps_per_iteration limit ({DEFAULT_MAX_STEPS_PER_ITERATION}) in iteration {loop_idx}, stopping iteration",
workflow_run_id=workflow_run_id,
loop_idx=loop_idx,
max_steps_per_iteration=DEFAULT_MAX_STEPS_PER_ITERATION,
iteration_step_count=iteration_step_count,
)
# Create a failure block result for this iteration
failure_block_result = await self.build_block_result(
success=False,
status=BlockStatus.failed,
failure_reason=f"Reached max_steps_per_iteration limit of {DEFAULT_MAX_STEPS_PER_ITERATION}",
workflow_run_block_id=workflow_run_block_id,
organization_id=organization_id,
)
block_outputs.append(failure_block_result)
# If continue_on_failure is False, stop the entire loop
if not self.continue_on_failure:
outputs_with_loop_values.append(each_loop_output_values)
return LoopBlockExecutedResult(
outputs_with_loop_values=outputs_with_loop_values,
block_outputs=block_outputs,
last_block=current_block,
)
# If continue_on_failure is True, break out of the block loop for this iteration
break
if block_output.status == BlockStatus.canceled:
LOG.info(
f"ForLoopBlock: Block with type {loop_block.block_type} at index {block_idx} during loop {loop_idx} was canceled for workflow run {workflow_run_id}, canceling for loop",
@@ -3431,8 +3497,17 @@ class TaskV2Block(Block):
from skyvern.services import task_v2_service # noqa: PLC0415
workflow_run_context = self.get_workflow_run_context(workflow_run_id)
# Simple template resolution - no complex dynamic resolution to prevent recursion
try:
self.format_potential_template_parameters(workflow_run_context)
# Use the resolved values directly
resolved_prompt = self.prompt
resolved_url = self.url
resolved_totp_identifier = self.totp_identifier
resolved_totp_verification_url = self.totp_verification_url
except Exception as e:
output_reason = f"Failed to format jinja template: {str(e)}"
await self.record_output_parameter_value(
@@ -3447,14 +3522,14 @@ class TaskV2Block(Block):
organization_id=organization_id,
)
if not self.url:
if not resolved_url:
browser_state = app.BROWSER_MANAGER.get_for_workflow_run(workflow_run_id)
if browser_state:
page = await browser_state.get_working_page()
if page:
current_url = await SkyvernFrame.get_url(frame=page)
if current_url != "about:blank":
self.url = current_url
resolved_url = current_url
if not organization_id:
raise ValueError("Running TaskV2Block requires organization_id")
@@ -3468,12 +3543,12 @@ class TaskV2Block(Block):
try:
task_v2 = await task_v2_service.initialize_task_v2(
organization=organization,
user_prompt=self.prompt,
user_url=self.url,
user_prompt=resolved_prompt,
user_url=resolved_url,
parent_workflow_run_id=workflow_run_id,
proxy_location=workflow_run.proxy_location,
totp_identifier=self.totp_identifier,
totp_verification_url=self.totp_verification_url,
totp_identifier=resolved_totp_identifier,
totp_verification_url=resolved_totp_verification_url,
max_screenshot_scrolling_times=workflow_run.max_screenshot_scrolls,
)
await app.DATABASE.update_task_v2(