diff --git a/skyvern/exceptions.py b/skyvern/exceptions.py index d232c433..8e997f2f 100644 --- a/skyvern/exceptions.py +++ b/skyvern/exceptions.py @@ -756,6 +756,11 @@ class BrowserSessionNotFound(SkyvernHTTPException): ) +class CannotUpdateWorkflowDueToCodeCache(SkyvernException): + def __init__(self, workflow_permanent_id: str) -> None: + super().__init__(f"No confirmation for code cache deletion on {workflow_permanent_id}.") + + class APIKeyNotFound(SkyvernHTTPException): def __init__(self, organization_id: str) -> None: super().__init__(f"No valid API key token found for organization {organization_id}") diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 8501c76c..f905d247 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -1473,6 +1473,7 @@ class AgentDB: workflow_permanent_id: str, organization_id: str | None = None, version: int | None = None, + ignore_version: int | None = None, exclude_deleted: bool = True, ) -> Workflow | None: try: @@ -1483,6 +1484,8 @@ class AgentDB: get_workflow_query = get_workflow_query.filter_by(organization_id=organization_id) if version: get_workflow_query = get_workflow_query.filter_by(version=version) + if ignore_version: + get_workflow_query = get_workflow_query.filter(WorkflowModel.version != ignore_version) get_workflow_query = get_workflow_query.order_by(WorkflowModel.version.desc()) async with self.Session() as session: if workflow := (await session.scalars(get_workflow_query)).first(): @@ -4519,3 +4522,25 @@ class AgentDB: except Exception: LOG.error("UnexpectedError", exc_info=True) raise + + async def get_workflow_scripts_by_permanent_id( + self, + organization_id: str, + workflow_permanent_id: str, + ) -> list[WorkflowScriptModel]: + try: + async with self.Session() as session: + query = ( + select(WorkflowScriptModel) + .filter_by(organization_id=organization_id) + .filter_by(workflow_permanent_id=workflow_permanent_id) + .filter_by(deleted_at=None) + ) + 
+ return (await session.scalars(query)).all() + except SQLAlchemyError: + LOG.error("SQLAlchemyError", exc_info=True) + raise + except Exception: + LOG.error("UnexpectedError", exc_info=True) + raise diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index 5a2b1137..0298d888 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -12,7 +12,11 @@ from fastapi.responses import ORJSONResponse from skyvern import analytics from skyvern._version import __version__ from skyvern.config import settings -from skyvern.exceptions import BrowserSessionNotRenewable, MissingBrowserAddressError +from skyvern.exceptions import ( + BrowserSessionNotRenewable, + CannotUpdateWorkflowDueToCodeCache, + MissingBrowserAddressError, +) from skyvern.forge import app from skyvern.forge.prompts import prompt_engine from skyvern.forge.sdk.api.llm.exceptions import LLMProviderError @@ -607,6 +611,7 @@ async def update_workflow_legacy( ..., description="The ID of the workflow to update. 
Workflow ID starts with `wpid_`.", examples=["wpid_123"] ), current_org: Organization = Depends(org_auth_service.get_current_org), + delete_code_cache_is_ok: bool = Query(False), ) -> Workflow: analytics.capture("skyvern-oss-agent-workflow-update") # validate the workflow @@ -622,7 +627,13 @@ async def update_workflow_legacy( organization=current_org, request=workflow_create_request, workflow_permanent_id=workflow_id, + delete_code_cache_is_ok=delete_code_cache_is_ok, ) + except CannotUpdateWorkflowDueToCodeCache as e: + raise HTTPException( + status_code=422, + detail=str(e), + ) from e except WorkflowParameterMissingRequiredValue as e: raise e except Exception as e: diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 60d936db..e93fd9c8 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -15,6 +15,7 @@ from skyvern.constants import GET_DOWNLOADED_FILES_TIMEOUT, SAVE_DOWNLOADED_FILE from skyvern.exceptions import ( BlockNotFound, BrowserSessionNotFound, + CannotUpdateWorkflowDueToCodeCache, FailedToSendWebhook, InvalidCredentialId, MissingValueForParameter, @@ -115,9 +116,9 @@ DEFAULT_FIRST_BLOCK_LABEL = "block_1" DEFAULT_WORKFLOW_TITLE = "New Workflow" -def _get_workflow_definition_without_dates(workflow_definition: WorkflowDefinition) -> dict[str, Any]: +def _get_workflow_definition_core_data(workflow_definition: WorkflowDefinition) -> dict[str, Any]: """ - This function dumps the workflow definition and removes the created_at and modified_at fields inside: + This function dumps the workflow definition and removes data that is irrelevant to the definition, such as the created_at and modified_at fields, inside: - list of blocks - list of parameters And return the dumped workflow definition as a python dictionary. 
@@ -131,6 +132,13 @@ def _get_workflow_definition_without_dates(workflow_definition: WorkflowDefiniti "output_parameter_id", "workflow_id", "workflow_parameter_id", + "aws_secret_parameter_id", + "bitwarden_login_credential_parameter_id", + "bitwarden_sensitive_information_parameter_id", + "bitwarden_credit_card_data_parameter_id", + "credential_parameter_id", + "onepassword_credential_parameter_id", + "azure_vault_credential_parameter_id", ] # Use BFS to recursively remove fields from all nested objects @@ -926,19 +934,14 @@ class WorkflowService: statuses=statuses, ) - async def update_workflow( + async def update_workflow_definition( self, workflow_id: str, organization_id: str | None = None, title: str | None = None, description: str | None = None, workflow_definition: WorkflowDefinition | None = None, - delete_script: bool = True, ) -> Workflow: - if workflow_definition: - workflow_definition.validate() - - # Update the workflow updated_workflow = await app.DATABASE.update_workflow( workflow_id=workflow_id, title=title, @@ -946,49 +949,61 @@ class WorkflowService: description=description, workflow_definition=(workflow_definition.model_dump() if workflow_definition else None), ) - updated_version = updated_workflow.version - previous_workflow = None - if updated_version > 1: - previous_workflow = await app.DATABASE.get_workflow_by_permanent_id( - workflow_permanent_id=updated_workflow.workflow_permanent_id, - organization_id=organization_id, - version=updated_version - 1, - ) - - # Check if workflow definition changed and delete published workflow scripts if so - if ( - delete_script - and workflow_definition - and previous_workflow - and organization_id - and _get_workflow_definition_without_dates(previous_workflow.workflow_definition) - != _get_workflow_definition_without_dates(workflow_definition) - ): - try: - deleted_count = await app.DATABASE.delete_workflow_scripts_by_permanent_id( - organization_id=organization_id, - 
workflow_permanent_id=updated_workflow.workflow_permanent_id, - ) - if deleted_count > 0: - LOG.info( - "Deleted published workflow scripts due to workflow definition change", - workflow_id=workflow_id, - workflow_permanent_id=updated_workflow.workflow_permanent_id, - organization_id=organization_id, - deleted_count=deleted_count, - ) - except Exception as e: - LOG.error( - "Failed to delete published workflow scripts after workflow definition change", - workflow_id=workflow_id, - workflow_permanent_id=updated_workflow.workflow_permanent_id, - organization_id=organization_id, - error=str(e), - exc_info=True, - ) return updated_workflow + async def maybe_delete_cached_code( + self, + workflow: Workflow, + workflow_definition: WorkflowDefinition, + organization_id: str, + delete_script: bool = True, + delete_code_cache_is_ok: bool = False, + ) -> None: + if workflow_definition: + workflow_definition.validate() + + previous_valid_workflow = await app.DATABASE.get_workflow_by_permanent_id( + workflow_permanent_id=workflow.workflow_permanent_id, + organization_id=organization_id, + exclude_deleted=True, + ignore_version=workflow.version, + ) + + if previous_valid_workflow: + current_definition = _get_workflow_definition_core_data(previous_valid_workflow.workflow_definition) + new_definition = _get_workflow_definition_core_data(workflow_definition) + has_changes = current_definition != new_definition + else: + has_changes = False + + if previous_valid_workflow and has_changes and delete_script: + to_delete = await app.DATABASE.get_workflow_scripts_by_permanent_id( + organization_id=organization_id, + workflow_permanent_id=previous_valid_workflow.workflow_permanent_id, + ) + + if len(to_delete) > 0: + if not delete_code_cache_is_ok: + raise CannotUpdateWorkflowDueToCodeCache( + workflow_permanent_id=previous_valid_workflow.workflow_permanent_id, + ) + else: + try: + await app.DATABASE.delete_workflow_scripts_by_permanent_id( + organization_id=organization_id, + 
workflow_permanent_id=previous_valid_workflow.workflow_permanent_id, + ) + except Exception as e: + LOG.error( + "Failed to delete published workflow scripts after workflow definition change", + workflow_id=workflow.workflow_id, + workflow_permanent_id=previous_valid_workflow.workflow_permanent_id, + organization_id=organization_id, + error=str(e), + exc_info=True, + ) + async def delete_workflow_by_permanent_id( self, workflow_permanent_id: str, @@ -1857,12 +1872,208 @@ class WorkflowService: await self.persist_har_data(browser_state, last_step, workflow, workflow_run) await self.persist_tracing_data(browser_state, last_step, workflow_run) + async def make_workflow_definition( + self, + workflow_id: str, + workflow_definition_yaml: WorkflowDefinitionYAML, + title: str, + organization_id: str, + ) -> WorkflowDefinition: + # Create parameters from the request + parameters: dict[str, PARAMETER_TYPE] = {} + duplicate_parameter_keys = set() + + # Check if user's trying to manually create an output parameter + if any(parameter.parameter_type == ParameterType.OUTPUT for parameter in workflow_definition_yaml.parameters): + raise InvalidWorkflowDefinition(message="Cannot manually create output parameters") + + # Check if any parameter keys collide with automatically created output parameter keys + block_labels = [block.label for block in workflow_definition_yaml.blocks] + # TODO (kerem): Check if block labels are unique + output_parameter_keys = [f"{block_label}_output" for block_label in block_labels] + parameter_keys = [parameter.key for parameter in workflow_definition_yaml.parameters] + if any(key in output_parameter_keys for key in parameter_keys): + raise WorkflowDefinitionHasReservedParameterKeys( + reserved_keys=output_parameter_keys, parameter_keys=parameter_keys + ) + + if any(key in RESERVED_PARAMETER_KEYS for key in parameter_keys): + raise WorkflowDefinitionHasReservedParameterKeys( + reserved_keys=RESERVED_PARAMETER_KEYS, + parameter_keys=parameter_keys, + ) 
+ + # Create output parameters for all blocks + block_output_parameters = await WorkflowService._create_all_output_parameters_for_workflow( + workflow_id=workflow_id, + block_yamls=workflow_definition_yaml.blocks, + ) + for block_output_parameter in block_output_parameters.values(): + parameters[block_output_parameter.key] = block_output_parameter + + # We're going to process context parameters after other parameters since they depend on the other parameters + context_parameter_yamls = [] + + for parameter in workflow_definition_yaml.parameters: + if parameter.key in parameters: + LOG.error(f"Duplicate parameter key {parameter.key}") + duplicate_parameter_keys.add(parameter.key) + continue + if parameter.parameter_type == ParameterType.AWS_SECRET: + parameters[parameter.key] = await self.create_aws_secret_parameter( + workflow_id=workflow_id, + aws_key=parameter.aws_key, + key=parameter.key, + description=parameter.description, + ) + elif parameter.parameter_type == ParameterType.CREDENTIAL: + parameters[parameter.key] = await self.create_credential_parameter( + workflow_id=workflow_id, + key=parameter.key, + description=parameter.description, + credential_id=parameter.credential_id, + ) + elif parameter.parameter_type == ParameterType.ONEPASSWORD: + parameters[parameter.key] = await self.create_onepassword_credential_parameter( + workflow_id=workflow_id, + key=parameter.key, + description=parameter.description, + vault_id=parameter.vault_id, + item_id=parameter.item_id, + ) + elif parameter.parameter_type == ParameterType.AZURE_VAULT_CREDENTIAL: + parameters[parameter.key] = await self.create_azure_vault_credential_parameter( + workflow_id=workflow_id, + key=parameter.key, + description=parameter.description, + vault_name=parameter.vault_name, + username_key=parameter.username_key, + password_key=parameter.password_key, + totp_secret_key=parameter.totp_secret_key, + ) + elif parameter.parameter_type == ParameterType.BITWARDEN_LOGIN_CREDENTIAL: + if not 
parameter.bitwarden_collection_id and not parameter.bitwarden_item_id: + raise WorkflowParameterMissingRequiredValue( + workflow_parameter_type=ParameterType.BITWARDEN_LOGIN_CREDENTIAL, + workflow_parameter_key=parameter.key, + required_value="bitwarden_collection_id or bitwarden_item_id", + ) + if parameter.bitwarden_collection_id and not parameter.url_parameter_key: + raise WorkflowParameterMissingRequiredValue( + workflow_parameter_type=ParameterType.BITWARDEN_LOGIN_CREDENTIAL, + workflow_parameter_key=parameter.key, + required_value="url_parameter_key", + ) + parameters[parameter.key] = await self.create_bitwarden_login_credential_parameter( + workflow_id=workflow_id, + bitwarden_client_id_aws_secret_key=parameter.bitwarden_client_id_aws_secret_key, + bitwarden_client_secret_aws_secret_key=parameter.bitwarden_client_secret_aws_secret_key, + bitwarden_master_password_aws_secret_key=parameter.bitwarden_master_password_aws_secret_key, + url_parameter_key=parameter.url_parameter_key, + key=parameter.key, + description=parameter.description, + bitwarden_collection_id=parameter.bitwarden_collection_id, + bitwarden_item_id=parameter.bitwarden_item_id, + ) + elif parameter.parameter_type == ParameterType.BITWARDEN_SENSITIVE_INFORMATION: + parameters[parameter.key] = await self.create_bitwarden_sensitive_information_parameter( + workflow_id=workflow_id, + bitwarden_client_id_aws_secret_key=parameter.bitwarden_client_id_aws_secret_key, + bitwarden_client_secret_aws_secret_key=parameter.bitwarden_client_secret_aws_secret_key, + bitwarden_master_password_aws_secret_key=parameter.bitwarden_master_password_aws_secret_key, + # TODO: remove "# type: ignore" after ensuring bitwarden_collection_id is always set + bitwarden_collection_id=parameter.bitwarden_collection_id, # type: ignore + bitwarden_identity_key=parameter.bitwarden_identity_key, + bitwarden_identity_fields=parameter.bitwarden_identity_fields, + key=parameter.key, + description=parameter.description, + ) + elif 
parameter.parameter_type == ParameterType.BITWARDEN_CREDIT_CARD_DATA: + parameters[parameter.key] = await self.create_bitwarden_credit_card_data_parameter( + workflow_id=workflow_id, + bitwarden_client_id_aws_secret_key=parameter.bitwarden_client_id_aws_secret_key, + bitwarden_client_secret_aws_secret_key=parameter.bitwarden_client_secret_aws_secret_key, + bitwarden_master_password_aws_secret_key=parameter.bitwarden_master_password_aws_secret_key, + # TODO: remove "# type: ignore" after ensuring bitwarden_collection_id is always set + bitwarden_collection_id=parameter.bitwarden_collection_id, # type: ignore + bitwarden_item_id=parameter.bitwarden_item_id, # type: ignore + key=parameter.key, + description=parameter.description, + ) + elif parameter.parameter_type == ParameterType.WORKFLOW: + parameters[parameter.key] = await self.create_workflow_parameter( + workflow_id=workflow_id, + workflow_parameter_type=parameter.workflow_parameter_type, + key=parameter.key, + default_value=parameter.default_value, + description=parameter.description, + ) + elif parameter.parameter_type == ParameterType.OUTPUT: + parameters[parameter.key] = await self.create_output_parameter( + workflow_id=workflow_id, + key=parameter.key, + description=parameter.description, + ) + elif parameter.parameter_type == ParameterType.CONTEXT: + context_parameter_yamls.append(parameter) + else: + LOG.error(f"Invalid parameter type {parameter.parameter_type}") + + # Now we can process the context parameters since all other parameters have been created + for context_parameter in context_parameter_yamls: + if context_parameter.source_parameter_key not in parameters: + raise ContextParameterSourceNotDefined( + context_parameter_key=context_parameter.key, + source_key=context_parameter.source_parameter_key, + ) + + if context_parameter.key in parameters: + LOG.error(f"Duplicate parameter key {context_parameter.key}") + duplicate_parameter_keys.add(context_parameter.key) + continue + + # We're only adding 
the context parameter to the parameters dict, we're not creating it in the database + # It'll only be stored in the `workflow.workflow_definition` + # todo (kerem): should we have a database table for context parameters? + parameters[context_parameter.key] = ContextParameter( + key=context_parameter.key, + description=context_parameter.description, + source=parameters[context_parameter.source_parameter_key], + # Context parameters don't have a default value, the value always depends on the source parameter + value=None, + ) + + if duplicate_parameter_keys: + raise WorkflowDefinitionHasDuplicateParameterKeys(duplicate_keys=duplicate_parameter_keys) + # Create blocks from the request + block_label_mapping = {} + blocks: list[BlockTypeVar] = [] + for block_yaml in workflow_definition_yaml.blocks: + block = await self.block_yaml_to_block(block_yaml, parameters) + blocks.append(block) + block_label_mapping[block.label] = block + + # Set the blocks for the workflow definition + workflow_definition = WorkflowDefinition(parameters=parameters.values(), blocks=blocks) + + LOG.info( + f"Created workflow from request, title: {title}", + parameter_keys=[parameter.key for parameter in parameters.values()], + block_labels=[block.label for block in blocks], + organization_id=organization_id, + title=title, + workflow_id=workflow_id, + ) + + return workflow_definition + async def create_workflow_from_request( self, organization: Organization, request: WorkflowCreateYAMLRequest, workflow_permanent_id: str | None = None, delete_script: bool = True, + delete_code_cache_is_ok: bool = True, ) -> Workflow: organization_id = organization.organization_id LOG.info( @@ -1871,6 +2082,7 @@ class WorkflowService: title=request.title, ) new_workflow_id: str | None = None + try: if workflow_permanent_id: existing_latest_workflow = await self.get_workflow_by_permanent_id( @@ -1879,7 +2091,9 @@ class WorkflowService: exclude_deleted=False, ) existing_version = existing_latest_workflow.version - 
workflow = await self.create_workflow( + + # NOTE: it's only potential, as it may be immediately deleted! + potential_workflow = await self.create_workflow( title=request.title, workflow_definition=WorkflowDefinition(parameters=[], blocks=[]), description=request.description, @@ -1903,7 +2117,8 @@ class WorkflowService: sequential_key=request.sequential_key, ) else: - workflow = await self.create_workflow( + # NOTE: it's only potential, as it may be immediately deleted! + potential_workflow = await self.create_workflow( title=request.title, workflow_definition=WorkflowDefinition(parameters=[], blocks=[]), description=request.description, @@ -1925,200 +2140,32 @@ class WorkflowService: sequential_key=request.sequential_key, ) # Keeping track of the new workflow id to delete it if an error occurs during the creation process - new_workflow_id = workflow.workflow_id - # Create parameters from the request - parameters: dict[str, PARAMETER_TYPE] = {} - duplicate_parameter_keys = set() + new_workflow_id = potential_workflow.workflow_id - # Check if user's trying to manually create an output parameter - if any( - parameter.parameter_type == ParameterType.OUTPUT for parameter in request.workflow_definition.parameters - ): - raise InvalidWorkflowDefinition(message="Cannot manually create output parameters") - - # Check if any parameter keys collide with automatically created output parameter keys - block_labels = [block.label for block in request.workflow_definition.blocks] - # TODO (kerem): Check if block labels are unique - output_parameter_keys = [f"{block_label}_output" for block_label in block_labels] - parameter_keys = [parameter.key for parameter in request.workflow_definition.parameters] - if any(key in output_parameter_keys for key in parameter_keys): - raise WorkflowDefinitionHasReservedParameterKeys( - reserved_keys=output_parameter_keys, parameter_keys=parameter_keys - ) - - if any(key in RESERVED_PARAMETER_KEYS for key in parameter_keys): - raise 
WorkflowDefinitionHasReservedParameterKeys( - reserved_keys=RESERVED_PARAMETER_KEYS, - parameter_keys=parameter_keys, - ) - - # Create output parameters for all blocks - block_output_parameters = await WorkflowService._create_all_output_parameters_for_workflow( - workflow_id=workflow.workflow_id, - block_yamls=request.workflow_definition.blocks, + workflow_definition = await self.make_workflow_definition( + potential_workflow.workflow_id, + request.workflow_definition, + request.title, + organization_id, ) - for block_output_parameter in block_output_parameters.values(): - parameters[block_output_parameter.key] = block_output_parameter - # We're going to process context parameters after other parameters since they depend on the other parameters - context_parameter_yamls = [] - - for parameter in request.workflow_definition.parameters: - if parameter.key in parameters: - LOG.error(f"Duplicate parameter key {parameter.key}") - duplicate_parameter_keys.add(parameter.key) - continue - if parameter.parameter_type == ParameterType.AWS_SECRET: - parameters[parameter.key] = await self.create_aws_secret_parameter( - workflow_id=workflow.workflow_id, - aws_key=parameter.aws_key, - key=parameter.key, - description=parameter.description, - ) - elif parameter.parameter_type == ParameterType.CREDENTIAL: - parameters[parameter.key] = await self.create_credential_parameter( - workflow_id=workflow.workflow_id, - key=parameter.key, - description=parameter.description, - credential_id=parameter.credential_id, - ) - elif parameter.parameter_type == ParameterType.ONEPASSWORD: - parameters[parameter.key] = await self.create_onepassword_credential_parameter( - workflow_id=workflow.workflow_id, - key=parameter.key, - description=parameter.description, - vault_id=parameter.vault_id, - item_id=parameter.item_id, - ) - elif parameter.parameter_type == ParameterType.AZURE_VAULT_CREDENTIAL: - parameters[parameter.key] = await self.create_azure_vault_credential_parameter( - 
workflow_id=workflow.workflow_id, - key=parameter.key, - description=parameter.description, - vault_name=parameter.vault_name, - username_key=parameter.username_key, - password_key=parameter.password_key, - totp_secret_key=parameter.totp_secret_key, - ) - elif parameter.parameter_type == ParameterType.BITWARDEN_LOGIN_CREDENTIAL: - if not parameter.bitwarden_collection_id and not parameter.bitwarden_item_id: - raise WorkflowParameterMissingRequiredValue( - workflow_parameter_type=ParameterType.BITWARDEN_LOGIN_CREDENTIAL, - workflow_parameter_key=parameter.key, - required_value="bitwarden_collection_id or bitwarden_item_id", - ) - if parameter.bitwarden_collection_id and not parameter.url_parameter_key: - raise WorkflowParameterMissingRequiredValue( - workflow_parameter_type=ParameterType.BITWARDEN_LOGIN_CREDENTIAL, - workflow_parameter_key=parameter.key, - required_value="url_parameter_key", - ) - parameters[parameter.key] = await self.create_bitwarden_login_credential_parameter( - workflow_id=workflow.workflow_id, - bitwarden_client_id_aws_secret_key=parameter.bitwarden_client_id_aws_secret_key, - bitwarden_client_secret_aws_secret_key=parameter.bitwarden_client_secret_aws_secret_key, - bitwarden_master_password_aws_secret_key=parameter.bitwarden_master_password_aws_secret_key, - url_parameter_key=parameter.url_parameter_key, - key=parameter.key, - description=parameter.description, - bitwarden_collection_id=parameter.bitwarden_collection_id, - bitwarden_item_id=parameter.bitwarden_item_id, - ) - elif parameter.parameter_type == ParameterType.BITWARDEN_SENSITIVE_INFORMATION: - parameters[parameter.key] = await self.create_bitwarden_sensitive_information_parameter( - workflow_id=workflow.workflow_id, - bitwarden_client_id_aws_secret_key=parameter.bitwarden_client_id_aws_secret_key, - bitwarden_client_secret_aws_secret_key=parameter.bitwarden_client_secret_aws_secret_key, - bitwarden_master_password_aws_secret_key=parameter.bitwarden_master_password_aws_secret_key, - 
# TODO: remove "# type: ignore" after ensuring bitwarden_collection_id is always set - bitwarden_collection_id=parameter.bitwarden_collection_id, # type: ignore - bitwarden_identity_key=parameter.bitwarden_identity_key, - bitwarden_identity_fields=parameter.bitwarden_identity_fields, - key=parameter.key, - description=parameter.description, - ) - elif parameter.parameter_type == ParameterType.BITWARDEN_CREDIT_CARD_DATA: - parameters[parameter.key] = await self.create_bitwarden_credit_card_data_parameter( - workflow_id=workflow.workflow_id, - bitwarden_client_id_aws_secret_key=parameter.bitwarden_client_id_aws_secret_key, - bitwarden_client_secret_aws_secret_key=parameter.bitwarden_client_secret_aws_secret_key, - bitwarden_master_password_aws_secret_key=parameter.bitwarden_master_password_aws_secret_key, - # TODO: remove "# type: ignore" after ensuring bitwarden_collection_id is always set - bitwarden_collection_id=parameter.bitwarden_collection_id, # type: ignore - bitwarden_item_id=parameter.bitwarden_item_id, # type: ignore - key=parameter.key, - description=parameter.description, - ) - elif parameter.parameter_type == ParameterType.WORKFLOW: - parameters[parameter.key] = await self.create_workflow_parameter( - workflow_id=workflow.workflow_id, - workflow_parameter_type=parameter.workflow_parameter_type, - key=parameter.key, - default_value=parameter.default_value, - description=parameter.description, - ) - elif parameter.parameter_type == ParameterType.OUTPUT: - parameters[parameter.key] = await self.create_output_parameter( - workflow_id=workflow.workflow_id, - key=parameter.key, - description=parameter.description, - ) - elif parameter.parameter_type == ParameterType.CONTEXT: - context_parameter_yamls.append(parameter) - else: - LOG.error(f"Invalid parameter type {parameter.parameter_type}") - - # Now we can process the context parameters since all other parameters have been created - for context_parameter in context_parameter_yamls: - if 
context_parameter.source_parameter_key not in parameters: - raise ContextParameterSourceNotDefined( - context_parameter_key=context_parameter.key, - source_key=context_parameter.source_parameter_key, - ) - - if context_parameter.key in parameters: - LOG.error(f"Duplicate parameter key {context_parameter.key}") - duplicate_parameter_keys.add(context_parameter.key) - continue - - # We're only adding the context parameter to the parameters dict, we're not creating it in the database - # It'll only be stored in the `workflow.workflow_definition` - # todo (kerem): should we have a database table for context parameters? - parameters[context_parameter.key] = ContextParameter( - key=context_parameter.key, - description=context_parameter.description, - source=parameters[context_parameter.source_parameter_key], - # Context parameters don't have a default value, the value always depends on the source parameter - value=None, - ) - - if duplicate_parameter_keys: - raise WorkflowDefinitionHasDuplicateParameterKeys(duplicate_keys=duplicate_parameter_keys) - # Create blocks from the request - block_label_mapping = {} - blocks: list[BlockTypeVar] = [] - for block_yaml in request.workflow_definition.blocks: - block = await self.block_yaml_to_block(workflow, block_yaml, parameters) - blocks.append(block) - block_label_mapping[block.label] = block - - # Set the blocks for the workflow definition - workflow_definition = WorkflowDefinition(parameters=parameters.values(), blocks=blocks) - workflow = await self.update_workflow( - workflow_id=workflow.workflow_id, - organization_id=organization_id, - workflow_definition=workflow_definition, - delete_script=delete_script, - ) - LOG.info( - f"Created workflow from request, title: {request.title}", - parameter_keys=[parameter.key for parameter in parameters.values()], - block_labels=[block.label for block in blocks], + updated_workflow = await self.update_workflow_definition( + workflow_id=potential_workflow.workflow_id, 
organization_id=organization_id, title=request.title, - workflow_id=workflow.workflow_id, + description=request.description, + workflow_definition=workflow_definition, ) - return workflow + + await self.maybe_delete_cached_code( + updated_workflow, + workflow_definition=workflow_definition, + organization_id=organization_id, + delete_script=delete_script, + delete_code_cache_is_ok=delete_code_cache_is_ok, + ) + + return updated_workflow except Exception as e: if new_workflow_id: LOG.error( @@ -2160,7 +2207,6 @@ class WorkflowService: @staticmethod async def block_yaml_to_block( - workflow: Workflow, block_yaml: BLOCK_YAML_TYPES, parameters: dict[str, Parameter], ) -> BlockTypeVar: @@ -2198,7 +2244,7 @@ class WorkflowService: ) elif block_yaml.block_type == BlockType.FOR_LOOP: loop_blocks = [ - await WorkflowService.block_yaml_to_block(workflow, loop_block, parameters) + await WorkflowService.block_yaml_to_block(loop_block, parameters) for loop_block in block_yaml.loop_blocks ]