add llamaindex integration (#1809)

This commit is contained in:
LawyZheng
2025-03-04 02:28:58 +08:00
committed by GitHub
parent 8a1b0f3797
commit 088b772648
8 changed files with 6043 additions and 0 deletions

View File

@@ -0,0 +1,295 @@
<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
**Table of Contents** *generated with [DocToc](https://github.com/thlorenz/doctoc)*
- [Skyvern LlamaIndex](#skyvern-llamaindex)
- [Installation](#installation)
- [Basic Usage](#basic-usage)
- [Run a task(sync) locally in your local environment](#run-a-tasksync-locally-in-your-local-environment)
- [Run a task(async) locally in your local environment](#run-a-taskasync-locally-in-your-local-environment)
- [Get a task locally in your local environment](#get-a-task-locally-in-your-local-environment)
- [Run a task(sync) by calling skyvern APIs](#run-a-tasksync-by-calling-skyvern-apis)
- [Run a task(async) by calling skyvern APIs](#run-a-taskasync-by-calling-skyvern-apis)
- [Get a task by calling skyvern APIs](#get-a-task-by-calling-skyvern-apis)
- [Advanced Usage](#advanced-usage)
- [Dispatch a task(async) locally in your local environment and wait until the task is finished](#dispatch-a-taskasync-locally-in-your-local-environment-and-wait-until-the-task-is-finished)
- [Dispatch a task(async) by calling skyvern APIs and wait until the task is finished](#dispatch-a-taskasync-by-calling-skyvern-apis-and-wait-until-the-task-is-finished)
<!-- END doctoc generated TOC please keep comment here to allow auto update -->
# Skyvern LlamaIndex
This is a LlamaIndex integration for Skyvern.
## Installation
```bash
pip install skyvern-llamaindex
```
## Basic Usage
### Run a task(sync) locally in your local environment
> sync task won't return until the task is finished.
:warning: :warning: if you want to run this code block, you need to run `skyvern init --openai-api-key <your_openai_api_key>` command in your terminal to set up skyvern first.
```python
from dotenv import load_dotenv
from llama_index.agent.openai import OpenAIAgent
from llama_index.llms.openai import OpenAI
from skyvern_llamaindex.agent import SkyvernTool
# load OpenAI API key from .env
load_dotenv()
skyvern_tool = SkyvernTool()
agent = OpenAIAgent.from_tools(
tools=[skyvern_tool.run_task()],
llm=OpenAI(model="gpt-4o"),
verbose=True,
)
response = agent.chat("Run a task with Skyvern. The task is about 'Navigate to the Hacker News homepage and get the top 3 posts.'")
print(response)
```
### Run a task(async) locally in your local environment
> async task will return immediately and the task will be running in the background.
:warning: :warning: if you want to run the task in the background, you need to keep the agent running until the task is finished, otherwise the task will be killed when the agent finishes the chat.
:warning: :warning: if you want to run this code block, you need to run `skyvern init --openai-api-key <your_openai_api_key>` command in your terminal to set up skyvern first.
```python
import asyncio
from dotenv import load_dotenv
from llama_index.agent.openai import OpenAIAgent
from llama_index.llms.openai import OpenAI
from skyvern_llamaindex.agent import SkyvernTool
from llama_index.core.tools import FunctionTool
# load OpenAI API key from .env
load_dotenv()
async def sleep(seconds: int) -> str:
await asyncio.sleep(seconds)
return f"Slept for {seconds} seconds"
# define a sleep tool to keep the agent running until the task is finished
sleep_tool = FunctionTool.from_defaults(
async_fn=sleep,
description="Sleep for a given number of seconds",
name="sleep",
)
skyvern_tool = SkyvernTool()
agent = OpenAIAgent.from_tools(
tools=[skyvern_tool.dispatch_task(), sleep_tool],
llm=OpenAI(model="gpt-4o"),
verbose=True,
)
response = agent.chat("Run a task with Skyvern. The task is about 'Navigate to the Hacker News homepage and get the top 3 posts.' Then, sleep for 10 minutes.")
print(response)
```
### Get a task locally in your local environment
:warning: :warning: if you want to run this code block, you need to run `skyvern init --openai-api-key <your_openai_api_key>` command in your terminal to set up skyvern first.
```python
from dotenv import load_dotenv
from llama_index.agent.openai import OpenAIAgent
from llama_index.llms.openai import OpenAI
from skyvern_llamaindex.agent import SkyvernTool
# load OpenAI API key from .env
load_dotenv()
skyvern_tool = SkyvernTool()
agent = OpenAIAgent.from_tools(
tools=[skyvern_tool.get_task()],
llm=OpenAI(model="gpt-4o"),
verbose=True,
)
response = agent.chat("Get the task information with Skyvern. The task id is '<task_id>'.")
print(response)
```
### Run a task(sync) by calling skyvern APIs
> sync task won't return until the task is finished.
no need to run `skyvern init` command in your terminal to set up skyvern before using this integration.
```python
from dotenv import load_dotenv
from llama_index.agent.openai import OpenAIAgent
from llama_index.llms.openai import OpenAI
from skyvern_llamaindex.client import SkyvernTool
# load OpenAI API key from .env
load_dotenv()
skyvern_tool = SkyvernTool(api_key="<your_organization_api_key>")
# or you can load the api_key from SKYVERN_API_KEY in .env
# skyvern_tool = SkyvernTool()
agent = OpenAIAgent.from_tools(
tools=[skyvern_tool.run_task()],
llm=OpenAI(model="gpt-4o"),
verbose=True,
)
response = agent.chat("Run a task with Skyvern. The task is about 'Navigate to the Hacker News homepage and get the top 3 posts.'")
print(response)
```
### Run a task(async) by calling skyvern APIs
> async task will return immediately and the task will be running in the background.
no need to run `skyvern init` command in your terminal to set up skyvern before using this integration.
the task is actually running in the skyvern cloud service, so you don't need to keep your agent running until the task is finished.
```python
from dotenv import load_dotenv
from llama_index.agent.openai import OpenAIAgent
from llama_index.llms.openai import OpenAI
from skyvern_llamaindex.client import SkyvernTool
# load OpenAI API key from .env
load_dotenv()
skyvern_tool = SkyvernTool(api_key="<your_organization_api_key>")
# or you can load the api_key from SKYVERN_API_KEY in .env
# skyvern_tool = SkyvernTool()
agent = OpenAIAgent.from_tools(
tools=[skyvern_tool.dispatch_task()],
llm=OpenAI(model="gpt-4o"),
verbose=True,
)
response = agent.chat("Run a task with Skyvern. The task is about 'Navigate to the Hacker News homepage and get the top 3 posts.'")
print(response)
```
### Get a task by calling skyvern APIs
no need to run `skyvern init` command in your terminal to set up skyvern before using this integration.
```python
from dotenv import load_dotenv
from llama_index.agent.openai import OpenAIAgent
from llama_index.llms.openai import OpenAI
from skyvern_llamaindex.client import SkyvernTool
# load OpenAI API key from .env
load_dotenv()
skyvern_tool = SkyvernTool(api_key="<your_organization_api_key>")
# or you can load the api_key from SKYVERN_API_KEY in .env
# skyvern_tool = SkyvernTool()
agent = OpenAIAgent.from_tools(
tools=[skyvern_tool.get_task()],
llm=OpenAI(model="gpt-4o"),
verbose=True,
)
response = agent.chat("Get the task information with Skyvern. The task id is '<task_id>'.")
print(response)
```
## Advanced Usage
Here are some examples of how to integrate Skyvern with other llama-index tools in an agent.
### Dispatch a task(async) locally in your local environment and wait until the task is finished
> dispatch task will return immediately and the task will be running in the background. You can use `get_task` tool to poll the task information until the task is finished.
:warning: :warning: if you want to run this code block, you need to run `skyvern init --openai-api-key <your_openai_api_key>` command in your terminal to set up skyvern first.
```python
import asyncio
from dotenv import load_dotenv
from llama_index.agent.openai import OpenAIAgent
from llama_index.llms.openai import OpenAI
from llama_index.core.tools import FunctionTool
from skyvern_llamaindex.agent import SkyvernTool
# load OpenAI API key from .env
load_dotenv()
async def sleep(seconds: int) -> str:
await asyncio.sleep(seconds)
return f"Slept for {seconds} seconds"
sleep_tool = FunctionTool.from_defaults(
async_fn=sleep,
description="Sleep for a given number of seconds",
name="sleep",
)
skyvern_tool = SkyvernTool()
agent = OpenAIAgent.from_tools(
tools=[skyvern_tool.dispatch_task(), skyvern_tool.get_task(), sleep_tool],
llm=OpenAI(model="gpt-4o"),
verbose=True,
max_function_calls=10,
)
response = agent.chat("Run a task with Skyvern. The task is about 'Navigate to the Hacker News homepage and get the top 3 posts.' Then, get this task information until it's completed. The task information re-get interval should be 60s.")
print(response)
```
### Dispatch a task(async) by calling skyvern APIs and wait until the task is finished
> dispatch task will return immediately and the task will be running in the background. You can use `get_task` tool to poll the task information until the task is finished.
no need to run `skyvern init` command in your terminal to set up skyvern before using this integration.
```python
import asyncio
from dotenv import load_dotenv
from llama_index.agent.openai import OpenAIAgent
from llama_index.llms.openai import OpenAI
from llama_index.core.tools import FunctionTool
from skyvern_llamaindex.client import SkyvernTool
# load OpenAI API key from .env
load_dotenv()
async def sleep(seconds: int) -> str:
await asyncio.sleep(seconds)
return f"Slept for {seconds} seconds"
sleep_tool = FunctionTool.from_defaults(
async_fn=sleep,
description="Sleep for a given number of seconds",
name="sleep",
)
skyvern_tool = SkyvernTool(api_key="<your_organization_api_key>")
# or you can load the api_key from SKYVERN_API_KEY in .env
# skyvern_tool = SkyvernTool()
agent = OpenAIAgent.from_tools(
tools=[skyvern_tool.dispatch_task(), skyvern_tool.get_task(), sleep_tool],
llm=OpenAI(model="gpt-4o"),
verbose=True,
max_function_calls=10,
)
response = agent.chat("Run a task with Skyvern. The task is about 'Navigate to the Hacker News homepage and get the top 3 posts.' Then, get this task information until it's completed. The task information re-get interval should be 60s.")
print(response)
```

5404
integrations/llama_index/poetry.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,20 @@
[tool.poetry]
name = "skyvern-llamaindex"
version = "0.0.4"
description = "Skyvern integration for LlamaIndex"
authors = ["lawyzheng <lawy@skyvern.com>"]
packages = [{ include = "skyvern_llamaindex" }]
readme = "README.md"
[tool.poetry.dependencies]
python = "^3.11,<3.12"
skyvern = "^0.1.56"
llama-index = "^0.12.19"
[tool.poetry.group.dev.dependencies]
twine = "^6.1.0"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

View File

@@ -0,0 +1,133 @@
from typing import List, Literal, Optional
from llama_index.core.tools import FunctionTool
from llama_index.core.tools.tool_spec.base import SPEC_FUNCTION_TYPE, BaseToolSpec
from skyvern_llamaindex.settings import settings
from skyvern.agent import Agent
from skyvern.forge import app
from skyvern.forge.prompts import prompt_engine
from skyvern.forge.sdk.schemas.observers import ObserverTask, ObserverTaskRequest
from skyvern.forge.sdk.schemas.task_generations import TaskGenerationBase
from skyvern.forge.sdk.schemas.tasks import CreateTaskResponse, TaskRequest, TaskResponse
# Shared module-level Agent used as the fallback when a caller does not
# supply one, so multiple SkyvernTool instances reuse the same local agent.
default_agent = Agent()
class SkyvernTool:
    """Factory that exposes local Skyvern task operations as LlamaIndex FunctionTools.

    Wraps a Skyvern ``Agent`` (falling back to the module-level
    ``default_agent`` when none is given) and hands out one
    ``FunctionTool`` per operation.
    """

    def __init__(self, agent: Optional[Agent] = None):
        # Fall back to the shared module-level agent when none is provided.
        self.agent = agent if agent is not None else default_agent

    def _single_tool(self, name: str) -> FunctionTool:
        # Build a tool spec bound to this agent and extract exactly one tool.
        spec = SkyvernTaskToolSpec(agent=self.agent)
        return spec.to_tool_list([name])[0]

    def run_task(self) -> FunctionTool:
        """Tool that runs a task and blocks until it finishes."""
        return self._single_tool("run_task")

    def dispatch_task(self) -> FunctionTool:
        """Tool that starts a task in the background and returns immediately."""
        return self._single_tool("dispatch_task")

    def get_task(self) -> FunctionTool:
        """Tool that fetches a task's information by task id."""
        return self._single_tool("get_task")
class SkyvernTaskToolSpec(BaseToolSpec):
    """LlamaIndex tool spec backed by a local Skyvern ``Agent``.

    ``engine`` selects between the v1 flow (an LLM turns the user's prompt
    into a structured ``TaskRequest``) and the v2 observer flow; each public
    method dispatches to the matching engine-specific implementation.
    """

    # NOTE: the docstrings of these spec functions are surfaced to the LLM
    # as tool descriptions, so their wording is preserved verbatim.
    spec_functions: List[SPEC_FUNCTION_TYPE] = [
        "run_task",
        "dispatch_task",
        "get_task",
    ]

    def __init__(
        self,
        *,
        agent: Optional[Agent] = None,
        engine: Literal["TaskV1", "TaskV2"] = settings.engine,
        run_task_timeout_seconds: int = settings.run_task_timeout_seconds,
    ) -> None:
        # A private Agent is created here (unlike SkyvernTool, which reuses
        # the module-level default) when the caller supplies none.
        self.agent = Agent() if agent is None else agent
        self.engine = engine
        self.run_task_timeout_seconds = run_task_timeout_seconds

    # TODO: agent haven't exposed the task v1 generate function, we can migrate to use agent interface when it's available
    async def _generate_v1_task_request(self, user_prompt: str) -> TaskGenerationBase:
        # Render the "generate-task" prompt and let the LLM produce a
        # structured task definition from the free-form user prompt.
        rendered_prompt = prompt_engine.load_prompt("generate-task", user_prompt=user_prompt)
        raw_response = await app.LLM_API_HANDLER(prompt=rendered_prompt, prompt_name="generate-task")
        return TaskGenerationBase.model_validate(raw_response)

    async def _build_v1_task_request(self, user_prompt: str, url: Optional[str]) -> TaskRequest:
        # Generate a task from the prompt, then apply the explicit URL
        # override when the caller provided one.
        generation = await self._generate_v1_task_request(user_prompt=user_prompt)
        request = TaskRequest.model_validate(generation, from_attributes=True)
        if url is not None:
            request.url = url
        return request

    async def run_task(self, user_prompt: str, url: Optional[str] = None) -> TaskResponse | ObserverTask:
        """
        Use Skyvern agent to run a task. This function won't return until the task is finished.

        Args:
            user_prompt[str]: The user's prompt describing the task.
            url (Optional[str]): The URL of the target website for the task.
        """
        if self.engine == "TaskV1":
            return await self.run_task_v1(user_prompt=user_prompt, url=url)
        return await self.run_task_v2(user_prompt=user_prompt, url=url)

    async def dispatch_task(self, user_prompt: str, url: Optional[str] = None) -> CreateTaskResponse | ObserverTask:
        """
        Use Skyvern agent to dispatch a task. This function will return immediately and the task will be running in the background.

        Args:
            user_prompt[str]: The user's prompt describing the task.
            url (Optional[str]): The URL of the target website for the task.
        """
        if self.engine == "TaskV1":
            return await self.dispatch_task_v1(user_prompt=user_prompt, url=url)
        return await self.dispatch_task_v2(user_prompt=user_prompt, url=url)

    async def get_task(self, task_id: str) -> TaskResponse | ObserverTask | None:
        """
        Use Skyvern agent to get a task.

        Args:
            task_id[str]: The id of the task.
        """
        if self.engine == "TaskV1":
            return await self.get_task_v1(task_id)
        return await self.get_task_v2(task_id)

    async def run_task_v1(self, user_prompt: str, url: Optional[str] = None) -> TaskResponse:
        # Blocking v1 run: build the request, then wait up to the timeout.
        request = await self._build_v1_task_request(user_prompt=user_prompt, url=url)
        return await self.agent.run_task(task_request=request, timeout_seconds=self.run_task_timeout_seconds)

    async def dispatch_task_v1(self, user_prompt: str, url: Optional[str] = None) -> CreateTaskResponse:
        # Fire-and-forget v1 dispatch: create the task and return immediately.
        request = await self._build_v1_task_request(user_prompt=user_prompt, url=url)
        return await self.agent.create_task(task_request=request)

    async def get_task_v1(self, task_id: str) -> TaskResponse | None:
        return await self.agent.get_task(task_id=task_id)

    async def run_task_v2(self, user_prompt: str, url: Optional[str] = None) -> ObserverTask:
        # Blocking v2 (observer) run with the configured timeout.
        request = ObserverTaskRequest(user_prompt=user_prompt, url=url)
        return await self.agent.run_observer_task_v_2(
            task_request=request, timeout_seconds=self.run_task_timeout_seconds
        )

    async def dispatch_task_v2(self, user_prompt: str, url: Optional[str] = None) -> ObserverTask:
        # Fire-and-forget v2 (observer) dispatch.
        request = ObserverTaskRequest(user_prompt=user_prompt, url=url)
        return await self.agent.observer_task_v_2(task_request=request)

    async def get_task_v2(self, task_id: str) -> ObserverTask | None:
        return await self.agent.get_observer_task_v_2(task_id=task_id)

View File

@@ -0,0 +1,173 @@
from typing import Any, Dict, List, Literal, Optional
from httpx import AsyncClient
from llama_index.core.tools import FunctionTool
from llama_index.core.tools.tool_spec.base import SPEC_FUNCTION_TYPE, BaseToolSpec
from pydantic import BaseModel
from skyvern_llamaindex.settings import settings
from skyvern.client import AsyncSkyvern
from skyvern.forge.sdk.schemas.observers import ObserverTaskRequest
from skyvern.forge.sdk.schemas.tasks import CreateTaskResponse, TaskRequest, TaskResponse
class SkyvernTool(BaseModel):
    """Factory that exposes Skyvern cloud task operations as LlamaIndex FunctionTools.

    Credentials default to the environment-driven ``settings`` values and
    are forwarded to a ``SkyvernTaskToolSpec`` bound to the Skyvern API.
    """

    api_key: str = settings.api_key
    base_url: str = settings.base_url

    def _single_tool(self, name: str) -> FunctionTool:
        # Build a tool spec with this instance's credentials and extract
        # exactly one tool from it.
        spec = SkyvernTaskToolSpec(
            api_key=self.api_key,
            base_url=self.base_url,
        )
        return spec.to_tool_list([name])[0]

    def run_task(self) -> FunctionTool:
        """Tool that runs a task and blocks until it finishes."""
        return self._single_tool("run_task")

    def dispatch_task(self) -> FunctionTool:
        """Tool that starts a task in the background and returns immediately."""
        return self._single_tool("dispatch_task")

    def get_task(self) -> FunctionTool:
        """Tool that fetches a task's information by task id."""
        return self._single_tool("get_task")
class SkyvernTaskToolSpec(BaseToolSpec):
    """LlamaIndex tool spec that drives Skyvern tasks through the cloud API.

    ``engine`` selects between the v1 flow (the API generates a structured
    ``TaskRequest`` from the user's prompt) and the v2 observer flow; each
    public method dispatches to the matching engine-specific implementation.
    All calls go through an ``AsyncSkyvern`` client authenticated with the
    given API key.
    """

    # NOTE: the docstrings of these spec functions are surfaced to the LLM
    # as tool descriptions.
    spec_functions: List[SPEC_FUNCTION_TYPE] = [
        "run_task",
        "dispatch_task",
        "get_task",
    ]

    def __init__(
        self,
        *,
        api_key: str = settings.api_key,
        base_url: str = settings.base_url,
        engine: Literal["TaskV1", "TaskV2"] = settings.engine,
        run_task_timeout_seconds: int = settings.run_task_timeout_seconds,
    ):
        # Authenticate every request via the x-api-key header.
        httpx_client = AsyncClient(
            headers={
                "Content-Type": "application/json",
                "x-api-key": api_key,
            },
        )
        self.engine = engine
        self.run_task_timeout_seconds = run_task_timeout_seconds
        self.client = AsyncSkyvern(base_url=base_url, httpx_client=httpx_client)

    async def _build_v1_task_request(self, user_prompt: str, url: Optional[str]) -> TaskRequest:
        """Generate a v1 ``TaskRequest`` from the prompt, applying the URL override if given."""
        task_generation = await self.client.agent.generate_task(
            prompt=user_prompt,
        )
        task_request = TaskRequest.model_validate(task_generation, from_attributes=True)
        if url is not None:
            task_request.url = url
        return task_request

    async def run_task(self, user_prompt: str, url: Optional[str] = None) -> TaskResponse | Dict[str, Any | None]:
        """
        Use Skyvern client to run a task. This function won't return until the task is finished.

        Args:
            user_prompt[str]: The user's prompt describing the task.
            url (Optional[str]): The URL of the target website for the task.
        """
        if self.engine == "TaskV1":
            return await self.run_task_v1(user_prompt=user_prompt, url=url)
        return await self.run_task_v2(user_prompt=user_prompt, url=url)

    async def dispatch_task(
        self, user_prompt: str, url: Optional[str] = None
    ) -> CreateTaskResponse | Dict[str, Any | None]:
        """
        Use Skyvern client to dispatch a task. This function will return immediately and the task will be running in the background.

        Args:
            user_prompt[str]: The user's prompt describing the task.
            url (Optional[str]): The URL of the target website for the task.
        """
        if self.engine == "TaskV1":
            return await self.dispatch_task_v1(user_prompt=user_prompt, url=url)
        return await self.dispatch_task_v2(user_prompt=user_prompt, url=url)

    async def get_task(self, task_id: str) -> TaskResponse | Dict[str, Any | None]:
        """
        Use Skyvern client to get a task.

        Args:
            task_id[str]: The id of the task.
        """
        if self.engine == "TaskV1":
            return await self.get_task_v1(task_id)
        return await self.get_task_v2(task_id)

    async def run_task_v1(self, user_prompt: str, url: Optional[str] = None) -> TaskResponse:
        """Run a v1 task synchronously, waiting up to the configured timeout."""
        task_request = await self._build_v1_task_request(user_prompt=user_prompt, url=url)
        return await self.client.agent.run_task(
            timeout_seconds=self.run_task_timeout_seconds,
            url=task_request.url,
            title=task_request.title,
            navigation_goal=task_request.navigation_goal,
            data_extraction_goal=task_request.data_extraction_goal,
            # BUGFIX: was task_request.navigation_goal (copy-paste error),
            # which silently dropped the generated navigation payload.
            navigation_payload=task_request.navigation_payload,
            error_code_mapping=task_request.error_code_mapping,
            extracted_information_schema=task_request.extracted_information_schema,
            complete_criterion=task_request.complete_criterion,
            terminate_criterion=task_request.terminate_criterion,
        )

    async def dispatch_task_v1(self, user_prompt: str, url: Optional[str] = None) -> CreateTaskResponse:
        """Create a v1 task and return immediately; the task runs in the background."""
        task_request = await self._build_v1_task_request(user_prompt=user_prompt, url=url)
        return await self.client.agent.create_task(
            url=task_request.url,
            title=task_request.title,
            navigation_goal=task_request.navigation_goal,
            data_extraction_goal=task_request.data_extraction_goal,
            # BUGFIX: was task_request.navigation_goal (copy-paste error),
            # which silently dropped the generated navigation payload.
            navigation_payload=task_request.navigation_payload,
            error_code_mapping=task_request.error_code_mapping,
            extracted_information_schema=task_request.extracted_information_schema,
            complete_criterion=task_request.complete_criterion,
            terminate_criterion=task_request.terminate_criterion,
        )

    async def get_task_v1(self, task_id: str) -> TaskResponse:
        """Fetch a v1 task by id."""
        return await self.client.agent.get_task(task_id=task_id)

    async def run_task_v2(self, user_prompt: str, url: Optional[str] = None) -> Dict[str, Any | None]:
        """Run a v2 (observer) task synchronously, waiting up to the configured timeout."""
        task_request = ObserverTaskRequest(url=url, user_prompt=user_prompt)
        return await self.client.agent.run_observer_task_v_2(
            timeout_seconds=self.run_task_timeout_seconds,
            user_prompt=task_request.user_prompt,
            url=task_request.url,
            browser_session_id=task_request.browser_session_id,
        )

    async def dispatch_task_v2(self, user_prompt: str, url: Optional[str] = None) -> Dict[str, Any | None]:
        """Create a v2 (observer) task and return immediately."""
        task_request = ObserverTaskRequest(url=url, user_prompt=user_prompt)
        return await self.client.agent.observer_task_v_2(
            user_prompt=task_request.user_prompt,
            url=task_request.url,
            browser_session_id=task_request.browser_session_id,
        )

    async def get_task_v2(self, task_id: str) -> Dict[str, Any | None]:
        """Fetch a v2 (observer) task by id."""
        return await self.client.agent.get_observer_task_v_2(task_id=task_id)

View File

@@ -0,0 +1,18 @@
from typing import Literal
from dotenv import load_dotenv
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
    """Environment-driven configuration for the Skyvern LlamaIndex integration.

    Each field can be overridden by an environment variable with the
    ``SKYVERN_`` prefix (e.g. ``SKYVERN_API_KEY``, ``SKYVERN_BASE_URL``).
    """

    # Skyvern API credentials and endpoint used by the client-based tools.
    api_key: str = ""
    base_url: str = "https://api.skyvern.com"
    # Default task engine for the tool specs: v1 tasks or v2 observer tasks.
    engine: Literal["TaskV1", "TaskV2"] = "TaskV2"
    # Max time to wait for a synchronous run_task, in seconds (1 hour).
    run_task_timeout_seconds: int = 60 * 60

    class Config:
        # Read settings from environment variables prefixed with "SKYVERN_".
        env_prefix = "SKYVERN_"
# Load variables from a .env file (if present) before instantiating Settings,
# so SKYVERN_-prefixed overrides defined there are picked up.
load_dotenv()
# Module-level singleton consumed by the tool specs as their defaults.
settings = Settings()