From 264b1751d42c8519d5ac2586e21e89491c17d8ae Mon Sep 17 00:00:00 2001 From: Jonathan Dobson Date: Fri, 24 Oct 2025 16:34:14 -0400 Subject: [PATCH] Backend: implement InteractionBlock (#3810) --- skyvern/config.py | 6 + .../script_generations/generate_script.py | 9 + skyvern/forge/sdk/api/email.py | 77 ++++++ skyvern/forge/sdk/db/client.py | 8 +- skyvern/forge/sdk/routes/agent_protocol.py | 28 +++ skyvern/forge/sdk/workflow/models/block.py | 219 ++++++++++++++++++ skyvern/forge/sdk/workflow/service.py | 58 +++++ skyvern/schemas/workflows.py | 16 ++ 8 files changed, 420 insertions(+), 1 deletion(-) create mode 100644 skyvern/forge/sdk/api/email.py diff --git a/skyvern/config.py b/skyvern/config.py index 3c4a51ed..65e3d2fc 100644 --- a/skyvern/config.py +++ b/skyvern/config.py @@ -95,6 +95,12 @@ class Settings(BaseSettings): SKYVERN_TELEMETRY: bool = True ANALYTICS_ID: str = "anonymous" + # email settings + SMTP_HOST: str = "localhost" + SMTP_PORT: int = 25 + SMTP_USERNAME: str = "username" + SMTP_PASSWORD: str = "password" + # browser settings BROWSER_LOCALE: str = "en-US" BROWSER_TIMEZONE: str = "America/New_York" diff --git a/skyvern/core/script_generations/generate_script.py b/skyvern/core/script_generations/generate_script.py index 425b0c66..1b698950 100644 --- a/skyvern/core/script_generations/generate_script.py +++ b/skyvern/core/script_generations/generate_script.py @@ -872,6 +872,13 @@ def _build_validate_statement( return cst.SimpleStatementLine([cst.Expr(cst.Await(call))]) +def _build_human_interaction_statement( + block: dict[str, Any], +) -> cst.SimpleStatementLine: + LOG.warning("Human interaction code generation is not yet implemented.", block=block) + return cst.SimpleStatementLine([cst.Expr(cst.Comment("# TODO: Implement human interaction logic"))]) + + def _build_wait_statement(block: dict[str, Any]) -> cst.SimpleStatementLine: """Build a skyvern.wait statement.""" args = [ @@ -1577,6 +1584,8 @@ def _build_block_statement(block: dict[str, Any], data_variable_name: str | None stmt = _build_navigate_statement(block_title, block, data_variable_name) elif block_type == "validation": stmt = _build_validate_statement(block_title, block, data_variable_name) + elif block_type == "human_interaction": + stmt = _build_human_interaction_statement(block) elif block_type == "task_v2": stmt = _build_run_task_statement(block_title, block, data_variable_name) elif block_type == "send_email": diff --git a/skyvern/forge/sdk/api/email.py b/skyvern/forge/sdk/api/email.py new file mode 100644 index 00000000..396d2cb1 --- /dev/null +++ b/skyvern/forge/sdk/api/email.py @@ -0,0 +1,77 @@ +import smtplib +from email.message import EmailMessage + +import structlog +from email_validator import EmailNotValidError, validate_email + +from skyvern.config import settings + +LOG = structlog.get_logger() + + +async def _send(*, message: EmailMessage) -> bool: + try: + smtp_host = smtplib.SMTP(settings.SMTP_HOST, settings.SMTP_PORT) + + LOG.info("email: Connected to SMTP server") + + smtp_host.starttls() + smtp_host.login(settings.SMTP_USERNAME, settings.SMTP_PASSWORD) + + LOG.info("email: Logged in to SMTP server") + + smtp_host.send_message(message) + + LOG.info("email: Email sent") + except Exception as e: + LOG.error("email: Failed to send email", error=str(e)) + raise e + + return True + + +def validate_recipients(recipients: list[str]) -> None: + for recipient in recipients: + try: + validate_email(recipient) + except EmailNotValidError: + raise Exception( + f"invalid email address: {recipient}", + ) + + +async def build_message( + *, + body: str | None = None, + recipients: list[str], + sender: str, + subject: str, +) -> EmailMessage: + to = ", ".join(recipients) + msg = EmailMessage() + msg["BCC"] = sender # BCC the sender so there is a record of the email being sent + msg["From"] = sender + msg["Subject"] = subject + msg["To"] = to + msg.set_content(body) + + return msg + + +async def send( + *, + sender: str, + subject: str, + recipients: list[str], + body: str | None = None, +) -> bool: + validate_recipients(recipients) + + message = await build_message( + body=body, + recipients=recipients, + sender=sender, + subject=subject, + ) + + return await _send(message=message) diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 4069ae79..5929a97c 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -1975,7 +1975,11 @@ class AgentDB: raise async def get_workflow_run( - self, workflow_run_id: str, organization_id: str | None = None, job_id: str | None = None + self, + workflow_run_id: str, + organization_id: str | None = None, + job_id: str | None = None, + status: WorkflowRunStatus | None = None, ) -> WorkflowRun | None: try: async with self.Session() as session: @@ -1984,6 +1988,8 @@ class AgentDB: get_workflow_run_query = get_workflow_run_query.filter_by(organization_id=organization_id) if job_id: get_workflow_run_query = get_workflow_run_query.filter_by(job_id=job_id) + if status: + get_workflow_run_query = get_workflow_run_query.filter_by(status=status.value) if workflow_run := (await session.scalars(get_workflow_run_query)).first(): return convert_to_workflow_run(workflow_run) return None diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index 5d6fcda9..ea99747e 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -1271,6 +1271,22 @@ async def _cancel_workflow_run(workflow_run_id: str, organization_id: str, x_api await app.WORKFLOW_SERVICE.execute_workflow_webhook(workflow_run, api_key=x_api_key) +async def _continue_workflow_run(workflow_run_id: str, organization_id: str) -> None: + workflow_run = await app.DATABASE.get_workflow_run( + workflow_run_id=workflow_run_id, + organization_id=organization_id, + status=WorkflowRunStatus.paused, + ) + + if not workflow_run: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Workflow run not found {workflow_run_id}", + ) + + await app.WORKFLOW_SERVICE.mark_workflow_run_as_running(workflow_run_id) + + @legacy_base_router.post( "/workflows/runs/{workflow_run_id}/cancel", tags=["agent"], @@ -1287,6 +1303,18 @@ async def cancel_workflow_run( await _cancel_workflow_run(workflow_run_id, current_org.organization_id, x_api_key) +@base_router.post( + "/workflows/runs/{workflow_run_id}/continue", + include_in_schema=False, +) +@base_router.post("/workflows/runs/{workflow_run_id}/continue/", include_in_schema=False) +async def continue_workflow_run( + workflow_run_id: str, + current_org: Organization = Depends(org_auth_service.get_current_org), +) -> None: + await _continue_workflow_run(workflow_run_id, current_org.organization_id) + + @legacy_base_router.post( "/runs/{browser_session_id}/workflow_run/{workflow_run_id}/cancel/", tags=["agent"], diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index 9af7bbd1..c5b117a3 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -46,6 +46,7 @@ from skyvern.exceptions import ( ) from skyvern.forge import app from skyvern.forge.prompts import prompt_engine +from skyvern.forge.sdk.api import email from skyvern.forge.sdk.api.aws import AsyncAWSClient from skyvern.forge.sdk.api.azure import AsyncAzureStorageClient from skyvern.forge.sdk.api.files import ( @@ -3049,6 +3050,223 @@ class WaitBlock(Block): ) +class HumanInteractionBlock(BaseTaskBlock): + """ + A block for human/agent interaction. + + For the first pass at this, the implicit behaviour is that the user is given a single binary + choice (a go//no-go). + + If the human: + - chooses positively, the workflow continues + - chooses negatively, the workflow is terminated + - does not respond within the timeout period, the workflow terminates + """ + + # There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error: + # Parameter 1 of Literal[...] cannot be of type "Any" + block_type: Literal[BlockType.HUMAN_INTERACTION] = BlockType.HUMAN_INTERACTION # type: ignore + + instructions: str = "Please review and approve or reject to continue the workflow." + positive_descriptor: str = "Approve" + negative_descriptor: str = "Reject" + timeout_seconds: int = 60 * 60 * 2 # two hours + + # email options + sender: str = "hello@skyvern.com" + recipients: list[str] = [] + subject: str = "Human interaction required for workflow run" + body: str = "Your interaction is required for a workflow run!" + + def format_potential_template_parameters(self, workflow_run_context: WorkflowRunContext) -> None: + super().format_potential_template_parameters(workflow_run_context) + + self.instructions = self.format_block_parameter_template_from_workflow_run_context( + self.instructions, workflow_run_context + ) + + self.body = self.format_block_parameter_template_from_workflow_run_context(self.body, workflow_run_context) + + self.subject = self.format_block_parameter_template_from_workflow_run_context( + self.subject, workflow_run_context + ) + + formatted: list[str] = [] + for recipient in self.recipients: + formatted.append( + self.format_block_parameter_template_from_workflow_run_context(recipient, workflow_run_context) + ) + + self.recipients = formatted + + async def execute( + self, + workflow_run_id: str, + workflow_run_block_id: str, + organization_id: str | None = None, + browser_session_id: str | None = None, + **kwargs: dict, + ) -> BlockResult: + # avoid circular import + from skyvern.forge.sdk.workflow.models.workflow import WorkflowRunStatus # noqa: PLC0415 + + LOG.info( + "Pausing workflow for human interaction", + workflow_run_id=workflow_run_id, + recipients=self.recipients, + timeout=self.timeout_seconds, + browser_session_id=browser_session_id, + ) + + await app.DATABASE.update_workflow_run( + workflow_run_id=workflow_run_id, + status=WorkflowRunStatus.paused, + ) + + workflow_run = await app.DATABASE.get_workflow_run( + workflow_run_id=workflow_run_id, + organization_id=organization_id, + ) + + if not workflow_run: + return await self.build_block_result( + success=False, + failure_reason="Workflow run not found", + output_parameter_value=None, + status=BlockStatus.failed, + workflow_run_block_id=workflow_run_block_id, + organization_id=organization_id, + ) + + workflow_run_context = self.get_workflow_run_context(workflow_run_id) + + try: + self.format_potential_template_parameters(workflow_run_context) + except Exception as e: + return await self.build_block_result( + success=False, + failure_reason=f"Failed to format jinja template: {str(e)}", + output_parameter_value=None, + status=BlockStatus.failed, + workflow_run_block_id=workflow_run_block_id, + organization_id=organization_id, + ) + + workflow_permanent_id = workflow_run.workflow_permanent_id + app_url = f"{settings.SKYVERN_APP_URL}/workflows/{workflow_permanent_id}/{workflow_run_id}/overview" + body = f"{self.body}\n\nKindly visit {app_url}\n\n{self.instructions}\n\n" + subject = f"{self.subject} - Workflow Run ID: {workflow_run_id}" + + try: + await email.send( + body=body, + sender=self.sender, + subject=subject, + recipients=self.recipients, + ) + + email_success = True + email_failure_reason = None + except Exception as ex: + LOG.error( + "Failed to send human interaction email", + workflow_run_id=workflow_run_id, + error=str(ex), + browser_session_id=browser_session_id, + ) + email_success = False + email_failure_reason = str(ex) + + if not email_success: + return await self.build_block_result( + success=False, + failure_reason=f"Failed to send human interaction email: {email_failure_reason or 'email failed'}", + output_parameter_value=None, + status=BlockStatus.failed, + workflow_run_block_id=workflow_run_block_id, + organization_id=organization_id, + ) + + # Wait for the timeout_seconds or until the workflow run status changes from paused + start_time = asyncio.get_event_loop().time() + check_interval = 5 # Check every 5 seconds + log_that_we_are_waiting = True + log_wait = 0 + + while True: + if not log_that_we_are_waiting: + log_wait += check_interval + if log_wait >= 60: # Log every 1 minute + log_that_we_are_waiting = True + log_wait = 0 + + elapsed_time_seconds = asyncio.get_event_loop().time() - start_time + + if log_that_we_are_waiting: + LOG.info( + "Waiting for human interaction...", + workflow_run_id=workflow_run_id, + elapsed_time_seconds=elapsed_time_seconds, + timeout_seconds=self.timeout_seconds, + browser_session_id=browser_session_id, + ) + log_that_we_are_waiting = False + + # Check if timeout_seconds has elapsed + if elapsed_time_seconds >= self.timeout_seconds: + LOG.info( + "Human Interaction block timeout_seconds reached", + workflow_run_id=workflow_run_id, + elapsed_time_seconds=elapsed_time_seconds, + browser_session_id=browser_session_id, + ) + + workflow_run_context = self.get_workflow_run_context(workflow_run_id) + success = False + reason = "Timeout elapsed with no human interaction" + result_dict = {"success": success, "reason": reason} + + await self.record_output_parameter_value(workflow_run_context, workflow_run_id, result_dict) + + return await self.build_block_result( + success=success, + failure_reason=reason, + output_parameter_value=result_dict, + status=BlockStatus.timed_out, + workflow_run_block_id=workflow_run_block_id, + organization_id=organization_id, + ) + + workflow_run = await app.DATABASE.get_workflow_run( + workflow_run_id=workflow_run_id, + organization_id=organization_id, + ) + + if workflow_run and workflow_run.status != WorkflowRunStatus.paused: + LOG.info( + "Workflow run status changed from paused", + workflow_run_id=workflow_run_id, + new_status=workflow_run.status, + browser_session_id=browser_session_id, + ) + + workflow_run_context = self.get_workflow_run_context(workflow_run_id) + result_dict = {"success": True, "reason": f"status_changed:{workflow_run.status}"} + + await self.record_output_parameter_value(workflow_run_context, workflow_run_id, result_dict) + + return await self.build_block_result( + success=True, + failure_reason=None, + output_parameter_value=result_dict, + status=BlockStatus.completed, + workflow_run_block_id=workflow_run_block_id, + organization_id=organization_id, + ) + + await asyncio.sleep(min(check_interval, self.timeout_seconds - elapsed_time_seconds)) + + class ValidationBlock(BaseTaskBlock): # There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error: # Parameter 1 of Literal[...] cannot be of type "Any" @@ -3521,6 +3739,7 @@ BlockSubclasses = Union[ ExtractionBlock, LoginBlock, WaitBlock, + HumanInteractionBlock, FileDownloadBlock, UrlBlock, TaskV2Block, diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 2ccfb412..d3404fec 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -38,6 +38,7 @@ from skyvern.forge.sdk.db.enums import TaskType from skyvern.forge.sdk.models import Step, StepStatus from skyvern.forge.sdk.schemas.files import FileInfo from skyvern.forge.sdk.schemas.organizations import Organization +from skyvern.forge.sdk.schemas.persistent_browser_sessions import PersistentBrowserSession from skyvern.forge.sdk.schemas.tasks import Task from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock, WorkflowRunTimeline, WorkflowRunTimelineType from skyvern.forge.sdk.trace import TraceManager @@ -60,6 +61,7 @@ from skyvern.forge.sdk.workflow.models.block import ( FileUploadBlock, ForLoopBlock, HttpRequestBlock, + HumanInteractionBlock, LoginBlock, NavigationBlock, PDFParserBlock, @@ -333,6 +335,35 @@ class WorkflowService: return workflow_run + async def auto_create_browser_session_if_needed( + self, + organization_id: str, + workflow: Workflow, + *, + browser_session_id: str | None = None, + proxy_location: ProxyLocation | None = None, + ) -> PersistentBrowserSession | None: + if browser_session_id: # the user has supplied an id, so no need to create one + return None + + workflow_definition = workflow.workflow_definition + blocks = workflow_definition.blocks + human_interaction_blocks = [block for block in blocks if block.block_type == BlockType.HUMAN_INTERACTION] + + if human_interaction_blocks: + timeouts = [getattr(block, "timeout_seconds", 60 * 60) for block in human_interaction_blocks] + timeout_seconds = sum(timeouts) + 60 * 60 + + browser_session = await app.PERSISTENT_SESSIONS_MANAGER.create_session( + organization_id=organization_id, + timeout_minutes=timeout_seconds // 60, + proxy_location=proxy_location, + ) + + return browser_session + + return None + @TraceManager.traced_async(ignore_inputs=["organization", "api_key"]) async def execute_workflow( self, @@ -424,6 +455,17 @@ class WorkflowService: ) return workflow_run + browser_session = await self.auto_create_browser_session_if_needed( + organization.organization_id, + workflow, + browser_session_id=browser_session_id, + proxy_location=workflow_run.proxy_location, + ) + + if browser_session: + browser_session_id = browser_session.persistent_browser_session_id + close_browser_on_completion = True + # Check if there's a related workflow script that should be used instead workflow_script, _ = await workflow_script_service.get_workflow_script(workflow, workflow_run, block_labels) current_context = skyvern_context.current() @@ -595,6 +637,7 @@ class WorkflowService: if not blocks: raise SkyvernException(f"No blocks found for the given block labels: {block_labels}") + # # Execute workflow blocks blocks_cnt = len(blocks) block_result = None @@ -2744,6 +2787,21 @@ class WorkflowService: include_action_history_in_verification=block_yaml.include_action_history_in_verification, ) + elif block_yaml.block_type == BlockType.HUMAN_INTERACTION: + return HumanInteractionBlock( + label=block_yaml.label, + output_parameter=output_parameter, + instructions=block_yaml.instructions, + positive_descriptor=block_yaml.positive_descriptor, + negative_descriptor=block_yaml.negative_descriptor, + timeout_seconds=block_yaml.timeout_seconds, + # -- + sender=block_yaml.sender, + recipients=block_yaml.recipients, + subject=block_yaml.subject, + body=block_yaml.body, + ) + elif block_yaml.block_type == BlockType.EXTRACTION: extraction_block_parameters = ( [parameters[parameter_key] for parameter_key in block_yaml.parameter_keys] diff --git a/skyvern/schemas/workflows.py b/skyvern/schemas/workflows.py index b92a4930..54de18ec 100644 --- a/skyvern/schemas/workflows.py +++ b/skyvern/schemas/workflows.py @@ -37,6 +37,7 @@ class BlockType(StrEnum): GOTO_URL = "goto_url" PDF_PARSER = "pdf_parser" HTTP_REQUEST = "http_request" + HUMAN_INTERACTION = "human_interaction" class BlockStatus(StrEnum): @@ -429,6 +430,20 @@ class WaitBlockYAML(BlockYAML): wait_sec: int = 0 +class HumanInteractionBlockYAML(BlockYAML): + block_type: Literal[BlockType.HUMAN_INTERACTION] = BlockType.HUMAN_INTERACTION # type: ignore + + instructions: str = "Please review and approve or reject to continue the workflow." + positive_descriptor: str = "Approve" + negative_descriptor: str = "Reject" + timeout_seconds: int + + sender: str + recipients: list[str] + subject: str + body: str + + class FileDownloadBlockYAML(BlockYAML): block_type: Literal[BlockType.FILE_DOWNLOAD] = BlockType.FILE_DOWNLOAD # type: ignore @@ -511,6 +526,7 @@ BLOCK_YAML_SUBCLASSES = ( | ExtractionBlockYAML | LoginBlockYAML | WaitBlockYAML + | HumanInteractionBlockYAML | FileDownloadBlockYAML | UrlBlockYAML | PDFParserBlockYAML