laminar integration (#2887)
This commit is contained in:
@@ -71,6 +71,7 @@ from skyvern.forge.sdk.models import Step, StepStatus
|
||||
from skyvern.forge.sdk.schemas.files import FileInfo
|
||||
from skyvern.forge.sdk.schemas.organizations import Organization
|
||||
from skyvern.forge.sdk.schemas.tasks import Task, TaskRequest, TaskResponse, TaskStatus
|
||||
from skyvern.forge.sdk.trace import TraceManager
|
||||
from skyvern.forge.sdk.workflow.context_manager import WorkflowRunContext
|
||||
from skyvern.forge.sdk.workflow.models.block import ActionBlock, BaseTaskBlock, ValidationBlock
|
||||
from skyvern.forge.sdk.workflow.models.workflow import Workflow, WorkflowRun, WorkflowRunStatus
|
||||
@@ -268,6 +269,9 @@ class ForgeAgent:
|
||||
operations = await app.AGENT_FUNCTION.generate_async_operations(organization, task, page)
|
||||
self.async_operation_pool.add_operations(task.task_id, operations)
|
||||
|
||||
@TraceManager.traced_async(
|
||||
ignore_inputs=["api_key", "close_browser_on_completion", "task_block", "cua_response", "llm_caller"]
|
||||
)
|
||||
async def execute_step(
|
||||
self,
|
||||
organization: Organization,
|
||||
@@ -428,8 +432,9 @@ class ForgeAgent:
|
||||
if not llm_caller:
|
||||
# create a new UI-TARS llm_caller
|
||||
llm_key = task.llm_key or settings.VOLCENGINE_CUA_LLM_KEY
|
||||
llm_caller = UITarsLLMCaller(llm_key=llm_key, screenshot_scaling_enabled=True)
|
||||
llm_caller.initialize_conversation(task)
|
||||
ui_tars_llm_caller = UITarsLLMCaller(llm_key=llm_key, screenshot_scaling_enabled=True)
|
||||
ui_tars_llm_caller.initialize_conversation(task)
|
||||
llm_caller = ui_tars_llm_caller
|
||||
|
||||
# TODO: remove the code after migrating everything to llm callers
|
||||
# currently, only anthropic cua and ui_tars tasks use llm_caller
|
||||
@@ -829,6 +834,9 @@ class ForgeAgent:
|
||||
)
|
||||
return True
|
||||
|
||||
@TraceManager.traced_async(
|
||||
ignore_inputs=["browser_state", "organization", "task_block", "cua_response", "llm_caller"]
|
||||
)
|
||||
async def agent_step(
|
||||
self,
|
||||
task: Task,
|
||||
|
||||
@@ -18,6 +18,7 @@ from skyvern.forge.sdk.core import skyvern_context
|
||||
from skyvern.forge.sdk.models import Step, StepStatus
|
||||
from skyvern.forge.sdk.schemas.organizations import Organization
|
||||
from skyvern.forge.sdk.schemas.tasks import Task, TaskStatus
|
||||
from skyvern.forge.sdk.trace import TraceManager
|
||||
from skyvern.forge.sdk.workflow.models.block import BlockTypeVar
|
||||
from skyvern.webeye.browser_factory import BrowserState
|
||||
from skyvern.webeye.scraper.scraper import ELEMENT_NODE_ATTRIBUTES, CleanupElementTreeFunc, json_to_html
|
||||
@@ -538,6 +539,7 @@ class AgentFunction:
|
||||
) -> CleanupElementTreeFunc:
|
||||
MAX_ELEMENT_CNT = 3000
|
||||
|
||||
@TraceManager.traced_async(ignore_input=True)
|
||||
async def cleanup_element_tree_func(frame: Page | Frame, url: str, element_tree: list[dict]) -> list[dict]:
|
||||
"""
|
||||
Remove rect and attribute.unique_id from the elements.
|
||||
|
||||
@@ -15,6 +15,8 @@ from skyvern.forge.sdk.db.client import AgentDB
|
||||
from skyvern.forge.sdk.experimentation.providers import BaseExperimentationProvider, NoOpExperimentationProvider
|
||||
from skyvern.forge.sdk.schemas.organizations import Organization
|
||||
from skyvern.forge.sdk.settings_manager import SettingsManager
|
||||
from skyvern.forge.sdk.trace import TraceManager
|
||||
from skyvern.forge.sdk.trace.lmnr import LaminarTrace
|
||||
from skyvern.forge.sdk.workflow.context_manager import WorkflowContextManager
|
||||
from skyvern.forge.sdk.workflow.service import WorkflowService
|
||||
from skyvern.webeye.browser_manager import BrowserManager
|
||||
@@ -76,3 +78,7 @@ authentication_function: Callable[[str], Awaitable[Organization]] | None = None
|
||||
setup_api_app: Callable[[FastAPI], None] | None = None
|
||||
|
||||
agent = ForgeAgent()
|
||||
|
||||
if SettingsManager.get_settings().TRACE_ENABLED:
|
||||
if SettingsManager.get_settings().TRACE_PROVIDER == "lmnr":
|
||||
TraceManager.set_trace_provider(LaminarTrace(api_key=SettingsManager.get_settings().TRACE_PROVIDER_API_KEY))
|
||||
|
||||
@@ -31,6 +31,7 @@ from skyvern.forge.sdk.core import skyvern_context
|
||||
from skyvern.forge.sdk.models import Step
|
||||
from skyvern.forge.sdk.schemas.ai_suggestions import AISuggestion
|
||||
from skyvern.forge.sdk.schemas.task_v2 import TaskV2, Thought
|
||||
from skyvern.forge.sdk.trace import TraceManager
|
||||
from skyvern.utils.image_resizer import Resolution, get_resize_target_dimension, resize_screenshots
|
||||
|
||||
LOG = structlog.get_logger()
|
||||
@@ -89,6 +90,7 @@ class LLMAPIHandlerFactory:
|
||||
)
|
||||
main_model_group = llm_config.main_model_group
|
||||
|
||||
@TraceManager.traced_async(tags=[llm_key], ignore_inputs=["prompt", "screenshots", "parameters"])
|
||||
async def llm_api_handler_with_router_and_fallback(
|
||||
prompt: str,
|
||||
prompt_name: str,
|
||||
@@ -286,6 +288,7 @@ class LLMAPIHandlerFactory:
|
||||
|
||||
assert isinstance(llm_config, LLMConfig)
|
||||
|
||||
@TraceManager.traced_async(tags=[llm_key], ignore_inputs=["prompt", "screenshots", "parameters"])
|
||||
async def llm_api_handler(
|
||||
prompt: str,
|
||||
prompt_name: str,
|
||||
@@ -743,6 +746,7 @@ class LLMCaller:
|
||||
return get_resize_target_dimension(window_dimension)
|
||||
return self.screenshot_resize_target_dimension
|
||||
|
||||
@TraceManager.traced_async(ignore_input=True)
|
||||
async def _dispatch_llm_call(
|
||||
self,
|
||||
messages: list[dict[str, Any]],
|
||||
|
||||
92
skyvern/forge/sdk/trace/__init__.py
Normal file
92
skyvern/forge/sdk/trace/__init__.py
Normal file
@@ -0,0 +1,92 @@
|
||||
from functools import wraps
|
||||
from typing import Any, Awaitable, Callable, ParamSpec, TypeVar
|
||||
|
||||
from skyvern.forge.sdk.core import skyvern_context
|
||||
from skyvern.forge.sdk.settings_manager import SettingsManager
|
||||
from skyvern.forge.sdk.trace.base import BaseTrace, NoOpTrace
|
||||
|
||||
P = ParamSpec("P")
|
||||
R = TypeVar("R")
|
||||
|
||||
|
||||
class TraceManager:
|
||||
__instance: BaseTrace = NoOpTrace()
|
||||
|
||||
@staticmethod
|
||||
def traced_async(
|
||||
*,
|
||||
name: str | None = None,
|
||||
metadata: dict[str, Any] | None = None,
|
||||
tags: list[str] | None = None,
|
||||
**trace_parameters: Any,
|
||||
) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]:
|
||||
def decorator(func: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]:
|
||||
@wraps(func)
|
||||
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
|
||||
new_metadata: dict[str, Any] = metadata or {}
|
||||
user_id: str | None = None
|
||||
context = skyvern_context.current()
|
||||
if context is not None:
|
||||
new_metadata["request_id"] = context.request_id
|
||||
new_metadata["organization_id"] = context.organization_id
|
||||
new_metadata["task_id"] = context.task_id
|
||||
new_metadata["workflow_id"] = context.workflow_id
|
||||
new_metadata["workflow_run_id"] = context.workflow_run_id
|
||||
new_metadata["task_v2_id"] = context.task_v2_id
|
||||
new_metadata["run_id"] = context.run_id
|
||||
new_metadata["organization_name"] = context.organization_name
|
||||
user_id = context.run_id
|
||||
|
||||
new_tags: list[str] = tags or []
|
||||
new_tags.append(SettingsManager.get_settings().ENV)
|
||||
|
||||
return await TraceManager.__instance.traced_async(
|
||||
name=name, metadata=new_metadata, tags=new_tags, user_id=user_id, **trace_parameters
|
||||
)(func)(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
@staticmethod
|
||||
def traced(
|
||||
*,
|
||||
name: str | None = None,
|
||||
metadata: dict[str, Any] | None = None,
|
||||
tags: list[str] | None = None,
|
||||
**trace_parameters: Any,
|
||||
) -> Callable[[Callable[P, R]], Callable[P, R]]:
|
||||
def decorator(func: Callable[P, R]) -> Callable[P, R]:
|
||||
def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
|
||||
new_metadata: dict[str, Any] = metadata or {}
|
||||
user_id: str | None = None
|
||||
context = skyvern_context.current()
|
||||
if context is not None:
|
||||
new_metadata["request_id"] = context.request_id
|
||||
new_metadata["organization_id"] = context.organization_id
|
||||
new_metadata["task_id"] = context.task_id
|
||||
new_metadata["workflow_id"] = context.workflow_id
|
||||
new_metadata["workflow_run_id"] = context.workflow_run_id
|
||||
new_metadata["task_v2_id"] = context.task_v2_id
|
||||
new_metadata["run_id"] = context.run_id
|
||||
new_metadata["organization_name"] = context.organization_name
|
||||
user_id = context.run_id
|
||||
|
||||
new_tags: list[str] = tags or []
|
||||
new_tags.append(SettingsManager.get_settings().ENV)
|
||||
|
||||
return TraceManager.__instance.traced(
|
||||
name=name, metadata=new_metadata, tags=new_tags, user_id=user_id, **trace_parameters
|
||||
)(func)(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
@staticmethod
|
||||
def get_trace_provider() -> BaseTrace:
|
||||
return TraceManager.__instance
|
||||
|
||||
@staticmethod
|
||||
def set_trace_provider(trace_provider: BaseTrace) -> None:
|
||||
TraceManager.__instance = trace_provider
|
||||
47
skyvern/forge/sdk/trace/base.py
Normal file
47
skyvern/forge/sdk/trace/base.py
Normal file
@@ -0,0 +1,47 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Any, Awaitable, Callable, ParamSpec, TypeVar
|
||||
|
||||
P = ParamSpec("P")
|
||||
R = TypeVar("R")
|
||||
|
||||
|
||||
class BaseTrace(ABC):
|
||||
@abstractmethod
|
||||
def traced(
|
||||
self,
|
||||
name: str | None = None,
|
||||
metadata: dict[str, Any] | None = None,
|
||||
tags: list[str] | None = None,
|
||||
**kwargs: Any,
|
||||
) -> Callable[[Callable[P, R]], Callable[P, R]]:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def traced_async(
|
||||
self,
|
||||
name: str | None = None,
|
||||
metadata: dict[str, Any] | None = None,
|
||||
tags: list[str] | None = None,
|
||||
**kwargs: Any,
|
||||
) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]:
|
||||
pass
|
||||
|
||||
|
||||
class NoOpTrace(BaseTrace):
|
||||
def traced(
|
||||
self,
|
||||
name: str | None = None,
|
||||
metadata: dict[str, Any] | None = None,
|
||||
tags: list[str] | None = None,
|
||||
**kwargs: Any,
|
||||
) -> Callable[[Callable[P, R]], Callable[P, R]]:
|
||||
return lambda func: func
|
||||
|
||||
def traced_async(
|
||||
self,
|
||||
name: str | None = None,
|
||||
metadata: dict[str, Any] | None = None,
|
||||
tags: list[str] | None = None,
|
||||
**kwargs: Any,
|
||||
) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]:
|
||||
return lambda func: func
|
||||
33
skyvern/forge/sdk/trace/lmnr.py
Normal file
33
skyvern/forge/sdk/trace/lmnr.py
Normal file
@@ -0,0 +1,33 @@
|
||||
from typing import Any, Awaitable, Callable, ParamSpec, TypeVar
|
||||
|
||||
import litellm
|
||||
from lmnr import Instruments, Laminar, LaminarLiteLLMCallback, observe
|
||||
|
||||
from skyvern.forge.sdk.trace.base import BaseTrace
|
||||
|
||||
P = ParamSpec("P")
|
||||
R = TypeVar("R")
|
||||
|
||||
|
||||
class LaminarTrace(BaseTrace):
|
||||
def __init__(self, api_key: str) -> None:
|
||||
Laminar.initialize(project_api_key=api_key, disabled_instruments={Instruments.SKYVERN})
|
||||
litellm.callbacks.append(LaminarLiteLLMCallback())
|
||||
|
||||
def traced(
|
||||
self,
|
||||
name: str | None = None,
|
||||
metadata: dict[str, Any] | None = None,
|
||||
tags: list[str] | None = None,
|
||||
**kwargs: Any,
|
||||
) -> Callable[[Callable[P, R]], Callable[P, R]]:
|
||||
return observe(name=name, ignore_output=True, metadata=metadata, tags=tags, **kwargs)
|
||||
|
||||
def traced_async(
|
||||
self,
|
||||
name: str | None = None,
|
||||
metadata: dict[str, Any] | None = None,
|
||||
tags: list[str] | None = None,
|
||||
**kwargs: Any,
|
||||
) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]:
|
||||
return observe(name=name, ignore_output=True, metadata=metadata, tags=tags, **kwargs)
|
||||
@@ -54,6 +54,7 @@ from skyvern.forge.sdk.db.enums import TaskType
|
||||
from skyvern.forge.sdk.schemas.files import FileInfo
|
||||
from skyvern.forge.sdk.schemas.task_v2 import TaskV2Status
|
||||
from skyvern.forge.sdk.schemas.tasks import Task, TaskOutput, TaskStatus
|
||||
from skyvern.forge.sdk.trace import TraceManager
|
||||
from skyvern.forge.sdk.workflow.context_manager import BlockMetadata, WorkflowRunContext
|
||||
from skyvern.forge.sdk.workflow.exceptions import (
|
||||
CustomizedCodeException,
|
||||
@@ -292,6 +293,7 @@ class Block(BaseModel, abc.ABC):
|
||||
organization_id=organization_id,
|
||||
)
|
||||
|
||||
@TraceManager.traced_async(ignore_inputs=["kwargs"])
|
||||
async def execute_safe(
|
||||
self,
|
||||
workflow_run_id: str,
|
||||
|
||||
@@ -30,6 +30,7 @@ from skyvern.forge.sdk.schemas.files import FileInfo
|
||||
from skyvern.forge.sdk.schemas.organizations import Organization
|
||||
from skyvern.forge.sdk.schemas.tasks import Task
|
||||
from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock, WorkflowRunTimeline, WorkflowRunTimelineType
|
||||
from skyvern.forge.sdk.trace import TraceManager
|
||||
from skyvern.forge.sdk.workflow.exceptions import (
|
||||
ContextParameterSourceNotDefined,
|
||||
InvalidWaitBlockTime,
|
||||
@@ -248,6 +249,7 @@ class WorkflowService:
|
||||
|
||||
return workflow_run
|
||||
|
||||
@TraceManager.traced_async(ignore_inputs=["organization", "api_key"])
|
||||
async def execute_workflow(
|
||||
self,
|
||||
workflow_run_id: str,
|
||||
|
||||
Reference in New Issue
Block a user