# This file was auto-generated by Fern from our API Definition.

import typing
from ..core.client_wrapper import SyncClientWrapper
from ..core.request_options import RequestOptions
from ..types.workflow import Workflow
from ..core.pydantic_utilities import parse_obj_as
from ..errors.unprocessable_entity_error import UnprocessableEntityError
from json.decoder import JSONDecodeError
from ..core.api_error import ApiError
from ..types.workflow_create_yaml_request import WorkflowCreateYamlRequest
from ..core.serialization import convert_and_respect_annotation_metadata
from ..core.jsonable_encoder import jsonable_encoder
from ..core.client_wrapper import AsyncClientWrapper

# this is used as the default value for optional parameters
# (it is also handed to the HTTP client as ``omit=OMIT`` on requests with a JSON body)
OMIT = typing.cast(typing.Any, ...)


def _definition_payload(
    json_definition: typing.Optional[WorkflowCreateYamlRequest],
    yaml_definition: typing.Optional[str],
) -> typing.Dict[str, typing.Any]:
    """Build the JSON body shared by the create and update workflow endpoints."""
    return {
        "json_definition": convert_and_respect_annotation_metadata(
            object_=json_definition, annotation=WorkflowCreateYamlRequest, direction="write"
        ),
        "yaml_definition": yaml_definition,
    }


def _parse_response(response: typing.Any, type_: typing.Any, *, allow_empty: bool = False) -> typing.Any:
    """
    Convert an HTTP response into a parsed result or raise the matching error.

    Parameters
    ----------
    response : typing.Any
        The response object returned by the client wrapper's ``httpx_client.request``.
        Only ``status_code``, ``text`` and ``json()`` are used.

    type_ : typing.Any
        The type the body of a 2xx response is parsed into via ``parse_obj_as``.

    allow_empty : bool
        When true, a 2xx response whose body is empty or whitespace-only yields ``None``
        instead of being JSON-decoded.

    Returns
    -------
    typing.Any
        The parsed body of a 2xx response (or ``None``, see ``allow_empty``).

    Raises
    ------
    UnprocessableEntityError
        If the status code is 422; carries the decoded response body.

    ApiError
        For any other non-2xx status code (with the decoded JSON body), or whenever the
        body cannot be decoded as JSON (with the raw response text).
    """
    try:
        if 200 <= response.status_code < 300:
            # An empty body is not valid JSON: decoding it would raise JSONDecodeError and
            # turn a successful call into an ApiError that carries a 2xx status code.
            if allow_empty and not response.text.strip():
                return None
            return parse_obj_as(type_=type_, object_=response.json())
        if response.status_code == 422:
            raise UnprocessableEntityError(
                typing.cast(
                    typing.Optional[typing.Any],
                    parse_obj_as(
                        type_=typing.Optional[typing.Any],  # type: ignore
                        object_=response.json(),
                    ),
                )
            )
        error_body = response.json()
    except JSONDecodeError:
        raise ApiError(status_code=response.status_code, body=response.text)
    raise ApiError(status_code=response.status_code, body=error_body)


class WorkflowsClient:
    """Synchronous client for the ``v1/workflows`` endpoints."""

    def __init__(self, *, client_wrapper: SyncClientWrapper):
        self._client_wrapper = client_wrapper

    def get_workflows(
        self,
        *,
        page: typing.Optional[int] = None,
        page_size: typing.Optional[int] = None,
        only_saved_tasks: typing.Optional[bool] = None,
        only_workflows: typing.Optional[bool] = None,
        title: typing.Optional[str] = None,
        template: typing.Optional[bool] = None,
        request_options: typing.Optional[RequestOptions] = None,
    ) -> typing.List[Workflow]:
        """
        Get all workflows with the latest version for the organization.

        Each filter argument is passed as the query parameter of the same name.

        Parameters
        ----------
        page : typing.Optional[int]

        page_size : typing.Optional[int]

        only_saved_tasks : typing.Optional[bool]

        only_workflows : typing.Optional[bool]

        title : typing.Optional[str]

        template : typing.Optional[bool]

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        typing.List[Workflow]
            Successful Response

        Raises
        ------
        UnprocessableEntityError
            If the API responds with status 422.

        ApiError
            For any other non-2xx status, or if the response body is not valid JSON.

        Examples
        --------
        from skyvern import Skyvern

        client = Skyvern(
            api_key="YOUR_API_KEY",
        )
        client.workflows.get_workflows()
        """
        _response = self._client_wrapper.httpx_client.request(
            "v1/workflows",
            method="GET",
            params={
                "page": page,
                "page_size": page_size,
                "only_saved_tasks": only_saved_tasks,
                "only_workflows": only_workflows,
                "title": title,
                "template": template,
            },
            request_options=request_options,
        )
        return typing.cast(typing.List[Workflow], _parse_response(_response, typing.List[Workflow]))

    def create_workflow(
        self,
        *,
        json_definition: typing.Optional[WorkflowCreateYamlRequest] = OMIT,
        yaml_definition: typing.Optional[str] = OMIT,
        request_options: typing.Optional[RequestOptions] = None,
    ) -> Workflow:
        """
        Create a new workflow definition

        Parameters
        ----------
        json_definition : typing.Optional[WorkflowCreateYamlRequest]
            Workflow definition in JSON format

        yaml_definition : typing.Optional[str]
            Workflow definition in YAML format

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        Workflow
            Successfully created workflow

        Raises
        ------
        UnprocessableEntityError
            If the API responds with status 422.

        ApiError
            For any other non-2xx status, or if the response body is not valid JSON.

        Examples
        --------
        from skyvern import Skyvern

        client = Skyvern(
            api_key="YOUR_API_KEY",
        )
        client.workflows.create_workflow()
        """
        _response = self._client_wrapper.httpx_client.request(
            "v1/workflows",
            method="POST",
            json=_definition_payload(json_definition, yaml_definition),
            request_options=request_options,
            omit=OMIT,
        )
        return typing.cast(Workflow, _parse_response(_response, Workflow))

    def update_workflow(
        self,
        workflow_id: str,
        *,
        json_definition: typing.Optional[WorkflowCreateYamlRequest] = OMIT,
        yaml_definition: typing.Optional[str] = OMIT,
        request_options: typing.Optional[RequestOptions] = None,
    ) -> Workflow:
        """
        Update a workflow definition

        Parameters
        ----------
        workflow_id : str
            The ID of the workflow to update. Workflow ID starts with `wpid_`.

        json_definition : typing.Optional[WorkflowCreateYamlRequest]
            Workflow definition in JSON format

        yaml_definition : typing.Optional[str]
            Workflow definition in YAML format

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        Workflow
            Successfully updated workflow

        Raises
        ------
        UnprocessableEntityError
            If the API responds with status 422.

        ApiError
            For any other non-2xx status, or if the response body is not valid JSON.

        Examples
        --------
        from skyvern import Skyvern

        client = Skyvern(
            api_key="YOUR_API_KEY",
        )
        client.workflows.update_workflow(
            workflow_id="wpid_123",
        )
        """
        _response = self._client_wrapper.httpx_client.request(
            f"v1/workflows/{jsonable_encoder(workflow_id)}",
            method="POST",
            json=_definition_payload(json_definition, yaml_definition),
            request_options=request_options,
            omit=OMIT,
        )
        return typing.cast(Workflow, _parse_response(_response, Workflow))

    def delete_workflow(
        self, workflow_id: str, *, request_options: typing.Optional[RequestOptions] = None
    ) -> typing.Optional[typing.Any]:
        """
        Delete a workflow

        Parameters
        ----------
        workflow_id : str
            The ID of the workflow to delete. Workflow ID starts with `wpid_`.

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        typing.Optional[typing.Any]
            Successfully deleted workflow. ``None`` when the successful response has an
            empty body.

        Raises
        ------
        UnprocessableEntityError
            If the API responds with status 422.

        ApiError
            For any other non-2xx status, or if a non-empty response body is not valid JSON.

        Examples
        --------
        from skyvern import Skyvern

        client = Skyvern(
            api_key="YOUR_API_KEY",
        )
        client.workflows.delete_workflow(
            workflow_id="wpid_123",
        )
        """
        _response = self._client_wrapper.httpx_client.request(
            f"v1/workflows/{jsonable_encoder(workflow_id)}/delete",
            method="POST",
            request_options=request_options,
        )
        return typing.cast(
            typing.Optional[typing.Any],
            _parse_response(_response, typing.Optional[typing.Any], allow_empty=True),
        )


class AsyncWorkflowsClient:
    """Asynchronous client for the ``v1/workflows`` endpoints."""

    def __init__(self, *, client_wrapper: AsyncClientWrapper):
        self._client_wrapper = client_wrapper

    async def get_workflows(
        self,
        *,
        page: typing.Optional[int] = None,
        page_size: typing.Optional[int] = None,
        only_saved_tasks: typing.Optional[bool] = None,
        only_workflows: typing.Optional[bool] = None,
        title: typing.Optional[str] = None,
        template: typing.Optional[bool] = None,
        request_options: typing.Optional[RequestOptions] = None,
    ) -> typing.List[Workflow]:
        """
        Get all workflows with the latest version for the organization.

        Each filter argument is passed as the query parameter of the same name.

        Parameters
        ----------
        page : typing.Optional[int]

        page_size : typing.Optional[int]

        only_saved_tasks : typing.Optional[bool]

        only_workflows : typing.Optional[bool]

        title : typing.Optional[str]

        template : typing.Optional[bool]

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        typing.List[Workflow]
            Successful Response

        Raises
        ------
        UnprocessableEntityError
            If the API responds with status 422.

        ApiError
            For any other non-2xx status, or if the response body is not valid JSON.

        Examples
        --------
        import asyncio

        from skyvern import AsyncSkyvern

        client = AsyncSkyvern(
            api_key="YOUR_API_KEY",
        )


        async def main() -> None:
            await client.workflows.get_workflows()


        asyncio.run(main())
        """
        _response = await self._client_wrapper.httpx_client.request(
            "v1/workflows",
            method="GET",
            params={
                "page": page,
                "page_size": page_size,
                "only_saved_tasks": only_saved_tasks,
                "only_workflows": only_workflows,
                "title": title,
                "template": template,
            },
            request_options=request_options,
        )
        return typing.cast(typing.List[Workflow], _parse_response(_response, typing.List[Workflow]))

    async def create_workflow(
        self,
        *,
        json_definition: typing.Optional[WorkflowCreateYamlRequest] = OMIT,
        yaml_definition: typing.Optional[str] = OMIT,
        request_options: typing.Optional[RequestOptions] = None,
    ) -> Workflow:
        """
        Create a new workflow definition

        Parameters
        ----------
        json_definition : typing.Optional[WorkflowCreateYamlRequest]
            Workflow definition in JSON format

        yaml_definition : typing.Optional[str]
            Workflow definition in YAML format

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        Workflow
            Successfully created workflow

        Raises
        ------
        UnprocessableEntityError
            If the API responds with status 422.

        ApiError
            For any other non-2xx status, or if the response body is not valid JSON.

        Examples
        --------
        import asyncio

        from skyvern import AsyncSkyvern

        client = AsyncSkyvern(
            api_key="YOUR_API_KEY",
        )


        async def main() -> None:
            await client.workflows.create_workflow()


        asyncio.run(main())
        """
        _response = await self._client_wrapper.httpx_client.request(
            "v1/workflows",
            method="POST",
            json=_definition_payload(json_definition, yaml_definition),
            request_options=request_options,
            omit=OMIT,
        )
        return typing.cast(Workflow, _parse_response(_response, Workflow))

    async def update_workflow(
        self,
        workflow_id: str,
        *,
        json_definition: typing.Optional[WorkflowCreateYamlRequest] = OMIT,
        yaml_definition: typing.Optional[str] = OMIT,
        request_options: typing.Optional[RequestOptions] = None,
    ) -> Workflow:
        """
        Update a workflow definition

        Parameters
        ----------
        workflow_id : str
            The ID of the workflow to update. Workflow ID starts with `wpid_`.

        json_definition : typing.Optional[WorkflowCreateYamlRequest]
            Workflow definition in JSON format

        yaml_definition : typing.Optional[str]
            Workflow definition in YAML format

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        Workflow
            Successfully updated workflow

        Raises
        ------
        UnprocessableEntityError
            If the API responds with status 422.

        ApiError
            For any other non-2xx status, or if the response body is not valid JSON.

        Examples
        --------
        import asyncio

        from skyvern import AsyncSkyvern

        client = AsyncSkyvern(
            api_key="YOUR_API_KEY",
        )


        async def main() -> None:
            await client.workflows.update_workflow(
                workflow_id="wpid_123",
            )


        asyncio.run(main())
        """
        _response = await self._client_wrapper.httpx_client.request(
            f"v1/workflows/{jsonable_encoder(workflow_id)}",
            method="POST",
            json=_definition_payload(json_definition, yaml_definition),
            request_options=request_options,
            omit=OMIT,
        )
        return typing.cast(Workflow, _parse_response(_response, Workflow))

    async def delete_workflow(
        self, workflow_id: str, *, request_options: typing.Optional[RequestOptions] = None
    ) -> typing.Optional[typing.Any]:
        """
        Delete a workflow

        Parameters
        ----------
        workflow_id : str
            The ID of the workflow to delete. Workflow ID starts with `wpid_`.

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        typing.Optional[typing.Any]
            Successfully deleted workflow. ``None`` when the successful response has an
            empty body.

        Raises
        ------
        UnprocessableEntityError
            If the API responds with status 422.

        ApiError
            For any other non-2xx status, or if a non-empty response body is not valid JSON.

        Examples
        --------
        import asyncio

        from skyvern import AsyncSkyvern

        client = AsyncSkyvern(
            api_key="YOUR_API_KEY",
        )


        async def main() -> None:
            await client.workflows.delete_workflow(
                workflow_id="wpid_123",
            )


        asyncio.run(main())
        """
        _response = await self._client_wrapper.httpx_client.request(
            f"v1/workflows/{jsonable_encoder(workflow_id)}/delete",
            method="POST",
            request_options=request_options,
        )
        return typing.cast(
            typing.Optional[typing.Any],
            _parse_response(_response, typing.Optional[typing.Any], allow_empty=True),
        )