reorganize workflow updates so that we can sanely check if we need to prompt user about code cache deletion on the frontend (#3639)

Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
This commit is contained in:
Shuchang Zheng
2025-10-07 16:56:53 -07:00
committed by GitHub
parent f48277d298
commit 360def0de5
4 changed files with 328 additions and 241 deletions

View File

@@ -756,6 +756,11 @@ class BrowserSessionNotFound(SkyvernHTTPException):
) )
class CannotUpdateWorkflowDueToCodeCache(SkyvernException):
    """Raised when updating a workflow would discard its cached generated code.

    The caller must explicitly confirm the deletion (the update endpoints pass
    ``delete_code_cache_is_ok=True``) before the update proceeds; the API layer
    converts this exception into an HTTP 422 response with this message.
    """

    def __init__(self, workflow_permanent_id: str) -> None:
        # The message names the workflow permanent id so the frontend can show
        # a targeted confirmation prompt.
        super().__init__(f"No confirmation for code cache deletion on {workflow_permanent_id}.")
class APIKeyNotFound(SkyvernHTTPException):
    def __init__(self, organization_id: str) -> None:
        super().__init__(f"No valid API key token found for organization {organization_id}")

View File

@@ -1473,6 +1473,7 @@ class AgentDB:
        workflow_permanent_id: str,
        organization_id: str | None = None,
        version: int | None = None,
        ignore_version: int | None = None,
        exclude_deleted: bool = True,
    ) -> Workflow | None:
        try:
@@ -1483,6 +1484,8 @@ class AgentDB:
                get_workflow_query = get_workflow_query.filter_by(organization_id=organization_id)
            if version:
                get_workflow_query = get_workflow_query.filter_by(version=version)
            if ignore_version:
                get_workflow_query = get_workflow_query.filter(WorkflowModel.version != ignore_version)
            get_workflow_query = get_workflow_query.order_by(WorkflowModel.version.desc())
            async with self.Session() as session:
                if workflow := (await session.scalars(get_workflow_query)).first():
@@ -4519,3 +4522,25 @@ class AgentDB:
except Exception: except Exception:
LOG.error("UnexpectedError", exc_info=True) LOG.error("UnexpectedError", exc_info=True)
raise raise
async def get_workflow_scripts_by_permanent_id(
    self,
    organization_id: str,
    workflow_permanent_id: str,
) -> list[WorkflowScriptModel]:
    """Return all non-deleted workflow scripts for the given workflow.

    Used to decide whether a workflow update would invalidate cached code:
    a non-empty result means published scripts exist for this
    ``workflow_permanent_id`` within ``organization_id``.

    Raises:
        SQLAlchemyError: on database errors (logged, then re-raised).
        Exception: any other error (logged, then re-raised).
    """
    try:
        async with self.Session() as session:
            query = (
                select(WorkflowScriptModel)
                .filter_by(organization_id=organization_id)
                .filter_by(workflow_permanent_id=workflow_permanent_id)
                # Soft-deleted scripts are excluded via the deleted_at marker.
                .filter_by(deleted_at=None)
            )
            return (await session.scalars(query)).all()
    except SQLAlchemyError:
        LOG.error("SQLAlchemyError", exc_info=True)
        raise
    except Exception:
        LOG.error("UnexpectedError", exc_info=True)
        raise

View File

@@ -12,7 +12,11 @@ from fastapi.responses import ORJSONResponse
from skyvern import analytics from skyvern import analytics
from skyvern._version import __version__ from skyvern._version import __version__
from skyvern.config import settings from skyvern.config import settings
from skyvern.exceptions import BrowserSessionNotRenewable, MissingBrowserAddressError from skyvern.exceptions import (
BrowserSessionNotRenewable,
CannotUpdateWorkflowDueToCodeCache,
MissingBrowserAddressError,
)
from skyvern.forge import app from skyvern.forge import app
from skyvern.forge.prompts import prompt_engine from skyvern.forge.prompts import prompt_engine
from skyvern.forge.sdk.api.llm.exceptions import LLMProviderError from skyvern.forge.sdk.api.llm.exceptions import LLMProviderError
@@ -607,6 +611,7 @@ async def update_workflow_legacy(
..., description="The ID of the workflow to update. Workflow ID starts with `wpid_`.", examples=["wpid_123"] ..., description="The ID of the workflow to update. Workflow ID starts with `wpid_`.", examples=["wpid_123"]
), ),
current_org: Organization = Depends(org_auth_service.get_current_org), current_org: Organization = Depends(org_auth_service.get_current_org),
delete_code_cache_is_ok: bool = Query(False),
) -> Workflow: ) -> Workflow:
analytics.capture("skyvern-oss-agent-workflow-update") analytics.capture("skyvern-oss-agent-workflow-update")
# validate the workflow # validate the workflow
@@ -622,7 +627,13 @@ async def update_workflow_legacy(
organization=current_org, organization=current_org,
request=workflow_create_request, request=workflow_create_request,
workflow_permanent_id=workflow_id, workflow_permanent_id=workflow_id,
delete_code_cache_is_ok=delete_code_cache_is_ok,
) )
except CannotUpdateWorkflowDueToCodeCache as e:
raise HTTPException(
status_code=422,
detail=str(e),
) from e
except WorkflowParameterMissingRequiredValue as e: except WorkflowParameterMissingRequiredValue as e:
raise e raise e
except Exception as e: except Exception as e:

View File

@@ -15,6 +15,7 @@ from skyvern.constants import GET_DOWNLOADED_FILES_TIMEOUT, SAVE_DOWNLOADED_FILE
from skyvern.exceptions import ( from skyvern.exceptions import (
BlockNotFound, BlockNotFound,
BrowserSessionNotFound, BrowserSessionNotFound,
CannotUpdateWorkflowDueToCodeCache,
FailedToSendWebhook, FailedToSendWebhook,
InvalidCredentialId, InvalidCredentialId,
MissingValueForParameter, MissingValueForParameter,
@@ -115,9 +116,9 @@ DEFAULT_FIRST_BLOCK_LABEL = "block_1"
DEFAULT_WORKFLOW_TITLE = "New Workflow" DEFAULT_WORKFLOW_TITLE = "New Workflow"
def _get_workflow_definition_without_dates(workflow_definition: WorkflowDefinition) -> dict[str, Any]: def _get_workflow_definition_core_data(workflow_definition: WorkflowDefinition) -> dict[str, Any]:
""" """
This function dumps the workflow definition and removes the created_at and modified_at fields inside: This function dumps the workflow definition and removes the irrelevant data to the definition, like created_at and modified_at fields inside:
- list of blocks - list of blocks
- list of parameters - list of parameters
And return the dumped workflow definition as a python dictionary. And return the dumped workflow definition as a python dictionary.
@@ -131,6 +132,13 @@ def _get_workflow_definition_without_dates(workflow_definition: WorkflowDefiniti
"output_parameter_id", "output_parameter_id",
"workflow_id", "workflow_id",
"workflow_parameter_id", "workflow_parameter_id",
"aws_secret_parameter_id",
"bitwarden_login_credential_parameter_id",
"bitwarden_sensitive_information_parameter_id",
"bitwarden_credit_card_data_parameter_id",
"credential_parameter_id",
"onepassword_credential_parameter_id",
"azure_vault_credential_parameter_id",
] ]
# Use BFS to recursively remove fields from all nested objects # Use BFS to recursively remove fields from all nested objects
@@ -926,19 +934,14 @@ class WorkflowService:
statuses=statuses, statuses=statuses,
) )
async def update_workflow( async def update_workflow_definition(
self, self,
workflow_id: str, workflow_id: str,
organization_id: str | None = None, organization_id: str | None = None,
title: str | None = None, title: str | None = None,
description: str | None = None, description: str | None = None,
workflow_definition: WorkflowDefinition | None = None, workflow_definition: WorkflowDefinition | None = None,
delete_script: bool = True,
) -> Workflow: ) -> Workflow:
if workflow_definition:
workflow_definition.validate()
# Update the workflow
updated_workflow = await app.DATABASE.update_workflow( updated_workflow = await app.DATABASE.update_workflow(
workflow_id=workflow_id, workflow_id=workflow_id,
title=title, title=title,
@@ -946,49 +949,61 @@ class WorkflowService:
description=description, description=description,
workflow_definition=(workflow_definition.model_dump() if workflow_definition else None), workflow_definition=(workflow_definition.model_dump() if workflow_definition else None),
) )
updated_version = updated_workflow.version
previous_workflow = None return updated_workflow
if updated_version > 1:
previous_workflow = await app.DATABASE.get_workflow_by_permanent_id( async def maybe_delete_cached_code(
workflow_permanent_id=updated_workflow.workflow_permanent_id, self,
workflow: Workflow,
workflow_definition: WorkflowDefinition,
organization_id: str,
delete_script: bool = True,
delete_code_cache_is_ok: bool = False,
) -> None:
if workflow_definition:
workflow_definition.validate()
previous_valid_workflow = await app.DATABASE.get_workflow_by_permanent_id(
workflow_permanent_id=workflow.workflow_permanent_id,
organization_id=organization_id, organization_id=organization_id,
version=updated_version - 1, exclude_deleted=True,
ignore_version=workflow.version,
) )
# Check if workflow definition changed and delete published workflow scripts if so if previous_valid_workflow:
if ( current_definition = _get_workflow_definition_core_data(previous_valid_workflow.workflow_definition)
delete_script new_definition = _get_workflow_definition_core_data(workflow_definition)
and workflow_definition has_changes = current_definition != new_definition
and previous_workflow else:
and organization_id has_changes = False
and _get_workflow_definition_without_dates(previous_workflow.workflow_definition)
!= _get_workflow_definition_without_dates(workflow_definition) if previous_valid_workflow and has_changes and delete_script:
): to_delete = await app.DATABASE.get_workflow_scripts_by_permanent_id(
try:
deleted_count = await app.DATABASE.delete_workflow_scripts_by_permanent_id(
organization_id=organization_id, organization_id=organization_id,
workflow_permanent_id=updated_workflow.workflow_permanent_id, workflow_permanent_id=previous_valid_workflow.workflow_permanent_id,
) )
if deleted_count > 0:
LOG.info( if len(to_delete) > 0:
"Deleted published workflow scripts due to workflow definition change", if not delete_code_cache_is_ok:
workflow_id=workflow_id, raise CannotUpdateWorkflowDueToCodeCache(
workflow_permanent_id=updated_workflow.workflow_permanent_id, workflow_permanent_id=previous_valid_workflow.workflow_permanent_id,
)
else:
try:
await app.DATABASE.delete_workflow_scripts_by_permanent_id(
organization_id=organization_id, organization_id=organization_id,
deleted_count=deleted_count, workflow_permanent_id=previous_valid_workflow.workflow_permanent_id,
) )
except Exception as e: except Exception as e:
LOG.error( LOG.error(
"Failed to delete published workflow scripts after workflow definition change", "Failed to delete published workflow scripts after workflow definition change",
workflow_id=workflow_id, workflow_id=workflow.workflow_id,
workflow_permanent_id=updated_workflow.workflow_permanent_id, workflow_permanent_id=previous_valid_workflow.workflow_permanent_id,
organization_id=organization_id, organization_id=organization_id,
error=str(e), error=str(e),
exc_info=True, exc_info=True,
) )
return updated_workflow
async def delete_workflow_by_permanent_id( async def delete_workflow_by_permanent_id(
self, self,
workflow_permanent_id: str, workflow_permanent_id: str,
@@ -1857,90 +1872,26 @@ class WorkflowService:
await self.persist_har_data(browser_state, last_step, workflow, workflow_run) await self.persist_har_data(browser_state, last_step, workflow, workflow_run)
await self.persist_tracing_data(browser_state, last_step, workflow_run) await self.persist_tracing_data(browser_state, last_step, workflow_run)
async def create_workflow_from_request( async def make_workflow_definition(
self, self,
organization: Organization, workflow_id: str,
request: WorkflowCreateYAMLRequest, workflow_definition_yaml: WorkflowDefinitionYAML,
workflow_permanent_id: str | None = None, title: str,
delete_script: bool = True, organization_id: str,
) -> Workflow: ) -> WorkflowDefinition:
organization_id = organization.organization_id
LOG.info(
"Creating workflow from request",
organization_id=organization_id,
title=request.title,
)
new_workflow_id: str | None = None
try:
if workflow_permanent_id:
existing_latest_workflow = await self.get_workflow_by_permanent_id(
workflow_permanent_id=workflow_permanent_id,
organization_id=organization_id,
exclude_deleted=False,
)
existing_version = existing_latest_workflow.version
workflow = await self.create_workflow(
title=request.title,
workflow_definition=WorkflowDefinition(parameters=[], blocks=[]),
description=request.description,
organization_id=organization_id,
proxy_location=request.proxy_location,
webhook_callback_url=request.webhook_callback_url,
totp_verification_url=request.totp_verification_url,
totp_identifier=request.totp_identifier,
persist_browser_session=request.persist_browser_session,
model=request.model,
max_screenshot_scrolling_times=request.max_screenshot_scrolls,
extra_http_headers=request.extra_http_headers,
workflow_permanent_id=workflow_permanent_id,
version=existing_version + 1,
is_saved_task=request.is_saved_task,
status=request.status,
run_with=request.run_with,
cache_key=request.cache_key,
ai_fallback=request.ai_fallback,
run_sequentially=request.run_sequentially,
sequential_key=request.sequential_key,
)
else:
workflow = await self.create_workflow(
title=request.title,
workflow_definition=WorkflowDefinition(parameters=[], blocks=[]),
description=request.description,
organization_id=organization_id,
proxy_location=request.proxy_location,
webhook_callback_url=request.webhook_callback_url,
totp_verification_url=request.totp_verification_url,
totp_identifier=request.totp_identifier,
persist_browser_session=request.persist_browser_session,
model=request.model,
max_screenshot_scrolling_times=request.max_screenshot_scrolls,
extra_http_headers=request.extra_http_headers,
is_saved_task=request.is_saved_task,
status=request.status,
run_with=request.run_with,
cache_key=request.cache_key,
ai_fallback=request.ai_fallback,
run_sequentially=request.run_sequentially,
sequential_key=request.sequential_key,
)
# Keeping track of the new workflow id to delete it if an error occurs during the creation process
new_workflow_id = workflow.workflow_id
# Create parameters from the request # Create parameters from the request
parameters: dict[str, PARAMETER_TYPE] = {} parameters: dict[str, PARAMETER_TYPE] = {}
duplicate_parameter_keys = set() duplicate_parameter_keys = set()
# Check if user's trying to manually create an output parameter # Check if user's trying to manually create an output parameter
if any( if any(parameter.parameter_type == ParameterType.OUTPUT for parameter in workflow_definition_yaml.parameters):
parameter.parameter_type == ParameterType.OUTPUT for parameter in request.workflow_definition.parameters
):
raise InvalidWorkflowDefinition(message="Cannot manually create output parameters") raise InvalidWorkflowDefinition(message="Cannot manually create output parameters")
# Check if any parameter keys collide with automatically created output parameter keys # Check if any parameter keys collide with automatically created output parameter keys
block_labels = [block.label for block in request.workflow_definition.blocks] block_labels = [block.label for block in workflow_definition_yaml.blocks]
# TODO (kerem): Check if block labels are unique # TODO (kerem): Check if block labels are unique
output_parameter_keys = [f"{block_label}_output" for block_label in block_labels] output_parameter_keys = [f"{block_label}_output" for block_label in block_labels]
parameter_keys = [parameter.key for parameter in request.workflow_definition.parameters] parameter_keys = [parameter.key for parameter in workflow_definition_yaml.parameters]
if any(key in output_parameter_keys for key in parameter_keys): if any(key in output_parameter_keys for key in parameter_keys):
raise WorkflowDefinitionHasReservedParameterKeys( raise WorkflowDefinitionHasReservedParameterKeys(
reserved_keys=output_parameter_keys, parameter_keys=parameter_keys reserved_keys=output_parameter_keys, parameter_keys=parameter_keys
@@ -1954,8 +1905,8 @@ class WorkflowService:
# Create output parameters for all blocks # Create output parameters for all blocks
block_output_parameters = await WorkflowService._create_all_output_parameters_for_workflow( block_output_parameters = await WorkflowService._create_all_output_parameters_for_workflow(
workflow_id=workflow.workflow_id, workflow_id=workflow_id,
block_yamls=request.workflow_definition.blocks, block_yamls=workflow_definition_yaml.blocks,
) )
for block_output_parameter in block_output_parameters.values(): for block_output_parameter in block_output_parameters.values():
parameters[block_output_parameter.key] = block_output_parameter parameters[block_output_parameter.key] = block_output_parameter
@@ -1963,28 +1914,28 @@ class WorkflowService:
# We're going to process context parameters after other parameters since they depend on the other parameters # We're going to process context parameters after other parameters since they depend on the other parameters
context_parameter_yamls = [] context_parameter_yamls = []
for parameter in request.workflow_definition.parameters: for parameter in workflow_definition_yaml.parameters:
if parameter.key in parameters: if parameter.key in parameters:
LOG.error(f"Duplicate parameter key {parameter.key}") LOG.error(f"Duplicate parameter key {parameter.key}")
duplicate_parameter_keys.add(parameter.key) duplicate_parameter_keys.add(parameter.key)
continue continue
if parameter.parameter_type == ParameterType.AWS_SECRET: if parameter.parameter_type == ParameterType.AWS_SECRET:
parameters[parameter.key] = await self.create_aws_secret_parameter( parameters[parameter.key] = await self.create_aws_secret_parameter(
workflow_id=workflow.workflow_id, workflow_id=workflow_id,
aws_key=parameter.aws_key, aws_key=parameter.aws_key,
key=parameter.key, key=parameter.key,
description=parameter.description, description=parameter.description,
) )
elif parameter.parameter_type == ParameterType.CREDENTIAL: elif parameter.parameter_type == ParameterType.CREDENTIAL:
parameters[parameter.key] = await self.create_credential_parameter( parameters[parameter.key] = await self.create_credential_parameter(
workflow_id=workflow.workflow_id, workflow_id=workflow_id,
key=parameter.key, key=parameter.key,
description=parameter.description, description=parameter.description,
credential_id=parameter.credential_id, credential_id=parameter.credential_id,
) )
elif parameter.parameter_type == ParameterType.ONEPASSWORD: elif parameter.parameter_type == ParameterType.ONEPASSWORD:
parameters[parameter.key] = await self.create_onepassword_credential_parameter( parameters[parameter.key] = await self.create_onepassword_credential_parameter(
workflow_id=workflow.workflow_id, workflow_id=workflow_id,
key=parameter.key, key=parameter.key,
description=parameter.description, description=parameter.description,
vault_id=parameter.vault_id, vault_id=parameter.vault_id,
@@ -1992,7 +1943,7 @@ class WorkflowService:
) )
elif parameter.parameter_type == ParameterType.AZURE_VAULT_CREDENTIAL: elif parameter.parameter_type == ParameterType.AZURE_VAULT_CREDENTIAL:
parameters[parameter.key] = await self.create_azure_vault_credential_parameter( parameters[parameter.key] = await self.create_azure_vault_credential_parameter(
workflow_id=workflow.workflow_id, workflow_id=workflow_id,
key=parameter.key, key=parameter.key,
description=parameter.description, description=parameter.description,
vault_name=parameter.vault_name, vault_name=parameter.vault_name,
@@ -2014,7 +1965,7 @@ class WorkflowService:
required_value="url_parameter_key", required_value="url_parameter_key",
) )
parameters[parameter.key] = await self.create_bitwarden_login_credential_parameter( parameters[parameter.key] = await self.create_bitwarden_login_credential_parameter(
workflow_id=workflow.workflow_id, workflow_id=workflow_id,
bitwarden_client_id_aws_secret_key=parameter.bitwarden_client_id_aws_secret_key, bitwarden_client_id_aws_secret_key=parameter.bitwarden_client_id_aws_secret_key,
bitwarden_client_secret_aws_secret_key=parameter.bitwarden_client_secret_aws_secret_key, bitwarden_client_secret_aws_secret_key=parameter.bitwarden_client_secret_aws_secret_key,
bitwarden_master_password_aws_secret_key=parameter.bitwarden_master_password_aws_secret_key, bitwarden_master_password_aws_secret_key=parameter.bitwarden_master_password_aws_secret_key,
@@ -2026,7 +1977,7 @@ class WorkflowService:
) )
elif parameter.parameter_type == ParameterType.BITWARDEN_SENSITIVE_INFORMATION: elif parameter.parameter_type == ParameterType.BITWARDEN_SENSITIVE_INFORMATION:
parameters[parameter.key] = await self.create_bitwarden_sensitive_information_parameter( parameters[parameter.key] = await self.create_bitwarden_sensitive_information_parameter(
workflow_id=workflow.workflow_id, workflow_id=workflow_id,
bitwarden_client_id_aws_secret_key=parameter.bitwarden_client_id_aws_secret_key, bitwarden_client_id_aws_secret_key=parameter.bitwarden_client_id_aws_secret_key,
bitwarden_client_secret_aws_secret_key=parameter.bitwarden_client_secret_aws_secret_key, bitwarden_client_secret_aws_secret_key=parameter.bitwarden_client_secret_aws_secret_key,
bitwarden_master_password_aws_secret_key=parameter.bitwarden_master_password_aws_secret_key, bitwarden_master_password_aws_secret_key=parameter.bitwarden_master_password_aws_secret_key,
@@ -2039,7 +1990,7 @@ class WorkflowService:
) )
elif parameter.parameter_type == ParameterType.BITWARDEN_CREDIT_CARD_DATA: elif parameter.parameter_type == ParameterType.BITWARDEN_CREDIT_CARD_DATA:
parameters[parameter.key] = await self.create_bitwarden_credit_card_data_parameter( parameters[parameter.key] = await self.create_bitwarden_credit_card_data_parameter(
workflow_id=workflow.workflow_id, workflow_id=workflow_id,
bitwarden_client_id_aws_secret_key=parameter.bitwarden_client_id_aws_secret_key, bitwarden_client_id_aws_secret_key=parameter.bitwarden_client_id_aws_secret_key,
bitwarden_client_secret_aws_secret_key=parameter.bitwarden_client_secret_aws_secret_key, bitwarden_client_secret_aws_secret_key=parameter.bitwarden_client_secret_aws_secret_key,
bitwarden_master_password_aws_secret_key=parameter.bitwarden_master_password_aws_secret_key, bitwarden_master_password_aws_secret_key=parameter.bitwarden_master_password_aws_secret_key,
@@ -2051,7 +2002,7 @@ class WorkflowService:
) )
elif parameter.parameter_type == ParameterType.WORKFLOW: elif parameter.parameter_type == ParameterType.WORKFLOW:
parameters[parameter.key] = await self.create_workflow_parameter( parameters[parameter.key] = await self.create_workflow_parameter(
workflow_id=workflow.workflow_id, workflow_id=workflow_id,
workflow_parameter_type=parameter.workflow_parameter_type, workflow_parameter_type=parameter.workflow_parameter_type,
key=parameter.key, key=parameter.key,
default_value=parameter.default_value, default_value=parameter.default_value,
@@ -2059,7 +2010,7 @@ class WorkflowService:
) )
elif parameter.parameter_type == ParameterType.OUTPUT: elif parameter.parameter_type == ParameterType.OUTPUT:
parameters[parameter.key] = await self.create_output_parameter( parameters[parameter.key] = await self.create_output_parameter(
workflow_id=workflow.workflow_id, workflow_id=workflow_id,
key=parameter.key, key=parameter.key,
description=parameter.description, description=parameter.description,
) )
@@ -2097,28 +2048,124 @@ class WorkflowService:
# Create blocks from the request # Create blocks from the request
block_label_mapping = {} block_label_mapping = {}
blocks: list[BlockTypeVar] = [] blocks: list[BlockTypeVar] = []
for block_yaml in request.workflow_definition.blocks: for block_yaml in workflow_definition_yaml.blocks:
block = await self.block_yaml_to_block(workflow, block_yaml, parameters) block = await self.block_yaml_to_block(block_yaml, parameters)
blocks.append(block) blocks.append(block)
block_label_mapping[block.label] = block block_label_mapping[block.label] = block
# Set the blocks for the workflow definition # Set the blocks for the workflow definition
workflow_definition = WorkflowDefinition(parameters=parameters.values(), blocks=blocks) workflow_definition = WorkflowDefinition(parameters=parameters.values(), blocks=blocks)
workflow = await self.update_workflow(
workflow_id=workflow.workflow_id,
organization_id=organization_id,
workflow_definition=workflow_definition,
delete_script=delete_script,
)
LOG.info( LOG.info(
f"Created workflow from request, title: {request.title}", f"Created workflow from request, title: {title}",
parameter_keys=[parameter.key for parameter in parameters.values()], parameter_keys=[parameter.key for parameter in parameters.values()],
block_labels=[block.label for block in blocks], block_labels=[block.label for block in blocks],
organization_id=organization_id, organization_id=organization_id,
title=request.title, title=title,
workflow_id=workflow.workflow_id, workflow_id=workflow_id,
) )
return workflow
return workflow_definition
async def create_workflow_from_request(
self,
organization: Organization,
request: WorkflowCreateYAMLRequest,
workflow_permanent_id: str | None = None,
delete_script: bool = True,
delete_code_cache_is_ok: bool = True,
) -> Workflow:
organization_id = organization.organization_id
LOG.info(
"Creating workflow from request",
organization_id=organization_id,
title=request.title,
)
new_workflow_id: str | None = None
try:
if workflow_permanent_id:
existing_latest_workflow = await self.get_workflow_by_permanent_id(
workflow_permanent_id=workflow_permanent_id,
organization_id=organization_id,
exclude_deleted=False,
)
existing_version = existing_latest_workflow.version
# NOTE: it's only potential, as it may be immediately deleted!
potential_workflow = await self.create_workflow(
title=request.title,
workflow_definition=WorkflowDefinition(parameters=[], blocks=[]),
description=request.description,
organization_id=organization_id,
proxy_location=request.proxy_location,
webhook_callback_url=request.webhook_callback_url,
totp_verification_url=request.totp_verification_url,
totp_identifier=request.totp_identifier,
persist_browser_session=request.persist_browser_session,
model=request.model,
max_screenshot_scrolling_times=request.max_screenshot_scrolls,
extra_http_headers=request.extra_http_headers,
workflow_permanent_id=workflow_permanent_id,
version=existing_version + 1,
is_saved_task=request.is_saved_task,
status=request.status,
run_with=request.run_with,
cache_key=request.cache_key,
ai_fallback=request.ai_fallback,
run_sequentially=request.run_sequentially,
sequential_key=request.sequential_key,
)
else:
# NOTE: it's only potential, as it may be immediately deleted!
potential_workflow = await self.create_workflow(
title=request.title,
workflow_definition=WorkflowDefinition(parameters=[], blocks=[]),
description=request.description,
organization_id=organization_id,
proxy_location=request.proxy_location,
webhook_callback_url=request.webhook_callback_url,
totp_verification_url=request.totp_verification_url,
totp_identifier=request.totp_identifier,
persist_browser_session=request.persist_browser_session,
model=request.model,
max_screenshot_scrolling_times=request.max_screenshot_scrolls,
extra_http_headers=request.extra_http_headers,
is_saved_task=request.is_saved_task,
status=request.status,
run_with=request.run_with,
cache_key=request.cache_key,
ai_fallback=request.ai_fallback,
run_sequentially=request.run_sequentially,
sequential_key=request.sequential_key,
)
# Keeping track of the new workflow id to delete it if an error occurs during the creation process
new_workflow_id = potential_workflow.workflow_id
workflow_definition = await self.make_workflow_definition(
potential_workflow.workflow_id,
request.workflow_definition,
request.title,
organization_id,
)
updated_workflow = await self.update_workflow_definition(
workflow_id=potential_workflow.workflow_id,
organization_id=organization_id,
title=request.title,
description=request.description,
workflow_definition=workflow_definition,
)
await self.maybe_delete_cached_code(
updated_workflow,
workflow_definition=workflow_definition,
organization_id=organization_id,
delete_script=delete_script,
delete_code_cache_is_ok=delete_code_cache_is_ok,
)
return updated_workflow
except Exception as e: except Exception as e:
if new_workflow_id: if new_workflow_id:
LOG.error( LOG.error(
@@ -2160,7 +2207,6 @@ class WorkflowService:
@staticmethod @staticmethod
async def block_yaml_to_block( async def block_yaml_to_block(
workflow: Workflow,
block_yaml: BLOCK_YAML_TYPES, block_yaml: BLOCK_YAML_TYPES,
parameters: dict[str, Parameter], parameters: dict[str, Parameter],
) -> BlockTypeVar: ) -> BlockTypeVar:
@@ -2198,7 +2244,7 @@ class WorkflowService:
) )
elif block_yaml.block_type == BlockType.FOR_LOOP: elif block_yaml.block_type == BlockType.FOR_LOOP:
loop_blocks = [ loop_blocks = [
await WorkflowService.block_yaml_to_block(workflow, loop_block, parameters) await WorkflowService.block_yaml_to_block(loop_block, parameters)
for loop_block in block_yaml.loop_blocks for loop_block in block_yaml.loop_blocks
] ]