diff --git a/skyvern/core/script_generations/real_skyvern_page_ai.py b/skyvern/core/script_generations/real_skyvern_page_ai.py index 632803ee..0c045124 100644 --- a/skyvern/core/script_generations/real_skyvern_page_ai.py +++ b/skyvern/core/script_generations/real_skyvern_page_ai.py @@ -19,7 +19,7 @@ from skyvern.forge.sdk.core import skyvern_context from skyvern.forge.sdk.schemas.totp_codes import OTPType from skyvern.schemas.workflows import BlockStatus from skyvern.services import script_service -from skyvern.services.otp_service import poll_otp_value +from skyvern.services.otp_service import poll_otp_value, try_generate_totp_from_credential from skyvern.utils.prompt_engine import load_prompt_with_elements from skyvern.webeye.actions import handler_utils from skyvern.webeye.actions.actions import ( @@ -245,7 +245,10 @@ class RealSkyvernPageAi(SkyvernPageAi): if value and isinstance(data, dict) and "value" not in data: data["value"] = value - if (totp_identifier or totp_url) and context and organization_id and task_id: + # Try credential TOTP first (highest priority, doesn't need totp_url/totp_identifier) + otp_value = try_generate_totp_from_credential(workflow_run_id) + # Fall back to webhook/totp_identifier + if not otp_value and (totp_identifier or totp_url) and context and organization_id and task_id: if totp_identifier: totp_identifier = _render_template_with_label(totp_identifier, label=self.current_label) if totp_url: @@ -257,16 +260,16 @@ class RealSkyvernPageAi(SkyvernPageAi): totp_identifier=totp_identifier, totp_verification_url=totp_url, ) - if otp_value and otp_value.get_otp_type() == OTPType.TOTP: - verification_code = otp_value.value - if isinstance(data, dict) and SPECIAL_FIELD_VERIFICATION_CODE not in data: - data[SPECIAL_FIELD_VERIFICATION_CODE] = verification_code - elif isinstance(data, str) and SPECIAL_FIELD_VERIFICATION_CODE not in data: - data = f"{data}\n" + str({SPECIAL_FIELD_VERIFICATION_CODE: verification_code}) - elif isinstance(data, list): - data.append({SPECIAL_FIELD_VERIFICATION_CODE: verification_code}) - else: - data = {SPECIAL_FIELD_VERIFICATION_CODE: verification_code} + if otp_value and otp_value.get_otp_type() == OTPType.TOTP: + verification_code = otp_value.value + if isinstance(data, dict) and SPECIAL_FIELD_VERIFICATION_CODE not in data: + data[SPECIAL_FIELD_VERIFICATION_CODE] = verification_code + elif isinstance(data, str) and SPECIAL_FIELD_VERIFICATION_CODE not in data: + data = f"{data}\n" + str({SPECIAL_FIELD_VERIFICATION_CODE: verification_code}) + elif isinstance(data, list): + data.append({SPECIAL_FIELD_VERIFICATION_CODE: verification_code}) + else: + data = {SPECIAL_FIELD_VERIFICATION_CODE: verification_code} await self._refresh_scraped_page(take_screenshots=False) diff --git a/skyvern/core/script_generations/script_skyvern_page.py b/skyvern/core/script_generations/script_skyvern_page.py index 599365ec..55876c2e 100644 --- a/skyvern/core/script_generations/script_skyvern_page.py +++ b/skyvern/core/script_generations/script_skyvern_page.py @@ -27,7 +27,7 @@ from skyvern.forge.sdk.api.files import ( from skyvern.forge.sdk.artifact.models import ArtifactType from skyvern.forge.sdk.core import skyvern_context from skyvern.schemas.steps import AgentStepOutput -from skyvern.services.otp_service import poll_otp_value +from skyvern.services.otp_service import poll_otp_value, try_generate_totp_from_credential from skyvern.utils.url_validators import prepend_scheme_and_validate_url from skyvern.webeye.actions.action_types import ActionType from skyvern.webeye.actions.actions import ( @@ -615,16 +615,21 @@ class ScriptSkyvernPage(SkyvernPage): if is_totp_value: value = generate_totp_value(context.workflow_run_id, original_value) elif (totp_identifier or totp_url) and organization_id: - totp_value = await poll_otp_value( - organization_id=organization_id, - task_id=task_id, - workflow_run_id=workflow_run_id, - totp_verification_url=totp_url, - totp_identifier=totp_identifier, - ) - if totp_value: - # use the totp verification code - value = totp_value.value + # Try credential TOTP first (higher priority than webhook/totp_identifier) + credential_totp = try_generate_totp_from_credential(workflow_run_id) + if credential_totp: + value = credential_totp.value + else: + totp_value = await poll_otp_value( + organization_id=organization_id, + task_id=task_id, + workflow_run_id=workflow_run_id, + totp_verification_url=totp_url, + totp_identifier=totp_identifier, + ) + if totp_value: + # use the totp verification code + value = totp_value.value return value diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index eca8c7c4..649e5f8d 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -104,7 +104,7 @@ from skyvern.schemas.runs import CUA_ENGINES, RunEngine from skyvern.schemas.steps import AgentStepOutput from skyvern.services import run_service, service_utils from skyvern.services.action_service import get_action_history -from skyvern.services.otp_service import poll_otp_value +from skyvern.services.otp_service import poll_otp_value, try_generate_totp_from_credential from skyvern.utils.image_resizer import Resolution from skyvern.utils.prompt_engine import MaxStepsReasonResponse, load_prompt_with_elements from skyvern.webeye.actions.action_types import ActionType @@ -4526,21 +4526,25 @@ class ForgeAgent: should_enter_verification_code = json_response.get("should_enter_verification_code") if place_to_enter_verification_code and should_enter_verification_code and task.organization_id: LOG.info("Need verification code") - workflow_id = workflow_permanent_id = None - if task.workflow_run_id: - workflow_run = await app.DATABASE.get_workflow_run(task.workflow_run_id) - if workflow_run: - workflow_id = workflow_run.workflow_id - workflow_permanent_id = workflow_run.workflow_permanent_id - otp_value = await poll_otp_value( - organization_id=task.organization_id, - task_id=task.task_id, - workflow_id=workflow_id, - workflow_run_id=task.workflow_run_id, - workflow_permanent_id=workflow_permanent_id, - totp_verification_url=task.totp_verification_url, - totp_identifier=task.totp_identifier, - ) + # Try credential TOTP first (highest priority, doesn't need totp_url/totp_identifier) + otp_value = try_generate_totp_from_credential(task.workflow_run_id) + # Fall back to webhook/totp_identifier + if not otp_value and (task.totp_verification_url or task.totp_identifier): + workflow_id = workflow_permanent_id = None + if task.workflow_run_id: + workflow_run = await app.DATABASE.get_workflow_run(task.workflow_run_id) + if workflow_run: + workflow_id = workflow_run.workflow_id + workflow_permanent_id = workflow_run.workflow_permanent_id + otp_value = await poll_otp_value( + organization_id=task.organization_id, + task_id=task.task_id, + workflow_id=workflow_id, + workflow_run_id=task.workflow_run_id, + workflow_permanent_id=workflow_permanent_id, + totp_verification_url=task.totp_verification_url, + totp_identifier=task.totp_identifier, + ) if not otp_value or otp_value.get_otp_type() != OTPType.TOTP: return json_response diff --git a/skyvern/services/otp_service.py b/skyvern/services/otp_service.py index 0d5c33fa..dee5d98a 100644 --- a/skyvern/services/otp_service.py +++ b/skyvern/services/otp_service.py @@ -1,6 +1,7 @@ import asyncio from datetime import datetime, timedelta +import pyotp import structlog from pydantic import BaseModel, Field @@ -57,6 +58,47 @@ async def parse_otp_login( return None +def try_generate_totp_from_credential(workflow_run_id: str | None) -> OTPValue | None: + """Try to generate a TOTP code from a credential secret stored in the workflow run context. + + Scans workflow_run_context.values for credential entries with a "totp" key + (e.g. Bitwarden, 1Password, Azure Key Vault credentials) and generates a + TOTP code using pyotp. This should be checked BEFORE poll_otp_value so that + credential-based TOTP takes priority over webhook (totp_url) and totp_identifier. + """ + if not workflow_run_id: + return None + + workflow_run_context = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(workflow_run_id) + if not workflow_run_context: + return None + + for key, value in workflow_run_context.values.items(): + if isinstance(value, dict) and "totp" in value: + totp_secret_id = value.get("totp") + if not totp_secret_id or not isinstance(totp_secret_id, str): + continue + totp_secret_key = workflow_run_context.totp_secret_value_key(totp_secret_id) + totp_secret = workflow_run_context.get_original_secret_value_or_none(totp_secret_key) + if totp_secret: + try: + code = pyotp.TOTP(totp_secret).now() + LOG.info( + "Generated TOTP from credential secret", + workflow_run_id=workflow_run_id, + credential_key=key, + ) + return OTPValue(value=code, type=OTPType.TOTP) + except Exception: + LOG.warning( + "Failed to generate TOTP from credential secret", + workflow_run_id=workflow_run_id, + credential_key=key, + exc_info=True, + ) + return None + + async def poll_otp_value( organization_id: str, task_id: str | None = None, diff --git a/skyvern/webeye/actions/parse_actions.py b/skyvern/webeye/actions/parse_actions.py index 40785d9b..e53e7021 100644 --- a/skyvern/webeye/actions/parse_actions.py +++ b/skyvern/webeye/actions/parse_actions.py @@ -14,7 +14,7 @@ from skyvern.forge.sdk.core import skyvern_context from skyvern.forge.sdk.models import Step from skyvern.forge.sdk.schemas.tasks import Task from skyvern.forge.sdk.schemas.totp_codes import OTPType -from skyvern.services.otp_service import poll_otp_value +from skyvern.services.otp_service import poll_otp_value, try_generate_totp_from_credential from skyvern.utils.image_resizer import Resolution, scale_coordinates from skyvern.webeye.actions.action_types import ActionType from skyvern.webeye.actions.actions import ( @@ -913,7 +913,10 @@ async def generate_cua_fallback_actions( ) elif skyvern_action_type == "get_verification_code": - if (task.totp_verification_url or task.totp_identifier) and task.organization_id: + # Try credential TOTP first (highest priority, doesn't need totp_url/totp_identifier) + otp_value = try_generate_totp_from_credential(task.workflow_run_id) + # Fall back to webhook/totp_identifier + if not otp_value and (task.totp_verification_url or task.totp_identifier) and task.organization_id: LOG.info( "Getting verification code for CUA", task_id=task.task_id, @@ -930,29 +933,21 @@ async def generate_cua_fallback_actions( totp_verification_url=task.totp_verification_url, totp_identifier=task.totp_identifier, ) - if not otp_value or otp_value.get_otp_type() != OTPType.TOTP: - raise NoTOTPVerificationCodeFound() - verification_code = otp_value.value - reasoning = reasoning or f"Received verification code: {verification_code}" - action = VerificationCodeAction( - verification_code=verification_code, - reasoning=reasoning, - intention=reasoning, - ) except NoTOTPVerificationCodeFound: reasoning_suffix = "No verification code found" reasoning = f"{reasoning}. {reasoning_suffix}" if reasoning else reasoning_suffix - action = TerminateAction( - reasoning=reasoning, - intention=reasoning, - ) except FailedToGetTOTPVerificationCode as e: reasoning_suffix = f"Failed to get verification code. Reason: {e.reason}" reasoning = f"{reasoning}. {reasoning_suffix}" if reasoning else reasoning_suffix - action = TerminateAction( - reasoning=reasoning, - intention=reasoning, - ) + + if otp_value and otp_value.get_otp_type() == OTPType.TOTP: + verification_code = otp_value.value + reasoning = reasoning or f"Received verification code: {verification_code}" + action = VerificationCodeAction( + verification_code=verification_code, + reasoning=reasoning, + intention=reasoning, + ) else: action = TerminateAction( reasoning=reasoning, diff --git a/tests/unit/test_credential_totp_priority.py b/tests/unit/test_credential_totp_priority.py new file mode 100644 index 00000000..caf0dabc --- /dev/null +++ b/tests/unit/test_credential_totp_priority.py @@ -0,0 +1,167 @@ +"""Tests for credential TOTP priority over webhook (totp_url) and totp_identifier. + +Verifies that try_generate_totp_from_credential() correctly generates TOTP codes +from credential secrets stored in workflow run context, and that callers check +credential TOTP before falling back to poll_otp_value. +""" + +from unittest.mock import MagicMock, patch + +import pyotp + +from skyvern.forge.sdk.schemas.totp_codes import OTPType +from skyvern.services.otp_service import OTPValue, try_generate_totp_from_credential + +# A valid base32 TOTP secret for testing +TEST_TOTP_SECRET = "JBSWY3DPEHPK3PXP" + + +def _make_workflow_run_context( + values: dict | None = None, + secrets: dict | None = None, +) -> MagicMock: + """Create a mock WorkflowRunContext with the given values and secrets.""" + ctx = MagicMock() + ctx.values = values or {} + ctx.secrets = secrets or {} + + def totp_secret_value_key(totp_secret_id: str) -> str: + return f"{totp_secret_id}_value" + + ctx.totp_secret_value_key = totp_secret_value_key + + def get_original_secret_value_or_none(secret_key: str) -> str | None: + return ctx.secrets.get(secret_key) + + ctx.get_original_secret_value_or_none = get_original_secret_value_or_none + return ctx + + +class TestTryGenerateTotpFromCredential: + """Tests for the try_generate_totp_from_credential helper.""" + + def test_returns_none_when_workflow_run_id_is_none(self) -> None: + result = try_generate_totp_from_credential(None) + assert result is None + + def test_returns_none_when_no_workflow_run_context(self) -> None: + with patch("skyvern.services.otp_service.app") as mock_app: + mock_app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context.return_value = None + result = try_generate_totp_from_credential("wfr_123") + assert result is None + + def test_returns_none_when_no_credential_values(self) -> None: + ctx = _make_workflow_run_context(values={"some_param": "plain_string"}) + with patch("skyvern.services.otp_service.app") as mock_app: + mock_app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context.return_value = ctx + result = try_generate_totp_from_credential("wfr_123") + assert result is None + + def test_returns_none_when_dict_value_has_no_totp_key(self) -> None: + ctx = _make_workflow_run_context( + values={"cred_param": {"username": "user", "password": "pass"}}, + ) + with patch("skyvern.services.otp_service.app") as mock_app: + mock_app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context.return_value = ctx + result = try_generate_totp_from_credential("wfr_123") + assert result is None + + def test_returns_none_when_totp_secret_id_is_empty(self) -> None: + ctx = _make_workflow_run_context( + values={"cred_param": {"totp": ""}}, + ) + with patch("skyvern.services.otp_service.app") as mock_app: + mock_app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context.return_value = ctx + result = try_generate_totp_from_credential("wfr_123") + assert result is None + + def test_returns_none_when_totp_secret_not_in_secrets(self) -> None: + """When the secret ID doesn't resolve to an actual secret value.""" + ctx = _make_workflow_run_context( + values={"cred_param": {"totp": "secret_id_123"}}, + secrets={}, # no secret stored + ) + with patch("skyvern.services.otp_service.app") as mock_app: + mock_app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context.return_value = ctx + result = try_generate_totp_from_credential("wfr_123") + assert result is None + + def test_generates_totp_from_credential_secret(self) -> None: + """Happy path: credential with valid TOTP secret generates a code.""" + ctx = _make_workflow_run_context( + values={"cred_param": {"username": "user", "password": "pass", "totp": "totp_ref_1"}}, + secrets={"totp_ref_1_value": TEST_TOTP_SECRET}, + ) + with patch("skyvern.services.otp_service.app") as mock_app: + mock_app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context.return_value = ctx + result = try_generate_totp_from_credential("wfr_123") + + assert result is not None + assert isinstance(result, OTPValue) + assert result.type == OTPType.TOTP + # Verify the code matches what pyotp would generate + expected_code = pyotp.TOTP(TEST_TOTP_SECRET).now() + assert result.value == expected_code + + def test_returns_first_matching_credential(self) -> None: + """When multiple credentials have TOTP, returns the first one found.""" + ctx = _make_workflow_run_context( + values={ + "cred_a": {"totp": "ref_a"}, + "cred_b": {"totp": "ref_b"}, + }, + secrets={ + "ref_a_value": TEST_TOTP_SECRET, + "ref_b_value": "ORSXG5DJNZTQ====", + }, + ) + with patch("skyvern.services.otp_service.app") as mock_app: + mock_app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context.return_value = ctx + result = try_generate_totp_from_credential("wfr_123") + + assert result is not None + assert result.value == pyotp.TOTP(TEST_TOTP_SECRET).now() + + def test_skips_invalid_secret_and_continues(self) -> None: + """If one credential has an invalid TOTP secret, skip it and try the next.""" + ctx = _make_workflow_run_context( + values={ + "cred_bad": {"totp": "ref_bad"}, + "cred_good": {"totp": "ref_good"}, + }, + secrets={ + "ref_bad_value": "NOT_A_VALID_BASE32!!!", + "ref_good_value": TEST_TOTP_SECRET, + }, + ) + with patch("skyvern.services.otp_service.app") as mock_app: + mock_app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context.return_value = ctx + result = try_generate_totp_from_credential("wfr_123") + + assert result is not None + assert result.value == pyotp.TOTP(TEST_TOTP_SECRET).now() + + def test_skips_non_string_totp_id(self) -> None: + """If the totp value is not a string (e.g., int or None), skip it.""" + ctx = _make_workflow_run_context( + values={"cred_param": {"totp": 12345}}, + ) + with patch("skyvern.services.otp_service.app") as mock_app: + mock_app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context.return_value = ctx + result = try_generate_totp_from_credential("wfr_123") + assert result is None + + def test_skips_non_dict_values(self) -> None: + """Non-dict values in the context should be ignored.""" + ctx = _make_workflow_run_context( + values={ + "string_param": "hello", + "int_param": 42, + "list_param": [1, 2, 3], + "none_param": None, + }, + ) + with patch("skyvern.services.otp_service.app") as mock_app: + mock_app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context.return_value = ctx + result = try_generate_totp_from_credential("wfr_123") + assert result is None