Debugger Continuity (BE) (#3314)

This commit is contained in:
Jonathan Dobson
2025-08-28 20:05:24 -04:00
committed by GitHub
parent 916ab6c067
commit 1067e9a076
12 changed files with 306 additions and 2 deletions

View File

@@ -754,3 +754,8 @@ class NoTOTPSecretFound(SkyvernException):
class NoElementFound(SkyvernException):
def __init__(self) -> None:
super().__init__("No element found.")
class OutputParameterNotFound(SkyvernException):
    """Raised when a workflow has no output parameter registered for a block label."""

    def __init__(self, block_label: str, workflow_permanent_id: str) -> None:
        message = f"Output parameter for {block_label} not found in workflow {workflow_permanent_id}"
        super().__init__(message)

View File

@@ -20,6 +20,7 @@ from skyvern.forge.sdk.db.models import (
BitwardenCreditCardDataParameterModel,
BitwardenLoginCredentialParameterModel,
BitwardenSensitiveInformationParameterModel,
BlockRunModel,
CredentialModel,
CredentialParameterModel,
DebugSessionModel,
@@ -75,7 +76,7 @@ from skyvern.forge.sdk.log_artifacts import save_workflow_run_logs
from skyvern.forge.sdk.models import Step, StepStatus
from skyvern.forge.sdk.schemas.ai_suggestions import AISuggestion
from skyvern.forge.sdk.schemas.credentials import Credential, CredentialType
from skyvern.forge.sdk.schemas.debug_sessions import DebugSession
from skyvern.forge.sdk.schemas.debug_sessions import BlockRun, DebugSession
from skyvern.forge.sdk.schemas.organization_bitwarden_collections import OrganizationBitwardenCollection
from skyvern.forge.sdk.schemas.organizations import Organization, OrganizationAuthToken
from skyvern.forge.sdk.schemas.persistent_browser_sessions import PersistentBrowserSession
@@ -2124,6 +2125,29 @@ class AgentDB:
LOG.error("SQLAlchemyError", exc_info=True)
raise
async def get_workflow_run_output_parameter_by_id(
    self, workflow_run_id: str, output_parameter_id: str
) -> WorkflowRunOutputParameter | None:
    """Fetch the earliest-created output-parameter row matching both ids.

    Returns None when no matching row exists. SQLAlchemy errors are logged
    and re-raised unchanged.
    """
    query = (
        select(WorkflowRunOutputParameterModel)
        .filter_by(workflow_run_id=workflow_run_id)
        .filter_by(output_parameter_id=output_parameter_id)
        .order_by(WorkflowRunOutputParameterModel.created_at)
    )
    try:
        async with self.Session() as session:
            row = (await session.scalars(query)).first()
            return convert_to_workflow_run_output_parameter(row, self.debug_enabled) if row else None
    except SQLAlchemyError:
        LOG.error("SQLAlchemyError", exc_info=True)
        raise
async def create_or_update_workflow_run_output_parameter(
self,
workflow_run_id: str,
@@ -3520,6 +3544,72 @@ class AgentDB:
return DebugSession.model_validate(debug_session)
async def get_latest_block_run(
    self,
    *,
    organization_id: str,
    user_id: str,
    block_label: str,
) -> BlockRun | None:
    """Return the most recently created block run for (org, user, label), or None."""
    stmt = (
        select(BlockRunModel)
        .filter_by(
            organization_id=organization_id,
            user_id=user_id,
            block_label=block_label,
        )
        .order_by(BlockRunModel.created_at.desc())
    )
    async with self.Session() as session:
        latest = (await session.scalars(stmt)).first()
    if latest is None:
        return None
    return BlockRun.model_validate(latest)
async def get_latest_completed_block_run(
    self,
    *,
    organization_id: str,
    user_id: str,
    block_label: str,
    workflow_permanent_id: str,
) -> BlockRun | None:
    """Return the newest block run for (org, user, label) whose owning workflow
    run completed and belongs to the given workflow permanent id, or None."""
    stmt = (
        select(BlockRunModel)
        .join(WorkflowRunModel, BlockRunModel.workflow_run_id == WorkflowRunModel.workflow_run_id)
        .filter(
            BlockRunModel.organization_id == organization_id,
            BlockRunModel.user_id == user_id,
            BlockRunModel.block_label == block_label,
            WorkflowRunModel.status == WorkflowRunStatus.completed,
            WorkflowRunModel.workflow_permanent_id == workflow_permanent_id,
        )
        .order_by(BlockRunModel.created_at.desc())
    )
    async with self.Session() as session:
        latest = (await session.scalars(stmt)).first()
    return None if latest is None else BlockRun.model_validate(latest)
async def create_block_run(
    self,
    *,
    organization_id: str,
    user_id: str,
    block_label: str,
    output_parameter_id: str,
    workflow_run_id: str,
) -> None:
    """Insert a block_runs row tying a block label to a workflow run and its
    output parameter id, then commit."""
    record = BlockRunModel(
        organization_id=organization_id,
        user_id=user_id,
        block_label=block_label,
        output_parameter_id=output_parameter_id,
        workflow_run_id=workflow_run_id,
    )
    async with self.Session() as session:
        session.add(record)
        await session.commit()
async def get_latest_debug_session_for_user(
self,
*,

View File

@@ -785,6 +785,27 @@ class DebugSessionModel(Base):
status = Column(String, nullable=False, default="created")
class BlockRunModel(Base):
    """
    When a block is run in the debugger, it runs "as a 'workflow run'", but that
    workflow run has just a single block in it. This table ties a block run to
    the workflow run, and a particular output parameter id (which gets
    overwritten on each run.)

    Use the `created_at` timestamp to find the latest workflow run (and output
    param id) for a given `(org_id, user_id, block_label)`.
    """

    __tablename__ = "block_runs"

    # Scope of the run: which organization and which user triggered it.
    organization_id = Column(String, nullable=False)
    user_id = Column(String, nullable=False)
    # Label of the single block executed in this debugger run.
    block_label = Column(String, nullable=False)
    # Output parameter captured for the run; the row is keyed by its workflow run.
    output_parameter_id = Column(String, nullable=False)
    workflow_run_id = Column(String, primary_key=True)
    # NOTE(review): naive timestamp via datetime.utcnow — presumably the DB stores
    # naive UTC like the other models; confirm before comparing across timezones.
    created_at = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
class ScriptModel(Base):
__tablename__ = "scripts"
__table_args__ = (

View File

@@ -47,6 +47,7 @@ class AsyncExecutor(abc.ABC):
api_key: str | None,
browser_session_id: str | None,
block_labels: list[str] | None,
block_outputs: dict[str, Any] | None,
**kwargs: dict,
) -> None:
pass
@@ -151,6 +152,7 @@ class BackgroundTaskExecutor(AsyncExecutor):
api_key: str | None,
browser_session_id: str | None,
block_labels: list[str] | None,
block_outputs: dict[str, Any] | None,
**kwargs: dict,
) -> None:
if background_tasks:
@@ -170,6 +172,7 @@ class BackgroundTaskExecutor(AsyncExecutor):
organization=organization,
browser_session_id=browser_session_id,
block_labels=block_labels,
block_outputs=block_outputs,
)
else:
LOG.warning("Background tasks not enabled, skipping workflow execution")

View File

@@ -920,6 +920,7 @@ async def run_block(
background_tasks: BackgroundTasks,
block_run_request: BlockRunRequest,
organization: Organization = Depends(org_auth_service.get_current_org),
user_id: str = Depends(org_auth_service.get_current_user_id),
template: bool = Query(False),
x_api_key: Annotated[str | None, Header()] = None,
) -> BlockRunResponse:
@@ -928,6 +929,12 @@ async def run_block(
workflow_run_id.
"""
# NOTE(jdo): if you're running debugger locally, and you want to see the
# block runs happening (no temporal; no pbs), then uncomment these two
# lines; that'll make the block run happen in a new local browser instance.
# LOG.critical("REMOVING BROWSER SESSION ID")
# block_run_request.browser_session_id = None
workflow_run = await block_service.ensure_workflow_run(
organization=organization,
template=template,
@@ -946,7 +953,9 @@ async def run_block(
workflow_run_id=workflow_run.workflow_run_id,
workflow_permanent_id=workflow_run.workflow_permanent_id,
organization=organization,
user_id=user_id,
browser_session_id=browser_session_id,
block_outputs=block_run_request.block_outputs,
)
return BlockRunResponse(
@@ -2290,3 +2299,22 @@ async def new_debug_session(
)
return debug_session
@base_router.get(
    "/debug-session/{workflow_permanent_id}/block-outputs",
    response_model=dict[str, dict[str, Any]],
    include_in_schema=False,
)
async def get_block_outputs_for_debug_session(
    workflow_permanent_id: str,
    version: int | None = None,
    current_org: Organization = Depends(org_auth_service.get_current_org),
    current_user_id: str = Depends(org_auth_service.get_current_user_id),
) -> dict[str, dict[str, Any]]:
    """Return block outputs for the debug session, keyed by block label,
    scoped to the authenticated org and user."""
    outputs = await app.WORKFLOW_SERVICE.get_block_outputs_for_debug_session(
        workflow_permanent_id=workflow_permanent_id,
        organization_id=current_org.organization_id,
        user_id=current_user_id,
        version=version,
    )
    return outputs

View File

@@ -6,6 +6,15 @@ from pydantic import BaseModel, ConfigDict
DebugSessionStatus = t.Literal["created", "completed"]
class BlockRun(BaseModel):
    """Read model for a block_runs row; built from ORM attributes
    (from_attributes) via model_validate."""

    model_config = ConfigDict(from_attributes=True)

    # Label of the block that was run.
    block_label: str
    # Output parameter id captured for this run.
    output_parameter_id: str
    # The single-block workflow run this block run executed as.
    workflow_run_id: str
    # Creation timestamp; used to pick the latest run per (org, user, label).
    created_at: datetime
class DebugSession(BaseModel):
model_config = ConfigDict(from_attributes=True)

View File

@@ -64,6 +64,7 @@ class WorkflowRunContext:
| BitwardenSensitiveInformationParameter
| CredentialParameter
],
block_outputs: dict[str, Any] | None = None,
) -> Self:
# key is label name
workflow_run_context = cls(aws_client=aws_client, azure_client=azure_client)
@@ -88,6 +89,10 @@ class WorkflowRunContext:
raise OutputParameterKeyCollisionError(output_parameter.key)
workflow_run_context.parameters[output_parameter.key] = output_parameter
if block_outputs:
for label, value in block_outputs.items():
workflow_run_context.values[f"{label}_output"] = value
for secrete_parameter in secret_parameters:
if isinstance(secrete_parameter, AWSSecretParameter):
await workflow_run_context.register_aws_secret_parameter_value(secrete_parameter)
@@ -884,6 +889,7 @@ class WorkflowContextManager:
| BitwardenCreditCardDataParameter
| BitwardenSensitiveInformationParameter
],
block_outputs: dict[str, Any] | None = None,
) -> WorkflowRunContext:
workflow_run_context = await WorkflowRunContext.init(
self.aws_client,
@@ -893,6 +899,7 @@ class WorkflowContextManager:
workflow_output_parameters,
context_parameters,
secret_parameters,
block_outputs,
)
self.workflow_run_contexts[workflow_run_id] = workflow_run_context
return workflow_run_context

View File

@@ -9,6 +9,7 @@ import structlog
from jinja2.sandbox import SandboxedEnvironment
from skyvern import analytics
from skyvern.client.types.output_parameter import OutputParameter as BlockOutputParameter
from skyvern.config import settings
from skyvern.constants import GET_DOWNLOADED_FILES_TIMEOUT, SAVE_DOWNLOADED_FILES_TIMEOUT
from skyvern.core.script_generations.generate_script import generate_workflow_script as generate_python_workflow_script
@@ -263,6 +264,7 @@ class WorkflowService:
api_key: str,
organization: Organization,
block_labels: list[str] | None = None,
block_outputs: dict[str, Any] | None = None,
browser_session_id: str | None = None,
) -> WorkflowRun:
"""Execute a workflow."""
@@ -273,6 +275,7 @@ class WorkflowService:
organization_id=organization_id,
browser_session_id=browser_session_id,
block_labels=block_labels,
block_outputs=block_outputs,
)
workflow_run = await self.get_workflow_run(workflow_run_id=workflow_run_id, organization_id=organization_id)
workflow = await self.get_workflow_by_permanent_id(workflow_permanent_id=workflow_run.workflow_permanent_id)
@@ -316,6 +319,7 @@ class WorkflowService:
workflow_output_parameters,
context_parameters,
secret_parameters,
block_outputs,
)
except Exception as e:
LOG.exception(
@@ -375,6 +379,7 @@ class WorkflowService:
workflow_run_id=workflow_run.workflow_run_id,
block_cnt=len(blocks),
block_labels=block_labels,
block_outputs=block_outputs,
)
else:
@@ -714,6 +719,56 @@ class WorkflowService:
return workflow
async def get_block_outputs_for_debug_session(
    self,
    workflow_permanent_id: str,
    user_id: str,
    organization_id: str,
    exclude_deleted: bool = True,
    version: int | None = None,
) -> dict[str, dict[str, Any]]:
    """For each block in the workflow, return the output of its latest
    completed debugger run, keyed by block label.

    Blocks with no completed run, no stored output parameter, or a non-dict
    output value are silently omitted. Each returned dict carries an extra
    ``created_at`` key stamped from the output parameter's creation time.

    Raises:
        WorkflowNotFound: if no workflow matches the permanent id / version.
    """
    workflow = await app.DATABASE.get_workflow_by_permanent_id(
        workflow_permanent_id,
        organization_id=organization_id,
        version=version,
        exclude_deleted=exclude_deleted,
    )
    if not workflow:
        raise WorkflowNotFound(workflow_permanent_id=workflow_permanent_id, version=version)

    # Fix: the map holds plain dicts (matching the declared return type), not
    # BlockOutputParameter instances as previously annotated.
    labels_to_outputs: dict[str, dict[str, Any]] = {}
    for block in workflow.workflow_definition.blocks:
        label = block.label
        block_run = await app.DATABASE.get_latest_completed_block_run(
            organization_id=organization_id,
            user_id=user_id,
            block_label=label,
            workflow_permanent_id=workflow_permanent_id,
        )
        if not block_run:
            continue
        output_parameter = await app.DATABASE.get_workflow_run_output_parameter_by_id(
            workflow_run_id=block_run.workflow_run_id,
            output_parameter_id=block_run.output_parameter_id,
        )
        if not output_parameter:
            continue
        value = output_parameter.value
        if not isinstance(value, dict):
            continue
        # Copy before annotating so we don't mutate the stored parameter value
        # in place when adding the created_at stamp.
        enriched = dict(value)
        enriched["created_at"] = output_parameter.created_at
        labels_to_outputs[label] = enriched
    return labels_to_outputs
async def get_workflows_by_permanent_ids(
self,
workflow_permanent_ids: list[str],
@@ -1773,7 +1828,7 @@ class WorkflowService:
raise WorkflowDefinitionHasDuplicateParameterKeys(duplicate_keys=duplicate_parameter_keys)
# Create blocks from the request
block_label_mapping = {}
blocks = []
blocks: list[BlockTypeVar] = []
for block_yaml in request.workflow_definition.blocks:
block = await self.block_yaml_to_block(workflow, block_yaml, parameters)
blocks.append(block)

View File

@@ -372,6 +372,12 @@ class BlockRunRequest(WorkflowRunRequest):
description="Labels of the blocks to execute",
examples=["block_1", "block_2"],
)
block_outputs: dict[str, Any] | None = Field(
default=None,
# NOTE(jdo): this is either the last output of the block for a given
# org_id/user_id, or an override supplied by the user
description="Any active outputs of blocks in a workflow being debugged",
)
class BaseRunResponse(BaseModel):

View File

@@ -1,9 +1,14 @@
import typing as t
import structlog
from fastapi import BackgroundTasks, Request
from skyvern.exceptions import OutputParameterNotFound, WorkflowNotFound
from skyvern.forge import app
from skyvern.forge.sdk.core import skyvern_context
from skyvern.forge.sdk.executor.factory import AsyncExecutorFactory
from skyvern.forge.sdk.schemas.organizations import Organization
from skyvern.forge.sdk.workflow.models.parameter import OutputParameter
from skyvern.forge.sdk.workflow.models.workflow import WorkflowRequestBody, WorkflowRun
from skyvern.schemas.runs import WorkflowRunRequest
from skyvern.services import workflow_service
@@ -53,17 +58,47 @@ async def execute_blocks(
workflow_run_id: str,
workflow_permanent_id: str,
organization: Organization,
user_id: str,
browser_session_id: str | None = None,
block_outputs: dict[str, t.Any] | None = None,
) -> None:
"""
Runs one or more blocks of a workflow.
"""
workflow = await app.DATABASE.get_workflow_by_permanent_id(
workflow_permanent_id=workflow_id,
organization_id=organization.organization_id,
)
if not workflow:
raise WorkflowNotFound(workflow_permanent_id=workflow_id)
block_output_parameters: dict[str, OutputParameter] = {}
for block_label in block_labels:
output_parameter = workflow.get_output_parameter(block_label)
if not output_parameter:
raise OutputParameterNotFound(block_label=block_label, workflow_permanent_id=workflow_id)
block_output_parameters[block_label] = output_parameter
for block_label, output_parameter in block_output_parameters.items():
await app.DATABASE.create_block_run(
organization_id=organization.organization_id,
user_id=user_id,
block_label=block_label,
output_parameter_id=output_parameter.output_parameter_id,
workflow_run_id=workflow_run_id,
)
LOG.info(
"Executing block(s)",
organization_id=organization.organization_id,
workflow_run_id=workflow_run_id,
block_labels=block_labels,
block_outputs=block_outputs,
)
await AsyncExecutorFactory.get_executor().execute_workflow(
@@ -77,4 +112,5 @@ async def execute_blocks(
browser_session_id=browser_session_id,
api_key=api_key,
block_labels=block_labels,
block_outputs=block_outputs,
)

View File

@@ -1,3 +1,5 @@
import typing as t
import structlog
from fastapi import BackgroundTasks, Request
@@ -69,6 +71,7 @@ async def run_workflow(
request: Request | None = None,
background_tasks: BackgroundTasks | None = None,
block_labels: list[str] | None = None,
block_outputs: dict[str, t.Any] | None = None,
) -> WorkflowRun:
workflow_run = await prepare_workflow(
workflow_id=workflow_id,
@@ -91,6 +94,7 @@ async def run_workflow(
browser_session_id=workflow_request.browser_session_id,
api_key=api_key,
block_labels=block_labels,
block_outputs=block_outputs,
)
return workflow_run