# This file was auto-generated by Fern from our API Definition.

import typing
from ..core.client_wrapper import SyncClientWrapper
from ..core.request_options import RequestOptions
from .types.agent_get_run_response import AgentGetRunResponse
from ..core.jsonable_encoder import jsonable_encoder
from ..core.pydantic_utilities import parse_obj_as
from ..errors.not_found_error import NotFoundError
from ..errors.unprocessable_entity_error import UnprocessableEntityError
from json.decoder import JSONDecodeError
from ..core.api_error import ApiError
from ..types.workflow import Workflow
from ..types.run_engine import RunEngine
from ..types.proxy_location import ProxyLocation
from ..types.task_run_request_data_extraction_schema import TaskRunRequestDataExtractionSchema
from ..types.task_run_response import TaskRunResponse
from ..core.serialization import convert_and_respect_annotation_metadata
from ..errors.bad_request_error import BadRequestError
from ..types.workflow_run_response import WorkflowRunResponse
from ..core.client_wrapper import AsyncClientWrapper

# this is used as the default value for optional parameters
OMIT = typing.cast(typing.Any, ...)


def _parse_response(
    response: typing.Any,
    *,
    success_type: typing.Any,
    error_types: typing.Mapping[int, typing.Callable[[typing.Any], Exception]],
    allow_empty: bool = False,
) -> typing.Any:
    """
    Turn an HTTP response into a parsed result or raise the matching error.

    Shared by every endpoint method of both clients so the status handling
    lives in exactly one place.

    Parameters
    ----------
    response : typing.Any
        Response object exposing ``status_code``, ``text`` and ``json()``.

    success_type : typing.Any
        Type the JSON body of a 2xx response is parsed into.

    error_types : typing.Mapping[int, typing.Callable[[typing.Any], Exception]]
        Maps a specific non-2xx status code to the exception class that is
        raised with the parsed JSON body as its only argument.

    allow_empty : bool
        When true, a 2xx response whose body is empty or whitespace-only
        yields ``None`` instead of being reported as an ``ApiError``.

    Returns
    -------
    typing.Any
        The body of a 2xx response parsed as ``success_type``, or ``None``
        for an empty body when ``allow_empty`` is set.

    Raises
    ------
    ApiError
        For any status code that is neither 2xx nor listed in
        ``error_types`` (carrying the JSON body), and for any response whose
        body had to be decoded but is not valid JSON (carrying the raw text).
    """
    status_code = response.status_code
    try:
        if 200 <= status_code < 300:
            # A body-less success (e.g. 204 No Content) is not a failure for
            # endpoints whose declared result is optional.
            if allow_empty and not response.text.strip():
                return None
            return parse_obj_as(
                type_=success_type,  # type: ignore
                object_=response.json(),
            )
        error_cls = error_types.get(status_code)
        if error_cls is not None:
            raise error_cls(
                typing.cast(
                    typing.Optional[typing.Any],
                    parse_obj_as(
                        type_=typing.Optional[typing.Any],  # type: ignore
                        object_=response.json(),
                    ),
                )
            )
        response_json = response.json()
    except JSONDecodeError as exc:
        # The body could not be decoded: surface the raw text instead.
        raise ApiError(status_code=status_code, body=response.text) from exc
    raise ApiError(status_code=status_code, body=response_json)


class AgentClient:
    """Synchronous client for the run, task and workflow endpoints."""

    def __init__(self, *, client_wrapper: SyncClientWrapper):
        self._client_wrapper = client_wrapper

    def get_run(self, run_id: str, *, request_options: typing.Optional[RequestOptions] = None) -> AgentGetRunResponse:
        """
        Get a task or a workflow run by id

        Parameters
        ----------
        run_id : str
            The id of the task run or the workflow run.

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        AgentGetRunResponse
            Successfully got run

        Raises
        ------
        NotFoundError
            If the server answers with status 404.
        UnprocessableEntityError
            If the server answers with status 422.
        ApiError
            For any other non-2xx status or a body that is not valid JSON.

        Examples
        --------
        from skyvern import Skyvern

        client = Skyvern(
            api_key="YOUR_API_KEY",
            authorization="YOUR_AUTHORIZATION",
        )
        client.agent.get_run(
            run_id="run_id",
        )
        """
        _response = self._client_wrapper.httpx_client.request(
            f"v1/runs/{jsonable_encoder(run_id)}",
            method="GET",
            request_options=request_options,
        )
        return typing.cast(
            AgentGetRunResponse,
            _parse_response(
                _response,
                success_type=AgentGetRunResponse,
                error_types={404: NotFoundError, 422: UnprocessableEntityError},
            ),
        )

    def create_workflow(self, *, request_options: typing.Optional[RequestOptions] = None) -> Workflow:
        """
        Create a new workflow

        Parameters
        ----------
        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        Workflow
            Successfully created workflow

        Raises
        ------
        UnprocessableEntityError
            If the server answers with status 422.
        ApiError
            For any other non-2xx status or a body that is not valid JSON.

        Examples
        --------
        from skyvern import Skyvern

        client = Skyvern(
            api_key="YOUR_API_KEY",
            authorization="YOUR_AUTHORIZATION",
        )
        client.agent.create_workflow()
        """
        _response = self._client_wrapper.httpx_client.request(
            "v1/workflows",
            method="POST",
            request_options=request_options,
        )
        return typing.cast(
            Workflow,
            _parse_response(
                _response,
                success_type=Workflow,
                error_types={422: UnprocessableEntityError},
            ),
        )

    def update_workflow(self, workflow_id: str, *, request_options: typing.Optional[RequestOptions] = None) -> Workflow:
        """
        Update a workflow definition

        Parameters
        ----------
        workflow_id : str
            The id of the workflow, placed in the request path.

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        Workflow
            Successfully updated workflow

        Raises
        ------
        UnprocessableEntityError
            If the server answers with status 422.
        ApiError
            For any other non-2xx status or a body that is not valid JSON.

        Examples
        --------
        from skyvern import Skyvern

        client = Skyvern(
            api_key="YOUR_API_KEY",
            authorization="YOUR_AUTHORIZATION",
        )
        client.agent.update_workflow(
            workflow_id="workflow_id",
        )
        """
        _response = self._client_wrapper.httpx_client.request(
            f"v1/workflows/{jsonable_encoder(workflow_id)}",
            method="POST",
            request_options=request_options,
        )
        return typing.cast(
            Workflow,
            _parse_response(
                _response,
                success_type=Workflow,
                error_types={422: UnprocessableEntityError},
            ),
        )

    def delete_workflow(
        self, workflow_id: str, *, request_options: typing.Optional[RequestOptions] = None
    ) -> typing.Optional[typing.Any]:
        """
        Delete a workflow

        Parameters
        ----------
        workflow_id : str
            The id of the workflow, placed in the request path.

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        typing.Optional[typing.Any]
            Successfully deleted workflow. ``None`` when the successful
            response carries no body.

        Raises
        ------
        UnprocessableEntityError
            If the server answers with status 422.
        ApiError
            For any other non-2xx status or a non-empty body that is not
            valid JSON.

        Examples
        --------
        from skyvern import Skyvern

        client = Skyvern(
            api_key="YOUR_API_KEY",
            authorization="YOUR_AUTHORIZATION",
        )
        client.agent.delete_workflow(
            workflow_id="workflow_id",
        )
        """
        _response = self._client_wrapper.httpx_client.request(
            f"v1/workflows/{jsonable_encoder(workflow_id)}/delete",
            method="POST",
            request_options=request_options,
        )
        return typing.cast(
            typing.Optional[typing.Any],
            _parse_response(
                _response,
                success_type=typing.Optional[typing.Any],
                error_types={422: UnprocessableEntityError},
                allow_empty=True,
            ),
        )

    def run_task(
        self,
        *,
        prompt: str,
        user_agent: typing.Optional[str] = None,
        url: typing.Optional[str] = OMIT,
        title: typing.Optional[str] = OMIT,
        engine: typing.Optional[RunEngine] = OMIT,
        proxy_location: typing.Optional[ProxyLocation] = OMIT,
        data_extraction_schema: typing.Optional[TaskRunRequestDataExtractionSchema] = OMIT,
        error_code_mapping: typing.Optional[typing.Dict[str, typing.Optional[str]]] = OMIT,
        max_steps: typing.Optional[int] = OMIT,
        webhook_url: typing.Optional[str] = OMIT,
        totp_identifier: typing.Optional[str] = OMIT,
        totp_url: typing.Optional[str] = OMIT,
        browser_session_id: typing.Optional[str] = OMIT,
        publish_workflow: typing.Optional[bool] = OMIT,
        request_options: typing.Optional[RequestOptions] = None,
    ) -> TaskRunResponse:
        """
        Run a task

        Parameters
        ----------
        prompt : str
            The goal or task description for Skyvern to accomplish

        user_agent : typing.Optional[str]
            Sent as the ``x-user-agent`` request header when not ``None``.

        url : typing.Optional[str]
            The starting URL for the task. If not provided, Skyvern will attempt to determine an appropriate URL

        title : typing.Optional[str]
            Optional title for the task

        engine : typing.Optional[RunEngine]
            The Skyvern engine version to use for this task

        proxy_location : typing.Optional[ProxyLocation]
            Geographic Proxy location to route the browser traffic through

        data_extraction_schema : typing.Optional[TaskRunRequestDataExtractionSchema]
            Schema defining what data should be extracted from the webpage

        error_code_mapping : typing.Optional[typing.Dict[str, typing.Optional[str]]]
            Custom mapping of error codes to error messages if Skyvern encounters an error

        max_steps : typing.Optional[int]
            Maximum number of steps the task can take before timing out

        webhook_url : typing.Optional[str]
            URL to send task status updates to after a run is finished

        totp_identifier : typing.Optional[str]
            Identifier for TOTP (Time-based One-Time Password) authentication if codes are being pushed to Skyvern

        totp_url : typing.Optional[str]
            URL for TOTP authentication setup if Skyvern should be polling endpoint for 2FA codes

        browser_session_id : typing.Optional[str]
            ID of an existing browser session to reuse, having it continue from the current screen state

        publish_workflow : typing.Optional[bool]
            Whether to publish this task as a reusable workflow.

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        TaskRunResponse
            Successfully run task

        Raises
        ------
        BadRequestError
            If the server answers with status 400.
        UnprocessableEntityError
            If the server answers with status 422.
        ApiError
            For any other non-2xx status or a body that is not valid JSON.

        Examples
        --------
        from skyvern import Skyvern

        client = Skyvern(
            api_key="YOUR_API_KEY",
            authorization="YOUR_AUTHORIZATION",
        )
        client.agent.run_task(
            prompt="prompt",
        )
        """
        _response = self._client_wrapper.httpx_client.request(
            "v1/run/tasks",
            method="POST",
            json={
                "prompt": prompt,
                "url": url,
                "title": title,
                "engine": engine,
                "proxy_location": proxy_location,
                "data_extraction_schema": convert_and_respect_annotation_metadata(
                    object_=data_extraction_schema, annotation=TaskRunRequestDataExtractionSchema, direction="write"
                ),
                "error_code_mapping": error_code_mapping,
                "max_steps": max_steps,
                "webhook_url": webhook_url,
                "totp_identifier": totp_identifier,
                "totp_url": totp_url,
                "browser_session_id": browser_session_id,
                "publish_workflow": publish_workflow,
            },
            headers={
                "x-user-agent": str(user_agent) if user_agent is not None else None,
            },
            request_options=request_options,
            omit=OMIT,
        )
        return typing.cast(
            TaskRunResponse,
            _parse_response(
                _response,
                success_type=TaskRunResponse,
                error_types={400: BadRequestError, 422: UnprocessableEntityError},
            ),
        )

    def run_workflow(
        self,
        *,
        workflow_id: str,
        template: typing.Optional[bool] = None,
        max_steps_override: typing.Optional[int] = None,
        user_agent: typing.Optional[str] = None,
        title: typing.Optional[str] = OMIT,
        parameters: typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]] = OMIT,
        proxy_location: typing.Optional[ProxyLocation] = OMIT,
        webhook_url: typing.Optional[str] = OMIT,
        totp_url: typing.Optional[str] = OMIT,
        totp_identifier: typing.Optional[str] = OMIT,
        browser_session_id: typing.Optional[str] = OMIT,
        request_options: typing.Optional[RequestOptions] = None,
    ) -> WorkflowRunResponse:
        """
        Run a workflow

        Parameters
        ----------
        workflow_id : str
            ID of the workflow to run

        template : typing.Optional[bool]
            Sent as the ``template`` query parameter.

        max_steps_override : typing.Optional[int]
            Sent as the ``x-max-steps-override`` request header when not ``None``.

        user_agent : typing.Optional[str]
            Sent as the ``x-user-agent`` request header when not ``None``.

        title : typing.Optional[str]
            Optional title for this workflow run

        parameters : typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]]
            Parameters to pass to the workflow

        proxy_location : typing.Optional[ProxyLocation]
            Location of proxy to use for this workflow run

        webhook_url : typing.Optional[str]
            URL to send workflow status updates to after a run is finished

        totp_url : typing.Optional[str]
            URL for TOTP authentication setup if Skyvern should be polling endpoint for 2FA codes

        totp_identifier : typing.Optional[str]
            Identifier for TOTP (Time-based One-Time Password) authentication if codes are being pushed to Skyvern

        browser_session_id : typing.Optional[str]
            ID of an existing browser session to reuse, having it continue from the current screen state

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        WorkflowRunResponse
            Successfully run workflow

        Raises
        ------
        BadRequestError
            If the server answers with status 400.
        UnprocessableEntityError
            If the server answers with status 422.
        ApiError
            For any other non-2xx status or a body that is not valid JSON.

        Examples
        --------
        from skyvern import Skyvern

        client = Skyvern(
            api_key="YOUR_API_KEY",
            authorization="YOUR_AUTHORIZATION",
        )
        client.agent.run_workflow(
            workflow_id="workflow_id",
        )
        """
        _response = self._client_wrapper.httpx_client.request(
            "v1/run/workflows",
            method="POST",
            params={
                "template": template,
            },
            json={
                "workflow_id": workflow_id,
                "title": title,
                "parameters": parameters,
                "proxy_location": proxy_location,
                "webhook_url": webhook_url,
                "totp_url": totp_url,
                "totp_identifier": totp_identifier,
                "browser_session_id": browser_session_id,
            },
            headers={
                "x-max-steps-override": str(max_steps_override) if max_steps_override is not None else None,
                "x-user-agent": str(user_agent) if user_agent is not None else None,
            },
            request_options=request_options,
            omit=OMIT,
        )
        return typing.cast(
            WorkflowRunResponse,
            _parse_response(
                _response,
                success_type=WorkflowRunResponse,
                error_types={400: BadRequestError, 422: UnprocessableEntityError},
            ),
        )

    def cancel_run(
        self, run_id: str, *, request_options: typing.Optional[RequestOptions] = None
    ) -> typing.Optional[typing.Any]:
        """
        Cancel a task or workflow run

        Parameters
        ----------
        run_id : str
            The id of the task run or the workflow run to cancel.

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        typing.Optional[typing.Any]
            Successful Response. ``None`` when the successful response
            carries no body.

        Raises
        ------
        UnprocessableEntityError
            If the server answers with status 422.
        ApiError
            For any other non-2xx status or a non-empty body that is not
            valid JSON.

        Examples
        --------
        from skyvern import Skyvern

        client = Skyvern(
            api_key="YOUR_API_KEY",
            authorization="YOUR_AUTHORIZATION",
        )
        client.agent.cancel_run(
            run_id="run_id",
        )
        """
        _response = self._client_wrapper.httpx_client.request(
            f"v1/runs/{jsonable_encoder(run_id)}/cancel",
            method="POST",
            request_options=request_options,
        )
        return typing.cast(
            typing.Optional[typing.Any],
            _parse_response(
                _response,
                success_type=typing.Optional[typing.Any],
                error_types={422: UnprocessableEntityError},
                allow_empty=True,
            ),
        )


class AsyncAgentClient:
    """Asynchronous client for the run, task and workflow endpoints."""

    def __init__(self, *, client_wrapper: AsyncClientWrapper):
        self._client_wrapper = client_wrapper

    async def get_run(
        self, run_id: str, *, request_options: typing.Optional[RequestOptions] = None
    ) -> AgentGetRunResponse:
        """
        Get a task or a workflow run by id

        Parameters
        ----------
        run_id : str
            The id of the task run or the workflow run.

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        AgentGetRunResponse
            Successfully got run

        Raises
        ------
        NotFoundError
            If the server answers with status 404.
        UnprocessableEntityError
            If the server answers with status 422.
        ApiError
            For any other non-2xx status or a body that is not valid JSON.

        Examples
        --------
        import asyncio

        from skyvern import AsyncSkyvern

        client = AsyncSkyvern(
            api_key="YOUR_API_KEY",
            authorization="YOUR_AUTHORIZATION",
        )


        async def main() -> None:
            await client.agent.get_run(
                run_id="run_id",
            )


        asyncio.run(main())
        """
        _response = await self._client_wrapper.httpx_client.request(
            f"v1/runs/{jsonable_encoder(run_id)}",
            method="GET",
            request_options=request_options,
        )
        return typing.cast(
            AgentGetRunResponse,
            _parse_response(
                _response,
                success_type=AgentGetRunResponse,
                error_types={404: NotFoundError, 422: UnprocessableEntityError},
            ),
        )

    async def create_workflow(self, *, request_options: typing.Optional[RequestOptions] = None) -> Workflow:
        """
        Create a new workflow

        Parameters
        ----------
        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        Workflow
            Successfully created workflow

        Raises
        ------
        UnprocessableEntityError
            If the server answers with status 422.
        ApiError
            For any other non-2xx status or a body that is not valid JSON.

        Examples
        --------
        import asyncio

        from skyvern import AsyncSkyvern

        client = AsyncSkyvern(
            api_key="YOUR_API_KEY",
            authorization="YOUR_AUTHORIZATION",
        )


        async def main() -> None:
            await client.agent.create_workflow()


        asyncio.run(main())
        """
        _response = await self._client_wrapper.httpx_client.request(
            "v1/workflows",
            method="POST",
            request_options=request_options,
        )
        return typing.cast(
            Workflow,
            _parse_response(
                _response,
                success_type=Workflow,
                error_types={422: UnprocessableEntityError},
            ),
        )

    async def update_workflow(
        self, workflow_id: str, *, request_options: typing.Optional[RequestOptions] = None
    ) -> Workflow:
        """
        Update a workflow definition

        Parameters
        ----------
        workflow_id : str
            The id of the workflow, placed in the request path.

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        Workflow
            Successfully updated workflow

        Raises
        ------
        UnprocessableEntityError
            If the server answers with status 422.
        ApiError
            For any other non-2xx status or a body that is not valid JSON.

        Examples
        --------
        import asyncio

        from skyvern import AsyncSkyvern

        client = AsyncSkyvern(
            api_key="YOUR_API_KEY",
            authorization="YOUR_AUTHORIZATION",
        )


        async def main() -> None:
            await client.agent.update_workflow(
                workflow_id="workflow_id",
            )


        asyncio.run(main())
        """
        _response = await self._client_wrapper.httpx_client.request(
            f"v1/workflows/{jsonable_encoder(workflow_id)}",
            method="POST",
            request_options=request_options,
        )
        return typing.cast(
            Workflow,
            _parse_response(
                _response,
                success_type=Workflow,
                error_types={422: UnprocessableEntityError},
            ),
        )

    async def delete_workflow(
        self, workflow_id: str, *, request_options: typing.Optional[RequestOptions] = None
    ) -> typing.Optional[typing.Any]:
        """
        Delete a workflow

        Parameters
        ----------
        workflow_id : str
            The id of the workflow, placed in the request path.

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        typing.Optional[typing.Any]
            Successfully deleted workflow. ``None`` when the successful
            response carries no body.

        Raises
        ------
        UnprocessableEntityError
            If the server answers with status 422.
        ApiError
            For any other non-2xx status or a non-empty body that is not
            valid JSON.

        Examples
        --------
        import asyncio

        from skyvern import AsyncSkyvern

        client = AsyncSkyvern(
            api_key="YOUR_API_KEY",
            authorization="YOUR_AUTHORIZATION",
        )


        async def main() -> None:
            await client.agent.delete_workflow(
                workflow_id="workflow_id",
            )


        asyncio.run(main())
        """
        _response = await self._client_wrapper.httpx_client.request(
            f"v1/workflows/{jsonable_encoder(workflow_id)}/delete",
            method="POST",
            request_options=request_options,
        )
        return typing.cast(
            typing.Optional[typing.Any],
            _parse_response(
                _response,
                success_type=typing.Optional[typing.Any],
                error_types={422: UnprocessableEntityError},
                allow_empty=True,
            ),
        )

    async def run_task(
        self,
        *,
        prompt: str,
        user_agent: typing.Optional[str] = None,
        url: typing.Optional[str] = OMIT,
        title: typing.Optional[str] = OMIT,
        engine: typing.Optional[RunEngine] = OMIT,
        proxy_location: typing.Optional[ProxyLocation] = OMIT,
        data_extraction_schema: typing.Optional[TaskRunRequestDataExtractionSchema] = OMIT,
        error_code_mapping: typing.Optional[typing.Dict[str, typing.Optional[str]]] = OMIT,
        max_steps: typing.Optional[int] = OMIT,
        webhook_url: typing.Optional[str] = OMIT,
        totp_identifier: typing.Optional[str] = OMIT,
        totp_url: typing.Optional[str] = OMIT,
        browser_session_id: typing.Optional[str] = OMIT,
        publish_workflow: typing.Optional[bool] = OMIT,
        request_options: typing.Optional[RequestOptions] = None,
    ) -> TaskRunResponse:
        """
        Run a task

        Parameters
        ----------
        prompt : str
            The goal or task description for Skyvern to accomplish

        user_agent : typing.Optional[str]
            Sent as the ``x-user-agent`` request header when not ``None``.

        url : typing.Optional[str]
            The starting URL for the task. If not provided, Skyvern will attempt to determine an appropriate URL

        title : typing.Optional[str]
            Optional title for the task

        engine : typing.Optional[RunEngine]
            The Skyvern engine version to use for this task

        proxy_location : typing.Optional[ProxyLocation]
            Geographic Proxy location to route the browser traffic through

        data_extraction_schema : typing.Optional[TaskRunRequestDataExtractionSchema]
            Schema defining what data should be extracted from the webpage

        error_code_mapping : typing.Optional[typing.Dict[str, typing.Optional[str]]]
            Custom mapping of error codes to error messages if Skyvern encounters an error

        max_steps : typing.Optional[int]
            Maximum number of steps the task can take before timing out

        webhook_url : typing.Optional[str]
            URL to send task status updates to after a run is finished

        totp_identifier : typing.Optional[str]
            Identifier for TOTP (Time-based One-Time Password) authentication if codes are being pushed to Skyvern

        totp_url : typing.Optional[str]
            URL for TOTP authentication setup if Skyvern should be polling endpoint for 2FA codes

        browser_session_id : typing.Optional[str]
            ID of an existing browser session to reuse, having it continue from the current screen state

        publish_workflow : typing.Optional[bool]
            Whether to publish this task as a reusable workflow.

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        TaskRunResponse
            Successfully run task

        Raises
        ------
        BadRequestError
            If the server answers with status 400.
        UnprocessableEntityError
            If the server answers with status 422.
        ApiError
            For any other non-2xx status or a body that is not valid JSON.

        Examples
        --------
        import asyncio

        from skyvern import AsyncSkyvern

        client = AsyncSkyvern(
            api_key="YOUR_API_KEY",
            authorization="YOUR_AUTHORIZATION",
        )


        async def main() -> None:
            await client.agent.run_task(
                prompt="prompt",
            )


        asyncio.run(main())
        """
        _response = await self._client_wrapper.httpx_client.request(
            "v1/run/tasks",
            method="POST",
            json={
                "prompt": prompt,
                "url": url,
                "title": title,
                "engine": engine,
                "proxy_location": proxy_location,
                "data_extraction_schema": convert_and_respect_annotation_metadata(
                    object_=data_extraction_schema, annotation=TaskRunRequestDataExtractionSchema, direction="write"
                ),
                "error_code_mapping": error_code_mapping,
                "max_steps": max_steps,
                "webhook_url": webhook_url,
                "totp_identifier": totp_identifier,
                "totp_url": totp_url,
                "browser_session_id": browser_session_id,
                "publish_workflow": publish_workflow,
            },
            headers={
                "x-user-agent": str(user_agent) if user_agent is not None else None,
            },
            request_options=request_options,
            omit=OMIT,
        )
        return typing.cast(
            TaskRunResponse,
            _parse_response(
                _response,
                success_type=TaskRunResponse,
                error_types={400: BadRequestError, 422: UnprocessableEntityError},
            ),
        )

    async def run_workflow(
        self,
        *,
        workflow_id: str,
        template: typing.Optional[bool] = None,
        max_steps_override: typing.Optional[int] = None,
        user_agent: typing.Optional[str] = None,
        title: typing.Optional[str] = OMIT,
        parameters: typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]] = OMIT,
        proxy_location: typing.Optional[ProxyLocation] = OMIT,
        webhook_url: typing.Optional[str] = OMIT,
        totp_url: typing.Optional[str] = OMIT,
        totp_identifier: typing.Optional[str] = OMIT,
        browser_session_id: typing.Optional[str] = OMIT,
        request_options: typing.Optional[RequestOptions] = None,
    ) -> WorkflowRunResponse:
        """
        Run a workflow

        Parameters
        ----------
        workflow_id : str
            ID of the workflow to run

        template : typing.Optional[bool]
            Sent as the ``template`` query parameter.

        max_steps_override : typing.Optional[int]
            Sent as the ``x-max-steps-override`` request header when not ``None``.

        user_agent : typing.Optional[str]
            Sent as the ``x-user-agent`` request header when not ``None``.

        title : typing.Optional[str]
            Optional title for this workflow run

        parameters : typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]]
            Parameters to pass to the workflow

        proxy_location : typing.Optional[ProxyLocation]
            Location of proxy to use for this workflow run

        webhook_url : typing.Optional[str]
            URL to send workflow status updates to after a run is finished

        totp_url : typing.Optional[str]
            URL for TOTP authentication setup if Skyvern should be polling endpoint for 2FA codes

        totp_identifier : typing.Optional[str]
            Identifier for TOTP (Time-based One-Time Password) authentication if codes are being pushed to Skyvern

        browser_session_id : typing.Optional[str]
            ID of an existing browser session to reuse, having it continue from the current screen state

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        WorkflowRunResponse
            Successfully run workflow

        Raises
        ------
        BadRequestError
            If the server answers with status 400.
        UnprocessableEntityError
            If the server answers with status 422.
        ApiError
            For any other non-2xx status or a body that is not valid JSON.

        Examples
        --------
        import asyncio

        from skyvern import AsyncSkyvern

        client = AsyncSkyvern(
            api_key="YOUR_API_KEY",
            authorization="YOUR_AUTHORIZATION",
        )


        async def main() -> None:
            await client.agent.run_workflow(
                workflow_id="workflow_id",
            )


        asyncio.run(main())
        """
        _response = await self._client_wrapper.httpx_client.request(
            "v1/run/workflows",
            method="POST",
            params={
                "template": template,
            },
            json={
                "workflow_id": workflow_id,
                "title": title,
                "parameters": parameters,
                "proxy_location": proxy_location,
                "webhook_url": webhook_url,
                "totp_url": totp_url,
                "totp_identifier": totp_identifier,
                "browser_session_id": browser_session_id,
            },
            headers={
                "x-max-steps-override": str(max_steps_override) if max_steps_override is not None else None,
                "x-user-agent": str(user_agent) if user_agent is not None else None,
            },
            request_options=request_options,
            omit=OMIT,
        )
        return typing.cast(
            WorkflowRunResponse,
            _parse_response(
                _response,
                success_type=WorkflowRunResponse,
                error_types={400: BadRequestError, 422: UnprocessableEntityError},
            ),
        )

    async def cancel_run(
        self, run_id: str, *, request_options: typing.Optional[RequestOptions] = None
    ) -> typing.Optional[typing.Any]:
        """
        Cancel a task or workflow run

        Parameters
        ----------
        run_id : str
            The id of the task run or the workflow run to cancel.

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        typing.Optional[typing.Any]
            Successful Response. ``None`` when the successful response
            carries no body.

        Raises
        ------
        UnprocessableEntityError
            If the server answers with status 422.
        ApiError
            For any other non-2xx status or a non-empty body that is not
            valid JSON.

        Examples
        --------
        import asyncio

        from skyvern import AsyncSkyvern

        client = AsyncSkyvern(
            api_key="YOUR_API_KEY",
            authorization="YOUR_AUTHORIZATION",
        )


        async def main() -> None:
            await client.agent.cancel_run(
                run_id="run_id",
            )


        asyncio.run(main())
        """
        _response = await self._client_wrapper.httpx_client.request(
            f"v1/runs/{jsonable_encoder(run_id)}/cancel",
            method="POST",
            request_options=request_options,
        )
        return typing.cast(
            typing.Optional[typing.Any],
            _parse_response(
                _response,
                success_type=typing.Optional[typing.Any],
                error_types={422: UnprocessableEntityError},
                allow_empty=True,
            ),
        )