Files
Dorod-Sky/skyvern/client/agent/client.py
2025-03-27 22:54:35 -07:00

4481 lines
146 KiB
Python

# This file was auto-generated by Fern from our API Definition.
import typing
from ..core.client_wrapper import SyncClientWrapper
from ..types.run_engine import RunEngine
from ..types.proxy_location import ProxyLocation
from .types.task_run_request_data_extraction_schema import TaskRunRequestDataExtractionSchema
from ..core.request_options import RequestOptions
from ..types.task_run_response import TaskRunResponse
from ..core.serialization import convert_and_respect_annotation_metadata
from ..core.pydantic_utilities import parse_obj_as
from ..errors.unprocessable_entity_error import UnprocessableEntityError
from ..types.http_validation_error import HttpValidationError
from json.decoder import JSONDecodeError
from ..core.api_error import ApiError
from ..types.task_status import TaskStatus
from ..types.order_by import OrderBy
from ..types.sort_direction import SortDirection
from ..types.task import Task
from .types.task_request_navigation_payload import TaskRequestNavigationPayload
from .types.task_request_extracted_information_schema import TaskRequestExtractedInformationSchema
from ..types.task_type import TaskType
from ..types.create_task_response import CreateTaskResponse
from ..types.task_response import TaskResponse
from ..core.jsonable_encoder import jsonable_encoder
from ..types.workflow_run_status import WorkflowRunStatus
from .types.agent_get_runs_response_item import AgentGetRunsResponseItem
from ..types.step import Step
from ..types.entity_type import EntityType
from ..types.artifact import Artifact
from ..types.action import Action
from ..types.run_workflow_response import RunWorkflowResponse
from ..types.workflow_run import WorkflowRun
from ..types.workflow_run_timeline import WorkflowRunTimeline
from ..types.workflow_run_response import WorkflowRunResponse
from ..types.workflow import Workflow
from ..types.task_generation import TaskGeneration
from .types.agent_run_task_v2request_x_max_iterations_override import AgentRunTaskV2RequestXMaxIterationsOverride
from .types.agent_run_task_v2request_x_max_steps_override import AgentRunTaskV2RequestXMaxStepsOverride
from .types.task_v2request_extracted_information_schema import TaskV2RequestExtractedInformationSchema
import datetime as dt
from ..types.totp_code import TotpCode
from ..core.client_wrapper import AsyncClientWrapper
# Sentinel used as the default value for optional request-body parameters: it is
# the Ellipsis object, cast to Any so it type-checks against any annotation.
# The same object is handed to the HTTP client as `omit=OMIT`; presumably the
# client strips fields still equal to it before sending -- confirm in the core
# client wrapper.
OMIT = typing.cast(typing.Any, ...)
class AgentClient:
def __init__(self, *, client_wrapper: SyncClientWrapper):
    """
    Store the synchronous client wrapper used by every request method.

    Parameters
    ----------
    client_wrapper : SyncClientWrapper
        Wrapper whose ``httpx_client`` attribute performs the HTTP calls.
    """
    self._client_wrapper = client_wrapper
def run_task(
    self,
    *,
    goal: str,
    url: typing.Optional[str] = OMIT,
    title: typing.Optional[str] = OMIT,
    engine: typing.Optional[RunEngine] = OMIT,
    proxy_location: typing.Optional[ProxyLocation] = OMIT,
    data_extraction_schema: typing.Optional[TaskRunRequestDataExtractionSchema] = OMIT,
    error_code_mapping: typing.Optional[typing.Dict[str, typing.Optional[str]]] = OMIT,
    max_steps: typing.Optional[int] = OMIT,
    webhook_url: typing.Optional[str] = OMIT,
    totp_identifier: typing.Optional[str] = OMIT,
    totp_url: typing.Optional[str] = OMIT,
    browser_session_id: typing.Optional[str] = OMIT,
    publish_workflow: typing.Optional[bool] = OMIT,
    request_options: typing.Optional[RequestOptions] = None,
) -> TaskRunResponse:
    """
    POST a JSON task-run request to ``v1/tasks``.

    Every keyword except ``request_options`` is placed in the JSON body under
    its own name; ``data_extraction_schema`` is first passed through
    ``convert_and_respect_annotation_metadata`` (direction ``"write"``).

    Parameters
    ----------
    goal : str

    url : typing.Optional[str]

    title : typing.Optional[str]

    engine : typing.Optional[RunEngine]

    proxy_location : typing.Optional[ProxyLocation]

    data_extraction_schema : typing.Optional[TaskRunRequestDataExtractionSchema]

    error_code_mapping : typing.Optional[typing.Dict[str, typing.Optional[str]]]

    max_steps : typing.Optional[int]

    webhook_url : typing.Optional[str]

    totp_identifier : typing.Optional[str]

    totp_url : typing.Optional[str]

    browser_session_id : typing.Optional[str]

    publish_workflow : typing.Optional[bool]

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    TaskRunResponse
        The decoded body of a 2xx response.

    Raises
    ------
    UnprocessableEntityError
        On HTTP 422; carries the body parsed as ``HttpValidationError``.
    ApiError
        On any other status (body is the decoded JSON), or whenever the
        response body is not valid JSON (body is the raw text).

    Examples
    --------
    from skyvern import Skyvern

    client = Skyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )
    client.agent.run_task(
        goal="goal",
    )
    """
    http_client = self._client_wrapper.httpx_client
    payload = {
        "goal": goal,
        "url": url,
        "title": title,
        "engine": engine,
        "proxy_location": proxy_location,
        "data_extraction_schema": convert_and_respect_annotation_metadata(
            object_=data_extraction_schema,
            annotation=TaskRunRequestDataExtractionSchema,
            direction="write",
        ),
        "error_code_mapping": error_code_mapping,
        "max_steps": max_steps,
        "webhook_url": webhook_url,
        "totp_identifier": totp_identifier,
        "totp_url": totp_url,
        "browser_session_id": browser_session_id,
        "publish_workflow": publish_workflow,
    }
    response = http_client.request(
        "v1/tasks",
        method="POST",
        json=payload,
        headers={"content-type": "application/json"},
        request_options=request_options,
        omit=OMIT,
    )
    code = response.status_code
    try:
        # Every outcome needs the decoded body exactly once, so decode up front.
        decoded = response.json()
        if 200 <= code < 300:
            parsed = parse_obj_as(
                type_=TaskRunResponse,  # type: ignore
                object_=decoded,
            )
            return typing.cast(TaskRunResponse, parsed)
        if code == 422:
            problem = parse_obj_as(
                type_=HttpValidationError,  # type: ignore
                object_=decoded,
            )
            raise UnprocessableEntityError(typing.cast(HttpValidationError, problem))
    except JSONDecodeError:
        raise ApiError(status_code=code, body=response.text)
    raise ApiError(status_code=code, body=decoded)
def get_tasks(
    self,
    *,
    page: typing.Optional[int] = None,
    page_size: typing.Optional[int] = None,
    task_status: typing.Optional[typing.Union[TaskStatus, typing.Sequence[TaskStatus]]] = None,
    workflow_run_id: typing.Optional[str] = None,
    only_standalone_tasks: typing.Optional[bool] = None,
    application: typing.Optional[str] = None,
    sort: typing.Optional[OrderBy] = None,
    order: typing.Optional[SortDirection] = None,
    request_options: typing.Optional[RequestOptions] = None,
) -> typing.List[Task]:
    """
    List tasks via GET ``api/v1/tasks``.

    Per the API description, the returned tasks do not have their steps
    populated; steps can be populated by calling the get_agent_task endpoint.
    All filters are sent as query parameters under their own names.

    Parameters
    ----------
    page : typing.Optional[int]
        Starting page; the API description gives a default of 1.

    page_size : typing.Optional[int]
        Page size; the API description gives a default of 10.

    task_status : typing.Optional[typing.Union[TaskStatus, typing.Sequence[TaskStatus]]]
        Task status filter.

    workflow_run_id : typing.Optional[str]
        Workflow run id filter.

    only_standalone_tasks : typing.Optional[bool]
        Only standalone tasks; tasks which are part of a workflow run are
        filtered out.

    application : typing.Optional[str]

    sort : typing.Optional[OrderBy]
        Column to sort by, created_at or modified_at.

    order : typing.Optional[SortDirection]
        Direction to sort by, ascending or descending.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.List[Task]
        The decoded body of a 2xx response.

    Raises
    ------
    UnprocessableEntityError
        On HTTP 422; carries the body parsed as ``HttpValidationError``.
    ApiError
        On any other status (body is the decoded JSON), or whenever the
        response body is not valid JSON (body is the raw text).

    Examples
    --------
    from skyvern import Skyvern

    client = Skyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )
    client.agent.get_tasks()
    """
    http_client = self._client_wrapper.httpx_client
    query = {
        "page": page,
        "page_size": page_size,
        "task_status": task_status,
        "workflow_run_id": workflow_run_id,
        "only_standalone_tasks": only_standalone_tasks,
        "application": application,
        "sort": sort,
        "order": order,
    }
    response = http_client.request(
        "api/v1/tasks",
        method="GET",
        params=query,
        request_options=request_options,
    )
    code = response.status_code
    try:
        # Every outcome needs the decoded body exactly once, so decode up front.
        decoded = response.json()
        if 200 <= code < 300:
            parsed = parse_obj_as(
                type_=typing.List[Task],  # type: ignore
                object_=decoded,
            )
            return typing.cast(typing.List[Task], parsed)
        if code == 422:
            problem = parse_obj_as(
                type_=HttpValidationError,  # type: ignore
                object_=decoded,
            )
            raise UnprocessableEntityError(typing.cast(HttpValidationError, problem))
    except JSONDecodeError:
        raise ApiError(status_code=code, body=response.text)
    raise ApiError(status_code=code, body=decoded)
def run_task_v1(
    self,
    *,
    url: str,
    max_steps_override: typing.Optional[int] = None,
    title: typing.Optional[str] = OMIT,
    webhook_callback_url: typing.Optional[str] = OMIT,
    totp_verification_url: typing.Optional[str] = OMIT,
    totp_identifier: typing.Optional[str] = OMIT,
    navigation_goal: typing.Optional[str] = OMIT,
    data_extraction_goal: typing.Optional[str] = OMIT,
    navigation_payload: typing.Optional[TaskRequestNavigationPayload] = OMIT,
    error_code_mapping: typing.Optional[typing.Dict[str, typing.Optional[str]]] = OMIT,
    proxy_location: typing.Optional[ProxyLocation] = OMIT,
    extracted_information_schema: typing.Optional[TaskRequestExtractedInformationSchema] = OMIT,
    complete_criterion: typing.Optional[str] = OMIT,
    terminate_criterion: typing.Optional[str] = OMIT,
    task_type: typing.Optional[TaskType] = OMIT,
    application: typing.Optional[str] = OMIT,
    browser_session_id: typing.Optional[str] = OMIT,
    request_options: typing.Optional[RequestOptions] = None,
) -> CreateTaskResponse:
    """
    Create a task by POSTing a JSON body to ``api/v1/tasks``.

    ``max_steps_override`` travels in the ``x-max-steps-override`` header
    (stringified, or ``None`` when not given); every other keyword except
    ``request_options`` is a JSON body field of the same name.

    Parameters
    ----------
    url : str
        Starting URL for the task.

    max_steps_override : typing.Optional[int]

    title : typing.Optional[str]
        The title of the task.

    webhook_callback_url : typing.Optional[str]
        The URL to call when the task is completed.

    totp_verification_url : typing.Optional[str]

    totp_identifier : typing.Optional[str]

    navigation_goal : typing.Optional[str]
        The user's goal for the task.

    data_extraction_goal : typing.Optional[str]
        The user's goal for data extraction.

    navigation_payload : typing.Optional[TaskRequestNavigationPayload]
        The user's details needed to achieve the task.

    error_code_mapping : typing.Optional[typing.Dict[str, typing.Optional[str]]]
        The mapping of error codes and their descriptions.

    proxy_location : typing.Optional[ProxyLocation]
        The location of the proxy to use for the task.

    extracted_information_schema : typing.Optional[TaskRequestExtractedInformationSchema]
        The requested schema of the extracted information.

    complete_criterion : typing.Optional[str]
        Criterion to complete.

    terminate_criterion : typing.Optional[str]
        Criterion to terminate.

    task_type : typing.Optional[TaskType]
        The type of the task.

    application : typing.Optional[str]
        The application for which the task is running.

    browser_session_id : typing.Optional[str]

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    CreateTaskResponse
        The decoded body of a 2xx response.

    Raises
    ------
    UnprocessableEntityError
        On HTTP 422; carries the body parsed as ``HttpValidationError``.
    ApiError
        On any other status (body is the decoded JSON), or whenever the
        response body is not valid JSON (body is the raw text).

    Examples
    --------
    from skyvern import Skyvern

    client = Skyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )
    client.agent.run_task_v1(
        url="https://www.geico.com",
    )
    """
    http_client = self._client_wrapper.httpx_client
    payload = {
        "title": title,
        "url": url,
        "webhook_callback_url": webhook_callback_url,
        "totp_verification_url": totp_verification_url,
        "totp_identifier": totp_identifier,
        "navigation_goal": navigation_goal,
        "data_extraction_goal": data_extraction_goal,
        "navigation_payload": convert_and_respect_annotation_metadata(
            object_=navigation_payload,
            annotation=TaskRequestNavigationPayload,
            direction="write",
        ),
        "error_code_mapping": error_code_mapping,
        "proxy_location": proxy_location,
        "extracted_information_schema": convert_and_respect_annotation_metadata(
            object_=extracted_information_schema,
            annotation=TaskRequestExtractedInformationSchema,
            direction="write",
        ),
        "complete_criterion": complete_criterion,
        "terminate_criterion": terminate_criterion,
        "task_type": task_type,
        "application": application,
        "browser_session_id": browser_session_id,
    }
    # The override is sent as a header value, so it must be a string (or None).
    steps_header = None if max_steps_override is None else str(max_steps_override)
    response = http_client.request(
        "api/v1/tasks",
        method="POST",
        json=payload,
        headers={
            "content-type": "application/json",
            "x-max-steps-override": steps_header,
        },
        request_options=request_options,
        omit=OMIT,
    )
    code = response.status_code
    try:
        # Every outcome needs the decoded body exactly once, so decode up front.
        decoded = response.json()
        if 200 <= code < 300:
            parsed = parse_obj_as(
                type_=CreateTaskResponse,  # type: ignore
                object_=decoded,
            )
            return typing.cast(CreateTaskResponse, parsed)
        if code == 422:
            problem = parse_obj_as(
                type_=HttpValidationError,  # type: ignore
                object_=decoded,
            )
            raise UnprocessableEntityError(typing.cast(HttpValidationError, problem))
    except JSONDecodeError:
        raise ApiError(status_code=code, body=response.text)
    raise ApiError(status_code=code, body=decoded)
def get_task_v1(
    self,
    task_id: str,
    *,
    request_options: typing.Optional[RequestOptions] = None,
) -> TaskResponse:
    """
    Fetch one task via GET ``api/v1/tasks/{task_id}``.

    Parameters
    ----------
    task_id : str
        Interpolated into the request path after ``jsonable_encoder``.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    TaskResponse
        The decoded body of a 2xx response.

    Raises
    ------
    UnprocessableEntityError
        On HTTP 422; carries the body parsed as ``HttpValidationError``.
    ApiError
        On any other status (body is the decoded JSON), or whenever the
        response body is not valid JSON (body is the raw text).

    Examples
    --------
    from skyvern import Skyvern

    client = Skyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )
    client.agent.get_task_v1(
        task_id="task_id",
    )
    """
    http_client = self._client_wrapper.httpx_client
    endpoint = f"api/v1/tasks/{jsonable_encoder(task_id)}"
    response = http_client.request(
        endpoint,
        method="GET",
        request_options=request_options,
    )
    code = response.status_code
    try:
        # Every outcome needs the decoded body exactly once, so decode up front.
        decoded = response.json()
        if 200 <= code < 300:
            parsed = parse_obj_as(
                type_=TaskResponse,  # type: ignore
                object_=decoded,
            )
            return typing.cast(TaskResponse, parsed)
        if code == 422:
            problem = parse_obj_as(
                type_=HttpValidationError,  # type: ignore
                object_=decoded,
            )
            raise UnprocessableEntityError(typing.cast(HttpValidationError, problem))
    except JSONDecodeError:
        raise ApiError(status_code=code, body=response.text)
    raise ApiError(status_code=code, body=decoded)
def cancel_task(
    self, task_id: str, *, request_options: typing.Optional[RequestOptions] = None
) -> typing.Optional[typing.Any]:
    """
    Cancel a task via POST ``api/v1/tasks/{task_id}/cancel``.

    Parameters
    ----------
    task_id : str
        Interpolated into the request path after ``jsonable_encoder``.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.Optional[typing.Any]
        The decoded JSON body of a 2xx response, or ``None`` when a 2xx
        response carries an empty (or whitespace-only) body.

    Raises
    ------
    UnprocessableEntityError
        On HTTP 422; carries the body parsed as ``HttpValidationError``.
    ApiError
        On any other non-2xx status (body is the decoded JSON), or when a
        non-empty response body is not valid JSON (body is the raw text).

    Examples
    --------
    from skyvern import Skyvern

    client = Skyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )
    client.agent.cancel_task(
        task_id="task_id",
    )
    """
    response = self._client_wrapper.httpx_client.request(
        f"api/v1/tasks/{jsonable_encoder(task_id)}/cancel",
        method="POST",
        request_options=request_options,
    )
    code = response.status_code
    try:
        if 200 <= code < 300:
            # A successful reply may have no body at all (e.g. 204 No Content).
            # Decoding "" raises JSONDecodeError, which would report the
            # success as an ApiError; the Optional return type allows None.
            if not response.text.strip():
                return None
            return typing.cast(
                typing.Optional[typing.Any],
                parse_obj_as(
                    type_=typing.Optional[typing.Any],  # type: ignore
                    object_=response.json(),
                ),
            )
        if code == 422:
            raise UnprocessableEntityError(
                typing.cast(
                    HttpValidationError,
                    parse_obj_as(
                        type_=HttpValidationError,  # type: ignore
                        object_=response.json(),
                    ),
                )
            )
        decoded = response.json()
    except JSONDecodeError:
        raise ApiError(status_code=code, body=response.text)
    raise ApiError(status_code=code, body=decoded)
def cancel_workflow_run(
    self, workflow_run_id: str, *, request_options: typing.Optional[RequestOptions] = None
) -> typing.Optional[typing.Any]:
    """
    Cancel a workflow run via POST ``api/v1/workflows/runs/{workflow_run_id}/cancel``.

    Parameters
    ----------
    workflow_run_id : str
        Interpolated into the request path after ``jsonable_encoder``.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.Optional[typing.Any]
        The decoded JSON body of a 2xx response, or ``None`` when a 2xx
        response carries an empty (or whitespace-only) body.

    Raises
    ------
    UnprocessableEntityError
        On HTTP 422; carries the body parsed as ``HttpValidationError``.
    ApiError
        On any other non-2xx status (body is the decoded JSON), or when a
        non-empty response body is not valid JSON (body is the raw text).

    Examples
    --------
    from skyvern import Skyvern

    client = Skyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )
    client.agent.cancel_workflow_run(
        workflow_run_id="workflow_run_id",
    )
    """
    response = self._client_wrapper.httpx_client.request(
        f"api/v1/workflows/runs/{jsonable_encoder(workflow_run_id)}/cancel",
        method="POST",
        request_options=request_options,
    )
    code = response.status_code
    try:
        if 200 <= code < 300:
            # A successful reply may have no body at all (e.g. 204 No Content).
            # Decoding "" raises JSONDecodeError, which would report the
            # success as an ApiError; the Optional return type allows None.
            if not response.text.strip():
                return None
            return typing.cast(
                typing.Optional[typing.Any],
                parse_obj_as(
                    type_=typing.Optional[typing.Any],  # type: ignore
                    object_=response.json(),
                ),
            )
        if code == 422:
            raise UnprocessableEntityError(
                typing.cast(
                    HttpValidationError,
                    parse_obj_as(
                        type_=HttpValidationError,  # type: ignore
                        object_=response.json(),
                    ),
                )
            )
        decoded = response.json()
    except JSONDecodeError:
        raise ApiError(status_code=code, body=response.text)
    raise ApiError(status_code=code, body=decoded)
def retry_webhook(
    self,
    task_id: str,
    *,
    request_options: typing.Optional[RequestOptions] = None,
) -> TaskResponse:
    """
    POST to ``api/v1/tasks/{task_id}/retry_webhook``.

    Parameters
    ----------
    task_id : str
        Interpolated into the request path after ``jsonable_encoder``.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    TaskResponse
        The decoded body of a 2xx response.

    Raises
    ------
    UnprocessableEntityError
        On HTTP 422; carries the body parsed as ``HttpValidationError``.
    ApiError
        On any other status (body is the decoded JSON), or whenever the
        response body is not valid JSON (body is the raw text).

    Examples
    --------
    from skyvern import Skyvern

    client = Skyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )
    client.agent.retry_webhook(
        task_id="task_id",
    )
    """
    http_client = self._client_wrapper.httpx_client
    endpoint = f"api/v1/tasks/{jsonable_encoder(task_id)}/retry_webhook"
    response = http_client.request(
        endpoint,
        method="POST",
        request_options=request_options,
    )
    code = response.status_code
    try:
        # Every outcome needs the decoded body exactly once, so decode up front.
        decoded = response.json()
        if 200 <= code < 300:
            parsed = parse_obj_as(
                type_=TaskResponse,  # type: ignore
                object_=decoded,
            )
            return typing.cast(TaskResponse, parsed)
        if code == 422:
            problem = parse_obj_as(
                type_=HttpValidationError,  # type: ignore
                object_=decoded,
            )
            raise UnprocessableEntityError(typing.cast(HttpValidationError, problem))
    except JSONDecodeError:
        raise ApiError(status_code=code, body=response.text)
    raise ApiError(status_code=code, body=decoded)
def get_runs(
    self,
    *,
    page: typing.Optional[int] = None,
    page_size: typing.Optional[int] = None,
    status: typing.Optional[typing.Union[WorkflowRunStatus, typing.Sequence[WorkflowRunStatus]]] = None,
    request_options: typing.Optional[RequestOptions] = None,
) -> typing.List[AgentGetRunsResponseItem]:
    """
    List runs via GET ``api/v1/runs``.

    ``page``, ``page_size`` and ``status`` are sent as query parameters
    under their own names.

    Parameters
    ----------
    page : typing.Optional[int]

    page_size : typing.Optional[int]

    status : typing.Optional[typing.Union[WorkflowRunStatus, typing.Sequence[WorkflowRunStatus]]]

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.List[AgentGetRunsResponseItem]
        The decoded body of a 2xx response.

    Raises
    ------
    UnprocessableEntityError
        On HTTP 422; carries the body parsed as ``HttpValidationError``.
    ApiError
        On any other status (body is the decoded JSON), or whenever the
        response body is not valid JSON (body is the raw text).

    Examples
    --------
    from skyvern import Skyvern

    client = Skyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )
    client.agent.get_runs()
    """
    http_client = self._client_wrapper.httpx_client
    query = {
        "page": page,
        "page_size": page_size,
        "status": status,
    }
    response = http_client.request(
        "api/v1/runs",
        method="GET",
        params=query,
        request_options=request_options,
    )
    # Named `code` because `status` is already the filter parameter.
    code = response.status_code
    try:
        # Every outcome needs the decoded body exactly once, so decode up front.
        decoded = response.json()
        if 200 <= code < 300:
            parsed = parse_obj_as(
                type_=typing.List[AgentGetRunsResponseItem],  # type: ignore
                object_=decoded,
            )
            return typing.cast(typing.List[AgentGetRunsResponseItem], parsed)
        if code == 422:
            problem = parse_obj_as(
                type_=HttpValidationError,  # type: ignore
                object_=decoded,
            )
            raise UnprocessableEntityError(typing.cast(HttpValidationError, problem))
    except JSONDecodeError:
        raise ApiError(status_code=code, body=response.text)
    raise ApiError(status_code=code, body=decoded)
def get_run(
    self,
    run_id: str,
    *,
    request_options: typing.Optional[RequestOptions] = None,
) -> TaskRunResponse:
    """
    Fetch one run via GET ``api/v1/runs/{run_id}``.

    Parameters
    ----------
    run_id : str
        Interpolated into the request path after ``jsonable_encoder``.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    TaskRunResponse
        The decoded body of a 2xx response.

    Raises
    ------
    UnprocessableEntityError
        On HTTP 422; carries the body parsed as ``HttpValidationError``.
    ApiError
        On any other status (body is the decoded JSON), or whenever the
        response body is not valid JSON (body is the raw text).

    Examples
    --------
    from skyvern import Skyvern

    client = Skyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )
    client.agent.get_run(
        run_id="run_id",
    )
    """
    http_client = self._client_wrapper.httpx_client
    endpoint = f"api/v1/runs/{jsonable_encoder(run_id)}"
    response = http_client.request(
        endpoint,
        method="GET",
        request_options=request_options,
    )
    code = response.status_code
    try:
        # Every outcome needs the decoded body exactly once, so decode up front.
        decoded = response.json()
        if 200 <= code < 300:
            parsed = parse_obj_as(
                type_=TaskRunResponse,  # type: ignore
                object_=decoded,
            )
            return typing.cast(TaskRunResponse, parsed)
        if code == 422:
            problem = parse_obj_as(
                type_=HttpValidationError,  # type: ignore
                object_=decoded,
            )
            raise UnprocessableEntityError(typing.cast(HttpValidationError, problem))
    except JSONDecodeError:
        raise ApiError(status_code=code, body=response.text)
    raise ApiError(status_code=code, body=decoded)
def get_steps(
    self,
    task_id: str,
    *,
    request_options: typing.Optional[RequestOptions] = None,
) -> typing.List[Step]:
    """
    Get all steps for a task via GET ``api/v1/tasks/{task_id}/steps``.

    Parameters
    ----------
    task_id : str
        Interpolated into the request path after ``jsonable_encoder``.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.List[Step]
        The decoded body of a 2xx response.

    Raises
    ------
    UnprocessableEntityError
        On HTTP 422; carries the body parsed as ``HttpValidationError``.
    ApiError
        On any other status (body is the decoded JSON), or whenever the
        response body is not valid JSON (body is the raw text).

    Examples
    --------
    from skyvern import Skyvern

    client = Skyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )
    client.agent.get_steps(
        task_id="task_id",
    )
    """
    http_client = self._client_wrapper.httpx_client
    endpoint = f"api/v1/tasks/{jsonable_encoder(task_id)}/steps"
    response = http_client.request(
        endpoint,
        method="GET",
        request_options=request_options,
    )
    code = response.status_code
    try:
        # Every outcome needs the decoded body exactly once, so decode up front.
        decoded = response.json()
        if 200 <= code < 300:
            parsed = parse_obj_as(
                type_=typing.List[Step],  # type: ignore
                object_=decoded,
            )
            return typing.cast(typing.List[Step], parsed)
        if code == 422:
            problem = parse_obj_as(
                type_=HttpValidationError,  # type: ignore
                object_=decoded,
            )
            raise UnprocessableEntityError(typing.cast(HttpValidationError, problem))
    except JSONDecodeError:
        raise ApiError(status_code=code, body=response.text)
    raise ApiError(status_code=code, body=decoded)
def get_artifacts(
    self,
    entity_type: EntityType,
    entity_id: str,
    *,
    request_options: typing.Optional[RequestOptions] = None,
) -> typing.List[Artifact]:
    """
    Get all artifacts for an entity (step, task, workflow_run).

    Issues GET ``api/v1/{entity_type}/{entity_id}/artifacts``.

    Parameters
    ----------
    entity_type : EntityType
        Type of entity to fetch artifacts for; interpolated into the request
        path after ``jsonable_encoder``.

    entity_id : str
        ID of the entity; interpolated into the request path after
        ``jsonable_encoder``.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.List[Artifact]
        The decoded body of a 2xx response.

    Raises
    ------
    UnprocessableEntityError
        On HTTP 422; carries the body parsed as ``HttpValidationError``.
    ApiError
        On any other status (body is the decoded JSON), or whenever the
        response body is not valid JSON (body is the raw text).

    Examples
    --------
    from skyvern import Skyvern

    client = Skyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )
    client.agent.get_artifacts(
        entity_type="step",
        entity_id="entity_id",
    )
    """
    http_client = self._client_wrapper.httpx_client
    endpoint = f"api/v1/{jsonable_encoder(entity_type)}/{jsonable_encoder(entity_id)}/artifacts"
    response = http_client.request(
        endpoint,
        method="GET",
        request_options=request_options,
    )
    code = response.status_code
    try:
        # Every outcome needs the decoded body exactly once, so decode up front.
        decoded = response.json()
        if 200 <= code < 300:
            parsed = parse_obj_as(
                type_=typing.List[Artifact],  # type: ignore
                object_=decoded,
            )
            return typing.cast(typing.List[Artifact], parsed)
        if code == 422:
            problem = parse_obj_as(
                type_=HttpValidationError,  # type: ignore
                object_=decoded,
            )
            raise UnprocessableEntityError(typing.cast(HttpValidationError, problem))
    except JSONDecodeError:
        raise ApiError(status_code=code, body=response.text)
    raise ApiError(status_code=code, body=decoded)
def get_step_artifacts(
    self,
    task_id: str,
    step_id: str,
    *,
    request_options: typing.Optional[RequestOptions] = None,
) -> typing.List[Artifact]:
    """
    Get the artifacts of one step of a task.

    Issues GET ``api/v1/tasks/{task_id}/steps/{step_id}/artifacts``.

    Parameters
    ----------
    task_id : str
        Interpolated into the request path after ``jsonable_encoder``.

    step_id : str
        Interpolated into the request path after ``jsonable_encoder``.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.List[Artifact]
        The decoded body of a 2xx response.

    Raises
    ------
    UnprocessableEntityError
        On HTTP 422; carries the body parsed as ``HttpValidationError``.
    ApiError
        On any other status (body is the decoded JSON), or whenever the
        response body is not valid JSON (body is the raw text).

    Examples
    --------
    from skyvern import Skyvern

    client = Skyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )
    client.agent.get_step_artifacts(
        task_id="task_id",
        step_id="step_id",
    )
    """
    http_client = self._client_wrapper.httpx_client
    endpoint = f"api/v1/tasks/{jsonable_encoder(task_id)}/steps/{jsonable_encoder(step_id)}/artifacts"
    response = http_client.request(
        endpoint,
        method="GET",
        request_options=request_options,
    )
    code = response.status_code
    try:
        # Every outcome needs the decoded body exactly once, so decode up front.
        decoded = response.json()
        if 200 <= code < 300:
            parsed = parse_obj_as(
                type_=typing.List[Artifact],  # type: ignore
                object_=decoded,
            )
            return typing.cast(typing.List[Artifact], parsed)
        if code == 422:
            problem = parse_obj_as(
                type_=HttpValidationError,  # type: ignore
                object_=decoded,
            )
            raise UnprocessableEntityError(typing.cast(HttpValidationError, problem))
    except JSONDecodeError:
        raise ApiError(status_code=code, body=response.text)
    raise ApiError(status_code=code, body=decoded)
def get_actions(
    self,
    task_id: str,
    *,
    request_options: typing.Optional[RequestOptions] = None,
) -> typing.List[Action]:
    """
    List a task's actions via GET ``api/v1/tasks/{task_id}/actions``.

    Parameters
    ----------
    task_id : str
        Interpolated into the request path after ``jsonable_encoder``.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.List[Action]
        The decoded body of a 2xx response.

    Raises
    ------
    UnprocessableEntityError
        On HTTP 422; carries the body parsed as ``HttpValidationError``.
    ApiError
        On any other status (body is the decoded JSON), or whenever the
        response body is not valid JSON (body is the raw text).

    Examples
    --------
    from skyvern import Skyvern

    client = Skyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )
    client.agent.get_actions(
        task_id="task_id",
    )
    """
    http_client = self._client_wrapper.httpx_client
    endpoint = f"api/v1/tasks/{jsonable_encoder(task_id)}/actions"
    response = http_client.request(
        endpoint,
        method="GET",
        request_options=request_options,
    )
    code = response.status_code
    try:
        # Every outcome needs the decoded body exactly once, so decode up front.
        decoded = response.json()
        if 200 <= code < 300:
            parsed = parse_obj_as(
                type_=typing.List[Action],  # type: ignore
                object_=decoded,
            )
            return typing.cast(typing.List[Action], parsed)
        if code == 422:
            problem = parse_obj_as(
                type_=HttpValidationError,  # type: ignore
                object_=decoded,
            )
            raise UnprocessableEntityError(typing.cast(HttpValidationError, problem))
    except JSONDecodeError:
        raise ApiError(status_code=code, body=response.text)
    raise ApiError(status_code=code, body=decoded)
def run_workflow(
    self,
    workflow_id: str,
    *,
    version: typing.Optional[int] = None,
    template: typing.Optional[bool] = None,
    max_steps_override: typing.Optional[int] = None,
    data: typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]] = OMIT,
    proxy_location: typing.Optional[ProxyLocation] = OMIT,
    webhook_callback_url: typing.Optional[str] = OMIT,
    totp_verification_url: typing.Optional[str] = OMIT,
    totp_identifier: typing.Optional[str] = OMIT,
    browser_session_id: typing.Optional[str] = OMIT,
    request_options: typing.Optional[RequestOptions] = None,
) -> RunWorkflowResponse:
    """
    Start a workflow run via POST ``api/v1/workflows/{workflow_id}/run``.

    ``version`` and ``template`` are query parameters; ``max_steps_override``
    travels in the ``x-max-steps-override`` header (stringified, or ``None``
    when not given); the remaining keywords except ``request_options`` are
    JSON body fields of the same name.

    Parameters
    ----------
    workflow_id : str
        Interpolated into the request path after ``jsonable_encoder``.

    version : typing.Optional[int]

    template : typing.Optional[bool]

    max_steps_override : typing.Optional[int]

    data : typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]]

    proxy_location : typing.Optional[ProxyLocation]

    webhook_callback_url : typing.Optional[str]

    totp_verification_url : typing.Optional[str]

    totp_identifier : typing.Optional[str]

    browser_session_id : typing.Optional[str]

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    RunWorkflowResponse
        The decoded body of a 2xx response.

    Raises
    ------
    UnprocessableEntityError
        On HTTP 422; carries the body parsed as ``HttpValidationError``.
    ApiError
        On any other status (body is the decoded JSON), or whenever the
        response body is not valid JSON (body is the raw text).

    Examples
    --------
    from skyvern import Skyvern

    client = Skyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )
    client.agent.run_workflow(
        workflow_id="workflow_id",
    )
    """
    http_client = self._client_wrapper.httpx_client
    endpoint = f"api/v1/workflows/{jsonable_encoder(workflow_id)}/run"
    query = {
        "version": version,
        "template": template,
    }
    payload = {
        "data": data,
        "proxy_location": proxy_location,
        "webhook_callback_url": webhook_callback_url,
        "totp_verification_url": totp_verification_url,
        "totp_identifier": totp_identifier,
        "browser_session_id": browser_session_id,
    }
    # The override is sent as a header value, so it must be a string (or None).
    steps_header = None if max_steps_override is None else str(max_steps_override)
    response = http_client.request(
        endpoint,
        method="POST",
        params=query,
        json=payload,
        headers={
            "content-type": "application/json",
            "x-max-steps-override": steps_header,
        },
        request_options=request_options,
        omit=OMIT,
    )
    code = response.status_code
    try:
        # Every outcome needs the decoded body exactly once, so decode up front.
        decoded = response.json()
        if 200 <= code < 300:
            parsed = parse_obj_as(
                type_=RunWorkflowResponse,  # type: ignore
                object_=decoded,
            )
            return typing.cast(RunWorkflowResponse, parsed)
        if code == 422:
            problem = parse_obj_as(
                type_=HttpValidationError,  # type: ignore
                object_=decoded,
            )
            raise UnprocessableEntityError(typing.cast(HttpValidationError, problem))
    except JSONDecodeError:
        raise ApiError(status_code=code, body=response.text)
    raise ApiError(status_code=code, body=decoded)
def get_workflow_runs(
    self,
    *,
    page: typing.Optional[int] = None,
    page_size: typing.Optional[int] = None,
    status: typing.Optional[typing.Union[WorkflowRunStatus, typing.Sequence[WorkflowRunStatus]]] = None,
    request_options: typing.Optional[RequestOptions] = None,
) -> typing.List[WorkflowRun]:
    """
    List workflow runs via GET ``api/v1/workflows/runs``.

    ``page``, ``page_size`` and ``status`` are sent as query parameters
    under their own names.

    Parameters
    ----------
    page : typing.Optional[int]

    page_size : typing.Optional[int]

    status : typing.Optional[typing.Union[WorkflowRunStatus, typing.Sequence[WorkflowRunStatus]]]

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.List[WorkflowRun]
        The decoded body of a 2xx response.

    Raises
    ------
    UnprocessableEntityError
        On HTTP 422; carries the body parsed as ``HttpValidationError``.
    ApiError
        On any other status (body is the decoded JSON), or whenever the
        response body is not valid JSON (body is the raw text).

    Examples
    --------
    from skyvern import Skyvern

    client = Skyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )
    client.agent.get_workflow_runs()
    """
    http_client = self._client_wrapper.httpx_client
    query = {
        "page": page,
        "page_size": page_size,
        "status": status,
    }
    response = http_client.request(
        "api/v1/workflows/runs",
        method="GET",
        params=query,
        request_options=request_options,
    )
    # Named `code` because `status` is already the filter parameter.
    code = response.status_code
    try:
        # Every outcome needs the decoded body exactly once, so decode up front.
        decoded = response.json()
        if 200 <= code < 300:
            parsed = parse_obj_as(
                type_=typing.List[WorkflowRun],  # type: ignore
                object_=decoded,
            )
            return typing.cast(typing.List[WorkflowRun], parsed)
        if code == 422:
            problem = parse_obj_as(
                type_=HttpValidationError,  # type: ignore
                object_=decoded,
            )
            raise UnprocessableEntityError(typing.cast(HttpValidationError, problem))
    except JSONDecodeError:
        raise ApiError(status_code=code, body=response.text)
    raise ApiError(status_code=code, body=decoded)
def get_workflow_runs_by_id(
    self,
    workflow_id: str,
    *,
    page: typing.Optional[int] = None,
    page_size: typing.Optional[int] = None,
    status: typing.Optional[typing.Union[WorkflowRunStatus, typing.Sequence[WorkflowRunStatus]]] = None,
    request_options: typing.Optional[RequestOptions] = None,
) -> typing.List[WorkflowRun]:
    """
    List one workflow's runs via GET ``api/v1/workflows/{workflow_id}/runs``.

    ``page``, ``page_size`` and ``status`` are sent as query parameters
    under their own names.

    Parameters
    ----------
    workflow_id : str
        Interpolated into the request path after ``jsonable_encoder``.

    page : typing.Optional[int]

    page_size : typing.Optional[int]

    status : typing.Optional[typing.Union[WorkflowRunStatus, typing.Sequence[WorkflowRunStatus]]]

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.List[WorkflowRun]
        The decoded body of a 2xx response.

    Raises
    ------
    UnprocessableEntityError
        On HTTP 422; carries the body parsed as ``HttpValidationError``.
    ApiError
        On any other status (body is the decoded JSON), or whenever the
        response body is not valid JSON (body is the raw text).

    Examples
    --------
    from skyvern import Skyvern

    client = Skyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )
    client.agent.get_workflow_runs_by_id(
        workflow_id="workflow_id",
    )
    """
    http_client = self._client_wrapper.httpx_client
    endpoint = f"api/v1/workflows/{jsonable_encoder(workflow_id)}/runs"
    query = {
        "page": page,
        "page_size": page_size,
        "status": status,
    }
    response = http_client.request(
        endpoint,
        method="GET",
        params=query,
        request_options=request_options,
    )
    # Named `code` because `status` is already the filter parameter.
    code = response.status_code
    try:
        # Every outcome needs the decoded body exactly once, so decode up front.
        decoded = response.json()
        if 200 <= code < 300:
            parsed = parse_obj_as(
                type_=typing.List[WorkflowRun],  # type: ignore
                object_=decoded,
            )
            return typing.cast(typing.List[WorkflowRun], parsed)
        if code == 422:
            problem = parse_obj_as(
                type_=HttpValidationError,  # type: ignore
                object_=decoded,
            )
            raise UnprocessableEntityError(typing.cast(HttpValidationError, problem))
    except JSONDecodeError:
        raise ApiError(status_code=code, body=response.text)
    raise ApiError(status_code=code, body=decoded)
def get_workflow_run_with_workflow_id(
    self,
    workflow_id: str,
    workflow_run_id: str,
    *,
    request_options: typing.Optional[RequestOptions] = None,
) -> typing.Dict[str, typing.Optional[typing.Any]]:
    """
    Fetch one run of a workflow.

    Issues GET ``api/v1/workflows/{workflow_id}/runs/{workflow_run_id}``.

    Parameters
    ----------
    workflow_id : str
        Interpolated into the request path after ``jsonable_encoder``.

    workflow_run_id : str
        Interpolated into the request path after ``jsonable_encoder``.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.Dict[str, typing.Optional[typing.Any]]
        The decoded body of a 2xx response.

    Raises
    ------
    UnprocessableEntityError
        On HTTP 422; carries the body parsed as ``HttpValidationError``.
    ApiError
        On any other status (body is the decoded JSON), or whenever the
        response body is not valid JSON (body is the raw text).

    Examples
    --------
    from skyvern import Skyvern

    client = Skyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )
    client.agent.get_workflow_run_with_workflow_id(
        workflow_id="workflow_id",
        workflow_run_id="workflow_run_id",
    )
    """
    http_client = self._client_wrapper.httpx_client
    endpoint = f"api/v1/workflows/{jsonable_encoder(workflow_id)}/runs/{jsonable_encoder(workflow_run_id)}"
    response = http_client.request(
        endpoint,
        method="GET",
        request_options=request_options,
    )
    code = response.status_code
    try:
        # Every outcome needs the decoded body exactly once, so decode up front.
        decoded = response.json()
        if 200 <= code < 300:
            parsed = parse_obj_as(
                type_=typing.Dict[str, typing.Optional[typing.Any]],  # type: ignore
                object_=decoded,
            )
            return typing.cast(typing.Dict[str, typing.Optional[typing.Any]], parsed)
        if code == 422:
            problem = parse_obj_as(
                type_=HttpValidationError,  # type: ignore
                object_=decoded,
            )
            raise UnprocessableEntityError(typing.cast(HttpValidationError, problem))
    except JSONDecodeError:
        raise ApiError(status_code=code, body=response.text)
    raise ApiError(status_code=code, body=decoded)
def get_workflow_run_timeline(
    self,
    workflow_run_id: str,
    workflow_id: str,
    *,
    page: typing.Optional[int] = None,
    page_size: typing.Optional[int] = None,
    request_options: typing.Optional[RequestOptions] = None,
) -> typing.List[WorkflowRunTimeline]:
    """
    List the timeline entries of a workflow run.

    Sends ``GET api/v1/workflows/{workflow_id}/runs/{workflow_run_id}/timeline``
    with ``page`` and ``page_size`` as query parameters.

    Parameters
    ----------
    workflow_run_id : str

    workflow_id : str

    page : typing.Optional[int]

    page_size : typing.Optional[int]

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.List[WorkflowRunTimeline]
        Successful Response

    Raises
    ------
    UnprocessableEntityError
        When the server replies with HTTP 422.
    ApiError
        For any other non-2xx status, or when the body is not decodable JSON.

    Examples
    --------
    from skyvern import Skyvern

    client = Skyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )
    client.agent.get_workflow_run_timeline(
        workflow_run_id="workflow_run_id",
        workflow_id="workflow_id",
    )
    """
    endpoint = (
        f"api/v1/workflows/{jsonable_encoder(workflow_id)}/runs/{jsonable_encoder(workflow_run_id)}/timeline"
    )
    query = {
        "page": page,
        "page_size": page_size,
    }
    response = self._client_wrapper.httpx_client.request(
        endpoint,
        method="GET",
        params=query,
        request_options=request_options,
    )
    status_code = response.status_code
    try:
        # Every outcome needs the decoded body, so decode it once up front.
        payload = response.json()
        if 200 <= status_code < 300:
            return typing.cast(
                typing.List[WorkflowRunTimeline],
                parse_obj_as(
                    type_=typing.List[WorkflowRunTimeline],  # type: ignore
                    object_=payload,
                ),
            )
        if status_code == 422:
            validation_error = typing.cast(
                HttpValidationError,
                parse_obj_as(
                    type_=HttpValidationError,  # type: ignore
                    object_=payload,
                ),
            )
            raise UnprocessableEntityError(validation_error)
    except JSONDecodeError:
        # Non-JSON body: report the raw text instead.
        raise ApiError(status_code=status_code, body=response.text)
    raise ApiError(status_code=status_code, body=payload)
def get_workflow_run(
    self,
    workflow_run_id: str,
    *,
    request_options: typing.Optional[RequestOptions] = None,
) -> WorkflowRunResponse:
    """
    Fetch a workflow run by its run id.

    Sends ``GET api/v1/workflows/runs/{workflow_run_id}``.

    Parameters
    ----------
    workflow_run_id : str

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    WorkflowRunResponse
        Successful Response

    Raises
    ------
    UnprocessableEntityError
        When the server replies with HTTP 422.
    ApiError
        For any other non-2xx status, or when the body is not decodable JSON.

    Examples
    --------
    from skyvern import Skyvern

    client = Skyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )
    client.agent.get_workflow_run(
        workflow_run_id="workflow_run_id",
    )
    """
    endpoint = f"api/v1/workflows/runs/{jsonable_encoder(workflow_run_id)}"
    response = self._client_wrapper.httpx_client.request(
        endpoint,
        method="GET",
        request_options=request_options,
    )
    status_code = response.status_code
    try:
        # Every outcome needs the decoded body, so decode it once up front.
        payload = response.json()
        if 200 <= status_code < 300:
            return typing.cast(
                WorkflowRunResponse,
                parse_obj_as(
                    type_=WorkflowRunResponse,  # type: ignore
                    object_=payload,
                ),
            )
        if status_code == 422:
            validation_error = typing.cast(
                HttpValidationError,
                parse_obj_as(
                    type_=HttpValidationError,  # type: ignore
                    object_=payload,
                ),
            )
            raise UnprocessableEntityError(validation_error)
    except JSONDecodeError:
        # Non-JSON body: report the raw text instead.
        raise ApiError(status_code=status_code, body=response.text)
    raise ApiError(status_code=status_code, body=payload)
def get_workflows(
    self,
    *,
    page: typing.Optional[int] = None,
    page_size: typing.Optional[int] = None,
    only_saved_tasks: typing.Optional[bool] = None,
    only_workflows: typing.Optional[bool] = None,
    title: typing.Optional[str] = None,
    template: typing.Optional[bool] = None,
    request_options: typing.Optional[RequestOptions] = None,
) -> typing.List[Workflow]:
    """
    Get all workflows with the latest version for the organization.

    Sends ``GET api/v1/workflows``; every keyword filter is forwarded as a
    query parameter of the same name.

    Parameters
    ----------
    page : typing.Optional[int]

    page_size : typing.Optional[int]

    only_saved_tasks : typing.Optional[bool]

    only_workflows : typing.Optional[bool]

    title : typing.Optional[str]

    template : typing.Optional[bool]

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.List[Workflow]
        Successful Response

    Raises
    ------
    UnprocessableEntityError
        When the server replies with HTTP 422.
    ApiError
        For any other non-2xx status, or when the body is not decodable JSON.

    Examples
    --------
    from skyvern import Skyvern

    client = Skyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )
    client.agent.get_workflows()
    """
    query = {
        "page": page,
        "page_size": page_size,
        "only_saved_tasks": only_saved_tasks,
        "only_workflows": only_workflows,
        "title": title,
        "template": template,
    }
    response = self._client_wrapper.httpx_client.request(
        "api/v1/workflows",
        method="GET",
        params=query,
        request_options=request_options,
    )
    status_code = response.status_code
    try:
        # Every outcome needs the decoded body, so decode it once up front.
        payload = response.json()
        if 200 <= status_code < 300:
            return typing.cast(
                typing.List[Workflow],
                parse_obj_as(
                    type_=typing.List[Workflow],  # type: ignore
                    object_=payload,
                ),
            )
        if status_code == 422:
            validation_error = typing.cast(
                HttpValidationError,
                parse_obj_as(
                    type_=HttpValidationError,  # type: ignore
                    object_=payload,
                ),
            )
            raise UnprocessableEntityError(validation_error)
    except JSONDecodeError:
        # Non-JSON body: report the raw text instead.
        raise ApiError(status_code=status_code, body=response.text)
    raise ApiError(status_code=status_code, body=payload)
def create_workflow(
    self,
    *,
    request_options: typing.Optional[RequestOptions] = None,
) -> Workflow:
    """
    Create a workflow.

    Sends ``POST api/v1/workflows``. This method passes no ``json`` argument
    of its own to the HTTP client.

    Parameters
    ----------
    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    Workflow
        Successful Response

    Raises
    ------
    UnprocessableEntityError
        When the server replies with HTTP 422.
    ApiError
        For any other non-2xx status, or when the body is not decodable JSON.

    Examples
    --------
    from skyvern import Skyvern

    client = Skyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )
    client.agent.create_workflow()
    """
    response = self._client_wrapper.httpx_client.request(
        "api/v1/workflows",
        method="POST",
        request_options=request_options,
    )
    status_code = response.status_code
    try:
        # Every outcome needs the decoded body, so decode it once up front.
        payload = response.json()
        if 200 <= status_code < 300:
            return typing.cast(
                Workflow,
                parse_obj_as(
                    type_=Workflow,  # type: ignore
                    object_=payload,
                ),
            )
        if status_code == 422:
            validation_error = typing.cast(
                HttpValidationError,
                parse_obj_as(
                    type_=HttpValidationError,  # type: ignore
                    object_=payload,
                ),
            )
            raise UnprocessableEntityError(validation_error)
    except JSONDecodeError:
        # Non-JSON body: report the raw text instead.
        raise ApiError(status_code=status_code, body=response.text)
    raise ApiError(status_code=status_code, body=payload)
def get_workflow(
    self,
    workflow_permanent_id: str,
    *,
    version: typing.Optional[int] = None,
    template: typing.Optional[bool] = None,
    request_options: typing.Optional[RequestOptions] = None,
) -> Workflow:
    """
    Fetch a workflow by its permanent id.

    Sends ``GET api/v1/workflows/{workflow_permanent_id}`` with ``version``
    and ``template`` as query parameters.

    Parameters
    ----------
    workflow_permanent_id : str

    version : typing.Optional[int]

    template : typing.Optional[bool]

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    Workflow
        Successful Response

    Raises
    ------
    UnprocessableEntityError
        When the server replies with HTTP 422.
    ApiError
        For any other non-2xx status, or when the body is not decodable JSON.

    Examples
    --------
    from skyvern import Skyvern

    client = Skyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )
    client.agent.get_workflow(
        workflow_permanent_id="workflow_permanent_id",
    )
    """
    endpoint = f"api/v1/workflows/{jsonable_encoder(workflow_permanent_id)}"
    query = {
        "version": version,
        "template": template,
    }
    response = self._client_wrapper.httpx_client.request(
        endpoint,
        method="GET",
        params=query,
        request_options=request_options,
    )
    status_code = response.status_code
    try:
        # Every outcome needs the decoded body, so decode it once up front.
        payload = response.json()
        if 200 <= status_code < 300:
            return typing.cast(
                Workflow,
                parse_obj_as(
                    type_=Workflow,  # type: ignore
                    object_=payload,
                ),
            )
        if status_code == 422:
            validation_error = typing.cast(
                HttpValidationError,
                parse_obj_as(
                    type_=HttpValidationError,  # type: ignore
                    object_=payload,
                ),
            )
            raise UnprocessableEntityError(validation_error)
    except JSONDecodeError:
        # Non-JSON body: report the raw text instead.
        raise ApiError(status_code=status_code, body=response.text)
    raise ApiError(status_code=status_code, body=payload)
def update_workflow(
    self,
    workflow_permanent_id: str,
    *,
    request_options: typing.Optional[RequestOptions] = None,
) -> Workflow:
    """
    Update a workflow identified by its permanent id.

    Sends ``PUT api/v1/workflows/{workflow_permanent_id}``. This method
    passes no ``json`` argument of its own to the HTTP client.

    Parameters
    ----------
    workflow_permanent_id : str

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    Workflow
        Successful Response

    Raises
    ------
    UnprocessableEntityError
        When the server replies with HTTP 422.
    ApiError
        For any other non-2xx status, or when the body is not decodable JSON.

    Examples
    --------
    from skyvern import Skyvern

    client = Skyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )
    client.agent.update_workflow(
        workflow_permanent_id="workflow_permanent_id",
    )
    """
    endpoint = f"api/v1/workflows/{jsonable_encoder(workflow_permanent_id)}"
    response = self._client_wrapper.httpx_client.request(
        endpoint,
        method="PUT",
        request_options=request_options,
    )
    status_code = response.status_code
    try:
        # Every outcome needs the decoded body, so decode it once up front.
        payload = response.json()
        if 200 <= status_code < 300:
            return typing.cast(
                Workflow,
                parse_obj_as(
                    type_=Workflow,  # type: ignore
                    object_=payload,
                ),
            )
        if status_code == 422:
            validation_error = typing.cast(
                HttpValidationError,
                parse_obj_as(
                    type_=HttpValidationError,  # type: ignore
                    object_=payload,
                ),
            )
            raise UnprocessableEntityError(validation_error)
    except JSONDecodeError:
        # Non-JSON body: report the raw text instead.
        raise ApiError(status_code=status_code, body=response.text)
    raise ApiError(status_code=status_code, body=payload)
def delete_workflow(
    self,
    workflow_permanent_id: str,
    *,
    request_options: typing.Optional[RequestOptions] = None,
) -> typing.Optional[typing.Any]:
    """
    Delete a workflow identified by its permanent id.

    Sends ``DELETE api/v1/workflows/{workflow_permanent_id}`` and returns
    whatever JSON value the server sends back.

    Parameters
    ----------
    workflow_permanent_id : str

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.Optional[typing.Any]
        Successful Response

    Raises
    ------
    UnprocessableEntityError
        When the server replies with HTTP 422.
    ApiError
        For any other non-2xx status, or when the body is not decodable JSON
        (this includes a 2xx reply whose body is not JSON).

    Examples
    --------
    from skyvern import Skyvern

    client = Skyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )
    client.agent.delete_workflow(
        workflow_permanent_id="workflow_permanent_id",
    )
    """
    endpoint = f"api/v1/workflows/{jsonable_encoder(workflow_permanent_id)}"
    response = self._client_wrapper.httpx_client.request(
        endpoint,
        method="DELETE",
        request_options=request_options,
    )
    status_code = response.status_code
    try:
        # Every outcome needs the decoded body, so decode it once up front.
        payload = response.json()
        if 200 <= status_code < 300:
            return typing.cast(
                typing.Optional[typing.Any],
                parse_obj_as(
                    type_=typing.Optional[typing.Any],  # type: ignore
                    object_=payload,
                ),
            )
        if status_code == 422:
            validation_error = typing.cast(
                HttpValidationError,
                parse_obj_as(
                    type_=HttpValidationError,  # type: ignore
                    object_=payload,
                ),
            )
            raise UnprocessableEntityError(validation_error)
    except JSONDecodeError:
        # Non-JSON body: report the raw text instead.
        raise ApiError(status_code=status_code, body=response.text)
    raise ApiError(status_code=status_code, body=payload)
def get_workflow_templates(
    self,
    *,
    request_options: typing.Optional[RequestOptions] = None,
) -> typing.List[Workflow]:
    """
    List the workflow templates.

    Sends ``GET api/v1/workflows/templates``.

    Parameters
    ----------
    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.List[Workflow]
        Successful Response

    Raises
    ------
    ApiError
        For any non-2xx status, or when the body is not decodable JSON.

    Examples
    --------
    from skyvern import Skyvern

    client = Skyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )
    client.agent.get_workflow_templates()
    """
    response = self._client_wrapper.httpx_client.request(
        "api/v1/workflows/templates",
        method="GET",
        request_options=request_options,
    )
    status_code = response.status_code
    try:
        # Both the success and the error path need the decoded body.
        payload = response.json()
        if 200 <= status_code < 300:
            return typing.cast(
                typing.List[Workflow],
                parse_obj_as(
                    type_=typing.List[Workflow],  # type: ignore
                    object_=payload,
                ),
            )
    except JSONDecodeError:
        # Non-JSON body: report the raw text instead.
        raise ApiError(status_code=status_code, body=response.text)
    raise ApiError(status_code=status_code, body=payload)
def generate_task(
    self,
    *,
    prompt: str,
    request_options: typing.Optional[RequestOptions] = None,
) -> TaskGeneration:
    """
    Generate a task from a prompt.

    Sends ``POST api/v1/generate/task`` with ``{"prompt": prompt}`` as the
    JSON body.

    Parameters
    ----------
    prompt : str

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    TaskGeneration
        Successful Response

    Raises
    ------
    UnprocessableEntityError
        When the server replies with HTTP 422.
    ApiError
        For any other non-2xx status, or when the body is not decodable JSON.

    Examples
    --------
    from skyvern import Skyvern

    client = Skyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )
    client.agent.generate_task(
        prompt="prompt",
    )
    """
    request_body = {
        "prompt": prompt,
    }
    request_headers = {
        "content-type": "application/json",
    }
    response = self._client_wrapper.httpx_client.request(
        "api/v1/generate/task",
        method="POST",
        json=request_body,
        headers=request_headers,
        request_options=request_options,
        omit=OMIT,
    )
    status_code = response.status_code
    try:
        # Every outcome needs the decoded body, so decode it once up front.
        payload = response.json()
        if 200 <= status_code < 300:
            return typing.cast(
                TaskGeneration,
                parse_obj_as(
                    type_=TaskGeneration,  # type: ignore
                    object_=payload,
                ),
            )
        if status_code == 422:
            validation_error = typing.cast(
                HttpValidationError,
                parse_obj_as(
                    type_=HttpValidationError,  # type: ignore
                    object_=payload,
                ),
            )
            raise UnprocessableEntityError(validation_error)
    except JSONDecodeError:
        # Non-JSON body: report the raw text instead.
        raise ApiError(status_code=status_code, body=response.text)
    raise ApiError(status_code=status_code, body=payload)
def run_task_v2(
    self,
    *,
    user_prompt: str,
    max_iterations_override: typing.Optional[AgentRunTaskV2RequestXMaxIterationsOverride] = None,
    max_steps_override: typing.Optional[AgentRunTaskV2RequestXMaxStepsOverride] = None,
    url: typing.Optional[str] = OMIT,
    browser_session_id: typing.Optional[str] = OMIT,
    webhook_callback_url: typing.Optional[str] = OMIT,
    totp_verification_url: typing.Optional[str] = OMIT,
    totp_identifier: typing.Optional[str] = OMIT,
    proxy_location: typing.Optional[ProxyLocation] = OMIT,
    publish_workflow: typing.Optional[bool] = OMIT,
    extracted_information_schema: typing.Optional[TaskV2RequestExtractedInformationSchema] = OMIT,
    error_code_mapping: typing.Optional[typing.Dict[str, typing.Optional[str]]] = OMIT,
    request_options: typing.Optional[RequestOptions] = None,
) -> typing.Dict[str, typing.Optional[typing.Any]]:
    """
    Start a v2 task from a user prompt.

    Sends ``POST api/v2/tasks``. The two ``*_override`` arguments travel as
    the ``x-max-iterations-override`` / ``x-max-steps-override`` headers
    (stringified, or ``None`` when not given); everything else goes in the
    JSON body.

    Parameters
    ----------
    user_prompt : str

    max_iterations_override : typing.Optional[AgentRunTaskV2RequestXMaxIterationsOverride]

    max_steps_override : typing.Optional[AgentRunTaskV2RequestXMaxStepsOverride]

    url : typing.Optional[str]

    browser_session_id : typing.Optional[str]

    webhook_callback_url : typing.Optional[str]

    totp_verification_url : typing.Optional[str]

    totp_identifier : typing.Optional[str]

    proxy_location : typing.Optional[ProxyLocation]

    publish_workflow : typing.Optional[bool]

    extracted_information_schema : typing.Optional[TaskV2RequestExtractedInformationSchema]

    error_code_mapping : typing.Optional[typing.Dict[str, typing.Optional[str]]]

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.Dict[str, typing.Optional[typing.Any]]
        Successful Response

    Raises
    ------
    UnprocessableEntityError
        When the server replies with HTTP 422.
    ApiError
        For any other non-2xx status, or when the body is not decodable JSON.

    Examples
    --------
    from skyvern import Skyvern

    client = Skyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )
    client.agent.run_task_v2(
        user_prompt="user_prompt",
    )
    """
    request_body = {
        "user_prompt": user_prompt,
        "url": url,
        "browser_session_id": browser_session_id,
        "webhook_callback_url": webhook_callback_url,
        "totp_verification_url": totp_verification_url,
        "totp_identifier": totp_identifier,
        "proxy_location": proxy_location,
        "publish_workflow": publish_workflow,
        "extracted_information_schema": convert_and_respect_annotation_metadata(
            object_=extracted_information_schema,
            annotation=TaskV2RequestExtractedInformationSchema,
            direction="write",
        ),
        "error_code_mapping": error_code_mapping,
    }
    request_headers = {
        "content-type": "application/json",
        "x-max-iterations-override": None if max_iterations_override is None else str(max_iterations_override),
        "x-max-steps-override": None if max_steps_override is None else str(max_steps_override),
    }
    response = self._client_wrapper.httpx_client.request(
        "api/v2/tasks",
        method="POST",
        json=request_body,
        headers=request_headers,
        request_options=request_options,
        omit=OMIT,
    )
    status_code = response.status_code
    try:
        # Every outcome needs the decoded body, so decode it once up front.
        payload = response.json()
        if 200 <= status_code < 300:
            return typing.cast(
                typing.Dict[str, typing.Optional[typing.Any]],
                parse_obj_as(
                    type_=typing.Dict[str, typing.Optional[typing.Any]],  # type: ignore
                    object_=payload,
                ),
            )
        if status_code == 422:
            validation_error = typing.cast(
                HttpValidationError,
                parse_obj_as(
                    type_=HttpValidationError,  # type: ignore
                    object_=payload,
                ),
            )
            raise UnprocessableEntityError(validation_error)
    except JSONDecodeError:
        # Non-JSON body: report the raw text instead.
        raise ApiError(status_code=status_code, body=response.text)
    raise ApiError(status_code=status_code, body=payload)
def get_task_v2(
    self,
    task_id: str,
    *,
    request_options: typing.Optional[RequestOptions] = None,
) -> typing.Dict[str, typing.Optional[typing.Any]]:
    """
    Fetch a v2 task as a JSON mapping.

    Sends ``GET api/v2/tasks/{task_id}``.

    Parameters
    ----------
    task_id : str

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.Dict[str, typing.Optional[typing.Any]]
        Successful Response

    Raises
    ------
    UnprocessableEntityError
        When the server replies with HTTP 422.
    ApiError
        For any other non-2xx status, or when the body is not decodable JSON.

    Examples
    --------
    from skyvern import Skyvern

    client = Skyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )
    client.agent.get_task_v2(
        task_id="task_id",
    )
    """
    endpoint = f"api/v2/tasks/{jsonable_encoder(task_id)}"
    response = self._client_wrapper.httpx_client.request(
        endpoint,
        method="GET",
        request_options=request_options,
    )
    status_code = response.status_code
    try:
        # Every outcome needs the decoded body, so decode it once up front.
        payload = response.json()
        if 200 <= status_code < 300:
            return typing.cast(
                typing.Dict[str, typing.Optional[typing.Any]],
                parse_obj_as(
                    type_=typing.Dict[str, typing.Optional[typing.Any]],  # type: ignore
                    object_=payload,
                ),
            )
        if status_code == 422:
            validation_error = typing.cast(
                HttpValidationError,
                parse_obj_as(
                    type_=HttpValidationError,  # type: ignore
                    object_=payload,
                ),
            )
            raise UnprocessableEntityError(validation_error)
    except JSONDecodeError:
        # Non-JSON body: report the raw text instead.
        raise ApiError(status_code=status_code, body=response.text)
    raise ApiError(status_code=status_code, body=payload)
def send_totp_code(
    self,
    *,
    totp_identifier: str,
    content: str,
    task_id: typing.Optional[str] = OMIT,
    workflow_id: typing.Optional[str] = OMIT,
    source: typing.Optional[str] = OMIT,
    expired_at: typing.Optional[dt.datetime] = OMIT,
    request_options: typing.Optional[RequestOptions] = None,
) -> TotpCode:
    """
    Submit a TOTP code.

    Sends ``POST api/v1/totp`` with the given fields as the JSON body.

    Parameters
    ----------
    totp_identifier : str

    content : str

    task_id : typing.Optional[str]

    workflow_id : typing.Optional[str]

    source : typing.Optional[str]

    expired_at : typing.Optional[dt.datetime]

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    TotpCode
        Successful Response

    Raises
    ------
    UnprocessableEntityError
        When the server replies with HTTP 422.
    ApiError
        For any other non-2xx status, or when the body is not decodable JSON.

    Examples
    --------
    from skyvern import Skyvern

    client = Skyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )
    client.agent.send_totp_code(
        totp_identifier="totp_identifier",
        content="content",
    )
    """
    request_body = {
        "totp_identifier": totp_identifier,
        "task_id": task_id,
        "workflow_id": workflow_id,
        "source": source,
        "content": content,
        "expired_at": expired_at,
    }
    request_headers = {
        "content-type": "application/json",
    }
    response = self._client_wrapper.httpx_client.request(
        "api/v1/totp",
        method="POST",
        json=request_body,
        headers=request_headers,
        request_options=request_options,
        omit=OMIT,
    )
    status_code = response.status_code
    try:
        # Every outcome needs the decoded body, so decode it once up front.
        payload = response.json()
        if 200 <= status_code < 300:
            return typing.cast(
                TotpCode,
                parse_obj_as(
                    type_=TotpCode,  # type: ignore
                    object_=payload,
                ),
            )
        if status_code == 422:
            validation_error = typing.cast(
                HttpValidationError,
                parse_obj_as(
                    type_=HttpValidationError,  # type: ignore
                    object_=payload,
                ),
            )
            raise UnprocessableEntityError(validation_error)
    except JSONDecodeError:
        # Non-JSON body: report the raw text instead.
        raise ApiError(status_code=status_code, body=response.text)
    raise ApiError(status_code=status_code, body=payload)
class AsyncAgentClient:
    """
    Coroutine-based client for the agent endpoints.

    Each endpoint method awaits ``self._client_wrapper.httpx_client.request``
    and parses the JSON reply.
    """

    def __init__(self, *, client_wrapper: AsyncClientWrapper):
        """
        Store the client wrapper used to send requests.

        Parameters
        ----------
        client_wrapper : AsyncClientWrapper
            Wrapper whose ``httpx_client`` performs the HTTP calls.
        """
        self._client_wrapper = client_wrapper
async def run_task(
    self,
    *,
    goal: str,
    url: typing.Optional[str] = OMIT,
    title: typing.Optional[str] = OMIT,
    engine: typing.Optional[RunEngine] = OMIT,
    proxy_location: typing.Optional[ProxyLocation] = OMIT,
    data_extraction_schema: typing.Optional[TaskRunRequestDataExtractionSchema] = OMIT,
    error_code_mapping: typing.Optional[typing.Dict[str, typing.Optional[str]]] = OMIT,
    max_steps: typing.Optional[int] = OMIT,
    webhook_url: typing.Optional[str] = OMIT,
    totp_identifier: typing.Optional[str] = OMIT,
    totp_url: typing.Optional[str] = OMIT,
    browser_session_id: typing.Optional[str] = OMIT,
    publish_workflow: typing.Optional[bool] = OMIT,
    request_options: typing.Optional[RequestOptions] = None,
) -> TaskRunResponse:
    """
    Start a task run for the given goal.

    Sends ``POST v1/tasks`` with the given fields as the JSON body.

    Parameters
    ----------
    goal : str

    url : typing.Optional[str]

    title : typing.Optional[str]

    engine : typing.Optional[RunEngine]

    proxy_location : typing.Optional[ProxyLocation]

    data_extraction_schema : typing.Optional[TaskRunRequestDataExtractionSchema]

    error_code_mapping : typing.Optional[typing.Dict[str, typing.Optional[str]]]

    max_steps : typing.Optional[int]

    webhook_url : typing.Optional[str]

    totp_identifier : typing.Optional[str]

    totp_url : typing.Optional[str]

    browser_session_id : typing.Optional[str]

    publish_workflow : typing.Optional[bool]

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    TaskRunResponse
        Successful Response

    Raises
    ------
    UnprocessableEntityError
        When the server replies with HTTP 422.
    ApiError
        For any other non-2xx status, or when the body is not decodable JSON.

    Examples
    --------
    import asyncio

    from skyvern import AsyncSkyvern

    client = AsyncSkyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )


    async def main() -> None:
        await client.agent.run_task(
            goal="goal",
        )


    asyncio.run(main())
    """
    request_body = {
        "goal": goal,
        "url": url,
        "title": title,
        "engine": engine,
        "proxy_location": proxy_location,
        "data_extraction_schema": convert_and_respect_annotation_metadata(
            object_=data_extraction_schema,
            annotation=TaskRunRequestDataExtractionSchema,
            direction="write",
        ),
        "error_code_mapping": error_code_mapping,
        "max_steps": max_steps,
        "webhook_url": webhook_url,
        "totp_identifier": totp_identifier,
        "totp_url": totp_url,
        "browser_session_id": browser_session_id,
        "publish_workflow": publish_workflow,
    }
    request_headers = {
        "content-type": "application/json",
    }
    response = await self._client_wrapper.httpx_client.request(
        "v1/tasks",
        method="POST",
        json=request_body,
        headers=request_headers,
        request_options=request_options,
        omit=OMIT,
    )
    status_code = response.status_code
    try:
        # Every outcome needs the decoded body, so decode it once up front.
        payload = response.json()
        if 200 <= status_code < 300:
            return typing.cast(
                TaskRunResponse,
                parse_obj_as(
                    type_=TaskRunResponse,  # type: ignore
                    object_=payload,
                ),
            )
        if status_code == 422:
            validation_error = typing.cast(
                HttpValidationError,
                parse_obj_as(
                    type_=HttpValidationError,  # type: ignore
                    object_=payload,
                ),
            )
            raise UnprocessableEntityError(validation_error)
    except JSONDecodeError:
        # Non-JSON body: report the raw text instead.
        raise ApiError(status_code=status_code, body=response.text)
    raise ApiError(status_code=status_code, body=payload)
async def get_tasks(
    self,
    *,
    page: typing.Optional[int] = None,
    page_size: typing.Optional[int] = None,
    task_status: typing.Optional[typing.Union[TaskStatus, typing.Sequence[TaskStatus]]] = None,
    workflow_run_id: typing.Optional[str] = None,
    only_standalone_tasks: typing.Optional[bool] = None,
    application: typing.Optional[str] = None,
    sort: typing.Optional[OrderBy] = None,
    order: typing.Optional[SortDirection] = None,
    request_options: typing.Optional[RequestOptions] = None,
) -> typing.List[Task]:
    """
    Get all tasks.

    Sends ``GET api/v1/tasks``; every keyword filter is forwarded as a query
    parameter of the same name. The result is a paginated list of tasks
    without steps populated. Steps can be populated by calling the
    get_agent_task endpoint.

    Parameters
    ----------
    page : typing.Optional[int]
        Starting page, defaults to 1

    page_size : typing.Optional[int]
        Page size, defaults to 10

    task_status : typing.Optional[typing.Union[TaskStatus, typing.Sequence[TaskStatus]]]
        Task status filter

    workflow_run_id : typing.Optional[str]
        Workflow run id filter

    only_standalone_tasks : typing.Optional[bool]
        Only standalone tasks, tasks which are part of a workflow run will be filtered out

    application : typing.Optional[str]

    sort : typing.Optional[OrderBy]
        Column to sort by, created_at or modified_at

    order : typing.Optional[SortDirection]
        Direction to sort by, ascending or descending

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.List[Task]
        Successful Response

    Raises
    ------
    UnprocessableEntityError
        When the server replies with HTTP 422.
    ApiError
        For any other non-2xx status, or when the body is not decodable JSON.

    Examples
    --------
    import asyncio

    from skyvern import AsyncSkyvern

    client = AsyncSkyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )


    async def main() -> None:
        await client.agent.get_tasks()


    asyncio.run(main())
    """
    query = {
        "page": page,
        "page_size": page_size,
        "task_status": task_status,
        "workflow_run_id": workflow_run_id,
        "only_standalone_tasks": only_standalone_tasks,
        "application": application,
        "sort": sort,
        "order": order,
    }
    response = await self._client_wrapper.httpx_client.request(
        "api/v1/tasks",
        method="GET",
        params=query,
        request_options=request_options,
    )
    status_code = response.status_code
    try:
        # Every outcome needs the decoded body, so decode it once up front.
        payload = response.json()
        if 200 <= status_code < 300:
            return typing.cast(
                typing.List[Task],
                parse_obj_as(
                    type_=typing.List[Task],  # type: ignore
                    object_=payload,
                ),
            )
        if status_code == 422:
            validation_error = typing.cast(
                HttpValidationError,
                parse_obj_as(
                    type_=HttpValidationError,  # type: ignore
                    object_=payload,
                ),
            )
            raise UnprocessableEntityError(validation_error)
    except JSONDecodeError:
        # Non-JSON body: report the raw text instead.
        raise ApiError(status_code=status_code, body=response.text)
    raise ApiError(status_code=status_code, body=payload)
async def run_task_v1(
    self,
    *,
    url: str,
    max_steps_override: typing.Optional[int] = None,
    title: typing.Optional[str] = OMIT,
    webhook_callback_url: typing.Optional[str] = OMIT,
    totp_verification_url: typing.Optional[str] = OMIT,
    totp_identifier: typing.Optional[str] = OMIT,
    navigation_goal: typing.Optional[str] = OMIT,
    data_extraction_goal: typing.Optional[str] = OMIT,
    navigation_payload: typing.Optional[TaskRequestNavigationPayload] = OMIT,
    error_code_mapping: typing.Optional[typing.Dict[str, typing.Optional[str]]] = OMIT,
    proxy_location: typing.Optional[ProxyLocation] = OMIT,
    extracted_information_schema: typing.Optional[TaskRequestExtractedInformationSchema] = OMIT,
    complete_criterion: typing.Optional[str] = OMIT,
    terminate_criterion: typing.Optional[str] = OMIT,
    task_type: typing.Optional[TaskType] = OMIT,
    application: typing.Optional[str] = OMIT,
    browser_session_id: typing.Optional[str] = OMIT,
    request_options: typing.Optional[RequestOptions] = None,
) -> CreateTaskResponse:
    """
    Create a v1 task.

    Sends ``POST api/v1/tasks``. ``max_steps_override`` travels as the
    ``x-max-steps-override`` header (stringified, or ``None`` when not
    given); everything else goes in the JSON body.

    Parameters
    ----------
    url : str
        Starting URL for the task.

    max_steps_override : typing.Optional[int]

    title : typing.Optional[str]
        The title of the task.

    webhook_callback_url : typing.Optional[str]
        The URL to call when the task is completed.

    totp_verification_url : typing.Optional[str]

    totp_identifier : typing.Optional[str]

    navigation_goal : typing.Optional[str]
        The user's goal for the task.

    data_extraction_goal : typing.Optional[str]
        The user's goal for data extraction.

    navigation_payload : typing.Optional[TaskRequestNavigationPayload]
        The user's details needed to achieve the task.

    error_code_mapping : typing.Optional[typing.Dict[str, typing.Optional[str]]]
        The mapping of error codes and their descriptions.

    proxy_location : typing.Optional[ProxyLocation]
        The location of the proxy to use for the task.

    extracted_information_schema : typing.Optional[TaskRequestExtractedInformationSchema]
        The requested schema of the extracted information.

    complete_criterion : typing.Optional[str]
        Criterion to complete

    terminate_criterion : typing.Optional[str]
        Criterion to terminate

    task_type : typing.Optional[TaskType]
        The type of the task

    application : typing.Optional[str]
        The application for which the task is running

    browser_session_id : typing.Optional[str]

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    CreateTaskResponse
        Successful Response

    Raises
    ------
    UnprocessableEntityError
        When the server replies with HTTP 422.
    ApiError
        For any other non-2xx status, or when the body is not decodable JSON.

    Examples
    --------
    import asyncio

    from skyvern import AsyncSkyvern

    client = AsyncSkyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )


    async def main() -> None:
        await client.agent.run_task_v1(
            url="https://www.geico.com",
        )


    asyncio.run(main())
    """
    request_body = {
        "title": title,
        "url": url,
        "webhook_callback_url": webhook_callback_url,
        "totp_verification_url": totp_verification_url,
        "totp_identifier": totp_identifier,
        "navigation_goal": navigation_goal,
        "data_extraction_goal": data_extraction_goal,
        "navigation_payload": convert_and_respect_annotation_metadata(
            object_=navigation_payload,
            annotation=TaskRequestNavigationPayload,
            direction="write",
        ),
        "error_code_mapping": error_code_mapping,
        "proxy_location": proxy_location,
        "extracted_information_schema": convert_and_respect_annotation_metadata(
            object_=extracted_information_schema,
            annotation=TaskRequestExtractedInformationSchema,
            direction="write",
        ),
        "complete_criterion": complete_criterion,
        "terminate_criterion": terminate_criterion,
        "task_type": task_type,
        "application": application,
        "browser_session_id": browser_session_id,
    }
    request_headers = {
        "content-type": "application/json",
        "x-max-steps-override": None if max_steps_override is None else str(max_steps_override),
    }
    response = await self._client_wrapper.httpx_client.request(
        "api/v1/tasks",
        method="POST",
        json=request_body,
        headers=request_headers,
        request_options=request_options,
        omit=OMIT,
    )
    status_code = response.status_code
    try:
        # Every outcome needs the decoded body, so decode it once up front.
        payload = response.json()
        if 200 <= status_code < 300:
            return typing.cast(
                CreateTaskResponse,
                parse_obj_as(
                    type_=CreateTaskResponse,  # type: ignore
                    object_=payload,
                ),
            )
        if status_code == 422:
            validation_error = typing.cast(
                HttpValidationError,
                parse_obj_as(
                    type_=HttpValidationError,  # type: ignore
                    object_=payload,
                ),
            )
            raise UnprocessableEntityError(validation_error)
    except JSONDecodeError:
        # Non-JSON body: report the raw text instead.
        raise ApiError(status_code=status_code, body=response.text)
    raise ApiError(status_code=status_code, body=payload)
async def get_task_v1(
    self,
    task_id: str,
    *,
    request_options: typing.Optional[RequestOptions] = None,
) -> TaskResponse:
    """
    Fetch a v1 task by id.

    Sends ``GET api/v1/tasks/{task_id}``.

    Parameters
    ----------
    task_id : str

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    TaskResponse
        Successful Response

    Raises
    ------
    UnprocessableEntityError
        When the server replies with HTTP 422.
    ApiError
        For any other non-2xx status, or when the body is not decodable JSON.

    Examples
    --------
    import asyncio

    from skyvern import AsyncSkyvern

    client = AsyncSkyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )


    async def main() -> None:
        await client.agent.get_task_v1(
            task_id="task_id",
        )


    asyncio.run(main())
    """
    endpoint = f"api/v1/tasks/{jsonable_encoder(task_id)}"
    response = await self._client_wrapper.httpx_client.request(
        endpoint,
        method="GET",
        request_options=request_options,
    )
    status_code = response.status_code
    try:
        # Every outcome needs the decoded body, so decode it once up front.
        payload = response.json()
        if 200 <= status_code < 300:
            return typing.cast(
                TaskResponse,
                parse_obj_as(
                    type_=TaskResponse,  # type: ignore
                    object_=payload,
                ),
            )
        if status_code == 422:
            validation_error = typing.cast(
                HttpValidationError,
                parse_obj_as(
                    type_=HttpValidationError,  # type: ignore
                    object_=payload,
                ),
            )
            raise UnprocessableEntityError(validation_error)
    except JSONDecodeError:
        # Non-JSON body: report the raw text instead.
        raise ApiError(status_code=status_code, body=response.text)
    raise ApiError(status_code=status_code, body=payload)
async def cancel_task(
    self,
    task_id: str,
    *,
    request_options: typing.Optional[RequestOptions] = None,
) -> typing.Optional[typing.Any]:
    """
    Cancel a task.

    Sends ``POST api/v1/tasks/{task_id}/cancel`` and returns whatever JSON
    value the server sends back.

    Parameters
    ----------
    task_id : str

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.Optional[typing.Any]
        Successful Response

    Raises
    ------
    UnprocessableEntityError
        When the server replies with HTTP 422.
    ApiError
        For any other non-2xx status, or when the body is not decodable JSON
        (this includes a 2xx reply whose body is not JSON).

    Examples
    --------
    import asyncio

    from skyvern import AsyncSkyvern

    client = AsyncSkyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )


    async def main() -> None:
        await client.agent.cancel_task(
            task_id="task_id",
        )


    asyncio.run(main())
    """
    endpoint = f"api/v1/tasks/{jsonable_encoder(task_id)}/cancel"
    response = await self._client_wrapper.httpx_client.request(
        endpoint,
        method="POST",
        request_options=request_options,
    )
    status_code = response.status_code
    try:
        # Every outcome needs the decoded body, so decode it once up front.
        payload = response.json()
        if 200 <= status_code < 300:
            return typing.cast(
                typing.Optional[typing.Any],
                parse_obj_as(
                    type_=typing.Optional[typing.Any],  # type: ignore
                    object_=payload,
                ),
            )
        if status_code == 422:
            validation_error = typing.cast(
                HttpValidationError,
                parse_obj_as(
                    type_=HttpValidationError,  # type: ignore
                    object_=payload,
                ),
            )
            raise UnprocessableEntityError(validation_error)
    except JSONDecodeError:
        # Non-JSON body: report the raw text instead.
        raise ApiError(status_code=status_code, body=response.text)
    raise ApiError(status_code=status_code, body=payload)
async def cancel_workflow_run(
    self, workflow_run_id: str, *, request_options: typing.Optional[RequestOptions] = None
) -> typing.Optional[typing.Any]:
    """
    Request cancellation of a workflow run via
    ``POST api/v1/workflows/runs/{workflow_run_id}/cancel``.

    Parameters
    ----------
    workflow_run_id : str
        Identifier interpolated into the request path.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.Optional[typing.Any]
        The decoded JSON body of a 2xx response, or ``None`` when that
        response has no body.

    Raises
    ------
    UnprocessableEntityError
        The server replied 422; carries the parsed ``HttpValidationError``.
    ApiError
        Any other non-2xx status, or a non-empty body that is not valid JSON.

    Examples
    --------
    import asyncio

    from skyvern import AsyncSkyvern

    client = AsyncSkyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )


    async def main() -> None:
        await client.agent.cancel_workflow_run(
            workflow_run_id="workflow_run_id",
        )


    asyncio.run(main())
    """
    _response = await self._client_wrapper.httpx_client.request(
        f"api/v1/workflows/runs/{jsonable_encoder(workflow_run_id)}/cancel",
        method="POST",
        request_options=request_options,
    )
    try:
        if 200 <= _response.status_code < 300:
            # A successful reply may carry no body (e.g. 204 No Content).
            # Decoding it would raise JSONDecodeError and surface a success
            # as an ApiError, so map an empty body to None instead.
            if not _response.text.strip():
                return None
            return typing.cast(
                typing.Optional[typing.Any],
                parse_obj_as(
                    type_=typing.Optional[typing.Any],  # type: ignore
                    object_=_response.json(),
                ),
            )
        if _response.status_code == 422:
            raise UnprocessableEntityError(
                typing.cast(
                    HttpValidationError,
                    parse_obj_as(
                        type_=HttpValidationError,  # type: ignore
                        object_=_response.json(),
                    ),
                )
            )
        _response_json = _response.json()
    except JSONDecodeError:
        # Body was not JSON: report the raw text alongside the status code.
        raise ApiError(status_code=_response.status_code, body=_response.text)
    raise ApiError(status_code=_response.status_code, body=_response_json)
async def retry_webhook(
    self, task_id: str, *, request_options: typing.Optional[RequestOptions] = None
) -> TaskResponse:
    """
    Issue ``POST api/v1/tasks/{task_id}/retry_webhook`` and parse the reply.

    Parameters
    ----------
    task_id : str
        Identifier interpolated into the request path.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    TaskResponse
        The JSON body of a 2xx response parsed as ``TaskResponse``.

    Raises
    ------
    UnprocessableEntityError
        The server replied 422; carries the parsed ``HttpValidationError``.
    ApiError
        Any other non-2xx status, or a body that is not valid JSON.

    Examples
    --------
    import asyncio

    from skyvern import AsyncSkyvern

    client = AsyncSkyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )


    async def main() -> None:
        await client.agent.retry_webhook(
            task_id="task_id",
        )


    asyncio.run(main())
    """
    endpoint = f"api/v1/tasks/{jsonable_encoder(task_id)}/retry_webhook"
    http_response = await self._client_wrapper.httpx_client.request(
        endpoint, method="POST", request_options=request_options
    )
    status = http_response.status_code
    try:
        # Every outcome needs the decoded body, so decode once up front.
        payload = http_response.json()
        if 200 <= status < 300:
            parsed = parse_obj_as(type_=TaskResponse, object_=payload)  # type: ignore
            return typing.cast(TaskResponse, parsed)
        if status == 422:
            validation_error = parse_obj_as(type_=HttpValidationError, object_=payload)  # type: ignore
            raise UnprocessableEntityError(typing.cast(HttpValidationError, validation_error))
    except JSONDecodeError:
        raise ApiError(status_code=status, body=http_response.text)
    raise ApiError(status_code=status, body=payload)
async def get_runs(
    self,
    *,
    page: typing.Optional[int] = None,
    page_size: typing.Optional[int] = None,
    status: typing.Optional[typing.Union[WorkflowRunStatus, typing.Sequence[WorkflowRunStatus]]] = None,
    request_options: typing.Optional[RequestOptions] = None,
) -> typing.List[AgentGetRunsResponseItem]:
    """
    Issue ``GET api/v1/runs`` and parse the reply as a list of runs.

    Parameters
    ----------
    page : typing.Optional[int]
        Sent as the ``page`` query parameter.

    page_size : typing.Optional[int]
        Sent as the ``page_size`` query parameter.

    status : typing.Optional[typing.Union[WorkflowRunStatus, typing.Sequence[WorkflowRunStatus]]]
        Sent as the ``status`` query parameter.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.List[AgentGetRunsResponseItem]
        The JSON body of a 2xx response parsed into response items.

    Raises
    ------
    UnprocessableEntityError
        The server replied 422; carries the parsed ``HttpValidationError``.
    ApiError
        Any other non-2xx status, or a body that is not valid JSON.

    Examples
    --------
    import asyncio

    from skyvern import AsyncSkyvern

    client = AsyncSkyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )


    async def main() -> None:
        await client.agent.get_runs()


    asyncio.run(main())
    """
    query = {"page": page, "page_size": page_size, "status": status}
    http_response = await self._client_wrapper.httpx_client.request(
        "api/v1/runs", method="GET", params=query, request_options=request_options
    )
    code = http_response.status_code
    try:
        # Every outcome needs the decoded body, so decode once up front.
        payload = http_response.json()
        if 200 <= code < 300:
            parsed = parse_obj_as(type_=typing.List[AgentGetRunsResponseItem], object_=payload)  # type: ignore
            return typing.cast(typing.List[AgentGetRunsResponseItem], parsed)
        if code == 422:
            validation_error = parse_obj_as(type_=HttpValidationError, object_=payload)  # type: ignore
            raise UnprocessableEntityError(typing.cast(HttpValidationError, validation_error))
    except JSONDecodeError:
        raise ApiError(status_code=code, body=http_response.text)
    raise ApiError(status_code=code, body=payload)
async def get_run(self, run_id: str, *, request_options: typing.Optional[RequestOptions] = None) -> TaskRunResponse:
    """
    Issue ``GET api/v1/runs/{run_id}`` and parse the reply.

    Parameters
    ----------
    run_id : str
        Identifier interpolated into the request path.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    TaskRunResponse
        The JSON body of a 2xx response parsed as ``TaskRunResponse``.

    Raises
    ------
    UnprocessableEntityError
        The server replied 422; carries the parsed ``HttpValidationError``.
    ApiError
        Any other non-2xx status, or a body that is not valid JSON.

    Examples
    --------
    import asyncio

    from skyvern import AsyncSkyvern

    client = AsyncSkyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )


    async def main() -> None:
        await client.agent.get_run(
            run_id="run_id",
        )


    asyncio.run(main())
    """
    endpoint = f"api/v1/runs/{jsonable_encoder(run_id)}"
    http_response = await self._client_wrapper.httpx_client.request(
        endpoint, method="GET", request_options=request_options
    )
    status = http_response.status_code
    try:
        # Every outcome needs the decoded body, so decode once up front.
        payload = http_response.json()
        if 200 <= status < 300:
            parsed = parse_obj_as(type_=TaskRunResponse, object_=payload)  # type: ignore
            return typing.cast(TaskRunResponse, parsed)
        if status == 422:
            validation_error = parse_obj_as(type_=HttpValidationError, object_=payload)  # type: ignore
            raise UnprocessableEntityError(typing.cast(HttpValidationError, validation_error))
    except JSONDecodeError:
        raise ApiError(status_code=status, body=http_response.text)
    raise ApiError(status_code=status, body=payload)
async def get_steps(
    self, task_id: str, *, request_options: typing.Optional[RequestOptions] = None
) -> typing.List[Step]:
    """
    Get all steps for a task via ``GET api/v1/tasks/{task_id}/steps``.

    Parameters
    ----------
    task_id : str
        Identifier interpolated into the request path.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.List[Step]
        The JSON body of a 2xx response parsed into ``Step`` objects.

    Raises
    ------
    UnprocessableEntityError
        The server replied 422; carries the parsed ``HttpValidationError``.
    ApiError
        Any other non-2xx status, or a body that is not valid JSON.

    Examples
    --------
    import asyncio

    from skyvern import AsyncSkyvern

    client = AsyncSkyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )


    async def main() -> None:
        await client.agent.get_steps(
            task_id="task_id",
        )


    asyncio.run(main())
    """
    endpoint = f"api/v1/tasks/{jsonable_encoder(task_id)}/steps"
    http_response = await self._client_wrapper.httpx_client.request(
        endpoint, method="GET", request_options=request_options
    )
    status = http_response.status_code
    try:
        # Every outcome needs the decoded body, so decode once up front.
        payload = http_response.json()
        if 200 <= status < 300:
            parsed = parse_obj_as(type_=typing.List[Step], object_=payload)  # type: ignore
            return typing.cast(typing.List[Step], parsed)
        if status == 422:
            validation_error = parse_obj_as(type_=HttpValidationError, object_=payload)  # type: ignore
            raise UnprocessableEntityError(typing.cast(HttpValidationError, validation_error))
    except JSONDecodeError:
        raise ApiError(status_code=status, body=http_response.text)
    raise ApiError(status_code=status, body=payload)
async def get_artifacts(
    self, entity_type: EntityType, entity_id: str, *, request_options: typing.Optional[RequestOptions] = None
) -> typing.List[Artifact]:
    """
    Get all artifacts for an entity via
    ``GET api/v1/{entity_type}/{entity_id}/artifacts``.

    Parameters
    ----------
    entity_type : EntityType
        Type of entity to fetch artifacts for; interpolated into the
        request path.

    entity_id : str
        ID of the entity; interpolated into the request path.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.List[Artifact]
        The JSON body of a 2xx response parsed into ``Artifact`` objects.

    Raises
    ------
    UnprocessableEntityError
        The server replied 422; carries the parsed ``HttpValidationError``.
    ApiError
        Any other non-2xx status, or a body that is not valid JSON.

    Examples
    --------
    import asyncio

    from skyvern import AsyncSkyvern

    client = AsyncSkyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )


    async def main() -> None:
        await client.agent.get_artifacts(
            entity_type="step",
            entity_id="entity_id",
        )


    asyncio.run(main())
    """
    endpoint = f"api/v1/{jsonable_encoder(entity_type)}/{jsonable_encoder(entity_id)}/artifacts"
    http_response = await self._client_wrapper.httpx_client.request(
        endpoint, method="GET", request_options=request_options
    )
    status = http_response.status_code
    try:
        # Every outcome needs the decoded body, so decode once up front.
        payload = http_response.json()
        if 200 <= status < 300:
            parsed = parse_obj_as(type_=typing.List[Artifact], object_=payload)  # type: ignore
            return typing.cast(typing.List[Artifact], parsed)
        if status == 422:
            validation_error = parse_obj_as(type_=HttpValidationError, object_=payload)  # type: ignore
            raise UnprocessableEntityError(typing.cast(HttpValidationError, validation_error))
    except JSONDecodeError:
        raise ApiError(status_code=status, body=http_response.text)
    raise ApiError(status_code=status, body=payload)
async def get_step_artifacts(
    self, task_id: str, step_id: str, *, request_options: typing.Optional[RequestOptions] = None
) -> typing.List[Artifact]:
    """
    Get the artifacts of one step of a task via
    ``GET api/v1/tasks/{task_id}/steps/{step_id}/artifacts``.

    Parameters
    ----------
    task_id : str
        Task identifier interpolated into the request path.

    step_id : str
        Step identifier interpolated into the request path.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.List[Artifact]
        The JSON body of a 2xx response parsed into ``Artifact`` objects.

    Raises
    ------
    UnprocessableEntityError
        The server replied 422; carries the parsed ``HttpValidationError``.
    ApiError
        Any other non-2xx status, or a body that is not valid JSON.

    Examples
    --------
    import asyncio

    from skyvern import AsyncSkyvern

    client = AsyncSkyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )


    async def main() -> None:
        await client.agent.get_step_artifacts(
            task_id="task_id",
            step_id="step_id",
        )


    asyncio.run(main())
    """
    endpoint = f"api/v1/tasks/{jsonable_encoder(task_id)}/steps/{jsonable_encoder(step_id)}/artifacts"
    http_response = await self._client_wrapper.httpx_client.request(
        endpoint, method="GET", request_options=request_options
    )
    status = http_response.status_code
    try:
        # Every outcome needs the decoded body, so decode once up front.
        payload = http_response.json()
        if 200 <= status < 300:
            parsed = parse_obj_as(type_=typing.List[Artifact], object_=payload)  # type: ignore
            return typing.cast(typing.List[Artifact], parsed)
        if status == 422:
            validation_error = parse_obj_as(type_=HttpValidationError, object_=payload)  # type: ignore
            raise UnprocessableEntityError(typing.cast(HttpValidationError, validation_error))
    except JSONDecodeError:
        raise ApiError(status_code=status, body=http_response.text)
    raise ApiError(status_code=status, body=payload)
async def get_actions(
    self, task_id: str, *, request_options: typing.Optional[RequestOptions] = None
) -> typing.List[Action]:
    """
    Issue ``GET api/v1/tasks/{task_id}/actions`` and parse the reply.

    Parameters
    ----------
    task_id : str
        Identifier interpolated into the request path.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.List[Action]
        The JSON body of a 2xx response parsed into ``Action`` objects.

    Raises
    ------
    UnprocessableEntityError
        The server replied 422; carries the parsed ``HttpValidationError``.
    ApiError
        Any other non-2xx status, or a body that is not valid JSON.

    Examples
    --------
    import asyncio

    from skyvern import AsyncSkyvern

    client = AsyncSkyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )


    async def main() -> None:
        await client.agent.get_actions(
            task_id="task_id",
        )


    asyncio.run(main())
    """
    endpoint = f"api/v1/tasks/{jsonable_encoder(task_id)}/actions"
    http_response = await self._client_wrapper.httpx_client.request(
        endpoint, method="GET", request_options=request_options
    )
    status = http_response.status_code
    try:
        # Every outcome needs the decoded body, so decode once up front.
        payload = http_response.json()
        if 200 <= status < 300:
            parsed = parse_obj_as(type_=typing.List[Action], object_=payload)  # type: ignore
            return typing.cast(typing.List[Action], parsed)
        if status == 422:
            validation_error = parse_obj_as(type_=HttpValidationError, object_=payload)  # type: ignore
            raise UnprocessableEntityError(typing.cast(HttpValidationError, validation_error))
    except JSONDecodeError:
        raise ApiError(status_code=status, body=http_response.text)
    raise ApiError(status_code=status, body=payload)
async def run_workflow(
    self,
    workflow_id: str,
    *,
    version: typing.Optional[int] = None,
    template: typing.Optional[bool] = None,
    max_steps_override: typing.Optional[int] = None,
    data: typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]] = OMIT,
    proxy_location: typing.Optional[ProxyLocation] = OMIT,
    webhook_callback_url: typing.Optional[str] = OMIT,
    totp_verification_url: typing.Optional[str] = OMIT,
    totp_identifier: typing.Optional[str] = OMIT,
    browser_session_id: typing.Optional[str] = OMIT,
    request_options: typing.Optional[RequestOptions] = None,
) -> RunWorkflowResponse:
    """
    Issue ``POST api/v1/workflows/{workflow_id}/run`` and parse the reply.

    Parameters
    ----------
    workflow_id : str
        Identifier interpolated into the request path.

    version : typing.Optional[int]
        Sent as the ``version`` query parameter.

    template : typing.Optional[bool]
        Sent as the ``template`` query parameter.

    max_steps_override : typing.Optional[int]
        When not ``None``, stringified into the ``x-max-steps-override``
        header; otherwise the header value is ``None``.

    data : typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]]
        Sent as ``data`` in the JSON body.

    proxy_location : typing.Optional[ProxyLocation]
        Sent as ``proxy_location`` in the JSON body.

    webhook_callback_url : typing.Optional[str]
        Sent as ``webhook_callback_url`` in the JSON body.

    totp_verification_url : typing.Optional[str]
        Sent as ``totp_verification_url`` in the JSON body.

    totp_identifier : typing.Optional[str]
        Sent as ``totp_identifier`` in the JSON body.

    browser_session_id : typing.Optional[str]
        Sent as ``browser_session_id`` in the JSON body.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    RunWorkflowResponse
        The JSON body of a 2xx response parsed as ``RunWorkflowResponse``.

    Raises
    ------
    UnprocessableEntityError
        The server replied 422; carries the parsed ``HttpValidationError``.
    ApiError
        Any other non-2xx status, or a body that is not valid JSON.

    Examples
    --------
    import asyncio

    from skyvern import AsyncSkyvern

    client = AsyncSkyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )


    async def main() -> None:
        await client.agent.run_workflow(
            workflow_id="workflow_id",
        )


    asyncio.run(main())
    """
    endpoint = f"api/v1/workflows/{jsonable_encoder(workflow_id)}/run"
    query = {"version": version, "template": template}
    # Body fields default to the OMIT sentinel, which is also handed to the
    # HTTP wrapper via ``omit=`` (presumably so unset fields are not sent).
    body = {
        "data": data,
        "proxy_location": proxy_location,
        "webhook_callback_url": webhook_callback_url,
        "totp_verification_url": totp_verification_url,
        "totp_identifier": totp_identifier,
        "browser_session_id": browser_session_id,
    }
    if max_steps_override is not None:
        steps_header = str(max_steps_override)
    else:
        steps_header = None
    request_headers = {
        "content-type": "application/json",
        "x-max-steps-override": steps_header,
    }
    http_response = await self._client_wrapper.httpx_client.request(
        endpoint,
        method="POST",
        params=query,
        json=body,
        headers=request_headers,
        request_options=request_options,
        omit=OMIT,
    )
    status = http_response.status_code
    try:
        # Every outcome needs the decoded body, so decode once up front.
        payload = http_response.json()
        if 200 <= status < 300:
            parsed = parse_obj_as(type_=RunWorkflowResponse, object_=payload)  # type: ignore
            return typing.cast(RunWorkflowResponse, parsed)
        if status == 422:
            validation_error = parse_obj_as(type_=HttpValidationError, object_=payload)  # type: ignore
            raise UnprocessableEntityError(typing.cast(HttpValidationError, validation_error))
    except JSONDecodeError:
        raise ApiError(status_code=status, body=http_response.text)
    raise ApiError(status_code=status, body=payload)
async def get_workflow_runs(
    self,
    *,
    page: typing.Optional[int] = None,
    page_size: typing.Optional[int] = None,
    status: typing.Optional[typing.Union[WorkflowRunStatus, typing.Sequence[WorkflowRunStatus]]] = None,
    request_options: typing.Optional[RequestOptions] = None,
) -> typing.List[WorkflowRun]:
    """
    Issue ``GET api/v1/workflows/runs`` and parse the reply.

    Parameters
    ----------
    page : typing.Optional[int]
        Sent as the ``page`` query parameter.

    page_size : typing.Optional[int]
        Sent as the ``page_size`` query parameter.

    status : typing.Optional[typing.Union[WorkflowRunStatus, typing.Sequence[WorkflowRunStatus]]]
        Sent as the ``status`` query parameter.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.List[WorkflowRun]
        The JSON body of a 2xx response parsed into ``WorkflowRun`` objects.

    Raises
    ------
    UnprocessableEntityError
        The server replied 422; carries the parsed ``HttpValidationError``.
    ApiError
        Any other non-2xx status, or a body that is not valid JSON.

    Examples
    --------
    import asyncio

    from skyvern import AsyncSkyvern

    client = AsyncSkyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )


    async def main() -> None:
        await client.agent.get_workflow_runs()


    asyncio.run(main())
    """
    query = {"page": page, "page_size": page_size, "status": status}
    http_response = await self._client_wrapper.httpx_client.request(
        "api/v1/workflows/runs", method="GET", params=query, request_options=request_options
    )
    code = http_response.status_code
    try:
        # Every outcome needs the decoded body, so decode once up front.
        payload = http_response.json()
        if 200 <= code < 300:
            parsed = parse_obj_as(type_=typing.List[WorkflowRun], object_=payload)  # type: ignore
            return typing.cast(typing.List[WorkflowRun], parsed)
        if code == 422:
            validation_error = parse_obj_as(type_=HttpValidationError, object_=payload)  # type: ignore
            raise UnprocessableEntityError(typing.cast(HttpValidationError, validation_error))
    except JSONDecodeError:
        raise ApiError(status_code=code, body=http_response.text)
    raise ApiError(status_code=code, body=payload)
async def get_workflow_runs_by_id(
    self,
    workflow_id: str,
    *,
    page: typing.Optional[int] = None,
    page_size: typing.Optional[int] = None,
    status: typing.Optional[typing.Union[WorkflowRunStatus, typing.Sequence[WorkflowRunStatus]]] = None,
    request_options: typing.Optional[RequestOptions] = None,
) -> typing.List[WorkflowRun]:
    """
    Issue ``GET api/v1/workflows/{workflow_id}/runs`` and parse the reply.

    Parameters
    ----------
    workflow_id : str
        Identifier interpolated into the request path.

    page : typing.Optional[int]
        Sent as the ``page`` query parameter.

    page_size : typing.Optional[int]
        Sent as the ``page_size`` query parameter.

    status : typing.Optional[typing.Union[WorkflowRunStatus, typing.Sequence[WorkflowRunStatus]]]
        Sent as the ``status`` query parameter.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.List[WorkflowRun]
        The JSON body of a 2xx response parsed into ``WorkflowRun`` objects.

    Raises
    ------
    UnprocessableEntityError
        The server replied 422; carries the parsed ``HttpValidationError``.
    ApiError
        Any other non-2xx status, or a body that is not valid JSON.

    Examples
    --------
    import asyncio

    from skyvern import AsyncSkyvern

    client = AsyncSkyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )


    async def main() -> None:
        await client.agent.get_workflow_runs_by_id(
            workflow_id="workflow_id",
        )


    asyncio.run(main())
    """
    endpoint = f"api/v1/workflows/{jsonable_encoder(workflow_id)}/runs"
    query = {"page": page, "page_size": page_size, "status": status}
    http_response = await self._client_wrapper.httpx_client.request(
        endpoint, method="GET", params=query, request_options=request_options
    )
    code = http_response.status_code
    try:
        # Every outcome needs the decoded body, so decode once up front.
        payload = http_response.json()
        if 200 <= code < 300:
            parsed = parse_obj_as(type_=typing.List[WorkflowRun], object_=payload)  # type: ignore
            return typing.cast(typing.List[WorkflowRun], parsed)
        if code == 422:
            validation_error = parse_obj_as(type_=HttpValidationError, object_=payload)  # type: ignore
            raise UnprocessableEntityError(typing.cast(HttpValidationError, validation_error))
    except JSONDecodeError:
        raise ApiError(status_code=code, body=http_response.text)
    raise ApiError(status_code=code, body=payload)
async def get_workflow_run_with_workflow_id(
    self, workflow_id: str, workflow_run_id: str, *, request_options: typing.Optional[RequestOptions] = None
) -> typing.Dict[str, typing.Optional[typing.Any]]:
    """
    Issue ``GET api/v1/workflows/{workflow_id}/runs/{workflow_run_id}`` and
    return the reply as a dictionary.

    Parameters
    ----------
    workflow_id : str
        Workflow identifier interpolated into the request path.

    workflow_run_id : str
        Workflow-run identifier interpolated into the request path.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.Dict[str, typing.Optional[typing.Any]]
        The JSON body of a 2xx response parsed as a string-keyed mapping.

    Raises
    ------
    UnprocessableEntityError
        The server replied 422; carries the parsed ``HttpValidationError``.
    ApiError
        Any other non-2xx status, or a body that is not valid JSON.

    Examples
    --------
    import asyncio

    from skyvern import AsyncSkyvern

    client = AsyncSkyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )


    async def main() -> None:
        await client.agent.get_workflow_run_with_workflow_id(
            workflow_id="workflow_id",
            workflow_run_id="workflow_run_id",
        )


    asyncio.run(main())
    """
    endpoint = f"api/v1/workflows/{jsonable_encoder(workflow_id)}/runs/{jsonable_encoder(workflow_run_id)}"
    http_response = await self._client_wrapper.httpx_client.request(
        endpoint, method="GET", request_options=request_options
    )
    status = http_response.status_code
    try:
        # Every outcome needs the decoded body, so decode once up front.
        payload = http_response.json()
        if 200 <= status < 300:
            parsed = parse_obj_as(
                type_=typing.Dict[str, typing.Optional[typing.Any]],  # type: ignore
                object_=payload,
            )
            return typing.cast(typing.Dict[str, typing.Optional[typing.Any]], parsed)
        if status == 422:
            validation_error = parse_obj_as(type_=HttpValidationError, object_=payload)  # type: ignore
            raise UnprocessableEntityError(typing.cast(HttpValidationError, validation_error))
    except JSONDecodeError:
        raise ApiError(status_code=status, body=http_response.text)
    raise ApiError(status_code=status, body=payload)
async def get_workflow_run_timeline(
    self,
    workflow_run_id: str,
    workflow_id: str,
    *,
    page: typing.Optional[int] = None,
    page_size: typing.Optional[int] = None,
    request_options: typing.Optional[RequestOptions] = None,
) -> typing.List[WorkflowRunTimeline]:
    """
    Issue ``GET api/v1/workflows/{workflow_id}/runs/{workflow_run_id}/timeline``
    and parse the reply.

    Parameters
    ----------
    workflow_run_id : str
        Workflow-run identifier interpolated into the request path.

    workflow_id : str
        Workflow identifier interpolated into the request path.

    page : typing.Optional[int]
        Sent as the ``page`` query parameter.

    page_size : typing.Optional[int]
        Sent as the ``page_size`` query parameter.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.List[WorkflowRunTimeline]
        The JSON body of a 2xx response parsed into timeline entries.

    Raises
    ------
    UnprocessableEntityError
        The server replied 422; carries the parsed ``HttpValidationError``.
    ApiError
        Any other non-2xx status, or a body that is not valid JSON.

    Examples
    --------
    import asyncio

    from skyvern import AsyncSkyvern

    client = AsyncSkyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )


    async def main() -> None:
        await client.agent.get_workflow_run_timeline(
            workflow_run_id="workflow_run_id",
            workflow_id="workflow_id",
        )


    asyncio.run(main())
    """
    endpoint = f"api/v1/workflows/{jsonable_encoder(workflow_id)}/runs/{jsonable_encoder(workflow_run_id)}/timeline"
    query = {"page": page, "page_size": page_size}
    http_response = await self._client_wrapper.httpx_client.request(
        endpoint, method="GET", params=query, request_options=request_options
    )
    status = http_response.status_code
    try:
        # Every outcome needs the decoded body, so decode once up front.
        payload = http_response.json()
        if 200 <= status < 300:
            parsed = parse_obj_as(type_=typing.List[WorkflowRunTimeline], object_=payload)  # type: ignore
            return typing.cast(typing.List[WorkflowRunTimeline], parsed)
        if status == 422:
            validation_error = parse_obj_as(type_=HttpValidationError, object_=payload)  # type: ignore
            raise UnprocessableEntityError(typing.cast(HttpValidationError, validation_error))
    except JSONDecodeError:
        raise ApiError(status_code=status, body=http_response.text)
    raise ApiError(status_code=status, body=payload)
async def get_workflow_run(
    self, workflow_run_id: str, *, request_options: typing.Optional[RequestOptions] = None
) -> WorkflowRunResponse:
    """
    Issue ``GET api/v1/workflows/runs/{workflow_run_id}`` and parse the reply.

    Parameters
    ----------
    workflow_run_id : str
        Identifier interpolated into the request path.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    WorkflowRunResponse
        The JSON body of a 2xx response parsed as ``WorkflowRunResponse``.

    Raises
    ------
    UnprocessableEntityError
        The server replied 422; carries the parsed ``HttpValidationError``.
    ApiError
        Any other non-2xx status, or a body that is not valid JSON.

    Examples
    --------
    import asyncio

    from skyvern import AsyncSkyvern

    client = AsyncSkyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )


    async def main() -> None:
        await client.agent.get_workflow_run(
            workflow_run_id="workflow_run_id",
        )


    asyncio.run(main())
    """
    endpoint = f"api/v1/workflows/runs/{jsonable_encoder(workflow_run_id)}"
    http_response = await self._client_wrapper.httpx_client.request(
        endpoint, method="GET", request_options=request_options
    )
    status = http_response.status_code
    try:
        # Every outcome needs the decoded body, so decode once up front.
        payload = http_response.json()
        if 200 <= status < 300:
            parsed = parse_obj_as(type_=WorkflowRunResponse, object_=payload)  # type: ignore
            return typing.cast(WorkflowRunResponse, parsed)
        if status == 422:
            validation_error = parse_obj_as(type_=HttpValidationError, object_=payload)  # type: ignore
            raise UnprocessableEntityError(typing.cast(HttpValidationError, validation_error))
    except JSONDecodeError:
        raise ApiError(status_code=status, body=http_response.text)
    raise ApiError(status_code=status, body=payload)
async def get_workflows(
    self,
    *,
    page: typing.Optional[int] = None,
    page_size: typing.Optional[int] = None,
    only_saved_tasks: typing.Optional[bool] = None,
    only_workflows: typing.Optional[bool] = None,
    title: typing.Optional[str] = None,
    template: typing.Optional[bool] = None,
    request_options: typing.Optional[RequestOptions] = None,
) -> typing.List[Workflow]:
    """
    Get all workflows with the latest version for the organization, via
    ``GET api/v1/workflows``.

    Parameters
    ----------
    page : typing.Optional[int]
        Sent as the ``page`` query parameter.

    page_size : typing.Optional[int]
        Sent as the ``page_size`` query parameter.

    only_saved_tasks : typing.Optional[bool]
        Sent as the ``only_saved_tasks`` query parameter.

    only_workflows : typing.Optional[bool]
        Sent as the ``only_workflows`` query parameter.

    title : typing.Optional[str]
        Sent as the ``title`` query parameter.

    template : typing.Optional[bool]
        Sent as the ``template`` query parameter.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.List[Workflow]
        The JSON body of a 2xx response parsed into ``Workflow`` objects.

    Raises
    ------
    UnprocessableEntityError
        The server replied 422; carries the parsed ``HttpValidationError``.
    ApiError
        Any other non-2xx status, or a body that is not valid JSON.

    Examples
    --------
    import asyncio

    from skyvern import AsyncSkyvern

    client = AsyncSkyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )


    async def main() -> None:
        await client.agent.get_workflows()


    asyncio.run(main())
    """
    query = {
        "page": page,
        "page_size": page_size,
        "only_saved_tasks": only_saved_tasks,
        "only_workflows": only_workflows,
        "title": title,
        "template": template,
    }
    http_response = await self._client_wrapper.httpx_client.request(
        "api/v1/workflows", method="GET", params=query, request_options=request_options
    )
    status = http_response.status_code
    try:
        # Every outcome needs the decoded body, so decode once up front.
        payload = http_response.json()
        if 200 <= status < 300:
            parsed = parse_obj_as(type_=typing.List[Workflow], object_=payload)  # type: ignore
            return typing.cast(typing.List[Workflow], parsed)
        if status == 422:
            validation_error = parse_obj_as(type_=HttpValidationError, object_=payload)  # type: ignore
            raise UnprocessableEntityError(typing.cast(HttpValidationError, validation_error))
    except JSONDecodeError:
        raise ApiError(status_code=status, body=http_response.text)
    raise ApiError(status_code=status, body=payload)
async def create_workflow(self, *, request_options: typing.Optional[RequestOptions] = None) -> Workflow:
    """
    Issue ``POST api/v1/workflows`` and parse the reply.

    No request body is passed by this method.

    Parameters
    ----------
    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    Workflow
        The JSON body of a 2xx response parsed as ``Workflow``.

    Raises
    ------
    UnprocessableEntityError
        The server replied 422; carries the parsed ``HttpValidationError``.
    ApiError
        Any other non-2xx status, or a body that is not valid JSON.

    Examples
    --------
    import asyncio

    from skyvern import AsyncSkyvern

    client = AsyncSkyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )


    async def main() -> None:
        await client.agent.create_workflow()


    asyncio.run(main())
    """
    http_response = await self._client_wrapper.httpx_client.request(
        "api/v1/workflows", method="POST", request_options=request_options
    )
    status = http_response.status_code
    try:
        # Every outcome needs the decoded body, so decode once up front.
        payload = http_response.json()
        if 200 <= status < 300:
            parsed = parse_obj_as(type_=Workflow, object_=payload)  # type: ignore
            return typing.cast(Workflow, parsed)
        if status == 422:
            validation_error = parse_obj_as(type_=HttpValidationError, object_=payload)  # type: ignore
            raise UnprocessableEntityError(typing.cast(HttpValidationError, validation_error))
    except JSONDecodeError:
        raise ApiError(status_code=status, body=http_response.text)
    raise ApiError(status_code=status, body=payload)
async def get_workflow(
    self,
    workflow_permanent_id: str,
    *,
    version: typing.Optional[int] = None,
    template: typing.Optional[bool] = None,
    request_options: typing.Optional[RequestOptions] = None,
) -> Workflow:
    """
    Issue ``GET api/v1/workflows/{workflow_permanent_id}`` and parse the reply.

    Parameters
    ----------
    workflow_permanent_id : str
        Identifier interpolated into the request path.

    version : typing.Optional[int]
        Sent as the ``version`` query parameter.

    template : typing.Optional[bool]
        Sent as the ``template`` query parameter.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    Workflow
        The JSON body of a 2xx response parsed as ``Workflow``.

    Raises
    ------
    UnprocessableEntityError
        The server replied 422; carries the parsed ``HttpValidationError``.
    ApiError
        Any other non-2xx status, or a body that is not valid JSON.

    Examples
    --------
    import asyncio

    from skyvern import AsyncSkyvern

    client = AsyncSkyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )


    async def main() -> None:
        await client.agent.get_workflow(
            workflow_permanent_id="workflow_permanent_id",
        )


    asyncio.run(main())
    """
    endpoint = f"api/v1/workflows/{jsonable_encoder(workflow_permanent_id)}"
    query = {"version": version, "template": template}
    http_response = await self._client_wrapper.httpx_client.request(
        endpoint, method="GET", params=query, request_options=request_options
    )
    status = http_response.status_code
    try:
        # Every outcome needs the decoded body, so decode once up front.
        payload = http_response.json()
        if 200 <= status < 300:
            parsed = parse_obj_as(type_=Workflow, object_=payload)  # type: ignore
            return typing.cast(Workflow, parsed)
        if status == 422:
            validation_error = parse_obj_as(type_=HttpValidationError, object_=payload)  # type: ignore
            raise UnprocessableEntityError(typing.cast(HttpValidationError, validation_error))
    except JSONDecodeError:
        raise ApiError(status_code=status, body=http_response.text)
    raise ApiError(status_code=status, body=payload)
async def update_workflow(
    self, workflow_permanent_id: str, *, request_options: typing.Optional[RequestOptions] = None
) -> Workflow:
    """
    Issue ``PUT api/v1/workflows/{workflow_permanent_id}`` and parse the reply.

    No request body is passed by this method.

    Parameters
    ----------
    workflow_permanent_id : str
        Identifier interpolated into the request path.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    Workflow
        The JSON body of a 2xx response parsed as ``Workflow``.

    Raises
    ------
    UnprocessableEntityError
        The server replied 422; carries the parsed ``HttpValidationError``.
    ApiError
        Any other non-2xx status, or a body that is not valid JSON.

    Examples
    --------
    import asyncio

    from skyvern import AsyncSkyvern

    client = AsyncSkyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )


    async def main() -> None:
        await client.agent.update_workflow(
            workflow_permanent_id="workflow_permanent_id",
        )


    asyncio.run(main())
    """
    endpoint = f"api/v1/workflows/{jsonable_encoder(workflow_permanent_id)}"
    http_response = await self._client_wrapper.httpx_client.request(
        endpoint, method="PUT", request_options=request_options
    )
    status = http_response.status_code
    try:
        # Every outcome needs the decoded body, so decode once up front.
        payload = http_response.json()
        if 200 <= status < 300:
            parsed = parse_obj_as(type_=Workflow, object_=payload)  # type: ignore
            return typing.cast(Workflow, parsed)
        if status == 422:
            validation_error = parse_obj_as(type_=HttpValidationError, object_=payload)  # type: ignore
            raise UnprocessableEntityError(typing.cast(HttpValidationError, validation_error))
    except JSONDecodeError:
        raise ApiError(status_code=status, body=http_response.text)
    raise ApiError(status_code=status, body=payload)
async def delete_workflow(
    self, workflow_permanent_id: str, *, request_options: typing.Optional[RequestOptions] = None
) -> typing.Optional[typing.Any]:
    """
    Delete a workflow via ``DELETE api/v1/workflows/{workflow_permanent_id}``.

    Parameters
    ----------
    workflow_permanent_id : str
        Identifier interpolated into the request path.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.Optional[typing.Any]
        The decoded JSON body of a 2xx response, or ``None`` when that
        response has no body.

    Raises
    ------
    UnprocessableEntityError
        The server replied 422; carries the parsed ``HttpValidationError``.
    ApiError
        Any other non-2xx status, or a non-empty body that is not valid JSON.

    Examples
    --------
    import asyncio

    from skyvern import AsyncSkyvern

    client = AsyncSkyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )


    async def main() -> None:
        await client.agent.delete_workflow(
            workflow_permanent_id="workflow_permanent_id",
        )


    asyncio.run(main())
    """
    _response = await self._client_wrapper.httpx_client.request(
        f"api/v1/workflows/{jsonable_encoder(workflow_permanent_id)}",
        method="DELETE",
        request_options=request_options,
    )
    try:
        if 200 <= _response.status_code < 300:
            # DELETE endpoints commonly answer with an empty body (e.g. 204
            # No Content). Decoding it would raise JSONDecodeError and surface
            # a successful delete as an ApiError, so map it to None instead.
            if not _response.text.strip():
                return None
            return typing.cast(
                typing.Optional[typing.Any],
                parse_obj_as(
                    type_=typing.Optional[typing.Any],  # type: ignore
                    object_=_response.json(),
                ),
            )
        if _response.status_code == 422:
            raise UnprocessableEntityError(
                typing.cast(
                    HttpValidationError,
                    parse_obj_as(
                        type_=HttpValidationError,  # type: ignore
                        object_=_response.json(),
                    ),
                )
            )
        _response_json = _response.json()
    except JSONDecodeError:
        # Body was not JSON: report the raw text alongside the status code.
        raise ApiError(status_code=_response.status_code, body=_response.text)
    raise ApiError(status_code=_response.status_code, body=_response_json)
async def get_workflow_templates(
    self, *, request_options: typing.Optional[RequestOptions] = None
) -> typing.List[Workflow]:
    """
    Issue ``GET api/v1/workflows/templates`` and parse the reply.

    Parameters
    ----------
    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.List[Workflow]
        The JSON body of a 2xx response parsed into ``Workflow`` objects.

    Raises
    ------
    ApiError
        Any non-2xx status, or a body that is not valid JSON.

    Examples
    --------
    import asyncio

    from skyvern import AsyncSkyvern

    client = AsyncSkyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )


    async def main() -> None:
        await client.agent.get_workflow_templates()


    asyncio.run(main())
    """
    http_response = await self._client_wrapper.httpx_client.request(
        "api/v1/workflows/templates", method="GET", request_options=request_options
    )
    status = http_response.status_code
    try:
        # Both the success and the error path need the decoded body.
        payload = http_response.json()
        if 200 <= status < 300:
            parsed = parse_obj_as(type_=typing.List[Workflow], object_=payload)  # type: ignore
            return typing.cast(typing.List[Workflow], parsed)
    except JSONDecodeError:
        raise ApiError(status_code=status, body=http_response.text)
    raise ApiError(status_code=status, body=payload)
async def generate_task(
    self, *, prompt: str, request_options: typing.Optional[RequestOptions] = None
) -> TaskGeneration:
    """
    Issue ``POST api/v1/generate/task`` with the prompt and parse the reply.

    Parameters
    ----------
    prompt : str
        Sent as ``prompt`` in the JSON body.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    TaskGeneration
        The JSON body of a 2xx response parsed as ``TaskGeneration``.

    Raises
    ------
    UnprocessableEntityError
        The server replied 422; carries the parsed ``HttpValidationError``.
    ApiError
        Any other non-2xx status, or a body that is not valid JSON.

    Examples
    --------
    import asyncio

    from skyvern import AsyncSkyvern

    client = AsyncSkyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )


    async def main() -> None:
        await client.agent.generate_task(
            prompt="prompt",
        )


    asyncio.run(main())
    """
    body = {"prompt": prompt}
    request_headers = {"content-type": "application/json"}
    http_response = await self._client_wrapper.httpx_client.request(
        "api/v1/generate/task",
        method="POST",
        json=body,
        headers=request_headers,
        request_options=request_options,
        omit=OMIT,
    )
    status = http_response.status_code
    try:
        # Every outcome needs the decoded body, so decode once up front.
        payload = http_response.json()
        if 200 <= status < 300:
            parsed = parse_obj_as(type_=TaskGeneration, object_=payload)  # type: ignore
            return typing.cast(TaskGeneration, parsed)
        if status == 422:
            validation_error = parse_obj_as(type_=HttpValidationError, object_=payload)  # type: ignore
            raise UnprocessableEntityError(typing.cast(HttpValidationError, validation_error))
    except JSONDecodeError:
        raise ApiError(status_code=status, body=http_response.text)
    raise ApiError(status_code=status, body=payload)
async def run_task_v2(
    self,
    *,
    user_prompt: str,
    max_iterations_override: typing.Optional[AgentRunTaskV2RequestXMaxIterationsOverride] = None,
    max_steps_override: typing.Optional[AgentRunTaskV2RequestXMaxStepsOverride] = None,
    url: typing.Optional[str] = OMIT,
    browser_session_id: typing.Optional[str] = OMIT,
    webhook_callback_url: typing.Optional[str] = OMIT,
    totp_verification_url: typing.Optional[str] = OMIT,
    totp_identifier: typing.Optional[str] = OMIT,
    proxy_location: typing.Optional[ProxyLocation] = OMIT,
    publish_workflow: typing.Optional[bool] = OMIT,
    extracted_information_schema: typing.Optional[TaskV2RequestExtractedInformationSchema] = OMIT,
    error_code_mapping: typing.Optional[typing.Dict[str, typing.Optional[str]]] = OMIT,
    request_options: typing.Optional[RequestOptions] = None,
) -> typing.Dict[str, typing.Optional[typing.Any]]:
    """
    POST a task description to ``api/v2/tasks`` and return the decoded response mapping.

    Every body parameter is sent under the JSON key of the same name. Parameters
    left at ``OMIT`` are passed through together with ``omit=OMIT`` (presumably so
    the HTTP wrapper can drop them before sending; confirm against the wrapper).

    Parameters
    ----------
    user_prompt : str

    max_iterations_override : typing.Optional[AgentRunTaskV2RequestXMaxIterationsOverride]
        When not ``None``, stringified into the ``x-max-iterations-override`` header.

    max_steps_override : typing.Optional[AgentRunTaskV2RequestXMaxStepsOverride]
        When not ``None``, stringified into the ``x-max-steps-override`` header.

    url : typing.Optional[str]

    browser_session_id : typing.Optional[str]

    webhook_callback_url : typing.Optional[str]

    totp_verification_url : typing.Optional[str]

    totp_identifier : typing.Optional[str]

    proxy_location : typing.Optional[ProxyLocation]

    publish_workflow : typing.Optional[bool]

    extracted_information_schema : typing.Optional[TaskV2RequestExtractedInformationSchema]
        Passed through ``convert_and_respect_annotation_metadata`` (write direction)
        before being placed in the body.

    error_code_mapping : typing.Optional[typing.Dict[str, typing.Optional[str]]]

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.Dict[str, typing.Optional[typing.Any]]
        Successful Response

    Raises
    ------
    UnprocessableEntityError
        The server answered with status 422; carries the parsed ``HttpValidationError``.
    ApiError
        Any other non-2xx status (body is the decoded JSON), or a response
        whose body is not valid JSON (body is the raw response text).

    Examples
    --------
    import asyncio

    from skyvern import AsyncSkyvern

    client = AsyncSkyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )


    async def main() -> None:
        await client.agent.run_task_v2(
            user_prompt="user_prompt",
        )


    asyncio.run(main())
    """
    # Build the body first, then the header values, matching the order in which
    # the arguments were originally evaluated.
    request_body = {
        "user_prompt": user_prompt,
        "url": url,
        "browser_session_id": browser_session_id,
        "webhook_callback_url": webhook_callback_url,
        "totp_verification_url": totp_verification_url,
        "totp_identifier": totp_identifier,
        "proxy_location": proxy_location,
        "publish_workflow": publish_workflow,
        "extracted_information_schema": convert_and_respect_annotation_metadata(
            object_=extracted_information_schema,
            annotation=TaskV2RequestExtractedInformationSchema,
            direction="write",
        ),
        "error_code_mapping": error_code_mapping,
    }
    iterations_header = None if max_iterations_override is None else str(max_iterations_override)
    steps_header = None if max_steps_override is None else str(max_steps_override)
    request_headers = {
        "content-type": "application/json",
        "x-max-iterations-override": iterations_header,
        "x-max-steps-override": steps_header,
    }

    http_response = await self._client_wrapper.httpx_client.request(
        "api/v2/tasks",
        method="POST",
        json=request_body,
        headers=request_headers,
        request_options=request_options,
        omit=OMIT,
    )
    status = http_response.status_code
    try:
        if 200 <= status < 300:
            task_mapping = parse_obj_as(
                type_=typing.Dict[str, typing.Optional[typing.Any]],  # type: ignore
                object_=http_response.json(),
            )
            return typing.cast(typing.Dict[str, typing.Optional[typing.Any]], task_mapping)
        if status == 422:
            validation_error = parse_obj_as(
                type_=HttpValidationError,  # type: ignore
                object_=http_response.json(),
            )
            raise UnprocessableEntityError(typing.cast(HttpValidationError, validation_error))
        error_body = http_response.json()
    except JSONDecodeError:
        # The body could not be decoded as JSON, so report the raw text instead.
        raise ApiError(status_code=status, body=http_response.text)
    else:
        # Unhandled status code with a decodable JSON body.
        raise ApiError(status_code=status, body=error_body)
async def get_task_v2(
    self, task_id: str, *, request_options: typing.Optional[RequestOptions] = None
) -> typing.Dict[str, typing.Optional[typing.Any]]:
    """
    GET ``api/v2/tasks/{task_id}`` and return the decoded response mapping.

    Parameters
    ----------
    task_id : str
        Run through ``jsonable_encoder`` and interpolated into the request path.

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    typing.Dict[str, typing.Optional[typing.Any]]
        Successful Response

    Raises
    ------
    UnprocessableEntityError
        The server answered with status 422; carries the parsed ``HttpValidationError``.
    ApiError
        Any other non-2xx status (body is the decoded JSON), or a response
        whose body is not valid JSON (body is the raw response text).

    Examples
    --------
    import asyncio

    from skyvern import AsyncSkyvern

    client = AsyncSkyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )


    async def main() -> None:
        await client.agent.get_task_v2(
            task_id="task_id",
        )


    asyncio.run(main())
    """
    endpoint = f"api/v2/tasks/{jsonable_encoder(task_id)}"
    http_response = await self._client_wrapper.httpx_client.request(
        endpoint,
        method="GET",
        request_options=request_options,
    )
    status = http_response.status_code
    try:
        if 200 <= status < 300:
            task_mapping = parse_obj_as(
                type_=typing.Dict[str, typing.Optional[typing.Any]],  # type: ignore
                object_=http_response.json(),
            )
            return typing.cast(typing.Dict[str, typing.Optional[typing.Any]], task_mapping)
        if status == 422:
            validation_error = parse_obj_as(
                type_=HttpValidationError,  # type: ignore
                object_=http_response.json(),
            )
            raise UnprocessableEntityError(typing.cast(HttpValidationError, validation_error))
        error_body = http_response.json()
    except JSONDecodeError:
        # The body could not be decoded as JSON, so report the raw text instead.
        raise ApiError(status_code=status, body=http_response.text)
    else:
        # Unhandled status code with a decodable JSON body.
        raise ApiError(status_code=status, body=error_body)
async def send_totp_code(
    self,
    *,
    totp_identifier: str,
    content: str,
    task_id: typing.Optional[str] = OMIT,
    workflow_id: typing.Optional[str] = OMIT,
    source: typing.Optional[str] = OMIT,
    expired_at: typing.Optional[dt.datetime] = OMIT,
    request_options: typing.Optional[RequestOptions] = None,
) -> TotpCode:
    """
    POST a TOTP payload to ``api/v1/totp`` and parse the reply as a ``TotpCode``.

    Every body parameter is sent under the JSON key of the same name. Parameters
    left at ``OMIT`` are passed through together with ``omit=OMIT`` (presumably so
    the HTTP wrapper can drop them before sending; confirm against the wrapper).

    Parameters
    ----------
    totp_identifier : str

    content : str

    task_id : typing.Optional[str]

    workflow_id : typing.Optional[str]

    source : typing.Optional[str]

    expired_at : typing.Optional[dt.datetime]

    request_options : typing.Optional[RequestOptions]
        Request-specific configuration.

    Returns
    -------
    TotpCode
        Successful Response

    Raises
    ------
    UnprocessableEntityError
        The server answered with status 422; carries the parsed ``HttpValidationError``.
    ApiError
        Any other non-2xx status (body is the decoded JSON), or a response
        whose body is not valid JSON (body is the raw response text).

    Examples
    --------
    import asyncio

    from skyvern import AsyncSkyvern

    client = AsyncSkyvern(
        api_key="YOUR_API_KEY",
        authorization="YOUR_AUTHORIZATION",
    )


    async def main() -> None:
        await client.agent.send_totp_code(
            totp_identifier="totp_identifier",
            content="content",
        )


    asyncio.run(main())
    """
    # Key order is kept exactly as generated so the serialized body is unchanged.
    request_body = {
        "totp_identifier": totp_identifier,
        "task_id": task_id,
        "workflow_id": workflow_id,
        "source": source,
        "content": content,
        "expired_at": expired_at,
    }
    http_response = await self._client_wrapper.httpx_client.request(
        "api/v1/totp",
        method="POST",
        json=request_body,
        headers={"content-type": "application/json"},
        request_options=request_options,
        omit=OMIT,
    )
    status = http_response.status_code
    try:
        if 200 <= status < 300:
            totp_code = parse_obj_as(
                type_=TotpCode,  # type: ignore
                object_=http_response.json(),
            )
            return typing.cast(TotpCode, totp_code)
        if status == 422:
            validation_error = parse_obj_as(
                type_=HttpValidationError,  # type: ignore
                object_=http_response.json(),
            )
            raise UnprocessableEntityError(typing.cast(HttpValidationError, validation_error))
        error_body = http_response.json()
    except JSONDecodeError:
        # The body could not be decoded as JSON, so report the raw text instead.
        raise ApiError(status_code=status, body=http_response.text)
    else:
        # Unhandled status code with a decodable JSON body.
        raise ApiError(status_code=status, body=error_body)