From 367473f930ac5a0b01882209b0224ab9b729d4ea Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Wed, 19 Feb 2025 23:21:08 -0800 Subject: [PATCH] push eval script to oss (#1796) --- evaluation/core/__init__.py | 306 ++++++++++++++++++ evaluation/core/utils.py | 58 ++++ .../create_webvoyager_evaluation_result.py | 67 ++++ .../script/create_webvoyager_observer.py | 66 ++++ .../script/create_webvoyager_workflow.py | 75 +++++ evaluation/script/eval_webvoyager_cruise.py | 115 +++++++ .../prompts/skyvern/check-evaluation-goal.j2 | 21 ++ .../forge/prompts/skyvern/evaluate-prompt.j2 | 40 +++ 8 files changed, 748 insertions(+) create mode 100644 evaluation/core/__init__.py create mode 100644 evaluation/core/utils.py create mode 100644 evaluation/script/create_webvoyager_evaluation_result.py create mode 100644 evaluation/script/create_webvoyager_observer.py create mode 100644 evaluation/script/create_webvoyager_workflow.py create mode 100644 evaluation/script/eval_webvoyager_cruise.py create mode 100644 skyvern/forge/prompts/skyvern/check-evaluation-goal.j2 create mode 100644 skyvern/forge/prompts/skyvern/evaluate-prompt.j2 diff --git a/evaluation/core/__init__.py b/evaluation/core/__init__.py new file mode 100644 index 00000000..6d977e56 --- /dev/null +++ b/evaluation/core/__init__.py @@ -0,0 +1,306 @@ +import asyncio +import json +import os +from datetime import datetime +from typing import Any +from urllib.parse import urlparse + +import httpx +import requests +from pydantic import BaseModel + +from skyvern.forge import app +from skyvern.forge.prompts import prompt_engine +from skyvern.forge.sdk.api.files import create_folder_if_not_exist +from skyvern.forge.sdk.schemas.observers import ObserverTask, ObserverTaskRequest +from skyvern.forge.sdk.schemas.tasks import ProxyLocation, TaskRequest, TaskResponse, TaskStatus +from skyvern.forge.sdk.workflow.models.workflow import WorkflowRequestBody, WorkflowRunStatus, WorkflowRunStatusResponse + + +class TaskOutput(BaseModel): + extracted_information: list | dict[str, Any] | str | None + final_screenshot: bytes | None + + +class SkyvernClient: + def __init__(self, base_url: str, credentials: str): + self.base_url = base_url + self.credentials = credentials + + def generate_curl_params(self, request_body: BaseModel, max_steps: int | None = None) -> tuple[dict, dict]: + payload = request_body.model_dump_json() + headers = { + "Content-Type": "application/json", + "x-api-key": self.credentials, + } + if max_steps is not None: + headers["x-max-steps-override"] = str(max_steps) + + return payload, headers + + def create_task(self, task_request_body: TaskRequest, max_steps: int | None = None) -> str: + url = f"{self.base_url}/tasks" + payload, headers = self.generate_curl_params(task_request_body, max_steps=max_steps) + response = requests.post(url, headers=headers, data=payload) + assert "task_id" in response.json(), f"Failed to create task: {response.text}" + return response.json()["task_id"] + + def create_workflow_run( + self, workflow_pid: str, workflow_request_body: WorkflowRequestBody, max_steps: int | None = None + ) -> str: + url = f"{self.base_url}/workflows/{workflow_pid}/run/" + payload, headers = self.generate_curl_params(workflow_request_body, max_steps=max_steps) + response = requests.post(url, headers=headers, data=payload) + assert "workflow_run_id" in response.json(), f"Failed to create workflow run: {response.text}" + return response.json()["workflow_run_id"] + + def create_cruise(self, cruise_request: ObserverTaskRequest, max_steps: int | None = None) -> ObserverTask: + url = f"{self.base_url}/cruise" + payload, headers = self.generate_curl_params(cruise_request, max_steps=max_steps) + response = requests.post(url, headers=headers, data=payload) + assert "observer_cruise_id" in response.json(), f"Failed to create observer cruise: {response.text}" + return ObserverTask.model_validate(response.json()) + + def get_task(self, task_id: str) -> TaskResponse: + """Get a task by id.""" + url = f"{self.base_url}/tasks/{task_id}" + headers = {"x-api-key": self.credentials} + response = requests.get(url, headers=headers) + assert response.status_code == 200, f"Expected to get task response status 200, but got {response.status_code}" + return TaskResponse(**response.json()) + + async def get_workflow_run(self, workflow_pid: str, workflow_run_id: str) -> WorkflowRunStatusResponse: + url = f"{self.base_url}/workflows/{workflow_pid}/runs/{workflow_run_id}" + headers = {"x-api-key": self.credentials} + async with httpx.AsyncClient() as client: + response = await client.get(url, headers=headers) + assert response.status_code == 200, ( + f"Expected to get workflow run response status 200, but got {response.status_code}" + ) + return WorkflowRunStatusResponse(**response.json()) + + +class Evaluator: + def __init__(self, client: SkyvernClient, artifact_folder: str) -> None: + self.client = client + self.artifact_folder = artifact_folder + + async def _wait_for_task_finish(self, task_id: str) -> None: + while True: + task_response = self.client.get_task(task_id) + if task_response.status.is_final(): + return + await asyncio.sleep(20) + + @staticmethod + def _download_screenshot(url: str) -> bytes | None: + if url.startswith("file://"): + file_path = urlparse(url).path + with open(file_path, "rb") as f: + return f.read() + + elif url.startswith("http://") or url.startswith("https://"): + response = requests.get(url) + assert response.status_code == 200, ( + f"Expected screenshot download response is 200, but got {response.status_code}" + ) + return response.content + + return None + + def _get_final_screenshot_from_task(self, task_response: TaskResponse) -> bytes | None: + screenshot_url: str = "" + + if task_response.screenshot_url is not None: + screenshot_url = task_response.screenshot_url + + if ( + not screenshot_url + and task_response.action_screenshot_urls is not None + and len(task_response.action_screenshot_urls) > 0 + ): + screenshot_url = task_response.action_screenshot_urls[0] + + assert screenshot_url, ( + f"{task_response.task_id} Expected final screenshot is not None, but got NONE: {task_response.model_dump_json(indent=2)}" + ) + + if screenshot_url.startswith("file://"): + file_path = urlparse(screenshot_url).path + with open(file_path, "rb") as f: + return f.read() + + elif screenshot_url.startswith("http://") or screenshot_url.startswith("https://"): + response = requests.get(screenshot_url) + assert response.status_code == 200, ( + f"Expected screenshot download response is 200, but got {response.status_code}" + ) + return response.content + + return None + + def _save_artifact(self, file_name: str, data: bytes) -> None: + if not self.artifact_folder: + return + + create_folder_if_not_exist(self.artifact_folder) + file_path = os.path.join(self.artifact_folder, f"{int(datetime.now().timestamp())}-{file_name}") + with open(file_path, "wb") as f: + f.write(data) + + async def _execute_eval( + self, + question: str, + answer: str, + extracted_information: list | dict[str, Any] | str | None, + final_screenshot: bytes, + is_updated: bool = False, + ) -> tuple[bool, str]: + if extracted_information is None: + extracted_information = "" + + if not isinstance(extracted_information, str): + extracted_information = json.dumps(extracted_information) + + prompt = prompt_engine.load_prompt( + "evaluate-prompt", + ques=question, + answer=answer, + is_updated=is_updated, + extracted_information=extracted_information, + ) + self._save_artifact("prompt_evaluate_result.txt", prompt.encode()) + self._save_artifact("screenshot_evaluate_result.png", final_screenshot) + + json_response = await app.LLM_API_HANDLER( + prompt=prompt, screenshots=[final_screenshot], prompt_name="evaluate-prompt" + ) + self._save_artifact("llm_response_evaluate_result.json", json.dumps(json_response, indent=2).encode()) + + verdict = json_response.get("verdict") + return verdict == "SUCCESS", json_response.get("thoughts", "") + + async def generate_task_request(self, user_prompt: str, proxy: ProxyLocation | None = None) -> TaskRequest: + prompt = prompt_engine.load_prompt("generate-task", user_prompt=user_prompt) + self._save_artifact("prompt_generate_task.txt", prompt.encode()) + + json_response = await app.LLM_API_HANDLER(prompt=prompt, prompt_name="generate-task") + self._save_artifact("llm_response_generate_task.json", json.dumps(json_response, indent=2).encode()) + task = TaskRequest( + title=json_response["suggested_title"], + url=json_response["url"], + navigation_goal=json_response["navigation_goal"], + navigation_payload=json_response["navigation_payload"], + data_extraction_goal=json_response["data_extraction_goal"], + proxy_location=proxy, + ) + return task + + def queue_skyvern_task(self, task: TaskRequest, max_step: int | None = None) -> str: + task_id = self.client.create_task(task_request_body=task, max_steps=max_step) + self._save_artifact("task_id.txt", task_id.encode()) + assert task_id + return task_id + + def queue_skyvern_workflow( + self, workflow_pid: str, workflow_request: WorkflowRequestBody, max_step: int | None = None + ) -> str: + workflow_run_id = self.client.create_workflow_run( + workflow_pid=workflow_pid, workflow_request_body=workflow_request, max_steps=max_step + ) + self._save_artifact("workflow_run_id.txt", workflow_run_id.encode()) + assert workflow_run_id + return workflow_run_id + + def queue_skyvern_cruise(self, cruise_request: ObserverTaskRequest, max_step: int | None = None) -> ObserverTask: + cruise = self.client.create_cruise(cruise_request=cruise_request, max_steps=max_step) + self._save_artifact("cruise.json", cruise.model_dump_json(indent=2).encode()) + return cruise + + async def eval_skyvern_task( + self, + task_id: str, + question: str, + answer: str, + ) -> None: + task_response = self.client.get_task(task_id=task_id) + assert task_response.status == TaskStatus.completed, f"{task_id} Expected completed, but {task_response.status}" + final_screenshot = self._get_final_screenshot_from_task(task_response=task_response) + assert final_screenshot is not None, f"{task_id} Expected final screenshot, but got None" + + ok, reasoning = await self._execute_eval( + question=question, + answer=answer, + extracted_information=task_response.extracted_information, + final_screenshot=final_screenshot, + ) + assert ok, f"{task_id} failed due to {reasoning}" + + async def eval_skyvern_workflow_run( + self, + workflow_pid: str, + workflow_run_id: str, + question: str, + answer: str, + is_updated: bool, + ) -> None: + workflow_run_response = await self.client.get_workflow_run( + workflow_pid=workflow_pid, workflow_run_id=workflow_run_id + ) + assert workflow_run_response.status == WorkflowRunStatus.completed, ( + f"Expected {workflow_pid + '/' + workflow_run_id} completed, but {workflow_run_response.status}" + ) + assert workflow_run_response.screenshot_urls and len(workflow_run_response.screenshot_urls) > 0, ( + f"Expected {workflow_pid + '/' + workflow_run_id} with screenshots, but got empty" + ) + final_screenshot = self._download_screenshot(workflow_run_response.screenshot_urls[0]) + assert final_screenshot is not None, ( + f"Expected {workflow_pid + '/' + workflow_run_id} final screenshot, but got None" + ) + + extracted_information: list | dict[str, Any] | str | None = None + if workflow_run_response.observer_cruise is None: + assert workflow_run_response.outputs and len(workflow_run_response.outputs) > 0, ( + f"Expected {workflow_pid + '/' + workflow_run_id} with output, but got empty output" + ) + + label, result = workflow_run_response.outputs.popitem() + if isinstance(result, dict): + extracted_information = result.get("extracted_information") + else: + # FIXME: improve this when the last block is loop block + extracted_information = result + else: + workflow_run_response.observer_cruise.summary + workflow_run_response.observer_cruise.output + summary = f"{('summary:' + workflow_run_response.observer_cruise.summary) if workflow_run_response.observer_cruise.summary else ''}" + output = f"{('output: ' + json.dumps(workflow_run_response.observer_cruise.output)) if workflow_run_response.observer_cruise.output else ''}" + extracted_information = "" + if summary: + extracted_information = summary + + if output: + if extracted_information: + extracted_information = extracted_information + "\n" + output + else: + extracted_information = output + + ok, reasoning = await self._execute_eval( + question=question, + answer=answer, + extracted_information=extracted_information, + final_screenshot=final_screenshot, + is_updated=is_updated, + ) + assert ok, f"{workflow_pid + '/' + workflow_run_id} failed due to {reasoning}" + + async def create_and_eval_skyvern_task( + self, task: TaskRequest, question: str, answer: str, max_step: int | None = None + ) -> None: + task_id = self.client.create_task(task_request_body=task, max_steps=max_step) + self._save_artifact("task_id.txt", task_id.encode()) + await self._wait_for_task_finish(task_id=task_id) + # (?) looks like there's a bug on agent side: + # sometimes the screenshot_url is NONE if the task is finished. but if we query again later, the value appeared. + await asyncio.sleep(30) + await self.eval_skyvern_task(task_id=task_id, question=question, answer=answer) diff --git a/evaluation/core/utils.py b/evaluation/core/utils.py new file mode 100644 index 00000000..4e1cd936 --- /dev/null +++ b/evaluation/core/utils.py @@ -0,0 +1,58 @@ +import json +from typing import Iterator +from uuid import uuid4 + +from pydantic import BaseModel + + +class WebVoyagerTestCase(BaseModel): + group_id: str + id: str + url: str + question: str + answer: str + is_updated: bool = False + max_steps: int | None = None + + +class WorkflowRunResultRequest(BaseModel): + id: str + workflow_run_id: str + + +def load_webvoyager_case_from_json(file_path: str, group_id: str = "") -> Iterator[WebVoyagerTestCase]: + with open("evaluation/datasets/webvoyager_reference_answer.json", "r") as answer_file: + webvoyager_answers: dict = json.load(answer_file) + + if not group_id: + group_id = str(uuid4()) + + with open(file_path, "r", encoding="utf-8") as file: + for line in file: + test_case: dict[str, str] = json.loads(line) + web_name, id = test_case["id"].split("--") + for answer in webvoyager_answers[web_name]["answers"]: + if str(answer["id"]) == id: + ans = answer["ans"] + yield WebVoyagerTestCase( + group_id=group_id, + id=test_case["id"], + url=test_case["web"], + question=test_case["ques"], + answer=ans, + ) + break + else: + raise Exception("no answer for the task") + + +def load_records_from_json(file_path: str) -> Iterator[WorkflowRunResultRequest]: + with open(file_path, "r", encoding="utf-8") as f: + for line in f: + item: dict[str, str] = json.loads(line) + id = item["id"] + workflow_run_id = item["workflow_run_id"] + yield WorkflowRunResultRequest( + id=id, + workflow_run_id=workflow_run_id, + ) diff --git a/evaluation/script/create_webvoyager_evaluation_result.py b/evaluation/script/create_webvoyager_evaluation_result.py new file mode 100644 index 00000000..7f9e736c --- /dev/null +++ b/evaluation/script/create_webvoyager_evaluation_result.py @@ -0,0 +1,67 @@ +import csv +import json +from typing import Any + +import typer + +from evaluation.core import SkyvernClient +from skyvern.forge.sdk.workflow.models.workflow import WorkflowRunStatus + +csv_headers = [ + "id", + "status", + "assertion", + "failure_reason", + "url", + "question", + "answer", + "summary", + "output", + "is_updated", + "workflow_permanent_id", + "workflow_run_id", +] + + +def main( + base_url: str = typer.Option(..., "--base-url", help="base url for Skyvern client"), + cred: str = typer.Option(..., "--cred", help="credential for Skyvern organization"), + workflow_pid: str = typer.Option(..., "--workflow-pid", help="workflow pid to execute the evaluation test"), + record_json_path: str = typer.Option(..., "--record-json", help="record json path for evaluation run"), + output_csv_path: str = typer.Option("output.csv", "--output-path", help="output csv path for evaluation run"), +) -> None: + client = SkyvernClient(base_url=base_url, credentials=cred) + + with open(record_json_path, "r", encoding="utf-8") as file: + with open(output_csv_path, newline="", mode="w", encoding="utf-8") as csv_file: + writer = csv.DictWriter(csv_file, fieldnames=csv_headers) + writer.writeheader() + + for line in file: + one_record: dict[str, Any] = json.loads(line) + workflow_run_id: str = one_record.get("workflow_run_id", "") + + workflow_run_response = client.get_workflow_run( + workflow_pid=workflow_pid, workflow_run_id=workflow_run_id + ) + one_record.update( + { + "workflow_permanent_id": workflow_pid, + "status": str(workflow_run_response.status), + "summary": workflow_run_response.observer_cruise.summary, + "output": workflow_run_response.observer_cruise.output, + "assertion": workflow_run_response.status == WorkflowRunStatus.completed, + "failure_reason": workflow_run_response.failure_reason or "", + } + ) + csv_data = {key: one_record[key] for key in csv_headers} + print( + f"{workflow_run_id}(id={one_record.get('id')}) {workflow_run_response.status}. Saving to the output csv.." + ) + writer.writerow(csv_data) + + print(f"Exported all records in {output_csv_path}") + + +if __name__ == "__main__": + typer.run(main) diff --git a/evaluation/script/create_webvoyager_observer.py b/evaluation/script/create_webvoyager_observer.py new file mode 100644 index 00000000..fde08fc2 --- /dev/null +++ b/evaluation/script/create_webvoyager_observer.py @@ -0,0 +1,66 @@ +import asyncio +import json +from datetime import datetime +from uuid import uuid4 + +import typer + +from evaluation.core import Evaluator, SkyvernClient +from evaluation.core.utils import load_webvoyager_case_from_json +from skyvern.forge import app +from skyvern.forge.prompts import prompt_engine +from skyvern.forge.sdk.schemas.observers import ObserverTaskRequest + + +async def create_observer_cruise( + base_url: str, + cred: str, +) -> None: + client = SkyvernClient(base_url=base_url, credentials=cred) + group_id = uuid4() + + cnt = 0 + record_file_path = f"{group_id}-webvoyager-record.jsonl" + with open(record_file_path, "w", encoding="utf-8") as f: + for case_data in load_webvoyager_case_from_json( + file_path="evaluation/datasets/webvoyager_tasks.jsonl", group_id=str(group_id) + ): + prompt = prompt_engine.load_prompt( + "check-evaluation-goal", user_goal=case_data.question, local_datetime=datetime.now().isoformat() + ) + response = await app.LLM_API_HANDLER(prompt=prompt, prompt_name="check-evaluation-goal") + tweaked_user_goal = response.get("tweaked_user_goal") + case_data.is_updated = tweaked_user_goal != case_data.question + case_data.question = tweaked_user_goal + + evaluator = Evaluator(client=client, artifact_folder=f"test/artifacts/{case_data.group_id}/{case_data.id}") + request_body = ObserverTaskRequest( + url=case_data.url, + user_prompt=case_data.question, + ) + cruise = evaluator.queue_skyvern_cruise(cruise_request=request_body, max_step=case_data.max_steps) + dumped_data = case_data.model_dump() + dumped_data.update( + { + "observer_cruise_id": cruise.observer_cruise_id, + "workflow_run_id": cruise.workflow_run_id, + "workflow_permanent_id": cruise.workflow_permanent_id, + "cruise_url": str(cruise.url) if cruise.url else cruise.url, + } + ) + print(f"Queued {cruise.observer_cruise_id} for {case_data.model_dump_json()}") + f.write(json.dumps(dumped_data) + "\n") + cnt += 1 + + print(f"Queued {cnt} cruises to launch webvoyager evaluation test. saving the records file in {record_file_path}") + + +def main( + base_url: str = typer.Option(..., "--base-url", help="base url for Skyvern client"), + cred: str = typer.Option(..., "--cred", help="credential for Skyvern organization"), +) -> None: + asyncio.run(create_observer_cruise(base_url=base_url, cred=cred)) + + +if __name__ == "__main__": + typer.run(main) diff --git a/evaluation/script/create_webvoyager_workflow.py b/evaluation/script/create_webvoyager_workflow.py new file mode 100644 index 00000000..b47f1625 --- /dev/null +++ b/evaluation/script/create_webvoyager_workflow.py @@ -0,0 +1,75 @@ +import asyncio +import json +from datetime import datetime +from typing import Optional +from uuid import uuid4 + +import typer + +from evaluation.core import Evaluator, SkyvernClient +from evaluation.core.utils import load_webvoyager_case_from_json +from skyvern.forge import app +from skyvern.forge.prompts import prompt_engine +from skyvern.forge.sdk.schemas.tasks import ProxyLocation +from skyvern.forge.sdk.workflow.models.workflow import WorkflowRequestBody + + +async def create_workflow_run( + base_url: str, + cred: str, + workflow_pid: str, + proxy_location: ProxyLocation | None = None, +) -> None: + client = SkyvernClient(base_url=base_url, credentials=cred) + group_id = uuid4() + + cnt = 0 + record_file_path = f"{group_id}-webvoyager-record.jsonl" + with open(record_file_path, "w", encoding="utf-8") as f: + for case_data in load_webvoyager_case_from_json( + file_path="evaluation/datasets/webvoyager_tasks.jsonl", group_id=str(group_id) + ): + prompt = prompt_engine.load_prompt( + "check-evaluation-goal", user_goal=case_data.question, local_datetime=datetime.now().isoformat() + ) + response = await app.LLM_API_HANDLER(prompt=prompt, prompt_name="check-evaluation-goal") + tweaked_user_goal = response.get("tweaked_user_goal") + case_data.is_updated = tweaked_user_goal != case_data.question + case_data.question = tweaked_user_goal + + evaluator = Evaluator(client=client, artifact_folder=f"test/artifacts/{case_data.group_id}/{case_data.id}") + request_body = WorkflowRequestBody( + data={ + "url": case_data.url, + "instruction": case_data.question, + "answer": case_data.answer, + }, + proxy_location=proxy_location, + ) + workflow_run_id = evaluator.queue_skyvern_workflow( + workflow_pid=workflow_pid, workflow_request=request_body, max_step=case_data.max_steps + ) + dumped_data = case_data.model_dump() + dumped_data.update({"workflow_run_id": workflow_run_id}) + print(f"Queued {workflow_run_id} for {case_data.model_dump_json()}") + f.write(json.dumps(dumped_data) + "\n") + cnt += 1 + + print(f"Queued {cnt} workflows to launch webvoyager evaluation test. saving the records file in {record_file_path}") + + +def main( + base_url: str = typer.Option(..., "--base-url", help="base url for Skyvern client"), + cred: str = typer.Option(..., "--cred", help="credential for Skyvern organization"), + workflow_pid: str = typer.Option(..., "--workflow-pid", help="workflow pid to execute the evaluation test"), + proxy_location: Optional[ProxyLocation] = typer.Option( + None, "--proxy-location", help="overwrite the workflow proxy location" + ), +) -> None: + asyncio.run( + create_workflow_run(base_url=base_url, cred=cred, workflow_pid=workflow_pid, proxy_location=proxy_location) + ) + + +if __name__ == "__main__": + typer.run(main) diff --git a/evaluation/script/eval_webvoyager_cruise.py b/evaluation/script/eval_webvoyager_cruise.py new file mode 100644 index 00000000..e2060e18 --- /dev/null +++ b/evaluation/script/eval_webvoyager_cruise.py @@ -0,0 +1,115 @@ +import asyncio +import csv +import json +from typing import Any + +import typer + +from evaluation.core import Evaluator, SkyvernClient +from skyvern.forge.sdk.workflow.models.workflow import WorkflowRunStatus + +csv_headers = [ + "id", + "status", + "assertion", + "failure_reason", + "url", + "question", + "answer", + "summary", + "output", + "is_updated", + "workflow_permanent_id", + "workflow_run_id", +] + +BATCH_SIZE = 5 + + +async def process_record(client: SkyvernClient, one_record: dict[str, Any]) -> dict[str, Any]: + workflow_pid: str = one_record.get("workflow_permanent_id", "") + workflow_run_id: str = one_record.get("workflow_run_id", "") + workflow_run_response = await client.get_workflow_run(workflow_pid=workflow_pid, workflow_run_id=workflow_run_id) + one_record.update( + { + "status": str(workflow_run_response.status), + "summary": workflow_run_response.observer_cruise.summary, + "output": workflow_run_response.observer_cruise.output, + } + ) + if workflow_run_response.status != WorkflowRunStatus.completed: + one_record.update( + { + "assertion": False, + "failure_reason": workflow_run_response.failure_reason, + }, + ) + else: + evaluator = Evaluator( + client=client, + artifact_folder=f"test/artifacts/{one_record.get('group_id', '')}/{one_record.get('id', '')}", + ) + try: + await evaluator.eval_skyvern_workflow_run( + workflow_pid=workflow_pid, + workflow_run_id=workflow_run_id, + question=one_record.get("question", ""), + answer=one_record.get("answer", ""), + is_updated=one_record.get("is_updated", False), + ) + one_record.update({"assertion": True, "failure_reason": ""}) + except Exception as e: + one_record.update({"assertion": False, "failure_reason": str(e)}) + + csv_data = {key: one_record[key] for key in csv_headers} + print( + f"{workflow_pid}/{workflow_run_id}(id={one_record.get('id')}) {workflow_run_response.status}. Saving to the output csv.." + ) + return csv_data + + +async def run_eval( + base_url: str, + cred: str, + record_json_path: str, + output_csv_path: str, +) -> None: + client = SkyvernClient(base_url=base_url, credentials=cred) + + with open(record_json_path, "r", encoding="utf-8") as file: + with open(output_csv_path, newline="", mode="w", encoding="utf-8") as csv_file: + writer = csv.DictWriter(csv_file, fieldnames=csv_headers) + writer.writeheader() + + current_batch = [] + for line in file: + one_record: dict[str, Any] = json.loads(line) + current_batch.append(one_record) + + if len(current_batch) >= BATCH_SIZE: + results = await asyncio.gather(*(process_record(client, record) for record in current_batch)) + for result in results: + writer.writerow(result) + current_batch = [] + + if current_batch: + results = await asyncio.gather(*(process_record(client, record) for record in current_batch)) + for result in results: + writer.writerow(result) + + print(f"Exported all records in {output_csv_path}") + + +def main( + base_url: str = typer.Option(..., "--base-url", help="base url for Skyvern client"), + cred: str = typer.Option(..., "--cred", help="credential for Skyvern organization"), + record_json_path: str = typer.Option(..., "--record-json", help="record json path for evaluation run"), + output_csv_path: str = typer.Option("output.csv", "--output-path", help="output csv path for evaluation run"), +) -> None: + asyncio.run( + run_eval(base_url=base_url, cred=cred, record_json_path=record_json_path, output_csv_path=output_csv_path) + ) + + +if __name__ == "__main__": + typer.run(main) diff --git a/skyvern/forge/prompts/skyvern/check-evaluation-goal.j2 b/skyvern/forge/prompts/skyvern/check-evaluation-goal.j2 new file mode 100644 index 00000000..e81421e8 --- /dev/null +++ b/skyvern/forge/prompts/skyvern/check-evaluation-goal.j2 @@ -0,0 +1,21 @@ +You're provided with a user goal. You need to help analysing the goal. + +MAKE SURE YOU OUTPUT VALID JSON. No text before or after JSON, no trailing commas, no comments (//), no unnecessary quotes, etc. + +Reply in JSON format with the following keys: +{ +"thought": str, // Think step by step. Describe your thought in thi field. +"is_booking": bool, // True if the goal is to book something, including room, flight and so on. +"is_including_date": bool, // True if the goal includes date information. +"tweaked_user_goal": str, // If is_booking is True and is_including_date is True, repick a date within the next two months to replace the orignal date in the goal. Otherwise, return the orignal user goal. +} + +User goal +``` +{{ user_goal }} +``` + +Current datetime, ISO format: +``` +{{ local_datetime }} +``` \ No newline at end of file diff --git a/skyvern/forge/prompts/skyvern/evaluate-prompt.j2 b/skyvern/forge/prompts/skyvern/evaluate-prompt.j2 new file mode 100644 index 00000000..7998ed05 --- /dev/null +++ b/skyvern/forge/prompts/skyvern/evaluate-prompt.j2 @@ -0,0 +1,40 @@ +As an evaluator, you will be presented with three primary components to assist you in your role: + +1. Web Task Instruction: This is a clear and specific directive provided in natural language, detailing the online activity to be carried out. These requirements may include conducting searches, verifying information, comparing prices, checking availability, or any other action relevant to the specified web service (such as Amazon, Apple, ArXiv, BBC News, Booking etc). + +2. Correct Answer: This is the correct answer for the task.{%if is_updated%} But this answer is out of date, so this answer is for reference only.{% endif %} + +3. Current Answer: This is the answer with a screenshot of the current page, waiting to be verified. But sometimes the text of the current answer could be empty, then you need to verify if the page screenshot fulfilled the task instruction. + +-- You DO NOT NEED to interact with web pages or perform actions such as booking flights or conducting searches on websites. +-- You SHOULD NOT make assumptions based on information not presented in the screenshot when comparing it to the instructions. +-- Your primary responsibility is to conduct a thorough assessment of the web task instruction against the outcome depicted in the current answer, evaluating whether the actions taken align with the given instructions. +-- NOTE that the instruction may involve more than one task, for example, locating the garage and summarizing the review. Failing to complete either task, such as not providing a summary, should be considered unsuccessful.{%if is_updated%} +-- NOTE that the correct answer is out of date. So as long as the screenshots and the current answer are fulfilled all the task instruction, consider task has been successfully accomplished.{% endif %} +-- NOTE that the screenshot is authentic, but the text of current answer is generated before the screenshot was taken, and there may be discrepancies between the text and the screenshots. +-- NOTE the difference: 1) The text in answer may contradict the screenshot in answer, then the content of the text prevails, 2) The text in the answer is not mentioned on the screenshot, choose to believe the text. 3) The text may be empty, choose to belive the screenshot. +You should elaborate on how you arrived at your final evaluation and then provide a definitive verdict on whether the task has been successfully accomplished, either as 'SUCCESS' or 'NOT SUCCESS'. + +Make sure to ONLY return the JSON object in this format with no additional text before or after it: +```json +{ + "evaluation_criteria" : str, // Think step by step. Based on the web task instruction and the correct answer, how to verify the task is successfully completed. + "thoughts": str, // Think step by step. What information makes you believe the result meets or does not meet the criterion. + "verdict": str, // string enum. "SUCCESS", "NOT SUCCESS" +} +``` + +Web Task Instruction +``` +{{ ques }} +``` + +Correct Answer +``` +{{ answer }} +``` + +Current Answer: +``` +{{ extracted_information }} +```