Regenerate Fern Python SDK (#3928)

This commit is contained in:
Stanislav Novosad
2025-11-06 11:26:37 -07:00
committed by GitHub
parent 3c4df39fee
commit 8482ff4a75
22 changed files with 1918 additions and 170 deletions

View File

@@ -0,0 +1,4 @@
# This file was auto-generated by Fern from our API Definition.
# isort: skip_file

View File

@@ -0,0 +1,596 @@
# This file was auto-generated by Fern from our API Definition.
import typing
from ..core.client_wrapper import AsyncClientWrapper, SyncClientWrapper
from ..core.request_options import RequestOptions
from ..types.folder import Folder
from ..types.workflow import Workflow
from .raw_client import AsyncRawWorkflowsClient, RawWorkflowsClient
# this is used as the default value for optional parameters
OMIT = typing.cast(typing.Any, ...)
class WorkflowsClient:
def __init__(self, *, client_wrapper: SyncClientWrapper):
self._raw_client = RawWorkflowsClient(client_wrapper=client_wrapper)
@property
def with_raw_response(self) -> RawWorkflowsClient:
"""
Retrieves a raw implementation of this client that returns raw responses.
Returns
-------
RawWorkflowsClient
"""
return self._raw_client
def get_folders(
self,
*,
page: typing.Optional[int] = None,
page_size: typing.Optional[int] = None,
search: typing.Optional[str] = None,
request_options: typing.Optional[RequestOptions] = None,
) -> typing.List[Folder]:
"""
Get all folders for the organization
Parameters
----------
page : typing.Optional[int]
Page number
page_size : typing.Optional[int]
Number of folders per page
search : typing.Optional[str]
Search folders by title or description
request_options : typing.Optional[RequestOptions]
Request-specific configuration.
Returns
-------
typing.List[Folder]
Successfully retrieved folders
Examples
--------
from skyvern import Skyvern
client = Skyvern(
api_key="YOUR_API_KEY",
)
client.workflows.get_folders(
page=1,
page_size=1,
search="search",
)
"""
_response = self._raw_client.get_folders(
page=page, page_size=page_size, search=search, request_options=request_options
)
return _response.data
def create_folder(
self,
*,
title: str,
description: typing.Optional[str] = OMIT,
request_options: typing.Optional[RequestOptions] = None,
) -> Folder:
"""
Create a new folder to organize workflows
Parameters
----------
title : str
Folder title
description : typing.Optional[str]
Folder description
request_options : typing.Optional[RequestOptions]
Request-specific configuration.
Returns
-------
Folder
Successfully created folder
Examples
--------
from skyvern import Skyvern
client = Skyvern(
api_key="YOUR_API_KEY",
)
client.workflows.create_folder(
title="title",
)
"""
_response = self._raw_client.create_folder(
title=title, description=description, request_options=request_options
)
return _response.data
def get_folder(self, folder_id: str, *, request_options: typing.Optional[RequestOptions] = None) -> Folder:
"""
Get a specific folder by ID
Parameters
----------
folder_id : str
Folder ID
request_options : typing.Optional[RequestOptions]
Request-specific configuration.
Returns
-------
Folder
Successfully retrieved folder
Examples
--------
from skyvern import Skyvern
client = Skyvern(
api_key="YOUR_API_KEY",
)
client.workflows.get_folder(
folder_id="fld_123",
)
"""
_response = self._raw_client.get_folder(folder_id, request_options=request_options)
return _response.data
def update_folder(
self,
folder_id: str,
*,
title: typing.Optional[str] = OMIT,
description: typing.Optional[str] = OMIT,
request_options: typing.Optional[RequestOptions] = None,
) -> Folder:
"""
Update a folder's title or description
Parameters
----------
folder_id : str
Folder ID
title : typing.Optional[str]
Folder title
description : typing.Optional[str]
Folder description
request_options : typing.Optional[RequestOptions]
Request-specific configuration.
Returns
-------
Folder
Successfully updated folder
Examples
--------
from skyvern import Skyvern
client = Skyvern(
api_key="YOUR_API_KEY",
)
client.workflows.update_folder(
folder_id="fld_123",
)
"""
_response = self._raw_client.update_folder(
folder_id, title=title, description=description, request_options=request_options
)
return _response.data
def delete_folder(
self,
folder_id: str,
*,
delete_workflows: typing.Optional[bool] = None,
request_options: typing.Optional[RequestOptions] = None,
) -> typing.Dict[str, typing.Optional[typing.Any]]:
"""
Delete a folder. Optionally delete all workflows in the folder.
Parameters
----------
folder_id : str
Folder ID
delete_workflows : typing.Optional[bool]
If true, also delete all workflows in this folder
request_options : typing.Optional[RequestOptions]
Request-specific configuration.
Returns
-------
typing.Dict[str, typing.Optional[typing.Any]]
Successfully deleted folder
Examples
--------
from skyvern import Skyvern
client = Skyvern(
api_key="YOUR_API_KEY",
)
client.workflows.delete_folder(
folder_id="fld_123",
delete_workflows=True,
)
"""
_response = self._raw_client.delete_folder(
folder_id, delete_workflows=delete_workflows, request_options=request_options
)
return _response.data
def update_workflow_folder(
self,
workflow_permanent_id: str,
*,
folder_id: typing.Optional[str] = OMIT,
request_options: typing.Optional[RequestOptions] = None,
) -> Workflow:
"""
Update a workflow's folder assignment for the latest version
Parameters
----------
workflow_permanent_id : str
Workflow permanent ID
folder_id : typing.Optional[str]
Folder ID to assign workflow to. Set to null to remove from folder.
request_options : typing.Optional[RequestOptions]
Request-specific configuration.
Returns
-------
Workflow
Successfully updated workflow folder
Examples
--------
from skyvern import Skyvern
client = Skyvern(
api_key="YOUR_API_KEY",
)
client.workflows.update_workflow_folder(
workflow_permanent_id="wpid_123",
)
"""
_response = self._raw_client.update_workflow_folder(
workflow_permanent_id, folder_id=folder_id, request_options=request_options
)
return _response.data
class AsyncWorkflowsClient:
def __init__(self, *, client_wrapper: AsyncClientWrapper):
self._raw_client = AsyncRawWorkflowsClient(client_wrapper=client_wrapper)
@property
def with_raw_response(self) -> AsyncRawWorkflowsClient:
"""
Retrieves a raw implementation of this client that returns raw responses.
Returns
-------
AsyncRawWorkflowsClient
"""
return self._raw_client
async def get_folders(
self,
*,
page: typing.Optional[int] = None,
page_size: typing.Optional[int] = None,
search: typing.Optional[str] = None,
request_options: typing.Optional[RequestOptions] = None,
) -> typing.List[Folder]:
"""
Get all folders for the organization
Parameters
----------
page : typing.Optional[int]
Page number
page_size : typing.Optional[int]
Number of folders per page
search : typing.Optional[str]
Search folders by title or description
request_options : typing.Optional[RequestOptions]
Request-specific configuration.
Returns
-------
typing.List[Folder]
Successfully retrieved folders
Examples
--------
import asyncio
from skyvern import AsyncSkyvern
client = AsyncSkyvern(
api_key="YOUR_API_KEY",
)
async def main() -> None:
await client.workflows.get_folders(
page=1,
page_size=1,
search="search",
)
asyncio.run(main())
"""
_response = await self._raw_client.get_folders(
page=page, page_size=page_size, search=search, request_options=request_options
)
return _response.data
async def create_folder(
self,
*,
title: str,
description: typing.Optional[str] = OMIT,
request_options: typing.Optional[RequestOptions] = None,
) -> Folder:
"""
Create a new folder to organize workflows
Parameters
----------
title : str
Folder title
description : typing.Optional[str]
Folder description
request_options : typing.Optional[RequestOptions]
Request-specific configuration.
Returns
-------
Folder
Successfully created folder
Examples
--------
import asyncio
from skyvern import AsyncSkyvern
client = AsyncSkyvern(
api_key="YOUR_API_KEY",
)
async def main() -> None:
await client.workflows.create_folder(
title="title",
)
asyncio.run(main())
"""
_response = await self._raw_client.create_folder(
title=title, description=description, request_options=request_options
)
return _response.data
async def get_folder(self, folder_id: str, *, request_options: typing.Optional[RequestOptions] = None) -> Folder:
"""
Get a specific folder by ID
Parameters
----------
folder_id : str
Folder ID
request_options : typing.Optional[RequestOptions]
Request-specific configuration.
Returns
-------
Folder
Successfully retrieved folder
Examples
--------
import asyncio
from skyvern import AsyncSkyvern
client = AsyncSkyvern(
api_key="YOUR_API_KEY",
)
async def main() -> None:
await client.workflows.get_folder(
folder_id="fld_123",
)
asyncio.run(main())
"""
_response = await self._raw_client.get_folder(folder_id, request_options=request_options)
return _response.data
async def update_folder(
self,
folder_id: str,
*,
title: typing.Optional[str] = OMIT,
description: typing.Optional[str] = OMIT,
request_options: typing.Optional[RequestOptions] = None,
) -> Folder:
"""
Update a folder's title or description
Parameters
----------
folder_id : str
Folder ID
title : typing.Optional[str]
Folder title
description : typing.Optional[str]
Folder description
request_options : typing.Optional[RequestOptions]
Request-specific configuration.
Returns
-------
Folder
Successfully updated folder
Examples
--------
import asyncio
from skyvern import AsyncSkyvern
client = AsyncSkyvern(
api_key="YOUR_API_KEY",
)
async def main() -> None:
await client.workflows.update_folder(
folder_id="fld_123",
)
asyncio.run(main())
"""
_response = await self._raw_client.update_folder(
folder_id, title=title, description=description, request_options=request_options
)
return _response.data
async def delete_folder(
self,
folder_id: str,
*,
delete_workflows: typing.Optional[bool] = None,
request_options: typing.Optional[RequestOptions] = None,
) -> typing.Dict[str, typing.Optional[typing.Any]]:
"""
Delete a folder. Optionally delete all workflows in the folder.
Parameters
----------
folder_id : str
Folder ID
delete_workflows : typing.Optional[bool]
If true, also delete all workflows in this folder
request_options : typing.Optional[RequestOptions]
Request-specific configuration.
Returns
-------
typing.Dict[str, typing.Optional[typing.Any]]
Successfully deleted folder
Examples
--------
import asyncio
from skyvern import AsyncSkyvern
client = AsyncSkyvern(
api_key="YOUR_API_KEY",
)
async def main() -> None:
await client.workflows.delete_folder(
folder_id="fld_123",
delete_workflows=True,
)
asyncio.run(main())
"""
_response = await self._raw_client.delete_folder(
folder_id, delete_workflows=delete_workflows, request_options=request_options
)
return _response.data
async def update_workflow_folder(
self,
workflow_permanent_id: str,
*,
folder_id: typing.Optional[str] = OMIT,
request_options: typing.Optional[RequestOptions] = None,
) -> Workflow:
"""
Update a workflow's folder assignment for the latest version
Parameters
----------
workflow_permanent_id : str
Workflow permanent ID
folder_id : typing.Optional[str]
Folder ID to assign workflow to. Set to null to remove from folder.
request_options : typing.Optional[RequestOptions]
Request-specific configuration.
Returns
-------
Workflow
Successfully updated workflow folder
Examples
--------
import asyncio
from skyvern import AsyncSkyvern
client = AsyncSkyvern(
api_key="YOUR_API_KEY",
)
async def main() -> None:
await client.workflows.update_workflow_folder(
workflow_permanent_id="wpid_123",
)
asyncio.run(main())
"""
_response = await self._raw_client.update_workflow_folder(
workflow_permanent_id, folder_id=folder_id, request_options=request_options
)
return _response.data

View File

@@ -0,0 +1,909 @@
# This file was auto-generated by Fern from our API Definition.
import typing
from json.decoder import JSONDecodeError
from ..core.api_error import ApiError
from ..core.client_wrapper import AsyncClientWrapper, SyncClientWrapper
from ..core.http_response import AsyncHttpResponse, HttpResponse
from ..core.jsonable_encoder import jsonable_encoder
from ..core.pydantic_utilities import parse_obj_as
from ..core.request_options import RequestOptions
from ..errors.bad_request_error import BadRequestError
from ..errors.not_found_error import NotFoundError
from ..errors.unprocessable_entity_error import UnprocessableEntityError
from ..types.folder import Folder
from ..types.workflow import Workflow
# this is used as the default value for optional parameters
OMIT = typing.cast(typing.Any, ...)
class RawWorkflowsClient:
def __init__(self, *, client_wrapper: SyncClientWrapper):
self._client_wrapper = client_wrapper
def get_folders(
self,
*,
page: typing.Optional[int] = None,
page_size: typing.Optional[int] = None,
search: typing.Optional[str] = None,
request_options: typing.Optional[RequestOptions] = None,
) -> HttpResponse[typing.List[Folder]]:
"""
Get all folders for the organization
Parameters
----------
page : typing.Optional[int]
Page number
page_size : typing.Optional[int]
Number of folders per page
search : typing.Optional[str]
Search folders by title or description
request_options : typing.Optional[RequestOptions]
Request-specific configuration.
Returns
-------
HttpResponse[typing.List[Folder]]
Successfully retrieved folders
"""
_response = self._client_wrapper.httpx_client.request(
"v1/folders",
method="GET",
params={
"page": page,
"page_size": page_size,
"search": search,
},
request_options=request_options,
)
try:
if 200 <= _response.status_code < 300:
_data = typing.cast(
typing.List[Folder],
parse_obj_as(
type_=typing.List[Folder], # type: ignore
object_=_response.json(),
),
)
return HttpResponse(response=_response, data=_data)
if _response.status_code == 422:
raise UnprocessableEntityError(
headers=dict(_response.headers),
body=typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
),
)
_response_json = _response.json()
except JSONDecodeError:
raise ApiError(status_code=_response.status_code, headers=dict(_response.headers), body=_response.text)
raise ApiError(status_code=_response.status_code, headers=dict(_response.headers), body=_response_json)
def create_folder(
self,
*,
title: str,
description: typing.Optional[str] = OMIT,
request_options: typing.Optional[RequestOptions] = None,
) -> HttpResponse[Folder]:
"""
Create a new folder to organize workflows
Parameters
----------
title : str
Folder title
description : typing.Optional[str]
Folder description
request_options : typing.Optional[RequestOptions]
Request-specific configuration.
Returns
-------
HttpResponse[Folder]
Successfully created folder
"""
_response = self._client_wrapper.httpx_client.request(
"v1/folders",
method="POST",
json={
"title": title,
"description": description,
},
headers={
"content-type": "application/json",
},
request_options=request_options,
omit=OMIT,
)
try:
if 200 <= _response.status_code < 300:
_data = typing.cast(
Folder,
parse_obj_as(
type_=Folder, # type: ignore
object_=_response.json(),
),
)
return HttpResponse(response=_response, data=_data)
if _response.status_code == 400:
raise BadRequestError(
headers=dict(_response.headers),
body=typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
),
)
if _response.status_code == 422:
raise UnprocessableEntityError(
headers=dict(_response.headers),
body=typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
),
)
_response_json = _response.json()
except JSONDecodeError:
raise ApiError(status_code=_response.status_code, headers=dict(_response.headers), body=_response.text)
raise ApiError(status_code=_response.status_code, headers=dict(_response.headers), body=_response_json)
def get_folder(
self, folder_id: str, *, request_options: typing.Optional[RequestOptions] = None
) -> HttpResponse[Folder]:
"""
Get a specific folder by ID
Parameters
----------
folder_id : str
Folder ID
request_options : typing.Optional[RequestOptions]
Request-specific configuration.
Returns
-------
HttpResponse[Folder]
Successfully retrieved folder
"""
_response = self._client_wrapper.httpx_client.request(
f"v1/folders/{jsonable_encoder(folder_id)}",
method="GET",
request_options=request_options,
)
try:
if 200 <= _response.status_code < 300:
_data = typing.cast(
Folder,
parse_obj_as(
type_=Folder, # type: ignore
object_=_response.json(),
),
)
return HttpResponse(response=_response, data=_data)
if _response.status_code == 404:
raise NotFoundError(
headers=dict(_response.headers),
body=typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
),
)
if _response.status_code == 422:
raise UnprocessableEntityError(
headers=dict(_response.headers),
body=typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
),
)
_response_json = _response.json()
except JSONDecodeError:
raise ApiError(status_code=_response.status_code, headers=dict(_response.headers), body=_response.text)
raise ApiError(status_code=_response.status_code, headers=dict(_response.headers), body=_response_json)
def update_folder(
self,
folder_id: str,
*,
title: typing.Optional[str] = OMIT,
description: typing.Optional[str] = OMIT,
request_options: typing.Optional[RequestOptions] = None,
) -> HttpResponse[Folder]:
"""
Update a folder's title or description
Parameters
----------
folder_id : str
Folder ID
title : typing.Optional[str]
Folder title
description : typing.Optional[str]
Folder description
request_options : typing.Optional[RequestOptions]
Request-specific configuration.
Returns
-------
HttpResponse[Folder]
Successfully updated folder
"""
_response = self._client_wrapper.httpx_client.request(
f"v1/folders/{jsonable_encoder(folder_id)}",
method="PUT",
json={
"title": title,
"description": description,
},
headers={
"content-type": "application/json",
},
request_options=request_options,
omit=OMIT,
)
try:
if 200 <= _response.status_code < 300:
_data = typing.cast(
Folder,
parse_obj_as(
type_=Folder, # type: ignore
object_=_response.json(),
),
)
return HttpResponse(response=_response, data=_data)
if _response.status_code == 404:
raise NotFoundError(
headers=dict(_response.headers),
body=typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
),
)
if _response.status_code == 422:
raise UnprocessableEntityError(
headers=dict(_response.headers),
body=typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
),
)
_response_json = _response.json()
except JSONDecodeError:
raise ApiError(status_code=_response.status_code, headers=dict(_response.headers), body=_response.text)
raise ApiError(status_code=_response.status_code, headers=dict(_response.headers), body=_response_json)
def delete_folder(
self,
folder_id: str,
*,
delete_workflows: typing.Optional[bool] = None,
request_options: typing.Optional[RequestOptions] = None,
) -> HttpResponse[typing.Dict[str, typing.Optional[typing.Any]]]:
"""
Delete a folder. Optionally delete all workflows in the folder.
Parameters
----------
folder_id : str
Folder ID
delete_workflows : typing.Optional[bool]
If true, also delete all workflows in this folder
request_options : typing.Optional[RequestOptions]
Request-specific configuration.
Returns
-------
HttpResponse[typing.Dict[str, typing.Optional[typing.Any]]]
Successfully deleted folder
"""
_response = self._client_wrapper.httpx_client.request(
f"v1/folders/{jsonable_encoder(folder_id)}",
method="DELETE",
params={
"delete_workflows": delete_workflows,
},
request_options=request_options,
)
try:
if 200 <= _response.status_code < 300:
_data = typing.cast(
typing.Dict[str, typing.Optional[typing.Any]],
parse_obj_as(
type_=typing.Dict[str, typing.Optional[typing.Any]], # type: ignore
object_=_response.json(),
),
)
return HttpResponse(response=_response, data=_data)
if _response.status_code == 404:
raise NotFoundError(
headers=dict(_response.headers),
body=typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
),
)
if _response.status_code == 422:
raise UnprocessableEntityError(
headers=dict(_response.headers),
body=typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
),
)
_response_json = _response.json()
except JSONDecodeError:
raise ApiError(status_code=_response.status_code, headers=dict(_response.headers), body=_response.text)
raise ApiError(status_code=_response.status_code, headers=dict(_response.headers), body=_response_json)
def update_workflow_folder(
self,
workflow_permanent_id: str,
*,
folder_id: typing.Optional[str] = OMIT,
request_options: typing.Optional[RequestOptions] = None,
) -> HttpResponse[Workflow]:
"""
Update a workflow's folder assignment for the latest version
Parameters
----------
workflow_permanent_id : str
Workflow permanent ID
folder_id : typing.Optional[str]
Folder ID to assign workflow to. Set to null to remove from folder.
request_options : typing.Optional[RequestOptions]
Request-specific configuration.
Returns
-------
HttpResponse[Workflow]
Successfully updated workflow folder
"""
_response = self._client_wrapper.httpx_client.request(
f"v1/workflows/{jsonable_encoder(workflow_permanent_id)}/folder",
method="PUT",
json={
"folder_id": folder_id,
},
headers={
"content-type": "application/json",
},
request_options=request_options,
omit=OMIT,
)
try:
if 200 <= _response.status_code < 300:
_data = typing.cast(
Workflow,
parse_obj_as(
type_=Workflow, # type: ignore
object_=_response.json(),
),
)
return HttpResponse(response=_response, data=_data)
if _response.status_code == 400:
raise BadRequestError(
headers=dict(_response.headers),
body=typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
),
)
if _response.status_code == 404:
raise NotFoundError(
headers=dict(_response.headers),
body=typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
),
)
if _response.status_code == 422:
raise UnprocessableEntityError(
headers=dict(_response.headers),
body=typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
),
)
_response_json = _response.json()
except JSONDecodeError:
raise ApiError(status_code=_response.status_code, headers=dict(_response.headers), body=_response.text)
raise ApiError(status_code=_response.status_code, headers=dict(_response.headers), body=_response_json)
class AsyncRawWorkflowsClient:
def __init__(self, *, client_wrapper: AsyncClientWrapper):
self._client_wrapper = client_wrapper
async def get_folders(
self,
*,
page: typing.Optional[int] = None,
page_size: typing.Optional[int] = None,
search: typing.Optional[str] = None,
request_options: typing.Optional[RequestOptions] = None,
) -> AsyncHttpResponse[typing.List[Folder]]:
"""
Get all folders for the organization
Parameters
----------
page : typing.Optional[int]
Page number
page_size : typing.Optional[int]
Number of folders per page
search : typing.Optional[str]
Search folders by title or description
request_options : typing.Optional[RequestOptions]
Request-specific configuration.
Returns
-------
AsyncHttpResponse[typing.List[Folder]]
Successfully retrieved folders
"""
_response = await self._client_wrapper.httpx_client.request(
"v1/folders",
method="GET",
params={
"page": page,
"page_size": page_size,
"search": search,
},
request_options=request_options,
)
try:
if 200 <= _response.status_code < 300:
_data = typing.cast(
typing.List[Folder],
parse_obj_as(
type_=typing.List[Folder], # type: ignore
object_=_response.json(),
),
)
return AsyncHttpResponse(response=_response, data=_data)
if _response.status_code == 422:
raise UnprocessableEntityError(
headers=dict(_response.headers),
body=typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
),
)
_response_json = _response.json()
except JSONDecodeError:
raise ApiError(status_code=_response.status_code, headers=dict(_response.headers), body=_response.text)
raise ApiError(status_code=_response.status_code, headers=dict(_response.headers), body=_response_json)
async def create_folder(
self,
*,
title: str,
description: typing.Optional[str] = OMIT,
request_options: typing.Optional[RequestOptions] = None,
) -> AsyncHttpResponse[Folder]:
"""
Create a new folder to organize workflows
Parameters
----------
title : str
Folder title
description : typing.Optional[str]
Folder description
request_options : typing.Optional[RequestOptions]
Request-specific configuration.
Returns
-------
AsyncHttpResponse[Folder]
Successfully created folder
"""
_response = await self._client_wrapper.httpx_client.request(
"v1/folders",
method="POST",
json={
"title": title,
"description": description,
},
headers={
"content-type": "application/json",
},
request_options=request_options,
omit=OMIT,
)
try:
if 200 <= _response.status_code < 300:
_data = typing.cast(
Folder,
parse_obj_as(
type_=Folder, # type: ignore
object_=_response.json(),
),
)
return AsyncHttpResponse(response=_response, data=_data)
if _response.status_code == 400:
raise BadRequestError(
headers=dict(_response.headers),
body=typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
),
)
if _response.status_code == 422:
raise UnprocessableEntityError(
headers=dict(_response.headers),
body=typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
),
)
_response_json = _response.json()
except JSONDecodeError:
raise ApiError(status_code=_response.status_code, headers=dict(_response.headers), body=_response.text)
raise ApiError(status_code=_response.status_code, headers=dict(_response.headers), body=_response_json)
async def get_folder(
self, folder_id: str, *, request_options: typing.Optional[RequestOptions] = None
) -> AsyncHttpResponse[Folder]:
"""
Get a specific folder by ID
Parameters
----------
folder_id : str
Folder ID
request_options : typing.Optional[RequestOptions]
Request-specific configuration.
Returns
-------
AsyncHttpResponse[Folder]
Successfully retrieved folder
"""
_response = await self._client_wrapper.httpx_client.request(
f"v1/folders/{jsonable_encoder(folder_id)}",
method="GET",
request_options=request_options,
)
try:
if 200 <= _response.status_code < 300:
_data = typing.cast(
Folder,
parse_obj_as(
type_=Folder, # type: ignore
object_=_response.json(),
),
)
return AsyncHttpResponse(response=_response, data=_data)
if _response.status_code == 404:
raise NotFoundError(
headers=dict(_response.headers),
body=typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
),
)
if _response.status_code == 422:
raise UnprocessableEntityError(
headers=dict(_response.headers),
body=typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
),
)
_response_json = _response.json()
except JSONDecodeError:
raise ApiError(status_code=_response.status_code, headers=dict(_response.headers), body=_response.text)
raise ApiError(status_code=_response.status_code, headers=dict(_response.headers), body=_response_json)
async def update_folder(
self,
folder_id: str,
*,
title: typing.Optional[str] = OMIT,
description: typing.Optional[str] = OMIT,
request_options: typing.Optional[RequestOptions] = None,
) -> AsyncHttpResponse[Folder]:
"""
Update a folder's title or description
Parameters
----------
folder_id : str
Folder ID
title : typing.Optional[str]
Folder title
description : typing.Optional[str]
Folder description
request_options : typing.Optional[RequestOptions]
Request-specific configuration.
Returns
-------
AsyncHttpResponse[Folder]
Successfully updated folder
"""
_response = await self._client_wrapper.httpx_client.request(
f"v1/folders/{jsonable_encoder(folder_id)}",
method="PUT",
json={
"title": title,
"description": description,
},
headers={
"content-type": "application/json",
},
request_options=request_options,
omit=OMIT,
)
try:
if 200 <= _response.status_code < 300:
_data = typing.cast(
Folder,
parse_obj_as(
type_=Folder, # type: ignore
object_=_response.json(),
),
)
return AsyncHttpResponse(response=_response, data=_data)
if _response.status_code == 404:
raise NotFoundError(
headers=dict(_response.headers),
body=typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
),
)
if _response.status_code == 422:
raise UnprocessableEntityError(
headers=dict(_response.headers),
body=typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
),
)
_response_json = _response.json()
except JSONDecodeError:
raise ApiError(status_code=_response.status_code, headers=dict(_response.headers), body=_response.text)
raise ApiError(status_code=_response.status_code, headers=dict(_response.headers), body=_response_json)
async def delete_folder(
self,
folder_id: str,
*,
delete_workflows: typing.Optional[bool] = None,
request_options: typing.Optional[RequestOptions] = None,
) -> AsyncHttpResponse[typing.Dict[str, typing.Optional[typing.Any]]]:
"""
Delete a folder. Optionally delete all workflows in the folder.
Parameters
----------
folder_id : str
Folder ID
delete_workflows : typing.Optional[bool]
If true, also delete all workflows in this folder
request_options : typing.Optional[RequestOptions]
Request-specific configuration.
Returns
-------
AsyncHttpResponse[typing.Dict[str, typing.Optional[typing.Any]]]
Successfully deleted folder
"""
_response = await self._client_wrapper.httpx_client.request(
f"v1/folders/{jsonable_encoder(folder_id)}",
method="DELETE",
params={
"delete_workflows": delete_workflows,
},
request_options=request_options,
)
try:
if 200 <= _response.status_code < 300:
_data = typing.cast(
typing.Dict[str, typing.Optional[typing.Any]],
parse_obj_as(
type_=typing.Dict[str, typing.Optional[typing.Any]], # type: ignore
object_=_response.json(),
),
)
return AsyncHttpResponse(response=_response, data=_data)
if _response.status_code == 404:
raise NotFoundError(
headers=dict(_response.headers),
body=typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
),
)
if _response.status_code == 422:
raise UnprocessableEntityError(
headers=dict(_response.headers),
body=typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
),
)
_response_json = _response.json()
except JSONDecodeError:
raise ApiError(status_code=_response.status_code, headers=dict(_response.headers), body=_response.text)
raise ApiError(status_code=_response.status_code, headers=dict(_response.headers), body=_response_json)
async def update_workflow_folder(
self,
workflow_permanent_id: str,
*,
folder_id: typing.Optional[str] = OMIT,
request_options: typing.Optional[RequestOptions] = None,
) -> AsyncHttpResponse[Workflow]:
"""
Update a workflow's folder assignment for the latest version
Parameters
----------
workflow_permanent_id : str
Workflow permanent ID
folder_id : typing.Optional[str]
Folder ID to assign workflow to. Set to null to remove from folder.
request_options : typing.Optional[RequestOptions]
Request-specific configuration.
Returns
-------
AsyncHttpResponse[Workflow]
Successfully updated workflow folder
"""
_response = await self._client_wrapper.httpx_client.request(
f"v1/workflows/{jsonable_encoder(workflow_permanent_id)}/folder",
method="PUT",
json={
"folder_id": folder_id,
},
headers={
"content-type": "application/json",
},
request_options=request_options,
omit=OMIT,
)
try:
if 200 <= _response.status_code < 300:
_data = typing.cast(
Workflow,
parse_obj_as(
type_=Workflow, # type: ignore
object_=_response.json(),
),
)
return AsyncHttpResponse(response=_response, data=_data)
if _response.status_code == 400:
raise BadRequestError(
headers=dict(_response.headers),
body=typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
),
)
if _response.status_code == 404:
raise NotFoundError(
headers=dict(_response.headers),
body=typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
),
)
if _response.status_code == 422:
raise UnprocessableEntityError(
headers=dict(_response.headers),
body=typing.cast(
typing.Optional[typing.Any],
parse_obj_as(
type_=typing.Optional[typing.Any], # type: ignore
object_=_response.json(),
),
),
)
_response_json = _response.json()
except JSONDecodeError:
raise ApiError(status_code=_response.status_code, headers=dict(_response.headers), body=_response.text)
raise ApiError(status_code=_response.status_code, headers=dict(_response.headers), body=_response_json)