doc update - add run tasks page (#2425)

This commit is contained in:
Shuchang Zheng
2025-05-24 23:46:31 -07:00
committed by GitHub
parent 601cd099f7
commit 6f83978937
27 changed files with 1168 additions and 48 deletions

View File

@@ -13,6 +13,8 @@ from .types import (
ActionBlockParametersItem_Output,
ActionBlockParametersItem_Workflow,
ActionBlockYaml,
Artifact,
ArtifactType,
AwsSecretParameter,
AwsSecretParameterYaml,
BitwardenCreditCardDataParameter,
@@ -317,7 +319,7 @@ from .types import (
WorkflowStatus,
)
from .errors import BadRequestError, ForbiddenError, NotFoundError, UnprocessableEntityError
from . import agent, browser_session, credentials, workflows
from . import agent, artifacts, browser_session, credentials, workflows
from .agent import (
AgentGetRunResponse,
AgentGetRunResponse_AnthropicCua,
@@ -350,6 +352,8 @@ __all__ = [
"AgentGetRunResponse_TaskV1",
"AgentGetRunResponse_TaskV2",
"AgentGetRunResponse_WorkflowRun",
"Artifact",
"ArtifactType",
"AsyncSkyvern",
"AwsSecretParameter",
"AwsSecretParameterYaml",
@@ -662,6 +666,7 @@ __all__ = [
"WorkflowStatus",
"__version__",
"agent",
"artifacts",
"browser_session",
"credentials",
"workflows",

View File

@@ -429,6 +429,65 @@ class AgentClient:
raise ApiError(status_code=_response.status_code, body=_response.text)
raise ApiError(status_code=_response.status_code, body=_response_json)
def retry_run_webhook(
self, run_id: str, *, request_options: typing.Optional[RequestOptions] = None
) -> typing.Optional[typing.Any]:
"""
Retry sending the webhook for a run
Parameters
----------
run_id : str
The id of the task run or the workflow run.
request_options : typing.Optional[RequestOptions]
Request-specific configuration.
Returns
-------
typing.Optional[typing.Any]
Successful Response
Examples
--------
from skyvern import Skyvern
client = Skyvern(
api_key="YOUR_API_KEY",
)
client.agent.retry_run_webhook(
run_id="tsk_123",
)
"""
_response = self._client_wrapper.httpx_client.request(
f"v1/runs/{jsonable_encoder(run_id)}/retry_webhook",
method="POST",
request_options=request_options,
)
try:
if 200 <= _response.status_code < 300:
return typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
)
if _response.status_code == 422:
raise UnprocessableEntityError(
typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
)
)
_response_json = _response.json()
except JSONDecodeError:
raise ApiError(status_code=_response.status_code, body=_response.text)
raise ApiError(status_code=_response.status_code, body=_response_json)
class AsyncAgentClient:
def __init__(self, *, client_wrapper: AsyncClientWrapper):
@@ -869,3 +928,70 @@ class AsyncAgentClient:
except JSONDecodeError:
raise ApiError(status_code=_response.status_code, body=_response.text)
raise ApiError(status_code=_response.status_code, body=_response_json)
async def retry_run_webhook(
self, run_id: str, *, request_options: typing.Optional[RequestOptions] = None
) -> typing.Optional[typing.Any]:
"""
Retry sending the webhook for a run
Parameters
----------
run_id : str
The id of the task run or the workflow run.
request_options : typing.Optional[RequestOptions]
Request-specific configuration.
Returns
-------
typing.Optional[typing.Any]
Successful Response
Examples
--------
import asyncio
from skyvern import AsyncSkyvern
client = AsyncSkyvern(
api_key="YOUR_API_KEY",
)
async def main() -> None:
await client.agent.retry_run_webhook(
run_id="tsk_123",
)
asyncio.run(main())
"""
_response = await self._client_wrapper.httpx_client.request(
f"v1/runs/{jsonable_encoder(run_id)}/retry_webhook",
method="POST",
request_options=request_options,
)
try:
if 200 <= _response.status_code < 300:
return typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
)
if _response.status_code == 422:
raise UnprocessableEntityError(
typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
)
)
_response_json = _response.json()
except JSONDecodeError:
raise ApiError(status_code=_response.status_code, body=_response.text)
raise ApiError(status_code=_response.status_code, body=_response_json)

View File

@@ -6,11 +6,13 @@ import httpx
from .core.client_wrapper import SyncClientWrapper
from .agent.client import AgentClient
from .workflows.client import WorkflowsClient
from .artifacts.client import ArtifactsClient
from .browser_session.client import BrowserSessionClient
from .credentials.client import CredentialsClient
from .core.client_wrapper import AsyncClientWrapper
from .agent.client import AsyncAgentClient
from .workflows.client import AsyncWorkflowsClient
from .artifacts.client import AsyncArtifactsClient
from .browser_session.client import AsyncBrowserSessionClient
from .credentials.client import AsyncCredentialsClient
@@ -75,6 +77,7 @@ class Skyvern:
)
self.agent = AgentClient(client_wrapper=self._client_wrapper)
self.workflows = WorkflowsClient(client_wrapper=self._client_wrapper)
self.artifacts = ArtifactsClient(client_wrapper=self._client_wrapper)
self.browser_session = BrowserSessionClient(client_wrapper=self._client_wrapper)
self.credentials = CredentialsClient(client_wrapper=self._client_wrapper)
@@ -139,6 +142,7 @@ class AsyncSkyvern:
)
self.agent = AsyncAgentClient(client_wrapper=self._client_wrapper)
self.workflows = AsyncWorkflowsClient(client_wrapper=self._client_wrapper)
self.artifacts = AsyncArtifactsClient(client_wrapper=self._client_wrapper)
self.browser_session = AsyncBrowserSessionClient(client_wrapper=self._client_wrapper)
self.credentials = AsyncCredentialsClient(client_wrapper=self._client_wrapper)

View File

@@ -16,7 +16,7 @@ class BaseClientWrapper:
headers: typing.Dict[str, str] = {
"X-Fern-Language": "Python",
"X-Fern-SDK-Name": "skyvern",
"X-Fern-SDK-Version": "0.1.85",
"X-Fern-SDK-Version": "0.1.86",
}
if self._api_key is not None:
headers["x-api-key"] = self._api_key

View File

@@ -14,6 +14,8 @@ from .action_block_parameters_item import (
ActionBlockParametersItem_Workflow,
)
from .action_block_yaml import ActionBlockYaml
from .artifact import Artifact
from .artifact_type import ArtifactType
from .aws_secret_parameter import AwsSecretParameter
from .aws_secret_parameter_yaml import AwsSecretParameterYaml
from .bitwarden_credit_card_data_parameter import BitwardenCreditCardDataParameter
@@ -366,6 +368,8 @@ __all__ = [
"ActionBlockParametersItem_Output",
"ActionBlockParametersItem_Workflow",
"ActionBlockYaml",
"Artifact",
"ArtifactType",
"AwsSecretParameter",
"AwsSecretParameterYaml",
"BitwardenCreditCardDataParameter",

View File

@@ -0,0 +1,41 @@
# This file was auto-generated by Fern from our API Definition.
from ..core.pydantic_utilities import UniversalBaseModel
import pydantic
from .artifact_type import ArtifactType
import typing
from ..core.pydantic_utilities import IS_PYDANTIC_V2
class Artifact(UniversalBaseModel):
created_at: str = pydantic.Field()
"""
The creation datetime of the task.
"""
modified_at: str = pydantic.Field()
"""
The modification datetime of the task.
"""
artifact_id: str
artifact_type: ArtifactType
uri: str
task_id: typing.Optional[str] = None
step_id: typing.Optional[str] = None
workflow_run_id: typing.Optional[str] = None
workflow_run_block_id: typing.Optional[str] = None
observer_cruise_id: typing.Optional[str] = None
observer_thought_id: typing.Optional[str] = None
ai_suggestion_id: typing.Optional[str] = None
signed_url: typing.Optional[str] = None
organization_id: typing.Optional[str] = None
if IS_PYDANTIC_V2:
model_config: typing.ClassVar[pydantic.ConfigDict] = pydantic.ConfigDict(extra="allow", frozen=True) # type: ignore # Pydantic v2
else:
class Config:
frozen = True
smart_union = True
extra = pydantic.Extra.allow

View File

@@ -0,0 +1,34 @@
# This file was auto-generated by Fern from our API Definition.
import typing
ArtifactType = typing.Union[
typing.Literal[
"recording",
"browser_console_log",
"skyvern_log",
"skyvern_log_raw",
"screenshot",
"screenshot_llm",
"screenshot_action",
"screenshot_final",
"llm_prompt",
"llm_request",
"llm_response",
"llm_response_parsed",
"llm_response_rendered",
"visible_elements_id_css_map",
"visible_elements_id_frame_map",
"visible_elements_tree",
"visible_elements_tree_trimmed",
"visible_elements_tree_in_prompt",
"hashed_href_map",
"visible_elements_id_xpath_map",
"html",
"html_scrape",
"html_action",
"trace",
"har",
],
typing.Any,
]

View File

@@ -2,14 +2,14 @@
import typing
from ..core.client_wrapper import SyncClientWrapper
from ..types.workflow_create_yaml_request import WorkflowCreateYamlRequest
from ..core.request_options import RequestOptions
from ..types.workflow import Workflow
from ..core.serialization import convert_and_respect_annotation_metadata
from ..core.pydantic_utilities import parse_obj_as
from ..errors.unprocessable_entity_error import UnprocessableEntityError
from json.decoder import JSONDecodeError
from ..core.api_error import ApiError
from ..types.workflow_create_yaml_request import WorkflowCreateYamlRequest
from ..core.serialization import convert_and_respect_annotation_metadata
from ..core.jsonable_encoder import jsonable_encoder
from ..core.client_wrapper import AsyncClientWrapper
@@ -21,6 +21,88 @@ class WorkflowsClient:
def __init__(self, *, client_wrapper: SyncClientWrapper):
self._client_wrapper = client_wrapper
def get_workflows(
self,
*,
page: typing.Optional[int] = None,
page_size: typing.Optional[int] = None,
only_saved_tasks: typing.Optional[bool] = None,
only_workflows: typing.Optional[bool] = None,
title: typing.Optional[str] = None,
template: typing.Optional[bool] = None,
request_options: typing.Optional[RequestOptions] = None,
) -> typing.List[Workflow]:
"""
Get all workflows with the latest version for the organization.
Parameters
----------
page : typing.Optional[int]
page_size : typing.Optional[int]
only_saved_tasks : typing.Optional[bool]
only_workflows : typing.Optional[bool]
title : typing.Optional[str]
template : typing.Optional[bool]
request_options : typing.Optional[RequestOptions]
Request-specific configuration.
Returns
-------
typing.List[Workflow]
Successful Response
Examples
--------
from skyvern import Skyvern
client = Skyvern(
api_key="YOUR_API_KEY",
)
client.workflows.get_workflows()
"""
_response = self._client_wrapper.httpx_client.request(
"v1/workflows",
method="GET",
params={
"page": page,
"page_size": page_size,
"only_saved_tasks": only_saved_tasks,
"only_workflows": only_workflows,
"title": title,
"template": template,
},
request_options=request_options,
)
try:
if 200 <= _response.status_code < 300:
return typing.cast(
typing.List[Workflow],
parse_obj_as(
type_=typing.List[Workflow], # type: ignore
object_=_response.json(),
),
)
if _response.status_code == 422:
raise UnprocessableEntityError(
typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
)
)
_response_json = _response.json()
except JSONDecodeError:
raise ApiError(status_code=_response.status_code, body=_response.text)
raise ApiError(status_code=_response.status_code, body=_response_json)
def create_workflow(
self,
*,
@@ -233,6 +315,96 @@ class AsyncWorkflowsClient:
def __init__(self, *, client_wrapper: AsyncClientWrapper):
self._client_wrapper = client_wrapper
async def get_workflows(
self,
*,
page: typing.Optional[int] = None,
page_size: typing.Optional[int] = None,
only_saved_tasks: typing.Optional[bool] = None,
only_workflows: typing.Optional[bool] = None,
title: typing.Optional[str] = None,
template: typing.Optional[bool] = None,
request_options: typing.Optional[RequestOptions] = None,
) -> typing.List[Workflow]:
"""
Get all workflows with the latest version for the organization.
Parameters
----------
page : typing.Optional[int]
page_size : typing.Optional[int]
only_saved_tasks : typing.Optional[bool]
only_workflows : typing.Optional[bool]
title : typing.Optional[str]
template : typing.Optional[bool]
request_options : typing.Optional[RequestOptions]
Request-specific configuration.
Returns
-------
typing.List[Workflow]
Successful Response
Examples
--------
import asyncio
from skyvern import AsyncSkyvern
client = AsyncSkyvern(
api_key="YOUR_API_KEY",
)
async def main() -> None:
await client.workflows.get_workflows()
asyncio.run(main())
"""
_response = await self._client_wrapper.httpx_client.request(
"v1/workflows",
method="GET",
params={
"page": page,
"page_size": page_size,
"only_saved_tasks": only_saved_tasks,
"only_workflows": only_workflows,
"title": title,
"template": template,
},
request_options=request_options,
)
try:
if 200 <= _response.status_code < 300:
return typing.cast(
typing.List[Workflow],
parse_obj_as(
type_=typing.List[Workflow], # type: ignore
object_=_response.json(),
),
)
if _response.status_code == 422:
raise UnprocessableEntityError(
typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
)
)
_response_json = _response.json()
except JSONDecodeError:
raise ApiError(status_code=_response.status_code, body=_response.text)
raise ApiError(status_code=_response.status_code, body=_response_json)
async def create_workflow(
self,
*,

View File

@@ -16,6 +16,7 @@ from skyvern.exceptions import (
)
from skyvern.forge.sdk.db.enums import TaskType
from skyvern.forge.sdk.schemas.files import FileInfo
from skyvern.schemas.docs.doc_strings import PROXY_LOCATION_DOC_STRING
from skyvern.schemas.runs import ProxyLocation
from skyvern.utils.url_validators import validate_url
@@ -65,8 +66,7 @@ class TaskBase(BaseModel):
)
proxy_location: ProxyLocation | None = Field(
default=None,
description="The location of the proxy to use for the task.",
examples=["US-WA", "US-CA", "US-FL", "US-NY", "US-TX"],
description=PROXY_LOCATION_DOC_STRING,
)
extracted_information_schema: dict[str, Any] | list | str | None = Field(
default=None,

View File

@@ -24,7 +24,12 @@ ALGORITHM = "HS256"
async def get_current_org(
x_api_key: Annotated[str | None, Header(description="API key for authentication")] = None,
x_api_key: Annotated[
str | None,
Header(
description="Skyvern API key for authentication. API key can be found at https://app.skyvern.com/settings."
),
] = None,
authorization: Annotated[str | None, Header(include_in_schema=False)] = None,
) -> Organization:
if not x_api_key and not authorization:

View File

@@ -325,7 +325,7 @@ class Skyvern(AsyncSkyvern):
data_extraction_schema = data_extraction_schema or task_generation.extracted_information_schema
task_request = TaskRequest(
title=title,
title=title or task_generation.suggested_title,
url=url,
navigation_goal=navigation_goal,
navigation_payload=navigation_payload,

View File

View File

@@ -0,0 +1,28 @@
TASK_PROMPT_EXAMPLES = [
"Find the top 3 posts on Hacker News.",
'Go to google finance, extract the "AAPL" stock price for me with the date.',
]
TASK_URL_EXAMPLES = [
"https://www.hackernews.com",
"https://www.google.com/finance",
]
ERROR_CODE_MAPPING_EXAMPLES = [
{"login_failed": "The login credentials are incorrect or the account is locked"},
{"maintenance_mode": "The website is down for maintenance"},
]
TOTP_IDENTIFIER_EXAMPLES = [
"john.doe@example.com",
"4155555555",
"user_123",
]
TOTP_URL_EXAMPLES = [
"https://my-totp-service.com/totp",
]
BROWSER_SESSION_ID_EXAMPLES = [
"pbs_123",
]

View File

@@ -0,0 +1,63 @@
TASK_PROMPT_DOC_STRING = """
The goal or task description for Skyvern to accomplish
"""
TASK_URL_DOC_STRING = """
The starting URL for the task. If not provided, Skyvern will attempt to determine an appropriate URL
"""
TASK_ENGINE_DOC_STRING = """
The engine that powers the agent task. The default value is `skyvern-2.0`, the latest Skyvern agent that performs pretty well with complex and multi-step tasks. `skyvern-1.0` is good for simple tasks like filling a form, or searching for information on Google. The `openai-cua` engine uses OpenAI's CUA model. The `anthropic-cua` uses Anthropic's Claude Sonnet 3.7 model with the computer use tool.
"""
PROXY_LOCATION_DOC_STRING = """
Geographic Proxy location to route the browser traffic through. This is only available in Skyvern Cloud.
Available geotargeting options:
- RESIDENTIAL: the default value. Skyvern Cloud uses a random US residential proxy.
- RESIDENTIAL_ES: Spain
- RESIDENTIAL_IE: Ireland
- RESIDENTIAL_GB: United Kingdom
- RESIDENTIAL_IN: India
- RESIDENTIAL_JP: Japan
- RESIDENTIAL_FR: France
- RESIDENTIAL_DE: Germany
- RESIDENTIAL_NZ: New Zealand
- RESIDENTIAL_ZA: South Africa
- RESIDENTIAL_AR: Argentina
- RESIDENTIAL_ISP: ISP proxy
- US-CA: California
- US-NY: New York
- US-TX: Texas
- US-FL: Florida
- US-WA: Washington
- NONE: No proxy
"""
DATA_EXTRACTION_SCHEMA_DOC_STRING = """
The schema for data to be extracted from the webpage. If you're looking for consistent data schema being returned by the agent, it's highly recommended to use https://json-schema.org/.
"""
ERROR_CODE_MAPPING_DOC_STRING = """
Custom mapping of error codes to error messages if Skyvern encounters an error.
"""
MAX_STEPS_DOC_STRING = """
Maximum number of steps the task can take. Task will fail if it exceeds this number. Cautions: you are charged per step so please set this number to a reasonable value. Contact sales@skyvern.com for custom pricing.
"""
WEBHOOK_URL_DOC_STRING = """
URL to send task status updates to after a run is finished. Refer to https://docs.skyvern.com/running-tasks/webhooks-faq for more details.
"""
TOTP_IDENTIFIER_DOC_STRING = """
Identifier for the TOTP/2FA/MFA code when the code is pushed to Skyvern. Refer to https://docs.skyvern.com/credentials/totp#option-3-push-code-to-skyvern for more details.
"""
TOTP_URL_DOC_STRING = """
URL that serves TOTP/2FA/MFA codes for Skyvern to use during the workflow run. Refer to https://docs.skyvern.com/credentials/totp#option-2-get-code-from-your-endpoint for more details.
"""
BROWSER_SESSION_ID_DOC_STRING = """
Run the task or workflow in the specific Skyvern browser session. Having a browser session can persist the real-time state of the browser, so that the next run can continue from where the previous run left off.
"""

View File

@@ -8,6 +8,27 @@ from zoneinfo import ZoneInfo
from pydantic import BaseModel, Field, field_validator
from skyvern.forge.sdk.schemas.files import FileInfo
from skyvern.schemas.docs.doc_examples import (
BROWSER_SESSION_ID_EXAMPLES,
ERROR_CODE_MAPPING_EXAMPLES,
TASK_PROMPT_EXAMPLES,
TASK_URL_EXAMPLES,
TOTP_IDENTIFIER_EXAMPLES,
TOTP_URL_EXAMPLES,
)
from skyvern.schemas.docs.doc_strings import (
BROWSER_SESSION_ID_DOC_STRING,
DATA_EXTRACTION_SCHEMA_DOC_STRING,
ERROR_CODE_MAPPING_DOC_STRING,
MAX_STEPS_DOC_STRING,
PROXY_LOCATION_DOC_STRING,
TASK_ENGINE_DOC_STRING,
TASK_PROMPT_DOC_STRING,
TASK_URL_DOC_STRING,
TOTP_IDENTIFIER_DOC_STRING,
TOTP_URL_DOC_STRING,
WEBHOOK_URL_DOC_STRING,
)
from skyvern.utils.url_validators import validate_url
@@ -188,42 +209,59 @@ class RunStatus(StrEnum):
class TaskRunRequest(BaseModel):
prompt: str = Field(description="The goal or task description for Skyvern to accomplish")
prompt: str = Field(
description=TASK_PROMPT_DOC_STRING,
examples=TASK_PROMPT_EXAMPLES,
)
url: str | None = Field(
default=None,
description="The starting URL for the task. If not provided, Skyvern will attempt to determine an appropriate URL",
description=TASK_URL_DOC_STRING,
examples=TASK_URL_EXAMPLES,
)
engine: RunEngine = Field(
default=RunEngine.skyvern_v2,
description="The Skyvern engine version to use for this task. The default value is skyvern-2.0.",
description=TASK_ENGINE_DOC_STRING,
)
title: str | None = Field(
default=None, description="The title for the task", examples=["The title of my first skyvern task"]
)
title: str | None = Field(default=None, description="The title for the task")
proxy_location: ProxyLocation | None = Field(
default=ProxyLocation.RESIDENTIAL, description="Geographic Proxy location to route the browser traffic through"
default=ProxyLocation.RESIDENTIAL,
description=PROXY_LOCATION_DOC_STRING,
)
data_extraction_schema: dict | list | str | None = Field(
default=None, description="Schema defining what data should be extracted from the webpage"
default=None,
description=DATA_EXTRACTION_SCHEMA_DOC_STRING,
)
error_code_mapping: dict[str, str] | None = Field(
default=None, description="Custom mapping of error codes to error messages if Skyvern encounters an error"
default=None,
description=ERROR_CODE_MAPPING_DOC_STRING,
examples=ERROR_CODE_MAPPING_EXAMPLES,
)
max_steps: int | None = Field(
default=None, description="Maximum number of steps the task can take before timing out"
default=None,
description=MAX_STEPS_DOC_STRING,
examples=[10, 25],
)
webhook_url: str | None = Field(
default=None, description="URL to send task status updates to after a run is finished"
default=None,
description=WEBHOOK_URL_DOC_STRING,
examples=["https://my-site.com/webhook"],
)
totp_identifier: str | None = Field(
default=None,
description="Identifier for TOTP (Time-based One-Time Password) authentication if codes are being pushed to Skyvern",
description=TOTP_IDENTIFIER_DOC_STRING,
examples=TOTP_IDENTIFIER_EXAMPLES,
)
totp_url: str | None = Field(
default=None,
description="URL for TOTP authentication setup if Skyvern should be polling endpoint for 2FA codes",
description=TOTP_URL_DOC_STRING,
examples=TOTP_URL_EXAMPLES,
)
browser_session_id: str | None = Field(
default=None,
description="ID of an existing browser session to reuse, having it continue from the current screen state",
description=BROWSER_SESSION_ID_DOC_STRING,
examples=BROWSER_SESSION_ID_EXAMPLES,
)
publish_workflow: bool = Field(
default=False,
@@ -258,7 +296,8 @@ class WorkflowRunRequest(BaseModel):
parameters: dict[str, Any] = Field(default={}, description="Parameters to pass to the workflow")
title: str | None = Field(default=None, description="The title for this workflow run")
proxy_location: ProxyLocation | None = Field(
default=ProxyLocation.RESIDENTIAL, description="Location of proxy to use for this workflow run"
default=ProxyLocation.RESIDENTIAL,
description=PROXY_LOCATION_DOC_STRING,
)
webhook_url: str | None = Field(
default=None,
@@ -266,11 +305,13 @@ class WorkflowRunRequest(BaseModel):
)
totp_url: str | None = Field(
default=None,
description="URL that serves TOTP/2FA/MFA codes for Skyvern to use during the workflow run. Refer to https://docs.skyvern.com/running-tasks/advanced-features#get-code-from-your-endpoint",
description=TOTP_URL_DOC_STRING,
examples=TOTP_URL_EXAMPLES,
)
totp_identifier: str | None = Field(
default=None,
description="Identifier for the TOTP/2FA/MFA code when the code is pushed to Skyvern. Refer to https://docs.skyvern.com/running-tasks/advanced-features#time-based-one-time-password-totp",
description=TOTP_IDENTIFIER_DOC_STRING,
examples=TOTP_IDENTIFIER_EXAMPLES,
)
browser_session_id: str | None = Field(
default=None,