diff --git a/alembic/versions/2025_08_28_2356-f54bab75b1c4_add_block_runs_table.py b/alembic/versions/2025_08_28_2356-f54bab75b1c4_add_block_runs_table.py new file mode 100644 index 00000000..dd8fb94a --- /dev/null +++ b/alembic/versions/2025_08_28_2356-f54bab75b1c4_add_block_runs_table.py @@ -0,0 +1,40 @@ +"""add block_runs table + +Revision ID: f54bab75b1c4 +Revises: d3ec63728c2a +Create Date: 2025-08-28 23:56:31.973207+00:00 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "f54bab75b1c4" +down_revision: Union[str, None] = "d3ec63728c2a" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.create_table( + "block_runs", + sa.Column("organization_id", sa.String(), nullable=False), + sa.Column("user_id", sa.String(), nullable=False), + sa.Column("block_label", sa.String(), nullable=False), + sa.Column("output_parameter_id", sa.String(), nullable=False), + sa.Column("workflow_run_id", sa.String(), nullable=False), + sa.Column("created_at", sa.DateTime(), nullable=False), + sa.PrimaryKeyConstraint("workflow_run_id"), + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_table("block_runs") + # ### end Alembic commands ### diff --git a/skyvern/exceptions.py b/skyvern/exceptions.py index 9e17b076..b567375a 100644 --- a/skyvern/exceptions.py +++ b/skyvern/exceptions.py @@ -754,3 +754,8 @@ class NoTOTPSecretFound(SkyvernException): class NoElementFound(SkyvernException): def __init__(self) -> None: super().__init__("No element found.") + + +class OutputParameterNotFound(SkyvernException): + def __init__(self, block_label: str, workflow_permanent_id: str) -> None: + super().__init__(f"Output parameter for {block_label} not found in workflow {workflow_permanent_id}") diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 2e845e3f..9e17cdcb 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -20,6 +20,7 @@ from skyvern.forge.sdk.db.models import ( BitwardenCreditCardDataParameterModel, BitwardenLoginCredentialParameterModel, BitwardenSensitiveInformationParameterModel, + BlockRunModel, CredentialModel, CredentialParameterModel, DebugSessionModel, @@ -75,7 +76,7 @@ from skyvern.forge.sdk.log_artifacts import save_workflow_run_logs from skyvern.forge.sdk.models import Step, StepStatus from skyvern.forge.sdk.schemas.ai_suggestions import AISuggestion from skyvern.forge.sdk.schemas.credentials import Credential, CredentialType -from skyvern.forge.sdk.schemas.debug_sessions import DebugSession +from skyvern.forge.sdk.schemas.debug_sessions import BlockRun, DebugSession from skyvern.forge.sdk.schemas.organization_bitwarden_collections import OrganizationBitwardenCollection from skyvern.forge.sdk.schemas.organizations import Organization, OrganizationAuthToken from skyvern.forge.sdk.schemas.persistent_browser_sessions import PersistentBrowserSession @@ -2124,6 +2125,29 @@ class AgentDB: LOG.error("SQLAlchemyError", exc_info=True) raise + async def get_workflow_run_output_parameter_by_id( + self, workflow_run_id: str, output_parameter_id: str + ) -> WorkflowRunOutputParameter | None: + try: + async with self.Session() as session: + parameter = ( + await session.scalars( + select(WorkflowRunOutputParameterModel) + .filter_by(workflow_run_id=workflow_run_id) + .filter_by(output_parameter_id=output_parameter_id) + .order_by(WorkflowRunOutputParameterModel.created_at) + ) + ).first() + + if parameter: + return convert_to_workflow_run_output_parameter(parameter, self.debug_enabled) + + return None + + except SQLAlchemyError: + LOG.error("SQLAlchemyError", exc_info=True) + raise + async def create_or_update_workflow_run_output_parameter( self, workflow_run_id: str, @@ -3520,6 +3544,72 @@ class AgentDB: return DebugSession.model_validate(debug_session) + async def get_latest_block_run( + self, + *, + organization_id: str, + user_id: str, + block_label: str, + ) -> BlockRun | None: + async with self.Session() as session: + query = ( + select(BlockRunModel) + .filter_by(organization_id=organization_id) + .filter_by(user_id=user_id) + .filter_by(block_label=block_label) + .order_by(BlockRunModel.created_at.desc()) + ) + + model = (await session.scalars(query)).first() + + return BlockRun.model_validate(model) if model else None + + async def get_latest_completed_block_run( + self, + *, + organization_id: str, + user_id: str, + block_label: str, + workflow_permanent_id: str, + ) -> BlockRun | None: + async with self.Session() as session: + query = ( + select(BlockRunModel) + .join(WorkflowRunModel, BlockRunModel.workflow_run_id == WorkflowRunModel.workflow_run_id) + .filter(BlockRunModel.organization_id == organization_id) + .filter(BlockRunModel.user_id == user_id) + .filter(BlockRunModel.block_label == block_label) + .filter(WorkflowRunModel.status == WorkflowRunStatus.completed) + .filter(WorkflowRunModel.workflow_permanent_id == workflow_permanent_id) + .order_by(BlockRunModel.created_at.desc()) + ) + + model = (await session.scalars(query)).first() + + return BlockRun.model_validate(model) if model else None + + async def create_block_run( + self, + *, + organization_id: str, + user_id: str, + block_label: str, + output_parameter_id: str, + workflow_run_id: str, + ) -> None: + async with self.Session() as session: + block_run = BlockRunModel( + organization_id=organization_id, + user_id=user_id, + block_label=block_label, + output_parameter_id=output_parameter_id, + workflow_run_id=workflow_run_id, + ) + + session.add(block_run) + + await session.commit() + async def get_latest_debug_session_for_user( self, *, diff --git a/skyvern/forge/sdk/db/models.py b/skyvern/forge/sdk/db/models.py index 1fa1c1aa..a7e273fd 100644 --- a/skyvern/forge/sdk/db/models.py +++ b/skyvern/forge/sdk/db/models.py @@ -785,6 +785,27 @@ class DebugSessionModel(Base): status = Column(String, nullable=False, default="created") +class BlockRunModel(Base): + """ + When a block is run in the debugger, it runs "as a 'workflow run'", but that + workflow run has just a single block in it. This table ties a block run to + the workflow run, and a particular output parameter id (which gets + overwritten on each run.) + + Use the `created_at` timestamp to find the latest workflow run (and output + param id) for a given `(org_id, user_id, block_label)`. + """ + + __tablename__ = "block_runs" + + organization_id = Column(String, nullable=False) + user_id = Column(String, nullable=False) + block_label = Column(String, nullable=False) + output_parameter_id = Column(String, nullable=False) + workflow_run_id = Column(String, primary_key=True) + created_at = Column(DateTime, default=datetime.datetime.utcnow, nullable=False) + + class ScriptModel(Base): __tablename__ = "scripts" __table_args__ = ( diff --git a/skyvern/forge/sdk/executor/async_executor.py b/skyvern/forge/sdk/executor/async_executor.py index b1170cbd..4c66aab1 100644 --- a/skyvern/forge/sdk/executor/async_executor.py +++ b/skyvern/forge/sdk/executor/async_executor.py @@ -47,6 +47,7 @@ class AsyncExecutor(abc.ABC): api_key: str | None, browser_session_id: str | None, block_labels: list[str] | None, + block_outputs: dict[str, Any] | None, **kwargs: dict, ) -> None: pass @@ -151,6 +152,7 @@ class BackgroundTaskExecutor(AsyncExecutor): api_key: str | None, browser_session_id: str | None, block_labels: list[str] | None, + block_outputs: dict[str, Any] | None, **kwargs: dict, ) -> None: if background_tasks: @@ -170,6 +172,7 @@ class BackgroundTaskExecutor(AsyncExecutor): organization=organization, browser_session_id=browser_session_id, block_labels=block_labels, + block_outputs=block_outputs, ) else: LOG.warning("Background tasks not enabled, skipping workflow execution") diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index c68ada94..8e297dee 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -920,6 +920,7 @@ async def run_block( background_tasks: BackgroundTasks, block_run_request: BlockRunRequest, organization: Organization = Depends(org_auth_service.get_current_org), + user_id: str = Depends(org_auth_service.get_current_user_id), template: bool = Query(False), x_api_key: Annotated[str | None, Header()] = None, ) -> BlockRunResponse: @@ -928,6 +929,12 @@ async def run_block( workflow_run_id. """ + # NOTE(jdo): if you're running debugger locally, and you want to see the + # block runs happening (no temporal; no pbs), then uncomment these two + # lines; that'll make the block run happen in a new local browser instance. + # LOG.critical("REMOVING BROWSER SESSION ID") + # block_run_request.browser_session_id = None + workflow_run = await block_service.ensure_workflow_run( organization=organization, template=template, @@ -946,7 +953,9 @@ async def run_block( workflow_run_id=workflow_run.workflow_run_id, workflow_permanent_id=workflow_run.workflow_permanent_id, organization=organization, + user_id=user_id, browser_session_id=browser_session_id, + block_outputs=block_run_request.block_outputs, ) return BlockRunResponse( @@ -2290,3 +2299,22 @@ async def new_debug_session( ) return debug_session + + +@base_router.get( + "/debug-session/{workflow_permanent_id}/block-outputs", + response_model=dict[str, dict[str, Any]], + include_in_schema=False, +) +async def get_block_outputs_for_debug_session( + workflow_permanent_id: str, + version: int | None = None, + current_org: Organization = Depends(org_auth_service.get_current_org), + current_user_id: str = Depends(org_auth_service.get_current_user_id), +) -> dict[str, dict[str, Any]]: + return await app.WORKFLOW_SERVICE.get_block_outputs_for_debug_session( + workflow_permanent_id=workflow_permanent_id, + organization_id=current_org.organization_id, + user_id=current_user_id, + version=version, + ) diff --git a/skyvern/forge/sdk/schemas/debug_sessions.py b/skyvern/forge/sdk/schemas/debug_sessions.py index 48aa658d..bf8cbf90 100644 --- a/skyvern/forge/sdk/schemas/debug_sessions.py +++ b/skyvern/forge/sdk/schemas/debug_sessions.py @@ -6,6 +6,15 @@ from pydantic import BaseModel, ConfigDict DebugSessionStatus = t.Literal["created", "completed"] +class BlockRun(BaseModel): + model_config = ConfigDict(from_attributes=True) + + block_label: str + output_parameter_id: str + workflow_run_id: str + created_at: datetime + + class DebugSession(BaseModel): model_config = ConfigDict(from_attributes=True) diff --git a/skyvern/forge/sdk/workflow/context_manager.py b/skyvern/forge/sdk/workflow/context_manager.py index f4988731..eadec533 100644 --- a/skyvern/forge/sdk/workflow/context_manager.py +++ b/skyvern/forge/sdk/workflow/context_manager.py @@ -64,6 +64,7 @@ class WorkflowRunContext: | BitwardenSensitiveInformationParameter | CredentialParameter ], + block_outputs: dict[str, Any] | None = None, ) -> Self: # key is label name workflow_run_context = cls(aws_client=aws_client, azure_client=azure_client) @@ -88,6 +89,10 @@ class WorkflowRunContext: raise OutputParameterKeyCollisionError(output_parameter.key) workflow_run_context.parameters[output_parameter.key] = output_parameter + if block_outputs: + for label, value in block_outputs.items(): + workflow_run_context.values[f"{label}_output"] = value + for secrete_parameter in secret_parameters: if isinstance(secrete_parameter, AWSSecretParameter): await workflow_run_context.register_aws_secret_parameter_value(secrete_parameter) @@ -884,6 +889,7 @@ class WorkflowContextManager: | BitwardenCreditCardDataParameter | BitwardenSensitiveInformationParameter ], + block_outputs: dict[str, Any] | None = None, ) -> WorkflowRunContext: workflow_run_context = await WorkflowRunContext.init( self.aws_client, @@ -893,6 +899,7 @@ class WorkflowContextManager: workflow_output_parameters, context_parameters, secret_parameters, + block_outputs, ) self.workflow_run_contexts[workflow_run_id] = workflow_run_context return workflow_run_context diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 9b2426ed..fe99dcfb 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -9,6 +9,7 @@ import structlog from jinja2.sandbox import SandboxedEnvironment from skyvern import analytics +from skyvern.client.types.output_parameter import OutputParameter as BlockOutputParameter from skyvern.config import settings from skyvern.constants import GET_DOWNLOADED_FILES_TIMEOUT, SAVE_DOWNLOADED_FILES_TIMEOUT from skyvern.core.script_generations.generate_script import generate_workflow_script as generate_python_workflow_script @@ -263,6 +264,7 @@ class WorkflowService: api_key: str, organization: Organization, block_labels: list[str] | None = None, + block_outputs: dict[str, Any] | None = None, browser_session_id: str | None = None, ) -> WorkflowRun: """Execute a workflow.""" @@ -273,6 +275,7 @@ class WorkflowService: organization_id=organization_id, browser_session_id=browser_session_id, block_labels=block_labels, + block_outputs=block_outputs, ) workflow_run = await self.get_workflow_run(workflow_run_id=workflow_run_id, organization_id=organization_id) workflow = await self.get_workflow_by_permanent_id(workflow_permanent_id=workflow_run.workflow_permanent_id) @@ -316,6 +319,7 @@ class WorkflowService: workflow_output_parameters, context_parameters, secret_parameters, + block_outputs, ) except Exception as e: LOG.exception( @@ -375,6 +379,7 @@ class WorkflowService: workflow_run_id=workflow_run.workflow_run_id, block_cnt=len(blocks), block_labels=block_labels, + block_outputs=block_outputs, ) else: @@ -714,6 +719,56 @@ class WorkflowService: return workflow + async def get_block_outputs_for_debug_session( + self, + workflow_permanent_id: str, + user_id: str, + organization_id: str, + exclude_deleted: bool = True, + version: int | None = None, + ) -> dict[str, dict[str, Any]]: + workflow = await app.DATABASE.get_workflow_by_permanent_id( + workflow_permanent_id, + organization_id=organization_id, + version=version, + exclude_deleted=exclude_deleted, + ) + + if not workflow: + raise WorkflowNotFound(workflow_permanent_id=workflow_permanent_id, version=version) + + labels_to_outputs: dict[str, BlockOutputParameter] = {} + + for block in workflow.workflow_definition.blocks: + label = block.label + + block_run = await app.DATABASE.get_latest_completed_block_run( + organization_id=organization_id, + user_id=user_id, + block_label=label, + workflow_permanent_id=workflow_permanent_id, + ) + + if not block_run: + continue + + output_parameter = await app.DATABASE.get_workflow_run_output_parameter_by_id( + workflow_run_id=block_run.workflow_run_id, output_parameter_id=block_run.output_parameter_id + ) + + if not output_parameter: + continue + + block_output_parameter = output_parameter.value + + if not isinstance(block_output_parameter, dict): + continue + + block_output_parameter["created_at"] = output_parameter.created_at + labels_to_outputs[label] = block_output_parameter + + return labels_to_outputs + async def get_workflows_by_permanent_ids( self, workflow_permanent_ids: list[str], @@ -1773,7 +1828,7 @@ class WorkflowService: raise WorkflowDefinitionHasDuplicateParameterKeys(duplicate_keys=duplicate_parameter_keys) # Create blocks from the request block_label_mapping = {} - blocks = [] + blocks: list[BlockTypeVar] = [] for block_yaml in request.workflow_definition.blocks: block = await self.block_yaml_to_block(workflow, block_yaml, parameters) blocks.append(block) diff --git a/skyvern/schemas/runs.py b/skyvern/schemas/runs.py index b8be0fec..28ec4f64 100644 --- a/skyvern/schemas/runs.py +++ b/skyvern/schemas/runs.py @@ -372,6 +372,12 @@ class BlockRunRequest(WorkflowRunRequest): description="Labels of the blocks to execute", examples=["block_1", "block_2"], ) + block_outputs: dict[str, Any] | None = Field( + default=None, + # NOTE(jdo): this is either the last output of the block for a given + # org_id/user_id, or an override supplied by the user + description="Any active outputs of blocks in a workflow being debugged", + ) class BaseRunResponse(BaseModel): diff --git a/skyvern/services/block_service.py b/skyvern/services/block_service.py index 18da5285..d57dae84 100644 --- a/skyvern/services/block_service.py +++ b/skyvern/services/block_service.py @@ -1,9 +1,14 @@ +import typing as t + import structlog from fastapi import BackgroundTasks, Request +from skyvern.exceptions import OutputParameterNotFound, WorkflowNotFound +from skyvern.forge import app from skyvern.forge.sdk.core import skyvern_context from skyvern.forge.sdk.executor.factory import AsyncExecutorFactory from skyvern.forge.sdk.schemas.organizations import Organization +from skyvern.forge.sdk.workflow.models.parameter import OutputParameter from skyvern.forge.sdk.workflow.models.workflow import WorkflowRequestBody, WorkflowRun from skyvern.schemas.runs import WorkflowRunRequest from skyvern.services import workflow_service @@ -53,17 +58,47 @@ async def execute_blocks( workflow_run_id: str, workflow_permanent_id: str, organization: Organization, + user_id: str, browser_session_id: str | None = None, + block_outputs: dict[str, t.Any] | None = None, ) -> None: """ Runs one or more blocks of a workflow. """ + workflow = await app.DATABASE.get_workflow_by_permanent_id( + workflow_permanent_id=workflow_id, + organization_id=organization.organization_id, + ) + + if not workflow: + raise WorkflowNotFound(workflow_permanent_id=workflow_id) + + block_output_parameters: dict[str, OutputParameter] = {} + + for block_label in block_labels: + output_parameter = workflow.get_output_parameter(block_label) + + if not output_parameter: + raise OutputParameterNotFound(block_label=block_label, workflow_permanent_id=workflow_id) + + block_output_parameters[block_label] = output_parameter + + for block_label, output_parameter in block_output_parameters.items(): + await app.DATABASE.create_block_run( + organization_id=organization.organization_id, + user_id=user_id, + block_label=block_label, + output_parameter_id=output_parameter.output_parameter_id, + workflow_run_id=workflow_run_id, + ) + LOG.info( "Executing block(s)", organization_id=organization.organization_id, workflow_run_id=workflow_run_id, block_labels=block_labels, + block_outputs=block_outputs, ) await AsyncExecutorFactory.get_executor().execute_workflow( @@ -77,4 +112,5 @@ async def execute_blocks( browser_session_id=browser_session_id, api_key=api_key, block_labels=block_labels, + block_outputs=block_outputs, ) diff --git a/skyvern/services/workflow_service.py b/skyvern/services/workflow_service.py index 5fee060d..a284fc78 100644 --- a/skyvern/services/workflow_service.py +++ b/skyvern/services/workflow_service.py @@ -1,3 +1,5 @@ +import typing as t + import structlog from fastapi import BackgroundTasks, Request @@ -69,6 +71,7 @@ async def run_workflow( request: Request | None = None, background_tasks: BackgroundTasks | None = None, block_labels: list[str] | None = None, + block_outputs: dict[str, t.Any] | None = None, ) -> WorkflowRun: workflow_run = await prepare_workflow( workflow_id=workflow_id, @@ -91,6 +94,7 @@ async def run_workflow( browser_session_id=workflow_request.browser_session_id, api_key=api_key, block_labels=block_labels, + block_outputs=block_outputs, ) return workflow_run