disable script for block runs (#3319)
This commit is contained in:
@@ -345,7 +345,7 @@ class WorkflowService:
|
|||||||
return workflow_run
|
return workflow_run
|
||||||
|
|
||||||
# Check if there's a related workflow script that should be used instead
|
# Check if there's a related workflow script that should be used instead
|
||||||
workflow_script = await self._get_workflow_script(workflow, workflow_run)
|
workflow_script, _ = await self._get_workflow_script(workflow, workflow_run, block_labels)
|
||||||
if workflow_script is not None:
|
if workflow_script is not None:
|
||||||
LOG.info(
|
LOG.info(
|
||||||
"Found related workflow script, running script instead of workflow",
|
"Found related workflow script, running script instead of workflow",
|
||||||
@@ -622,10 +622,11 @@ class WorkflowService:
|
|||||||
WorkflowRunStatus.timed_out,
|
WorkflowRunStatus.timed_out,
|
||||||
):
|
):
|
||||||
workflow_run = await self.mark_workflow_run_as_completed(workflow_run_id=workflow_run.workflow_run_id)
|
workflow_run = await self.mark_workflow_run_as_completed(workflow_run_id=workflow_run.workflow_run_id)
|
||||||
# generate script for workflow if the workflow.generate_script is True AND there's no script cached for the workflow
|
await self.generate_script_if_needed(
|
||||||
# only generate script if the workflow run is completed
|
workflow=workflow,
|
||||||
if workflow.generate_script:
|
workflow_run=workflow_run,
|
||||||
await self.generate_script_for_workflow(workflow=workflow, workflow_run=workflow_run)
|
block_labels=block_labels,
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
LOG.info(
|
LOG.info(
|
||||||
"Workflow run is already timed_out, canceled, failed, or terminated, not marking as completed",
|
"Workflow run is already timed_out, canceled, failed, or terminated, not marking as completed",
|
||||||
@@ -2344,20 +2345,25 @@ class WorkflowService:
|
|||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
async def _get_workflow_script(self, workflow: Workflow, workflow_run: WorkflowRun) -> Script | None:
|
async def _get_workflow_script(
|
||||||
|
self, workflow: Workflow, workflow_run: WorkflowRun, block_labels: list[str] | None = None
|
||||||
|
) -> tuple[Script | None, str]:
|
||||||
"""
|
"""
|
||||||
Check if there's a related workflow script that should be used instead of running the workflow.
|
Check if there's a related workflow script that should be used instead of running the workflow.
|
||||||
Returns True if a script should be used, False otherwise.
|
Returns the tuple of (script, rendered_cache_key_value).
|
||||||
"""
|
"""
|
||||||
if not workflow.generate_script:
|
|
||||||
return None
|
|
||||||
# Only check for scripts if the workflow has a cache_key
|
|
||||||
cache_key = workflow.cache_key or ""
|
cache_key = workflow.cache_key or ""
|
||||||
|
rendered_cache_key_value = ""
|
||||||
|
|
||||||
|
if not workflow.generate_script:
|
||||||
|
return None, rendered_cache_key_value
|
||||||
|
if block_labels:
|
||||||
|
# Do not generate script or run script if block_labels is provided
|
||||||
|
return None, rendered_cache_key_value
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Render the cache_key_value from workflow run parameters (same logic as generate_script_for_workflow)
|
|
||||||
parameter_tuples = await app.DATABASE.get_workflow_run_parameters(
|
parameter_tuples = await app.DATABASE.get_workflow_run_parameters(
|
||||||
workflow_run_id=workflow_run.workflow_run_id
|
workflow_run_id=workflow_run.workflow_run_id,
|
||||||
)
|
)
|
||||||
parameters = {wf_param.key: run_param.value for wf_param, run_param in parameter_tuples}
|
parameters = {wf_param.key: run_param.value for wf_param, run_param in parameter_tuples}
|
||||||
|
|
||||||
@@ -2379,9 +2385,9 @@ class WorkflowService:
|
|||||||
workflow_run_id=workflow_run.workflow_run_id,
|
workflow_run_id=workflow_run.workflow_run_id,
|
||||||
script_count=len(existing_scripts),
|
script_count=len(existing_scripts),
|
||||||
)
|
)
|
||||||
return existing_scripts[0]
|
return existing_scripts[0], rendered_cache_key_value
|
||||||
|
|
||||||
return None
|
return None, rendered_cache_key_value
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
LOG.warning(
|
LOG.warning(
|
||||||
@@ -2391,7 +2397,7 @@ class WorkflowService:
|
|||||||
error=str(e),
|
error=str(e),
|
||||||
exc_info=True,
|
exc_info=True,
|
||||||
)
|
)
|
||||||
return None
|
return None, rendered_cache_key_value
|
||||||
|
|
||||||
async def _execute_workflow_script(
|
async def _execute_workflow_script(
|
||||||
self, script_id: str, workflow: Workflow, workflow_run: WorkflowRun, api_key: str, organization: Organization
|
self, script_id: str, workflow: Workflow, workflow_run: WorkflowRun, api_key: str, organization: Organization
|
||||||
@@ -2445,38 +2451,31 @@ class WorkflowService:
|
|||||||
|
|
||||||
return workflow_run
|
return workflow_run
|
||||||
|
|
||||||
async def generate_script_for_workflow(
|
async def generate_script_if_needed(
|
||||||
self,
|
self,
|
||||||
workflow: Workflow,
|
workflow: Workflow,
|
||||||
workflow_run: WorkflowRun,
|
workflow_run: WorkflowRun,
|
||||||
|
block_labels: list[str] | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
cache_key = workflow.cache_key
|
if not workflow.generate_script:
|
||||||
rendered_cache_key_value = ""
|
return None
|
||||||
# 1) Build cache_key_value from workflow run parameters via jinja
|
if block_labels:
|
||||||
if cache_key:
|
# Do not generate script if block_labels is provided
|
||||||
parameter_tuples = await app.DATABASE.get_workflow_run_parameters(
|
return None
|
||||||
workflow_run_id=workflow_run.workflow_run_id
|
|
||||||
)
|
|
||||||
parameters = {wf_param.key: run_param.value for wf_param, run_param in parameter_tuples}
|
|
||||||
jinja_sandbox_env = SandboxedEnvironment()
|
|
||||||
try:
|
|
||||||
rendered_cache_key_value = jinja_sandbox_env.from_string(cache_key).render(parameters)
|
|
||||||
except Exception:
|
|
||||||
LOG.warning("Failed to render cache key; skip script generation", exc_info=True)
|
|
||||||
return
|
|
||||||
|
|
||||||
# 2) Check existing cached scripts for this workflow + cache_key_value
|
existing_script, rendered_cache_key_value = await self._get_workflow_script(
|
||||||
existing_scripts = await app.DATABASE.get_workflow_scripts_by_cache_key_value(
|
workflow,
|
||||||
organization_id=workflow.organization_id,
|
workflow_run,
|
||||||
workflow_permanent_id=workflow.workflow_permanent_id,
|
block_labels,
|
||||||
cache_key_value=rendered_cache_key_value,
|
|
||||||
)
|
)
|
||||||
if existing_scripts:
|
if existing_script:
|
||||||
LOG.info(
|
LOG.info(
|
||||||
"Found cached script for workflow",
|
"Found cached script for workflow. Skipping script generation",
|
||||||
workflow_id=workflow.workflow_id,
|
workflow_id=workflow.workflow_id,
|
||||||
cache_key_value=rendered_cache_key_value,
|
|
||||||
workflow_run_id=workflow_run.workflow_run_id,
|
workflow_run_id=workflow_run.workflow_run_id,
|
||||||
|
cache_key_value=rendered_cache_key_value,
|
||||||
|
script_id=existing_script.script_id,
|
||||||
|
script_revision_id=existing_script.script_revision_id,
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
@@ -2538,7 +2537,7 @@ class WorkflowService:
|
|||||||
organization_id=workflow.organization_id,
|
organization_id=workflow.organization_id,
|
||||||
script_id=created_script.script_id,
|
script_id=created_script.script_id,
|
||||||
workflow_permanent_id=workflow.workflow_permanent_id,
|
workflow_permanent_id=workflow.workflow_permanent_id,
|
||||||
cache_key=cache_key or "",
|
cache_key=workflow.cache_key or "",
|
||||||
cache_key_value=rendered_cache_key_value,
|
cache_key_value=rendered_cache_key_value,
|
||||||
workflow_id=workflow.workflow_id,
|
workflow_id=workflow.workflow_id,
|
||||||
workflow_run_id=workflow_run.workflow_run_id,
|
workflow_run_id=workflow_run.workflow_run_id,
|
||||||
|
|||||||
Reference in New Issue
Block a user