OTEL backend for metrics/traces/logs (#4632)

Co-authored-by: Benji Visser <benji@093b.org>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Shuchang Zheng
2026-02-04 17:54:07 -08:00
committed by GitHub
parent be00c65071
commit 566a108d5d
19 changed files with 2935 additions and 2807 deletions

View File

@@ -16,6 +16,7 @@ from typing import Any, Tuple, cast
import httpx
import structlog
from openai.types.responses.response import Response as OpenAIResponse
from opentelemetry import trace as otel_trace
from playwright._impl._errors import TargetClosedError
from playwright.async_api import Page
@@ -91,7 +92,7 @@ from skyvern.forge.sdk.schemas.files import FileInfo
from skyvern.forge.sdk.schemas.organizations import Organization
from skyvern.forge.sdk.schemas.tasks import Task, TaskRequest, TaskResponse, TaskStatus
from skyvern.forge.sdk.schemas.totp_codes import OTPType
from skyvern.forge.sdk.trace import TraceManager
from skyvern.forge.sdk.trace import traced
from skyvern.forge.sdk.workflow.context_manager import WorkflowRunContext
from skyvern.forge.sdk.workflow.models.block import (
ActionBlock,
@@ -305,9 +306,7 @@ class ForgeAgent:
operations = await app.AGENT_FUNCTION.generate_async_operations(organization, task, page)
self.async_operation_pool.add_operations(task.task_id, operations)
@TraceManager.traced_async(
ignore_inputs=["api_key", "close_browser_on_completion", "task_block", "cua_response", "llm_caller"]
)
@traced()
async def execute_step(
self,
organization: Organization,
@@ -895,9 +894,7 @@ class ForgeAgent:
)
return True
@TraceManager.traced_async(
ignore_inputs=["browser_state", "organization", "task_block", "cua_response", "llm_caller"]
)
@traced()
async def agent_step(
self,
task: Task,
@@ -3284,7 +3281,7 @@ class ForgeAgent:
analytics.capture("skyvern-oss-agent-task-status", {"status": task.status})
# Add task completion tag to trace
TraceManager.add_task_completion_tag(task.status.value)
otel_trace.get_current_span().set_attribute("task.completion_status", task.status.value)
if need_final_screenshot:
# Take one last screenshot and create an artifact before closing the browser to see the final state
# We don't need the artifacts and send the webhook response directly only when there is an issue with the browser

View File

@@ -18,7 +18,7 @@ from skyvern.forge.sdk.core import skyvern_context
from skyvern.forge.sdk.models import Step, StepStatus
from skyvern.forge.sdk.schemas.organizations import Organization
from skyvern.forge.sdk.schemas.tasks import Task, TaskStatus
from skyvern.forge.sdk.trace import TraceManager
from skyvern.forge.sdk.trace import traced
from skyvern.forge.sdk.workflow.models.block import BlockTypeVar
from skyvern.services import workflow_script_service
from skyvern.webeye.actions.action_types import POST_ACTION_EXECUTION_ACTION_TYPES
@@ -506,7 +506,7 @@ class AgentFunction:
) -> CleanupElementTreeFunc:
MAX_ELEMENT_CNT = settings.SVG_MAX_PARSING_ELEMENT_CNT
@TraceManager.traced_async(ignore_input=True)
@traced()
async def cleanup_element_tree_func(frame: Page | Frame, url: str, element_tree: list[dict]) -> list[dict]:
"""
Remove rect and attribute.unique_id from the elements.

View File

@@ -24,6 +24,11 @@ from skyvern.forge.sdk.db.exceptions import NotFoundError
from skyvern.forge.sdk.routes import internal_auth
from skyvern.forge.sdk.routes.routers import base_router, legacy_base_router, legacy_v2_router
try:
from cloud.observability.otel_setup import OTELSetup
except ImportError:
OTELSetup = None # type: ignore[assignment,misc]
LOG = structlog.get_logger()
@@ -53,14 +58,14 @@ def custom_openapi(app: FastAPI) -> dict:
@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncGenerator[None, Any]:
async def lifespan(fastapi_app: FastAPI) -> AsyncGenerator[None, Any]:
"""Lifespan context manager for FastAPI app startup and shutdown."""
LOG.info("Server started")
if forge_app.api_app_startup_event:
LOG.info("Calling api app startup event")
try:
await forge_app.api_app_startup_event()
await forge_app.api_app_startup_event(fastapi_app)
except Exception:
LOG.exception("Failed to execute api app startup event")
yield
@@ -77,6 +82,17 @@ def create_api_app() -> FastAPI:
"""
Start the agent server.
"""
# CRITICAL: Initialize OTEL FIRST, before any other code runs
# This must happen before start_forge_app() because that function
# creates database connections. If we don't instrument the libraries
# first, the DB spans won't be children of the HTTP request spans.
if settings.OTEL_ENABLED and OTELSetup is not None:
try:
otel = OTELSetup.get_instance()
otel.initialize_tracer_provider()
LOG.info("OTEL tracer provider initialized before forge app creation")
except Exception as e:
LOG.warning("Failed to initialize OTEL tracer provider early", error=str(e))
forge_app_instance = start_forge_app()

View File

@@ -84,7 +84,7 @@ class ForgeApp:
authentication_function: Callable[[str], Awaitable[Organization]] | None
authenticate_user_function: Callable[[str], Awaitable[str | None]] | None
setup_api_app: Callable[[FastAPI], None] | None
api_app_startup_event: Callable[[], Awaitable[None]] | None
api_app_startup_event: Callable[[FastAPI], Awaitable[None]] | None
api_app_shutdown_event: Callable[[], Awaitable[None]] | None
agent: ForgeAgent

View File

@@ -43,7 +43,7 @@ from skyvern.forge.sdk.core.skyvern_context import SkyvernContext
from skyvern.forge.sdk.models import SpeculativeLLMMetadata, Step
from skyvern.forge.sdk.schemas.ai_suggestions import AISuggestion
from skyvern.forge.sdk.schemas.task_v2 import TaskV2, Thought
from skyvern.forge.sdk.trace import TraceManager
from skyvern.forge.sdk.trace import traced
from skyvern.utils.image_resizer import Resolution, get_resize_target_dimension, resize_screenshots
LOG = structlog.get_logger()
@@ -410,7 +410,7 @@ class LLMAPIHandlerFactory:
)
main_model_group = llm_config.main_model_group
@TraceManager.traced_async(tags=[llm_key], ignore_inputs=["prompt", "screenshots", "parameters"])
@traced(tags=[llm_key])
async def llm_api_handler_with_router_and_fallback(
prompt: str,
prompt_name: str,
@@ -843,7 +843,7 @@ class LLMAPIHandlerFactory:
assert isinstance(llm_config, LLMConfig)
@TraceManager.traced_async(tags=[llm_key], ignore_inputs=["prompt", "screenshots", "parameters"])
@traced(tags=[llm_key])
async def llm_api_handler(
prompt: str,
prompt_name: str,
@@ -1569,7 +1569,7 @@ class LLMCaller:
return get_resize_target_dimension(window_dimension)
return self.screenshot_resize_target_dimension
@TraceManager.traced_async(ignore_input=True)
@traced()
async def _dispatch_llm_call(
self,
messages: list[dict[str, Any]],

View File

@@ -305,19 +305,21 @@ def setup_logger() -> None:
structlog.configure(
wrapper_class=structlog.make_filtering_bound_logger(LOG_LEVEL_VAL),
logger_factory=structlog.stdlib.LoggerFactory(),
processors=[
structlog.processors.add_log_level,
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
add_error_processor,
structlog.processors.format_exc_info,
]
+ additional_processors
+ [skyvern_logs_processor, renderer],
+ [skyvern_logs_processor, structlog.stdlib.ProcessorFormatter.wrap_for_formatter],
)
handler = logging.StreamHandler()
handler.setFormatter(
structlog.stdlib.ProcessorFormatter(
processors=[
structlog.stdlib.ProcessorFormatter.remove_processors_meta,
structlog.stdlib.add_log_level,
structlog.stdlib.add_logger_name,
structlog.processors.TimeStamper(fmt="iso"),

View File

@@ -18,6 +18,7 @@ from fastapi import (
)
from fastapi import status as http_status
from fastapi.responses import ORJSONResponse
from opentelemetry import trace
from pydantic import ValidationError
from skyvern import analytics
@@ -214,6 +215,10 @@ async def run_task(
request=request,
background_tasks=background_tasks,
)
if settings.OTEL_ENABLED:
span = trace.get_current_span()
if span and task_v1_response.task_id:
span.set_attribute("task_id", task_v1_response.task_id)
run_type = RunType.task_v1
if run_request.engine == RunEngine.openai_cua:
run_type = RunType.openai_cua
@@ -273,6 +278,13 @@ async def run_task(
raise HTTPException(
status_code=500, detail="Skyvern LLM failure to initialize task v2. Please try again later."
)
if settings.OTEL_ENABLED:
span = trace.get_current_span()
if span:
if task_v2.observer_cruise_id:
span.set_attribute("task_v2_id", task_v2.observer_cruise_id)
if task_v2.workflow_run_id:
span.set_attribute("workflow_run_id", task_v2.workflow_run_id)
await AsyncExecutorFactory.get_executor().execute_task_v2(
request=request,
background_tasks=background_tasks,
@@ -385,6 +397,14 @@ async def run_workflow(
except MissingBrowserAddressError as e:
raise HTTPException(status_code=400, detail=str(e)) from e
if settings.OTEL_ENABLED:
span = trace.get_current_span()
if span:
if workflow_run.workflow_run_id:
span.set_attribute("workflow_run_id", workflow_run.workflow_run_id)
if workflow_run.workflow_id:
span.set_attribute("workflow_id", workflow_run.workflow_id)
# Hydrate workflow title from workflow_run.workflow_id
workflow = await app.WORKFLOW_SERVICE.get_workflow(
workflow_id=workflow_run.workflow_id,

View File

@@ -8,6 +8,7 @@ from cachetools import TTLCache
from fastapi import Header, HTTPException, status
from jose import jwt
from jose.exceptions import JWTError
from opentelemetry import trace
from pydantic import ValidationError
from skyvern.config import settings
@@ -17,13 +18,6 @@ from skyvern.forge.sdk.db.agent_db import AgentDB
from skyvern.forge.sdk.models import TokenPayload
from skyvern.forge.sdk.schemas.organizations import Organization, OrganizationAuthToken, OrganizationAuthTokenType
try:
from ddtrace import tracer
_DDTRACE_AVAILABLE = True
except ImportError:
_DDTRACE_AVAILABLE = False
LOG = structlog.get_logger()
AUTHENTICATION_TTL = 60 * 60 # one hour
@@ -66,12 +60,16 @@ async def get_current_org(
curr_ctx.organization_id = organization.organization_id
curr_ctx.organization_name = organization.organization_name
if _DDTRACE_AVAILABLE:
span = tracer.current_span()
if span:
span.set_tag("organization_id", organization.organization_id)
if organization.organization_name:
span.set_tag("organization_name", organization.organization_name)
# Set organization info on OTEL span for tracing
if settings.OTEL_ENABLED:
try:
span = trace.get_current_span()
if span:
span.set_attribute("organization_id", organization.organization_id)
if organization.organization_name:
span.set_attribute("organization_name", organization.organization_name)
except Exception:
pass # Silently ignore OTEL errors
except Exception:
pass

View File

@@ -1,109 +1,50 @@
import asyncio
from functools import wraps
from typing import Any, Awaitable, Callable, ParamSpec, TypeVar
from typing import Any, Callable
from skyvern.forge import app
from skyvern.forge.sdk.core import skyvern_context
from skyvern.forge.sdk.settings_manager import SettingsManager
from skyvern.forge.sdk.trace.base import BaseTrace, NoOpTrace
from skyvern.forge.sdk.trace.experiment_utils import collect_experiment_metadata_safely
P = ParamSpec("P")
R = TypeVar("R")
from opentelemetry import trace
class TraceManager:
__instance: BaseTrace = NoOpTrace()
def traced(name: str | None = None, tags: list[str] | None = None) -> Callable:
"""Decorator that creates an OTEL span. No-op without SDK installed.
Args:
name: Span name. If not provided, uses func.__qualname__.
tags: Tags to add as a span attribute.
"""
def decorator(func: Callable) -> Callable:
span_name = name or func.__qualname__
if asyncio.iscoroutinefunction(func):
@staticmethod
def traced_async(
*,
name: str | None = None,
metadata: dict[str, Any] | None = None,
tags: list[str] | None = None,
**trace_parameters: Any,
) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]:
def decorator(func: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]:
@wraps(func)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
new_metadata: dict[str, Any] = metadata or {}
user_id: str | None = None
context = skyvern_context.current()
if context is not None:
new_metadata["request_id"] = context.request_id
new_metadata["organization_id"] = context.organization_id
new_metadata["task_id"] = context.task_id
new_metadata["workflow_id"] = context.workflow_id
new_metadata["workflow_run_id"] = context.workflow_run_id
new_metadata["task_v2_id"] = context.task_v2_id
new_metadata["run_id"] = context.run_id
new_metadata["organization_name"] = context.organization_name
user_id = context.run_id
async def async_wrapper(*args: Any, **kw: Any) -> Any:
with trace.get_tracer("skyvern").start_as_current_span(span_name) as span:
if tags:
span.set_attribute("tags", tags)
try:
return await func(*args, **kw)
except Exception as e:
span.record_exception(e)
span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
raise
# Collect experiment metadata and include it in the span metadata
experiment_metadata = await collect_experiment_metadata_safely(app.EXPERIMENTATION_PROVIDER)
if experiment_metadata:
new_metadata.update(experiment_metadata)
return async_wrapper
else:
new_tags: list[str] = tags or []
new_tags.append(SettingsManager.get_settings().ENV)
@wraps(func)
def sync_wrapper(*args: Any, **kw: Any) -> Any:
with trace.get_tracer("skyvern").start_as_current_span(span_name) as span:
if tags:
span.set_attribute("tags", tags)
try:
return func(*args, **kw)
except Exception as e:
span.record_exception(e)
span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
raise
return await TraceManager.__instance.traced_async(
name=name, metadata=new_metadata, tags=new_tags, user_id=user_id, **trace_parameters
)(func)(*args, **kwargs)
return sync_wrapper
return wrapper
return decorator
@staticmethod
def traced(
*,
name: str | None = None,
metadata: dict[str, Any] | None = None,
tags: list[str] | None = None,
**trace_parameters: Any,
) -> Callable[[Callable[P, R]], Callable[P, R]]:
def decorator(func: Callable[P, R]) -> Callable[P, R]:
def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
new_metadata: dict[str, Any] = metadata or {}
user_id: str | None = None
context = skyvern_context.current()
if context is not None:
new_metadata["request_id"] = context.request_id
new_metadata["organization_id"] = context.organization_id
new_metadata["task_id"] = context.task_id
new_metadata["workflow_id"] = context.workflow_id
new_metadata["workflow_run_id"] = context.workflow_run_id
new_metadata["task_v2_id"] = context.task_v2_id
new_metadata["run_id"] = context.run_id
new_metadata["organization_name"] = context.organization_name
user_id = context.run_id
new_tags: list[str] = tags or []
new_tags.append(SettingsManager.get_settings().ENV)
return TraceManager.__instance.traced(
name=name, metadata=new_metadata, tags=new_tags, user_id=user_id, **trace_parameters
)(func)(*args, **kwargs)
return wrapper
return decorator
@staticmethod
def get_trace_provider() -> BaseTrace:
return TraceManager.__instance
@staticmethod
def set_trace_provider(trace_provider: BaseTrace) -> None:
TraceManager.__instance = trace_provider
@staticmethod
def add_task_completion_tag(status: str) -> None:
"""Add a completion tag to the current trace based on task/workflow status."""
TraceManager.__instance.add_task_completion_tag(status)
@staticmethod
def add_experiment_metadata(experiment_data: dict[str, Any]) -> None:
"""Add experiment metadata to the current trace."""
TraceManager.__instance.add_experiment_metadata(experiment_data)
return decorator

View File

@@ -74,7 +74,7 @@ from skyvern.forge.sdk.schemas.task_v2 import TaskV2Status
from skyvern.forge.sdk.schemas.tasks import Task, TaskOutput, TaskStatus
from skyvern.forge.sdk.services.bitwarden import BitwardenConstants
from skyvern.forge.sdk.services.credentials import AzureVaultConstants, OnePasswordConstants
from skyvern.forge.sdk.trace import TraceManager
from skyvern.forge.sdk.trace import traced
from skyvern.forge.sdk.utils.pdf_parser import extract_pdf_file, validate_pdf_file
from skyvern.forge.sdk.workflow.context_manager import BlockMetadata, WorkflowRunContext
from skyvern.forge.sdk.workflow.exceptions import (
@@ -476,7 +476,7 @@ class Block(BaseModel, abc.ABC):
organization_id=organization_id,
)
@TraceManager.traced_async(ignore_inputs=["kwargs"])
@traced()
async def execute_safe(
self,
workflow_run_id: str,

View File

@@ -24,6 +24,8 @@ except ImportError:
pass
from opentelemetry import trace as otel_trace
import skyvern
from skyvern import analytics
from skyvern.client.types.output_parameter import OutputParameter as BlockOutputParameter
@@ -57,7 +59,7 @@ from skyvern.forge.sdk.schemas.organizations import Organization
from skyvern.forge.sdk.schemas.persistent_browser_sessions import PersistentBrowserSession
from skyvern.forge.sdk.schemas.tasks import Task
from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock, WorkflowRunTimeline, WorkflowRunTimelineType
from skyvern.forge.sdk.trace import TraceManager
from skyvern.forge.sdk.trace import traced
from skyvern.forge.sdk.workflow.exceptions import (
InvalidWorkflowDefinition,
WorkflowVersionConflict,
@@ -670,7 +672,7 @@ class WorkflowService:
return None
@TraceManager.traced_async(ignore_inputs=["organization", "api_key"])
@traced()
async def execute_workflow(
self,
workflow_run_id: str,
@@ -2307,7 +2309,7 @@ class WorkflowService:
)
# Add workflow completion tag to trace
TraceManager.add_task_completion_tag(WorkflowRunStatus.completed)
otel_trace.get_current_span().set_attribute("task.completion_status", WorkflowRunStatus.completed)
return await self._update_workflow_run_status(
workflow_run_id=workflow_run_id,
@@ -2358,7 +2360,7 @@ class WorkflowService:
)
# Add workflow failure tag to trace
TraceManager.add_task_completion_tag(WorkflowRunStatus.failed)
otel_trace.get_current_span().set_attribute("task.completion_status", WorkflowRunStatus.failed)
return await self._update_workflow_run_status(
workflow_run_id=workflow_run_id,
@@ -2402,7 +2404,7 @@ class WorkflowService:
)
# Add workflow terminated tag to trace
TraceManager.add_task_completion_tag(WorkflowRunStatus.terminated)
otel_trace.get_current_span().set_attribute("task.completion_status", WorkflowRunStatus.terminated)
return await self._update_workflow_run_status(
workflow_run_id=workflow_run_id,
@@ -2419,7 +2421,7 @@ class WorkflowService:
)
# Add workflow canceled tag to trace
TraceManager.add_task_completion_tag(WorkflowRunStatus.canceled)
otel_trace.get_current_span().set_attribute("task.completion_status", WorkflowRunStatus.canceled)
return await self._update_workflow_run_status(
workflow_run_id=workflow_run_id,
@@ -2440,7 +2442,7 @@ class WorkflowService:
)
# Add workflow timed out tag to trace
TraceManager.add_task_completion_tag(WorkflowRunStatus.timed_out)
otel_trace.get_current_span().set_attribute("task.completion_status", WorkflowRunStatus.timed_out)
return await self._update_workflow_run_status(
workflow_run_id=workflow_run_id,