add langchain support (#1805)

This commit is contained in:
LawyZheng
2025-02-21 15:56:06 +08:00
committed by GitHub
parent 02a8861d4a
commit 960a51b029
8 changed files with 5479 additions and 1 deletions

View File

@@ -0,0 +1,184 @@
<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
**Table of Contents** *generated with [DocToc](https://github.com/thlorenz/doctoc)*
- [Skyvern Langchain](#skyvern-langchain)
- [Installation](#installation)
- [Usage](#usage)
- [Run a task(sync) with skyvern agent (calling skyvern agent function directly in the tool)](#run-a-tasksync-with-skyvern-agent-calling-skyvern-agent-function-directly-in-the-tool)
- [Run a task(async) with skyvern agent (calling skyvern agent function directly in the tool)](#run-a-taskasync-with-skyvern-agent-calling-skyvern-agent-function-directly-in-the-tool)
- [Run a task(sync) with skyvern client (calling skyvern OpenAPI in the tool)](#run-a-tasksync-with-skyvern-client-calling-skyvern-openapi-in-the-tool)
- [Run a task(async) with skyvern client (calling skyvern OpenAPI in the tool)](#run-a-taskasync-with-skyvern-client-calling-skyvern-openapi-in-the-tool)
<!-- END doctoc generated TOC please keep comment here to allow auto update -->
# Skyvern Langchain
This is a langchain integration for Skyvern.
## Installation
```bash
pip install skyvern-langchain
```
## Usage
### Run a task(sync) with skyvern agent (calling skyvern agent function directly in the tool)
> sync task won't return until the task is finished.
:warning: :warning: if you want to run this code block, you need to run `skyvern init --openai-api-key <your_openai_api_key>` command in your terminal to set up skyvern first.
```python
import asyncio
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain.agents import initialize_agent, AgentType
from skyvern_langchain.agent import run_task_v2
load_dotenv()
llm = ChatOpenAI(model="gpt-4o", temperature=0)
agent = initialize_agent(
llm=llm,
tools=[run_task_v2],
verbose=True,
agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
)
async def main():
# to run skyvern agent locally, must run `skyvern init` first
print(await agent.ainvoke("Run a task with Skyvern. The task is about 'Navigate to the Hacker News homepage and get the top 3 posts.'"))
if __name__ == "__main__":
asyncio.run(main())
```
### Run a task(async) with skyvern agent (calling skyvern agent function directly in the tool)
> async task will return immediately and the task will be running in the background. You can use `get_task_v2` tool to poll the task information until the task is finished.
:warning: :warning: if you want to run this code block, you need to run `skyvern init --openai-api-key <your_openai_api_key>` command in your terminal to set up skyvern first.
```python
import asyncio
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain.agents import initialize_agent, AgentType
from skyvern_langchain.agent import queue_task_v2, get_task_v2
from langchain_community.tools.sleep.tool import SleepTool
load_dotenv()
llm = ChatOpenAI(model="gpt-4o", temperature=0)
agent = initialize_agent(
llm=llm,
tools=[
queue_task_v2,
get_task_v2,
SleepTool(),
],
verbose=True,
agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
)
async def main():
# use sleep tool to set up the polling logic until the task is completed, if you only want to queue a task, you can remove the sleep tool
print(await agent.ainvoke("Queue a task with Skyvern. The task is about 'Navigate to the Hacker News homepage and get the top 3 posts.' Then, get this task information until it's completed. The task information re-get interval should be 60s."))
if __name__ == "__main__":
asyncio.run(main())
```
### Run a task(sync) with skyvern client (calling skyvern OpenAPI in the tool)
> sync task won't return until the task is finished.
no need to run `skyvern init` command in your terminal to set up skyvern before using this integration.
```python
import asyncio
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain.agents import initialize_agent, AgentType
from skyvern_langchain.client import RunSkyvernClientTaskV2Tool
load_dotenv()
llm = ChatOpenAI(model="gpt-4o", temperature=0)
run_task_v2 = RunSkyvernClientTaskV2Tool(
credential="<your_organization_api_key>",
)
agent = initialize_agent(
llm=llm,
tools=[run_task_v2],
verbose=True,
agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
)
async def main():
print(await agent.ainvoke("Run a task with Skyvern. The task is about 'Navigate to the Hacker News homepage and get the top 3 posts.'"))
if __name__ == "__main__":
asyncio.run(main())
```
### Run a task(async) with skyvern client (calling skyvern OpenAPI in the tool)
> async task will return immediately and the task will be running in the background. You can use `GetSkyvernClientTaskV2Tool` tool to poll the task information until the task is finished.
no need to run `skyvern init` command in your terminal to set up skyvern before using this integration.
```python
import asyncio
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain.agents import initialize_agent, AgentType
from skyvern_langchain.client import (
QueueSkyvernClientTaskV2Tool,
GetSkyvernClientTaskV2Tool,
)
from langchain_community.tools.sleep.tool import SleepTool
load_dotenv()
llm = ChatOpenAI(model="gpt-4o", temperature=0)
queue_task_v2 = QueueSkyvernClientTaskV2Tool(
credential="<your_organization_api_key>",
)
get_task_v2 = GetSkyvernClientTaskV2Tool(
credential="<your_organization_api_key>",
)
agent = initialize_agent(
llm=llm,
tools=[
queue_task_v2,
get_task_v2,
SleepTool(),
],
verbose=True,
agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
)
async def main():
# use sleep tool to set up the polling logic until the task is completed, if you only want to queue a task, you can remove the sleep tool
print(await agent.ainvoke("Queue a task with Skyvern. The task is about 'Navigate to the Hacker News homepage and get the top 3 posts.' Then, get this task information until it's completed. The task information re-get interval should be 60s."))
if __name__ == "__main__":
asyncio.run(main())
```

5072
integrations/langchain/poetry.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,20 @@
[tool.poetry]
name = "skyvern-langchain"
version = "0.1.1"
description = ""
authors = ["lawyzheng <lawy@skyvern.com>"]
packages = [{ include = "skyvern_langchain" }]
readme = "README.md"
[tool.poetry.dependencies]
python = "^3.11,<3.12"
skyvern = "^0.1.56"
langchain = "^0.3.19"
[tool.poetry.group.dev.dependencies]
twine = "^6.1.0"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

View File

@@ -0,0 +1,48 @@
from typing import Any, Dict
from langchain.tools import tool
from skyvern_langchain.schema import GetTaskInput, TaskV1Request, TaskV2Request
from skyvern.agent import Agent
from skyvern.forge.sdk.schemas.observers import ObserverTask
from skyvern.forge.sdk.schemas.tasks import CreateTaskResponse, TaskResponse
@tool("run-skyvern-agent-task-v1", args_schema=TaskV1Request)
async def run_task_v1(**kwargs: Dict[str, Any]) -> TaskResponse:
"""Use Skyvern agent to run a v1 task. It is usually used for the simple tasks. This function won't return until the task is finished."""
task_request = TaskV1Request(**kwargs)
return await Agent().run_task(task_request=task_request, timeout_seconds=task_request.timeout_seconds)
@tool("queue-skyvern-agent-task-v1", args_schema=TaskV1Request)
async def queue_task_v1(**kwargs: Dict[str, Any]) -> CreateTaskResponse:
"""Use Skyvern agent to queue a v1 task. It is usually used for the simple tasks. This function will return immediately and the task will be running in the background."""
task_request = TaskV1Request(**kwargs)
return await Agent().create_task(task_request=task_request)
@tool("get-skyvern-agent-task-v1", args_schema=GetTaskInput)
async def get_task_v1(task_id: str) -> TaskResponse | None:
"""Use Skyvern agent to get a v1 task. v1 tasks are usually simple tasks."""
return await Agent().get_task(task_id=task_id)
@tool("run-skyvern-agent-task-v2", args_schema=TaskV2Request)
async def run_task_v2(**kwargs: Dict[str, Any]) -> ObserverTask:
"""Use Skyvern agent to run a v2 task. It is usually used for the complicated tasks. This function won't return until the task is finished."""
task_request = TaskV2Request(**kwargs)
return await Agent().run_observer_task_v_2(task_request=task_request, timeout_seconds=task_request.timeout_seconds)
@tool("queue-skyvern-agent-task-v2", args_schema=TaskV2Request)
async def queue_task_v2(**kwargs: Dict[str, Any]) -> ObserverTask:
"""Use Skyvern agent to queue a v2 task. It is usually used for the complicated tasks. This function will return immediately and the task will be running in the background."""
task_request = TaskV2Request(**kwargs)
return await Agent().observer_task_v_2(task_request=task_request)
@tool("get-skyvern-agent-task-v2", args_schema=GetTaskInput)
async def get_task_v2(task_id: str) -> ObserverTask | None:
"""Use Skyvern agent to get a v2 task. v2 tasks are usually complicated tasks."""
return await Agent().get_observer_task_v_2(task_id=task_id)

View File

@@ -0,0 +1,136 @@
from typing import Any, Dict, Type
from httpx import AsyncClient
from langchain.tools import BaseTool
from pydantic import BaseModel
from skyvern_langchain.schema import GetTaskInput, TaskV1Request, TaskV2Request
from skyvern.client import AsyncSkyvern
from skyvern.forge.sdk.schemas.tasks import CreateTaskResponse, TaskResponse
class SkyvernClientBaseTool(BaseTool):
credential: str = ""
base_url: str = "https://api.skyvern.com"
def get_client(self) -> AsyncSkyvern:
httpx_client = AsyncClient(
headers={
"Content-Type": "application/json",
"x-api-key": self.credential,
},
)
return AsyncSkyvern(base_url=self.base_url, httpx_client=httpx_client)
def _run(self) -> None:
raise NotImplementedError("skyvern client tool does not support sync")
class RunSkyvernClientTaskV1Tool(SkyvernClientBaseTool):
name: str = "run-skyvern-client-task-v1"
description: str = """Use Skyvern client to run a v1 task. It is usually used for the simple tasks. This function won't return until the task is finished."""
args_schema: Type[BaseModel] = TaskV1Request
async def _arun(self, **kwargs: Dict[str, Any]) -> TaskResponse:
task_request = TaskV1Request(**kwargs)
return await self.get_client().agent.run_task(
max_steps_override=task_request.max_steps,
timeout_seconds=task_request.timeout_seconds,
url=task_request.url,
title=task_request.title,
webhook_callback_url=task_request.webhook_callback_url,
totp_verification_url=task_request.totp_verification_url,
totp_identifier=task_request.totp_identifier,
navigation_goal=task_request.navigation_goal,
data_extraction_goal=task_request.data_extraction_goal,
navigation_payload=task_request.navigation_goal,
error_code_mapping=task_request.error_code_mapping,
proxy_location=task_request.proxy_location,
extracted_information_schema=task_request.extracted_information_schema,
complete_criterion=task_request.complete_criterion,
terminate_criterion=task_request.terminate_criterion,
browser_session_id=task_request.browser_session_id,
)
class QueueSkyvernClientTaskV1Tool(SkyvernClientBaseTool):
name: str = "queue-skyvern-client-task-v1"
description: str = """Use Skyvern client to queue a v1 task. It is usually used for the simple tasks. This function will return immediately and the task will be running in the background."""
args_schema: Type[BaseModel] = TaskV1Request
async def _arun(self, **kwargs: Dict[str, Any]) -> CreateTaskResponse:
task_request = TaskV1Request(**kwargs)
return await self.get_client().agent.create_task(
max_steps_override=task_request.max_steps,
url=task_request.url,
title=task_request.title,
webhook_callback_url=task_request.webhook_callback_url,
totp_verification_url=task_request.totp_verification_url,
totp_identifier=task_request.totp_identifier,
navigation_goal=task_request.navigation_goal,
data_extraction_goal=task_request.data_extraction_goal,
navigation_payload=task_request.navigation_goal,
error_code_mapping=task_request.error_code_mapping,
proxy_location=task_request.proxy_location,
extracted_information_schema=task_request.extracted_information_schema,
complete_criterion=task_request.complete_criterion,
terminate_criterion=task_request.terminate_criterion,
browser_session_id=task_request.browser_session_id,
)
class GetSkyvernClientTaskV1Tool(SkyvernClientBaseTool):
name: str = "get-skyvern-client-task-v1"
description: str = """Use Skyvern client to get a v1 task. v1 tasks are usually simple tasks."""
args_schema: Type[BaseModel] = GetTaskInput
async def _arun(self, task_id: str) -> TaskResponse:
return await self.get_client().agent.get_task(task_id=task_id)
class RunSkyvernClientTaskV2Tool(SkyvernClientBaseTool):
name: str = "run-skyvern-client-task-v2"
description: str = """Use Skyvern client to run a v2 task. It is usually used for the complicated tasks. This function won't return until the task is finished."""
args_schema: Type[BaseModel] = TaskV2Request
async def _arun(self, **kwargs: Dict[str, Any]) -> Dict[str, Any | None]:
task_request = TaskV2Request(**kwargs)
return await self.get_client().agent.run_observer_task_v_2(
max_iterations_override=task_request.max_iterations,
timeout_seconds=task_request.timeout_seconds,
user_prompt=task_request.user_prompt,
url=task_request.url,
browser_session_id=task_request.browser_session_id,
webhook_callback_url=task_request.webhook_callback_url,
totp_verification_url=task_request.totp_verification_url,
totp_identifier=task_request.totp_identifier,
proxy_location=task_request.proxy_location,
)
class QueueSkyvernClientTaskV2Tool(SkyvernClientBaseTool):
name: str = "queue-skyvern-client-task-v2"
description: str = """Use Skyvern client to queue a v2 task. It is usually used for the complicated tasks. This function will return immediately and the task will be running in the background."""
args_schema: Type[BaseModel] = TaskV2Request
async def _arun(self, **kwargs: Dict[str, Any]) -> Dict[str, Any | None]:
task_request = TaskV2Request(**kwargs)
return await self.get_client().agent.observer_task_v_2(
max_iterations_override=task_request.max_iterations,
user_prompt=task_request.user_prompt,
url=task_request.url,
browser_session_id=task_request.browser_session_id,
webhook_callback_url=task_request.webhook_callback_url,
totp_verification_url=task_request.totp_verification_url,
totp_identifier=task_request.totp_identifier,
proxy_location=task_request.proxy_location,
)
class GetSkyvernClientTaskV2Tool(SkyvernClientBaseTool):
name: str = "get-skyvern-client-task-v2"
description: str = """Use Skyvern client to get a v2 task. It is usually used for the complicated tasks."""
args_schema: Type[BaseModel] = GetTaskInput
async def _arun(self, task_id: str) -> Dict[str, Any | None]:
return await self.get_client().agent.get_observer_task_v_2(task_id=task_id)

View File

@@ -0,0 +1,18 @@
from pydantic import BaseModel
from skyvern.forge.sdk.schemas.observers import ObserverTaskRequest
from skyvern.forge.sdk.schemas.tasks import TaskRequest
class TaskV1Request(TaskRequest):
max_steps: int = 10
timeout_seconds: int = 60 * 60
class TaskV2Request(ObserverTaskRequest):
max_iterations: int = 10
timeout_seconds: int = 60 * 60
class GetTaskInput(BaseModel):
task_id: str

View File

@@ -1,6 +1,6 @@
[tool.poetry]
name = "skyvern"
version = "0.1.55"
version = "0.1.56"
description = ""
authors = ["Skyvern AI <info@skyvern.com>"]
readme = "README.md"