add CLI parity commands for credential, block, and browser (#4793)

This commit is contained in:
Marc Kelechava
2026-02-18 11:45:07 -08:00
committed by GitHub
parent 0811d23651
commit b48bf707c3
6 changed files with 675 additions and 2 deletions

114
skyvern/cli/block.py Normal file
View File

@@ -0,0 +1,114 @@
"""Workflow block CLI commands with MCP-parity output and validation."""
from __future__ import annotations
import asyncio
import json
import sys
from pathlib import Path
from typing import Any, Callable, Coroutine
import typer
from dotenv import load_dotenv
from skyvern.config import settings
from skyvern.utils.env_paths import resolve_backend_env_path
from .commands._output import output, output_error
from .mcp_tools.blocks import skyvern_block_schema as tool_block_schema
from .mcp_tools.blocks import skyvern_block_validate as tool_block_validate
block_app = typer.Typer(help="Workflow block schema and validation commands.", no_args_is_help=True)
def _emit_tool_result(result: dict[str, Any], *, json_output: bool) -> None:
if json_output:
json.dump(result, sys.stdout, indent=2, default=str)
sys.stdout.write("\n")
if not result.get("ok", False):
raise SystemExit(1)
return
if result.get("ok", False):
output(result.get("data"), action=str(result.get("action", "")), json_mode=False)
return
err = result.get("error") or {}
output_error(str(err.get("message") or "Unknown error"), hint=str(err.get("hint") or ""), json_mode=False)
def _run_tool(
runner: Callable[[], Coroutine[Any, Any, dict[str, Any]]],
*,
json_output: bool,
hint_on_exception: str,
) -> None:
try:
result: dict[str, Any] = asyncio.run(runner())
_emit_tool_result(result, json_output=json_output)
except typer.BadParameter:
raise
except Exception as e:
output_error(str(e), hint=hint_on_exception, json_mode=json_output)
def _resolve_inline_or_file(value: str, *, param_name: str) -> str:
if not value.startswith("@"):
return value
file_path = value[1:]
if not file_path:
raise typer.BadParameter(f"{param_name} file path cannot be empty after '@'.")
path = Path(file_path).expanduser()
try:
return path.read_text(encoding="utf-8")
except OSError as e:
raise typer.BadParameter(f"Unable to read {param_name} file '{path}': {e}") from e
@block_app.callback()
def block_callback(
api_key: str | None = typer.Option(
None,
"--api-key",
help="Skyvern API key",
envvar="SKYVERN_API_KEY",
),
) -> None:
"""Load environment and optional API key override."""
load_dotenv(resolve_backend_env_path())
if api_key:
settings.SKYVERN_API_KEY = api_key
@block_app.command("schema")
def block_schema(
block_type: str | None = typer.Option(
None,
"--type",
"--block-type",
help="Block type to inspect (omit to list all available types).",
),
json_output: bool = typer.Option(False, "--json", help="Output as JSON."),
) -> None:
"""Get schema for a specific block type or list all block types."""
async def _run() -> dict[str, Any]:
return await tool_block_schema(block_type=block_type)
_run_tool(_run, json_output=json_output, hint_on_exception="Check block type input.")
@block_app.command("validate")
def block_validate(
block_json: str = typer.Option(..., "--block-json", help="Block JSON string or @file path."),
json_output: bool = typer.Option(False, "--json", help="Output as JSON."),
) -> None:
"""Validate a single workflow block definition."""
async def _run() -> dict[str, Any]:
resolved_json = _resolve_inline_or_file(block_json, param_name="block_json")
return await tool_block_validate(block_json=resolved_json)
_run_tool(_run, json_output=json_output, hint_on_exception="Check block JSON syntax and required fields.")

View File

@@ -6,6 +6,8 @@ from dotenv import load_dotenv
from skyvern.forge.sdk.forge_log import setup_logger as _setup_logger
from skyvern.utils.env_paths import resolve_backend_env_path
from ..block import block_app
from ..credential import credential_app
from ..credentials import credentials_app
from ..docs import docs_app
from ..init_command import init_browser, init_env
@@ -51,9 +53,19 @@ cli_app.add_typer(
name="run",
help="Run Skyvern services like the API server, UI, and MCP.",
)
cli_app.add_typer(block_app, name="block", help="Inspect and validate workflow block schemas.")
cli_app.add_typer(
credential_app,
name="credential",
help="MCP-parity credential commands (list/get/delete).",
)
cli_app.add_typer(workflow_app, name="workflow", help="Workflow management commands.")
cli_app.add_typer(tasks_app, name="tasks", help="Task management commands.")
cli_app.add_typer(credentials_app, name="credentials", help="Manage stored credentials for secure login.")
cli_app.add_typer(
credentials_app,
name="credentials",
help="Secure credential management (use this for interactive `add`).",
)
cli_app.add_typer(docs_app, name="docs", help="Open Skyvern documentation.")
cli_app.add_typer(status_app, name="status", help="Check if Skyvern services are running.")
cli_app.add_typer(stop_app, name="stop", help="Stop Skyvern services.")

View File

@@ -1,6 +1,8 @@
from __future__ import annotations
import asyncio
import json
import sys
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
@@ -25,6 +27,8 @@ from skyvern.cli.core.guards import (
validate_wait_until,
)
from skyvern.cli.core.session_ops import do_session_close, do_session_create, do_session_list
from skyvern.cli.mcp_tools.browser import skyvern_login as tool_login
from skyvern.cli.mcp_tools.browser import skyvern_run_task as tool_run_task
browser_app = typer.Typer(help="Browser automation commands.", no_args_is_help=True)
session_app = typer.Typer(help="Manage browser sessions.", no_args_is_help=True)
@@ -92,6 +96,22 @@ def _validate_wait_state(state: str) -> None:
raise GuardError(f"Invalid state: {state}", "Use visible, hidden, attached, or detached")
def _emit_tool_result(result: dict[str, Any], *, json_output: bool, action: str) -> None:
if json_output:
json.dump(result, sys.stdout, indent=2, default=str)
sys.stdout.write("\n")
if not result.get("ok", False):
raise SystemExit(1)
return
if result.get("ok", False):
output(result.get("data"), action=action, json_mode=False)
return
err = result.get("error") or {}
output_error(str(err.get("message") or "Unknown error"), hint=str(err.get("hint") or ""), json_mode=False)
# ---------------------------------------------------------------------------
# Session commands
# ---------------------------------------------------------------------------
@@ -791,3 +811,110 @@ def validate(
raise
except Exception as e:
output_error(str(e), hint="Check the page state and validation prompt.", json_mode=json_output)
@browser_app.command("run-task")
def run_task(
prompt: str = typer.Option(..., help="Natural language description of the task to automate."),
session: str | None = typer.Option(None, help="Browser session ID."),
cdp: str | None = typer.Option(None, "--cdp", help="CDP WebSocket URL."),
url: str | None = typer.Option(None, help="URL to navigate to before running."),
data_extraction_schema: str | None = typer.Option(
None,
"--schema",
"--data-extraction-schema",
help="JSON Schema string defining what data to extract.",
),
max_steps: int | None = typer.Option(None, "--max-steps", min=1, help="Maximum number of agent steps."),
timeout_seconds: int = typer.Option(180, "--timeout", min=10, max=1800, help="Timeout in seconds."),
json_output: bool = typer.Option(False, "--json", help="Output as JSON."),
) -> None:
"""Run a quick one-off browser automation task."""
async def _run() -> dict[str, Any]:
connection = _resolve_connection(session, cdp)
return await tool_run_task(
prompt=prompt,
session_id=connection.session_id if connection.mode == "cloud" else None,
cdp_url=connection.cdp_url if connection.mode == "cdp" else None,
url=url,
data_extraction_schema=data_extraction_schema,
max_steps=max_steps,
timeout_seconds=timeout_seconds,
)
try:
result = asyncio.run(_run())
_emit_tool_result(result, json_output=json_output, action="run_task")
except typer.BadParameter:
raise
except Exception as e:
output_error(str(e), hint="Check the prompt, active connection, and timeout settings.", json_mode=json_output)
@browser_app.command("login")
def login(
credential_type: str = typer.Option(
"skyvern",
"--credential-type",
help="Credential provider: skyvern, bitwarden, 1password, or azure_vault.",
),
session: str | None = typer.Option(None, help="Browser session ID."),
cdp: str | None = typer.Option(None, "--cdp", help="CDP WebSocket URL."),
url: str | None = typer.Option(None, help="Login page URL."),
credential_id: str | None = typer.Option(None, "--credential-id", help="Skyvern credential ID for type=skyvern."),
bitwarden_item_id: str | None = typer.Option(None, "--bitwarden-item-id", help="Bitwarden item ID."),
bitwarden_collection_id: str | None = typer.Option(
None, "--bitwarden-collection-id", help="Bitwarden collection ID."
),
onepassword_vault_id: str | None = typer.Option(None, "--onepassword-vault-id", help="1Password vault ID."),
onepassword_item_id: str | None = typer.Option(None, "--onepassword-item-id", help="1Password item ID."),
azure_vault_name: str | None = typer.Option(None, "--azure-vault-name", help="Azure Vault name."),
azure_vault_username_key: str | None = typer.Option(
None, "--azure-vault-username-key", help="Azure Vault username key."
),
azure_vault_password_key: str | None = typer.Option(
None, "--azure-vault-password-key", help="Azure Vault password key."
),
azure_vault_totp_secret_key: str | None = typer.Option(
None, "--azure-vault-totp-secret-key", help="Azure Vault TOTP secret key."
),
prompt: str | None = typer.Option(None, help="Additional login instructions."),
totp_identifier: str | None = typer.Option(None, "--totp-identifier", help="TOTP identifier for 2FA."),
totp_url: str | None = typer.Option(None, "--totp-url", help="URL to fetch TOTP codes."),
timeout_seconds: int = typer.Option(180, "--timeout", min=10, max=600, help="Timeout in seconds."),
json_output: bool = typer.Option(False, "--json", help="Output as JSON."),
) -> None:
"""Log into a site using stored credentials from a supported provider."""
async def _run() -> dict[str, Any]:
connection = _resolve_connection(session, cdp)
return await tool_login(
credential_type=credential_type,
session_id=connection.session_id if connection.mode == "cloud" else None,
cdp_url=connection.cdp_url if connection.mode == "cdp" else None,
url=url,
credential_id=credential_id,
bitwarden_item_id=bitwarden_item_id,
bitwarden_collection_id=bitwarden_collection_id,
onepassword_vault_id=onepassword_vault_id,
onepassword_item_id=onepassword_item_id,
azure_vault_name=azure_vault_name,
azure_vault_username_key=azure_vault_username_key,
azure_vault_password_key=azure_vault_password_key,
azure_vault_totp_secret_key=azure_vault_totp_secret_key,
prompt=prompt,
totp_identifier=totp_identifier,
totp_url=totp_url,
timeout_seconds=timeout_seconds,
)
try:
result = asyncio.run(_run())
_emit_tool_result(result, json_output=json_output, action="login")
except typer.BadParameter:
raise
except Exception as e:
output_error(
str(e), hint="Check credential inputs, active connection, and timeout settings.", json_mode=json_output
)

110
skyvern/cli/credential.py Normal file
View File

@@ -0,0 +1,110 @@
"""Credential CLI commands with MCP-parity output and validation."""
from __future__ import annotations
import asyncio
import json
import sys
from typing import Any, Callable, Coroutine
import typer
from dotenv import load_dotenv
from skyvern.config import settings
from skyvern.utils.env_paths import resolve_backend_env_path
from .commands._output import output, output_error
from .mcp_tools.credential import skyvern_credential_delete as tool_credential_delete
from .mcp_tools.credential import skyvern_credential_get as tool_credential_get
from .mcp_tools.credential import skyvern_credential_list as tool_credential_list
credential_app = typer.Typer(
help="MCP-parity credential commands (list/get/delete). Use `skyvern credentials add` for secure creation.",
no_args_is_help=True,
)
def _emit_tool_result(result: dict[str, Any], *, json_output: bool) -> None:
if json_output:
json.dump(result, sys.stdout, indent=2, default=str)
sys.stdout.write("\n")
if not result.get("ok", False):
raise SystemExit(1)
return
if result.get("ok", False):
output(result.get("data"), action=str(result.get("action", "")), json_mode=False)
return
err = result.get("error") or {}
output_error(str(err.get("message") or "Unknown error"), hint=str(err.get("hint") or ""), json_mode=False)
def _run_tool(
runner: Callable[[], Coroutine[Any, Any, dict[str, Any]]],
*,
json_output: bool,
hint_on_exception: str,
) -> None:
try:
result: dict[str, Any] = asyncio.run(runner())
_emit_tool_result(result, json_output=json_output)
except typer.BadParameter:
raise
except Exception as e:
output_error(str(e), hint=hint_on_exception, json_mode=json_output)
@credential_app.callback()
def credential_callback(
api_key: str | None = typer.Option(
None,
"--api-key",
help="Skyvern API key",
envvar="SKYVERN_API_KEY",
),
) -> None:
"""Load environment and optional API key override."""
load_dotenv(resolve_backend_env_path())
if api_key:
settings.SKYVERN_API_KEY = api_key
@credential_app.command("list")
def credential_list(
page: int = typer.Option(1, "--page", min=1, help="Page number (1-based)."),
page_size: int = typer.Option(10, "--page-size", min=1, max=100, help="Results per page."),
json_output: bool = typer.Option(False, "--json", help="Output as JSON."),
) -> None:
"""List stored credentials (metadata only)."""
async def _run() -> dict[str, Any]:
return await tool_credential_list(page=page, page_size=page_size)
_run_tool(_run, json_output=json_output, hint_on_exception="Check your API key and Skyvern connection.")
@credential_app.command("get")
def credential_get(
credential_id: str = typer.Option(..., "--id", "--credential-id", help="Credential ID (starts with cred_)."),
json_output: bool = typer.Option(False, "--json", help="Output as JSON."),
) -> None:
"""Get credential metadata by ID."""
async def _run() -> dict[str, Any]:
return await tool_credential_get(credential_id=credential_id)
_run_tool(_run, json_output=json_output, hint_on_exception="Check your API key and credential ID.")
@credential_app.command("delete")
def credential_delete(
credential_id: str = typer.Option(..., "--id", "--credential-id", help="Credential ID (starts with cred_)."),
json_output: bool = typer.Option(False, "--json", help="Output as JSON."),
) -> None:
"""Delete a credential by ID."""
async def _run() -> dict[str, Any]:
return await tool_credential_delete(credential_id=credential_id)
_run_tool(_run, json_output=json_output, hint_on_exception="Check your API key and credential ID.")

View File

@@ -22,7 +22,9 @@ from skyvern.utils.env_paths import resolve_backend_env_path
from .console import console
credentials_app = typer.Typer(help="Manage stored credentials for secure login.")
credentials_app = typer.Typer(
help="Manage stored credentials for secure login. Use `credential` commands for MCP-parity list/get/delete."
)
@credentials_app.callback()

View File

@@ -468,3 +468,311 @@ class TestWorkflowCommands:
)
tool.assert_not_called()
# ---------------------------------------------------------------------------
# PR C parity command behavior
# ---------------------------------------------------------------------------
class TestCredentialParityCommands:
def test_credential_list_maps_options(self, monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture) -> None:
from skyvern.cli import credential as credential_cmd
tool = AsyncMock(
return_value={
"ok": True,
"action": "skyvern_credential_list",
"browser_context": {"mode": "none", "session_id": None, "cdp_url": None},
"data": {"credentials": [], "page": 2, "page_size": 25, "count": 0, "has_more": False},
"artifacts": [],
"timing_ms": {},
"warnings": [],
"error": None,
}
)
monkeypatch.setattr(credential_cmd, "tool_credential_list", tool)
credential_cmd.credential_list(page=2, page_size=25, json_output=True)
assert tool.await_args.kwargs == {"page": 2, "page_size": 25}
parsed = json.loads(capsys.readouterr().out)
assert parsed["ok"] is True
assert parsed["action"] == "skyvern_credential_list"
def test_credential_get_json_error_exits_nonzero(
self, monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture
) -> None:
from skyvern.cli import credential as credential_cmd
tool = AsyncMock(
return_value={
"ok": False,
"action": "skyvern_credential_get",
"browser_context": {"mode": "none", "session_id": None, "cdp_url": None},
"data": None,
"artifacts": [],
"timing_ms": {},
"warnings": [],
"error": {
"code": "INVALID_INPUT",
"message": "Invalid credential_id format: 'bad'",
"hint": "Credential IDs start with cred_",
"details": {},
},
}
)
monkeypatch.setattr(credential_cmd, "tool_credential_get", tool)
with pytest.raises(SystemExit, match="1"):
credential_cmd.credential_get(credential_id="bad", json_output=True)
parsed = json.loads(capsys.readouterr().out)
assert parsed["ok"] is False
assert "Invalid credential_id format" in parsed["error"]["message"]
def test_credential_delete_maps_options(
self, monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture
) -> None:
from skyvern.cli import credential as credential_cmd
tool = AsyncMock(
return_value={
"ok": True,
"action": "skyvern_credential_delete",
"browser_context": {"mode": "none", "session_id": None, "cdp_url": None},
"data": {"credential_id": "cred_123", "deleted": True},
"artifacts": [],
"timing_ms": {},
"warnings": [],
"error": None,
}
)
monkeypatch.setattr(credential_cmd, "tool_credential_delete", tool)
credential_cmd.credential_delete(credential_id="cred_123", json_output=True)
assert tool.await_args.kwargs == {"credential_id": "cred_123"}
parsed = json.loads(capsys.readouterr().out)
assert parsed["ok"] is True
assert parsed["data"]["deleted"] is True
class TestBlockParityCommands:
def test_block_schema_passes_block_type(
self, monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture
) -> None:
from skyvern.cli import block as block_cmd
tool = AsyncMock(
return_value={
"ok": True,
"action": "skyvern_block_schema",
"browser_context": {"mode": "none", "session_id": None, "cdp_url": None},
"data": {"block_type": "navigation", "schema": {"type": "object"}},
"artifacts": [],
"timing_ms": {},
"warnings": [],
"error": None,
}
)
monkeypatch.setattr(block_cmd, "tool_block_schema", tool)
block_cmd.block_schema(block_type="navigation", json_output=True)
assert tool.await_args.kwargs == {"block_type": "navigation"}
parsed = json.loads(capsys.readouterr().out)
assert parsed["ok"] is True
assert parsed["data"]["block_type"] == "navigation"
def test_block_validate_reads_json_from_file(
self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture
) -> None:
from skyvern.cli import block as block_cmd
block_file = tmp_path / "block.json"
block_file.write_text('{"block_type":"navigation","label":"step1","navigation_goal":"Go to page"}')
tool = AsyncMock(
return_value={
"ok": True,
"action": "skyvern_block_validate",
"browser_context": {"mode": "none", "session_id": None, "cdp_url": None},
"data": {"valid": True, "block_type": "navigation", "label": "step1"},
"artifacts": [],
"timing_ms": {},
"warnings": [],
"error": None,
}
)
monkeypatch.setattr(block_cmd, "tool_block_validate", tool)
block_cmd.block_validate(block_json=f"@{block_file}", json_output=True)
assert tool.await_args.kwargs == {
"block_json": '{"block_type":"navigation","label":"step1","navigation_goal":"Go to page"}'
}
parsed = json.loads(capsys.readouterr().out)
assert parsed["ok"] is True
assert parsed["data"]["valid"] is True
class TestBrowserPRCCommands:
def test_run_task_uses_resolved_connection(
self, monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture
) -> None:
from skyvern.cli.commands import browser as browser_cmd
monkeypatch.setattr(
browser_cmd,
"_resolve_connection",
lambda _session, _cdp: browser_cmd.ConnectionTarget(mode="cloud", session_id="pbs_123"),
)
tool = AsyncMock(
return_value={
"ok": True,
"action": "skyvern_run_task",
"browser_context": {"mode": "cloud", "session_id": "pbs_123", "cdp_url": None},
"data": {"run_id": "run_123", "status": "completed"},
"artifacts": [],
"timing_ms": {},
"warnings": [],
"error": None,
}
)
monkeypatch.setattr(browser_cmd, "tool_run_task", tool)
browser_cmd.run_task(
prompt="Find the latest headline",
session=None,
cdp=None,
url="https://news.ycombinator.com",
data_extraction_schema='{"type":"object"}',
max_steps=5,
timeout_seconds=240,
json_output=True,
)
assert tool.await_args.kwargs == {
"prompt": "Find the latest headline",
"session_id": "pbs_123",
"cdp_url": None,
"url": "https://news.ycombinator.com",
"data_extraction_schema": '{"type":"object"}',
"max_steps": 5,
"timeout_seconds": 240,
}
parsed = json.loads(capsys.readouterr().out)
assert parsed["ok"] is True
assert parsed["data"]["run_id"] == "run_123"
def test_login_json_error_exits_nonzero(
self, monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture
) -> None:
from skyvern.cli.commands import browser as browser_cmd
monkeypatch.setattr(
browser_cmd,
"_resolve_connection",
lambda _session, _cdp: browser_cmd.ConnectionTarget(mode="cloud", session_id="pbs_abc"),
)
tool = AsyncMock(
return_value={
"ok": False,
"action": "skyvern_login",
"browser_context": {"mode": "cloud", "session_id": "pbs_abc", "cdp_url": None},
"data": None,
"artifacts": [],
"timing_ms": {},
"warnings": [],
"error": {
"code": "INVALID_INPUT",
"message": "Missing required fields for credential_type='skyvern': credential_id",
"hint": "Provide: credential_id",
"details": {},
},
}
)
monkeypatch.setattr(browser_cmd, "tool_login", tool)
with pytest.raises(SystemExit, match="1"):
browser_cmd.login(
credential_type="skyvern",
session=None,
cdp=None,
credential_id=None,
json_output=True,
)
kwargs = tool.await_args.kwargs
assert kwargs["session_id"] == "pbs_abc"
assert kwargs["credential_type"] == "skyvern"
parsed = json.loads(capsys.readouterr().out)
assert parsed["ok"] is False
assert "Missing required fields" in parsed["error"]["message"]
class TestParityErrorFormatting:
def test_credential_emit_tool_result_handles_none_message_and_hint(self, monkeypatch: pytest.MonkeyPatch) -> None:
from skyvern.cli import credential as credential_cmd
captured: dict[str, str | bool] = {}
def _fake_output_error(message: str, *, hint: str = "", json_mode: bool = False, exit_code: int = 1) -> None:
captured["message"] = message
captured["hint"] = hint
captured["json_mode"] = json_mode
raise SystemExit(exit_code)
monkeypatch.setattr(credential_cmd, "output_error", _fake_output_error)
with pytest.raises(SystemExit, match="1"):
credential_cmd._emit_tool_result(
{"ok": False, "error": {"message": None, "hint": None}},
json_output=False,
)
assert captured == {"message": "Unknown error", "hint": "", "json_mode": False}
def test_block_emit_tool_result_handles_none_message_and_hint(self, monkeypatch: pytest.MonkeyPatch) -> None:
from skyvern.cli import block as block_cmd
captured: dict[str, str | bool] = {}
def _fake_output_error(message: str, *, hint: str = "", json_mode: bool = False, exit_code: int = 1) -> None:
captured["message"] = message
captured["hint"] = hint
captured["json_mode"] = json_mode
raise SystemExit(exit_code)
monkeypatch.setattr(block_cmd, "output_error", _fake_output_error)
with pytest.raises(SystemExit, match="1"):
block_cmd._emit_tool_result(
{"ok": False, "error": {"message": None, "hint": None}},
json_output=False,
)
assert captured == {"message": "Unknown error", "hint": "", "json_mode": False}
def test_browser_emit_tool_result_handles_none_message_and_hint(self, monkeypatch: pytest.MonkeyPatch) -> None:
from skyvern.cli.commands import browser as browser_cmd
captured: dict[str, str | bool] = {}
def _fake_output_error(message: str, *, hint: str = "", json_mode: bool = False, exit_code: int = 1) -> None:
captured["message"] = message
captured["hint"] = hint
captured["json_mode"] = json_mode
raise SystemExit(exit_code)
monkeypatch.setattr(browser_cmd, "output_error", _fake_output_error)
with pytest.raises(SystemExit, match="1"):
browser_cmd._emit_tool_result(
{"ok": False, "error": {"message": None, "hint": None}},
json_output=False,
action="login",
)
assert captured == {"message": "Unknown error", "hint": "", "json_mode": False}