Files
Dorod-Sky/skyvern/agent/client.py
2025-03-24 22:08:37 -07:00

91 lines
3.1 KiB
Python

from typing import Any
import httpx
from skyvern.config import settings
from skyvern.exceptions import SkyvernClientException
from skyvern.forge.sdk.workflow.models.workflow import RunWorkflowResponse, WorkflowRunResponse
from skyvern.schemas.runs import ProxyLocation, RunEngine, TaskRunResponse
class SkyvernClient:
    """Async HTTP client for a Skyvern API server.

    Requests are authenticated by sending ``api_key`` in the ``x-api-key``
    header. Only the workflow methods perform network calls; ``run_task`` and
    ``get_run`` currently return a default-constructed ``TaskRunResponse``
    without contacting the server.
    """

    # Seconds allowed per HTTP request. Shared by every call so that starting
    # a workflow is not cut short by httpx's much lower default timeout while
    # fetching a workflow run is allowed a full minute.
    _REQUEST_TIMEOUT_SECONDS = 60

    def __init__(
        self,
        base_url: str = settings.SKYVERN_BASE_URL,
        api_key: str = settings.SKYVERN_API_KEY,
    ) -> None:
        """Store the connection settings.

        Args:
            base_url: Root URL of the API server; defaults to
                ``settings.SKYVERN_BASE_URL``.
            api_key: Value sent in the ``x-api-key`` header; defaults to
                ``settings.SKYVERN_API_KEY``.
        """
        self.base_url = base_url
        self.api_key = api_key

    def _url(self, path: str) -> str:
        """Join ``path`` onto ``base_url`` without producing a double slash."""
        return f"{self.base_url.rstrip('/')}{path}"

    def _headers(self) -> dict[str, str]:
        """Return the authentication headers attached to every request."""
        return {"x-api-key": self.api_key}

    @staticmethod
    def _ensure_ok(response: httpx.Response, action: str) -> None:
        """Raise ``SkyvernClientException`` unless the response status is 200.

        ``action`` completes the sentence "Failed to <action>" in the error
        message; the response body is appended verbatim.
        """
        # NOTE(review): only an exact 200 counts as success; other 2xx codes
        # are reported as failures. Confirm this matches the server contract.
        if response.status_code != 200:
            raise SkyvernClientException(
                f"Failed to {action}: {response.text}",
                status_code=response.status_code,
            )

    async def run_task(
        self,
        goal: str,
        engine: RunEngine = RunEngine.skyvern_v1,
        url: str | None = None,
        webhook_url: str | None = None,
        totp_identifier: str | None = None,
        totp_url: str | None = None,
        title: str | None = None,
        error_code_mapping: dict[str, str] | None = None,
        proxy_location: ProxyLocation | None = None,
        max_steps: int | None = None,
    ) -> TaskRunResponse:
        """Return a task run response for the selected engine.

        NOTE(review): this looks like a placeholder. No request is sent and
        every argument other than ``engine`` is unused; both supported engines
        return a default-constructed ``TaskRunResponse``.

        Raises:
            ValueError: If ``engine`` is neither ``RunEngine.skyvern_v1`` nor
                ``RunEngine.skyvern_v2``.
        """
        # The branches are kept separate because each engine is presumably
        # meant to get its own implementation later.
        if engine == RunEngine.skyvern_v1:
            return TaskRunResponse()
        elif engine == RunEngine.skyvern_v2:
            return TaskRunResponse()
        raise ValueError(f"Invalid engine: {engine}")

    async def run_workflow(
        self,
        workflow_id: str,
        workflow_input: dict | None = None,
        webhook_url: str | None = None,
        proxy_location: ProxyLocation | None = None,
        totp_identifier: str | None = None,
        totp_url: str | None = None,
    ) -> RunWorkflowResponse:
        """Start a run of ``workflow_id`` and return the parsed response.

        Sends a POST to ``/api/v1/workflows/{workflow_id}/run``.

        Args:
            workflow_id: Identifier interpolated into the request path.
            workflow_input: Sent under the ``"data"`` key; omitted from the
                payload when ``None`` or empty.
            webhook_url: Sent as ``"webhook_callback_url"``.
            proxy_location: Sent as ``"proxy_location"``.
            totp_identifier: Sent as ``"totp_identifier"``.
            totp_url: Sent as ``"totp_url"``.

        Raises:
            SkyvernClientException: If the response status is not 200.
        """
        # Unset options are still sent, as explicit nulls.
        data: dict[str, Any] = {
            "webhook_callback_url": webhook_url,
            "proxy_location": proxy_location,
            "totp_identifier": totp_identifier,
            "totp_url": totp_url,
        }
        if workflow_input:
            data["data"] = workflow_input

        async with httpx.AsyncClient() as client:
            response = await client.post(
                self._url(f"/api/v1/workflows/{workflow_id}/run"),
                headers=self._headers(),
                json=data,
                timeout=self._REQUEST_TIMEOUT_SECONDS,
            )
            self._ensure_ok(response, "run workflow")
            return RunWorkflowResponse.model_validate(response.json())

    async def get_run(
        self,
        run_id: str,
    ) -> TaskRunResponse:
        """Return a task run response for ``run_id``.

        NOTE(review): this looks like a placeholder. No request is sent and
        ``run_id`` is unused; a default-constructed ``TaskRunResponse`` is
        returned.
        """
        return TaskRunResponse()

    async def get_workflow_run(
        self,
        workflow_run_id: str,
    ) -> WorkflowRunResponse:
        """Fetch a workflow run and return the parsed response.

        Sends a GET to ``/api/v1/workflows/runs/{workflow_run_id}``.

        Raises:
            SkyvernClientException: If the response status is not 200.
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(
                self._url(f"/api/v1/workflows/runs/{workflow_run_id}"),
                headers=self._headers(),
                timeout=self._REQUEST_TIMEOUT_SECONDS,
            )
            self._ensure_ok(response, "get workflow run")
            return WorkflowRunResponse.model_validate(response.json())