diff --git a/skyvern/forge/sdk/services/observer_service.py b/skyvern/forge/sdk/services/observer_service.py index 3a16118d..726c71ec 100644 --- a/skyvern/forge/sdk/services/observer_service.py +++ b/skyvern/forge/sdk/services/observer_service.py @@ -339,7 +339,7 @@ async def run_observer_task_helper( observer_cruise_id=observer_cruise_id, organization_id=organization_id, status=ObserverTaskStatus.running ) await app.WORKFLOW_SERVICE.mark_workflow_run_as_running(workflow_run_id=workflow_run.workflow_run_id) - await _set_up_workflow_context(workflow_id, workflow_run_id) + await _set_up_workflow_context(workflow_id, workflow_run_id, organization) url = str(observer_task.url) user_prompt = observer_task.prompt @@ -717,18 +717,20 @@ async def handle_block_result( ) -async def _set_up_workflow_context(workflow_id: str, workflow_run_id: str) -> None: +async def _set_up_workflow_context(workflow_id: str, workflow_run_id: str, organization: Organization) -> None: """ TODO: see if we could remove this function as we can just set an empty workflow context """ # Get all tuples wp_wps_tuples = await app.WORKFLOW_SERVICE.get_workflow_run_parameter_tuples(workflow_run_id=workflow_run_id) workflow_output_parameters = await app.WORKFLOW_SERVICE.get_workflow_output_parameters(workflow_id=workflow_id) - app.WORKFLOW_CONTEXT_MANAGER.initialize_workflow_run_context( + await app.WORKFLOW_CONTEXT_MANAGER.initialize_workflow_run_context( + organization, workflow_run_id, wp_wps_tuples, workflow_output_parameters, [], + [], ) diff --git a/skyvern/forge/sdk/workflow/context_manager.py b/skyvern/forge/sdk/workflow/context_manager.py index 5e9d87c6..7b45c9ae 100644 --- a/skyvern/forge/sdk/workflow/context_manager.py +++ b/skyvern/forge/sdk/workflow/context_manager.py @@ -1,5 +1,5 @@ import uuid -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Self import structlog @@ -12,7 +12,10 @@ from skyvern.forge.sdk.services.bitwarden import BitwardenConstants, BitwardenSe from skyvern.forge.sdk.workflow.exceptions import OutputParameterKeyCollisionError from skyvern.forge.sdk.workflow.models.parameter import ( PARAMETER_TYPE, + AWSSecretParameter, + BitwardenCreditCardDataParameter, BitwardenLoginCredentialParameter, + BitwardenSensitiveInformationParameter, ContextParameter, OutputParameter, Parameter, @@ -29,43 +32,68 @@ BlockMetadata = dict[str, str | int | float | bool | dict | list] class WorkflowRunContext: - parameters: dict[str, PARAMETER_TYPE] - values: dict[str, Any] - secrets: dict[str, Any] - - def __init__( - self, + @classmethod + async def init( + cls, + aws_client: AsyncAWSClient, + organization: Organization, workflow_parameter_tuples: list[tuple[WorkflowParameter, "WorkflowRunParameter"]], workflow_output_parameters: list[OutputParameter], context_parameters: list[ContextParameter], - ) -> None: + secret_parameters: list[ + AWSSecretParameter + | BitwardenLoginCredentialParameter + | BitwardenCreditCardDataParameter + | BitwardenSensitiveInformationParameter + ], + ) -> Self: # key is label name - self.blocks_metadata: dict[str, BlockMetadata] = {} - self.parameters = {} - self.values = {} - self.secrets = {} - + workflow_run_context = cls() for parameter, run_parameter in workflow_parameter_tuples: - if parameter.key in self.parameters: - prev_value = self.parameters[parameter.key] + if parameter.key in workflow_run_context.parameters: + prev_value = workflow_run_context.parameters[parameter.key] new_value = run_parameter.value LOG.error( f"Duplicate parameter key {parameter.key} found while initializing context manager, previous value: {prev_value}, new value: {new_value}. Using new value." ) - self.parameters[parameter.key] = parameter - self.values[parameter.key] = run_parameter.value + workflow_run_context.parameters[parameter.key] = parameter + workflow_run_context.values[parameter.key] = run_parameter.value for output_parameter in workflow_output_parameters: - if output_parameter.key in self.parameters: + if output_parameter.key in workflow_run_context.parameters: raise OutputParameterKeyCollisionError(output_parameter.key) - self.parameters[output_parameter.key] = output_parameter + workflow_run_context.parameters[output_parameter.key] = output_parameter + + for secrete_parameter in secret_parameters: + if isinstance(secrete_parameter, AWSSecretParameter): + await workflow_run_context.register_aws_secret_parameter_value(aws_client, secrete_parameter) + elif isinstance(secrete_parameter, BitwardenLoginCredentialParameter): + await workflow_run_context.register_bitwarden_login_credential_parameter_value( + aws_client, secrete_parameter, organization + ) + elif isinstance(secrete_parameter, BitwardenCreditCardDataParameter): + await workflow_run_context.register_bitwarden_credit_card_data_parameter_value( + aws_client, secrete_parameter, organization + ) + elif isinstance(secrete_parameter, BitwardenSensitiveInformationParameter): + await workflow_run_context.register_bitwarden_sensitive_information_parameter_value( + aws_client, secrete_parameter, organization + ) for context_parameter in context_parameters: # All context parameters will be registered with the context manager during initialization but the values # will be calculated and set before and after each block execution # values sometimes will be overwritten by the block execution itself - self.parameters[context_parameter.key] = context_parameter + workflow_run_context.parameters[context_parameter.key] = context_parameter + + return workflow_run_context + + def __init__(self) -> None: + self.blocks_metadata: dict[str, BlockMetadata] = {} + self.parameters: dict[str, PARAMETER_TYPE] = {} + self.values: dict[str, Any] = {} + self.secrets: dict[str, Any] = {} def get_parameter(self, key: str) -> Parameter: return self.parameters[key] @@ -133,6 +161,242 @@ class WorkflowRunContext: def generate_random_secret_id() -> str: return f"secret_{uuid.uuid4()}" + async def register_aws_secret_parameter_value( + self, + aws_client: AsyncAWSClient, + parameter: AWSSecretParameter, + ) -> None: + # If the parameter is an AWS secret, fetch the secret value and store it in the secrets dict + # The value of the parameter will be the random secret id with format `secret_`. + # We'll replace the random secret id with the actual secret value when we need to use it. + secret_value = await aws_client.get_secret(parameter.aws_key) + if secret_value is not None: + random_secret_id = self.generate_random_secret_id() + self.secrets[random_secret_id] = secret_value + self.values[parameter.key] = random_secret_id + self.parameters[parameter.key] = parameter + + async def register_bitwarden_login_credential_parameter_value( + self, + aws_client: AsyncAWSClient, + parameter: BitwardenLoginCredentialParameter, + organization: Organization, + ) -> None: + try: + # Get the Bitwarden login credentials from AWS secrets + client_id = settings.BITWARDEN_CLIENT_ID or await aws_client.get_secret( + parameter.bitwarden_client_id_aws_secret_key + ) + client_secret = settings.BITWARDEN_CLIENT_SECRET or await aws_client.get_secret( + parameter.bitwarden_client_secret_aws_secret_key + ) + master_password = settings.BITWARDEN_MASTER_PASSWORD or await aws_client.get_secret( + parameter.bitwarden_master_password_aws_secret_key + ) + except Exception as e: + LOG.error(f"Failed to get Bitwarden login credentials from AWS secrets. Error: {e}") + raise e + + if self.has_parameter(parameter.url_parameter_key) and self.has_value(parameter.url_parameter_key): + url = self.values[parameter.url_parameter_key] + elif parameter.url_parameter_key: + # If a key can't be found within the parameter values dict, assume it's a URL (and not a URL Parameter) + url = parameter.url_parameter_key + else: + LOG.error(f"URL parameter {parameter.url_parameter_key} not found or has no value") + raise SkyvernException("URL parameter for Bitwarden login credentials not found or has no value") + + collection_id = None + if parameter.bitwarden_collection_id: + if self.has_parameter(parameter.bitwarden_collection_id) and self.has_value( + parameter.bitwarden_collection_id + ): + collection_id = self.values[parameter.bitwarden_collection_id] + else: + collection_id = parameter.bitwarden_collection_id + + try: + secret_credentials = await BitwardenService.get_secret_value_from_url( + client_id, + client_secret, + master_password, + organization.bw_organization_id, + organization.bw_collection_ids, + url, + collection_id=collection_id, + ) + if secret_credentials: + self.secrets[BitwardenConstants.BW_ORGANIZATION_ID] = organization.bw_organization_id + self.secrets[BitwardenConstants.BW_COLLECTION_IDS] = organization.bw_collection_ids + self.secrets[BitwardenConstants.URL] = url + self.secrets[BitwardenConstants.CLIENT_SECRET] = client_secret + self.secrets[BitwardenConstants.CLIENT_ID] = client_id + self.secrets[BitwardenConstants.MASTER_PASSWORD] = master_password + self.secrets[BitwardenConstants.BW_COLLECTION_ID] = parameter.bitwarden_collection_id + + random_secret_id = self.generate_random_secret_id() + # username secret + username_secret_id = f"{random_secret_id}_username" + self.secrets[username_secret_id] = secret_credentials[BitwardenConstants.USERNAME] + # password secret + password_secret_id = f"{random_secret_id}_password" + self.secrets[password_secret_id] = secret_credentials[BitwardenConstants.PASSWORD] + self.values[parameter.key] = { + "username": username_secret_id, + "password": password_secret_id, + } + self.parameters[parameter.key] = parameter + + if BitwardenConstants.TOTP in secret_credentials and secret_credentials[BitwardenConstants.TOTP]: + totp_secret_id = f"{random_secret_id}_totp" + self.secrets[totp_secret_id] = BitwardenConstants.TOTP + totp_secret_value = self.totp_secret_value_key(totp_secret_id) + self.secrets[totp_secret_value] = secret_credentials[BitwardenConstants.TOTP] + self.values[parameter.key]["totp"] = totp_secret_id + + except BitwardenBaseError as e: + LOG.error(f"Failed to get secret from Bitwarden. Error: {e}") + raise e + + async def register_bitwarden_sensitive_information_parameter_value( + self, + aws_client: AsyncAWSClient, + parameter: BitwardenSensitiveInformationParameter, + organization: Organization, + ) -> None: + try: + # Get the Bitwarden login credentials from AWS secrets + client_id = settings.BITWARDEN_CLIENT_ID or await aws_client.get_secret( + parameter.bitwarden_client_id_aws_secret_key + ) + client_secret = settings.BITWARDEN_CLIENT_SECRET or await aws_client.get_secret( + parameter.bitwarden_client_secret_aws_secret_key + ) + master_password = settings.BITWARDEN_MASTER_PASSWORD or await aws_client.get_secret( + parameter.bitwarden_master_password_aws_secret_key + ) + except Exception as e: + LOG.error(f"Failed to get Bitwarden login credentials from AWS secrets. Error: {e}") + raise e + + bitwarden_identity_key = parameter.bitwarden_identity_key + if self.has_parameter(parameter.bitwarden_identity_key) and self.has_value(parameter.bitwarden_identity_key): + bitwarden_identity_key = self.values[parameter.bitwarden_identity_key] + + collection_id = parameter.bitwarden_collection_id + if self.has_parameter(parameter.bitwarden_collection_id) and self.has_value(parameter.bitwarden_collection_id): + collection_id = self.values[parameter.bitwarden_collection_id] + + try: + sensitive_values = await BitwardenService.get_sensitive_information_from_identity( + client_id, + client_secret, + master_password, + organization.bw_organization_id, + organization.bw_collection_ids, + collection_id, + bitwarden_identity_key, + parameter.bitwarden_identity_fields, + ) + if sensitive_values: + self.secrets[BitwardenConstants.BW_ORGANIZATION_ID] = organization.bw_organization_id + self.secrets[BitwardenConstants.BW_COLLECTION_IDS] = organization.bw_collection_ids + self.secrets[BitwardenConstants.IDENTITY_KEY] = bitwarden_identity_key + self.secrets[BitwardenConstants.CLIENT_SECRET] = client_secret + self.secrets[BitwardenConstants.CLIENT_ID] = client_id + self.secrets[BitwardenConstants.MASTER_PASSWORD] = master_password + self.secrets[BitwardenConstants.BW_COLLECTION_ID] = collection_id + + self.parameters[parameter.key] = parameter + self.values[parameter.key] = {} + for key, value in sensitive_values.items(): + random_secret_id = self.generate_random_secret_id() + secret_id = f"{random_secret_id}_{key}" + self.secrets[secret_id] = value + self.values[parameter.key][key] = secret_id + + except BitwardenBaseError as e: + LOG.error(f"Failed to get sensitive information from Bitwarden. Error: {e}") + raise e + + async def register_bitwarden_credit_card_data_parameter_value( + self, + aws_client: AsyncAWSClient, + parameter: BitwardenCreditCardDataParameter, + organization: Organization, + ) -> None: + try: + # Get the Bitwarden login credentials from AWS secrets + client_id = settings.BITWARDEN_CLIENT_ID or await aws_client.get_secret( + parameter.bitwarden_client_id_aws_secret_key + ) + client_secret = settings.BITWARDEN_CLIENT_SECRET or await aws_client.get_secret( + parameter.bitwarden_client_secret_aws_secret_key + ) + master_password = settings.BITWARDEN_MASTER_PASSWORD or await aws_client.get_secret( + parameter.bitwarden_master_password_aws_secret_key + ) + except Exception as e: + LOG.error(f"Failed to get Bitwarden login credentials from AWS secrets. Error: {e}") + raise e + + if self.has_parameter(parameter.bitwarden_item_id) and self.has_value(parameter.bitwarden_item_id): + item_id = self.values[parameter.bitwarden_item_id] + else: + item_id = parameter.bitwarden_item_id + + if self.has_parameter(parameter.bitwarden_collection_id) and self.has_value(parameter.bitwarden_collection_id): + collection_id = self.values[parameter.bitwarden_collection_id] + else: + collection_id = parameter.bitwarden_collection_id + + try: + credit_card_data = await BitwardenService.get_credit_card_data( + client_id, + client_secret, + master_password, + organization.bw_organization_id, + organization.bw_collection_ids, + collection_id, + item_id, + ) + if not credit_card_data: + raise ValueError("Credit card data not found in Bitwarden") + + self.secrets[BitwardenConstants.CLIENT_ID] = client_id + self.secrets[BitwardenConstants.CLIENT_SECRET] = client_secret + self.secrets[BitwardenConstants.MASTER_PASSWORD] = master_password + self.secrets[BitwardenConstants.ITEM_ID] = item_id + + fields_to_obfuscate = { + BitwardenConstants.CREDIT_CARD_NUMBER: "card_number", + BitwardenConstants.CREDIT_CARD_CVV: "card_cvv", + } + + pass_through_fields = { + BitwardenConstants.CREDIT_CARD_HOLDER_NAME: "card_holder_name", + BitwardenConstants.CREDIT_CARD_EXPIRATION_MONTH: "card_exp_month", + BitwardenConstants.CREDIT_CARD_EXPIRATION_YEAR: "card_exp_year", + BitwardenConstants.CREDIT_CARD_BRAND: "card_brand", + } + + parameter_value: dict[str, Any] = { + field_name: credit_card_data[field_key] for field_key, field_name in pass_through_fields.items() + } + + for data_key, secret_suffix in fields_to_obfuscate.items(): + random_secret_id = self.generate_random_secret_id() + secret_id = f"{random_secret_id}_{secret_suffix}" + self.secrets[secret_id] = credit_card_data[data_key] + parameter_value[secret_suffix] = secret_id + + self.values[parameter.key] = parameter_value + self.parameters[parameter.key] = parameter + + except BitwardenBaseError as e: + LOG.error(f"Failed to get credit card data from Bitwarden. Error: {e}") + raise e + async def register_parameter_value( self, aws_client: AsyncAWSClient, @@ -147,221 +411,6 @@ class WorkflowRunContext: elif parameter.parameter_type == ParameterType.OUTPUT: LOG.error(f"Output parameters are set after each block execution. Parameter key: {parameter.key}") raise ValueError(f"Output parameters are set after each block execution. Parameter key: {parameter.key}") - elif parameter.parameter_type == ParameterType.AWS_SECRET: - # If the parameter is an AWS secret, fetch the secret value and store it in the secrets dict - # The value of the parameter will be the random secret id with format `secret_`. - # We'll replace the random secret id with the actual secret value when we need to use it. - secret_value = await aws_client.get_secret(parameter.aws_key) - if secret_value is not None: - random_secret_id = self.generate_random_secret_id() - self.secrets[random_secret_id] = secret_value - self.values[parameter.key] = random_secret_id - elif parameter.parameter_type == ParameterType.BITWARDEN_LOGIN_CREDENTIAL: - try: - # Get the Bitwarden login credentials from AWS secrets - client_id = settings.BITWARDEN_CLIENT_ID or await aws_client.get_secret( - parameter.bitwarden_client_id_aws_secret_key - ) - client_secret = settings.BITWARDEN_CLIENT_SECRET or await aws_client.get_secret( - parameter.bitwarden_client_secret_aws_secret_key - ) - master_password = settings.BITWARDEN_MASTER_PASSWORD or await aws_client.get_secret( - parameter.bitwarden_master_password_aws_secret_key - ) - except Exception as e: - LOG.error(f"Failed to get Bitwarden login credentials from AWS secrets. Error: {e}") - raise e - - if self.has_parameter(parameter.url_parameter_key) and self.has_value(parameter.url_parameter_key): - url = self.values[parameter.url_parameter_key] - elif parameter.url_parameter_key: - # If a key can't be found within the parameter values dict, assume it's a URL (and not a URL Parameter) - url = parameter.url_parameter_key - else: - LOG.error(f"URL parameter {parameter.url_parameter_key} not found or has no value") - raise SkyvernException("URL parameter for Bitwarden login credentials not found or has no value") - - collection_id = None - if parameter.bitwarden_collection_id: - if self.has_parameter(parameter.bitwarden_collection_id) and self.has_value( - parameter.bitwarden_collection_id - ): - collection_id = self.values[parameter.bitwarden_collection_id] - else: - collection_id = parameter.bitwarden_collection_id - - try: - secret_credentials = await BitwardenService.get_secret_value_from_url( - client_id, - client_secret, - master_password, - organization.bw_organization_id, - organization.bw_collection_ids, - url, - collection_id=collection_id, - ) - if secret_credentials: - self.secrets[BitwardenConstants.BW_ORGANIZATION_ID] = organization.bw_organization_id - self.secrets[BitwardenConstants.BW_COLLECTION_IDS] = organization.bw_collection_ids - self.secrets[BitwardenConstants.URL] = url - self.secrets[BitwardenConstants.CLIENT_SECRET] = client_secret - self.secrets[BitwardenConstants.CLIENT_ID] = client_id - self.secrets[BitwardenConstants.MASTER_PASSWORD] = master_password - self.secrets[BitwardenConstants.BW_COLLECTION_ID] = parameter.bitwarden_collection_id - - random_secret_id = self.generate_random_secret_id() - # username secret - username_secret_id = f"{random_secret_id}_username" - self.secrets[username_secret_id] = secret_credentials[BitwardenConstants.USERNAME] - # password secret - password_secret_id = f"{random_secret_id}_password" - self.secrets[password_secret_id] = secret_credentials[BitwardenConstants.PASSWORD] - self.values[parameter.key] = { - "username": username_secret_id, - "password": password_secret_id, - } - - if BitwardenConstants.TOTP in secret_credentials and secret_credentials[BitwardenConstants.TOTP]: - totp_secret_id = f"{random_secret_id}_totp" - self.secrets[totp_secret_id] = BitwardenConstants.TOTP - totp_secret_value = self.totp_secret_value_key(totp_secret_id) - self.secrets[totp_secret_value] = secret_credentials[BitwardenConstants.TOTP] - self.values[parameter.key]["totp"] = totp_secret_id - - except BitwardenBaseError as e: - LOG.error(f"Failed to get secret from Bitwarden. Error: {e}") - raise e - elif parameter.parameter_type == ParameterType.BITWARDEN_SENSITIVE_INFORMATION: - try: - # Get the Bitwarden login credentials from AWS secrets - client_id = settings.BITWARDEN_CLIENT_ID or await aws_client.get_secret( - parameter.bitwarden_client_id_aws_secret_key - ) - client_secret = settings.BITWARDEN_CLIENT_SECRET or await aws_client.get_secret( - parameter.bitwarden_client_secret_aws_secret_key - ) - master_password = settings.BITWARDEN_MASTER_PASSWORD or await aws_client.get_secret( - parameter.bitwarden_master_password_aws_secret_key - ) - except Exception as e: - LOG.error(f"Failed to get Bitwarden login credentials from AWS secrets. Error: {e}") - raise e - - bitwarden_identity_key = parameter.bitwarden_identity_key - if self.has_parameter(parameter.bitwarden_identity_key) and self.has_value( - parameter.bitwarden_identity_key - ): - bitwarden_identity_key = self.values[parameter.bitwarden_identity_key] - - collection_id = parameter.bitwarden_collection_id - if self.has_parameter(parameter.bitwarden_collection_id) and self.has_value( - parameter.bitwarden_collection_id - ): - collection_id = self.values[parameter.bitwarden_collection_id] - - try: - sensitive_values = await BitwardenService.get_sensitive_information_from_identity( - client_id, - client_secret, - master_password, - organization.bw_organization_id, - organization.bw_collection_ids, - collection_id, - bitwarden_identity_key, - parameter.bitwarden_identity_fields, - ) - if sensitive_values: - self.secrets[BitwardenConstants.BW_ORGANIZATION_ID] = organization.bw_organization_id - self.secrets[BitwardenConstants.BW_COLLECTION_IDS] = organization.bw_collection_ids - self.secrets[BitwardenConstants.IDENTITY_KEY] = bitwarden_identity_key - self.secrets[BitwardenConstants.CLIENT_SECRET] = client_secret - self.secrets[BitwardenConstants.CLIENT_ID] = client_id - self.secrets[BitwardenConstants.MASTER_PASSWORD] = master_password - self.secrets[BitwardenConstants.BW_COLLECTION_ID] = collection_id - - self.values[parameter.key] = {} - for key, value in sensitive_values.items(): - random_secret_id = self.generate_random_secret_id() - secret_id = f"{random_secret_id}_{key}" - self.secrets[secret_id] = value - self.values[parameter.key][key] = secret_id - - except BitwardenBaseError as e: - LOG.error(f"Failed to get sensitive information from Bitwarden. Error: {e}") - raise e - elif parameter.parameter_type == ParameterType.BITWARDEN_CREDIT_CARD_DATA: - try: - # Get the Bitwarden login credentials from AWS secrets - client_id = settings.BITWARDEN_CLIENT_ID or await aws_client.get_secret( - parameter.bitwarden_client_id_aws_secret_key - ) - client_secret = settings.BITWARDEN_CLIENT_SECRET or await aws_client.get_secret( - parameter.bitwarden_client_secret_aws_secret_key - ) - master_password = settings.BITWARDEN_MASTER_PASSWORD or await aws_client.get_secret( - parameter.bitwarden_master_password_aws_secret_key - ) - except Exception as e: - LOG.error(f"Failed to get Bitwarden login credentials from AWS secrets. Error: {e}") - raise e - - if self.has_parameter(parameter.bitwarden_item_id) and self.has_value(parameter.bitwarden_item_id): - item_id = self.values[parameter.bitwarden_item_id] - else: - item_id = parameter.bitwarden_item_id - - if self.has_parameter(parameter.bitwarden_collection_id) and self.has_value( - parameter.bitwarden_collection_id - ): - collection_id = self.values[parameter.bitwarden_collection_id] - else: - collection_id = parameter.bitwarden_collection_id - - try: - credit_card_data = await BitwardenService.get_credit_card_data( - client_id, - client_secret, - master_password, - organization.bw_organization_id, - organization.bw_collection_ids, - collection_id, - item_id, - ) - if not credit_card_data: - raise ValueError("Credit card data not found in Bitwarden") - - self.secrets[BitwardenConstants.CLIENT_ID] = client_id - self.secrets[BitwardenConstants.CLIENT_SECRET] = client_secret - self.secrets[BitwardenConstants.MASTER_PASSWORD] = master_password - self.secrets[BitwardenConstants.ITEM_ID] = item_id - - fields_to_obfuscate = { - BitwardenConstants.CREDIT_CARD_NUMBER: "card_number", - BitwardenConstants.CREDIT_CARD_CVV: "card_cvv", - } - - pass_through_fields = { - BitwardenConstants.CREDIT_CARD_HOLDER_NAME: "card_holder_name", - BitwardenConstants.CREDIT_CARD_EXPIRATION_MONTH: "card_exp_month", - BitwardenConstants.CREDIT_CARD_EXPIRATION_YEAR: "card_exp_year", - BitwardenConstants.CREDIT_CARD_BRAND: "card_brand", - } - - parameter_value: dict[str, Any] = { - field_name: credit_card_data[field_key] for field_key, field_name in pass_through_fields.items() - } - - for data_key, secret_suffix in fields_to_obfuscate.items(): - random_secret_id = self.generate_random_secret_id() - secret_id = f"{random_secret_id}_{secret_suffix}" - self.secrets[secret_id] = credit_card_data[data_key] - parameter_value[secret_suffix] = secret_id - - self.values[parameter.key] = parameter_value - - except BitwardenBaseError as e: - LOG.error(f"Failed to get credit card data from Bitwarden. Error: {e}") - raise e elif isinstance(parameter, ContextParameter): if isinstance(parameter.source, WorkflowParameter): # TODO (kerem): set this while initializing the context manager @@ -494,6 +543,21 @@ class WorkflowRunContext: raise ValueError( f"Output parameter {parameter.key} should have already been set through workflow run context init" ) + elif isinstance( + parameter, + ( + AWSSecretParameter, + BitwardenLoginCredentialParameter, + BitwardenCreditCardDataParameter, + BitwardenSensitiveInformationParameter, + ), + ): + LOG.error( + f"SecretParameter {parameter.key} should have already been set through workflow run context init" + ) + raise ValueError( + f"SecretParameter {parameter.key} should have already been set through workflow run context init" + ) self.parameters[parameter.key] = parameter await self.register_parameter_value(aws_client, parameter, organization) @@ -519,15 +583,27 @@ class WorkflowContextManager: LOG.error(f"WorkflowRunContext not initialized for workflow run {workflow_run_id}") raise WorkflowRunContextNotInitialized(workflow_run_id=workflow_run_id) - def initialize_workflow_run_context( + async def initialize_workflow_run_context( self, + organization: Organization, workflow_run_id: str, workflow_parameter_tuples: list[tuple[WorkflowParameter, "WorkflowRunParameter"]], workflow_output_parameters: list[OutputParameter], context_parameters: list[ContextParameter], + secret_parameters: list[ + AWSSecretParameter + | BitwardenLoginCredentialParameter + | BitwardenCreditCardDataParameter + | BitwardenSensitiveInformationParameter + ], ) -> WorkflowRunContext: - workflow_run_context = WorkflowRunContext( - workflow_parameter_tuples, workflow_output_parameters, context_parameters + workflow_run_context = await WorkflowRunContext.init( + self.aws_client, + organization, + workflow_parameter_tuples, + workflow_output_parameters, + context_parameters, + secret_parameters, ) self.workflow_run_contexts[workflow_run_id] = workflow_run_context return workflow_run_context diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 1493814c..ae0ef5f6 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -60,6 +60,9 @@ from skyvern.forge.sdk.workflow.models.block import ( from skyvern.forge.sdk.workflow.models.parameter import ( PARAMETER_TYPE, AWSSecretParameter, + BitwardenCreditCardDataParameter, + BitwardenLoginCredentialParameter, + BitwardenSensitiveInformationParameter, ContextParameter, OutputParameter, Parameter, @@ -219,15 +222,56 @@ class WorkflowService: for parameter in workflow.workflow_definition.parameters if isinstance(parameter, ContextParameter) ] + + secret_parameters = [ + parameter + for parameter in workflow.workflow_definition.parameters + if isinstance( + parameter, + ( + AWSSecretParameter, + BitwardenLoginCredentialParameter, + BitwardenCreditCardDataParameter, + BitwardenSensitiveInformationParameter, + ), + ) + ] + # Get all tuples wp_wps_tuples = await self.get_workflow_run_parameter_tuples(workflow_run_id=workflow_run.workflow_run_id) workflow_output_parameters = await self.get_workflow_output_parameters(workflow_id=workflow.workflow_id) - app.WORKFLOW_CONTEXT_MANAGER.initialize_workflow_run_context( - workflow_run_id, - wp_wps_tuples, - workflow_output_parameters, - context_parameters, - ) + try: + await app.WORKFLOW_CONTEXT_MANAGER.initialize_workflow_run_context( + organization, + workflow_run_id, + wp_wps_tuples, + workflow_output_parameters, + context_parameters, + secret_parameters, + ) + except Exception as e: + LOG.exception( + f"Error while initializing workflow run context for workflow run {workflow_run.workflow_run_id}", + workflow_run_id=workflow_run.workflow_run_id, + ) + + exception_message = f"Unexpected error: {str(e)}" + if isinstance(e, SkyvernException): + exception_message = f"unexpected SkyvernException({e.__class__.__name__}): {str(e)}" + + failure_reason = f"Failed to initialize workflow run context. failure reason: {exception_message}" + await self.mark_workflow_run_as_failed( + workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason + ) + await self.clean_up_workflow( + workflow=workflow, + workflow_run=workflow_run, + api_key=api_key, + browser_session_id=browser_session_id, + close_browser_on_completion=browser_session_id is None, + ) + return workflow_run + # Execute workflow blocks blocks = workflow.workflow_definition.blocks blocks_cnt = len(blocks)