add fern sdk (#1786)

This commit is contained in:
LawyZheng
2025-02-19 00:58:48 +08:00
committed by GitHub
parent e7c3e4e37a
commit a258406a86
153 changed files with 17372 additions and 255 deletions

View File

@@ -0,0 +1,47 @@
# This file was auto-generated by Fern from our API Definition.
from .api_error import ApiError
from .client_wrapper import AsyncClientWrapper, BaseClientWrapper, SyncClientWrapper
from .datetime_utils import serialize_datetime
from .file import File, convert_file_dict_to_httpx_tuples, with_content_type
from .http_client import AsyncHttpClient, HttpClient
from .jsonable_encoder import jsonable_encoder
from .pydantic_utilities import (
IS_PYDANTIC_V2,
UniversalBaseModel,
UniversalRootModel,
parse_obj_as,
universal_field_validator,
universal_root_validator,
update_forward_refs,
)
from .query_encoder import encode_query
from .remove_none_from_dict import remove_none_from_dict
from .request_options import RequestOptions
from .serialization import FieldMetadata, convert_and_respect_annotation_metadata
__all__ = [
"ApiError",
"AsyncClientWrapper",
"AsyncHttpClient",
"BaseClientWrapper",
"FieldMetadata",
"File",
"HttpClient",
"IS_PYDANTIC_V2",
"RequestOptions",
"SyncClientWrapper",
"UniversalBaseModel",
"UniversalRootModel",
"convert_and_respect_annotation_metadata",
"convert_file_dict_to_httpx_tuples",
"encode_query",
"jsonable_encoder",
"parse_obj_as",
"remove_none_from_dict",
"serialize_datetime",
"universal_field_validator",
"universal_root_validator",
"update_forward_refs",
"with_content_type",
]

View File

@@ -0,0 +1,15 @@
# This file was auto-generated by Fern from our API Definition.
import typing
class ApiError(Exception):
status_code: typing.Optional[int]
body: typing.Any
def __init__(self, *, status_code: typing.Optional[int] = None, body: typing.Any = None):
self.status_code = status_code
self.body = body
def __str__(self) -> str:
return f"status_code: {self.status_code}, body: {self.body}"

View File

@@ -0,0 +1,48 @@
# This file was auto-generated by Fern from our API Definition.
import typing
import httpx
from .http_client import HttpClient
from .http_client import AsyncHttpClient
class BaseClientWrapper:
def __init__(self, *, base_url: str, timeout: typing.Optional[float] = None):
self._base_url = base_url
self._timeout = timeout
def get_headers(self) -> typing.Dict[str, str]:
headers: typing.Dict[str, str] = {
"X-Fern-Language": "Python",
"X-Fern-SDK-Name": "skyvern",
"X-Fern-SDK-Version": "0.1.56",
}
return headers
def get_base_url(self) -> str:
return self._base_url
def get_timeout(self) -> typing.Optional[float]:
return self._timeout
class SyncClientWrapper(BaseClientWrapper):
def __init__(self, *, base_url: str, timeout: typing.Optional[float] = None, httpx_client: httpx.Client):
super().__init__(base_url=base_url, timeout=timeout)
self.httpx_client = HttpClient(
httpx_client=httpx_client,
base_headers=self.get_headers,
base_timeout=self.get_timeout,
base_url=self.get_base_url,
)
class AsyncClientWrapper(BaseClientWrapper):
def __init__(self, *, base_url: str, timeout: typing.Optional[float] = None, httpx_client: httpx.AsyncClient):
super().__init__(base_url=base_url, timeout=timeout)
self.httpx_client = AsyncHttpClient(
httpx_client=httpx_client,
base_headers=self.get_headers,
base_timeout=self.get_timeout,
base_url=self.get_base_url,
)

View File

@@ -0,0 +1,28 @@
# This file was auto-generated by Fern from our API Definition.
import datetime as dt
def serialize_datetime(v: dt.datetime) -> str:
"""
Serialize a datetime including timezone info.
Uses the timezone info provided if present, otherwise uses the current runtime's timezone info.
UTC datetimes end in "Z" while all other timezones are represented as offset from UTC, e.g. +05:00.
"""
def _serialize_zoned_datetime(v: dt.datetime) -> str:
if v.tzinfo is not None and v.tzinfo.tzname(None) == dt.timezone.utc.tzname(None):
# UTC is a special case where we use "Z" at the end instead of "+00:00"
return v.isoformat().replace("+00:00", "Z")
else:
# Delegate to the typical +/- offset format
return v.isoformat()
if v.tzinfo is not None:
return _serialize_zoned_datetime(v)
else:
local_tz = dt.datetime.now().astimezone().tzinfo
localized_dt = v.replace(tzinfo=local_tz)
return _serialize_zoned_datetime(localized_dt)

View File

@@ -0,0 +1,67 @@
# This file was auto-generated by Fern from our API Definition.
from typing import IO, Dict, List, Mapping, Optional, Tuple, Union, cast
# File typing inspired by the flexibility of types within the httpx library
# https://github.com/encode/httpx/blob/master/httpx/_types.py
FileContent = Union[IO[bytes], bytes, str]
File = Union[
# file (or bytes)
FileContent,
# (filename, file (or bytes))
Tuple[Optional[str], FileContent],
# (filename, file (or bytes), content_type)
Tuple[Optional[str], FileContent, Optional[str]],
# (filename, file (or bytes), content_type, headers)
Tuple[
Optional[str],
FileContent,
Optional[str],
Mapping[str, str],
],
]
def convert_file_dict_to_httpx_tuples(
d: Dict[str, Union[File, List[File]]],
) -> List[Tuple[str, File]]:
"""
The format we use is a list of tuples, where the first element is the
name of the file and the second is the file object. Typically HTTPX wants
a dict, but to be able to send lists of files, you have to use the list
approach (which also works for non-lists)
https://github.com/encode/httpx/pull/1032
"""
httpx_tuples = []
for key, file_like in d.items():
if isinstance(file_like, list):
for file_like_item in file_like:
httpx_tuples.append((key, file_like_item))
else:
httpx_tuples.append((key, file_like))
return httpx_tuples
def with_content_type(*, file: File, default_content_type: str) -> File:
"""
This function resolves to the file's content type, if provided, and defaults
to the default_content_type value if not.
"""
if isinstance(file, tuple):
if len(file) == 2:
filename, content = cast(Tuple[Optional[str], FileContent], file) # type: ignore
return (filename, content, default_content_type)
elif len(file) == 3:
filename, content, file_content_type = cast(Tuple[Optional[str], FileContent, Optional[str]], file) # type: ignore
out_content_type = file_content_type or default_content_type
return (filename, content, out_content_type)
elif len(file) == 4:
filename, content, file_content_type, headers = cast( # type: ignore
Tuple[Optional[str], FileContent, Optional[str], Mapping[str, str]], file
)
out_content_type = file_content_type or default_content_type
return (filename, content, out_content_type, headers)
else:
raise ValueError(f"Unexpected tuple length: {len(file)}")
return (None, file, default_content_type)

View File

@@ -0,0 +1,499 @@
# This file was auto-generated by Fern from our API Definition.
import asyncio
import email.utils
import json
import re
import time
import typing
import urllib.parse
from contextlib import asynccontextmanager, contextmanager
from random import random
import httpx
from .file import File, convert_file_dict_to_httpx_tuples
from .jsonable_encoder import jsonable_encoder
from .query_encoder import encode_query
from .remove_none_from_dict import remove_none_from_dict
from .request_options import RequestOptions
INITIAL_RETRY_DELAY_SECONDS = 0.5
MAX_RETRY_DELAY_SECONDS = 10
MAX_RETRY_DELAY_SECONDS_FROM_HEADER = 30
def _parse_retry_after(response_headers: httpx.Headers) -> typing.Optional[float]:
"""
This function parses the `Retry-After` header in a HTTP response and returns the number of seconds to wait.
Inspired by the urllib3 retry implementation.
"""
retry_after_ms = response_headers.get("retry-after-ms")
if retry_after_ms is not None:
try:
return int(retry_after_ms) / 1000 if retry_after_ms > 0 else 0
except Exception:
pass
retry_after = response_headers.get("retry-after")
if retry_after is None:
return None
# Attempt to parse the header as an int.
if re.match(r"^\s*[0-9]+\s*$", retry_after):
seconds = float(retry_after)
# Fallback to parsing it as a date.
else:
retry_date_tuple = email.utils.parsedate_tz(retry_after)
if retry_date_tuple is None:
return None
if retry_date_tuple[9] is None: # Python 2
# Assume UTC if no timezone was specified
# On Python2.7, parsedate_tz returns None for a timezone offset
# instead of 0 if no timezone is given, where mktime_tz treats
# a None timezone offset as local time.
retry_date_tuple = retry_date_tuple[:9] + (0,) + retry_date_tuple[10:]
retry_date = email.utils.mktime_tz(retry_date_tuple)
seconds = retry_date - time.time()
if seconds < 0:
seconds = 0
return seconds
def _retry_timeout(response: httpx.Response, retries: int) -> float:
"""
Determine the amount of time to wait before retrying a request.
This function begins by trying to parse a retry-after header from the response, and then proceeds to use exponential backoff
with a jitter to determine the number of seconds to wait.
"""
# If the API asks us to wait a certain amount of time (and it's a reasonable amount), just do what it says.
retry_after = _parse_retry_after(response.headers)
if retry_after is not None and retry_after <= MAX_RETRY_DELAY_SECONDS_FROM_HEADER:
return retry_after
# Apply exponential backoff, capped at MAX_RETRY_DELAY_SECONDS.
retry_delay = min(INITIAL_RETRY_DELAY_SECONDS * pow(2.0, retries), MAX_RETRY_DELAY_SECONDS)
# Add a randomness / jitter to the retry delay to avoid overwhelming the server with retries.
timeout = retry_delay * (1 - 0.25 * random())
return timeout if timeout >= 0 else 0
def _should_retry(response: httpx.Response) -> bool:
retriable_400s = [429, 408, 409]
return response.status_code >= 500 or response.status_code in retriable_400s
def remove_omit_from_dict(
original: typing.Dict[str, typing.Optional[typing.Any]],
omit: typing.Optional[typing.Any],
) -> typing.Dict[str, typing.Any]:
if omit is None:
return original
new: typing.Dict[str, typing.Any] = {}
for key, value in original.items():
if value is not omit:
new[key] = value
return new
def maybe_filter_request_body(
data: typing.Optional[typing.Any],
request_options: typing.Optional[RequestOptions],
omit: typing.Optional[typing.Any],
) -> typing.Optional[typing.Any]:
if data is None:
return (
jsonable_encoder(request_options.get("additional_body_parameters", {})) or {}
if request_options is not None
else None
)
elif not isinstance(data, typing.Mapping):
data_content = jsonable_encoder(data)
else:
data_content = {
**(jsonable_encoder(remove_omit_from_dict(data, omit))), # type: ignore
**(
jsonable_encoder(request_options.get("additional_body_parameters", {})) or {}
if request_options is not None
else {}
),
}
return data_content
# Abstracted out for testing purposes
def get_request_body(
*,
json: typing.Optional[typing.Any],
data: typing.Optional[typing.Any],
request_options: typing.Optional[RequestOptions],
omit: typing.Optional[typing.Any],
) -> typing.Tuple[typing.Optional[typing.Any], typing.Optional[typing.Any]]:
json_body = None
data_body = None
if data is not None:
data_body = maybe_filter_request_body(data, request_options, omit)
else:
# If both data and json are None, we send json data in the event extra properties are specified
json_body = maybe_filter_request_body(json, request_options, omit)
# If you have an empty JSON body, you should just send None
return (json_body if json_body != {} else None), data_body if data_body != {} else None
class HttpClient:
def __init__(
self,
*,
httpx_client: httpx.Client,
base_timeout: typing.Callable[[], typing.Optional[float]],
base_headers: typing.Callable[[], typing.Dict[str, str]],
base_url: typing.Optional[typing.Callable[[], str]] = None,
):
self.base_url = base_url
self.base_timeout = base_timeout
self.base_headers = base_headers
self.httpx_client = httpx_client
def get_base_url(self, maybe_base_url: typing.Optional[str]) -> str:
base_url = maybe_base_url
if self.base_url is not None and base_url is None:
base_url = self.base_url()
if base_url is None:
raise ValueError("A base_url is required to make this request, please provide one and try again.")
return base_url
def request(
self,
path: typing.Optional[str] = None,
*,
method: str,
base_url: typing.Optional[str] = None,
params: typing.Optional[typing.Dict[str, typing.Any]] = None,
json: typing.Optional[typing.Any] = None,
data: typing.Optional[typing.Any] = None,
content: typing.Optional[typing.Union[bytes, typing.Iterator[bytes], typing.AsyncIterator[bytes]]] = None,
files: typing.Optional[typing.Dict[str, typing.Optional[typing.Union[File, typing.List[File]]]]] = None,
headers: typing.Optional[typing.Dict[str, typing.Any]] = None,
request_options: typing.Optional[RequestOptions] = None,
retries: int = 0,
omit: typing.Optional[typing.Any] = None,
) -> httpx.Response:
base_url = self.get_base_url(base_url)
timeout = (
request_options.get("timeout_in_seconds")
if request_options is not None and request_options.get("timeout_in_seconds") is not None
else self.base_timeout()
)
json_body, data_body = get_request_body(json=json, data=data, request_options=request_options, omit=omit)
response = self.httpx_client.request(
method=method,
url=urllib.parse.urljoin(f"{base_url}/", path),
headers=jsonable_encoder(
remove_none_from_dict(
{
**self.base_headers(),
**(headers if headers is not None else {}),
**(request_options.get("additional_headers", {}) or {} if request_options is not None else {}),
}
)
),
params=encode_query(
jsonable_encoder(
remove_none_from_dict(
remove_omit_from_dict(
{
**(params if params is not None else {}),
**(
request_options.get("additional_query_parameters", {}) or {}
if request_options is not None
else {}
),
},
omit,
)
)
)
),
json=json_body,
data=data_body,
content=content,
files=(
convert_file_dict_to_httpx_tuples(remove_omit_from_dict(remove_none_from_dict(files), omit))
if (files is not None and files is not omit)
else None
),
timeout=timeout,
)
max_retries: int = request_options.get("max_retries", 0) if request_options is not None else 0
if _should_retry(response=response):
if max_retries > retries:
time.sleep(_retry_timeout(response=response, retries=retries))
return self.request(
path=path,
method=method,
base_url=base_url,
params=params,
json=json,
content=content,
files=files,
headers=headers,
request_options=request_options,
retries=retries + 1,
omit=omit,
)
return response
@contextmanager
def stream(
self,
path: typing.Optional[str] = None,
*,
method: str,
base_url: typing.Optional[str] = None,
params: typing.Optional[typing.Dict[str, typing.Any]] = None,
json: typing.Optional[typing.Any] = None,
data: typing.Optional[typing.Any] = None,
content: typing.Optional[typing.Union[bytes, typing.Iterator[bytes], typing.AsyncIterator[bytes]]] = None,
files: typing.Optional[typing.Dict[str, typing.Optional[typing.Union[File, typing.List[File]]]]] = None,
headers: typing.Optional[typing.Dict[str, typing.Any]] = None,
request_options: typing.Optional[RequestOptions] = None,
retries: int = 0,
omit: typing.Optional[typing.Any] = None,
) -> typing.Iterator[httpx.Response]:
base_url = self.get_base_url(base_url)
timeout = (
request_options.get("timeout_in_seconds")
if request_options is not None and request_options.get("timeout_in_seconds") is not None
else self.base_timeout()
)
json_body, data_body = get_request_body(json=json, data=data, request_options=request_options, omit=omit)
with self.httpx_client.stream(
method=method,
url=urllib.parse.urljoin(f"{base_url}/", path),
headers=jsonable_encoder(
remove_none_from_dict(
{
**self.base_headers(),
**(headers if headers is not None else {}),
**(request_options.get("additional_headers", {}) if request_options is not None else {}),
}
)
),
params=encode_query(
jsonable_encoder(
remove_none_from_dict(
remove_omit_from_dict(
{
**(params if params is not None else {}),
**(
request_options.get("additional_query_parameters", {})
if request_options is not None
else {}
),
},
omit,
)
)
)
),
json=json_body,
data=data_body,
content=content,
files=(
convert_file_dict_to_httpx_tuples(remove_omit_from_dict(remove_none_from_dict(files), omit))
if (files is not None and files is not omit)
else None
),
timeout=timeout,
) as stream:
yield stream
class AsyncHttpClient:
def __init__(
self,
*,
httpx_client: httpx.AsyncClient,
base_timeout: typing.Callable[[], typing.Optional[float]],
base_headers: typing.Callable[[], typing.Dict[str, str]],
base_url: typing.Optional[typing.Callable[[], str]] = None,
):
self.base_url = base_url
self.base_timeout = base_timeout
self.base_headers = base_headers
self.httpx_client = httpx_client
def get_base_url(self, maybe_base_url: typing.Optional[str]) -> str:
base_url = maybe_base_url
if self.base_url is not None and base_url is None:
base_url = self.base_url()
if base_url is None:
raise ValueError("A base_url is required to make this request, please provide one and try again.")
return base_url
async def request(
self,
path: typing.Optional[str] = None,
*,
method: str,
base_url: typing.Optional[str] = None,
params: typing.Optional[typing.Dict[str, typing.Any]] = None,
json: typing.Optional[typing.Any] = None,
data: typing.Optional[typing.Any] = None,
content: typing.Optional[typing.Union[bytes, typing.Iterator[bytes], typing.AsyncIterator[bytes]]] = None,
files: typing.Optional[typing.Dict[str, typing.Optional[typing.Union[File, typing.List[File]]]]] = None,
headers: typing.Optional[typing.Dict[str, typing.Any]] = None,
request_options: typing.Optional[RequestOptions] = None,
retries: int = 0,
omit: typing.Optional[typing.Any] = None,
) -> httpx.Response:
base_url = self.get_base_url(base_url)
timeout = (
request_options.get("timeout_in_seconds")
if request_options is not None and request_options.get("timeout_in_seconds") is not None
else self.base_timeout()
)
json_body, data_body = get_request_body(json=json, data=data, request_options=request_options, omit=omit)
# Add the input to each of these and do None-safety checks
response = await self.httpx_client.request(
method=method,
url=urllib.parse.urljoin(f"{base_url}/", path),
headers=jsonable_encoder(
remove_none_from_dict(
{
**self.base_headers(),
**(headers if headers is not None else {}),
**(request_options.get("additional_headers", {}) or {} if request_options is not None else {}),
}
)
),
params=encode_query(
jsonable_encoder(
remove_none_from_dict(
remove_omit_from_dict(
{
**(params if params is not None else {}),
**(
request_options.get("additional_query_parameters", {}) or {}
if request_options is not None
else {}
),
},
omit,
)
)
)
),
json=json_body,
data=data_body,
content=content,
files=(
convert_file_dict_to_httpx_tuples(remove_omit_from_dict(remove_none_from_dict(files), omit))
if files is not None
else None
),
timeout=timeout,
)
max_retries: int = request_options.get("max_retries", 0) if request_options is not None else 0
if _should_retry(response=response):
if max_retries > retries:
await asyncio.sleep(_retry_timeout(response=response, retries=retries))
return await self.request(
path=path,
method=method,
base_url=base_url,
params=params,
json=json,
content=content,
files=files,
headers=headers,
request_options=request_options,
retries=retries + 1,
omit=omit,
)
return response
@asynccontextmanager
async def stream(
self,
path: typing.Optional[str] = None,
*,
method: str,
base_url: typing.Optional[str] = None,
params: typing.Optional[typing.Dict[str, typing.Any]] = None,
json: typing.Optional[typing.Any] = None,
data: typing.Optional[typing.Any] = None,
content: typing.Optional[typing.Union[bytes, typing.Iterator[bytes], typing.AsyncIterator[bytes]]] = None,
files: typing.Optional[typing.Dict[str, typing.Optional[typing.Union[File, typing.List[File]]]]] = None,
headers: typing.Optional[typing.Dict[str, typing.Any]] = None,
request_options: typing.Optional[RequestOptions] = None,
retries: int = 0,
omit: typing.Optional[typing.Any] = None,
) -> typing.AsyncIterator[httpx.Response]:
base_url = self.get_base_url(base_url)
timeout = (
request_options.get("timeout_in_seconds")
if request_options is not None and request_options.get("timeout_in_seconds") is not None
else self.base_timeout()
)
json_body, data_body = get_request_body(json=json, data=data, request_options=request_options, omit=omit)
async with self.httpx_client.stream(
method=method,
url=urllib.parse.urljoin(f"{base_url}/", path),
headers=jsonable_encoder(
remove_none_from_dict(
{
**self.base_headers(),
**(headers if headers is not None else {}),
**(request_options.get("additional_headers", {}) if request_options is not None else {}),
}
)
),
params=encode_query(
jsonable_encoder(
remove_none_from_dict(
remove_omit_from_dict(
{
**(params if params is not None else {}),
**(
request_options.get("additional_query_parameters", {})
if request_options is not None
else {}
),
},
omit=omit,
)
)
)
),
json=json_body,
data=data_body,
content=content,
files=(
convert_file_dict_to_httpx_tuples(remove_omit_from_dict(remove_none_from_dict(files), omit))
if files is not None
else None
),
timeout=timeout,
) as stream:
yield stream

View File

@@ -0,0 +1,101 @@
# This file was auto-generated by Fern from our API Definition.
"""
jsonable_encoder converts a Python object to a JSON-friendly dict
(e.g. datetimes to strings, Pydantic models to dicts).
Taken from FastAPI, and made a bit simpler
https://github.com/tiangolo/fastapi/blob/master/fastapi/encoders.py
"""
import base64
import dataclasses
import datetime as dt
from enum import Enum
from pathlib import PurePath
from types import GeneratorType
from typing import Any, Callable, Dict, List, Optional, Set, Union
import pydantic
from .datetime_utils import serialize_datetime
from .pydantic_utilities import (
IS_PYDANTIC_V2,
encode_by_type,
to_jsonable_with_fallback,
)
SetIntStr = Set[Union[int, str]]
DictIntStrAny = Dict[Union[int, str], Any]
def jsonable_encoder(obj: Any, custom_encoder: Optional[Dict[Any, Callable[[Any], Any]]] = None) -> Any:
custom_encoder = custom_encoder or {}
if custom_encoder:
if type(obj) in custom_encoder:
return custom_encoder[type(obj)](obj)
else:
for encoder_type, encoder_instance in custom_encoder.items():
if isinstance(obj, encoder_type):
return encoder_instance(obj)
if isinstance(obj, pydantic.BaseModel):
if IS_PYDANTIC_V2:
encoder = getattr(obj.model_config, "json_encoders", {}) # type: ignore # Pydantic v2
else:
encoder = getattr(obj.__config__, "json_encoders", {}) # type: ignore # Pydantic v1
if custom_encoder:
encoder.update(custom_encoder)
obj_dict = obj.dict(by_alias=True)
if "__root__" in obj_dict:
obj_dict = obj_dict["__root__"]
if "root" in obj_dict:
obj_dict = obj_dict["root"]
return jsonable_encoder(obj_dict, custom_encoder=encoder)
if dataclasses.is_dataclass(obj):
obj_dict = dataclasses.asdict(obj) # type: ignore
return jsonable_encoder(obj_dict, custom_encoder=custom_encoder)
if isinstance(obj, bytes):
return base64.b64encode(obj).decode("utf-8")
if isinstance(obj, Enum):
return obj.value
if isinstance(obj, PurePath):
return str(obj)
if isinstance(obj, (str, int, float, type(None))):
return obj
if isinstance(obj, dt.datetime):
return serialize_datetime(obj)
if isinstance(obj, dt.date):
return str(obj)
if isinstance(obj, dict):
encoded_dict = {}
allowed_keys = set(obj.keys())
for key, value in obj.items():
if key in allowed_keys:
encoded_key = jsonable_encoder(key, custom_encoder=custom_encoder)
encoded_value = jsonable_encoder(value, custom_encoder=custom_encoder)
encoded_dict[encoded_key] = encoded_value
return encoded_dict
if isinstance(obj, (list, set, frozenset, GeneratorType, tuple)):
encoded_list = []
for item in obj:
encoded_list.append(jsonable_encoder(item, custom_encoder=custom_encoder))
return encoded_list
def fallback_serializer(o: Any) -> Any:
attempt_encode = encode_by_type(o)
if attempt_encode is not None:
return attempt_encode
try:
data = dict(o)
except Exception as e:
errors: List[Exception] = []
errors.append(e)
try:
data = vars(o)
except Exception as e:
errors.append(e)
raise ValueError(errors) from e
return jsonable_encoder(data, custom_encoder=custom_encoder)
return to_jsonable_with_fallback(obj, fallback_serializer)

View File

@@ -0,0 +1,296 @@
# This file was auto-generated by Fern from our API Definition.
# nopycln: file
import datetime as dt
import typing
from collections import defaultdict
import typing_extensions
import pydantic
from .datetime_utils import serialize_datetime
from .serialization import convert_and_respect_annotation_metadata
IS_PYDANTIC_V2 = pydantic.VERSION.startswith("2.")
if IS_PYDANTIC_V2:
# isort will try to reformat the comments on these imports, which breaks mypy
# isort: off
from pydantic.v1.datetime_parse import ( # type: ignore # pyright: ignore[reportMissingImports] # Pydantic v2
parse_date as parse_date,
)
from pydantic.v1.datetime_parse import ( # pyright: ignore[reportMissingImports] # Pydantic v2
parse_datetime as parse_datetime,
)
from pydantic.v1.json import ( # type: ignore # pyright: ignore[reportMissingImports] # Pydantic v2
ENCODERS_BY_TYPE as encoders_by_type,
)
from pydantic.v1.typing import ( # type: ignore # pyright: ignore[reportMissingImports] # Pydantic v2
get_args as get_args,
)
from pydantic.v1.typing import ( # pyright: ignore[reportMissingImports] # Pydantic v2
get_origin as get_origin,
)
from pydantic.v1.typing import ( # pyright: ignore[reportMissingImports] # Pydantic v2
is_literal_type as is_literal_type,
)
from pydantic.v1.typing import ( # pyright: ignore[reportMissingImports] # Pydantic v2
is_union as is_union,
)
from pydantic.v1.fields import ModelField as ModelField # type: ignore # pyright: ignore[reportMissingImports] # Pydantic v2
else:
from pydantic.datetime_parse import parse_date as parse_date # type: ignore # Pydantic v1
from pydantic.datetime_parse import parse_datetime as parse_datetime # type: ignore # Pydantic v1
from pydantic.fields import ModelField as ModelField # type: ignore # Pydantic v1
from pydantic.json import ENCODERS_BY_TYPE as encoders_by_type # type: ignore # Pydantic v1
from pydantic.typing import get_args as get_args # type: ignore # Pydantic v1
from pydantic.typing import get_origin as get_origin # type: ignore # Pydantic v1
from pydantic.typing import is_literal_type as is_literal_type # type: ignore # Pydantic v1
from pydantic.typing import is_union as is_union # type: ignore # Pydantic v1
# isort: on
T = typing.TypeVar("T")
Model = typing.TypeVar("Model", bound=pydantic.BaseModel)
def parse_obj_as(type_: typing.Type[T], object_: typing.Any) -> T:
dealiased_object = convert_and_respect_annotation_metadata(object_=object_, annotation=type_, direction="read")
if IS_PYDANTIC_V2:
adapter = pydantic.TypeAdapter(type_) # type: ignore # Pydantic v2
return adapter.validate_python(dealiased_object)
else:
return pydantic.parse_obj_as(type_, dealiased_object)
def to_jsonable_with_fallback(
obj: typing.Any, fallback_serializer: typing.Callable[[typing.Any], typing.Any]
) -> typing.Any:
if IS_PYDANTIC_V2:
from pydantic_core import to_jsonable_python
return to_jsonable_python(obj, fallback=fallback_serializer)
else:
return fallback_serializer(obj)
class UniversalBaseModel(pydantic.BaseModel):
if IS_PYDANTIC_V2:
model_config: typing.ClassVar[pydantic.ConfigDict] = pydantic.ConfigDict(
# Allow fields begining with `model_` to be used in the model
protected_namespaces=(),
) # type: ignore # Pydantic v2
@pydantic.model_serializer(mode="wrap", when_used="json") # type: ignore # Pydantic v2
def serialize_model(self, handler: pydantic.SerializerFunctionWrapHandler) -> typing.Any: # type: ignore # Pydantic v2
serialized = handler(self)
data = {k: serialize_datetime(v) if isinstance(v, dt.datetime) else v for k, v in serialized.items()}
return data
else:
class Config:
smart_union = True
json_encoders = {dt.datetime: serialize_datetime}
@classmethod
def model_construct(
cls: typing.Type["Model"], _fields_set: typing.Optional[typing.Set[str]] = None, **values: typing.Any
) -> "Model":
dealiased_object = convert_and_respect_annotation_metadata(object_=values, annotation=cls, direction="read")
return cls.construct(_fields_set, **dealiased_object)
@classmethod
def construct(
cls: typing.Type["Model"], _fields_set: typing.Optional[typing.Set[str]] = None, **values: typing.Any
) -> "Model":
dealiased_object = convert_and_respect_annotation_metadata(object_=values, annotation=cls, direction="read")
if IS_PYDANTIC_V2:
return super().model_construct(_fields_set, **dealiased_object) # type: ignore # Pydantic v2
else:
return super().construct(_fields_set, **dealiased_object)
def json(self, **kwargs: typing.Any) -> str:
kwargs_with_defaults: typing.Any = {
"by_alias": True,
"exclude_unset": True,
**kwargs,
}
if IS_PYDANTIC_V2:
return super().model_dump_json(**kwargs_with_defaults) # type: ignore # Pydantic v2
else:
return super().json(**kwargs_with_defaults)
def dict(self, **kwargs: typing.Any) -> typing.Dict[str, typing.Any]:
"""
Override the default dict method to `exclude_unset` by default. This function patches
`exclude_unset` to work include fields within non-None default values.
"""
# Note: the logic here is multi-plexed given the levers exposed in Pydantic V1 vs V2
# Pydantic V1's .dict can be extremely slow, so we do not want to call it twice.
#
# We'd ideally do the same for Pydantic V2, but it shells out to a library to serialize models
# that we have less control over, and this is less intrusive than custom serializers for now.
if IS_PYDANTIC_V2:
kwargs_with_defaults_exclude_unset: typing.Any = {
**kwargs,
"by_alias": True,
"exclude_unset": True,
"exclude_none": False,
}
kwargs_with_defaults_exclude_none: typing.Any = {
**kwargs,
"by_alias": True,
"exclude_none": True,
"exclude_unset": False,
}
dict_dump = deep_union_pydantic_dicts(
super().model_dump(**kwargs_with_defaults_exclude_unset), # type: ignore # Pydantic v2
super().model_dump(**kwargs_with_defaults_exclude_none), # type: ignore # Pydantic v2
)
else:
_fields_set = self.__fields_set__.copy()
fields = _get_model_fields(self.__class__)
for name, field in fields.items():
if name not in _fields_set:
default = _get_field_default(field)
# If the default values are non-null act like they've been set
# This effectively allows exclude_unset to work like exclude_none where
# the latter passes through intentionally set none values.
if default is not None or ("exclude_unset" in kwargs and not kwargs["exclude_unset"]):
_fields_set.add(name)
if default is not None:
self.__fields_set__.add(name)
kwargs_with_defaults_exclude_unset_include_fields: typing.Any = {
"by_alias": True,
"exclude_unset": True,
"include": _fields_set,
**kwargs,
}
dict_dump = super().dict(**kwargs_with_defaults_exclude_unset_include_fields)
return convert_and_respect_annotation_metadata(object_=dict_dump, annotation=self.__class__, direction="write")
def _union_list_of_pydantic_dicts(
source: typing.List[typing.Any], destination: typing.List[typing.Any]
) -> typing.List[typing.Any]:
converted_list: typing.List[typing.Any] = []
for i, item in enumerate(source):
destination_value = destination[i] # type: ignore
if isinstance(item, dict):
converted_list.append(deep_union_pydantic_dicts(item, destination_value))
elif isinstance(item, list):
converted_list.append(_union_list_of_pydantic_dicts(item, destination_value))
else:
converted_list.append(item)
return converted_list
def deep_union_pydantic_dicts(
source: typing.Dict[str, typing.Any], destination: typing.Dict[str, typing.Any]
) -> typing.Dict[str, typing.Any]:
for key, value in source.items():
node = destination.setdefault(key, {})
if isinstance(value, dict):
deep_union_pydantic_dicts(value, node)
# Note: we do not do this same processing for sets given we do not have sets of models
# and given the sets are unordered, the processing of the set and matching objects would
# be non-trivial.
elif isinstance(value, list):
destination[key] = _union_list_of_pydantic_dicts(value, node)
else:
destination[key] = value
return destination
if IS_PYDANTIC_V2:
class V2RootModel(UniversalBaseModel, pydantic.RootModel): # type: ignore # Pydantic v2
pass
UniversalRootModel: typing_extensions.TypeAlias = V2RootModel # type: ignore
else:
UniversalRootModel: typing_extensions.TypeAlias = UniversalBaseModel # type: ignore
def encode_by_type(o: typing.Any) -> typing.Any:
encoders_by_class_tuples: typing.Dict[typing.Callable[[typing.Any], typing.Any], typing.Tuple[typing.Any, ...]] = (
defaultdict(tuple)
)
for type_, encoder in encoders_by_type.items():
encoders_by_class_tuples[encoder] += (type_,)
if type(o) in encoders_by_type:
return encoders_by_type[type(o)](o)
for encoder, classes_tuple in encoders_by_class_tuples.items():
if isinstance(o, classes_tuple):
return encoder(o)
def update_forward_refs(model: typing.Type["Model"], **localns: typing.Any) -> None:
if IS_PYDANTIC_V2:
model.model_rebuild(raise_errors=False) # type: ignore # Pydantic v2
else:
model.update_forward_refs(**localns)
# Mirrors Pydantic's internal typing
AnyCallable = typing.Callable[..., typing.Any]
def universal_root_validator(
pre: bool = False,
) -> typing.Callable[[AnyCallable], AnyCallable]:
def decorator(func: AnyCallable) -> AnyCallable:
if IS_PYDANTIC_V2:
return pydantic.model_validator(mode="before" if pre else "after")(func) # type: ignore # Pydantic v2
else:
return pydantic.root_validator(pre=pre)(func) # type: ignore # Pydantic v1
return decorator
def universal_field_validator(field_name: str, pre: bool = False) -> typing.Callable[[AnyCallable], AnyCallable]:
def decorator(func: AnyCallable) -> AnyCallable:
if IS_PYDANTIC_V2:
return pydantic.field_validator(field_name, mode="before" if pre else "after")(func) # type: ignore # Pydantic v2
else:
return pydantic.validator(field_name, pre=pre)(func) # type: ignore # Pydantic v1
return decorator
PydanticField = typing.Union[ModelField, pydantic.fields.FieldInfo]
def _get_model_fields(
model: typing.Type["Model"],
) -> typing.Mapping[str, PydanticField]:
if IS_PYDANTIC_V2:
return model.model_fields # type: ignore # Pydantic v2
else:
return model.__fields__ # type: ignore # Pydantic v1
def _get_field_default(field: PydanticField) -> typing.Any:
try:
value = field.get_default() # type: ignore # Pydantic < v1.10.15
except:
value = field.default
if IS_PYDANTIC_V2:
from pydantic_core import PydanticUndefined
if value == PydanticUndefined:
return None
return value
return value

View File

@@ -0,0 +1,58 @@
# This file was auto-generated by Fern from our API Definition.
from typing import Any, Dict, List, Optional, Tuple
import pydantic
# Flattens dicts to be of the form {"key[subkey][subkey2]": value} where value is not a dict
def traverse_query_dict(dict_flat: Dict[str, Any], key_prefix: Optional[str] = None) -> List[Tuple[str, Any]]:
result = []
for k, v in dict_flat.items():
key = f"{key_prefix}[{k}]" if key_prefix is not None else k
if isinstance(v, dict):
result.extend(traverse_query_dict(v, key))
elif isinstance(v, list):
for arr_v in v:
if isinstance(arr_v, dict):
result.extend(traverse_query_dict(arr_v, key))
else:
result.append((key, arr_v))
else:
result.append((key, v))
return result
def single_query_encoder(query_key: str, query_value: Any) -> List[Tuple[str, Any]]:
if isinstance(query_value, pydantic.BaseModel) or isinstance(query_value, dict):
if isinstance(query_value, pydantic.BaseModel):
obj_dict = query_value.dict(by_alias=True)
else:
obj_dict = query_value
return traverse_query_dict(obj_dict, query_key)
elif isinstance(query_value, list):
encoded_values: List[Tuple[str, Any]] = []
for value in query_value:
if isinstance(value, pydantic.BaseModel) or isinstance(value, dict):
if isinstance(value, pydantic.BaseModel):
obj_dict = value.dict(by_alias=True)
elif isinstance(value, dict):
obj_dict = value
encoded_values.extend(single_query_encoder(query_key, obj_dict))
else:
encoded_values.append((query_key, value))
return encoded_values
return [(query_key, query_value)]
def encode_query(query: Optional[Dict[str, Any]]) -> Optional[List[Tuple[str, Any]]]:
if query is None:
return None
encoded_query = []
for k, v in query.items():
encoded_query.extend(single_query_encoder(k, v))
return encoded_query

View File

@@ -0,0 +1,11 @@
# This file was auto-generated by Fern from our API Definition.
from typing import Any, Dict, Mapping, Optional
def remove_none_from_dict(original: Mapping[str, Optional[Any]]) -> Dict[str, Any]:
new: Dict[str, Any] = {}
for key, value in original.items():
if value is not None:
new[key] = value
return new

View File

@@ -0,0 +1,35 @@
# This file was auto-generated by Fern from our API Definition.
import typing
try:
from typing import NotRequired # type: ignore
except ImportError:
from typing_extensions import NotRequired
class RequestOptions(typing.TypedDict, total=False):
"""
Additional options for request-specific configuration when calling APIs via the SDK.
This is used primarily as an optional final parameter for service functions.
Attributes:
- timeout_in_seconds: int. The number of seconds to await an API call before timing out.
- max_retries: int. The max number of retries to attempt if the API call fails.
- additional_headers: typing.Dict[str, typing.Any]. A dictionary containing additional parameters to spread into the request's header dict
- additional_query_parameters: typing.Dict[str, typing.Any]. A dictionary containing additional parameters to spread into the request's query parameters dict
- additional_body_parameters: typing.Dict[str, typing.Any]. A dictionary containing additional parameters to spread into the request's body parameters dict
- chunk_size: int. The size, in bytes, to process each chunk of data being streamed back within the response. This equates to leveraging `chunk_size` within `requests` or `httpx`, and is only leveraged for file downloads.
"""
timeout_in_seconds: NotRequired[int]
max_retries: NotRequired[int]
additional_headers: NotRequired[typing.Dict[str, typing.Any]]
additional_query_parameters: NotRequired[typing.Dict[str, typing.Any]]
additional_body_parameters: NotRequired[typing.Dict[str, typing.Any]]
chunk_size: NotRequired[int]

View File

@@ -0,0 +1,272 @@
# This file was auto-generated by Fern from our API Definition.
import collections
import inspect
import typing
import typing_extensions
import pydantic
class FieldMetadata:
"""
Metadata class used to annotate fields to provide additional information.
Example:
class MyDict(TypedDict):
field: typing.Annotated[str, FieldMetadata(alias="field_name")]
Will serialize: `{"field": "value"}`
To: `{"field_name": "value"}`
"""
alias: str
def __init__(self, *, alias: str) -> None:
self.alias = alias
def convert_and_respect_annotation_metadata(
*,
object_: typing.Any,
annotation: typing.Any,
inner_type: typing.Optional[typing.Any] = None,
direction: typing.Literal["read", "write"],
) -> typing.Any:
"""
Respect the metadata annotations on a field, such as aliasing. This function effectively
manipulates the dict-form of an object to respect the metadata annotations. This is primarily used for
TypedDicts, which cannot support aliasing out of the box, and can be extended for additional
utilities, such as defaults.
Parameters
----------
object_ : typing.Any
annotation : type
The type we're looking to apply typing annotations from
inner_type : typing.Optional[type]
Returns
-------
typing.Any
"""
if object_ is None:
return None
if inner_type is None:
inner_type = annotation
clean_type = _remove_annotations(inner_type)
# Pydantic models
if (
inspect.isclass(clean_type)
and issubclass(clean_type, pydantic.BaseModel)
and isinstance(object_, typing.Mapping)
):
return _convert_mapping(object_, clean_type, direction)
# TypedDicts
if typing_extensions.is_typeddict(clean_type) and isinstance(object_, typing.Mapping):
return _convert_mapping(object_, clean_type, direction)
if (
typing_extensions.get_origin(clean_type) == typing.Dict
or typing_extensions.get_origin(clean_type) == dict
or clean_type == typing.Dict
) and isinstance(object_, typing.Dict):
key_type = typing_extensions.get_args(clean_type)[0]
value_type = typing_extensions.get_args(clean_type)[1]
return {
key: convert_and_respect_annotation_metadata(
object_=value,
annotation=annotation,
inner_type=value_type,
direction=direction,
)
for key, value in object_.items()
}
# If you're iterating on a string, do not bother to coerce it to a sequence.
if not isinstance(object_, str):
if (
typing_extensions.get_origin(clean_type) == typing.Set
or typing_extensions.get_origin(clean_type) == set
or clean_type == typing.Set
) and isinstance(object_, typing.Set):
inner_type = typing_extensions.get_args(clean_type)[0]
return {
convert_and_respect_annotation_metadata(
object_=item,
annotation=annotation,
inner_type=inner_type,
direction=direction,
)
for item in object_
}
elif (
(
typing_extensions.get_origin(clean_type) == typing.List
or typing_extensions.get_origin(clean_type) == list
or clean_type == typing.List
)
and isinstance(object_, typing.List)
) or (
(
typing_extensions.get_origin(clean_type) == typing.Sequence
or typing_extensions.get_origin(clean_type) == collections.abc.Sequence
or clean_type == typing.Sequence
)
and isinstance(object_, typing.Sequence)
):
inner_type = typing_extensions.get_args(clean_type)[0]
return [
convert_and_respect_annotation_metadata(
object_=item,
annotation=annotation,
inner_type=inner_type,
direction=direction,
)
for item in object_
]
if typing_extensions.get_origin(clean_type) == typing.Union:
# We should be able to ~relatively~ safely try to convert keys against all
# member types in the union, the edge case here is if one member aliases a field
# of the same name to a different name from another member
# Or if another member aliases a field of the same name that another member does not.
for member in typing_extensions.get_args(clean_type):
object_ = convert_and_respect_annotation_metadata(
object_=object_,
annotation=annotation,
inner_type=member,
direction=direction,
)
return object_
annotated_type = _get_annotation(annotation)
if annotated_type is None:
return object_
# If the object is not a TypedDict, a Union, or other container (list, set, sequence, etc.)
# Then we can safely call it on the recursive conversion.
return object_
def _convert_mapping(
object_: typing.Mapping[str, object],
expected_type: typing.Any,
direction: typing.Literal["read", "write"],
) -> typing.Mapping[str, object]:
converted_object: typing.Dict[str, object] = {}
annotations = typing_extensions.get_type_hints(expected_type, include_extras=True)
aliases_to_field_names = _get_alias_to_field_name(annotations)
for key, value in object_.items():
if direction == "read" and key in aliases_to_field_names:
dealiased_key = aliases_to_field_names.get(key)
if dealiased_key is not None:
type_ = annotations.get(dealiased_key)
else:
type_ = annotations.get(key)
# Note you can't get the annotation by the field name if you're in read mode, so you must check the aliases map
#
# So this is effectively saying if we're in write mode, and we don't have a type, or if we're in read mode and we don't have an alias
# then we can just pass the value through as is
if type_ is None:
converted_object[key] = value
elif direction == "read" and key not in aliases_to_field_names:
converted_object[key] = convert_and_respect_annotation_metadata(
object_=value, annotation=type_, direction=direction
)
else:
converted_object[_alias_key(key, type_, direction, aliases_to_field_names)] = (
convert_and_respect_annotation_metadata(object_=value, annotation=type_, direction=direction)
)
return converted_object
def _get_annotation(type_: typing.Any) -> typing.Optional[typing.Any]:
maybe_annotated_type = typing_extensions.get_origin(type_)
if maybe_annotated_type is None:
return None
if maybe_annotated_type == typing_extensions.NotRequired:
type_ = typing_extensions.get_args(type_)[0]
maybe_annotated_type = typing_extensions.get_origin(type_)
if maybe_annotated_type == typing_extensions.Annotated:
return type_
return None
def _remove_annotations(type_: typing.Any) -> typing.Any:
maybe_annotated_type = typing_extensions.get_origin(type_)
if maybe_annotated_type is None:
return type_
if maybe_annotated_type == typing_extensions.NotRequired:
return _remove_annotations(typing_extensions.get_args(type_)[0])
if maybe_annotated_type == typing_extensions.Annotated:
return _remove_annotations(typing_extensions.get_args(type_)[0])
return type_
def get_alias_to_field_mapping(type_: typing.Any) -> typing.Dict[str, str]:
annotations = typing_extensions.get_type_hints(type_, include_extras=True)
return _get_alias_to_field_name(annotations)
def get_field_to_alias_mapping(type_: typing.Any) -> typing.Dict[str, str]:
annotations = typing_extensions.get_type_hints(type_, include_extras=True)
return _get_field_to_alias_name(annotations)
def _get_alias_to_field_name(
field_to_hint: typing.Dict[str, typing.Any],
) -> typing.Dict[str, str]:
aliases = {}
for field, hint in field_to_hint.items():
maybe_alias = _get_alias_from_type(hint)
if maybe_alias is not None:
aliases[maybe_alias] = field
return aliases
def _get_field_to_alias_name(
field_to_hint: typing.Dict[str, typing.Any],
) -> typing.Dict[str, str]:
aliases = {}
for field, hint in field_to_hint.items():
maybe_alias = _get_alias_from_type(hint)
if maybe_alias is not None:
aliases[field] = maybe_alias
return aliases
def _get_alias_from_type(type_: typing.Any) -> typing.Optional[str]:
maybe_annotated_type = _get_annotation(type_)
if maybe_annotated_type is not None:
# The actual annotations are 1 onward, the first is the annotated type
annotations = typing_extensions.get_args(maybe_annotated_type)[1:]
for annotation in annotations:
if isinstance(annotation, FieldMetadata) and annotation.alias is not None:
return annotation.alias
return None
def _alias_key(
key: str,
type_: typing.Any,
direction: typing.Literal["read", "write"],
aliases_to_field_names: typing.Dict[str, str],
) -> str:
if direction == "read":
return aliases_to_field_names.get(key, key)
return _get_alias_from_type(type_=type_) or key