From b48bf707c36f84fab0985cf6dc0735d870329f80 Mon Sep 17 00:00:00 2001 From: Marc Kelechava Date: Wed, 18 Feb 2026 11:45:07 -0800 Subject: [PATCH] add CLI parity commands for credential, block, and browser (#4793) --- skyvern/cli/block.py | 114 ++++++++++++ skyvern/cli/commands/__init__.py | 14 +- skyvern/cli/commands/browser.py | 127 +++++++++++++ skyvern/cli/credential.py | 110 +++++++++++ skyvern/cli/credentials.py | 4 +- tests/unit/test_cli_commands.py | 308 +++++++++++++++++++++++++++++++ 6 files changed, 675 insertions(+), 2 deletions(-) create mode 100644 skyvern/cli/block.py create mode 100644 skyvern/cli/credential.py diff --git a/skyvern/cli/block.py b/skyvern/cli/block.py new file mode 100644 index 00000000..37e7384c --- /dev/null +++ b/skyvern/cli/block.py @@ -0,0 +1,114 @@ +"""Workflow block CLI commands with MCP-parity output and validation.""" + +from __future__ import annotations + +import asyncio +import json +import sys +from pathlib import Path +from typing import Any, Callable, Coroutine + +import typer +from dotenv import load_dotenv + +from skyvern.config import settings +from skyvern.utils.env_paths import resolve_backend_env_path + +from .commands._output import output, output_error +from .mcp_tools.blocks import skyvern_block_schema as tool_block_schema +from .mcp_tools.blocks import skyvern_block_validate as tool_block_validate + +block_app = typer.Typer(help="Workflow block schema and validation commands.", no_args_is_help=True) + + +def _emit_tool_result(result: dict[str, Any], *, json_output: bool) -> None: + if json_output: + json.dump(result, sys.stdout, indent=2, default=str) + sys.stdout.write("\n") + if not result.get("ok", False): + raise SystemExit(1) + return + + if result.get("ok", False): + output(result.get("data"), action=str(result.get("action", "")), json_mode=False) + return + + err = result.get("error") or {} + output_error(str(err.get("message") or "Unknown error"), hint=str(err.get("hint") or ""), json_mode=False) + + +def _run_tool( + runner: Callable[[], Coroutine[Any, Any, dict[str, Any]]], + *, + json_output: bool, + hint_on_exception: str, +) -> None: + try: + result: dict[str, Any] = asyncio.run(runner()) + _emit_tool_result(result, json_output=json_output) + except typer.BadParameter: + raise + except Exception as e: + output_error(str(e), hint=hint_on_exception, json_mode=json_output) + + +def _resolve_inline_or_file(value: str, *, param_name: str) -> str: + if not value.startswith("@"): + return value + + file_path = value[1:] + if not file_path: + raise typer.BadParameter(f"{param_name} file path cannot be empty after '@'.") + + path = Path(file_path).expanduser() + try: + return path.read_text(encoding="utf-8") + except OSError as e: + raise typer.BadParameter(f"Unable to read {param_name} file '{path}': {e}") from e + + +@block_app.callback() +def block_callback( + api_key: str | None = typer.Option( + None, + "--api-key", + help="Skyvern API key", + envvar="SKYVERN_API_KEY", + ), +) -> None: + """Load environment and optional API key override.""" + load_dotenv(resolve_backend_env_path()) + if api_key: + settings.SKYVERN_API_KEY = api_key + + +@block_app.command("schema") +def block_schema( + block_type: str | None = typer.Option( + None, + "--type", + "--block-type", + help="Block type to inspect (omit to list all available types).", + ), + json_output: bool = typer.Option(False, "--json", help="Output as JSON."), +) -> None: + """Get schema for a specific block type or list all block types.""" + + async def _run() -> dict[str, Any]: + return await tool_block_schema(block_type=block_type) + + _run_tool(_run, json_output=json_output, hint_on_exception="Check block type input.") + + +@block_app.command("validate") +def block_validate( + block_json: str = typer.Option(..., "--block-json", help="Block JSON string or @file path."), + json_output: bool = typer.Option(False, "--json", help="Output as JSON."), +) -> None: + """Validate a single workflow block definition.""" + + async def _run() -> dict[str, Any]: + resolved_json = _resolve_inline_or_file(block_json, param_name="block_json") + return await tool_block_validate(block_json=resolved_json) + + _run_tool(_run, json_output=json_output, hint_on_exception="Check block JSON syntax and required fields.") diff --git a/skyvern/cli/commands/__init__.py b/skyvern/cli/commands/__init__.py index 97ede4df..f5182828 100644 --- a/skyvern/cli/commands/__init__.py +++ b/skyvern/cli/commands/__init__.py @@ -6,6 +6,8 @@ from dotenv import load_dotenv from skyvern.forge.sdk.forge_log import setup_logger as _setup_logger from skyvern.utils.env_paths import resolve_backend_env_path +from ..block import block_app +from ..credential import credential_app from ..credentials import credentials_app from ..docs import docs_app from ..init_command import init_browser, init_env @@ -51,9 +53,19 @@ cli_app.add_typer( name="run", help="Run Skyvern services like the API server, UI, and MCP.", ) +cli_app.add_typer(block_app, name="block", help="Inspect and validate workflow block schemas.") +cli_app.add_typer( + credential_app, + name="credential", + help="MCP-parity credential commands (list/get/delete).", +) cli_app.add_typer(workflow_app, name="workflow", help="Workflow management commands.") cli_app.add_typer(tasks_app, name="tasks", help="Task management commands.") -cli_app.add_typer(credentials_app, name="credentials", help="Manage stored credentials for secure login.") +cli_app.add_typer( + credentials_app, + name="credentials", + help="Secure credential management (use this for interactive `add`).", +) cli_app.add_typer(docs_app, name="docs", help="Open Skyvern documentation.") cli_app.add_typer(status_app, name="status", help="Check if Skyvern services are running.") cli_app.add_typer(stop_app, name="stop", help="Stop Skyvern services.") diff --git a/skyvern/cli/commands/browser.py b/skyvern/cli/commands/browser.py index a4e2fbd7..125eda62 100644 --- a/skyvern/cli/commands/browser.py +++ b/skyvern/cli/commands/browser.py @@ -1,6 +1,8 @@ from __future__ import annotations import asyncio +import json +import sys from dataclasses import asdict, dataclass from datetime import datetime, timezone from pathlib import Path @@ -25,6 +27,8 @@ from skyvern.cli.core.guards import ( validate_wait_until, ) from skyvern.cli.core.session_ops import do_session_close, do_session_create, do_session_list +from skyvern.cli.mcp_tools.browser import skyvern_login as tool_login +from skyvern.cli.mcp_tools.browser import skyvern_run_task as tool_run_task browser_app = typer.Typer(help="Browser automation commands.", no_args_is_help=True) session_app = typer.Typer(help="Manage browser sessions.", no_args_is_help=True) @@ -92,6 +96,22 @@ def _validate_wait_state(state: str) -> None: raise GuardError(f"Invalid state: {state}", "Use visible, hidden, attached, or detached") +def _emit_tool_result(result: dict[str, Any], *, json_output: bool, action: str) -> None: + if json_output: + json.dump(result, sys.stdout, indent=2, default=str) + sys.stdout.write("\n") + if not result.get("ok", False): + raise SystemExit(1) + return + + if result.get("ok", False): + output(result.get("data"), action=action, json_mode=False) + return + + err = result.get("error") or {} + output_error(str(err.get("message") or "Unknown error"), hint=str(err.get("hint") or ""), json_mode=False) + + # --------------------------------------------------------------------------- # Session commands # --------------------------------------------------------------------------- @@ -791,3 +811,110 @@ def validate( raise except Exception as e: output_error(str(e), hint="Check the page state and validation prompt.", json_mode=json_output) + + +@browser_app.command("run-task") +def run_task( + prompt: str = typer.Option(..., help="Natural language description of the task to automate."), + session: str | None = typer.Option(None, help="Browser session ID."), + cdp: str | None = typer.Option(None, "--cdp", help="CDP WebSocket URL."), + url: str | None = typer.Option(None, help="URL to navigate to before running."), + data_extraction_schema: str | None = typer.Option( + None, + "--schema", + "--data-extraction-schema", + help="JSON Schema string defining what data to extract.", + ), + max_steps: int | None = typer.Option(None, "--max-steps", min=1, help="Maximum number of agent steps."), + timeout_seconds: int = typer.Option(180, "--timeout", min=10, max=1800, help="Timeout in seconds."), + json_output: bool = typer.Option(False, "--json", help="Output as JSON."), +) -> None: + """Run a quick one-off browser automation task.""" + + async def _run() -> dict[str, Any]: + connection = _resolve_connection(session, cdp) + return await tool_run_task( + prompt=prompt, + session_id=connection.session_id if connection.mode == "cloud" else None, + cdp_url=connection.cdp_url if connection.mode == "cdp" else None, + url=url, + data_extraction_schema=data_extraction_schema, + max_steps=max_steps, + timeout_seconds=timeout_seconds, + ) + + try: + result = asyncio.run(_run()) + _emit_tool_result(result, json_output=json_output, action="run_task") + except typer.BadParameter: + raise + except Exception as e: + output_error(str(e), hint="Check the prompt, active connection, and timeout settings.", json_mode=json_output) + + +@browser_app.command("login") +def login( + credential_type: str = typer.Option( + "skyvern", + "--credential-type", + help="Credential provider: skyvern, bitwarden, 1password, or azure_vault.", + ), + session: str | None = typer.Option(None, help="Browser session ID."), + cdp: str | None = typer.Option(None, "--cdp", help="CDP WebSocket URL."), + url: str | None = typer.Option(None, help="Login page URL."), + credential_id: str | None = typer.Option(None, "--credential-id", help="Skyvern credential ID for type=skyvern."), + bitwarden_item_id: str | None = typer.Option(None, "--bitwarden-item-id", help="Bitwarden item ID."), + bitwarden_collection_id: str | None = typer.Option( + None, "--bitwarden-collection-id", help="Bitwarden collection ID." + ), + onepassword_vault_id: str | None = typer.Option(None, "--onepassword-vault-id", help="1Password vault ID."), + onepassword_item_id: str | None = typer.Option(None, "--onepassword-item-id", help="1Password item ID."), + azure_vault_name: str | None = typer.Option(None, "--azure-vault-name", help="Azure Vault name."), + azure_vault_username_key: str | None = typer.Option( + None, "--azure-vault-username-key", help="Azure Vault username key." + ), + azure_vault_password_key: str | None = typer.Option( + None, "--azure-vault-password-key", help="Azure Vault password key." + ), + azure_vault_totp_secret_key: str | None = typer.Option( + None, "--azure-vault-totp-secret-key", help="Azure Vault TOTP secret key." + ), + prompt: str | None = typer.Option(None, help="Additional login instructions."), + totp_identifier: str | None = typer.Option(None, "--totp-identifier", help="TOTP identifier for 2FA."), + totp_url: str | None = typer.Option(None, "--totp-url", help="URL to fetch TOTP codes."), + timeout_seconds: int = typer.Option(180, "--timeout", min=10, max=600, help="Timeout in seconds."), + json_output: bool = typer.Option(False, "--json", help="Output as JSON."), +) -> None: + """Log into a site using stored credentials from a supported provider.""" + + async def _run() -> dict[str, Any]: + connection = _resolve_connection(session, cdp) + return await tool_login( + credential_type=credential_type, + session_id=connection.session_id if connection.mode == "cloud" else None, + cdp_url=connection.cdp_url if connection.mode == "cdp" else None, + url=url, + credential_id=credential_id, + bitwarden_item_id=bitwarden_item_id, + bitwarden_collection_id=bitwarden_collection_id, + onepassword_vault_id=onepassword_vault_id, + onepassword_item_id=onepassword_item_id, + azure_vault_name=azure_vault_name, + azure_vault_username_key=azure_vault_username_key, + azure_vault_password_key=azure_vault_password_key, + azure_vault_totp_secret_key=azure_vault_totp_secret_key, + prompt=prompt, + totp_identifier=totp_identifier, + totp_url=totp_url, + timeout_seconds=timeout_seconds, + ) + + try: + result = asyncio.run(_run()) + _emit_tool_result(result, json_output=json_output, action="login") + except typer.BadParameter: + raise + except Exception as e: + output_error( + str(e), hint="Check credential inputs, active connection, and timeout settings.", json_mode=json_output + ) diff --git a/skyvern/cli/credential.py b/skyvern/cli/credential.py new file mode 100644 index 00000000..7b67aafc --- /dev/null +++ b/skyvern/cli/credential.py @@ -0,0 +1,110 @@ +"""Credential CLI commands with MCP-parity output and validation.""" + +from __future__ import annotations + +import asyncio +import json +import sys +from typing import Any, Callable, Coroutine + +import typer +from dotenv import load_dotenv + +from skyvern.config import settings +from skyvern.utils.env_paths import resolve_backend_env_path + +from .commands._output import output, output_error +from .mcp_tools.credential import skyvern_credential_delete as tool_credential_delete +from .mcp_tools.credential import skyvern_credential_get as tool_credential_get +from .mcp_tools.credential import skyvern_credential_list as tool_credential_list + +credential_app = typer.Typer( + help="MCP-parity credential commands (list/get/delete). Use `skyvern credentials add` for secure creation.", + no_args_is_help=True, +) + + +def _emit_tool_result(result: dict[str, Any], *, json_output: bool) -> None: + if json_output: + json.dump(result, sys.stdout, indent=2, default=str) + sys.stdout.write("\n") + if not result.get("ok", False): + raise SystemExit(1) + return + + if result.get("ok", False): + output(result.get("data"), action=str(result.get("action", "")), json_mode=False) + return + + err = result.get("error") or {} + output_error(str(err.get("message") or "Unknown error"), hint=str(err.get("hint") or ""), json_mode=False) + + +def _run_tool( + runner: Callable[[], Coroutine[Any, Any, dict[str, Any]]], + *, + json_output: bool, + hint_on_exception: str, +) -> None: + try: + result: dict[str, Any] = asyncio.run(runner()) + _emit_tool_result(result, json_output=json_output) + except typer.BadParameter: + raise + except Exception as e: + output_error(str(e), hint=hint_on_exception, json_mode=json_output) + + +@credential_app.callback() +def credential_callback( + api_key: str | None = typer.Option( + None, + "--api-key", + help="Skyvern API key", + envvar="SKYVERN_API_KEY", + ), +) -> None: + """Load environment and optional API key override.""" + load_dotenv(resolve_backend_env_path()) + if api_key: + settings.SKYVERN_API_KEY = api_key + + +@credential_app.command("list") +def credential_list( + page: int = typer.Option(1, "--page", min=1, help="Page number (1-based)."), + page_size: int = typer.Option(10, "--page-size", min=1, max=100, help="Results per page."), + json_output: bool = typer.Option(False, "--json", help="Output as JSON."), +) -> None: + """List stored credentials (metadata only).""" + + async def _run() -> dict[str, Any]: + return await tool_credential_list(page=page, page_size=page_size) + + _run_tool(_run, json_output=json_output, hint_on_exception="Check your API key and Skyvern connection.") + + +@credential_app.command("get") +def credential_get( + credential_id: str = typer.Option(..., "--id", "--credential-id", help="Credential ID (starts with cred_)."), + json_output: bool = typer.Option(False, "--json", help="Output as JSON."), +) -> None: + """Get credential metadata by ID.""" + + async def _run() -> dict[str, Any]: + return await tool_credential_get(credential_id=credential_id) + + _run_tool(_run, json_output=json_output, hint_on_exception="Check your API key and credential ID.") + + +@credential_app.command("delete") +def credential_delete( + credential_id: str = typer.Option(..., "--id", "--credential-id", help="Credential ID (starts with cred_)."), + json_output: bool = typer.Option(False, "--json", help="Output as JSON."), +) -> None: + """Delete a credential by ID.""" + + async def _run() -> dict[str, Any]: + return await tool_credential_delete(credential_id=credential_id) + + _run_tool(_run, json_output=json_output, hint_on_exception="Check your API key and credential ID.") diff --git a/skyvern/cli/credentials.py b/skyvern/cli/credentials.py index 179963aa..71a99c1b 100644 --- a/skyvern/cli/credentials.py +++ b/skyvern/cli/credentials.py @@ -22,7 +22,9 @@ from skyvern.utils.env_paths import resolve_backend_env_path from .console import console -credentials_app = typer.Typer(help="Manage stored credentials for secure login.") +credentials_app = typer.Typer( + help="Manage stored credentials for secure login. Use `credential` commands for MCP-parity list/get/delete." +) @credentials_app.callback() diff --git a/tests/unit/test_cli_commands.py b/tests/unit/test_cli_commands.py index cc9c4d80..7803f05b 100644 --- a/tests/unit/test_cli_commands.py +++ b/tests/unit/test_cli_commands.py @@ -468,3 +468,311 @@ class TestWorkflowCommands: ) tool.assert_not_called() + + +# --------------------------------------------------------------------------- +# PR C parity command behavior +# --------------------------------------------------------------------------- + + +class TestCredentialParityCommands: + def test_credential_list_maps_options(self, monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture) -> None: + from skyvern.cli import credential as credential_cmd + + tool = AsyncMock( + return_value={ + "ok": True, + "action": "skyvern_credential_list", + "browser_context": {"mode": "none", "session_id": None, "cdp_url": None}, + "data": {"credentials": [], "page": 2, "page_size": 25, "count": 0, "has_more": False}, + "artifacts": [], + "timing_ms": {}, + "warnings": [], + "error": None, + } + ) + monkeypatch.setattr(credential_cmd, "tool_credential_list", tool) + + credential_cmd.credential_list(page=2, page_size=25, json_output=True) + + assert tool.await_args.kwargs == {"page": 2, "page_size": 25} + parsed = json.loads(capsys.readouterr().out) + assert parsed["ok"] is True + assert parsed["action"] == "skyvern_credential_list" + + def test_credential_get_json_error_exits_nonzero( + self, monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture + ) -> None: + from skyvern.cli import credential as credential_cmd + + tool = AsyncMock( + return_value={ + "ok": False, + "action": "skyvern_credential_get", + "browser_context": {"mode": "none", "session_id": None, "cdp_url": None}, + "data": None, + "artifacts": [], + "timing_ms": {}, + "warnings": [], + "error": { + "code": "INVALID_INPUT", + "message": "Invalid credential_id format: 'bad'", + "hint": "Credential IDs start with cred_", + "details": {}, + }, + } + ) + monkeypatch.setattr(credential_cmd, "tool_credential_get", tool) + + with pytest.raises(SystemExit, match="1"): + credential_cmd.credential_get(credential_id="bad", json_output=True) + + parsed = json.loads(capsys.readouterr().out) + assert parsed["ok"] is False + assert "Invalid credential_id format" in parsed["error"]["message"] + + def test_credential_delete_maps_options( + self, monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture + ) -> None: + from skyvern.cli import credential as credential_cmd + + tool = AsyncMock( + return_value={ + "ok": True, + "action": "skyvern_credential_delete", + "browser_context": {"mode": "none", "session_id": None, "cdp_url": None}, + "data": {"credential_id": "cred_123", "deleted": True}, + "artifacts": [], + "timing_ms": {}, + "warnings": [], + "error": None, + } + ) + monkeypatch.setattr(credential_cmd, "tool_credential_delete", tool) + + credential_cmd.credential_delete(credential_id="cred_123", json_output=True) + + assert tool.await_args.kwargs == {"credential_id": "cred_123"} + parsed = json.loads(capsys.readouterr().out) + assert parsed["ok"] is True + assert parsed["data"]["deleted"] is True + + +class TestBlockParityCommands: + def test_block_schema_passes_block_type( + self, monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture + ) -> None: + from skyvern.cli import block as block_cmd + + tool = AsyncMock( + return_value={ + "ok": True, + "action": "skyvern_block_schema", + "browser_context": {"mode": "none", "session_id": None, "cdp_url": None}, + "data": {"block_type": "navigation", "schema": {"type": "object"}}, + "artifacts": [], + "timing_ms": {}, + "warnings": [], + "error": None, + } + ) + monkeypatch.setattr(block_cmd, "tool_block_schema", tool) + + block_cmd.block_schema(block_type="navigation", json_output=True) + + assert tool.await_args.kwargs == {"block_type": "navigation"} + parsed = json.loads(capsys.readouterr().out) + assert parsed["ok"] is True + assert parsed["data"]["block_type"] == "navigation" + + def test_block_validate_reads_json_from_file( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture + ) -> None: + from skyvern.cli import block as block_cmd + + block_file = tmp_path / "block.json" + block_file.write_text('{"block_type":"navigation","label":"step1","navigation_goal":"Go to page"}') + + tool = AsyncMock( + return_value={ + "ok": True, + "action": "skyvern_block_validate", + "browser_context": {"mode": "none", "session_id": None, "cdp_url": None}, + "data": {"valid": True, "block_type": "navigation", "label": "step1"}, + "artifacts": [], + "timing_ms": {}, + "warnings": [], + "error": None, + } + ) + monkeypatch.setattr(block_cmd, "tool_block_validate", tool) + + block_cmd.block_validate(block_json=f"@{block_file}", json_output=True) + + assert tool.await_args.kwargs == { + "block_json": '{"block_type":"navigation","label":"step1","navigation_goal":"Go to page"}' + } + parsed = json.loads(capsys.readouterr().out) + assert parsed["ok"] is True + assert parsed["data"]["valid"] is True + + +class TestBrowserPRCCommands: + def test_run_task_uses_resolved_connection( + self, monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture + ) -> None: + from skyvern.cli.commands import browser as browser_cmd + + monkeypatch.setattr( + browser_cmd, + "_resolve_connection", + lambda _session, _cdp: browser_cmd.ConnectionTarget(mode="cloud", session_id="pbs_123"), + ) + tool = AsyncMock( + return_value={ + "ok": True, + "action": "skyvern_run_task", + "browser_context": {"mode": "cloud", "session_id": "pbs_123", "cdp_url": None}, + "data": {"run_id": "run_123", "status": "completed"}, + "artifacts": [], + "timing_ms": {}, + "warnings": [], + "error": None, + } + ) + monkeypatch.setattr(browser_cmd, "tool_run_task", tool) + + browser_cmd.run_task( + prompt="Find the latest headline", + session=None, + cdp=None, + url="https://news.ycombinator.com", + data_extraction_schema='{"type":"object"}', + max_steps=5, + timeout_seconds=240, + json_output=True, + ) + + assert tool.await_args.kwargs == { + "prompt": "Find the latest headline", + "session_id": "pbs_123", + "cdp_url": None, + "url": "https://news.ycombinator.com", + "data_extraction_schema": '{"type":"object"}', + "max_steps": 5, + "timeout_seconds": 240, + } + parsed = json.loads(capsys.readouterr().out) + assert parsed["ok"] is True + assert parsed["data"]["run_id"] == "run_123" + + def test_login_json_error_exits_nonzero( + self, monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture + ) -> None: + from skyvern.cli.commands import browser as browser_cmd + + monkeypatch.setattr( + browser_cmd, + "_resolve_connection", + lambda _session, _cdp: browser_cmd.ConnectionTarget(mode="cloud", session_id="pbs_abc"), + ) + tool = AsyncMock( + return_value={ + "ok": False, + "action": "skyvern_login", + "browser_context": {"mode": "cloud", "session_id": "pbs_abc", "cdp_url": None}, + "data": None, + "artifacts": [], + "timing_ms": {}, + "warnings": [], + "error": { + "code": "INVALID_INPUT", + "message": "Missing required fields for credential_type='skyvern': credential_id", + "hint": "Provide: credential_id", + "details": {}, + }, + } + ) + monkeypatch.setattr(browser_cmd, "tool_login", tool) + + with pytest.raises(SystemExit, match="1"): + browser_cmd.login( + credential_type="skyvern", + session=None, + cdp=None, + credential_id=None, + json_output=True, + ) + + kwargs = tool.await_args.kwargs + assert kwargs["session_id"] == "pbs_abc" + assert kwargs["credential_type"] == "skyvern" + parsed = json.loads(capsys.readouterr().out) + assert parsed["ok"] is False + assert "Missing required fields" in parsed["error"]["message"] + + +class TestParityErrorFormatting: + def test_credential_emit_tool_result_handles_none_message_and_hint(self, monkeypatch: pytest.MonkeyPatch) -> None: + from skyvern.cli import credential as credential_cmd + + captured: dict[str, str | bool] = {} + + def _fake_output_error(message: str, *, hint: str = "", json_mode: bool = False, exit_code: int = 1) -> None: + captured["message"] = message + captured["hint"] = hint + captured["json_mode"] = json_mode + raise SystemExit(exit_code) + + monkeypatch.setattr(credential_cmd, "output_error", _fake_output_error) + + with pytest.raises(SystemExit, match="1"): + credential_cmd._emit_tool_result( + {"ok": False, "error": {"message": None, "hint": None}}, + json_output=False, + ) + + assert captured == {"message": "Unknown error", "hint": "", "json_mode": False} + + def test_block_emit_tool_result_handles_none_message_and_hint(self, monkeypatch: pytest.MonkeyPatch) -> None: + from skyvern.cli import block as block_cmd + + captured: dict[str, str | bool] = {} + + def _fake_output_error(message: str, *, hint: str = "", json_mode: bool = False, exit_code: int = 1) -> None: + captured["message"] = message + captured["hint"] = hint + captured["json_mode"] = json_mode + raise SystemExit(exit_code) + + monkeypatch.setattr(block_cmd, "output_error", _fake_output_error) + + with pytest.raises(SystemExit, match="1"): + block_cmd._emit_tool_result( + {"ok": False, "error": {"message": None, "hint": None}}, + json_output=False, + ) + + assert captured == {"message": "Unknown error", "hint": "", "json_mode": False} + + def test_browser_emit_tool_result_handles_none_message_and_hint(self, monkeypatch: pytest.MonkeyPatch) -> None: + from skyvern.cli.commands import browser as browser_cmd + + captured: dict[str, str | bool] = {} + + def _fake_output_error(message: str, *, hint: str = "", json_mode: bool = False, exit_code: int = 1) -> None: + captured["message"] = message + captured["hint"] = hint + captured["json_mode"] = json_mode + raise SystemExit(exit_code) + + monkeypatch.setattr(browser_cmd, "output_error", _fake_output_error) + + with pytest.raises(SystemExit, match="1"): + browser_cmd._emit_tool_result( + {"ok": False, "error": {"message": None, "hint": None}}, + json_output=False, + action="login", + ) + + assert captured == {"message": "Unknown error", "hint": "", "json_mode": False}