Fix script generation race condition causing hardcoded parameter values (#SKY-7653) (#4570)
This commit is contained in:
@@ -8,6 +8,7 @@ import structlog
|
|||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
from skyvern.forge import app
|
from skyvern.forge import app
|
||||||
|
from skyvern.forge.sdk.core import skyvern_context
|
||||||
from skyvern.forge.sdk.prompting import PromptEngine
|
from skyvern.forge.sdk.prompting import PromptEngine
|
||||||
from skyvern.webeye.actions.actions import ActionType
|
from skyvern.webeye.actions.actions import ActionType
|
||||||
|
|
||||||
@@ -64,6 +65,21 @@ async def generate_workflow_parameters_schema(
|
|||||||
elif action_type == ActionType.SELECT_OPTION:
|
elif action_type == ActionType.SELECT_OPTION:
|
||||||
value = action.get("option", "")
|
value = action.get("option", "")
|
||||||
|
|
||||||
|
# Skip actions without data - addresses race condition where script generation
|
||||||
|
# runs before action data is fully saved to database (SKY-7653)
|
||||||
|
if not value:
|
||||||
|
LOG.debug(
|
||||||
|
"Skipping action without data during field mapping generation",
|
||||||
|
action_type=action_type,
|
||||||
|
action_id=action.get("action_id", ""),
|
||||||
|
task_id=task_id,
|
||||||
|
)
|
||||||
|
# Mark that we had incomplete actions - triggers finalize regeneration
|
||||||
|
ctx = skyvern_context.current()
|
||||||
|
if ctx:
|
||||||
|
ctx.script_gen_had_incomplete_actions = True
|
||||||
|
continue
|
||||||
|
|
||||||
# Check if this action has an existing field name that must be preserved
|
# Check if this action has an existing field name that must be preserved
|
||||||
existing_field_name = existing_field_assignments.get(action_counter)
|
existing_field_name = existing_field_assignments.get(action_counter)
|
||||||
|
|
||||||
|
|||||||
@@ -75,6 +75,10 @@ class SkyvernContext:
|
|||||||
action_ai_overrides: dict[str, dict[int, str]] = field(default_factory=dict)
|
action_ai_overrides: dict[str, dict[int, str]] = field(default_factory=dict)
|
||||||
action_counters: dict[str, int] = field(default_factory=dict)
|
action_counters: dict[str, int] = field(default_factory=dict)
|
||||||
|
|
||||||
|
# Track if script generation skipped any actions due to missing data (race condition)
|
||||||
|
# Used to determine if finalize regeneration is needed at workflow completion
|
||||||
|
script_gen_had_incomplete_actions: bool = False
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
def __repr__(self) -> str:
|
||||||
return f"SkyvernContext(request_id={self.request_id}, organization_id={self.organization_id}, task_id={self.task_id}, step_id={self.step_id}, workflow_id={self.workflow_id}, workflow_run_id={self.workflow_run_id}, task_v2_id={self.task_v2_id}, max_steps_override={self.max_steps_override}, run_id={self.run_id})"
|
return f"SkyvernContext(request_id={self.request_id}, organization_id={self.organization_id}, task_id={self.task_id}, step_id={self.step_id}, workflow_id={self.workflow_id}, workflow_run_id={self.workflow_run_id}, task_v2_id={self.task_v2_id}, max_steps_override={self.max_steps_override}, run_id={self.run_id})"
|
||||||
|
|
||||||
|
|||||||
@@ -849,6 +849,7 @@ class WorkflowService:
|
|||||||
workflow_run=workflow_run,
|
workflow_run=workflow_run,
|
||||||
block_labels=block_labels,
|
block_labels=block_labels,
|
||||||
blocks_to_update=blocks_to_update,
|
blocks_to_update=blocks_to_update,
|
||||||
|
finalize=True, # Force regeneration to ensure field mappings have complete action data
|
||||||
)
|
)
|
||||||
|
|
||||||
# Execute finally block if configured. Skip for: canceled (user explicitly stopped)
|
# Execute finally block if configured. Skip for: canceled (user explicitly stopped)
|
||||||
@@ -3352,10 +3353,48 @@ class WorkflowService:
|
|||||||
workflow_run: WorkflowRun,
|
workflow_run: WorkflowRun,
|
||||||
block_labels: list[str] | None = None,
|
block_labels: list[str] | None = None,
|
||||||
blocks_to_update: set[str] | None = None,
|
blocks_to_update: set[str] | None = None,
|
||||||
|
finalize: bool = False,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
"""
|
||||||
|
Generate or regenerate workflow script if needed.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
workflow: The workflow definition
|
||||||
|
workflow_run: The workflow run instance
|
||||||
|
block_labels: Optional list of specific block labels to generate
|
||||||
|
blocks_to_update: Set of block labels that need regeneration
|
||||||
|
finalize: If True, check if any actions were skipped during script generation
|
||||||
|
due to missing data (race condition). Only regenerate if needed.
|
||||||
|
This fixes SKY-7653 while avoiding unnecessary regeneration costs.
|
||||||
|
"""
|
||||||
code_gen = workflow_run.code_gen
|
code_gen = workflow_run.code_gen
|
||||||
blocks_to_update = set(blocks_to_update or [])
|
blocks_to_update = set(blocks_to_update or [])
|
||||||
|
|
||||||
|
# When finalizing, only regenerate if script generation had incomplete actions.
|
||||||
|
# This addresses the race condition (SKY-7653) while avoiding unnecessary
|
||||||
|
# regeneration costs when the script is already complete.
|
||||||
|
if finalize:
|
||||||
|
current_context = skyvern_context.current()
|
||||||
|
if current_context and current_context.script_gen_had_incomplete_actions:
|
||||||
|
LOG.info(
|
||||||
|
"Finalize: regenerating script due to incomplete actions during generation",
|
||||||
|
workflow_run_id=workflow_run.workflow_run_id,
|
||||||
|
)
|
||||||
|
task_block_labels = {
|
||||||
|
block.label
|
||||||
|
for block in workflow.workflow_definition.blocks
|
||||||
|
if block.label and block.block_type in BLOCK_TYPES_THAT_SHOULD_BE_CACHED
|
||||||
|
}
|
||||||
|
blocks_to_update.update(task_block_labels)
|
||||||
|
blocks_to_update.add(settings.WORKFLOW_START_BLOCK_LABEL)
|
||||||
|
# Reset flag after triggering regeneration to prevent stale state
|
||||||
|
current_context.script_gen_had_incomplete_actions = False
|
||||||
|
else:
|
||||||
|
LOG.debug(
|
||||||
|
"Finalize: skipping regeneration - no incomplete actions detected",
|
||||||
|
workflow_run_id=workflow_run.workflow_run_id,
|
||||||
|
)
|
||||||
|
|
||||||
LOG.info(
|
LOG.info(
|
||||||
"Generate script?",
|
"Generate script?",
|
||||||
block_labels=block_labels,
|
block_labels=block_labels,
|
||||||
|
|||||||
@@ -967,10 +967,12 @@ async def run_task_v2_helper(
|
|||||||
context=context,
|
context=context,
|
||||||
screenshots=completion_screenshots,
|
screenshots=completion_screenshots,
|
||||||
)
|
)
|
||||||
await app.WORKFLOW_SERVICE.generate_script_if_needed(
|
if task_v2.run_with == "code":
|
||||||
workflow=workflow,
|
await app.WORKFLOW_SERVICE.generate_script_if_needed(
|
||||||
workflow_run=workflow_run,
|
workflow=workflow,
|
||||||
)
|
workflow_run=workflow_run,
|
||||||
|
finalize=True, # Force regeneration to ensure field mappings have complete action data
|
||||||
|
)
|
||||||
break
|
break
|
||||||
|
|
||||||
# total step number validation
|
# total step number validation
|
||||||
|
|||||||
Reference in New Issue
Block a user