Enrich Laminar data (#3650)

This commit is contained in:
pedrohsdb
2025-10-08 14:58:50 -07:00
committed by GitHub
parent f8e76162d0
commit 89931b50ca
7 changed files with 206 additions and 0 deletions

View File

@@ -83,6 +83,7 @@ from skyvern.forge.sdk.schemas.files import FileInfo
from skyvern.forge.sdk.schemas.organizations import Organization from skyvern.forge.sdk.schemas.organizations import Organization
from skyvern.forge.sdk.schemas.tasks import Task, TaskRequest, TaskResponse, TaskStatus from skyvern.forge.sdk.schemas.tasks import Task, TaskRequest, TaskResponse, TaskStatus
from skyvern.forge.sdk.trace import TraceManager from skyvern.forge.sdk.trace import TraceManager
from skyvern.forge.sdk.trace.experiment_utils import collect_experiment_metadata_safely
from skyvern.forge.sdk.workflow.context_manager import WorkflowRunContext from skyvern.forge.sdk.workflow.context_manager import WorkflowRunContext
from skyvern.forge.sdk.workflow.models.block import ActionBlock, BaseTaskBlock, ValidationBlock from skyvern.forge.sdk.workflow.models.block import ActionBlock, BaseTaskBlock, ValidationBlock
from skyvern.forge.sdk.workflow.models.workflow import Workflow, WorkflowRun, WorkflowRunStatus from skyvern.forge.sdk.workflow.models.workflow import Workflow, WorkflowRun, WorkflowRunStatus
@@ -312,6 +313,11 @@ class ForgeAgent:
close_browser_on_completion and browser_session_id is None and not task.browser_address close_browser_on_completion and browser_session_id is None and not task.browser_address
) )
# Collect and add experiment metadata to the trace
experiment_metadata = await collect_experiment_metadata_safely(app.EXPERIMENTATION_PROVIDER)
if experiment_metadata:
TraceManager.add_experiment_metadata(experiment_metadata)
workflow_run: WorkflowRun | None = None workflow_run: WorkflowRun | None = None
if task.workflow_run_id: if task.workflow_run_id:
workflow_run = await app.DATABASE.get_workflow_run( workflow_run = await app.DATABASE.get_workflow_run(
@@ -2471,6 +2477,9 @@ class ForgeAgent:
# log the task status as an event # log the task status as an event
analytics.capture("skyvern-oss-agent-task-status", {"status": task.status}) analytics.capture("skyvern-oss-agent-task-status", {"status": task.status})
# Add task completion tag to Laminar trace
TraceManager.add_task_completion_tag(task.status.value)
if need_final_screenshot: if need_final_screenshot:
# Take one last screenshot and create an artifact before closing the browser to see the final state # Take one last screenshot and create an artifact before closing the browser to see the final state
# We don't need the artifacts and send the webhook response directly only when there is an issue with the browser # We don't need the artifacts and send the webhook response directly only when there is an issue with the browser

View File

@@ -90,3 +90,13 @@ class TraceManager:
@staticmethod @staticmethod
def set_trace_provider(trace_provider: BaseTrace) -> None: def set_trace_provider(trace_provider: BaseTrace) -> None:
TraceManager.__instance = trace_provider TraceManager.__instance = trace_provider
@staticmethod
def add_task_completion_tag(status: str) -> None:
    """Forward a task/workflow completion *status* to the active trace provider."""
    provider = TraceManager.__instance
    provider.add_task_completion_tag(status)
@staticmethod
def add_experiment_metadata(experiment_data: dict[str, Any]) -> None:
    """Forward experiment metadata to the active trace provider."""
    provider = TraceManager.__instance
    provider.add_experiment_metadata(experiment_data)

View File

@@ -26,6 +26,12 @@ class BaseTrace(ABC):
) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]: ) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]:
pass pass
def add_task_completion_tag(self, status: str) -> None:
    """Add a completion tag to the current trace based on task/workflow status.

    Default implementation is a no-op; concrete trace backends (e.g. the
    LaminarTrace subclass) override this to tag the active trace.
    """
def add_experiment_metadata(self, experiment_data: dict[str, Any]) -> None:
    """Add experiment metadata to the current trace.

    Default implementation is a no-op; concrete trace backends (e.g. the
    LaminarTrace subclass) override this to attach the metadata.
    """
class NoOpTrace(BaseTrace): class NoOpTrace(BaseTrace):
def traced( def traced(

View File

@@ -0,0 +1,97 @@
"""Utilities for collecting and formatting experiment data for Laminar tracing."""
from typing import TYPE_CHECKING, Any
import structlog
from skyvern.forge.sdk.core import skyvern_context
if TYPE_CHECKING:
from skyvern.forge.sdk.experimentation.providers import BaseExperimentationProvider
LOG = structlog.get_logger()
async def collect_experiment_metadata_safely(
    experimentation_provider: "BaseExperimentationProvider",
) -> dict[str, Any]:
    """
    Collect experiment metadata from the current context without ever raising.

    Safe wrapper around :func:`collect_experiment_metadata`: any exception is
    logged and swallowed so tracing enrichment can never break the caller.

    Args:
        experimentation_provider: Provider used to fetch experiment data.

    Returns:
        Dictionary containing experiment data, or empty dict if collection fails.
    """
    metadata: dict[str, Any] = {}
    try:
        metadata = await collect_experiment_metadata(experimentation_provider)
    except Exception:
        LOG.warning("Failed to collect experiment metadata", exc_info=True)
    return metadata
async def collect_experiment_metadata(
    experimentation_provider: "BaseExperimentationProvider",
) -> dict[str, Any]:
    """
    Collect experiment-related metadata from the current context.

    Args:
        experimentation_provider: The experimentation provider to use for fetching experiment data.

    Returns:
        Dictionary containing experiment data that can be added to Laminar traces.
    """
    # Without a run context there is nothing meaningful to attach to a trace.
    context = skyvern_context.current()
    if not context or not context.run_id or not context.organization_id:
        return {}

    # Use run_id as the distinct_id for experiments.
    distinct_id = context.run_id
    # Build the lookup properties once — they are identical for every flag.
    properties = {"organization_id": context.organization_id}

    # Only collect critical experiment flags that are relevant for tracing.
    experiment_flags = (
        "LLM_NAME",
        "LLM_SECONDARY_NAME",
        # Add more experiment flags as needed
        "PROMPT_CACHING_ENABLED",
        "THINKING_BUDGET_OPTIMIZATION",
    )

    experiment_metadata: dict[str, Any] = {}
    for flag in experiment_flags:
        try:
            # Value and payload are already cached by the experimentation provider.
            value = experimentation_provider.get_value_cached(flag, distinct_id, properties=properties)
            payload = experimentation_provider.get_payload_cached(flag, distinct_id, properties=properties)
        except Exception:
            # Silently skip experiments the provider cannot resolve.
            continue
        # Only include flags that actually produced experiment data.
        if value is not None or payload is not None:
            experiment_metadata[f"experiment_{flag}"] = {"value": value, "payload": payload}
    return experiment_metadata

View File

@@ -31,3 +31,35 @@ class LaminarTrace(BaseTrace):
**kwargs: Any, **kwargs: Any,
) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]: ) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]:
return observe(name=name, ignore_output=True, metadata=metadata, tags=tags, **kwargs) return observe(name=name, ignore_output=True, metadata=metadata, tags=tags, **kwargs)
def add_task_completion_tag(self, status: str) -> None:
    """Tag the current Laminar trace with the outcome of a task/workflow run."""
    try:
        # Bail out when there is no active trace to tag.
        if Laminar.get_trace_id() is None:
            return
        # Translate the terminal status to a trace tag; anything
        # unrecognized is treated as a failure.
        if status == "completed":
            tag = "COMPLETION"
        elif status == "timed_out":
            tag = "TIMEOUT"
        elif status == "canceled":
            tag = "CANCELED"
        elif status == "terminated":
            tag = "TERMINATED"
        else:
            tag = "FAILURE"
        Laminar.set_span_tags([tag])
    except Exception:
        # Tracing problems must never break task execution.
        pass
def add_experiment_metadata(self, experiment_data: dict[str, Any]) -> None:
    """Attach experiment metadata to the current Laminar trace."""
    try:
        Laminar.set_trace_metadata(experiment_data)
    except Exception:
        # Tracing problems must never break task execution.
        pass

View File

@@ -38,6 +38,7 @@ from skyvern.forge.sdk.schemas.organizations import Organization
from skyvern.forge.sdk.schemas.tasks import Task from skyvern.forge.sdk.schemas.tasks import Task
from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock, WorkflowRunTimeline, WorkflowRunTimelineType from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock, WorkflowRunTimeline, WorkflowRunTimelineType
from skyvern.forge.sdk.trace import TraceManager from skyvern.forge.sdk.trace import TraceManager
from skyvern.forge.sdk.trace.experiment_utils import collect_experiment_metadata_safely
from skyvern.forge.sdk.workflow.exceptions import ( from skyvern.forge.sdk.workflow.exceptions import (
ContextParameterSourceNotDefined, ContextParameterSourceNotDefined,
InvalidWaitBlockTime, InvalidWaitBlockTime,
@@ -339,6 +340,12 @@ class WorkflowService:
) -> WorkflowRun: ) -> WorkflowRun:
"""Execute a workflow.""" """Execute a workflow."""
organization_id = organization.organization_id organization_id = organization.organization_id
# Collect and add experiment metadata to the trace
experiment_metadata = await collect_experiment_metadata_safely(app.EXPERIMENTATION_PROVIDER)
if experiment_metadata:
TraceManager.add_experiment_metadata(experiment_metadata)
LOG.info( LOG.info(
"Executing workflow", "Executing workflow",
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
@@ -1147,6 +1154,10 @@ class WorkflowService:
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
workflow_status="completed", workflow_status="completed",
) )
# Add workflow completion tag to Laminar trace
TraceManager.add_task_completion_tag(WorkflowRunStatus.completed)
return await self._update_workflow_run_status( return await self._update_workflow_run_status(
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
status=WorkflowRunStatus.completed, status=WorkflowRunStatus.completed,
@@ -1165,6 +1176,10 @@ class WorkflowService:
workflow_status="failed", workflow_status="failed",
failure_reason=failure_reason, failure_reason=failure_reason,
) )
# Add workflow failure tag to Laminar trace
TraceManager.add_task_completion_tag(WorkflowRunStatus.failed)
return await self._update_workflow_run_status( return await self._update_workflow_run_status(
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
status=WorkflowRunStatus.failed, status=WorkflowRunStatus.failed,
@@ -1197,6 +1212,10 @@ class WorkflowService:
workflow_status="terminated", workflow_status="terminated",
failure_reason=failure_reason, failure_reason=failure_reason,
) )
# Add workflow terminated tag to Laminar trace
TraceManager.add_task_completion_tag(WorkflowRunStatus.terminated)
return await self._update_workflow_run_status( return await self._update_workflow_run_status(
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
status=WorkflowRunStatus.terminated, status=WorkflowRunStatus.terminated,
@@ -1210,6 +1229,10 @@ class WorkflowService:
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
workflow_status="canceled", workflow_status="canceled",
) )
# Add workflow canceled tag to Laminar trace
TraceManager.add_task_completion_tag(WorkflowRunStatus.canceled)
return await self._update_workflow_run_status( return await self._update_workflow_run_status(
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
status=WorkflowRunStatus.canceled, status=WorkflowRunStatus.canceled,
@@ -1227,6 +1250,10 @@ class WorkflowService:
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
workflow_status="timed_out", workflow_status="timed_out",
) )
# Add workflow timed out tag to Laminar trace
TraceManager.add_task_completion_tag(WorkflowRunStatus.timed_out)
return await self._update_workflow_run_status( return await self._update_workflow_run_status(
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
status=WorkflowRunStatus.timed_out, status=WorkflowRunStatus.timed_out,

View File

@@ -29,6 +29,7 @@ from skyvern.forge.sdk.schemas.organizations import Organization
from skyvern.forge.sdk.schemas.task_v2 import TaskV2, TaskV2Metadata, TaskV2Status, ThoughtScenario, ThoughtType from skyvern.forge.sdk.schemas.task_v2 import TaskV2, TaskV2Metadata, TaskV2Status, ThoughtScenario, ThoughtType
from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunTimeline, WorkflowRunTimelineType from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunTimeline, WorkflowRunTimelineType
from skyvern.forge.sdk.trace import TraceManager from skyvern.forge.sdk.trace import TraceManager
from skyvern.forge.sdk.trace.experiment_utils import collect_experiment_metadata_safely
from skyvern.forge.sdk.workflow.models.block import ( from skyvern.forge.sdk.workflow.models.block import (
BlockTypeVar, BlockTypeVar,
ExtractionBlock, ExtractionBlock,
@@ -326,6 +327,11 @@ async def run_task_v2(
LOG.error("Task v2 not found", task_v2_id=task_v2_id, organization_id=organization_id) LOG.error("Task v2 not found", task_v2_id=task_v2_id, organization_id=organization_id)
raise TaskV2NotFound(task_v2_id=task_v2_id) raise TaskV2NotFound(task_v2_id=task_v2_id)
# Collect and add experiment metadata to the trace
experiment_metadata = await collect_experiment_metadata_safely(app.EXPERIMENTATION_PROVIDER)
if experiment_metadata:
TraceManager.add_experiment_metadata(experiment_metadata)
workflow, workflow_run = None, None workflow, workflow_run = None, None
try: try:
workflow, workflow_run, task_v2 = await run_task_v2_helper( workflow, workflow_run, task_v2 = await run_task_v2_helper(
@@ -1459,6 +1465,10 @@ async def mark_task_v2_as_failed(
await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed( await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed(
workflow_run_id, failure_reason=failure_reason or "Skyvern task 2.0 failed" workflow_run_id, failure_reason=failure_reason or "Skyvern task 2.0 failed"
) )
# Add task failure tag to Laminar trace
TraceManager.add_task_completion_tag("failed")
await send_task_v2_webhook(task_v2) await send_task_v2_webhook(task_v2)
return task_v2 return task_v2
@@ -1480,6 +1490,9 @@ async def mark_task_v2_as_completed(
if workflow_run_id: if workflow_run_id:
await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id) await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id)
# Add task completion tag to Laminar trace
TraceManager.add_task_completion_tag("completed")
await send_task_v2_webhook(task_v2) await send_task_v2_webhook(task_v2)
return task_v2 return task_v2
@@ -1496,6 +1509,10 @@ async def mark_task_v2_as_canceled(
) )
if workflow_run_id: if workflow_run_id:
await app.WORKFLOW_SERVICE.mark_workflow_run_as_canceled(workflow_run_id) await app.WORKFLOW_SERVICE.mark_workflow_run_as_canceled(workflow_run_id)
# Add task canceled tag to Laminar trace
TraceManager.add_task_completion_tag("canceled")
await send_task_v2_webhook(task_v2) await send_task_v2_webhook(task_v2)
return task_v2 return task_v2
@@ -1513,6 +1530,10 @@ async def mark_task_v2_as_terminated(
) )
if workflow_run_id: if workflow_run_id:
await app.WORKFLOW_SERVICE.mark_workflow_run_as_terminated(workflow_run_id, failure_reason) await app.WORKFLOW_SERVICE.mark_workflow_run_as_terminated(workflow_run_id, failure_reason)
# Add task terminated tag to Laminar trace
TraceManager.add_task_completion_tag("terminated")
await send_task_v2_webhook(task_v2) await send_task_v2_webhook(task_v2)
return task_v2 return task_v2
@@ -1530,6 +1551,10 @@ async def mark_task_v2_as_timed_out(
) )
if workflow_run_id: if workflow_run_id:
await app.WORKFLOW_SERVICE.mark_workflow_run_as_timed_out(workflow_run_id, failure_reason) await app.WORKFLOW_SERVICE.mark_workflow_run_as_timed_out(workflow_run_id, failure_reason)
# Add task timed out tag to Laminar trace
TraceManager.add_task_completion_tag("timed_out")
await send_task_v2_webhook(task_v2) await send_task_v2_webhook(task_v2)
return task_v2 return task_v2