Add ForLoop block support for cached scripts (#SKY-7751) (#4600)
This commit is contained in:
@@ -119,7 +119,7 @@ async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organiz
|
|||||||
action_dumps.append(action_dump)
|
action_dumps.append(action_dump)
|
||||||
actions_by_task[run_block.task_id] = action_dumps
|
actions_by_task[run_block.task_id] = action_dumps
|
||||||
else:
|
else:
|
||||||
LOG.warning(f"Task {run_block.task_id} not found")
|
LOG.warning("Task not found", task_id=run_block.task_id)
|
||||||
|
|
||||||
if run_block.block_type == BlockType.TaskV2:
|
if run_block.block_type == BlockType.TaskV2:
|
||||||
# Merge child workflow run data for task v2 blocks
|
# Merge child workflow run data for task v2 blocks
|
||||||
@@ -154,7 +154,96 @@ async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organiz
|
|||||||
error=str(e),
|
error=str(e),
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
LOG.warning(f"Task v2 block {run_block.label} does not have a child workflow run id")
|
LOG.warning(
|
||||||
|
"Task v2 block does not have a child workflow run id",
|
||||||
|
task_v2_label=run_block.label,
|
||||||
|
)
|
||||||
|
|
||||||
|
if run_block.block_type == BlockType.FOR_LOOP:
|
||||||
|
# Process ForLoop child blocks to get actions for task blocks inside the loop
|
||||||
|
# Child blocks have parent_workflow_run_block_id pointing to the ForLoop's workflow_run_block_id
|
||||||
|
child_run_blocks = [
|
||||||
|
b for b in workflow_run_blocks if b.parent_workflow_run_block_id == run_block.workflow_run_block_id
|
||||||
|
]
|
||||||
|
# Create mapping of child run blocks by label
|
||||||
|
child_run_blocks_by_label = {b.label: b for b in child_run_blocks if b.label}
|
||||||
|
|
||||||
|
# Warn about any unlabeled child blocks that won't be matched
|
||||||
|
unlabeled_children = [b for b in child_run_blocks if not b.label]
|
||||||
|
if unlabeled_children:
|
||||||
|
LOG.warning(
|
||||||
|
"ForLoop has child blocks without labels - these will not be matched to loop_blocks definitions",
|
||||||
|
forloop_label=run_block.label,
|
||||||
|
unlabeled_count=len(unlabeled_children),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Get loop_blocks from the definition block
|
||||||
|
loop_blocks = final_dump.get("loop_blocks", [])
|
||||||
|
|
||||||
|
if loop_blocks and not child_run_blocks:
|
||||||
|
LOG.warning(
|
||||||
|
"ForLoop block has loop_blocks definitions but no child run blocks found",
|
||||||
|
forloop_label=run_block.label,
|
||||||
|
workflow_run_block_id=run_block.workflow_run_block_id,
|
||||||
|
loop_blocks_count=len(loop_blocks),
|
||||||
|
)
|
||||||
|
updated_loop_blocks = []
|
||||||
|
|
||||||
|
for loop_block_def in loop_blocks:
|
||||||
|
loop_block_dump = loop_block_def.copy() if isinstance(loop_block_def, dict) else loop_block_def
|
||||||
|
loop_block_label = loop_block_dump.get("label")
|
||||||
|
|
||||||
|
# Find matching child run block
|
||||||
|
child_run_block = child_run_blocks_by_label.get(loop_block_label) if loop_block_label else None
|
||||||
|
|
||||||
|
if child_run_block and child_run_block.block_type in SCRIPT_TASK_BLOCKS and child_run_block.task_id:
|
||||||
|
# Get task data for this child block
|
||||||
|
task = await app.DATABASE.get_task(task_id=child_run_block.task_id, organization_id=organization_id)
|
||||||
|
if task:
|
||||||
|
task_dump = task.model_dump()
|
||||||
|
loop_block_dump.update({k: v for k, v in task_dump.items() if k not in loop_block_dump})
|
||||||
|
loop_block_dump.update(
|
||||||
|
{
|
||||||
|
"task_id": child_run_block.task_id,
|
||||||
|
"status": child_run_block.status,
|
||||||
|
"output": child_run_block.output,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Get task actions for the child block
|
||||||
|
actions = await app.DATABASE.get_task_actions_hydrated(
|
||||||
|
task_id=child_run_block.task_id, organization_id=organization_id
|
||||||
|
)
|
||||||
|
action_dumps = []
|
||||||
|
for action in actions:
|
||||||
|
action_dump = action.model_dump()
|
||||||
|
action_dump["xpath"] = action.get_xpath()
|
||||||
|
action_dump["has_mini_agent"] = action.has_mini_agent
|
||||||
|
if (
|
||||||
|
"data_extraction_goal" in loop_block_dump
|
||||||
|
and loop_block_dump["data_extraction_goal"]
|
||||||
|
and action.action_type == ActionType.EXTRACT
|
||||||
|
):
|
||||||
|
action_dump["data_extraction_goal"] = loop_block_dump["data_extraction_goal"]
|
||||||
|
if (
|
||||||
|
"extracted_information_schema" in loop_block_dump
|
||||||
|
and loop_block_dump["extracted_information_schema"]
|
||||||
|
and action.action_type == ActionType.EXTRACT
|
||||||
|
):
|
||||||
|
action_dump["data_extraction_schema"] = loop_block_dump["extracted_information_schema"]
|
||||||
|
action_dumps.append(action_dump)
|
||||||
|
actions_by_task[child_run_block.task_id] = action_dumps
|
||||||
|
else:
|
||||||
|
LOG.warning(
|
||||||
|
"Task not found for ForLoop child block",
|
||||||
|
task_id=child_run_block.task_id,
|
||||||
|
forloop_label=run_block.label,
|
||||||
|
)
|
||||||
|
|
||||||
|
updated_loop_blocks.append(loop_block_dump)
|
||||||
|
|
||||||
|
# Update final_dump with the processed loop_blocks
|
||||||
|
final_dump["loop_blocks"] = updated_loop_blocks
|
||||||
|
|
||||||
final_dump["workflow_run_id"] = workflow_run_id
|
final_dump["workflow_run_id"] = workflow_run_id
|
||||||
if run_block:
|
if run_block:
|
||||||
|
|||||||
@@ -128,6 +128,7 @@ BLOCK_TYPES_THAT_SHOULD_BE_CACHED = {
|
|||||||
BlockType.EXTRACTION,
|
BlockType.EXTRACTION,
|
||||||
BlockType.LOGIN,
|
BlockType.LOGIN,
|
||||||
BlockType.FILE_DOWNLOAD,
|
BlockType.FILE_DOWNLOAD,
|
||||||
|
BlockType.FOR_LOOP,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user