diff --git a/skyvern-frontend/src/routes/browserSessions/BrowserSessions.tsx b/skyvern-frontend/src/routes/browserSessions/BrowserSessions.tsx index 43e13de1..4b526e9d 100644 --- a/skyvern-frontend/src/routes/browserSessions/BrowserSessions.tsx +++ b/skyvern-frontend/src/routes/browserSessions/BrowserSessions.tsx @@ -37,26 +37,7 @@ import { useCreateBrowserSessionMutation } from "@/routes/browserSessions/hooks/ import { type BrowserSession } from "@/routes/workflows/types/browserSessionTypes"; import { CopyText } from "@/routes/workflows/editor/Workspace"; import { basicTimeFormat } from "@/util/timeFormat"; -import { cn, formatMs } from "@/util/utils"; - -function toDate( - time: string, - defaultDate: Date | null = new Date(0), -): Date | null { - time = time.replace(/\.(\d{3})\d*/, ".$1"); - - if (!time.endsWith("Z")) { - time += "Z"; - } - - const date = new Date(time); - - if (isNaN(date.getTime())) { - return defaultDate; - } - - return date; -} +import { cn, formatMs, toDate } from "@/util/utils"; function sessionIsOpen(browserSession: BrowserSession): boolean { return ( diff --git a/skyvern-frontend/src/routes/workflows/debugger/DebuggerBlockRuns.tsx b/skyvern-frontend/src/routes/workflows/debugger/DebuggerBlockRuns.tsx new file mode 100644 index 00000000..a1a1a58c --- /dev/null +++ b/skyvern-frontend/src/routes/workflows/debugger/DebuggerBlockRuns.tsx @@ -0,0 +1,128 @@ +/** + * THe debugger has an underlying debug_session_id. Block runs that occur within + * same debug session are grouped together. We will show them with this component. + */ + +import { useEffect, useRef } from "react"; +import { useNavigate, useParams } from "react-router-dom"; +import { useQueryClient } from "@tanstack/react-query"; + +import { Tip } from "@/components/Tip"; +import { statusIsFinalized } from "@/routes/tasks/types"; +import { useWorkflowRunQuery } from "@/routes/workflows/hooks/useWorkflowRunQuery"; +import { cn, formatMs, toDate } from "@/util/utils"; + +import { useDebugSessionQuery } from "../hooks/useDebugSessionQuery"; +import { useWorkflowQuery } from "../hooks/useWorkflowQuery"; +import { + useDebugSessionRunsQuery, + type DebugSessionRun, +} from "../hooks/useDebugSessionRunsQuery"; +import { toast } from "@/components/ui/use-toast"; + +function DebuggerBlockRuns() { + const { workflowPermanentId } = useParams(); + const navigate = useNavigate(); + const queryClient = useQueryClient(); + const scrollContainerRef = useRef(null); + const { data: workflowRun } = useWorkflowRunQuery(); + const { data: workflow } = useWorkflowQuery({ + workflowPermanentId, + }); + const { data: debugSession } = useDebugSessionQuery({ + workflowPermanentId, + }); + const { data: debugSessionRuns } = useDebugSessionRunsQuery({ + debugSessionId: debugSession?.debug_session_id, + }); + + const numRuns = debugSessionRuns?.runs.length ?? 0; + const isFinalized = workflowRun ? statusIsFinalized(workflowRun) : null; + const isRunning = isFinalized !== null && !isFinalized; + + const handleClick = (run: DebugSessionRun) => { + if (isRunning) { + return; + } + + const blockLabel = run.block_label; + const workflowDefinition = workflow?.workflow_definition; + const blocks = workflowDefinition?.blocks ?? []; + const block = blocks.find((b) => b.label === blockLabel); + + if (!block) { + toast({ + variant: "destructive", + title: "Block not found", + description: `The block with label '${blockLabel}' is no longer found in the workflow.`, + }); + + return; + } + + navigate( + `/workflows/${run.workflow_permanent_id}/${run.workflow_run_id}/${blockLabel}/debug`, + ); + }; + + useEffect(() => { + queryClient.invalidateQueries({ + queryKey: ["debug-session-runs"], + }); + // We only want to run this when the workflowRun changes, not on every render + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [workflowRun]); + + useEffect(() => { + if (scrollContainerRef.current) { + scrollContainerRef.current.scrollLeft = + scrollContainerRef.current.scrollWidth; + } + }, [debugSessionRuns]); + + if (numRuns <= 1) { + return null; + } + + return ( +
+
+ {[...(debugSessionRuns?.runs ?? [])].reverse().map((run) => { + const dt = toDate(run.created_at ?? "", null); + const ago = dt ? formatMs(Date.now() - dt.getTime()).ago : null; + return ( + +
handleClick(run)} + /> + + ); + })} +
+
+ ); +} + +export { DebuggerBlockRuns }; diff --git a/skyvern-frontend/src/routes/workflows/editor/Workspace.tsx b/skyvern-frontend/src/routes/workflows/editor/Workspace.tsx index 2b53252b..5112c514 100644 --- a/skyvern-frontend/src/routes/workflows/editor/Workspace.tsx +++ b/skyvern-frontend/src/routes/workflows/editor/Workspace.tsx @@ -62,7 +62,7 @@ import { useWorkflowSave, } from "@/store/WorkflowHasChangesStore"; import { getCode, getOrderedBlockLabels } from "@/routes/workflows/utils"; - +import { DebuggerBlockRuns } from "@/routes/workflows/debugger/DebuggerBlockRuns"; import { cn } from "@/util/utils"; import { FlowRenderer, type FlowRendererProps } from "./FlowRenderer"; @@ -1245,7 +1245,7 @@ function Workspace({ split={{ left: workflowWidth }} onResize={() => setContainerResizeTrigger((prev) => prev + 1)} > - {/* code and infinite canvas */} + {/* code, infinite canvas, and block runs */}
+ {/* block runs history for current debug session id*/} +
+ +
diff --git a/skyvern-frontend/src/routes/workflows/hooks/useDebugSessionRunsQuery.ts b/skyvern-frontend/src/routes/workflows/hooks/useDebugSessionRunsQuery.ts new file mode 100644 index 00000000..441dfdfa --- /dev/null +++ b/skyvern-frontend/src/routes/workflows/hooks/useDebugSessionRunsQuery.ts @@ -0,0 +1,70 @@ +import { getClient } from "@/api/AxiosClient"; +import { useCredentialGetter } from "@/hooks/useCredentialGetter"; +import { useQuery } from "@tanstack/react-query"; + +type Props = { + debugSessionId?: string; +}; + +const debugSessionStatuses = ["created", "completed"] as const; + +type DebugSessionStatus = (typeof debugSessionStatuses)[number]; + +interface DebugSession { + debug_session_id: string; + browser_session_id: string; + vnc_streaming_supported: boolean | null; + workflow_permanent_id: string | null; + created_at: string; + modified_at: string; + deleted_at: string | null; + status: DebugSessionStatus; +} + +interface DebugSessionRun { + ai_fallback: boolean | null; + block_label: string; + browser_session_id: string; + code_gen: boolean | null; + debug_session_id: string; + failure_reason: string | null; + output_parameter_id: string; + run_with: string | null; + script_run_id: string | null; + status: string; + workflow_id: string; + workflow_permanent_id: string; + workflow_run_id: string; + created_at: string; + queued_at: string | null; + started_at: string | null; + finished_at: string | null; +} + +interface DebugSessionRuns { + debug_session: DebugSession; + runs: DebugSessionRun[]; +} + +function useDebugSessionRunsQuery({ debugSessionId }: Props) { + const credentialGetter = useCredentialGetter(); + + return useQuery({ + queryKey: ["debug-session-runs", debugSessionId], + queryFn: async () => { + const client = await getClient(credentialGetter, "sans-api-v1"); + const result = await client + .get(`/debug-session/${debugSessionId}/runs`) + .then((response) => response.data); + return result; + }, + enabled: !!debugSessionId, + }); +} + +export { + useDebugSessionRunsQuery, + type DebugSession, + type DebugSessionRun, + type DebugSessionStatus, +}; diff --git a/skyvern-frontend/src/util/utils.ts b/skyvern-frontend/src/util/utils.ts index a1773605..3660df24 100644 --- a/skyvern-frontend/src/util/utils.ts +++ b/skyvern-frontend/src/util/utils.ts @@ -36,3 +36,22 @@ export const formatMs = (elapsed: number) => { day: days, }; }; + +export function toDate( + time: string, + defaultDate: Date | null = new Date(0), +): Date | null { + time = time.replace(/\.(\d{3})\d*/, ".$1"); + + if (!time.endsWith("Z")) { + time += "Z"; + } + + const date = new Date(time); + + if (isNaN(date.getTime())) { + return defaultDate; + } + + return date; +} diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 4ecba12d..ccf8162e 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -78,7 +78,7 @@ from skyvern.forge.sdk.log_artifacts import save_workflow_run_logs from skyvern.forge.sdk.models import Step, StepStatus from skyvern.forge.sdk.schemas.ai_suggestions import AISuggestion from skyvern.forge.sdk.schemas.credentials import Credential, CredentialType, CredentialVaultType -from skyvern.forge.sdk.schemas.debug_sessions import BlockRun, DebugSession +from skyvern.forge.sdk.schemas.debug_sessions import BlockRun, DebugSession, DebugSessionRun from skyvern.forge.sdk.schemas.organization_bitwarden_collections import OrganizationBitwardenCollection from skyvern.forge.sdk.schemas.organizations import ( AzureClientSecretCredential, @@ -3908,6 +3908,65 @@ class AgentDB: return DebugSession.model_validate(model) if model else None + async def get_debug_session_by_id( + self, + debug_session_id: str, + organization_id: str, + ) -> DebugSession | None: + async with self.Session() as session: + query = ( + select(DebugSessionModel) + .filter_by(organization_id=organization_id) + .filter_by(deleted_at=None) + .filter_by(debug_session_id=debug_session_id) + ) + + model = (await session.scalars(query)).first() + + return DebugSession.model_validate(model) if model else None + + async def get_workflow_runs_by_debug_session_id( + self, + debug_session_id: str, + organization_id: str, + ) -> list[DebugSessionRun]: + async with self.Session() as session: + query = ( + select(WorkflowRunModel, BlockRunModel) + .join(BlockRunModel, BlockRunModel.workflow_run_id == WorkflowRunModel.workflow_run_id) + .filter(WorkflowRunModel.organization_id == organization_id) + .filter(WorkflowRunModel.debug_session_id == debug_session_id) + .order_by(WorkflowRunModel.created_at.desc()) + ) + + results = (await session.execute(query)).all() + + debug_session_runs = [] + for workflow_run, block_run in results: + debug_session_runs.append( + DebugSessionRun( + ai_fallback=workflow_run.ai_fallback, + block_label=block_run.block_label, + browser_session_id=workflow_run.browser_session_id, + code_gen=workflow_run.code_gen, + debug_session_id=workflow_run.debug_session_id, + failure_reason=workflow_run.failure_reason, + output_parameter_id=block_run.output_parameter_id, + run_with=workflow_run.run_with, + script_run_id=workflow_run.script_run.get("script_run_id") if workflow_run.script_run else None, + status=workflow_run.status, + workflow_id=workflow_run.workflow_id, + workflow_permanent_id=workflow_run.workflow_permanent_id, + workflow_run_id=workflow_run.workflow_run_id, + created_at=workflow_run.created_at, + queued_at=workflow_run.queued_at, + started_at=workflow_run.started_at, + finished_at=workflow_run.finished_at, + ) + ) + + return debug_session_runs + async def complete_debug_sessions( self, *, diff --git a/skyvern/forge/sdk/routes/__init__.py b/skyvern/forge/sdk/routes/__init__.py index a164cd4d..f312a677 100644 --- a/skyvern/forge/sdk/routes/__init__.py +++ b/skyvern/forge/sdk/routes/__init__.py @@ -1,6 +1,7 @@ from skyvern.forge.sdk.routes import agent_protocol # noqa: F401 from skyvern.forge.sdk.routes import browser_sessions # noqa: F401 from skyvern.forge.sdk.routes import credentials # noqa: F401 +from skyvern.forge.sdk.routes import debug_sessions # noqa: F401 from skyvern.forge.sdk.routes import pylon # noqa: F401 from skyvern.forge.sdk.routes import run_blocks # noqa: F401 from skyvern.forge.sdk.routes import scripts # noqa: F401 diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index 7390f222..0d3fb7b2 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -1,7 +1,4 @@ -import asyncio -from datetime import datetime, timedelta, timezone from enum import Enum -from functools import partial from typing import Annotated, Any import structlog @@ -12,11 +9,7 @@ from fastapi.responses import ORJSONResponse from skyvern import analytics from skyvern._version import __version__ from skyvern.config import settings -from skyvern.exceptions import ( - BrowserSessionNotRenewable, - CannotUpdateWorkflowDueToCodeCache, - MissingBrowserAddressError, -) +from skyvern.exceptions import CannotUpdateWorkflowDueToCodeCache, MissingBrowserAddressError from skyvern.forge import app from skyvern.forge.prompts import prompt_engine from skyvern.forge.sdk.api.llm.exceptions import LLMProviderError @@ -26,7 +19,6 @@ from skyvern.forge.sdk.core.curl_converter import curl_to_http_request_block_par from skyvern.forge.sdk.core.permissions.permission_checker_factory import PermissionCheckerFactory from skyvern.forge.sdk.core.security import generate_skyvern_signature from skyvern.forge.sdk.db.enums import OrganizationAuthTokenType -from skyvern.forge.sdk.db.exceptions import NotFoundError from skyvern.forge.sdk.executor.factory import AsyncExecutorFactory from skyvern.forge.sdk.models import Step from skyvern.forge.sdk.routes.code_samples import ( @@ -45,14 +37,12 @@ from skyvern.forge.sdk.routes.code_samples import ( ) from skyvern.forge.sdk.routes.routers import base_router, legacy_base_router, legacy_v2_router from skyvern.forge.sdk.schemas.ai_suggestions import AISuggestionBase, AISuggestionRequest -from skyvern.forge.sdk.schemas.debug_sessions import DebugSession from skyvern.forge.sdk.schemas.organizations import ( GetOrganizationAPIKeysResponse, GetOrganizationsResponse, Organization, OrganizationUpdate, ) -from skyvern.forge.sdk.schemas.persistent_browser_sessions import PersistentBrowserSession from skyvern.forge.sdk.schemas.prompts import CreateFromPromptRequest from skyvern.forge.sdk.schemas.task_generations import GenerateTaskRequest, TaskGeneration from skyvern.forge.sdk.schemas.task_v2 import TaskV2Request @@ -87,7 +77,6 @@ from skyvern.schemas.runs import ( CUA_ENGINES, BlockRunRequest, BlockRunResponse, - ProxyLocation, RunEngine, RunResponse, RunType, @@ -2231,241 +2220,3 @@ async def _flatten_workflow_run_timeline(organization_id: str, workflow_run_id: final_workflow_run_block_timeline.extend(thought_timeline) final_workflow_run_block_timeline.sort(key=lambda x: x.created_at, reverse=True) return final_workflow_run_block_timeline - - -@base_router.get( - "/debug-session/{workflow_permanent_id}", - include_in_schema=False, -) -async def get_or_create_debug_session_by_user_and_workflow_permanent_id( - workflow_permanent_id: str, - current_org: Organization = Depends(org_auth_service.get_current_org), - current_user_id: str = Depends(org_auth_service.get_current_user_id), -) -> DebugSession: - """ - `current_user_id` is a unique identifier for a user, but does not map to an - entity in the database (at time of writing) - - If the debug session does not exist, a new one will be created. - - In addition, the timeout for the debug session's browser session will be - extended to 4 hours from the time of the request. If the browser session - cannot be renewed, a new one will be created and assigned to the debug - session. The browser_session that could not be renewed will be closed. - """ - - debug_session = await app.DATABASE.get_debug_session( - organization_id=current_org.organization_id, - user_id=current_user_id, - workflow_permanent_id=workflow_permanent_id, - ) - - if not debug_session: - LOG.info( - "Existing debug session not found, created a new one, along with a new browser session", - organization_id=current_org.organization_id, - user_id=current_user_id, - workflow_permanent_id=workflow_permanent_id, - ) - - return await new_debug_session( - workflow_permanent_id, - current_org, - current_user_id, - ) - - LOG.info( - "Existing debug session found", - debug_session_id=debug_session.debug_session_id, - browser_session_id=debug_session.browser_session_id, - organization_id=current_org.organization_id, - user_id=current_user_id, - workflow_permanent_id=workflow_permanent_id, - ) - - try: - await app.PERSISTENT_SESSIONS_MANAGER.renew_or_close_session( - debug_session.browser_session_id, - current_org.organization_id, - ) - return debug_session - except BrowserSessionNotRenewable as ex: - LOG.info( - "Browser session was non-renewable; creating a new debug session", - ex=str(ex), - debug_session_id=debug_session.debug_session_id, - browser_session_id=debug_session.browser_session_id, - organization_id=current_org.organization_id, - workflow_permanent_id=workflow_permanent_id, - user_id=current_user_id, - ) - - return await new_debug_session( - workflow_permanent_id, - current_org, - current_user_id, - ) - - -@base_router.post( - "/debug-session/{workflow_permanent_id}/new", - include_in_schema=False, -) -async def new_debug_session( - workflow_permanent_id: str, - current_org: Organization = Depends(org_auth_service.get_current_org), - current_user_id: str = Depends(org_auth_service.get_current_user_id), -) -> DebugSession: - """ - Create a new debug session, along with a new browser session. If any - existing debug sessions are found, "complete" them. Then close the browser - sessions associated with those completed debug sessions. - - Return the new debug session. - - CAVEAT: if an existing debug session for this user is <30s old, then we - return that instead. This is to curtail damage from browser session - spamming. - """ - - if current_user_id: - debug_session = await app.DATABASE.get_latest_debug_session_for_user( - organization_id=current_org.organization_id, - user_id=current_user_id, - workflow_permanent_id=workflow_permanent_id, - ) - - if debug_session: - now = datetime.now(timezone.utc) - created_at_utc = ( - debug_session.created_at.replace(tzinfo=timezone.utc) - if debug_session.created_at.tzinfo is None - else debug_session.created_at - ) - if now - created_at_utc < timedelta(seconds=30): - LOG.info( - "Existing debug session is less than 30s old, returning it", - debug_session_id=debug_session.debug_session_id, - browser_session_id=debug_session.browser_session_id, - organization_id=current_org.organization_id, - user_id=current_user_id, - workflow_permanent_id=workflow_permanent_id, - ) - return debug_session - - completed_debug_sessions = await app.DATABASE.complete_debug_sessions( - organization_id=current_org.organization_id, - user_id=current_user_id, - workflow_permanent_id=workflow_permanent_id, - ) - - LOG.info( - f"Completed {len(completed_debug_sessions)} pre-existing debug session(s)", - num_completed_debug_sessions=len(completed_debug_sessions), - organization_id=current_org.organization_id, - user_id=current_user_id, - workflow_permanent_id=workflow_permanent_id, - ) - - if completed_debug_sessions: - closeable_browser_sessions: list[PersistentBrowserSession] = [] - - for debug_session in completed_debug_sessions: - try: - browser_session = await app.DATABASE.get_persistent_browser_session( - debug_session.browser_session_id, - current_org.organization_id, - ) - except NotFoundError: - browser_session = None - - if browser_session and browser_session.completed_at is None: - closeable_browser_sessions.append(browser_session) - - LOG.info( - f"Closing browser {len(closeable_browser_sessions)} browser session(s)", - organization_id=current_org.organization_id, - user_id=current_user_id, - workflow_permanent_id=workflow_permanent_id, - ) - - def handle_close_browser_session_error( - browser_session_id: str, - organization_id: str, - task: asyncio.Task, - ) -> None: - if task.exception(): - LOG.error( - f"Failed to close session: {task.exception()}", - browser_session_id=browser_session_id, - organization_id=organization_id, - ) - - for browser_session in closeable_browser_sessions: - LOG.info( - "Closing existing browser session for debug session", - browser_session_id=browser_session.persistent_browser_session_id, - organization_id=current_org.organization_id, - ) - - # NOTE(jdo): these may fail to actually close on infra, but the user - # wants (and should get) a new session regardless - so we will just - # log the error and continue - task = asyncio.create_task( - app.PERSISTENT_SESSIONS_MANAGER.close_session( - current_org.organization_id, - browser_session.persistent_browser_session_id, - ) - ) - - task.add_done_callback( - partial( - handle_close_browser_session_error, - browser_session.persistent_browser_session_id, - current_org.organization_id, - ) - ) - - new_browser_session = await app.PERSISTENT_SESSIONS_MANAGER.create_session( - organization_id=current_org.organization_id, - timeout_minutes=settings.DEBUG_SESSION_TIMEOUT_MINUTES, - proxy_location=ProxyLocation.RESIDENTIAL, - ) - - debug_session = await app.DATABASE.create_debug_session( - browser_session_id=new_browser_session.persistent_browser_session_id, - organization_id=current_org.organization_id, - user_id=current_user_id, - workflow_permanent_id=workflow_permanent_id, - vnc_streaming_supported=True if new_browser_session.ip_address else False, - ) - - LOG.info( - "Created new debug session", - debug_session_id=debug_session.debug_session_id, - browser_session_id=new_browser_session.persistent_browser_session_id, - organization_id=current_org.organization_id, - user_id=current_user_id, - workflow_permanent_id=workflow_permanent_id, - ) - - return debug_session - - -@base_router.get( - "/debug-session/{workflow_permanent_id}/block-outputs", - response_model=dict[str, dict[str, Any]], - include_in_schema=False, -) -async def get_block_outputs_for_debug_session( - workflow_permanent_id: str, - version: int | None = None, - current_org: Organization = Depends(org_auth_service.get_current_org), - current_user_id: str = Depends(org_auth_service.get_current_user_id), -) -> dict[str, dict[str, Any]]: - return await app.WORKFLOW_SERVICE.get_block_outputs_for_debug_session( - workflow_permanent_id=workflow_permanent_id, - organization_id=current_org.organization_id, - user_id=current_user_id, - version=version, - ) diff --git a/skyvern/forge/sdk/routes/debug_sessions.py b/skyvern/forge/sdk/routes/debug_sessions.py new file mode 100644 index 00000000..bd6dc86d --- /dev/null +++ b/skyvern/forge/sdk/routes/debug_sessions.py @@ -0,0 +1,294 @@ +import asyncio +import typing as t +from datetime import datetime, timedelta, timezone +from functools import partial + +import structlog +from fastapi import Depends, HTTPException + +from skyvern.config import settings +from skyvern.exceptions import BrowserSessionNotRenewable +from skyvern.forge import app +from skyvern.forge.sdk.db.exceptions import NotFoundError +from skyvern.forge.sdk.routes.routers import base_router +from skyvern.forge.sdk.schemas.debug_sessions import DebugSession, DebugSessionRuns +from skyvern.forge.sdk.schemas.organizations import Organization +from skyvern.forge.sdk.schemas.persistent_browser_sessions import PersistentBrowserSession +from skyvern.forge.sdk.services import org_auth_service +from skyvern.schemas.runs import ProxyLocation + +LOG = structlog.get_logger() + + +@base_router.get( + "/debug-session/{workflow_permanent_id}", + include_in_schema=False, +) +async def get_or_create_debug_session_by_user_and_workflow_permanent_id( + workflow_permanent_id: str, + current_org: Organization = Depends(org_auth_service.get_current_org), + current_user_id: str = Depends(org_auth_service.get_current_user_id), +) -> DebugSession: + """ + `current_user_id` is a unique identifier for a user, but does not map to an + entity in the database (at time of writing) + + If the debug session does not exist, a new one will be created. + + In addition, the timeout for the debug session's browser session will be + extended to 4 hours from the time of the request. If the browser session + cannot be renewed, a new one will be created and assigned to the debug + session. The browser_session that could not be renewed will be closed. + """ + + debug_session = await app.DATABASE.get_debug_session( + organization_id=current_org.organization_id, + user_id=current_user_id, + workflow_permanent_id=workflow_permanent_id, + ) + + if not debug_session: + LOG.info( + "Existing debug session not found, created a new one, along with a new browser session", + organization_id=current_org.organization_id, + user_id=current_user_id, + workflow_permanent_id=workflow_permanent_id, + ) + + return await new_debug_session( + workflow_permanent_id, + current_org, + current_user_id, + ) + + LOG.info( + "Existing debug session found", + debug_session_id=debug_session.debug_session_id, + browser_session_id=debug_session.browser_session_id, + organization_id=current_org.organization_id, + user_id=current_user_id, + workflow_permanent_id=workflow_permanent_id, + ) + + try: + await app.PERSISTENT_SESSIONS_MANAGER.renew_or_close_session( + debug_session.browser_session_id, + current_org.organization_id, + ) + return debug_session + except BrowserSessionNotRenewable as ex: + LOG.info( + "Browser session was non-renewable; creating a new debug session", + ex=str(ex), + debug_session_id=debug_session.debug_session_id, + browser_session_id=debug_session.browser_session_id, + organization_id=current_org.organization_id, + workflow_permanent_id=workflow_permanent_id, + user_id=current_user_id, + ) + + return await new_debug_session( + workflow_permanent_id, + current_org, + current_user_id, + ) + + +@base_router.post( + "/debug-session/{workflow_permanent_id}/new", + include_in_schema=False, +) +async def new_debug_session( + workflow_permanent_id: str, + current_org: Organization = Depends(org_auth_service.get_current_org), + current_user_id: str = Depends(org_auth_service.get_current_user_id), +) -> DebugSession: + """ + Create a new debug session, along with a new browser session. If any + existing debug sessions are found, "complete" them. Then close the browser + sessions associated with those completed debug sessions. + + Return the new debug session. + + CAVEAT: if an existing debug session for this user is <30s old, then we + return that instead. This is to curtail damage from browser session + spamming. + """ + + if current_user_id: + debug_session = await app.DATABASE.get_latest_debug_session_for_user( + organization_id=current_org.organization_id, + user_id=current_user_id, + workflow_permanent_id=workflow_permanent_id, + ) + + if debug_session: + now = datetime.now(timezone.utc) + created_at_utc = ( + debug_session.created_at.replace(tzinfo=timezone.utc) + if debug_session.created_at.tzinfo is None + else debug_session.created_at + ) + if now - created_at_utc < timedelta(seconds=30): + LOG.info( + "Existing debug session is less than 30s old, returning it", + debug_session_id=debug_session.debug_session_id, + browser_session_id=debug_session.browser_session_id, + organization_id=current_org.organization_id, + user_id=current_user_id, + workflow_permanent_id=workflow_permanent_id, + ) + return debug_session + + completed_debug_sessions = await app.DATABASE.complete_debug_sessions( + organization_id=current_org.organization_id, + user_id=current_user_id, + workflow_permanent_id=workflow_permanent_id, + ) + + LOG.info( + f"Completed {len(completed_debug_sessions)} pre-existing debug session(s)", + num_completed_debug_sessions=len(completed_debug_sessions), + organization_id=current_org.organization_id, + user_id=current_user_id, + workflow_permanent_id=workflow_permanent_id, + ) + + if completed_debug_sessions: + closeable_browser_sessions: list[PersistentBrowserSession] = [] + + for debug_session in completed_debug_sessions: + try: + browser_session = await app.DATABASE.get_persistent_browser_session( + debug_session.browser_session_id, + current_org.organization_id, + ) + except NotFoundError: + browser_session = None + + if browser_session and browser_session.completed_at is None: + closeable_browser_sessions.append(browser_session) + + LOG.info( + f"Closing browser {len(closeable_browser_sessions)} browser session(s)", + organization_id=current_org.organization_id, + user_id=current_user_id, + workflow_permanent_id=workflow_permanent_id, + ) + + def handle_close_browser_session_error( + browser_session_id: str, + organization_id: str, + task: asyncio.Task, + ) -> None: + if task.exception(): + LOG.error( + f"Failed to close session: {task.exception()}", + browser_session_id=browser_session_id, + organization_id=organization_id, + ) + + for browser_session in closeable_browser_sessions: + LOG.info( + "Closing existing browser session for debug session", + browser_session_id=browser_session.persistent_browser_session_id, + organization_id=current_org.organization_id, + ) + + # NOTE(jdo): these may fail to actually close on infra, but the user + # wants (and should get) a new session regardless - so we will just + # log the error and continue + task = asyncio.create_task( + app.PERSISTENT_SESSIONS_MANAGER.close_session( + current_org.organization_id, + browser_session.persistent_browser_session_id, + ) + ) + + task.add_done_callback( + partial( + handle_close_browser_session_error, + browser_session.persistent_browser_session_id, + current_org.organization_id, + ) + ) + + new_browser_session = await app.PERSISTENT_SESSIONS_MANAGER.create_session( + organization_id=current_org.organization_id, + timeout_minutes=settings.DEBUG_SESSION_TIMEOUT_MINUTES, + proxy_location=ProxyLocation.RESIDENTIAL, + ) + + debug_session = await app.DATABASE.create_debug_session( + browser_session_id=new_browser_session.persistent_browser_session_id, + organization_id=current_org.organization_id, + user_id=current_user_id, + workflow_permanent_id=workflow_permanent_id, + vnc_streaming_supported=True if new_browser_session.ip_address else False, + ) + + LOG.info( + "Created new debug session", + debug_session_id=debug_session.debug_session_id, + browser_session_id=new_browser_session.persistent_browser_session_id, + organization_id=current_org.organization_id, + user_id=current_user_id, + workflow_permanent_id=workflow_permanent_id, + ) + + return debug_session + + +@base_router.get( + "/debug-session/{workflow_permanent_id}/block-outputs", + response_model=dict[str, dict[str, t.Any]], + include_in_schema=False, +) +async def get_block_outputs_for_debug_session( + workflow_permanent_id: str, + version: int | None = None, + current_org: Organization = Depends(org_auth_service.get_current_org), + current_user_id: str = Depends(org_auth_service.get_current_user_id), +) -> dict[str, dict[str, t.Any]]: + return await app.WORKFLOW_SERVICE.get_block_outputs_for_debug_session( + workflow_permanent_id=workflow_permanent_id, + organization_id=current_org.organization_id, + user_id=current_user_id, + version=version, + ) + + +@base_router.get( + "/debug-session/{debug_session_id}/runs", + include_in_schema=False, +) +@base_router.get( + "/debug-session/{debug_session_id}/runs/", + include_in_schema=False, +) +async def get_debug_session_runs( + current_org: Organization = Depends(org_auth_service.get_current_org), + debug_session_id: str = "", +) -> DebugSessionRuns: + """Get all debug session runs for the debug_session_id""" + + LOG.critical( + "Fetching runs for debugger", + debug_session_id=debug_session_id, + organization_id=current_org.organization_id, + ) + + debug_session = await app.DATABASE.get_debug_session_by_id( + debug_session_id=debug_session_id, + organization_id=current_org.organization_id, + ) + + if not debug_session: + raise HTTPException(status_code=404, detail="Debug session not found") + + runs = await app.DATABASE.get_workflow_runs_by_debug_session_id( + debug_session_id=debug_session.debug_session_id, + organization_id=current_org.organization_id, + ) + + return DebugSessionRuns(debug_session=debug_session, runs=runs) diff --git a/skyvern/forge/sdk/schemas/debug_sessions.py b/skyvern/forge/sdk/schemas/debug_sessions.py index 43618ee8..a18a72b6 100644 --- a/skyvern/forge/sdk/schemas/debug_sessions.py +++ b/skyvern/forge/sdk/schemas/debug_sessions.py @@ -26,3 +26,33 @@ class DebugSession(BaseModel): modified_at: datetime deleted_at: datetime | None = None status: DebugSessionStatus + + +class DebugSessionRun(BaseModel): + model_config = ConfigDict(from_attributes=True) + + ai_fallback: bool | None = None + block_label: str + browser_session_id: str + code_gen: bool | None = None + debug_session_id: str + failure_reason: str | None = None + output_parameter_id: str + run_with: str | None = None + script_run_id: str | None = None + status: str + workflow_id: str + workflow_permanent_id: str + workflow_run_id: str + # -- + created_at: datetime + queued_at: datetime | None = None + started_at: datetime | None = None + finished_at: datetime | None = None + + +class DebugSessionRuns(BaseModel): + model_config = ConfigDict(from_attributes=True) + + debug_session: DebugSession + runs: list[DebugSessionRun]