Backend: implement InteractionBlock (#3810)
This commit is contained in:
77
skyvern/forge/sdk/api/email.py
Normal file
77
skyvern/forge/sdk/api/email.py
Normal file
@@ -0,0 +1,77 @@
|
||||
import smtplib
|
||||
from email.message import EmailMessage
|
||||
|
||||
import structlog
|
||||
from email_validator import EmailNotValidError, validate_email
|
||||
|
||||
from skyvern.config import settings
|
||||
|
||||
LOG = structlog.get_logger()
|
||||
|
||||
|
||||
async def _send(*, message: EmailMessage) -> bool:
|
||||
try:
|
||||
smtp_host = smtplib.SMTP(settings.SMTP_HOST, settings.SMTP_PORT)
|
||||
|
||||
LOG.info("email: Connected to SMTP server")
|
||||
|
||||
smtp_host.starttls()
|
||||
smtp_host.login(settings.SMTP_USERNAME, settings.SMTP_PASSWORD)
|
||||
|
||||
LOG.info("email: Logged in to SMTP server")
|
||||
|
||||
smtp_host.send_message(message)
|
||||
|
||||
LOG.info("email: Email sent")
|
||||
except Exception as e:
|
||||
LOG.error("email: Failed to send email", error=str(e))
|
||||
raise e
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def validate_recipients(recipients: list[str]) -> None:
|
||||
for recipient in recipients:
|
||||
try:
|
||||
validate_email(recipient)
|
||||
except EmailNotValidError:
|
||||
raise Exception(
|
||||
f"invalid email address: {recipient}",
|
||||
)
|
||||
|
||||
|
||||
async def build_message(
|
||||
*,
|
||||
body: str | None = None,
|
||||
recipients: list[str],
|
||||
sender: str,
|
||||
subject: str,
|
||||
) -> EmailMessage:
|
||||
to = ", ".join(recipients)
|
||||
msg = EmailMessage()
|
||||
msg["BCC"] = sender # BCC the sender so there is a record of the email being sent
|
||||
msg["From"] = sender
|
||||
msg["Subject"] = subject
|
||||
msg["To"] = to
|
||||
msg.set_content(body)
|
||||
|
||||
return msg
|
||||
|
||||
|
||||
async def send(
|
||||
*,
|
||||
sender: str,
|
||||
subject: str,
|
||||
recipients: list[str],
|
||||
body: str | None = None,
|
||||
) -> bool:
|
||||
validate_recipients(recipients)
|
||||
|
||||
message = await build_message(
|
||||
body=body,
|
||||
recipients=recipients,
|
||||
sender=sender,
|
||||
subject=subject,
|
||||
)
|
||||
|
||||
return await _send(message=message)
|
||||
@@ -1975,7 +1975,11 @@ class AgentDB:
|
||||
raise
|
||||
|
||||
async def get_workflow_run(
|
||||
self, workflow_run_id: str, organization_id: str | None = None, job_id: str | None = None
|
||||
self,
|
||||
workflow_run_id: str,
|
||||
organization_id: str | None = None,
|
||||
job_id: str | None = None,
|
||||
status: WorkflowRunStatus | None = None,
|
||||
) -> WorkflowRun | None:
|
||||
try:
|
||||
async with self.Session() as session:
|
||||
@@ -1984,6 +1988,8 @@ class AgentDB:
|
||||
get_workflow_run_query = get_workflow_run_query.filter_by(organization_id=organization_id)
|
||||
if job_id:
|
||||
get_workflow_run_query = get_workflow_run_query.filter_by(job_id=job_id)
|
||||
if status:
|
||||
get_workflow_run_query = get_workflow_run_query.filter_by(status=status.value)
|
||||
if workflow_run := (await session.scalars(get_workflow_run_query)).first():
|
||||
return convert_to_workflow_run(workflow_run)
|
||||
return None
|
||||
|
||||
@@ -1271,6 +1271,22 @@ async def _cancel_workflow_run(workflow_run_id: str, organization_id: str, x_api
|
||||
await app.WORKFLOW_SERVICE.execute_workflow_webhook(workflow_run, api_key=x_api_key)
|
||||
|
||||
|
||||
async def _continue_workflow_run(workflow_run_id: str, organization_id: str) -> None:
|
||||
workflow_run = await app.DATABASE.get_workflow_run(
|
||||
workflow_run_id=workflow_run_id,
|
||||
organization_id=organization_id,
|
||||
status=WorkflowRunStatus.paused,
|
||||
)
|
||||
|
||||
if not workflow_run:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail=f"Workflow run not found {workflow_run_id}",
|
||||
)
|
||||
|
||||
await app.WORKFLOW_SERVICE.mark_workflow_run_as_running(workflow_run_id)
|
||||
|
||||
|
||||
@legacy_base_router.post(
|
||||
"/workflows/runs/{workflow_run_id}/cancel",
|
||||
tags=["agent"],
|
||||
@@ -1287,6 +1303,18 @@ async def cancel_workflow_run(
|
||||
await _cancel_workflow_run(workflow_run_id, current_org.organization_id, x_api_key)
|
||||
|
||||
|
||||
@base_router.post(
|
||||
"/workflows/runs/{workflow_run_id}/continue",
|
||||
include_in_schema=False,
|
||||
)
|
||||
@base_router.post("/workflows/runs/{workflow_run_id}/continue/", include_in_schema=False)
|
||||
async def continue_workflow_run(
|
||||
workflow_run_id: str,
|
||||
current_org: Organization = Depends(org_auth_service.get_current_org),
|
||||
) -> None:
|
||||
await _continue_workflow_run(workflow_run_id, current_org.organization_id)
|
||||
|
||||
|
||||
@legacy_base_router.post(
|
||||
"/runs/{browser_session_id}/workflow_run/{workflow_run_id}/cancel/",
|
||||
tags=["agent"],
|
||||
|
||||
@@ -46,6 +46,7 @@ from skyvern.exceptions import (
|
||||
)
|
||||
from skyvern.forge import app
|
||||
from skyvern.forge.prompts import prompt_engine
|
||||
from skyvern.forge.sdk.api import email
|
||||
from skyvern.forge.sdk.api.aws import AsyncAWSClient
|
||||
from skyvern.forge.sdk.api.azure import AsyncAzureStorageClient
|
||||
from skyvern.forge.sdk.api.files import (
|
||||
@@ -3049,6 +3050,223 @@ class WaitBlock(Block):
|
||||
)
|
||||
|
||||
|
||||
class HumanInteractionBlock(BaseTaskBlock):
|
||||
"""
|
||||
A block for human/agent interaction.
|
||||
|
||||
For the first pass at this, the implicit behaviour is that the user is given a single binary
|
||||
choice (a go//no-go).
|
||||
|
||||
If the human:
|
||||
- chooses positively, the workflow continues
|
||||
- chooses negatively, the workflow is terminated
|
||||
- does not respond within the timeout period, the workflow terminates
|
||||
"""
|
||||
|
||||
# There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error:
|
||||
# Parameter 1 of Literal[...] cannot be of type "Any"
|
||||
block_type: Literal[BlockType.HUMAN_INTERACTION] = BlockType.HUMAN_INTERACTION # type: ignore
|
||||
|
||||
instructions: str = "Please review and approve or reject to continue the workflow."
|
||||
positive_descriptor: str = "Approve"
|
||||
negative_descriptor: str = "Reject"
|
||||
timeout_seconds: int = 60 * 60 * 2 # two hours
|
||||
|
||||
# email options
|
||||
sender: str = "hello@skyvern.com"
|
||||
recipients: list[str] = []
|
||||
subject: str = "Human interaction required for workflow run"
|
||||
body: str = "Your interaction is required for a workflow run!"
|
||||
|
||||
def format_potential_template_parameters(self, workflow_run_context: WorkflowRunContext) -> None:
|
||||
super().format_potential_template_parameters(workflow_run_context)
|
||||
|
||||
self.instructions = self.format_block_parameter_template_from_workflow_run_context(
|
||||
self.instructions, workflow_run_context
|
||||
)
|
||||
|
||||
self.body = self.format_block_parameter_template_from_workflow_run_context(self.body, workflow_run_context)
|
||||
|
||||
self.subject = self.format_block_parameter_template_from_workflow_run_context(
|
||||
self.subject, workflow_run_context
|
||||
)
|
||||
|
||||
formatted: list[str] = []
|
||||
for recipient in self.recipients:
|
||||
formatted.append(
|
||||
self.format_block_parameter_template_from_workflow_run_context(recipient, workflow_run_context)
|
||||
)
|
||||
|
||||
self.recipients = formatted
|
||||
|
||||
async def execute(
|
||||
self,
|
||||
workflow_run_id: str,
|
||||
workflow_run_block_id: str,
|
||||
organization_id: str | None = None,
|
||||
browser_session_id: str | None = None,
|
||||
**kwargs: dict,
|
||||
) -> BlockResult:
|
||||
# avoid circular import
|
||||
from skyvern.forge.sdk.workflow.models.workflow import WorkflowRunStatus # noqa: PLC0415
|
||||
|
||||
LOG.info(
|
||||
"Pausing workflow for human interaction",
|
||||
workflow_run_id=workflow_run_id,
|
||||
recipients=self.recipients,
|
||||
timeout=self.timeout_seconds,
|
||||
browser_session_id=browser_session_id,
|
||||
)
|
||||
|
||||
await app.DATABASE.update_workflow_run(
|
||||
workflow_run_id=workflow_run_id,
|
||||
status=WorkflowRunStatus.paused,
|
||||
)
|
||||
|
||||
workflow_run = await app.DATABASE.get_workflow_run(
|
||||
workflow_run_id=workflow_run_id,
|
||||
organization_id=organization_id,
|
||||
)
|
||||
|
||||
if not workflow_run:
|
||||
return await self.build_block_result(
|
||||
success=False,
|
||||
failure_reason="Workflow run not found",
|
||||
output_parameter_value=None,
|
||||
status=BlockStatus.failed,
|
||||
workflow_run_block_id=workflow_run_block_id,
|
||||
organization_id=organization_id,
|
||||
)
|
||||
|
||||
workflow_run_context = self.get_workflow_run_context(workflow_run_id)
|
||||
|
||||
try:
|
||||
self.format_potential_template_parameters(workflow_run_context)
|
||||
except Exception as e:
|
||||
return await self.build_block_result(
|
||||
success=False,
|
||||
failure_reason=f"Failed to format jinja template: {str(e)}",
|
||||
output_parameter_value=None,
|
||||
status=BlockStatus.failed,
|
||||
workflow_run_block_id=workflow_run_block_id,
|
||||
organization_id=organization_id,
|
||||
)
|
||||
|
||||
workflow_permanent_id = workflow_run.workflow_permanent_id
|
||||
app_url = f"{settings.SKYVERN_APP_URL}/workflows/{workflow_permanent_id}/{workflow_run_id}/overview"
|
||||
body = f"{self.body}\n\nKindly visit {app_url}\n\n{self.instructions}\n\n"
|
||||
subject = f"{self.subject} - Workflow Run ID: {workflow_run_id}"
|
||||
|
||||
try:
|
||||
await email.send(
|
||||
body=body,
|
||||
sender=self.sender,
|
||||
subject=subject,
|
||||
recipients=self.recipients,
|
||||
)
|
||||
|
||||
email_success = True
|
||||
email_failure_reason = None
|
||||
except Exception as ex:
|
||||
LOG.error(
|
||||
"Failed to send human interaction email",
|
||||
workflow_run_id=workflow_run_id,
|
||||
error=str(ex),
|
||||
browser_session_id=browser_session_id,
|
||||
)
|
||||
email_success = False
|
||||
email_failure_reason = str(ex)
|
||||
|
||||
if not email_success:
|
||||
return await self.build_block_result(
|
||||
success=False,
|
||||
failure_reason=f"Failed to send human interaction email: {email_failure_reason or 'email failed'}",
|
||||
output_parameter_value=None,
|
||||
status=BlockStatus.failed,
|
||||
workflow_run_block_id=workflow_run_block_id,
|
||||
organization_id=organization_id,
|
||||
)
|
||||
|
||||
# Wait for the timeout_seconds or until the workflow run status changes from paused
|
||||
start_time = asyncio.get_event_loop().time()
|
||||
check_interval = 5 # Check every 5 seconds
|
||||
log_that_we_are_waiting = True
|
||||
log_wait = 0
|
||||
|
||||
while True:
|
||||
if not log_that_we_are_waiting:
|
||||
log_wait += check_interval
|
||||
if log_wait >= 60: # Log every 1 minute
|
||||
log_that_we_are_waiting = True
|
||||
log_wait = 0
|
||||
|
||||
elapsed_time_seconds = asyncio.get_event_loop().time() - start_time
|
||||
|
||||
if log_that_we_are_waiting:
|
||||
LOG.info(
|
||||
"Waiting for human interaction...",
|
||||
workflow_run_id=workflow_run_id,
|
||||
elapsed_time_seconds=elapsed_time_seconds,
|
||||
timeout_seconds=self.timeout_seconds,
|
||||
browser_session_id=browser_session_id,
|
||||
)
|
||||
log_that_we_are_waiting = False
|
||||
|
||||
# Check if timeout_seconds has elapsed
|
||||
if elapsed_time_seconds >= self.timeout_seconds:
|
||||
LOG.info(
|
||||
"Human Interaction block timeout_seconds reached",
|
||||
workflow_run_id=workflow_run_id,
|
||||
elapsed_time_seconds=elapsed_time_seconds,
|
||||
browser_session_id=browser_session_id,
|
||||
)
|
||||
|
||||
workflow_run_context = self.get_workflow_run_context(workflow_run_id)
|
||||
success = False
|
||||
reason = "Timeout elapsed with no human interaction"
|
||||
result_dict = {"success": success, "reason": reason}
|
||||
|
||||
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, result_dict)
|
||||
|
||||
return await self.build_block_result(
|
||||
success=success,
|
||||
failure_reason=reason,
|
||||
output_parameter_value=result_dict,
|
||||
status=BlockStatus.timed_out,
|
||||
workflow_run_block_id=workflow_run_block_id,
|
||||
organization_id=organization_id,
|
||||
)
|
||||
|
||||
workflow_run = await app.DATABASE.get_workflow_run(
|
||||
workflow_run_id=workflow_run_id,
|
||||
organization_id=organization_id,
|
||||
)
|
||||
|
||||
if workflow_run and workflow_run.status != WorkflowRunStatus.paused:
|
||||
LOG.info(
|
||||
"Workflow run status changed from paused",
|
||||
workflow_run_id=workflow_run_id,
|
||||
new_status=workflow_run.status,
|
||||
browser_session_id=browser_session_id,
|
||||
)
|
||||
|
||||
workflow_run_context = self.get_workflow_run_context(workflow_run_id)
|
||||
result_dict = {"success": True, "reason": f"status_changed:{workflow_run.status}"}
|
||||
|
||||
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, result_dict)
|
||||
|
||||
return await self.build_block_result(
|
||||
success=True,
|
||||
failure_reason=None,
|
||||
output_parameter_value=result_dict,
|
||||
status=BlockStatus.completed,
|
||||
workflow_run_block_id=workflow_run_block_id,
|
||||
organization_id=organization_id,
|
||||
)
|
||||
|
||||
await asyncio.sleep(min(check_interval, self.timeout_seconds - elapsed_time_seconds))
|
||||
|
||||
|
||||
class ValidationBlock(BaseTaskBlock):
|
||||
# There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error:
|
||||
# Parameter 1 of Literal[...] cannot be of type "Any"
|
||||
@@ -3521,6 +3739,7 @@ BlockSubclasses = Union[
|
||||
ExtractionBlock,
|
||||
LoginBlock,
|
||||
WaitBlock,
|
||||
HumanInteractionBlock,
|
||||
FileDownloadBlock,
|
||||
UrlBlock,
|
||||
TaskV2Block,
|
||||
|
||||
@@ -38,6 +38,7 @@ from skyvern.forge.sdk.db.enums import TaskType
|
||||
from skyvern.forge.sdk.models import Step, StepStatus
|
||||
from skyvern.forge.sdk.schemas.files import FileInfo
|
||||
from skyvern.forge.sdk.schemas.organizations import Organization
|
||||
from skyvern.forge.sdk.schemas.persistent_browser_sessions import PersistentBrowserSession
|
||||
from skyvern.forge.sdk.schemas.tasks import Task
|
||||
from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock, WorkflowRunTimeline, WorkflowRunTimelineType
|
||||
from skyvern.forge.sdk.trace import TraceManager
|
||||
@@ -60,6 +61,7 @@ from skyvern.forge.sdk.workflow.models.block import (
|
||||
FileUploadBlock,
|
||||
ForLoopBlock,
|
||||
HttpRequestBlock,
|
||||
HumanInteractionBlock,
|
||||
LoginBlock,
|
||||
NavigationBlock,
|
||||
PDFParserBlock,
|
||||
@@ -333,6 +335,35 @@ class WorkflowService:
|
||||
|
||||
return workflow_run
|
||||
|
||||
async def auto_create_browser_session_if_needed(
|
||||
self,
|
||||
organization_id: str,
|
||||
workflow: Workflow,
|
||||
*,
|
||||
browser_session_id: str | None = None,
|
||||
proxy_location: ProxyLocation | None = None,
|
||||
) -> PersistentBrowserSession | None:
|
||||
if browser_session_id: # the user has supplied an id, so no need to create one
|
||||
return None
|
||||
|
||||
workflow_definition = workflow.workflow_definition
|
||||
blocks = workflow_definition.blocks
|
||||
human_interaction_blocks = [block for block in blocks if block.block_type == BlockType.HUMAN_INTERACTION]
|
||||
|
||||
if human_interaction_blocks:
|
||||
timeouts = [getattr(block, "timeout_seconds", 60 * 60) for block in human_interaction_blocks]
|
||||
timeout_seconds = sum(timeouts) + 60 * 60
|
||||
|
||||
browser_session = await app.PERSISTENT_SESSIONS_MANAGER.create_session(
|
||||
organization_id=organization_id,
|
||||
timeout_minutes=timeout_seconds // 60,
|
||||
proxy_location=proxy_location,
|
||||
)
|
||||
|
||||
return browser_session
|
||||
|
||||
return None
|
||||
|
||||
@TraceManager.traced_async(ignore_inputs=["organization", "api_key"])
|
||||
async def execute_workflow(
|
||||
self,
|
||||
@@ -424,6 +455,17 @@ class WorkflowService:
|
||||
)
|
||||
return workflow_run
|
||||
|
||||
browser_session = await self.auto_create_browser_session_if_needed(
|
||||
organization.organization_id,
|
||||
workflow,
|
||||
browser_session_id=browser_session_id,
|
||||
proxy_location=workflow_run.proxy_location,
|
||||
)
|
||||
|
||||
if browser_session:
|
||||
browser_session_id = browser_session.persistent_browser_session_id
|
||||
close_browser_on_completion = True
|
||||
|
||||
# Check if there's a related workflow script that should be used instead
|
||||
workflow_script, _ = await workflow_script_service.get_workflow_script(workflow, workflow_run, block_labels)
|
||||
current_context = skyvern_context.current()
|
||||
@@ -595,6 +637,7 @@ class WorkflowService:
|
||||
if not blocks:
|
||||
raise SkyvernException(f"No blocks found for the given block labels: {block_labels}")
|
||||
|
||||
#
|
||||
# Execute workflow blocks
|
||||
blocks_cnt = len(blocks)
|
||||
block_result = None
|
||||
@@ -2744,6 +2787,21 @@ class WorkflowService:
|
||||
include_action_history_in_verification=block_yaml.include_action_history_in_verification,
|
||||
)
|
||||
|
||||
elif block_yaml.block_type == BlockType.HUMAN_INTERACTION:
|
||||
return HumanInteractionBlock(
|
||||
label=block_yaml.label,
|
||||
output_parameter=output_parameter,
|
||||
instructions=block_yaml.instructions,
|
||||
positive_descriptor=block_yaml.positive_descriptor,
|
||||
negative_descriptor=block_yaml.negative_descriptor,
|
||||
timeout_seconds=block_yaml.timeout_seconds,
|
||||
# --
|
||||
sender=block_yaml.sender,
|
||||
recipients=block_yaml.recipients,
|
||||
subject=block_yaml.subject,
|
||||
body=block_yaml.body,
|
||||
)
|
||||
|
||||
elif block_yaml.block_type == BlockType.EXTRACTION:
|
||||
extraction_block_parameters = (
|
||||
[parameters[parameter_key] for parameter_key in block_yaml.parameter_keys]
|
||||
|
||||
Reference in New Issue
Block a user