From b3b548ed38d403da1ed902232fc81a9ab2e40883 Mon Sep 17 00:00:00 2001 From: LawyZheng Date: Mon, 3 Mar 2025 10:38:00 +0800 Subject: [PATCH] update langchain integration (#1815) --- integrations/langchain/README.md | 261 ++++++++++++------ integrations/langchain/pyproject.toml | 2 +- .../langchain/skyvern_langchain/agent.py | 117 +++++--- .../langchain/skyvern_langchain/client.py | 168 ++++++----- .../langchain/skyvern_langchain/schema.py | 14 +- .../langchain/skyvern_langchain/settings.py | 18 ++ 6 files changed, 369 insertions(+), 211 deletions(-) create mode 100644 integrations/langchain/skyvern_langchain/settings.py diff --git a/integrations/langchain/README.md b/integrations/langchain/README.md index 7feae148..6570f7c8 100644 --- a/integrations/langchain/README.md +++ b/integrations/langchain/README.md @@ -4,11 +4,16 @@ - [Skyvern Langchain](#skyvern-langchain) - [Installation](#installation) - - [Usage](#usage) - - [Run a task(sync) with skyvern agent (calling skyvern agent function directly in the tool)](#run-a-tasksync-with-skyvern-agent-calling-skyvern-agent-function-directly-in-the-tool) - - [Run a task(async) with skyvern agent (calling skyvern agent function directly in the tool)](#run-a-taskasync-with-skyvern-agent-calling-skyvern-agent-function-directly-in-the-tool) - - [Run a task(sync) with skyvern client (calling skyvern OpenAPI in the tool)](#run-a-tasksync-with-skyvern-client-calling-skyvern-openapi-in-the-tool) - - [Run a task(async) with skyvern client (calling skyvern OpenAPI in the tool)](#run-a-taskasync-with-skyvern-client-calling-skyvern-openapi-in-the-tool) + - [Basic Usage](#basic-usage) + - [Run a task(sync) locally in your local environment](#run-a-tasksync-locally-in-your-local-environment) + - [Run a task(async) locally in your local environment](#run-a-taskasync-locally-in-your-local-environment) + - [Get a task locally in your local environment](#get-a-task-locally-in-your-local-environment) + - [Run a task(sync) by calling skyvern APIs](#run-a-tasksync-by-calling-skyvern-apis) + - [Run a task(async) by calling skyvern APIs](#run-a-taskasync-by-calling-skyvern-apis) + - [Get a task by calling skyvern APIs](#get-a-task-by-calling-skyvern-apis) + - [Agent Usage](#agent-usage) + - [Run a task(async) locally in your local environment and wait until the task is finished](#run-a-taskasync-locally-in-your-local-environment-and-wait-until-the-task-is-finished) + - [Run a task(async) by calling skyvern APIs and wait until the task is finished](#run-a-taskasync-by-calling-skyvern-apis-and-wait-until-the-task-is-finished) @@ -22,9 +27,20 @@ This is a langchain integration for Skyvern. pip install skyvern-langchain ``` -## Usage +To run the example scenarios, you might need to install other langchain dependencies. +```bash +pip install langchain-openai +pip install langchain-community +``` -### Run a task(sync) with skyvern agent (calling skyvern agent function directly in the tool) +## Basic Usage + +This is the only basic usage of skyvern langchain tool. If you want a full langchain integration experience, please refer to the [Agent Usage](#agent-usage) section to play with langchain agent. + +Go to [Langchain Tools](https://python.langchain.com/v0.1/docs/modules/tools/) to see more advanced langchain tool usage. + + +### Run a task(sync) locally in your local environment > sync task won't return until the task is finished. :warning: :warning: if you want to run this code block, you need to run `skyvern init --openai-api-key ` command in your terminal to set up skyvern first. @@ -32,34 +48,149 @@ pip install skyvern-langchain ```python import asyncio -from dotenv import load_dotenv -from langchain_openai import ChatOpenAI -from langchain.agents import initialize_agent, AgentType -from skyvern_langchain.agent import run_task_v2 - -load_dotenv() - -llm = ChatOpenAI(model="gpt-4o", temperature=0) - -agent = initialize_agent( - llm=llm, - tools=[run_task_v2], - verbose=True, - agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION, -) +from skyvern_langchain.agent import RunTask +run_task = RunTask() async def main(): # to run skyvern agent locally, must run `skyvern init` first - print(await agent.ainvoke("Run a task with Skyvern. The task is about 'Navigate to the Hacker News homepage and get the top 3 posts.'")) + print(await run_task.ainvoke("Navigate to the Hacker News homepage and get the top 3 posts.")) if __name__ == "__main__": asyncio.run(main()) ``` -### Run a task(async) with skyvern agent (calling skyvern agent function directly in the tool) -> async task will return immediately and the task will be running in the background. You can use `get_task_v2` tool to poll the task information until the task is finished. +### Run a task(async) locally in your local environment +> async task will return immediately and the task will be running in the background. + +:warning: :warning: if you want to run the task in the background, you need to keep the script running until the task is finished, otherwise the task will be killed when the script is finished. + +:warning: :warning: if you want to run this code block, you need to run `skyvern init --openai-api-key ` command in your terminal to set up skyvern first. + +```python +import asyncio +from skyvern_langchain.agent import DispatchTask + +dispatch_task = DispatchTask() + +async def main(): + # to run skyvern agent locally, must run `skyvern init` first + print(await dispatch_task.ainvoke("Navigate to the Hacker News homepage and get the top 3 posts.")) + + # keep the script running until the task is finished + await asyncio.sleep(600) + + +if __name__ == "__main__": + asyncio.run(main()) + +``` + +### Get a task locally in your local environment + +:warning: :warning: if you want to run this code block, you need to run `skyvern init --openai-api-key ` command in your terminal to set up skyvern first. + +```python +import asyncio +from skyvern_langchain.agent import GetTask + +get_task = GetTask() + +async def main(): + # to run skyvern agent locally, must run `skyvern init` first + print(await get_task.ainvoke("")) + + +if __name__ == "__main__": + asyncio.run(main()) + +``` + +### Run a task(sync) by calling skyvern APIs +> sync task won't return until the task is finished. + +no need to run `skyvern init` command in your terminal to set up skyvern before using this integration. + +```python +import asyncio +from skyvern_langchain.client import RunTask + +run_task = RunTask( + api_key="", +) +# or you can load the api_key from SKYVERN_API_KEY in .env +# run_task = RunTask() + +async def main(): + print(await run_task.ainvoke("Navigate to the Hacker News homepage and get the top 3 posts.")) + + +if __name__ == "__main__": + asyncio.run(main()) +``` + +### Run a task(async) by calling skyvern APIs +> async task will return immediately and the task will be running in the background. + +no need to run `skyvern init` command in your terminal to set up skyvern before using this integration. + +the task is actually running in the skyvern cloud service, so you don't need to keep your script running until the task is finished. + +```python +import asyncio +from skyvern_langchain.client import DispatchTask + +dispatch_task = DispatchTask( + api_key="", +) +# or you can load the api_key from SKYVERN_API_KEY in .env +# dispatch_task = DispatchTask() + +async def main(): + print(await dispatch_task.ainvoke("Navigate to the Hacker News homepage and get the top 3 posts.")) + + +if __name__ == "__main__": + asyncio.run(main()) +``` + + +### Get a task by calling skyvern APIs +> async task will return immediately and the task will be running in the background. + +no need to run `skyvern init` command in your terminal to set up skyvern before using this integration. + +the task is actually running in the skyvern cloud service, so you don't need to keep your script running until the task is finished. + +```python +import asyncio +from skyvern_langchain.client import GetTask + +get_task = GetTask( + api_key="", +) +# or you can load the api_key from SKYVERN_API_KEY in .env +# get_task = GetTask() + +async def main(): + print(await get_task.ainvoke("")) + + +if __name__ == "__main__": + asyncio.run(main()) +``` + +## Agent Usage + +Langchain is more powerful when used with [Langchain Agents](https://python.langchain.com/v0.1/docs/modules/agents/). + +The following two examples show how to build an agent that executes a specified task, waits for its completion, and then returns the results. For example, the agent is tasked with navigating to the Hacker News homepage and retrieving the top three posts. + + +### Run a task(async) locally in your local environment and wait until the task is finished + +> async task will return immediately and the task will be running in the background. You can use `GetTask` tool to poll the task information until the task is finished. :warning: :warning: if you want to run this code block, you need to run `skyvern init --openai-api-key ` command in your terminal to set up skyvern first. @@ -68,19 +199,23 @@ import asyncio from dotenv import load_dotenv from langchain_openai import ChatOpenAI from langchain.agents import initialize_agent, AgentType -from skyvern_langchain.agent import queue_task_v2, get_task_v2 +from skyvern_langchain.agent import DispatchTask, GetTask from langchain_community.tools.sleep.tool import SleepTool +# load OpenAI API key from .env load_dotenv() llm = ChatOpenAI(model="gpt-4o", temperature=0) +dispatch_task = DispatchTask() +get_task = GetTask() + agent = initialize_agent( llm=llm, tools=[ - queue_task_v2, - get_task_v2, + dispatch_task, + get_task, SleepTool(), ], verbose=True, @@ -89,8 +224,8 @@ agent = initialize_agent( async def main(): - # use sleep tool to set up the polling logic until the task is completed, if you only want to queue a task, you can remove the sleep tool - print(await agent.ainvoke("Queue a task with Skyvern. The task is about 'Navigate to the Hacker News homepage and get the top 3 posts.' Then, get this task information until it's completed. The task information re-get interval should be 60s.")) + # use sleep tool to set up the polling logic until the task is completed, if you only want to dispatch a task, you can remove the sleep tool + print(await agent.ainvoke("Run a task with Skyvern. The task is about 'Navigate to the Hacker News homepage and get the top 3 posts.' Then, get this task information until it's completed. The task information re-get interval should be 60s.")) if __name__ == "__main__": @@ -98,8 +233,9 @@ if __name__ == "__main__": ``` -### Run a task(sync) with skyvern client (calling skyvern OpenAPI in the tool) -> sync task won't return until the task is finished. +### Run a task(async) by calling skyvern APIs and wait until the task is finished + +> async task will return immediately and the task will be running in the background. You can use `GetTask` tool to poll the task information until the task is finished. no need to run `skyvern init` command in your terminal to set up skyvern before using this integration. @@ -108,65 +244,32 @@ import asyncio from dotenv import load_dotenv from langchain_openai import ChatOpenAI from langchain.agents import initialize_agent, AgentType -from skyvern_langchain.client import RunSkyvernClientTaskV2Tool - -load_dotenv() - -llm = ChatOpenAI(model="gpt-4o", temperature=0) - -run_task_v2 = RunSkyvernClientTaskV2Tool( - credential="", -) - -agent = initialize_agent( - llm=llm, - tools=[run_task_v2], - verbose=True, - agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION, -) - -async def main(): - print(await agent.ainvoke("Run a task with Skyvern. The task is about 'Navigate to the Hacker News homepage and get the top 3 posts.'")) - - -if __name__ == "__main__": - asyncio.run(main()) -``` - -### Run a task(async) with skyvern client (calling skyvern OpenAPI in the tool) -> async task will return immediately and the task will be running in the background. You can use `GetSkyvernClientTaskV2Tool` tool to poll the task information until the task is finished. - -no need to run `skyvern init` command in your terminal to set up skyvern before using this integration. - -```python -import asyncio -from dotenv import load_dotenv -from langchain_openai import ChatOpenAI -from langchain.agents import initialize_agent, AgentType -from skyvern_langchain.client import ( - QueueSkyvernClientTaskV2Tool, - GetSkyvernClientTaskV2Tool, -) +from skyvern_langchain.client import DispatchTask, GetTask from langchain_community.tools.sleep.tool import SleepTool +# load OpenAI API key from .env load_dotenv() llm = ChatOpenAI(model="gpt-4o", temperature=0) -queue_task_v2 = QueueSkyvernClientTaskV2Tool( - credential="", +dispatch_task = DispatchTask( + api_key="", ) +# or you can load the api_key from SKYVERN_API_KEY in .env +# dispatch_task = DispatchTask() -get_task_v2 = GetSkyvernClientTaskV2Tool( - credential="", +get_task = GetTask( + api_key="", ) +# or you can load the api_key from SKYVERN_API_KEY in .env +# get_task = GetTask() agent = initialize_agent( llm=llm, tools=[ - queue_task_v2, - get_task_v2, + dispatch_task, + get_task, SleepTool(), ], verbose=True, @@ -175,8 +278,8 @@ agent = initialize_agent( async def main(): - # use sleep tool to set up the polling logic until the task is completed, if you only want to queue a task, you can remove the sleep tool - print(await agent.ainvoke("Queue a task with Skyvern. The task is about 'Navigate to the Hacker News homepage and get the top 3 posts.' Then, get this task information until it's completed. The task information re-get interval should be 60s.")) + # use sleep tool to set up the polling logic until the task is completed, if you only want to dispatch a task, you can remove the sleep tool + print(await agent.ainvoke("Run a task with Skyvern. The task is about 'Navigate to the Hacker News homepage and get the top 3 posts.' Then, get this task information until it's completed. The task information re-get interval should be 60s.")) if __name__ == "__main__": diff --git a/integrations/langchain/pyproject.toml b/integrations/langchain/pyproject.toml index 1f56836f..d529db83 100644 --- a/integrations/langchain/pyproject.toml +++ b/integrations/langchain/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "skyvern-langchain" -version = "0.1.1" +version = "0.1.5" description = "" authors = ["lawyzheng "] packages = [{ include = "skyvern_langchain" }] diff --git a/integrations/langchain/skyvern_langchain/agent.py b/integrations/langchain/skyvern_langchain/agent.py index f4c8d327..15e26ddc 100644 --- a/integrations/langchain/skyvern_langchain/agent.py +++ b/integrations/langchain/skyvern_langchain/agent.py @@ -1,48 +1,97 @@ -from typing import Any, Dict +from typing import Any, Literal, Type -from langchain.tools import tool -from skyvern_langchain.schema import GetTaskInput, TaskV1Request, TaskV2Request +from langchain.tools import BaseTool +from litellm import BaseModel +from pydantic import Field +from skyvern_langchain.schema import CreateTaskInput, GetTaskInput +from skyvern_langchain.settings import settings from skyvern.agent import Agent -from skyvern.forge.sdk.schemas.observers import ObserverTask -from skyvern.forge.sdk.schemas.tasks import CreateTaskResponse, TaskResponse +from skyvern.forge import app +from skyvern.forge.prompts import prompt_engine +from skyvern.forge.sdk.schemas.observers import ObserverTask, ObserverTaskRequest +from skyvern.forge.sdk.schemas.task_generations import TaskGenerationBase +from skyvern.forge.sdk.schemas.tasks import CreateTaskResponse, TaskRequest, TaskResponse + +agent = Agent() -@tool("run-skyvern-agent-task-v1", args_schema=TaskV1Request) -async def run_task_v1(**kwargs: Dict[str, Any]) -> TaskResponse: - """Use Skyvern agent to run a v1 task. It is usually used for the simple tasks. This function won't return until the task is finished.""" - task_request = TaskV1Request(**kwargs) - return await Agent().run_task(task_request=task_request, timeout_seconds=task_request.timeout_seconds) +class SkyvernTaskBaseTool(BaseTool): + engine: Literal["TaskV1", "TaskV2"] = Field(default=settings.engine) + timeout_seconds: int = Field(default=settings.run_task_timeout) + agent: Agent = agent + + def _run(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError("skyvern task tool does not support sync") + + # TODO: agent haven't exposed the task v1 generate function, we can migrate to use agent interface when it's available + async def _generate_v1_task_request(self, user_prompt: str) -> TaskGenerationBase: + llm_prompt = prompt_engine.load_prompt("generate-task", user_prompt=user_prompt) + llm_response = await app.LLM_API_HANDLER(prompt=llm_prompt, prompt_name="generate-task") + return TaskGenerationBase.model_validate(llm_response) -@tool("queue-skyvern-agent-task-v1", args_schema=TaskV1Request) -async def queue_task_v1(**kwargs: Dict[str, Any]) -> CreateTaskResponse: - """Use Skyvern agent to queue a v1 task. It is usually used for the simple tasks. This function will return immediately and the task will be running in the background.""" - task_request = TaskV1Request(**kwargs) - return await Agent().create_task(task_request=task_request) +class RunTask(SkyvernTaskBaseTool): + name: str = "run-skyvern-agent-task" + description: str = """Use Skyvern agent to run a task. This function won't return until the task is finished.""" + args_schema: Type[BaseModel] = CreateTaskInput + + async def _arun(self, user_prompt: str, url: str | None = None) -> TaskResponse | ObserverTask: + if self.engine == "TaskV1": + return await self._arun_task_v1(user_prompt=user_prompt, url=url) + else: + return await self._arun_task_v2(user_prompt=user_prompt, url=url) + + async def _arun_task_v1(self, user_prompt: str, url: str | None = None) -> TaskResponse: + task_generation = await self._generate_v1_task_request(user_prompt=user_prompt) + task_request = TaskRequest.model_validate(task_generation, from_attributes=True) + if url is not None: + task_request.url = url + + return await self.agent.run_task(task_request=task_request, timeout_seconds=self.timeout_seconds) + + async def _arun_task_v2(self, user_prompt: str, url: str | None = None) -> ObserverTask: + task_request = ObserverTaskRequest(user_prompt=user_prompt, url=url) + return await self.agent.run_observer_task_v_2(task_request=task_request, timeout_seconds=self.timeout_seconds) -@tool("get-skyvern-agent-task-v1", args_schema=GetTaskInput) -async def get_task_v1(task_id: str) -> TaskResponse | None: - """Use Skyvern agent to get a v1 task. v1 tasks are usually simple tasks.""" - return await Agent().get_task(task_id=task_id) +class DispatchTask(SkyvernTaskBaseTool): + name: str = "dispatch-skyvern-agent-task" + description: str = """Use Skyvern agent to dispatch a task. This function will return immediately and the task will be running in the background.""" + args_schema: Type[BaseModel] = CreateTaskInput + + async def _arun(self, user_prompt: str, url: str | None = None) -> CreateTaskResponse | ObserverTask: + if self.engine == "TaskV1": + return await self._arun_task_v1(user_prompt=user_prompt, url=url) + else: + return await self._arun_task_v2(user_prompt=user_prompt, url=url) + + async def _arun_task_v1(self, user_prompt: str, url: str | None = None) -> CreateTaskResponse: + task_generation = await self._generate_v1_task_request(user_prompt=user_prompt) + task_request = TaskRequest.model_validate(task_generation, from_attributes=True) + if url is not None: + task_request.url = url + + return await self.agent.create_task(task_request=task_request) + + async def _arun_task_v2(self, user_prompt: str, url: str | None = None) -> ObserverTask: + task_request = ObserverTaskRequest(user_prompt=user_prompt, url=url) + return await self.agent.observer_task_v_2(task_request=task_request) -@tool("run-skyvern-agent-task-v2", args_schema=TaskV2Request) -async def run_task_v2(**kwargs: Dict[str, Any]) -> ObserverTask: - """Use Skyvern agent to run a v2 task. It is usually used for the complicated tasks. This function won't return until the task is finished.""" - task_request = TaskV2Request(**kwargs) - return await Agent().run_observer_task_v_2(task_request=task_request, timeout_seconds=task_request.timeout_seconds) +class GetTask(SkyvernTaskBaseTool): + name: str = "get-skyvern-agent-task" + description: str = """Use Skyvern agent to get a task.""" + args_schema: Type[BaseModel] = GetTaskInput + async def _arun(self, task_id: str) -> TaskResponse | ObserverTask | None: + if self.engine == "TaskV1": + return await self._arun_task_v1(task_id=task_id) + else: + return await self._arun_task_v2(task_id=task_id) -@tool("queue-skyvern-agent-task-v2", args_schema=TaskV2Request) -async def queue_task_v2(**kwargs: Dict[str, Any]) -> ObserverTask: - """Use Skyvern agent to queue a v2 task. It is usually used for the complicated tasks. This function will return immediately and the task will be running in the background.""" - task_request = TaskV2Request(**kwargs) - return await Agent().observer_task_v_2(task_request=task_request) + async def _arun_task_v1(self, task_id: str) -> TaskResponse | None: + return await self.agent.get_task(task_id=task_id) - -@tool("get-skyvern-agent-task-v2", args_schema=GetTaskInput) -async def get_task_v2(task_id: str) -> ObserverTask | None: - """Use Skyvern agent to get a v2 task. v2 tasks are usually complicated tasks.""" - return await Agent().get_observer_task_v_2(task_id=task_id) + async def _arun_task_v2(self, task_id: str) -> ObserverTask | None: + return await self.agent.get_observer_task_v_2(task_id=task_id) diff --git a/integrations/langchain/skyvern_langchain/client.py b/integrations/langchain/skyvern_langchain/client.py index 981a889c..303d71c2 100644 --- a/integrations/langchain/skyvern_langchain/client.py +++ b/integrations/langchain/skyvern_langchain/client.py @@ -1,136 +1,132 @@ -from typing import Any, Dict, Type +from typing import Any, Dict, Literal, Type from httpx import AsyncClient from langchain.tools import BaseTool -from pydantic import BaseModel -from skyvern_langchain.schema import GetTaskInput, TaskV1Request, TaskV2Request +from pydantic import BaseModel, Field +from skyvern_langchain.schema import CreateTaskInput, GetTaskInput +from skyvern_langchain.settings import settings from skyvern.client import AsyncSkyvern -from skyvern.forge.sdk.schemas.tasks import CreateTaskResponse, TaskResponse +from skyvern.forge.sdk.schemas.observers import ObserverTaskRequest +from skyvern.forge.sdk.schemas.tasks import CreateTaskResponse, TaskRequest, TaskResponse -class SkyvernClientBaseTool(BaseTool): - credential: str = "" - base_url: str = "https://api.skyvern.com" +class SkyvernTaskBaseTool(BaseTool): + api_key: str = Field(default=settings.api_key) + base_url: str = Field(default=settings.base_url) + engine: Literal["TaskV1", "TaskV2"] = Field(default=settings.engine) + run_task_timeout_seconds: int = Field(default=settings.run_task_timeout) def get_client(self) -> AsyncSkyvern: httpx_client = AsyncClient( headers={ "Content-Type": "application/json", - "x-api-key": self.credential, + "x-api-key": self.api_key, }, ) return AsyncSkyvern(base_url=self.base_url, httpx_client=httpx_client) - def _run(self) -> None: - raise NotImplementedError("skyvern client tool does not support sync") + def _run(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError("skyvern task tool does not support sync") -class RunSkyvernClientTaskV1Tool(SkyvernClientBaseTool): - name: str = "run-skyvern-client-task-v1" - description: str = """Use Skyvern client to run a v1 task. It is usually used for the simple tasks. This function won't return until the task is finished.""" - args_schema: Type[BaseModel] = TaskV1Request +class RunTask(SkyvernTaskBaseTool): + name: str = "run-skyvern-client-task" + description: str = """Use Skyvern client to run a task. This function won't return until the task is finished.""" + args_schema: Type[BaseModel] = CreateTaskInput - async def _arun(self, **kwargs: Dict[str, Any]) -> TaskResponse: - task_request = TaskV1Request(**kwargs) + async def _arun(self, user_prompt: str, url: str | None = None) -> TaskResponse | Dict[str, Any | None]: + if self.engine == "TaskV1": + return await self._arun_task_v1(user_prompt=user_prompt, url=url) + else: + return await self._arun_task_v2(user_prompt=user_prompt, url=url) + + async def _arun_task_v1(self, user_prompt: str, url: str | None = None) -> TaskResponse: + task_generation = await self.get_client().agent.generate_task( + prompt=user_prompt, + ) + + if url is not None: + task_generation.url = url + + task_request = TaskRequest.model_validate(task_generation, from_attributes=True) return await self.get_client().agent.run_task( - max_steps_override=task_request.max_steps, - timeout_seconds=task_request.timeout_seconds, + timeout_seconds=self.run_task_timeout_seconds, url=task_request.url, title=task_request.title, - webhook_callback_url=task_request.webhook_callback_url, - totp_verification_url=task_request.totp_verification_url, - totp_identifier=task_request.totp_identifier, navigation_goal=task_request.navigation_goal, data_extraction_goal=task_request.data_extraction_goal, navigation_payload=task_request.navigation_goal, error_code_mapping=task_request.error_code_mapping, - proxy_location=task_request.proxy_location, extracted_information_schema=task_request.extracted_information_schema, complete_criterion=task_request.complete_criterion, terminate_criterion=task_request.terminate_criterion, - browser_session_id=task_request.browser_session_id, ) - -class QueueSkyvernClientTaskV1Tool(SkyvernClientBaseTool): - name: str = "queue-skyvern-client-task-v1" - description: str = """Use Skyvern client to queue a v1 task. It is usually used for the simple tasks. This function will return immediately and the task will be running in the background.""" - args_schema: Type[BaseModel] = TaskV1Request - - async def _arun(self, **kwargs: Dict[str, Any]) -> CreateTaskResponse: - task_request = TaskV1Request(**kwargs) - return await self.get_client().agent.create_task( - max_steps_override=task_request.max_steps, - url=task_request.url, - title=task_request.title, - webhook_callback_url=task_request.webhook_callback_url, - totp_verification_url=task_request.totp_verification_url, - totp_identifier=task_request.totp_identifier, - navigation_goal=task_request.navigation_goal, - data_extraction_goal=task_request.data_extraction_goal, - navigation_payload=task_request.navigation_goal, - error_code_mapping=task_request.error_code_mapping, - proxy_location=task_request.proxy_location, - extracted_information_schema=task_request.extracted_information_schema, - complete_criterion=task_request.complete_criterion, - terminate_criterion=task_request.terminate_criterion, - browser_session_id=task_request.browser_session_id, - ) - - -class GetSkyvernClientTaskV1Tool(SkyvernClientBaseTool): - name: str = "get-skyvern-client-task-v1" - description: str = """Use Skyvern client to get a v1 task. v1 tasks are usually simple tasks.""" - args_schema: Type[BaseModel] = GetTaskInput - - async def _arun(self, task_id: str) -> TaskResponse: - return await self.get_client().agent.get_task(task_id=task_id) - - -class RunSkyvernClientTaskV2Tool(SkyvernClientBaseTool): - name: str = "run-skyvern-client-task-v2" - description: str = """Use Skyvern client to run a v2 task. It is usually used for the complicated tasks. This function won't return until the task is finished.""" - args_schema: Type[BaseModel] = TaskV2Request - - async def _arun(self, **kwargs: Dict[str, Any]) -> Dict[str, Any | None]: - task_request = TaskV2Request(**kwargs) + async def _arun_task_v2(self, user_prompt: str, url: str | None = None) -> TaskResponse: + task_request = ObserverTaskRequest(url=url, user_prompt=user_prompt) return await self.get_client().agent.run_observer_task_v_2( - max_iterations_override=task_request.max_iterations, - timeout_seconds=task_request.timeout_seconds, + timeout_seconds=self.run_task_timeout_seconds, user_prompt=task_request.user_prompt, url=task_request.url, browser_session_id=task_request.browser_session_id, - webhook_callback_url=task_request.webhook_callback_url, - totp_verification_url=task_request.totp_verification_url, - totp_identifier=task_request.totp_identifier, - proxy_location=task_request.proxy_location, ) -class QueueSkyvernClientTaskV2Tool(SkyvernClientBaseTool): - name: str = "queue-skyvern-client-task-v2" - description: str = """Use Skyvern client to queue a v2 task. It is usually used for the complicated tasks. This function will return immediately and the task will be running in the background.""" - args_schema: Type[BaseModel] = TaskV2Request +class DispatchTask(SkyvernTaskBaseTool): + name: str = "dispatch-skyvern-client-task" + description: str = """Use Skyvern client to dispatch a task. This function will return immediately and the task will be running in the background.""" + args_schema: Type[BaseModel] = CreateTaskInput - async def _arun(self, **kwargs: Dict[str, Any]) -> Dict[str, Any | None]: - task_request = TaskV2Request(**kwargs) + async def _arun(self, user_prompt: str, url: str | None = None) -> CreateTaskResponse | Dict[str, Any | None]: + if self.engine == "TaskV1": + return await self._arun_task_v1(user_prompt=user_prompt, url=url) + else: + return await self._arun_task_v2(user_prompt=user_prompt, url=url) + + async def _arun_task_v1(self, user_prompt: str, url: str | None = None) -> CreateTaskResponse: + task_generation = await self.get_client().agent.generate_task( + prompt=user_prompt, + ) + + if url is not None: + task_generation.url = url + + task_request = TaskRequest.model_validate(task_generation, from_attributes=True) + return await self.get_client().agent.create_task( + url=task_request.url, + title=task_request.title, + navigation_goal=task_request.navigation_goal, + data_extraction_goal=task_request.data_extraction_goal, + navigation_payload=task_request.navigation_goal, + error_code_mapping=task_request.error_code_mapping, + extracted_information_schema=task_request.extracted_information_schema, + complete_criterion=task_request.complete_criterion, + terminate_criterion=task_request.terminate_criterion, + ) + + async def _arun_task_v2(self, user_prompt: str, url: str | None = None) -> Dict[str, Any | None]: + task_request = ObserverTaskRequest(url=url, user_prompt=user_prompt) return await self.get_client().agent.observer_task_v_2( - max_iterations_override=task_request.max_iterations, user_prompt=task_request.user_prompt, url=task_request.url, browser_session_id=task_request.browser_session_id, - webhook_callback_url=task_request.webhook_callback_url, - totp_verification_url=task_request.totp_verification_url, - totp_identifier=task_request.totp_identifier, - proxy_location=task_request.proxy_location, ) -class GetSkyvernClientTaskV2Tool(SkyvernClientBaseTool): - name: str = "get-skyvern-client-task-v2" - description: str = """Use Skyvern client to get a v2 task. It is usually used for the complicated tasks.""" +class GetTask(SkyvernTaskBaseTool): + name: str = "get-skyvern-client-task" + description: str = """Use Skyvern client to get a task.""" args_schema: Type[BaseModel] = GetTaskInput async def _arun(self, task_id: str) -> Dict[str, Any | None]: + if self.engine == "TaskV1": + return await self._arun_task_v1(task_id=task_id) + else: + return await self._arun_task_v2(task_id=task_id) + + async def _arun_task_v1(self, task_id: str) -> TaskResponse: + return await self.get_client().agent.get_task(task_id=task_id) + + async def _arun_task_v2(self, task_id: str) -> Dict[str, Any | None]: return await self.get_client().agent.get_observer_task_v_2(task_id=task_id) diff --git a/integrations/langchain/skyvern_langchain/schema.py b/integrations/langchain/skyvern_langchain/schema.py index a34f6048..e2e47faf 100644 --- a/integrations/langchain/skyvern_langchain/schema.py +++ b/integrations/langchain/skyvern_langchain/schema.py @@ -1,17 +1,9 @@ from pydantic import BaseModel -from skyvern.forge.sdk.schemas.observers import ObserverTaskRequest -from skyvern.forge.sdk.schemas.tasks import TaskRequest - -class TaskV1Request(TaskRequest): - max_steps: int = 10 - timeout_seconds: int = 60 * 60 - - -class TaskV2Request(ObserverTaskRequest): - max_iterations: int = 10 - timeout_seconds: int = 60 * 60 +class CreateTaskInput(BaseModel): + user_prompt: str + url: str | None = None class GetTaskInput(BaseModel): diff --git a/integrations/langchain/skyvern_langchain/settings.py b/integrations/langchain/skyvern_langchain/settings.py new file mode 100644 index 00000000..6bbd1357 --- /dev/null +++ b/integrations/langchain/skyvern_langchain/settings.py @@ -0,0 +1,18 @@ +from typing import Literal + +from dotenv import load_dotenv +from pydantic_settings import BaseSettings + + +class Settings(BaseSettings): + api_key: str = "" + base_url: str = "https://api.skyvern.com" + engine: Literal["TaskV1", "TaskV2"] = "TaskV2" + run_task_timeout_seconds: int = 60 * 60 + + class Config: + env_prefix = "SKYVERN_" + + +load_dotenv() +settings = Settings()