diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py
index 393a506b..31fc3b6a 100644
--- a/skyvern/forge/agent.py
+++ b/skyvern/forge/agent.py
@@ -83,6 +83,7 @@ from skyvern.forge.sdk.schemas.files import FileInfo
 from skyvern.forge.sdk.schemas.organizations import Organization
 from skyvern.forge.sdk.schemas.tasks import Task, TaskRequest, TaskResponse, TaskStatus
 from skyvern.forge.sdk.trace import TraceManager
+from skyvern.forge.sdk.trace.experiment_utils import collect_experiment_metadata_safely
 from skyvern.forge.sdk.workflow.context_manager import WorkflowRunContext
 from skyvern.forge.sdk.workflow.models.block import ActionBlock, BaseTaskBlock, ValidationBlock
 from skyvern.forge.sdk.workflow.models.workflow import Workflow, WorkflowRun, WorkflowRunStatus
@@ -312,6 +313,11 @@ class ForgeAgent:
                 close_browser_on_completion and browser_session_id is None and not task.browser_address
             )
 
+        # Collect and add experiment metadata to the trace
+        experiment_metadata = await collect_experiment_metadata_safely(app.EXPERIMENTATION_PROVIDER)
+        if experiment_metadata:
+            TraceManager.add_experiment_metadata(experiment_metadata)
+
         workflow_run: WorkflowRun | None = None
         if task.workflow_run_id:
             workflow_run = await app.DATABASE.get_workflow_run(
@@ -2471,6 +2477,9 @@ class ForgeAgent:
 
         # log the task status as an event
         analytics.capture("skyvern-oss-agent-task-status", {"status": task.status})
+
+        # Add task completion tag to Laminar trace
+        TraceManager.add_task_completion_tag(task.status.value)
         if need_final_screenshot:
             # Take one last screenshot and create an artifact before closing the browser to see the final state
             # We don't need the artifacts and send the webhook response directly only when there is an issue with the browser
diff --git a/skyvern/forge/sdk/trace/__init__.py b/skyvern/forge/sdk/trace/__init__.py
index a618ba33..8efdd7e0 100644
--- a/skyvern/forge/sdk/trace/__init__.py
+++ b/skyvern/forge/sdk/trace/__init__.py
@@ -90,3 +90,13 @@ class TraceManager:
     @staticmethod
     def set_trace_provider(trace_provider: BaseTrace) -> None:
         TraceManager.__instance = trace_provider
+
+    @staticmethod
+    def add_task_completion_tag(status: str) -> None:
+        """Add a completion tag to the current trace based on task/workflow status."""
+        TraceManager.__instance.add_task_completion_tag(status)
+
+    @staticmethod
+    def add_experiment_metadata(experiment_data: dict[str, Any]) -> None:
+        """Add experiment metadata to the current trace."""
+        TraceManager.__instance.add_experiment_metadata(experiment_data)
diff --git a/skyvern/forge/sdk/trace/base.py b/skyvern/forge/sdk/trace/base.py
index e7108530..0af8f8a0 100644
--- a/skyvern/forge/sdk/trace/base.py
+++ b/skyvern/forge/sdk/trace/base.py
@@ -26,6 +26,12 @@ class BaseTrace(ABC):
     ) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]:
         pass
 
+    def add_task_completion_tag(self, status: str) -> None:
+        """Add a completion tag to the current trace based on task/workflow status."""
+
+    def add_experiment_metadata(self, experiment_data: dict[str, Any]) -> None:
+        """Add experiment metadata to the current trace."""
+
 
 class NoOpTrace(BaseTrace):
     def traced(
diff --git a/skyvern/forge/sdk/trace/experiment_utils.py b/skyvern/forge/sdk/trace/experiment_utils.py
new file mode 100644
index 00000000..690ceaca
--- /dev/null
+++ b/skyvern/forge/sdk/trace/experiment_utils.py
@@ -0,0 +1,94 @@
+"""Utilities for collecting and formatting experiment data for Laminar tracing."""
+
+import inspect
+from typing import TYPE_CHECKING, Any
+
+import structlog
+
+from skyvern.forge.sdk.core import skyvern_context
+
+if TYPE_CHECKING:
+    from skyvern.forge.sdk.experimentation.providers import BaseExperimentationProvider
+
+LOG = structlog.get_logger()
+
+# Only critical experiment flags that are relevant for tracing; add more as needed.
+EXPERIMENT_FLAGS: tuple[str, ...] = (
+    "LLM_NAME",
+    "LLM_SECONDARY_NAME",
+    "PROMPT_CACHING_ENABLED",
+    "THINKING_BUDGET_OPTIMIZATION",
+)
+
+
+async def _resolve(result: Any) -> Any:
+    """Return ``result``, awaiting it first when it is an awaitable."""
+    if inspect.isawaitable(result):
+        return await result
+    return result
+
+
+async def collect_experiment_metadata_safely(
+    experimentation_provider: "BaseExperimentationProvider",
+) -> dict[str, Any]:
+    """
+    Safely collect experiment-related metadata from the current context.
+
+    This is a safe wrapper around collect_experiment_metadata() that ensures
+    any exceptions are caught and handled gracefully.
+
+    Args:
+        experimentation_provider: The experimentation provider to use for fetching experiment data.
+
+    Returns:
+        Dictionary containing experiment data, or empty dict if collection fails.
+    """
+    try:
+        return await collect_experiment_metadata(experimentation_provider)
+    except Exception:
+        LOG.warning("Failed to collect experiment metadata", exc_info=True)
+        return {}
+
+
+async def collect_experiment_metadata(
+    experimentation_provider: "BaseExperimentationProvider",
+) -> dict[str, Any]:
+    """
+    Collect experiment-related metadata from the current context.
+
+    Args:
+        experimentation_provider: The experimentation provider to use for fetching experiment data.
+
+    Returns:
+        Dictionary containing experiment data that can be added to Laminar traces;
+        empty when the current context lacks a run_id or organization_id.
+    """
+    context = skyvern_context.current()
+    if not context or not context.run_id or not context.organization_id:
+        return {}
+
+    # Use run_id as the distinct_id for experiments
+    distinct_id = context.run_id
+    properties = {"organization_id": context.organization_id}
+
+    experiment_metadata: dict[str, Any] = {}
+    for flag in EXPERIMENT_FLAGS:
+        try:
+            # Await the cached lookups only when they return an awaitable, so a
+            # coroutine object can never leak into the trace metadata.
+            value = await _resolve(
+                experimentation_provider.get_value_cached(flag, distinct_id, properties=properties)
+            )
+            payload = await _resolve(
+                experimentation_provider.get_payload_cached(flag, distinct_id, properties=properties)
+            )
+        except Exception:
+            # Best-effort: skip this flag, but leave a breadcrumb instead of failing silently
+            LOG.debug("Failed to collect experiment flag", flag=flag, exc_info=True)
+            continue
+
+        # Only include if we have actual experiment data
+        if value is not None or payload is not None:
+            experiment_metadata[f"experiment_{flag}"] = {"value": value, "payload": payload}
+
+    return experiment_metadata
diff --git a/skyvern/forge/sdk/trace/lmnr.py b/skyvern/forge/sdk/trace/lmnr.py
index 4dbba0b4..4e142eb4 100644
--- a/skyvern/forge/sdk/trace/lmnr.py
+++ b/skyvern/forge/sdk/trace/lmnr.py
@@ -31,3 +31,35 @@ class LaminarTrace(BaseTrace):
         **kwargs: Any,
     ) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]:
         return observe(name=name, ignore_output=True, metadata=metadata, tags=tags, **kwargs)
+
+    def add_task_completion_tag(self, status: str) -> None:
+        """Add a completion tag to the current trace based on task/workflow status."""
+        try:
+            # Get the current trace ID
+            trace_id = Laminar.get_trace_id()
+            if trace_id is None:
+                return
+
+            # Map status to appropriate tag
+            status_tag_map = {
+                "completed": "COMPLETION",
+                "failed": "FAILURE",
+                "timed_out": "TIMEOUT",
+                "canceled": "CANCELED",
+                "terminated": "TERMINATED",
+            }
+
+            tag = status_tag_map.get(status, "FAILURE")
+            Laminar.set_span_tags([tag])
+        except Exception:
+            # Silently fail if tracing is not available or there's an error
+            pass
+
+    def add_experiment_metadata(self, experiment_data: dict[str, Any]) -> None:
+        """Add experiment metadata to the current trace."""
+        try:
+            # Add experiment metadata to the current trace
+            Laminar.set_trace_metadata(experiment_data)
+        except Exception:
+            # Silently fail if tracing is not available or there's an error
+            pass
diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py
index e93fd9c8..11c03171 100644
--- a/skyvern/forge/sdk/workflow/service.py
+++ b/skyvern/forge/sdk/workflow/service.py
@@ -38,6 +38,7 @@ from skyvern.forge.sdk.schemas.organizations import Organization
 from skyvern.forge.sdk.schemas.tasks import Task
 from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock, WorkflowRunTimeline, WorkflowRunTimelineType
 from skyvern.forge.sdk.trace import TraceManager
+from skyvern.forge.sdk.trace.experiment_utils import collect_experiment_metadata_safely
 from skyvern.forge.sdk.workflow.exceptions import (
     ContextParameterSourceNotDefined,
     InvalidWaitBlockTime,
@@ -339,6 +340,12 @@ class WorkflowService:
     ) -> WorkflowRun:
         """Execute a workflow."""
         organization_id = organization.organization_id
+
+        # Collect and add experiment metadata to the trace
+        experiment_metadata = await collect_experiment_metadata_safely(app.EXPERIMENTATION_PROVIDER)
+        if experiment_metadata:
+            TraceManager.add_experiment_metadata(experiment_metadata)
+
         LOG.info(
             "Executing workflow",
             workflow_run_id=workflow_run_id,
@@ -1147,6 +1154,10 @@ class WorkflowService:
             workflow_run_id=workflow_run_id,
             workflow_status="completed",
         )
+
+        # Add workflow completion tag to Laminar trace
+        TraceManager.add_task_completion_tag(WorkflowRunStatus.completed)
+
         return await self._update_workflow_run_status(
             workflow_run_id=workflow_run_id,
             status=WorkflowRunStatus.completed,
@@ -1165,6 +1176,10 @@ class WorkflowService:
             workflow_status="failed",
             failure_reason=failure_reason,
         )
+
+        # Add workflow failure tag to Laminar trace
+        TraceManager.add_task_completion_tag(WorkflowRunStatus.failed)
+
         return await self._update_workflow_run_status(
             workflow_run_id=workflow_run_id,
             status=WorkflowRunStatus.failed,
@@ -1197,6 +1212,10 @@ class WorkflowService:
             workflow_status="terminated",
             failure_reason=failure_reason,
         )
+
+        # Add workflow terminated tag to Laminar trace
+        TraceManager.add_task_completion_tag(WorkflowRunStatus.terminated)
+
         return await self._update_workflow_run_status(
             workflow_run_id=workflow_run_id,
             status=WorkflowRunStatus.terminated,
@@ -1210,6 +1229,10 @@ class WorkflowService:
             workflow_run_id=workflow_run_id,
             workflow_status="canceled",
         )
+
+        # Add workflow canceled tag to Laminar trace
+        TraceManager.add_task_completion_tag(WorkflowRunStatus.canceled)
+
         return await self._update_workflow_run_status(
             workflow_run_id=workflow_run_id,
             status=WorkflowRunStatus.canceled,
@@ -1227,6 +1250,10 @@ class WorkflowService:
             workflow_run_id=workflow_run_id,
             workflow_status="timed_out",
         )
+
+        # Add workflow timed out tag to Laminar trace
+        TraceManager.add_task_completion_tag(WorkflowRunStatus.timed_out)
+
         return await self._update_workflow_run_status(
             workflow_run_id=workflow_run_id,
             status=WorkflowRunStatus.timed_out,
diff --git a/skyvern/services/task_v2_service.py b/skyvern/services/task_v2_service.py
index 6c6fe03b..3311036b 100644
--- a/skyvern/services/task_v2_service.py
+++ b/skyvern/services/task_v2_service.py
@@ -29,6 +29,7 @@ from skyvern.forge.sdk.schemas.organizations import Organization
 from skyvern.forge.sdk.schemas.task_v2 import TaskV2, TaskV2Metadata, TaskV2Status, ThoughtScenario, ThoughtType
 from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunTimeline, WorkflowRunTimelineType
 from skyvern.forge.sdk.trace import TraceManager
+from skyvern.forge.sdk.trace.experiment_utils import collect_experiment_metadata_safely
 from skyvern.forge.sdk.workflow.models.block import (
     BlockTypeVar,
     ExtractionBlock,
@@ -326,6 +327,11 @@ async def run_task_v2(
         LOG.error("Task v2 not found", task_v2_id=task_v2_id, organization_id=organization_id)
         raise TaskV2NotFound(task_v2_id=task_v2_id)
 
+    # Collect and add experiment metadata to the trace
+    experiment_metadata = await collect_experiment_metadata_safely(app.EXPERIMENTATION_PROVIDER)
+    if experiment_metadata:
+        TraceManager.add_experiment_metadata(experiment_metadata)
+
     workflow, workflow_run = None, None
     try:
         workflow, workflow_run, task_v2 = await run_task_v2_helper(
@@ -1459,6 +1465,10 @@ async def mark_task_v2_as_failed(
         await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed(
             workflow_run_id, failure_reason=failure_reason or "Skyvern task 2.0 failed"
         )
+
+    # Add task failure tag to Laminar trace
+    TraceManager.add_task_completion_tag("failed")
+
     await send_task_v2_webhook(task_v2)
     return task_v2
 
@@ -1480,6 +1490,9 @@ async def mark_task_v2_as_completed(
     if workflow_run_id:
         await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id)
 
+    # Add task completion tag to Laminar trace
+    TraceManager.add_task_completion_tag("completed")
+
     await send_task_v2_webhook(task_v2)
     return task_v2
 
@@ -1496,6 +1509,10 @@ async def mark_task_v2_as_canceled(
     )
     if workflow_run_id:
         await app.WORKFLOW_SERVICE.mark_workflow_run_as_canceled(workflow_run_id)
+
+    # Add task canceled tag to Laminar trace
+    TraceManager.add_task_completion_tag("canceled")
+
     await send_task_v2_webhook(task_v2)
     return task_v2
 
@@ -1513,6 +1530,10 @@ async def mark_task_v2_as_terminated(
     )
     if workflow_run_id:
         await app.WORKFLOW_SERVICE.mark_workflow_run_as_terminated(workflow_run_id, failure_reason)
+
+    # Add task terminated tag to Laminar trace
+    TraceManager.add_task_completion_tag("terminated")
+
     await send_task_v2_webhook(task_v2)
     return task_v2
 
@@ -1530,6 +1551,10 @@ async def mark_task_v2_as_timed_out(
     )
     if workflow_run_id:
         await app.WORKFLOW_SERVICE.mark_workflow_run_as_timed_out(workflow_run_id, failure_reason)
+
+    # Add task timed out tag to Laminar trace
+    TraceManager.add_task_completion_tag("timed_out")
+
     await send_task_v2_webhook(task_v2)
     return task_v2
 