# This file was auto-generated by Fern from our API Definition.

import typing
from ..core.client_wrapper import SyncClientWrapper
from ..types.workflow_create_yaml_request import WorkflowCreateYamlRequest
from ..core.request_options import RequestOptions
from ..types.workflow import Workflow
from ..core.serialization import convert_and_respect_annotation_metadata
from ..core.pydantic_utilities import parse_obj_as
from ..errors.unprocessable_entity_error import UnprocessableEntityError
from json.decoder import JSONDecodeError
from ..core.api_error import ApiError
from ..core.jsonable_encoder import jsonable_encoder
from ..core.client_wrapper import AsyncClientWrapper

# this is used as the default value for optional parameters
OMIT = typing.cast(typing.Any, ...)


def _definition_payload(
    json_definition: typing.Optional[WorkflowCreateYamlRequest],
    yaml_definition: typing.Optional[str],
) -> typing.Dict[str, typing.Any]:
    """Build the JSON request body shared by the create and update endpoints."""
    return {
        "json_definition": convert_and_respect_annotation_metadata(
            object_=json_definition, annotation=WorkflowCreateYamlRequest, direction="write"
        ),
        "yaml_definition": yaml_definition,
    }


def _handle_response(_response: typing.Any, type_: typing.Any) -> typing.Any:
    """
    Turn an HTTP response into a parsed result or an API error.

    Response decoding is synchronous for both the sync and the async client,
    so every endpoint method in this module funnels through this helper.

    Parameters
    ----------
    _response : typing.Any
        Response object returned by the client wrapper's ``httpx_client.request``.
        Only ``status_code``, ``json()`` and ``text`` are used.

    type_ : typing.Any
        Type the JSON body of a 2xx response is parsed into.

    Returns
    -------
    typing.Any
        The JSON body of a 2xx response parsed as ``type_``.

    Raises
    ------
    UnprocessableEntityError
        If the status code is 422; carries the parsed JSON body.

    ApiError
        For any other non-2xx status code (with the JSON body), or for any
        status code when the body is not valid JSON (with the raw text).
    """
    try:
        if 200 <= _response.status_code < 300:
            return parse_obj_as(
                type_=type_,  # type: ignore
                object_=_response.json(),
            )
        if _response.status_code == 422:
            raise UnprocessableEntityError(
                typing.cast(
                    typing.Optional[typing.Any],
                    parse_obj_as(
                        type_=typing.Optional[typing.Any],  # type: ignore
                        object_=_response.json(),
                    ),
                )
            )
        _response_json = _response.json()
    except JSONDecodeError as err:
        # Body was not JSON: surface the raw text and keep the decode error as the cause.
        raise ApiError(status_code=_response.status_code, body=_response.text) from err
    raise ApiError(status_code=_response.status_code, body=_response_json)


class WorkflowsClient:
    """Synchronous client for the ``v1/workflows`` endpoints."""

    def __init__(self, *, client_wrapper: SyncClientWrapper):
        self._client_wrapper = client_wrapper

    def create_workflow(
        self,
        *,
        json_definition: typing.Optional[WorkflowCreateYamlRequest] = OMIT,
        yaml_definition: typing.Optional[str] = OMIT,
        request_options: typing.Optional[RequestOptions] = None,
    ) -> Workflow:
        """
        Create a new workflow definition

        Parameters
        ----------
        json_definition : typing.Optional[WorkflowCreateYamlRequest]
            Workflow definition in JSON format

        yaml_definition : typing.Optional[str]
            Workflow definition in YAML format

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        Workflow
            Successfully created workflow

        Raises
        ------
        UnprocessableEntityError
            If the server responds with status 422.

        ApiError
            For any other non-2xx response, or when the response body is not valid JSON.

        Examples
        --------
        from skyvern import Skyvern

        client = Skyvern(
            api_key="YOUR_API_KEY",
        )
        client.workflows.create_workflow()
        """
        _response = self._client_wrapper.httpx_client.request(
            "v1/workflows",
            method="POST",
            json=_definition_payload(json_definition, yaml_definition),
            request_options=request_options,
            omit=OMIT,
        )
        return typing.cast(Workflow, _handle_response(_response, Workflow))

    def update_workflow(
        self,
        workflow_id: str,
        *,
        json_definition: typing.Optional[WorkflowCreateYamlRequest] = OMIT,
        yaml_definition: typing.Optional[str] = OMIT,
        request_options: typing.Optional[RequestOptions] = None,
    ) -> Workflow:
        """
        Update a workflow definition

        Parameters
        ----------
        workflow_id : str
            The ID of the workflow to update. Workflow ID starts with `wpid_`.

        json_definition : typing.Optional[WorkflowCreateYamlRequest]
            Workflow definition in JSON format

        yaml_definition : typing.Optional[str]
            Workflow definition in YAML format

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        Workflow
            Successfully updated workflow

        Raises
        ------
        UnprocessableEntityError
            If the server responds with status 422.

        ApiError
            For any other non-2xx response, or when the response body is not valid JSON.

        Examples
        --------
        from skyvern import Skyvern

        client = Skyvern(
            api_key="YOUR_API_KEY",
        )
        client.workflows.update_workflow(
            workflow_id="wpid_123",
        )
        """
        _response = self._client_wrapper.httpx_client.request(
            f"v1/workflows/{jsonable_encoder(workflow_id)}",
            method="POST",
            json=_definition_payload(json_definition, yaml_definition),
            request_options=request_options,
            omit=OMIT,
        )
        return typing.cast(Workflow, _handle_response(_response, Workflow))

    def delete_workflow(
        self, workflow_id: str, *, request_options: typing.Optional[RequestOptions] = None
    ) -> typing.Optional[typing.Any]:
        """
        Delete a workflow

        Parameters
        ----------
        workflow_id : str
            The ID of the workflow to delete. Workflow ID starts with `wpid_`.

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        typing.Optional[typing.Any]
            Successfully deleted workflow

        Raises
        ------
        UnprocessableEntityError
            If the server responds with status 422.

        ApiError
            For any other non-2xx response, or when the response body is not valid JSON.

        Examples
        --------
        from skyvern import Skyvern

        client = Skyvern(
            api_key="YOUR_API_KEY",
        )
        client.workflows.delete_workflow(
            workflow_id="wpid_123",
        )
        """
        _response = self._client_wrapper.httpx_client.request(
            f"v1/workflows/{jsonable_encoder(workflow_id)}/delete",
            method="POST",
            request_options=request_options,
        )
        return typing.cast(
            typing.Optional[typing.Any],
            _handle_response(_response, typing.Optional[typing.Any]),
        )


class AsyncWorkflowsClient:
    """Asynchronous client for the ``v1/workflows`` endpoints."""

    def __init__(self, *, client_wrapper: AsyncClientWrapper):
        self._client_wrapper = client_wrapper

    async def create_workflow(
        self,
        *,
        json_definition: typing.Optional[WorkflowCreateYamlRequest] = OMIT,
        yaml_definition: typing.Optional[str] = OMIT,
        request_options: typing.Optional[RequestOptions] = None,
    ) -> Workflow:
        """
        Create a new workflow definition

        Parameters
        ----------
        json_definition : typing.Optional[WorkflowCreateYamlRequest]
            Workflow definition in JSON format

        yaml_definition : typing.Optional[str]
            Workflow definition in YAML format

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        Workflow
            Successfully created workflow

        Raises
        ------
        UnprocessableEntityError
            If the server responds with status 422.

        ApiError
            For any other non-2xx response, or when the response body is not valid JSON.

        Examples
        --------
        import asyncio

        from skyvern import AsyncSkyvern

        client = AsyncSkyvern(
            api_key="YOUR_API_KEY",
        )


        async def main() -> None:
            await client.workflows.create_workflow()


        asyncio.run(main())
        """
        _response = await self._client_wrapper.httpx_client.request(
            "v1/workflows",
            method="POST",
            json=_definition_payload(json_definition, yaml_definition),
            request_options=request_options,
            omit=OMIT,
        )
        return typing.cast(Workflow, _handle_response(_response, Workflow))

    async def update_workflow(
        self,
        workflow_id: str,
        *,
        json_definition: typing.Optional[WorkflowCreateYamlRequest] = OMIT,
        yaml_definition: typing.Optional[str] = OMIT,
        request_options: typing.Optional[RequestOptions] = None,
    ) -> Workflow:
        """
        Update a workflow definition

        Parameters
        ----------
        workflow_id : str
            The ID of the workflow to update. Workflow ID starts with `wpid_`.

        json_definition : typing.Optional[WorkflowCreateYamlRequest]
            Workflow definition in JSON format

        yaml_definition : typing.Optional[str]
            Workflow definition in YAML format

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        Workflow
            Successfully updated workflow

        Raises
        ------
        UnprocessableEntityError
            If the server responds with status 422.

        ApiError
            For any other non-2xx response, or when the response body is not valid JSON.

        Examples
        --------
        import asyncio

        from skyvern import AsyncSkyvern

        client = AsyncSkyvern(
            api_key="YOUR_API_KEY",
        )


        async def main() -> None:
            await client.workflows.update_workflow(
                workflow_id="wpid_123",
            )


        asyncio.run(main())
        """
        _response = await self._client_wrapper.httpx_client.request(
            f"v1/workflows/{jsonable_encoder(workflow_id)}",
            method="POST",
            json=_definition_payload(json_definition, yaml_definition),
            request_options=request_options,
            omit=OMIT,
        )
        return typing.cast(Workflow, _handle_response(_response, Workflow))

    async def delete_workflow(
        self, workflow_id: str, *, request_options: typing.Optional[RequestOptions] = None
    ) -> typing.Optional[typing.Any]:
        """
        Delete a workflow

        Parameters
        ----------
        workflow_id : str
            The ID of the workflow to delete. Workflow ID starts with `wpid_`.

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        typing.Optional[typing.Any]
            Successfully deleted workflow

        Raises
        ------
        UnprocessableEntityError
            If the server responds with status 422.

        ApiError
            For any other non-2xx response, or when the response body is not valid JSON.

        Examples
        --------
        import asyncio

        from skyvern import AsyncSkyvern

        client = AsyncSkyvern(
            api_key="YOUR_API_KEY",
        )


        async def main() -> None:
            await client.workflows.delete_workflow(
                workflow_id="wpid_123",
            )


        asyncio.run(main())
        """
        _response = await self._client_wrapper.httpx_client.request(
            f"v1/workflows/{jsonable_encoder(workflow_id)}/delete",
            method="POST",
            request_options=request_options,
        )
        return typing.cast(
            typing.Optional[typing.Any],
            _handle_response(_response, typing.Optional[typing.Any]),
        )