update langchain integration (#1815)

This commit is contained in:
LawyZheng
2025-03-03 10:38:00 +08:00
committed by GitHub
parent 889c8f1963
commit b3b548ed38
6 changed files with 369 additions and 211 deletions

View File

@@ -4,11 +4,16 @@
- [Skyvern Langchain](#skyvern-langchain)
- [Installation](#installation)
- [Usage](#usage)
- [Run a task(sync) with skyvern agent (calling skyvern agent function directly in the tool)](#run-a-tasksync-with-skyvern-agent-calling-skyvern-agent-function-directly-in-the-tool)
- [Run a task(async) with skyvern agent (calling skyvern agent function directly in the tool)](#run-a-taskasync-with-skyvern-agent-calling-skyvern-agent-function-directly-in-the-tool)
- [Run a task(sync) with skyvern client (calling skyvern OpenAPI in the tool)](#run-a-tasksync-with-skyvern-client-calling-skyvern-openapi-in-the-tool)
- [Run a task(async) with skyvern client (calling skyvern OpenAPI in the tool)](#run-a-taskasync-with-skyvern-client-calling-skyvern-openapi-in-the-tool)
- [Basic Usage](#basic-usage)
- [Run a task(sync) locally in your local environment](#run-a-tasksync-locally-in-your-local-environment)
- [Run a task(async) locally in your local environment](#run-a-taskasync-locally-in-your-local-environment)
- [Get a task locally in your local environment](#get-a-task-locally-in-your-local-environment)
- [Run a task(sync) by calling skyvern APIs](#run-a-tasksync-by-calling-skyvern-apis)
- [Run a task(async) by calling skyvern APIs](#run-a-taskasync-by-calling-skyvern-apis)
- [Get a task by calling skyvern APIs](#get-a-task-by-calling-skyvern-apis)
- [Agent Usage](#agent-usage)
- [Run a task(async) locally in your local environment and wait until the task is finished](#run-a-taskasync-locally-in-your-local-environment-and-wait-until-the-task-is-finished)
- [Run a task(async) by calling skyvern APIs and wait until the task is finished](#run-a-taskasync-by-calling-skyvern-apis-and-wait-until-the-task-is-finished)
<!-- END doctoc generated TOC please keep comment here to allow auto update -->
@@ -22,9 +27,20 @@ This is a langchain integration for Skyvern.
pip install skyvern-langchain
```
## Usage
To run the example scenarios, you might need to install other langchain dependencies.
```bash
pip install langchain-openai
pip install langchain-community
```
### Run a task(sync) with skyvern agent (calling skyvern agent function directly in the tool)
## Basic Usage
This is the only basic usage of skyvern langchain tool. If you want a full langchain integration experience, please refer to the [Agent Usage](#agent-usage) section to play with langchain agent.
Go to [Langchain Tools](https://python.langchain.com/v0.1/docs/modules/tools/) to see more advanced langchain tool usage.
### Run a task(sync) locally in your local environment
> sync task won't return until the task is finished.
:warning: :warning: if you want to run this code block, you need to run `skyvern init --openai-api-key <your_openai_api_key>` command in your terminal to set up skyvern first.
@@ -32,34 +48,149 @@ pip install skyvern-langchain
```python
import asyncio
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain.agents import initialize_agent, AgentType
from skyvern_langchain.agent import run_task_v2
load_dotenv()
llm = ChatOpenAI(model="gpt-4o", temperature=0)
agent = initialize_agent(
llm=llm,
tools=[run_task_v2],
verbose=True,
agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
)
from skyvern_langchain.agent import RunTask
run_task = RunTask()
async def main():
# to run skyvern agent locally, must run `skyvern init` first
print(await agent.ainvoke("Run a task with Skyvern. The task is about 'Navigate to the Hacker News homepage and get the top 3 posts.'"))
print(await run_task.ainvoke("Navigate to the Hacker News homepage and get the top 3 posts."))
if __name__ == "__main__":
asyncio.run(main())
```
### Run a task(async) with skyvern agent (calling skyvern agent function directly in the tool)
> async task will return immediately and the task will be running in the background. You can use `get_task_v2` tool to poll the task information until the task is finished.
### Run a task(async) locally in your local environment
> async task will return immediately and the task will be running in the background.
:warning: :warning: if you want to run the task in the background, you need to keep the script running until the task is finished, otherwise the task will be killed when the script is finished.
:warning: :warning: if you want to run this code block, you need to run `skyvern init --openai-api-key <your_openai_api_key>` command in your terminal to set up skyvern first.
```python
import asyncio
from skyvern_langchain.agent import DispatchTask
dispatch_task = DispatchTask()
async def main():
# to run skyvern agent locally, must run `skyvern init` first
print(await dispatch_task.ainvoke("Navigate to the Hacker News homepage and get the top 3 posts."))
# keep the script running until the task is finished
await asyncio.sleep(600)
if __name__ == "__main__":
asyncio.run(main())
```
### Get a task locally in your local environment
:warning: :warning: if you want to run this code block, you need to run `skyvern init --openai-api-key <your_openai_api_key>` command in your terminal to set up skyvern first.
```python
import asyncio
from skyvern_langchain.agent import GetTask
get_task = GetTask()
async def main():
# to run skyvern agent locally, must run `skyvern init` first
print(await get_task.ainvoke("<task_id>"))
if __name__ == "__main__":
asyncio.run(main())
```
### Run a task(sync) by calling skyvern APIs
> sync task won't return until the task is finished.
no need to run `skyvern init` command in your terminal to set up skyvern before using this integration.
```python
import asyncio
from skyvern_langchain.client import RunTask
run_task = RunTask(
api_key="<your_organization_api_key>",
)
# or you can load the api_key from SKYVERN_API_KEY in .env
# run_task = RunTask()
async def main():
print(await run_task.ainvoke("Navigate to the Hacker News homepage and get the top 3 posts."))
if __name__ == "__main__":
asyncio.run(main())
```
### Run a task(async) by calling skyvern APIs
> async task will return immediately and the task will be running in the background.
no need to run `skyvern init` command in your terminal to set up skyvern before using this integration.
the task is actually running in the skyvern cloud service, so you don't need to keep your script running until the task is finished.
```python
import asyncio
from skyvern_langchain.client import DispatchTask
dispatch_task = DispatchTask(
api_key="<your_organization_api_key>",
)
# or you can load the api_key from SKYVERN_API_KEY in .env
# dispatch_task = DispatchTask()
async def main():
print(await dispatch_task.ainvoke("Navigate to the Hacker News homepage and get the top 3 posts."))
if __name__ == "__main__":
asyncio.run(main())
```
### Get a task by calling skyvern APIs
> async task will return immediately and the task will be running in the background.
no need to run `skyvern init` command in your terminal to set up skyvern before using this integration.
the task is actually running in the skyvern cloud service, so you don't need to keep your script running until the task is finished.
```python
import asyncio
from skyvern_langchain.client import GetTask
get_task = GetTask(
api_key="<your_organization_api_key>",
)
# or you can load the api_key from SKYVERN_API_KEY in .env
# get_task = GetTask()
async def main():
print(await get_task.ainvoke("<task_id>"))
if __name__ == "__main__":
asyncio.run(main())
```
## Agent Usage
Langchain is more powerful when used with [Langchain Agents](https://python.langchain.com/v0.1/docs/modules/agents/).
The following two examples show how to build an agent that executes a specified task, waits for its completion, and then returns the results. For example, the agent is tasked with navigating to the Hacker News homepage and retrieving the top three posts.
### Run a task(async) locally in your local environment and wait until the task is finished
> async task will return immediately and the task will be running in the background. You can use `GetTask` tool to poll the task information until the task is finished.
:warning: :warning: if you want to run this code block, you need to run `skyvern init --openai-api-key <your_openai_api_key>` command in your terminal to set up skyvern first.
@@ -68,19 +199,23 @@ import asyncio
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain.agents import initialize_agent, AgentType
from skyvern_langchain.agent import queue_task_v2, get_task_v2
from skyvern_langchain.agent import DispatchTask, GetTask
from langchain_community.tools.sleep.tool import SleepTool
# load OpenAI API key from .env
load_dotenv()
llm = ChatOpenAI(model="gpt-4o", temperature=0)
dispatch_task = DispatchTask()
get_task = GetTask()
agent = initialize_agent(
llm=llm,
tools=[
queue_task_v2,
get_task_v2,
dispatch_task,
get_task,
SleepTool(),
],
verbose=True,
@@ -89,8 +224,8 @@ agent = initialize_agent(
async def main():
# use sleep tool to set up the polling logic until the task is completed, if you only want to queue a task, you can remove the sleep tool
print(await agent.ainvoke("Queue a task with Skyvern. The task is about 'Navigate to the Hacker News homepage and get the top 3 posts.' Then, get this task information until it's completed. The task information re-get interval should be 60s."))
# use sleep tool to set up the polling logic until the task is completed, if you only want to dispatch a task, you can remove the sleep tool
print(await agent.ainvoke("Run a task with Skyvern. The task is about 'Navigate to the Hacker News homepage and get the top 3 posts.' Then, get this task information until it's completed. The task information re-get interval should be 60s."))
if __name__ == "__main__":
@@ -98,8 +233,9 @@ if __name__ == "__main__":
```
### Run a task(sync) with skyvern client (calling skyvern OpenAPI in the tool)
> sync task won't return until the task is finished.
### Run a task(async) by calling skyvern APIs and wait until the task is finished
> async task will return immediately and the task will be running in the background. You can use `GetTask` tool to poll the task information until the task is finished.
no need to run `skyvern init` command in your terminal to set up skyvern before using this integration.
@@ -108,65 +244,32 @@ import asyncio
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain.agents import initialize_agent, AgentType
from skyvern_langchain.client import RunSkyvernClientTaskV2Tool
load_dotenv()
llm = ChatOpenAI(model="gpt-4o", temperature=0)
run_task_v2 = RunSkyvernClientTaskV2Tool(
credential="<your_organization_api_key>",
)
agent = initialize_agent(
llm=llm,
tools=[run_task_v2],
verbose=True,
agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
)
async def main():
print(await agent.ainvoke("Run a task with Skyvern. The task is about 'Navigate to the Hacker News homepage and get the top 3 posts.'"))
if __name__ == "__main__":
asyncio.run(main())
```
### Run a task(async) with skyvern client (calling skyvern OpenAPI in the tool)
> async task will return immediately and the task will be running in the background. You can use `GetSkyvernClientTaskV2Tool` tool to poll the task information until the task is finished.
no need to run `skyvern init` command in your terminal to set up skyvern before using this integration.
```python
import asyncio
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain.agents import initialize_agent, AgentType
from skyvern_langchain.client import (
QueueSkyvernClientTaskV2Tool,
GetSkyvernClientTaskV2Tool,
)
from skyvern_langchain.client import DispatchTask, GetTask
from langchain_community.tools.sleep.tool import SleepTool
# load OpenAI API key from .env
load_dotenv()
llm = ChatOpenAI(model="gpt-4o", temperature=0)
queue_task_v2 = QueueSkyvernClientTaskV2Tool(
credential="<your_organization_api_key>",
dispatch_task = DispatchTask(
api_key="<your_organization_api_key>",
)
# or you can load the api_key from SKYVERN_API_KEY in .env
# dispatch_task = DispatchTask()
get_task_v2 = GetSkyvernClientTaskV2Tool(
credential="<your_organization_api_key>",
get_task = GetTask(
api_key="<your_organization_api_key>",
)
# or you can load the api_key from SKYVERN_API_KEY in .env
# get_task = GetTask()
agent = initialize_agent(
llm=llm,
tools=[
queue_task_v2,
get_task_v2,
dispatch_task,
get_task,
SleepTool(),
],
verbose=True,
@@ -175,8 +278,8 @@ agent = initialize_agent(
async def main():
# use sleep tool to set up the polling logic until the task is completed, if you only want to queue a task, you can remove the sleep tool
print(await agent.ainvoke("Queue a task with Skyvern. The task is about 'Navigate to the Hacker News homepage and get the top 3 posts.' Then, get this task information until it's completed. The task information re-get interval should be 60s."))
# use sleep tool to set up the polling logic until the task is completed, if you only want to dispatch a task, you can remove the sleep tool
print(await agent.ainvoke("Run a task with Skyvern. The task is about 'Navigate to the Hacker News homepage and get the top 3 posts.' Then, get this task information until it's completed. The task information re-get interval should be 60s."))
if __name__ == "__main__":

View File

@@ -1,6 +1,6 @@
[tool.poetry]
name = "skyvern-langchain"
version = "0.1.1"
version = "0.1.5"
description = ""
authors = ["lawyzheng <lawy@skyvern.com>"]
packages = [{ include = "skyvern_langchain" }]

View File

@@ -1,48 +1,97 @@
from typing import Any, Dict
from typing import Any, Literal, Type
from langchain.tools import tool
from skyvern_langchain.schema import GetTaskInput, TaskV1Request, TaskV2Request
from langchain.tools import BaseTool
from litellm import BaseModel
from pydantic import Field
from skyvern_langchain.schema import CreateTaskInput, GetTaskInput
from skyvern_langchain.settings import settings
from skyvern.agent import Agent
from skyvern.forge.sdk.schemas.observers import ObserverTask
from skyvern.forge.sdk.schemas.tasks import CreateTaskResponse, TaskResponse
from skyvern.forge import app
from skyvern.forge.prompts import prompt_engine
from skyvern.forge.sdk.schemas.observers import ObserverTask, ObserverTaskRequest
from skyvern.forge.sdk.schemas.task_generations import TaskGenerationBase
from skyvern.forge.sdk.schemas.tasks import CreateTaskResponse, TaskRequest, TaskResponse
agent = Agent()
@tool("run-skyvern-agent-task-v1", args_schema=TaskV1Request)
async def run_task_v1(**kwargs: Dict[str, Any]) -> TaskResponse:
"""Use Skyvern agent to run a v1 task. It is usually used for the simple tasks. This function won't return until the task is finished."""
task_request = TaskV1Request(**kwargs)
return await Agent().run_task(task_request=task_request, timeout_seconds=task_request.timeout_seconds)
class SkyvernTaskBaseTool(BaseTool):
engine: Literal["TaskV1", "TaskV2"] = Field(default=settings.engine)
timeout_seconds: int = Field(default=settings.run_task_timeout)
agent: Agent = agent
def _run(self, *args: Any, **kwargs: Any) -> None:
raise NotImplementedError("skyvern task tool does not support sync")
# TODO: agent haven't exposed the task v1 generate function, we can migrate to use agent interface when it's available
async def _generate_v1_task_request(self, user_prompt: str) -> TaskGenerationBase:
llm_prompt = prompt_engine.load_prompt("generate-task", user_prompt=user_prompt)
llm_response = await app.LLM_API_HANDLER(prompt=llm_prompt, prompt_name="generate-task")
return TaskGenerationBase.model_validate(llm_response)
@tool("queue-skyvern-agent-task-v1", args_schema=TaskV1Request)
async def queue_task_v1(**kwargs: Dict[str, Any]) -> CreateTaskResponse:
"""Use Skyvern agent to queue a v1 task. It is usually used for the simple tasks. This function will return immediately and the task will be running in the background."""
task_request = TaskV1Request(**kwargs)
return await Agent().create_task(task_request=task_request)
class RunTask(SkyvernTaskBaseTool):
name: str = "run-skyvern-agent-task"
description: str = """Use Skyvern agent to run a task. This function won't return until the task is finished."""
args_schema: Type[BaseModel] = CreateTaskInput
async def _arun(self, user_prompt: str, url: str | None = None) -> TaskResponse | ObserverTask:
if self.engine == "TaskV1":
return await self._arun_task_v1(user_prompt=user_prompt, url=url)
else:
return await self._arun_task_v2(user_prompt=user_prompt, url=url)
async def _arun_task_v1(self, user_prompt: str, url: str | None = None) -> TaskResponse:
task_generation = await self._generate_v1_task_request(user_prompt=user_prompt)
task_request = TaskRequest.model_validate(task_generation, from_attributes=True)
if url is not None:
task_request.url = url
return await self.agent.run_task(task_request=task_request, timeout_seconds=self.timeout_seconds)
async def _arun_task_v2(self, user_prompt: str, url: str | None = None) -> ObserverTask:
task_request = ObserverTaskRequest(user_prompt=user_prompt, url=url)
return await self.agent.run_observer_task_v_2(task_request=task_request, timeout_seconds=self.timeout_seconds)
@tool("get-skyvern-agent-task-v1", args_schema=GetTaskInput)
async def get_task_v1(task_id: str) -> TaskResponse | None:
"""Use Skyvern agent to get a v1 task. v1 tasks are usually simple tasks."""
return await Agent().get_task(task_id=task_id)
class DispatchTask(SkyvernTaskBaseTool):
name: str = "dispatch-skyvern-agent-task"
description: str = """Use Skyvern agent to dispatch a task. This function will return immediately and the task will be running in the background."""
args_schema: Type[BaseModel] = CreateTaskInput
async def _arun(self, user_prompt: str, url: str | None = None) -> CreateTaskResponse | ObserverTask:
if self.engine == "TaskV1":
return await self._arun_task_v1(user_prompt=user_prompt, url=url)
else:
return await self._arun_task_v2(user_prompt=user_prompt, url=url)
async def _arun_task_v1(self, user_prompt: str, url: str | None = None) -> CreateTaskResponse:
task_generation = await self._generate_v1_task_request(user_prompt=user_prompt)
task_request = TaskRequest.model_validate(task_generation, from_attributes=True)
if url is not None:
task_request.url = url
return await self.agent.create_task(task_request=task_request)
async def _arun_task_v2(self, user_prompt: str, url: str | None = None) -> ObserverTask:
task_request = ObserverTaskRequest(user_prompt=user_prompt, url=url)
return await self.agent.observer_task_v_2(task_request=task_request)
@tool("run-skyvern-agent-task-v2", args_schema=TaskV2Request)
async def run_task_v2(**kwargs: Dict[str, Any]) -> ObserverTask:
"""Use Skyvern agent to run a v2 task. It is usually used for the complicated tasks. This function won't return until the task is finished."""
task_request = TaskV2Request(**kwargs)
return await Agent().run_observer_task_v_2(task_request=task_request, timeout_seconds=task_request.timeout_seconds)
class GetTask(SkyvernTaskBaseTool):
name: str = "get-skyvern-agent-task"
description: str = """Use Skyvern agent to get a task."""
args_schema: Type[BaseModel] = GetTaskInput
async def _arun(self, task_id: str) -> TaskResponse | ObserverTask | None:
if self.engine == "TaskV1":
return await self._arun_task_v1(task_id=task_id)
else:
return await self._arun_task_v2(task_id=task_id)
@tool("queue-skyvern-agent-task-v2", args_schema=TaskV2Request)
async def queue_task_v2(**kwargs: Dict[str, Any]) -> ObserverTask:
"""Use Skyvern agent to queue a v2 task. It is usually used for the complicated tasks. This function will return immediately and the task will be running in the background."""
task_request = TaskV2Request(**kwargs)
return await Agent().observer_task_v_2(task_request=task_request)
async def _arun_task_v1(self, task_id: str) -> TaskResponse | None:
return await self.agent.get_task(task_id=task_id)
@tool("get-skyvern-agent-task-v2", args_schema=GetTaskInput)
async def get_task_v2(task_id: str) -> ObserverTask | None:
"""Use Skyvern agent to get a v2 task. v2 tasks are usually complicated tasks."""
return await Agent().get_observer_task_v_2(task_id=task_id)
async def _arun_task_v2(self, task_id: str) -> ObserverTask | None:
return await self.agent.get_observer_task_v_2(task_id=task_id)

View File

@@ -1,136 +1,132 @@
from typing import Any, Dict, Type
from typing import Any, Dict, Literal, Type
from httpx import AsyncClient
from langchain.tools import BaseTool
from pydantic import BaseModel
from skyvern_langchain.schema import GetTaskInput, TaskV1Request, TaskV2Request
from pydantic import BaseModel, Field
from skyvern_langchain.schema import CreateTaskInput, GetTaskInput
from skyvern_langchain.settings import settings
from skyvern.client import AsyncSkyvern
from skyvern.forge.sdk.schemas.tasks import CreateTaskResponse, TaskResponse
from skyvern.forge.sdk.schemas.observers import ObserverTaskRequest
from skyvern.forge.sdk.schemas.tasks import CreateTaskResponse, TaskRequest, TaskResponse
class SkyvernClientBaseTool(BaseTool):
credential: str = ""
base_url: str = "https://api.skyvern.com"
class SkyvernTaskBaseTool(BaseTool):
api_key: str = Field(default=settings.api_key)
base_url: str = Field(default=settings.base_url)
engine: Literal["TaskV1", "TaskV2"] = Field(default=settings.engine)
run_task_timeout_seconds: int = Field(default=settings.run_task_timeout)
def get_client(self) -> AsyncSkyvern:
httpx_client = AsyncClient(
headers={
"Content-Type": "application/json",
"x-api-key": self.credential,
"x-api-key": self.api_key,
},
)
return AsyncSkyvern(base_url=self.base_url, httpx_client=httpx_client)
def _run(self) -> None:
raise NotImplementedError("skyvern client tool does not support sync")
def _run(self, *args: Any, **kwargs: Any) -> None:
raise NotImplementedError("skyvern task tool does not support sync")
class RunSkyvernClientTaskV1Tool(SkyvernClientBaseTool):
name: str = "run-skyvern-client-task-v1"
description: str = """Use Skyvern client to run a v1 task. It is usually used for the simple tasks. This function won't return until the task is finished."""
args_schema: Type[BaseModel] = TaskV1Request
class RunTask(SkyvernTaskBaseTool):
name: str = "run-skyvern-client-task"
description: str = """Use Skyvern client to run a task. This function won't return until the task is finished."""
args_schema: Type[BaseModel] = CreateTaskInput
async def _arun(self, **kwargs: Dict[str, Any]) -> TaskResponse:
task_request = TaskV1Request(**kwargs)
async def _arun(self, user_prompt: str, url: str | None = None) -> TaskResponse | Dict[str, Any | None]:
if self.engine == "TaskV1":
return await self._arun_task_v1(user_prompt=user_prompt, url=url)
else:
return await self._arun_task_v2(user_prompt=user_prompt, url=url)
async def _arun_task_v1(self, user_prompt: str, url: str | None = None) -> TaskResponse:
task_generation = await self.get_client().agent.generate_task(
prompt=user_prompt,
)
if url is not None:
task_generation.url = url
task_request = TaskRequest.model_validate(task_generation, from_attributes=True)
return await self.get_client().agent.run_task(
max_steps_override=task_request.max_steps,
timeout_seconds=task_request.timeout_seconds,
timeout_seconds=self.run_task_timeout_seconds,
url=task_request.url,
title=task_request.title,
webhook_callback_url=task_request.webhook_callback_url,
totp_verification_url=task_request.totp_verification_url,
totp_identifier=task_request.totp_identifier,
navigation_goal=task_request.navigation_goal,
data_extraction_goal=task_request.data_extraction_goal,
navigation_payload=task_request.navigation_goal,
error_code_mapping=task_request.error_code_mapping,
proxy_location=task_request.proxy_location,
extracted_information_schema=task_request.extracted_information_schema,
complete_criterion=task_request.complete_criterion,
terminate_criterion=task_request.terminate_criterion,
browser_session_id=task_request.browser_session_id,
)
class QueueSkyvernClientTaskV1Tool(SkyvernClientBaseTool):
name: str = "queue-skyvern-client-task-v1"
description: str = """Use Skyvern client to queue a v1 task. It is usually used for the simple tasks. This function will return immediately and the task will be running in the background."""
args_schema: Type[BaseModel] = TaskV1Request
async def _arun(self, **kwargs: Dict[str, Any]) -> CreateTaskResponse:
task_request = TaskV1Request(**kwargs)
return await self.get_client().agent.create_task(
max_steps_override=task_request.max_steps,
url=task_request.url,
title=task_request.title,
webhook_callback_url=task_request.webhook_callback_url,
totp_verification_url=task_request.totp_verification_url,
totp_identifier=task_request.totp_identifier,
navigation_goal=task_request.navigation_goal,
data_extraction_goal=task_request.data_extraction_goal,
navigation_payload=task_request.navigation_goal,
error_code_mapping=task_request.error_code_mapping,
proxy_location=task_request.proxy_location,
extracted_information_schema=task_request.extracted_information_schema,
complete_criterion=task_request.complete_criterion,
terminate_criterion=task_request.terminate_criterion,
browser_session_id=task_request.browser_session_id,
)
class GetSkyvernClientTaskV1Tool(SkyvernClientBaseTool):
name: str = "get-skyvern-client-task-v1"
description: str = """Use Skyvern client to get a v1 task. v1 tasks are usually simple tasks."""
args_schema: Type[BaseModel] = GetTaskInput
async def _arun(self, task_id: str) -> TaskResponse:
return await self.get_client().agent.get_task(task_id=task_id)
class RunSkyvernClientTaskV2Tool(SkyvernClientBaseTool):
name: str = "run-skyvern-client-task-v2"
description: str = """Use Skyvern client to run a v2 task. It is usually used for the complicated tasks. This function won't return until the task is finished."""
args_schema: Type[BaseModel] = TaskV2Request
async def _arun(self, **kwargs: Dict[str, Any]) -> Dict[str, Any | None]:
task_request = TaskV2Request(**kwargs)
async def _arun_task_v2(self, user_prompt: str, url: str | None = None) -> TaskResponse:
task_request = ObserverTaskRequest(url=url, user_prompt=user_prompt)
return await self.get_client().agent.run_observer_task_v_2(
max_iterations_override=task_request.max_iterations,
timeout_seconds=task_request.timeout_seconds,
timeout_seconds=self.run_task_timeout_seconds,
user_prompt=task_request.user_prompt,
url=task_request.url,
browser_session_id=task_request.browser_session_id,
webhook_callback_url=task_request.webhook_callback_url,
totp_verification_url=task_request.totp_verification_url,
totp_identifier=task_request.totp_identifier,
proxy_location=task_request.proxy_location,
)
class QueueSkyvernClientTaskV2Tool(SkyvernClientBaseTool):
name: str = "queue-skyvern-client-task-v2"
description: str = """Use Skyvern client to queue a v2 task. It is usually used for the complicated tasks. This function will return immediately and the task will be running in the background."""
args_schema: Type[BaseModel] = TaskV2Request
class DispatchTask(SkyvernTaskBaseTool):
name: str = "dispatch-skyvern-client-task"
description: str = """Use Skyvern client to dispatch a task. This function will return immediately and the task will be running in the background."""
args_schema: Type[BaseModel] = CreateTaskInput
async def _arun(self, **kwargs: Dict[str, Any]) -> Dict[str, Any | None]:
task_request = TaskV2Request(**kwargs)
async def _arun(self, user_prompt: str, url: str | None = None) -> CreateTaskResponse | Dict[str, Any | None]:
if self.engine == "TaskV1":
return await self._arun_task_v1(user_prompt=user_prompt, url=url)
else:
return await self._arun_task_v2(user_prompt=user_prompt, url=url)
async def _arun_task_v1(self, user_prompt: str, url: str | None = None) -> CreateTaskResponse:
task_generation = await self.get_client().agent.generate_task(
prompt=user_prompt,
)
if url is not None:
task_generation.url = url
task_request = TaskRequest.model_validate(task_generation, from_attributes=True)
return await self.get_client().agent.create_task(
url=task_request.url,
title=task_request.title,
navigation_goal=task_request.navigation_goal,
data_extraction_goal=task_request.data_extraction_goal,
navigation_payload=task_request.navigation_goal,
error_code_mapping=task_request.error_code_mapping,
extracted_information_schema=task_request.extracted_information_schema,
complete_criterion=task_request.complete_criterion,
terminate_criterion=task_request.terminate_criterion,
)
async def _arun_task_v2(self, user_prompt: str, url: str | None = None) -> Dict[str, Any | None]:
task_request = ObserverTaskRequest(url=url, user_prompt=user_prompt)
return await self.get_client().agent.observer_task_v_2(
max_iterations_override=task_request.max_iterations,
user_prompt=task_request.user_prompt,
url=task_request.url,
browser_session_id=task_request.browser_session_id,
webhook_callback_url=task_request.webhook_callback_url,
totp_verification_url=task_request.totp_verification_url,
totp_identifier=task_request.totp_identifier,
proxy_location=task_request.proxy_location,
)
class GetSkyvernClientTaskV2Tool(SkyvernClientBaseTool):
name: str = "get-skyvern-client-task-v2"
description: str = """Use Skyvern client to get a v2 task. It is usually used for the complicated tasks."""
class GetTask(SkyvernTaskBaseTool):
name: str = "get-skyvern-client-task"
description: str = """Use Skyvern client to get a task."""
args_schema: Type[BaseModel] = GetTaskInput
async def _arun(self, task_id: str) -> Dict[str, Any | None]:
if self.engine == "TaskV1":
return await self._arun_task_v1(task_id=task_id)
else:
return await self._arun_task_v2(task_id=task_id)
async def _arun_task_v1(self, task_id: str) -> TaskResponse:
return await self.get_client().agent.get_task(task_id=task_id)
async def _arun_task_v2(self, task_id: str) -> Dict[str, Any | None]:
return await self.get_client().agent.get_observer_task_v_2(task_id=task_id)

View File

@@ -1,17 +1,9 @@
from pydantic import BaseModel
from skyvern.forge.sdk.schemas.observers import ObserverTaskRequest
from skyvern.forge.sdk.schemas.tasks import TaskRequest
class TaskV1Request(TaskRequest):
max_steps: int = 10
timeout_seconds: int = 60 * 60
class TaskV2Request(ObserverTaskRequest):
max_iterations: int = 10
timeout_seconds: int = 60 * 60
class CreateTaskInput(BaseModel):
user_prompt: str
url: str | None = None
class GetTaskInput(BaseModel):

View File

@@ -0,0 +1,18 @@
from typing import Literal
from dotenv import load_dotenv
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
api_key: str = ""
base_url: str = "https://api.skyvern.com"
engine: Literal["TaskV1", "TaskV2"] = "TaskV2"
run_task_timeout_seconds: int = 60 * 60
class Config:
env_prefix = "SKYVERN_"
load_dotenv()
settings = Settings()