push eval script to oss (#1796)

This commit is contained in:
Shuchang Zheng
2025-02-19 23:21:08 -08:00
committed by GitHub
parent ef5cb8d671
commit 367473f930
8 changed files with 748 additions and 0 deletions

306
evaluation/core/__init__.py Normal file
View File

@@ -0,0 +1,306 @@
import asyncio
import json
import os
from datetime import datetime
from typing import Any
from urllib.parse import urlparse
import httpx
import requests
from pydantic import BaseModel
from skyvern.forge import app
from skyvern.forge.prompts import prompt_engine
from skyvern.forge.sdk.api.files import create_folder_if_not_exist
from skyvern.forge.sdk.schemas.observers import ObserverTask, ObserverTaskRequest
from skyvern.forge.sdk.schemas.tasks import ProxyLocation, TaskRequest, TaskResponse, TaskStatus
from skyvern.forge.sdk.workflow.models.workflow import WorkflowRequestBody, WorkflowRunStatus, WorkflowRunStatusResponse
class TaskOutput(BaseModel):
    """Result of a finished Skyvern task: extracted data plus the final screenshot."""

    # Data extracted by the task; shape depends on the task's data-extraction goal.
    extracted_information: list | dict[str, Any] | str | None
    # Raw bytes of the final screenshot, if one was captured.
    final_screenshot: bytes | None
class SkyvernClient:
    """Thin HTTP client for the Skyvern API, authenticated with an organization API key."""

    def __init__(self, base_url: str, credentials: str):
        # API root URL, e.g. "https://api.skyvern.com/api/v1".
        self.base_url = base_url
        # Organization API key, sent as the "x-api-key" header on every request.
        self.credentials = credentials

    def generate_curl_params(self, request_body: BaseModel, max_steps: int | None = None) -> tuple[str, dict[str, str]]:
        """Serialize *request_body* and build the request headers.

        Returns a ``(payload, headers)`` pair: ``payload`` is the JSON-encoded
        body (a string, suitable for requests' ``data=``); ``headers`` carry the
        API key and, when *max_steps* is given, a per-run step-count override.
        """
        payload = request_body.model_dump_json()
        headers = {
            "Content-Type": "application/json",
            "x-api-key": self.credentials,
        }
        if max_steps is not None:
            headers["x-max-steps-override"] = str(max_steps)
        return payload, headers

    def create_task(self, task_request_body: TaskRequest, max_steps: int | None = None) -> str:
        """Create a task and return its ``task_id``; asserts on malformed responses."""
        url = f"{self.base_url}/tasks"
        payload, headers = self.generate_curl_params(task_request_body, max_steps=max_steps)
        response = requests.post(url, headers=headers, data=payload)
        assert "task_id" in response.json(), f"Failed to create task: {response.text}"
        return response.json()["task_id"]

    def create_workflow_run(
        self, workflow_pid: str, workflow_request_body: WorkflowRequestBody, max_steps: int | None = None
    ) -> str:
        """Start a run of workflow *workflow_pid* and return its ``workflow_run_id``."""
        url = f"{self.base_url}/workflows/{workflow_pid}/run/"
        payload, headers = self.generate_curl_params(workflow_request_body, max_steps=max_steps)
        response = requests.post(url, headers=headers, data=payload)
        assert "workflow_run_id" in response.json(), f"Failed to create workflow run: {response.text}"
        return response.json()["workflow_run_id"]

    def create_cruise(self, cruise_request: ObserverTaskRequest, max_steps: int | None = None) -> ObserverTask:
        """Create an observer cruise and return the validated ObserverTask."""
        url = f"{self.base_url}/cruise"
        payload, headers = self.generate_curl_params(cruise_request, max_steps=max_steps)
        response = requests.post(url, headers=headers, data=payload)
        assert "observer_cruise_id" in response.json(), f"Failed to create observer cruise: {response.text}"
        return ObserverTask.model_validate(response.json())

    def get_task(self, task_id: str) -> TaskResponse:
        """Get a task by id."""
        url = f"{self.base_url}/tasks/{task_id}"
        headers = {"x-api-key": self.credentials}
        response = requests.get(url, headers=headers)
        assert response.status_code == 200, f"Expected to get task response status 200, but got {response.status_code}"
        return TaskResponse(**response.json())

    async def get_workflow_run(self, workflow_pid: str, workflow_run_id: str) -> WorkflowRunStatusResponse:
        """Fetch the status of one workflow run (async; uses httpx)."""
        url = f"{self.base_url}/workflows/{workflow_pid}/runs/{workflow_run_id}"
        headers = {"x-api-key": self.credentials}
        async with httpx.AsyncClient() as client:
            response = await client.get(url, headers=headers)
            assert response.status_code == 200, (
                f"Expected to get workflow run response status 200, but got {response.status_code}"
            )
            return WorkflowRunStatusResponse(**response.json())
class Evaluator:
    """Queues Skyvern tasks/workflows/cruises and judges their results with an LLM."""

    def __init__(self, client: SkyvernClient, artifact_folder: str) -> None:
        self.client = client
        # Folder where prompts, screenshots, and LLM responses are dumped for
        # debugging; an empty string disables artifact saving.
        self.artifact_folder = artifact_folder

    async def _wait_for_task_finish(self, task_id: str) -> None:
        """Poll the task every 20 seconds until it reaches a terminal status."""
        while True:
            task_response = self.client.get_task(task_id)
            if task_response.status.is_final():
                return
            await asyncio.sleep(20)

    @staticmethod
    def _download_screenshot(url: str) -> bytes | None:
        """Fetch screenshot bytes from a file:// or http(s):// URL.

        Returns None for any other URL scheme.
        """
        if url.startswith("file://"):
            file_path = urlparse(url).path
            with open(file_path, "rb") as f:
                return f.read()
        elif url.startswith("http://") or url.startswith("https://"):
            response = requests.get(url)
            assert response.status_code == 200, (
                f"Expected screenshot download response is 200, but got {response.status_code}"
            )
            return response.content
        return None

    def _get_final_screenshot_from_task(self, task_response: TaskResponse) -> bytes | None:
        """Pick the task's final screenshot URL and download it.

        Prefers ``screenshot_url`` and falls back to the first action screenshot;
        asserts if neither is available.
        """
        screenshot_url: str = ""
        if task_response.screenshot_url is not None:
            screenshot_url = task_response.screenshot_url
        if (
            not screenshot_url
            and task_response.action_screenshot_urls is not None
            and len(task_response.action_screenshot_urls) > 0
        ):
            screenshot_url = task_response.action_screenshot_urls[0]
        assert screenshot_url, (
            f"{task_response.task_id} Expected final screenshot is not None, but got NONE: {task_response.model_dump_json(indent=2)}"
        )
        # Delegate to the shared helper instead of duplicating its download logic.
        return self._download_screenshot(screenshot_url)

    def _save_artifact(self, file_name: str, data: bytes) -> None:
        """Write *data* under the artifact folder, prefixed with a timestamp; no-op if disabled."""
        if not self.artifact_folder:
            return
        create_folder_if_not_exist(self.artifact_folder)
        file_path = os.path.join(self.artifact_folder, f"{int(datetime.now().timestamp())}-{file_name}")
        with open(file_path, "wb") as f:
            f.write(data)

    async def _execute_eval(
        self,
        question: str,
        answer: str,
        extracted_information: list | dict[str, Any] | str | None,
        final_screenshot: bytes,
        is_updated: bool = False,
    ) -> tuple[bool, str]:
        """Ask the LLM to judge the run; returns (success, reasoning)."""
        if extracted_information is None:
            extracted_information = ""
        if not isinstance(extracted_information, str):
            extracted_information = json.dumps(extracted_information)
        prompt = prompt_engine.load_prompt(
            "evaluate-prompt",
            ques=question,
            answer=answer,
            is_updated=is_updated,
            extracted_information=extracted_information,
        )
        self._save_artifact("prompt_evaluate_result.txt", prompt.encode())
        self._save_artifact("screenshot_evaluate_result.png", final_screenshot)
        json_response = await app.LLM_API_HANDLER(
            prompt=prompt, screenshots=[final_screenshot], prompt_name="evaluate-prompt"
        )
        self._save_artifact("llm_response_evaluate_result.json", json.dumps(json_response, indent=2).encode())
        verdict = json_response.get("verdict")
        return verdict == "SUCCESS", json_response.get("thoughts", "")

    async def generate_task_request(self, user_prompt: str, proxy: ProxyLocation | None = None) -> TaskRequest:
        """Have the LLM expand a free-form prompt into a structured TaskRequest."""
        prompt = prompt_engine.load_prompt("generate-task", user_prompt=user_prompt)
        self._save_artifact("prompt_generate_task.txt", prompt.encode())
        json_response = await app.LLM_API_HANDLER(prompt=prompt, prompt_name="generate-task")
        self._save_artifact("llm_response_generate_task.json", json.dumps(json_response, indent=2).encode())
        task = TaskRequest(
            title=json_response["suggested_title"],
            url=json_response["url"],
            navigation_goal=json_response["navigation_goal"],
            navigation_payload=json_response["navigation_payload"],
            data_extraction_goal=json_response["data_extraction_goal"],
            proxy_location=proxy,
        )
        return task

    def queue_skyvern_task(self, task: TaskRequest, max_step: int | None = None) -> str:
        """Create a task, record its id as an artifact, and return the id."""
        task_id = self.client.create_task(task_request_body=task, max_steps=max_step)
        self._save_artifact("task_id.txt", task_id.encode())
        assert task_id
        return task_id

    def queue_skyvern_workflow(
        self, workflow_pid: str, workflow_request: WorkflowRequestBody, max_step: int | None = None
    ) -> str:
        """Start a workflow run, record its id as an artifact, and return the id."""
        workflow_run_id = self.client.create_workflow_run(
            workflow_pid=workflow_pid, workflow_request_body=workflow_request, max_steps=max_step
        )
        self._save_artifact("workflow_run_id.txt", workflow_run_id.encode())
        assert workflow_run_id
        return workflow_run_id

    def queue_skyvern_cruise(self, cruise_request: ObserverTaskRequest, max_step: int | None = None) -> ObserverTask:
        """Create an observer cruise, record it as an artifact, and return it."""
        cruise = self.client.create_cruise(cruise_request=cruise_request, max_steps=max_step)
        self._save_artifact("cruise.json", cruise.model_dump_json(indent=2).encode())
        return cruise

    async def eval_skyvern_task(
        self,
        task_id: str,
        question: str,
        answer: str,
    ) -> None:
        """Assert that a completed task's output answers *question* with *answer*."""
        task_response = self.client.get_task(task_id=task_id)
        assert task_response.status == TaskStatus.completed, f"{task_id} Expected completed, but {task_response.status}"
        final_screenshot = self._get_final_screenshot_from_task(task_response=task_response)
        assert final_screenshot is not None, f"{task_id} Expected final screenshot, but got None"
        ok, reasoning = await self._execute_eval(
            question=question,
            answer=answer,
            extracted_information=task_response.extracted_information,
            final_screenshot=final_screenshot,
        )
        assert ok, f"{task_id} failed due to {reasoning}"

    async def eval_skyvern_workflow_run(
        self,
        workflow_pid: str,
        workflow_run_id: str,
        question: str,
        answer: str,
        is_updated: bool,
    ) -> None:
        """Assert that a completed workflow run's output answers *question* with *answer*.

        For cruise-backed runs the cruise summary/output is evaluated; otherwise
        the last block's output is used.
        """
        workflow_run_response = await self.client.get_workflow_run(
            workflow_pid=workflow_pid, workflow_run_id=workflow_run_id
        )
        assert workflow_run_response.status == WorkflowRunStatus.completed, (
            f"Expected {workflow_pid + '/' + workflow_run_id} completed, but {workflow_run_response.status}"
        )
        assert workflow_run_response.screenshot_urls and len(workflow_run_response.screenshot_urls) > 0, (
            f"Expected {workflow_pid + '/' + workflow_run_id} with screenshots, but got empty"
        )
        final_screenshot = self._download_screenshot(workflow_run_response.screenshot_urls[0])
        assert final_screenshot is not None, (
            f"Expected {workflow_pid + '/' + workflow_run_id} final screenshot, but got None"
        )
        extracted_information: list | dict[str, Any] | str | None = None
        if workflow_run_response.observer_cruise is None:
            assert workflow_run_response.outputs and len(workflow_run_response.outputs) > 0, (
                f"Expected {workflow_pid + '/' + workflow_run_id} with output, but got empty output"
            )
            # popitem() yields the most recently inserted output, i.e. the last block's.
            label, result = workflow_run_response.outputs.popitem()
            if isinstance(result, dict):
                extracted_information = result.get("extracted_information")
            else:
                # FIXME: improve this when the last block is loop block
                extracted_information = result
        else:
            summary = f"{('summary:' + workflow_run_response.observer_cruise.summary) if workflow_run_response.observer_cruise.summary else ''}"
            output = f"{('output: ' + json.dumps(workflow_run_response.observer_cruise.output)) if workflow_run_response.observer_cruise.output else ''}"
            extracted_information = ""
            if summary:
                extracted_information = summary
            if output:
                if extracted_information:
                    extracted_information = extracted_information + "\n" + output
                else:
                    extracted_information = output
        ok, reasoning = await self._execute_eval(
            question=question,
            answer=answer,
            extracted_information=extracted_information,
            final_screenshot=final_screenshot,
            is_updated=is_updated,
        )
        assert ok, f"{workflow_pid + '/' + workflow_run_id} failed due to {reasoning}"

    async def create_and_eval_skyvern_task(
        self, task: TaskRequest, question: str, answer: str, max_step: int | None = None
    ) -> None:
        """Create a task, wait for it to finish, then evaluate its result."""
        task_id = self.client.create_task(task_request_body=task, max_steps=max_step)
        self._save_artifact("task_id.txt", task_id.encode())
        await self._wait_for_task_finish(task_id=task_id)
        # (?) looks like there's a bug on agent side:
        # sometimes the screenshot_url is NONE if the task is finished. but if we query again later, the value appeared.
        await asyncio.sleep(30)
        await self.eval_skyvern_task(task_id=task_id, question=question, answer=answer)

58
evaluation/core/utils.py Normal file
View File

@@ -0,0 +1,58 @@
import json
from typing import Iterator
from uuid import uuid4
from pydantic import BaseModel
class WebVoyagerTestCase(BaseModel):
    """One WebVoyager benchmark case: a question about a site plus its reference answer."""

    # Identifier shared by all cases queued in the same evaluation run.
    group_id: str
    # Case id, e.g. "<web_name>--<number>".
    id: str
    url: str
    question: str
    # Reference answer used to judge the run.
    answer: str
    # True when the question was rewritten (e.g. made time-relative) before running.
    is_updated: bool = False
    # Optional per-case override for the max step count.
    max_steps: int | None = None
class WorkflowRunResultRequest(BaseModel):
    """Pointer to a previously queued workflow run, read back from a records file."""

    # Test-case id the run belongs to.
    id: str
    workflow_run_id: str
def load_webvoyager_case_from_json(
    file_path: str,
    group_id: str = "",
    answer_file_path: str = "evaluation/datasets/webvoyager_reference_answer.json",
) -> Iterator[WebVoyagerTestCase]:
    """Yield WebVoyager test cases from a JSONL task file, joined with reference answers.

    Args:
        file_path: JSONL file with one task per line (keys: "id", "web", "ques").
        group_id: shared id for this batch; a fresh UUID is generated when empty.
        answer_file_path: JSON file mapping web names to their answer lists
            (defaults to the bundled reference-answer dataset).

    Raises:
        Exception: when a task has no matching reference answer.
    """
    with open(answer_file_path, "r") as answer_file:
        webvoyager_answers: dict = json.load(answer_file)
    if not group_id:
        group_id = str(uuid4())
    with open(file_path, "r", encoding="utf-8") as file:
        for line in file:
            test_case: dict[str, str] = json.loads(line)
            # Task ids look like "<web_name>--<case_id>"; avoid shadowing builtin `id`.
            web_name, case_id = test_case["id"].split("--")
            for answer in webvoyager_answers[web_name]["answers"]:
                if str(answer["id"]) == case_id:
                    ans = answer["ans"]
                    yield WebVoyagerTestCase(
                        group_id=group_id,
                        id=test_case["id"],
                        url=test_case["web"],
                        question=test_case["ques"],
                        answer=ans,
                    )
                    break
            else:
                # No reference answer found for this case; include the id for debugging.
                raise Exception(f"no answer for the task {test_case['id']}")
def load_records_from_json(file_path: str) -> Iterator[WorkflowRunResultRequest]:
    """Yield one WorkflowRunResultRequest per line of the JSONL file at *file_path*."""
    with open(file_path, "r", encoding="utf-8") as records_file:
        for raw_line in records_file:
            record: dict[str, str] = json.loads(raw_line)
            yield WorkflowRunResultRequest(
                id=record["id"],
                workflow_run_id=record["workflow_run_id"],
            )

View File

@@ -0,0 +1,67 @@
import csv
import json
from typing import Any
import typer
from evaluation.core import SkyvernClient
from skyvern.forge.sdk.workflow.models.workflow import WorkflowRunStatus
# Column order for the exported results CSV; every record dict must carry all of these keys.
csv_headers = [
    "id",
    "status",
    "assertion",
    "failure_reason",
    "url",
    "question",
    "answer",
    "summary",
    "output",
    "is_updated",
    "workflow_permanent_id",
    "workflow_run_id",
]
def main(
    base_url: str = typer.Option(..., "--base-url", help="base url for Skyvern client"),
    cred: str = typer.Option(..., "--cred", help="credential for Skyvern organization"),
    workflow_pid: str = typer.Option(..., "--workflow-pid", help="workflow pid to execute the evaluation test"),
    record_json_path: str = typer.Option(..., "--record-json", help="record json path for evaluation run"),
    output_csv_path: str = typer.Option("output.csv", "--output-path", help="output csv path for evaluation run"),
) -> None:
    """Export the status of each recorded workflow run to a CSV file."""
    # Local import: only this entry point needs an event loop.
    import asyncio

    client = SkyvernClient(base_url=base_url, credentials=cred)
    with open(record_json_path, "r", encoding="utf-8") as file:
        with open(output_csv_path, newline="", mode="w", encoding="utf-8") as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=csv_headers)
            writer.writeheader()
            for line in file:
                one_record: dict[str, Any] = json.loads(line)
                workflow_run_id: str = one_record.get("workflow_run_id", "")
                # BUG FIX: get_workflow_run is a coroutine; the original called it
                # without awaiting, so every attribute access below would have failed.
                workflow_run_response = asyncio.run(
                    client.get_workflow_run(workflow_pid=workflow_pid, workflow_run_id=workflow_run_id)
                )
                # observer_cruise can be None for plain (non-cruise) workflow runs.
                cruise = workflow_run_response.observer_cruise
                one_record.update(
                    {
                        "workflow_permanent_id": workflow_pid,
                        "status": str(workflow_run_response.status),
                        "summary": cruise.summary if cruise is not None else "",
                        "output": cruise.output if cruise is not None else "",
                        "assertion": workflow_run_response.status == WorkflowRunStatus.completed,
                        "failure_reason": workflow_run_response.failure_reason or "",
                    }
                )
                csv_data = {key: one_record[key] for key in csv_headers}
                print(
                    f"{workflow_run_id}(id={one_record.get('id')}) {workflow_run_response.status}. Saving to the output csv.."
                )
                writer.writerow(csv_data)
    print(f"Exported all records in {output_csv_path}")


if __name__ == "__main__":
    typer.run(main)

View File

@@ -0,0 +1,66 @@
import asyncio
import json
from datetime import datetime
from uuid import uuid4
import typer
from evaluation.core import Evaluator, SkyvernClient
from evaluation.core.utils import load_webvoyager_case_from_json
from skyvern.forge import app
from skyvern.forge.prompts import prompt_engine
from skyvern.forge.sdk.schemas.observers import ObserverTaskRequest
async def create_observer_cruise(
    base_url: str,
    cred: str,
) -> None:
    """Queue one observer cruise per WebVoyager test case and write run ids to a JSONL record file."""
    client = SkyvernClient(base_url=base_url, credentials=cred)
    group_id = uuid4()
    cnt = 0
    record_file_path = f"{group_id}-webvoyager-record.jsonl"
    with open(record_file_path, "w", encoding="utf-8") as f:
        for case_data in load_webvoyager_case_from_json(
            file_path="evaluation/datasets/webvoyager_tasks.jsonl", group_id=str(group_id)
        ):
            # Let the LLM rewrite time-sensitive goals relative to the current datetime.
            prompt = prompt_engine.load_prompt(
                "check-evaluation-goal", user_goal=case_data.question, local_datetime=datetime.now().isoformat()
            )
            response = await app.LLM_API_HANDLER(prompt=prompt, prompt_name="check-evaluation-goal")
            # ROBUSTNESS: fall back to the original question when the LLM response
            # lacks the key, instead of writing None into the str-typed field.
            tweaked_user_goal = response.get("tweaked_user_goal") or case_data.question
            case_data.is_updated = tweaked_user_goal != case_data.question
            case_data.question = tweaked_user_goal
            evaluator = Evaluator(client=client, artifact_folder=f"test/artifacts/{case_data.group_id}/{case_data.id}")
            request_body = ObserverTaskRequest(
                url=case_data.url,
                user_prompt=case_data.question,
            )
            cruise = evaluator.queue_skyvern_cruise(cruise_request=request_body, max_step=case_data.max_steps)
            dumped_data = case_data.model_dump()
            dumped_data.update(
                {
                    "observer_cruise_id": cruise.observer_cruise_id,
                    "workflow_run_id": cruise.workflow_run_id,
                    "workflow_permanent_id": cruise.workflow_permanent_id,
                    "cruise_url": str(cruise.url) if cruise.url else cruise.url,
                }
            )
            print(f"Queued {cruise.observer_cruise_id} for {case_data.model_dump_json()}")
            f.write(json.dumps(dumped_data) + "\n")
            cnt += 1
    print(f"Queued {cnt} cruises to launch webvoyager evaluation test. saving the records file in {record_file_path}")
def main(
    base_url: str = typer.Option(..., "--base-url", help="base url for Skyvern client"),
    cred: str = typer.Option(..., "--cred", help="credential for Skyvern organization"),
) -> None:
    """CLI entry point: run the async cruise-creation routine to completion."""
    coroutine = create_observer_cruise(base_url=base_url, cred=cred)
    asyncio.run(coroutine)


if __name__ == "__main__":
    typer.run(main)

View File

@@ -0,0 +1,75 @@
import asyncio
import json
from datetime import datetime
from typing import Optional
from uuid import uuid4
import typer
from evaluation.core import Evaluator, SkyvernClient
from evaluation.core.utils import load_webvoyager_case_from_json
from skyvern.forge import app
from skyvern.forge.prompts import prompt_engine
from skyvern.forge.sdk.schemas.tasks import ProxyLocation
from skyvern.forge.sdk.workflow.models.workflow import WorkflowRequestBody
async def create_workflow_run(
    base_url: str,
    cred: str,
    workflow_pid: str,
    proxy_location: ProxyLocation | None = None,
) -> None:
    """Queue one run of workflow *workflow_pid* per WebVoyager test case and record run ids.

    Args:
        base_url: Skyvern API root.
        cred: organization API key.
        workflow_pid: permanent id of the evaluation workflow to run.
        proxy_location: optional proxy override passed to every run.
    """
    client = SkyvernClient(base_url=base_url, credentials=cred)
    group_id = uuid4()
    cnt = 0
    record_file_path = f"{group_id}-webvoyager-record.jsonl"
    with open(record_file_path, "w", encoding="utf-8") as f:
        for case_data in load_webvoyager_case_from_json(
            file_path="evaluation/datasets/webvoyager_tasks.jsonl", group_id=str(group_id)
        ):
            # Let the LLM rewrite time-sensitive goals relative to the current datetime.
            prompt = prompt_engine.load_prompt(
                "check-evaluation-goal", user_goal=case_data.question, local_datetime=datetime.now().isoformat()
            )
            response = await app.LLM_API_HANDLER(prompt=prompt, prompt_name="check-evaluation-goal")
            # ROBUSTNESS: fall back to the original question when the LLM response
            # lacks the key, instead of writing None into the str-typed field.
            tweaked_user_goal = response.get("tweaked_user_goal") or case_data.question
            case_data.is_updated = tweaked_user_goal != case_data.question
            case_data.question = tweaked_user_goal
            evaluator = Evaluator(client=client, artifact_folder=f"test/artifacts/{case_data.group_id}/{case_data.id}")
            request_body = WorkflowRequestBody(
                data={
                    "url": case_data.url,
                    "instruction": case_data.question,
                    "answer": case_data.answer,
                },
                proxy_location=proxy_location,
            )
            workflow_run_id = evaluator.queue_skyvern_workflow(
                workflow_pid=workflow_pid, workflow_request=request_body, max_step=case_data.max_steps
            )
            dumped_data = case_data.model_dump()
            dumped_data.update({"workflow_run_id": workflow_run_id})
            print(f"Queued {workflow_run_id} for {case_data.model_dump_json()}")
            f.write(json.dumps(dumped_data) + "\n")
            cnt += 1
    print(f"Queued {cnt} workflows to launch webvoyager evaluation test. saving the records file in {record_file_path}")
def main(
    base_url: str = typer.Option(..., "--base-url", help="base url for Skyvern client"),
    cred: str = typer.Option(..., "--cred", help="credential for Skyvern organization"),
    workflow_pid: str = typer.Option(..., "--workflow-pid", help="workflow pid to execute the evaluation test"),
    proxy_location: Optional[ProxyLocation] = typer.Option(
        None, "--proxy-location", help="overwrite the workflow proxy location"
    ),
) -> None:
    """CLI entry point: run the async workflow-creation routine to completion."""
    coroutine = create_workflow_run(
        base_url=base_url, cred=cred, workflow_pid=workflow_pid, proxy_location=proxy_location
    )
    asyncio.run(coroutine)


if __name__ == "__main__":
    typer.run(main)

View File

@@ -0,0 +1,115 @@
import asyncio
import csv
import json
from typing import Any
import typer
from evaluation.core import Evaluator, SkyvernClient
from skyvern.forge.sdk.workflow.models.workflow import WorkflowRunStatus
# Column order for the exported results CSV; every record dict must carry all of these keys.
csv_headers = [
    "id",
    "status",
    "assertion",
    "failure_reason",
    "url",
    "question",
    "answer",
    "summary",
    "output",
    "is_updated",
    "workflow_permanent_id",
    "workflow_run_id",
]
# Number of workflow runs evaluated concurrently per asyncio.gather batch.
BATCH_SIZE = 5
async def process_record(client: SkyvernClient, one_record: dict[str, Any]) -> dict[str, Any]:
    """Fetch one workflow run, evaluate it if completed, and return a CSV row dict.

    The returned dict contains exactly the ``csv_headers`` keys; evaluation
    failures are captured in "assertion"/"failure_reason" rather than raised.
    """
    workflow_pid: str = one_record.get("workflow_permanent_id", "")
    workflow_run_id: str = one_record.get("workflow_run_id", "")
    workflow_run_response = await client.get_workflow_run(workflow_pid=workflow_pid, workflow_run_id=workflow_run_id)
    # ROBUSTNESS: observer_cruise can be None for plain (non-cruise) workflow runs;
    # the original dereferenced it unconditionally and would have raised.
    cruise = workflow_run_response.observer_cruise
    one_record.update(
        {
            "status": str(workflow_run_response.status),
            "summary": cruise.summary if cruise is not None else "",
            "output": cruise.output if cruise is not None else "",
        }
    )
    if workflow_run_response.status != WorkflowRunStatus.completed:
        one_record.update(
            {
                "assertion": False,
                "failure_reason": workflow_run_response.failure_reason,
            },
        )
    else:
        evaluator = Evaluator(
            client=client,
            artifact_folder=f"test/artifacts/{one_record.get('group_id', '')}/{one_record.get('id', '')}",
        )
        try:
            await evaluator.eval_skyvern_workflow_run(
                workflow_pid=workflow_pid,
                workflow_run_id=workflow_run_id,
                question=one_record.get("question", ""),
                answer=one_record.get("answer", ""),
                is_updated=one_record.get("is_updated", False),
            )
            one_record.update({"assertion": True, "failure_reason": ""})
        except Exception as e:
            # Record the evaluation failure per row instead of aborting the batch.
            one_record.update({"assertion": False, "failure_reason": str(e)})
    csv_data = {key: one_record[key] for key in csv_headers}
    print(
        f"{workflow_pid}/{workflow_run_id}(id={one_record.get('id')}) {workflow_run_response.status}. Saving to the output csv.."
    )
    return csv_data
async def run_eval(
    base_url: str,
    cred: str,
    record_json_path: str,
    output_csv_path: str,
) -> None:
    """Evaluate recorded workflow runs in batches of BATCH_SIZE and write rows to a CSV.

    Args:
        base_url: Skyvern API root.
        cred: organization API key.
        record_json_path: JSONL file of records produced by the queueing script.
        output_csv_path: destination CSV path.
    """
    client = SkyvernClient(base_url=base_url, credentials=cred)

    async def _flush(batch: list[dict[str, Any]], writer: csv.DictWriter) -> None:
        # Evaluate the whole batch concurrently, then write rows in input order.
        results = await asyncio.gather(*(process_record(client, record) for record in batch))
        for result in results:
            writer.writerow(result)

    with open(record_json_path, "r", encoding="utf-8") as file:
        with open(output_csv_path, newline="", mode="w", encoding="utf-8") as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=csv_headers)
            writer.writeheader()
            current_batch: list[dict[str, Any]] = []
            for line in file:
                current_batch.append(json.loads(line))
                if len(current_batch) >= BATCH_SIZE:
                    await _flush(current_batch, writer)
                    current_batch = []
            # Flush the trailing partial batch, if any.
            if current_batch:
                await _flush(current_batch, writer)
    print(f"Exported all records in {output_csv_path}")
def main(
    base_url: str = typer.Option(..., "--base-url", help="base url for Skyvern client"),
    cred: str = typer.Option(..., "--cred", help="credential for Skyvern organization"),
    record_json_path: str = typer.Option(..., "--record-json", help="record json path for evaluation run"),
    output_csv_path: str = typer.Option("output.csv", "--output-path", help="output csv path for evaluation run"),
) -> None:
    """CLI entry point: run the async batch evaluation to completion."""
    coroutine = run_eval(
        base_url=base_url, cred=cred, record_json_path=record_json_path, output_csv_path=output_csv_path
    )
    asyncio.run(coroutine)


if __name__ == "__main__":
    typer.run(main)