diff --git a/pyproject.toml b/pyproject.toml
index 2665ed1e..bd1f9fd2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -45,6 +45,7 @@ structlog = "^23.2.0"
 plotly = "^5.18.0"
 clipboard = "^0.0.4"
 curlify = "^2.2.1"
+typer = "^0.9.0"
 
 
 [tool.poetry.group.dev.dependencies]
diff --git a/run_skyvern.sh b/run_skyvern.sh
index 2f1a0cdc..6c88ea37 100755
--- a/run_skyvern.sh
+++ b/run_skyvern.sh
@@ -7,4 +7,5 @@ if [ ! -f .env ]; then
     echo "Please add your api keys to the .env file."
 fi
 source "$(poetry env info --path)/bin/activate"
+python scripts/tracking.py skyvern-oss-run-server
 poetry run python -m skyvern.forge
diff --git a/run_ui.sh b/run_ui.sh
index 8d83413a..7205296d 100755
--- a/run_ui.sh
+++ b/run_ui.sh
@@ -1,2 +1,3 @@
 source "$(poetry env info --path)/bin/activate"
+python scripts/tracking.py skyvern-oss-run-ui
 streamlit run streamlit_app/visualizer/streamlit.py
\ No newline at end of file
diff --git a/scripts/tracking.py b/scripts/tracking.py
new file mode 100644
index 00000000..3d900019
--- /dev/null
+++ b/scripts/tracking.py
@@ -0,0 +1,91 @@
+# -*- coding: utf-8 -*-
+"""Telemetry events sent to PostHog.
+
+Importable as a module (``tracking.capture``) and runnable as a CLI:
+``python scripts/tracking.py <event-name>``.
+"""
+import importlib.metadata
+import platform
+from typing import Any, Dict
+
+import typer
+from posthog import Posthog
+
+from skyvern.forge.sdk.settings_manager import SettingsManager
+
+posthog = Posthog(
+    "phc_bVT2ugnZhMHRWqMvSRHPdeTjaPxQqT3QSsI3r5FlQR5",
+    host="https://app.posthog.com",
+    disable_geoip=False,
+)
+
+# Every event is reported under this single shared id.
+DISTINCT_ID = "oss"
+
+
+def get_oss_version() -> str:
+    """Return the installed ``skyvern`` package version, or ``"unknown"`` if the lookup fails."""
+    try:
+        return importlib.metadata.version("skyvern")
+    except Exception:
+        return "unknown"
+
+
+def analytics_metadata() -> Dict[str, Any]:
+    """Return OS, interpreter, package-version and environment details as event properties.
+
+    ``capture`` does not attach these automatically; pass them via ``data`` when wanted.
+    """
+    return {
+        "os": platform.system().lower(),
+        "oss_version": get_oss_version(),
+        "machine": platform.machine(),
+        "platform": platform.platform(),
+        "python_version": platform.python_version(),
+        "environment": SettingsManager.get_settings().ENV,
+    }
+
+
+def capture(
+    event: str,
+    data: dict[str, Any] | None = None,
+) -> None:
+    """Send a telemetry event to PostHog on a best-effort basis.
+
+    Does nothing when the ``SKYVERN_TELEMETRY`` setting is false. A delivery error is
+    reported once as a ``failure`` event and is never raised to the caller.
+
+    :param event: name of the event to record.
+    :param data: optional event properties; the mapping is copied, not mutated.
+    """
+    # If telemetry is disabled, don't send any data
+    if not SettingsManager.get_settings().SKYVERN_TELEMETRY:
+        return
+
+    # Copy so that adding "capture_error" below never leaks into the caller's dict.
+    payload: dict[str, Any] = dict(data) if data else {}
+    try:
+        posthog.capture(distinct_id=DISTINCT_ID, event=event, properties=payload)
+    except Exception as e:
+        payload["capture_error"] = str(e)
+        try:
+            posthog.capture(
+                distinct_id=DISTINCT_ID,
+                event="failure",
+                properties=payload,
+            )
+        except Exception:
+            # Telemetry is called from request handlers and the agent loop; a second
+            # delivery failure must not turn into an application error.
+            pass
+
+
+# This is the main function that will be called by the typer CLI. This is separate from capture because typer
+# doesn't support dict type input arguments.
+def capture_simple(event: str) -> None:
+    """CLI entry point: record ``event`` with no extra properties."""
+    capture(event)
+
+
+if __name__ == "__main__":
+    typer.run(capture_simple)
diff --git a/setup.sh b/setup.sh
index 2c8ab756..5628cd75 100755
--- a/setup.sh
+++ b/setup.sh
@@ -1,5 +1,12 @@
 #!/bin/bash
 
+# Send a telemetry event named by "$1"; does nothing when the name is empty
+log_event() {
+    if [ -n "$1" ]; then
+        python scripts/tracking.py "$1"
+    fi
+}
+
 # Function to check if a command exists
 command_exists() {
     command -v "$1" &> /dev/null
@@ -35,6 +42,7 @@ activate_poetry_env() {
 }
 
 install_dependencies_after_poetry_env() {
+    echo "Installing playwright dependencies..."
     playwright install
 }
 
@@ -100,6 +108,7 @@ main() {
     install_dependencies_after_poetry_env
     run_alembic_upgrade
     create_organization
+    log_event "skyvern-oss-setup-complete"
     echo "Setup completed successfully."
} diff --git a/skyvern/config.py b/skyvern/config.py index b973c946..ad3fed10 100644 --- a/skyvern/config.py +++ b/skyvern/config.py @@ -40,7 +40,7 @@ class Settings(BaseSettings): # Artifact storage settings ARTIFACT_STORAGE_PATH: str = f"{SKYVERN_DIR}/artifacts" - ASYNC_ENABLED: bool = False + SKYVERN_TELEMETRY: bool = True def is_cloud_environment(self) -> bool: """ diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index 413daf0b..4ce882ad 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -8,6 +8,7 @@ import requests import structlog from playwright._impl._errors import TargetClosedError +from scripts import tracking from skyvern.exceptions import ( BrowserStateMissingPage, FailedToSendWebhook, @@ -194,6 +195,7 @@ class ForgeAgent(Agent): await self.validate_step_execution(task, step) step, browser_state, detailed_output = await self._initialize_execution_state(task, step, workflow_run) step, detailed_output = await self.agent_step(task, step, browser_state, organization=organization) + tracking.capture("skyvern-oss-agent-step-status", {"status": step.status}) retry = False # If the step failed, mark the step as failed and retry @@ -661,6 +663,18 @@ class ForgeAgent(Agent): """ send the task response to the webhook callback url """ + # refresh the task from the db to get the latest status + try: + refreshed_task = await app.DATABASE.get_task(task_id=task.task_id, organization_id=task.organization_id) + if not refreshed_task: + LOG.error("Failed to get task from db when sending task response") + raise TaskNotFound(task_id=task.task_id) + except Exception as e: + LOG.error("Failed to get task from db when sending task response", task_id=task.task_id, error=e) + raise TaskNotFound(task_id=task.task_id) from e + task = refreshed_task + # log the task status as an event + tracking.capture("skyvern-oss-agent-task-status", {"status": task.status}) # Take one last screenshot and create an artifact before closing the browser to see the final 
state browser_state: BrowserState = await app.BROWSER_MANAGER.get_or_create_for_task(task) page = await browser_state.get_or_create_page() diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index 6bca5fdd..7f751223 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -5,6 +5,7 @@ from fastapi import APIRouter, BackgroundTasks, Depends, Header, HTTPException, from fastapi.responses import ORJSONResponse from pydantic import BaseModel +from scripts import tracking from skyvern.exceptions import StepNotFound from skyvern.forge import app from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType @@ -31,6 +32,7 @@ async def webhook( x_skyvern_signature: Annotated[str | None, Header()] = None, x_skyvern_timestamp: Annotated[str | None, Header()] = None, ) -> Response: + tracking.capture("skyvern-oss-agent-webhook-received") payload = await request.body() if not x_skyvern_signature or not x_skyvern_timestamp: @@ -75,6 +77,7 @@ async def create_agent_task( x_api_key: Annotated[str | None, Header()] = None, x_max_steps_override: Annotated[int | None, Header()] = None, ) -> CreateTaskResponse: + tracking.capture("skyvern-oss-agent-task-create", data={"url": task.url}) agent = request["agent"] created_task = await agent.create_task(task, current_org.organization_id) @@ -108,6 +111,7 @@ async def execute_agent_task_step( step_id: str | None = None, current_org: Organization = Depends(org_auth_service.get_current_org), ) -> Response: + tracking.capture("skyvern-oss-agent-task-step-execute") agent = request["agent"] task = await app.DATABASE.get_task(task_id, organization_id=current_org.organization_id) if not task: @@ -166,6 +170,7 @@ async def get_task( task_id: str, current_org: Organization = Depends(org_auth_service.get_current_org), ) -> TaskResponse: + tracking.capture("skyvern-oss-agent-task-get") request["agent"] task_obj = await 
app.DATABASE.get_task(task_id, organization_id=current_org.organization_id) if not task_obj: @@ -229,6 +234,7 @@ async def retry_webhook( current_org: Organization = Depends(org_auth_service.get_current_org), x_api_key: Annotated[str | None, Header()] = None, ) -> TaskResponse: + tracking.capture("skyvern-oss-agent-task-retry-webhook") agent = request["agent"] task_obj = await agent.db.get_task(task_id, organization_id=current_org.organization_id) if not task_obj: @@ -262,6 +268,7 @@ async def get_task_internal( :return: List of tasks with pagination without steps populated. Steps can be populated by calling the get_agent_task endpoint. """ + tracking.capture("skyvern-oss-agent-task-get-internal") task = await app.DATABASE.get_task(task_id, organization_id=current_org.organization_id) if not task: raise HTTPException( @@ -286,6 +293,7 @@ async def get_agent_tasks( :return: List of tasks with pagination without steps populated. Steps can be populated by calling the get_agent_task endpoint. """ + tracking.capture("skyvern-oss-agent-tasks-get") request["agent"] tasks = await app.DATABASE.get_tasks(page, page_size, organization_id=current_org.organization_id) return ORJSONResponse([task.to_task_response().model_dump() for task in tasks]) @@ -306,6 +314,7 @@ async def get_agent_tasks_internal( :return: List of tasks with pagination without steps populated. Steps can be populated by calling the get_agent_task endpoint. """ + tracking.capture("skyvern-oss-agent-tasks-get-internal") request["agent"] tasks = await app.DATABASE.get_tasks(page, page_size, organization_id=current_org.organization_id) return ORJSONResponse([task.model_dump() for task in tasks]) @@ -323,6 +332,7 @@ async def get_agent_task_steps( :param task_id: :return: List of steps for a task with pagination. 
""" + tracking.capture("skyvern-oss-agent-task-steps-get") request["agent"] steps = await app.DATABASE.get_task_steps(task_id, organization_id=current_org.organization_id) return ORJSONResponse([step.model_dump() for step in steps]) @@ -342,6 +352,7 @@ async def get_agent_task_step_artifacts( :param step_id: :return: List of artifacts for a list of steps. """ + tracking.capture("skyvern-oss-agent-task-step-artifacts-get") request["agent"] artifacts = await app.DATABASE.get_artifacts_for_task_step( task_id, @@ -364,6 +375,7 @@ async def get_task_actions( task_id: str, current_org: Organization = Depends(org_auth_service.get_current_org), ) -> list[ActionResultTmp]: + tracking.capture("skyvern-oss-agent-task-actions-get") request["agent"] steps = await app.DATABASE.get_task_step_models(task_id, organization_id=current_org.organization_id) results: list[ActionResultTmp] = [] @@ -385,6 +397,7 @@ async def execute_workflow( x_api_key: Annotated[str | None, Header()] = None, x_max_steps_override: Annotated[int | None, Header()] = None, ) -> RunWorkflowResponse: + tracking.capture("skyvern-oss-agent-workflow-execute") LOG.info( f"Running workflow {workflow_id}", workflow_id=workflow_id, @@ -421,6 +434,7 @@ async def get_workflow_run( workflow_run_id: str, current_org: Organization = Depends(org_auth_service.get_current_org), ) -> WorkflowRunStatusResponse: + tracking.capture("skyvern-oss-agent-workflow-run-get") request["agent"] return await app.WORKFLOW_SERVICE.build_workflow_run_status_response( workflow_id=workflow_id, workflow_run_id=workflow_run_id, organization_id=current_org.organization_id diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 0b0971c0..9c5fc218 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -6,6 +6,7 @@ from datetime import datetime import requests import structlog +from scripts import tracking from skyvern.exceptions import ( FailedToSendWebhook, 
MissingValueForParameter, @@ -374,6 +375,7 @@ class WorkflowService: api_key: str | None = None, close_browser_on_completion: bool = True, ) -> None: + tracking.capture("skyvern-oss-agent-workflow-status", {"status": workflow_run.status}) browser_state = await app.BROWSER_MANAGER.cleanup_for_workflow_run( workflow_run.workflow_run_id, close_browser_on_completion )