Integrate posthog and log events to track usage (#17)
This commit is contained in:
@@ -45,6 +45,7 @@ structlog = "^23.2.0"
|
||||
plotly = "^5.18.0"
|
||||
clipboard = "^0.0.4"
|
||||
curlify = "^2.2.1"
|
||||
typer = "^0.9.0"
|
||||
|
||||
|
||||
[tool.poetry.group.dev.dependencies]
|
||||
|
||||
@@ -7,4 +7,5 @@ if [ ! -f .env ]; then
|
||||
echo "Please add your api keys to the .env file."
|
||||
fi
|
||||
source "$(poetry env info --path)/bin/activate"
|
||||
python scripts/tracking.py skyvern-oss-run-server
|
||||
poetry run python -m skyvern.forge
|
||||
|
||||
@@ -1,2 +1,3 @@
|
||||
source "$(poetry env info --path)/bin/activate"
|
||||
python scripts/tracking.py skyvern-oss-run-ui
|
||||
streamlit run streamlit_app/visualizer/streamlit.py
|
||||
69
scripts/tracking.py
Normal file
69
scripts/tracking.py
Normal file
@@ -0,0 +1,69 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import importlib.metadata
|
||||
import platform
|
||||
from typing import Any, Dict
|
||||
|
||||
import typer
|
||||
from posthog import Posthog
|
||||
|
||||
from skyvern.forge.sdk.settings_manager import SettingsManager
|
||||
|
||||
posthog = Posthog(
|
||||
"phc_bVT2ugnZhMHRWqMvSRHPdeTjaPxQqT3QSsI3r5FlQR5",
|
||||
host="https://app.posthog.com",
|
||||
disable_geoip=False,
|
||||
)
|
||||
|
||||
DISTINCT_ID = "oss"
|
||||
|
||||
|
||||
def get_oss_version() -> str:
|
||||
try:
|
||||
return importlib.metadata.version("skyvern")
|
||||
except Exception:
|
||||
return "unknown"
|
||||
|
||||
|
||||
def analytics_metadata() -> Dict[str, Any]:
|
||||
return {
|
||||
"os": platform.system().lower(),
|
||||
"oss_version": get_oss_version(),
|
||||
"machine": platform.machine(),
|
||||
"platform": platform.platform(),
|
||||
"python_version": platform.python_version(),
|
||||
"environment": SettingsManager.get_settings().ENV,
|
||||
}
|
||||
|
||||
|
||||
def capture(
|
||||
event: str,
|
||||
data: dict[str, Any] | None = None,
|
||||
) -> None:
|
||||
# If telemetry is disabled, don't send any data
|
||||
if not SettingsManager.get_settings().SKYVERN_TELEMETRY:
|
||||
return
|
||||
|
||||
payload: dict[str, Any] = data or {}
|
||||
try:
|
||||
posthog.capture(distinct_id=DISTINCT_ID, event=event, properties=payload)
|
||||
except Exception as e:
|
||||
payload.update(
|
||||
{
|
||||
"capture_error": str(e),
|
||||
}
|
||||
)
|
||||
posthog.capture(
|
||||
distinct_id=DISTINCT_ID,
|
||||
event="failure",
|
||||
properties=payload,
|
||||
)
|
||||
|
||||
|
||||
# This is the main function that will be called by the typer CLI. This is separate from capture because typer
|
||||
# doesn't support dict type input arguments.
|
||||
def capture_simple(event: str) -> None:
|
||||
capture(event)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
typer.run(capture_simple)
|
||||
9
setup.sh
9
setup.sh
@@ -1,5 +1,12 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Call function to send telemetry event
|
||||
log_event() {
|
||||
if [ -n $1 ]; then
|
||||
python scripts/tracking.py $1
|
||||
fi
|
||||
}
|
||||
|
||||
# Function to check if a command exists
|
||||
command_exists() {
|
||||
command -v "$1" &> /dev/null
|
||||
@@ -35,6 +42,7 @@ activate_poetry_env() {
|
||||
}
|
||||
|
||||
install_dependencies_after_poetry_env() {
|
||||
echo "Installing playwright dependencies..."
|
||||
playwright install
|
||||
}
|
||||
|
||||
@@ -100,6 +108,7 @@ main() {
|
||||
install_dependencies_after_poetry_env
|
||||
run_alembic_upgrade
|
||||
create_organization
|
||||
log_event "skyvern-oss-setup-complete"
|
||||
echo "Setup completed successfully."
|
||||
}
|
||||
|
||||
|
||||
@@ -40,7 +40,7 @@ class Settings(BaseSettings):
|
||||
# Artifact storage settings
|
||||
ARTIFACT_STORAGE_PATH: str = f"{SKYVERN_DIR}/artifacts"
|
||||
|
||||
ASYNC_ENABLED: bool = False
|
||||
SKYVERN_TELEMETRY: bool = True
|
||||
|
||||
def is_cloud_environment(self) -> bool:
|
||||
"""
|
||||
|
||||
@@ -8,6 +8,7 @@ import requests
|
||||
import structlog
|
||||
from playwright._impl._errors import TargetClosedError
|
||||
|
||||
from scripts import tracking
|
||||
from skyvern.exceptions import (
|
||||
BrowserStateMissingPage,
|
||||
FailedToSendWebhook,
|
||||
@@ -194,6 +195,7 @@ class ForgeAgent(Agent):
|
||||
await self.validate_step_execution(task, step)
|
||||
step, browser_state, detailed_output = await self._initialize_execution_state(task, step, workflow_run)
|
||||
step, detailed_output = await self.agent_step(task, step, browser_state, organization=organization)
|
||||
tracking.capture("skyvern-oss-agent-step-status", {"status": step.status})
|
||||
retry = False
|
||||
|
||||
# If the step failed, mark the step as failed and retry
|
||||
@@ -661,6 +663,18 @@ class ForgeAgent(Agent):
|
||||
"""
|
||||
send the task response to the webhook callback url
|
||||
"""
|
||||
# refresh the task from the db to get the latest status
|
||||
try:
|
||||
refreshed_task = await app.DATABASE.get_task(task_id=task.task_id, organization_id=task.organization_id)
|
||||
if not refreshed_task:
|
||||
LOG.error("Failed to get task from db when sending task response")
|
||||
raise TaskNotFound(task_id=task.task_id)
|
||||
except Exception as e:
|
||||
LOG.error("Failed to get task from db when sending task response", task_id=task.task_id, error=e)
|
||||
raise TaskNotFound(task_id=task.task_id) from e
|
||||
task = refreshed_task
|
||||
# log the task status as an event
|
||||
tracking.capture("skyvern-oss-agent-task-status", {"status": task.status})
|
||||
# Take one last screenshot and create an artifact before closing the browser to see the final state
|
||||
browser_state: BrowserState = await app.BROWSER_MANAGER.get_or_create_for_task(task)
|
||||
page = await browser_state.get_or_create_page()
|
||||
|
||||
@@ -5,6 +5,7 @@ from fastapi import APIRouter, BackgroundTasks, Depends, Header, HTTPException,
|
||||
from fastapi.responses import ORJSONResponse
|
||||
from pydantic import BaseModel
|
||||
|
||||
from scripts import tracking
|
||||
from skyvern.exceptions import StepNotFound
|
||||
from skyvern.forge import app
|
||||
from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType
|
||||
@@ -31,6 +32,7 @@ async def webhook(
|
||||
x_skyvern_signature: Annotated[str | None, Header()] = None,
|
||||
x_skyvern_timestamp: Annotated[str | None, Header()] = None,
|
||||
) -> Response:
|
||||
tracking.capture("skyvern-oss-agent-webhook-received")
|
||||
payload = await request.body()
|
||||
|
||||
if not x_skyvern_signature or not x_skyvern_timestamp:
|
||||
@@ -75,6 +77,7 @@ async def create_agent_task(
|
||||
x_api_key: Annotated[str | None, Header()] = None,
|
||||
x_max_steps_override: Annotated[int | None, Header()] = None,
|
||||
) -> CreateTaskResponse:
|
||||
tracking.capture("skyvern-oss-agent-task-create", data={"url": task.url})
|
||||
agent = request["agent"]
|
||||
|
||||
created_task = await agent.create_task(task, current_org.organization_id)
|
||||
@@ -108,6 +111,7 @@ async def execute_agent_task_step(
|
||||
step_id: str | None = None,
|
||||
current_org: Organization = Depends(org_auth_service.get_current_org),
|
||||
) -> Response:
|
||||
tracking.capture("skyvern-oss-agent-task-step-execute")
|
||||
agent = request["agent"]
|
||||
task = await app.DATABASE.get_task(task_id, organization_id=current_org.organization_id)
|
||||
if not task:
|
||||
@@ -166,6 +170,7 @@ async def get_task(
|
||||
task_id: str,
|
||||
current_org: Organization = Depends(org_auth_service.get_current_org),
|
||||
) -> TaskResponse:
|
||||
tracking.capture("skyvern-oss-agent-task-get")
|
||||
request["agent"]
|
||||
task_obj = await app.DATABASE.get_task(task_id, organization_id=current_org.organization_id)
|
||||
if not task_obj:
|
||||
@@ -229,6 +234,7 @@ async def retry_webhook(
|
||||
current_org: Organization = Depends(org_auth_service.get_current_org),
|
||||
x_api_key: Annotated[str | None, Header()] = None,
|
||||
) -> TaskResponse:
|
||||
tracking.capture("skyvern-oss-agent-task-retry-webhook")
|
||||
agent = request["agent"]
|
||||
task_obj = await agent.db.get_task(task_id, organization_id=current_org.organization_id)
|
||||
if not task_obj:
|
||||
@@ -262,6 +268,7 @@ async def get_task_internal(
|
||||
:return: List of tasks with pagination without steps populated. Steps can be populated by calling the
|
||||
get_agent_task endpoint.
|
||||
"""
|
||||
tracking.capture("skyvern-oss-agent-task-get-internal")
|
||||
task = await app.DATABASE.get_task(task_id, organization_id=current_org.organization_id)
|
||||
if not task:
|
||||
raise HTTPException(
|
||||
@@ -286,6 +293,7 @@ async def get_agent_tasks(
|
||||
:return: List of tasks with pagination without steps populated. Steps can be populated by calling the
|
||||
get_agent_task endpoint.
|
||||
"""
|
||||
tracking.capture("skyvern-oss-agent-tasks-get")
|
||||
request["agent"]
|
||||
tasks = await app.DATABASE.get_tasks(page, page_size, organization_id=current_org.organization_id)
|
||||
return ORJSONResponse([task.to_task_response().model_dump() for task in tasks])
|
||||
@@ -306,6 +314,7 @@ async def get_agent_tasks_internal(
|
||||
:return: List of tasks with pagination without steps populated. Steps can be populated by calling the
|
||||
get_agent_task endpoint.
|
||||
"""
|
||||
tracking.capture("skyvern-oss-agent-tasks-get-internal")
|
||||
request["agent"]
|
||||
tasks = await app.DATABASE.get_tasks(page, page_size, organization_id=current_org.organization_id)
|
||||
return ORJSONResponse([task.model_dump() for task in tasks])
|
||||
@@ -323,6 +332,7 @@ async def get_agent_task_steps(
|
||||
:param task_id:
|
||||
:return: List of steps for a task with pagination.
|
||||
"""
|
||||
tracking.capture("skyvern-oss-agent-task-steps-get")
|
||||
request["agent"]
|
||||
steps = await app.DATABASE.get_task_steps(task_id, organization_id=current_org.organization_id)
|
||||
return ORJSONResponse([step.model_dump() for step in steps])
|
||||
@@ -342,6 +352,7 @@ async def get_agent_task_step_artifacts(
|
||||
:param step_id:
|
||||
:return: List of artifacts for a list of steps.
|
||||
"""
|
||||
tracking.capture("skyvern-oss-agent-task-step-artifacts-get")
|
||||
request["agent"]
|
||||
artifacts = await app.DATABASE.get_artifacts_for_task_step(
|
||||
task_id,
|
||||
@@ -364,6 +375,7 @@ async def get_task_actions(
|
||||
task_id: str,
|
||||
current_org: Organization = Depends(org_auth_service.get_current_org),
|
||||
) -> list[ActionResultTmp]:
|
||||
tracking.capture("skyvern-oss-agent-task-actions-get")
|
||||
request["agent"]
|
||||
steps = await app.DATABASE.get_task_step_models(task_id, organization_id=current_org.organization_id)
|
||||
results: list[ActionResultTmp] = []
|
||||
@@ -385,6 +397,7 @@ async def execute_workflow(
|
||||
x_api_key: Annotated[str | None, Header()] = None,
|
||||
x_max_steps_override: Annotated[int | None, Header()] = None,
|
||||
) -> RunWorkflowResponse:
|
||||
tracking.capture("skyvern-oss-agent-workflow-execute")
|
||||
LOG.info(
|
||||
f"Running workflow {workflow_id}",
|
||||
workflow_id=workflow_id,
|
||||
@@ -421,6 +434,7 @@ async def get_workflow_run(
|
||||
workflow_run_id: str,
|
||||
current_org: Organization = Depends(org_auth_service.get_current_org),
|
||||
) -> WorkflowRunStatusResponse:
|
||||
tracking.capture("skyvern-oss-agent-workflow-run-get")
|
||||
request["agent"]
|
||||
return await app.WORKFLOW_SERVICE.build_workflow_run_status_response(
|
||||
workflow_id=workflow_id, workflow_run_id=workflow_run_id, organization_id=current_org.organization_id
|
||||
|
||||
@@ -6,6 +6,7 @@ from datetime import datetime
|
||||
import requests
|
||||
import structlog
|
||||
|
||||
from scripts import tracking
|
||||
from skyvern.exceptions import (
|
||||
FailedToSendWebhook,
|
||||
MissingValueForParameter,
|
||||
@@ -374,6 +375,7 @@ class WorkflowService:
|
||||
api_key: str | None = None,
|
||||
close_browser_on_completion: bool = True,
|
||||
) -> None:
|
||||
tracking.capture("skyvern-oss-agent-workflow-status", {"status": workflow_run.status})
|
||||
browser_state = await app.BROWSER_MANAGER.cleanup_for_workflow_run(
|
||||
workflow_run.workflow_run_id, close_browser_on_completion
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user