email relay (#598)

Co-authored-by: Shuchang Zheng <wintonzheng0325@gmail.com>
This commit is contained in:
Kerem Yilmaz
2024-07-11 21:34:00 -07:00
committed by GitHub
parent f6bb4981fc
commit ea1039277f
15 changed files with 191 additions and 18 deletions

View File

@@ -9,6 +9,12 @@ REPO_ROOT_DIR = SKYVERN_DIR.parent
INPUT_TEXT_TIMEOUT = 120000 # 2 minutes
PAGE_CONTENT_TIMEOUT = 300 # 5 mins
# reserved fields for navigation payload
# Key under which the verification-code entry is added to a dict navigation payload.
SPECIAL_FIELD_VERIFICATION_CODE = "verification_code"
# Stand-in value stored under that key; when an action's text equals this
# placeholder, the real code is fetched from the task's totp_verification_url.
VERIFICATION_CODE_PLACEHOLDER = "SKYVERN_TOTP_CODE"
# Maximum time, in minutes, to keep polling for a verification code before giving up.
VERIFICATION_CODE_POLLING_TIMEOUT_MINS = 10
class ScrapeType(StrEnum):
NORMAL = "normal"

View File

@@ -428,3 +428,8 @@ class WrongElementToUploadFile(SkyvernException):
super().__init__(
f"No file chooser dialog opens, so file can't be uploaded through element {element_id}. Please try to upload again with another element."
)
class FailedToFetchSecret(SkyvernException):
    """Raised when the real value behind a secret parameter could not be obtained."""

    def __init__(self) -> None:
        message = "Failed to get the actual value of the secret parameter"
        super().__init__(message)

View File

@@ -11,7 +11,12 @@ from playwright._impl._errors import TargetClosedError
from playwright.async_api import Page
from skyvern import analytics
from skyvern.constants import SCRAPE_TYPE_ORDER, ScrapeType
from skyvern.constants import (
SCRAPE_TYPE_ORDER,
SPECIAL_FIELD_VERIFICATION_CODE,
VERIFICATION_CODE_PLACEHOLDER,
ScrapeType,
)
from skyvern.exceptions import (
BrowserStateMissingPage,
EmptyScrapePage,
@@ -120,6 +125,7 @@ class ForgeAgent:
url=task_url,
title=task_block.title,
webhook_callback_url=None,
totp_verification_url=None,
navigation_goal=task_block.navigation_goal,
data_extraction_goal=task_block.data_extraction_goal,
navigation_payload=navigation_payload,
@@ -174,6 +180,7 @@ class ForgeAgent:
url=task_request.url,
title=task_request.title,
webhook_callback_url=task_request.webhook_callback_url,
totp_verification_url=task_request.totp_verification_url,
navigation_goal=task_request.navigation_goal,
data_extraction_goal=task_request.data_extraction_goal,
navigation_payload=task_request.navigation_payload,
@@ -955,10 +962,11 @@ class ForgeAgent:
)
element_tree_in_prompt: str = scraped_page.build_element_tree(element_tree_format)
final_navigation_payload = self._build_navigation_payload(task)
extract_action_prompt = prompt_engine.load_prompt(
prompt_template,
navigation_goal=navigation_goal,
navigation_payload_str=json.dumps(task.navigation_payload),
navigation_payload_str=json.dumps(final_navigation_payload),
starting_url=starting_url,
current_url=current_url,
elements=element_tree_in_prompt,
@@ -996,6 +1004,16 @@ class ForgeAgent:
return scraped_page, extract_action_prompt
def _build_navigation_payload(
    self,
    task: Task,
) -> dict[str, Any] | list | str | None:
    """
    Build the navigation payload that is rendered into the action prompt.

    When the task has a ``totp_verification_url`` and its payload is a dict
    that does not already define SPECIAL_FIELD_VERIFICATION_CODE, the returned
    payload carries that key set to VERIFICATION_CODE_PLACEHOLDER. In every
    other case the task's payload is returned unchanged.

    The task's own ``navigation_payload`` is never modified: the placeholder
    is added to a shallow copy, so it does not leak into the Task object.
    """
    payload = task.navigation_payload
    if not isinstance(payload, dict) or not task.totp_verification_url:
        return payload
    if SPECIAL_FIELD_VERIFICATION_CODE in payload:
        # The caller supplied its own value for the reserved field; respect it.
        return payload
    # Copy instead of mutating in place so the prompt-only placeholder stays
    # out of the task's stored payload.
    return {**payload, SPECIAL_FIELD_VERIFICATION_CODE: VERIFICATION_CODE_PLACEHOLDER}
async def _get_action_results(self, task: Task) -> str:
# Get action results from the last app.SETTINGS.PROMPT_ACTION_HISTORY_WINDOW steps
steps = await app.DATABASE.get_task_steps(task_id=task.task_id, organization_id=task.organization_id)

View File

@@ -43,3 +43,39 @@ async def aiohttp_get_json(
await asyncio.sleep(retry_timeout)
count += 1
raise Exception(f"Failed to fetch data from {url}")
async def aiohttp_post(
    url: str,
    data: dict[str, Any] | None = None,
    headers: dict[str, str] | None = None,
    cookies: dict[str, str] | None = None,
    retry: int = 0,
    proxy: str | None = None,
    timeout: int = DEFAULT_REQUEST_TIMEOUT,
    raise_exception: bool = True,
    retry_timeout: float = 0,
) -> dict[str, Any]:
    """
    POST ``data`` as a JSON body to ``url`` and return the decoded JSON response.

    :param url: target URL.
    :param data: object sent as the JSON request body.
    :param headers: optional request headers.
    :param cookies: optional request cookies.
    :param retry: number of additional attempts after the first one fails.
    :param proxy: optional proxy URL passed to aiohttp.
    :param timeout: total timeout, passed to ``aiohttp.ClientTimeout(total=...)``.
    :param raise_exception: when True, a non-200 response counts as a failed
        attempt (and is retried); when False, a non-200 response is logged
        and an empty dict is returned immediately.
    :param retry_timeout: seconds to wait between attempts.
    :return: the decoded JSON body of a 200 response, or ``{}`` for a non-200
        response when ``raise_exception`` is False.
    :raises Exception: when every attempt failed; the last underlying error is
        attached as ``__cause__``.
    """
    last_error: Exception | None = None
    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session:
        for attempt in range(retry + 1):
            try:
                async with session.post(
                    url,
                    # TODO: make sure to test this out
                    json=data,
                    headers=headers,
                    cookies=cookies,
                    proxy=proxy,
                ) as response:
                    if response.status == 200:
                        return await response.json()
                    if raise_exception:
                        raise HttpException(response.status, url)
                    LOG.error("None 200 async post response", url=url, status_code=response.status, method="POST")
                    return {}
            except Exception as error:
                # Remember and log the failure instead of silently dropping it,
                # so the final error can point at the real cause.
                last_error = error
                LOG.warning(
                    "Async post request attempt failed",
                    url=url,
                    attempt=attempt,
                    retry=retry,
                    exc_info=True,
                )
                # Only wait when another attempt will actually follow.
                if retry_timeout > 0 and attempt < retry:
                    await asyncio.sleep(retry_timeout)
    raise Exception(f"Failed post request url={url}") from last_error

View File

@@ -82,6 +82,7 @@ class AgentDB:
data_extraction_goal: str | None,
navigation_payload: dict[str, Any] | list | str | None,
webhook_callback_url: str | None = None,
totp_verification_url: str | None = None,
organization_id: str | None = None,
proxy_location: ProxyLocation | None = None,
extracted_information_schema: dict[str, Any] | list | str | None = None,
@@ -98,6 +99,7 @@ class AgentDB:
url=url,
title=title,
webhook_callback_url=webhook_callback_url,
totp_verification_url=totp_verification_url,
navigation_goal=navigation_goal,
data_extraction_goal=data_extraction_goal,
navigation_payload=navigation_payload,
@@ -806,6 +808,7 @@ class AgentDB:
description: str | None = None,
proxy_location: ProxyLocation | None = None,
webhook_callback_url: str | None = None,
totp_verification_url: str | None = None,
workflow_permanent_id: str | None = None,
version: int | None = None,
is_saved_task: bool = False,
@@ -818,6 +821,7 @@ class AgentDB:
workflow_definition=workflow_definition,
proxy_location=proxy_location,
webhook_callback_url=webhook_callback_url,
totp_verification_url=totp_verification_url,
is_saved_task=is_saved_task,
)
if workflow_permanent_id:
@@ -984,6 +988,7 @@ class AgentDB:
organization_id: str,
proxy_location: ProxyLocation | None = None,
webhook_callback_url: str | None = None,
totp_verification_url: str | None = None,
) -> WorkflowRun:
try:
async with self.Session() as session:
@@ -994,6 +999,7 @@ class AgentDB:
proxy_location=proxy_location,
status="created",
webhook_callback_url=webhook_callback_url,
totp_verification_url=totp_verification_url,
)
session.add(workflow_run)
await session.commit()

View File

@@ -48,6 +48,7 @@ class TaskModel(Base):
organization_id = Column(String, ForeignKey("organizations.organization_id"))
status = Column(String, index=True)
webhook_callback_url = Column(String)
totp_verification_url = Column(String)
title = Column(String)
url = Column(String)
navigation_goal = Column(String)
@@ -178,6 +179,7 @@ class WorkflowModel(Base):
workflow_definition = Column(JSON, nullable=False)
proxy_location = Column(Enum(ProxyLocation))
webhook_callback_url = Column(String)
totp_verification_url = Column(String)
created_at = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
modified_at = Column(
@@ -203,6 +205,7 @@ class WorkflowRunModel(Base):
status = Column(String, nullable=False)
proxy_location = Column(Enum(ProxyLocation))
webhook_callback_url = Column(String)
totp_verification_url = Column(String)
created_at = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
modified_at = Column(

View File

@@ -63,6 +63,7 @@ def convert_to_task(task_obj: TaskModel, debug_enabled: bool = False) -> Task:
title=task_obj.title,
url=task_obj.url,
webhook_callback_url=task_obj.webhook_callback_url,
totp_verification_url=task_obj.totp_verification_url,
navigation_goal=task_obj.navigation_goal,
data_extraction_goal=task_obj.data_extraction_goal,
navigation_payload=task_obj.navigation_payload,
@@ -160,6 +161,7 @@ def convert_to_workflow(workflow_model: WorkflowModel, debug_enabled: bool = Fal
title=workflow_model.title,
workflow_permanent_id=workflow_model.workflow_permanent_id,
webhook_callback_url=workflow_model.webhook_callback_url,
totp_verification_url=workflow_model.totp_verification_url,
proxy_location=(ProxyLocation(workflow_model.proxy_location) if workflow_model.proxy_location else None),
version=workflow_model.version,
is_saved_task=workflow_model.is_saved_task,
@@ -188,6 +190,7 @@ def convert_to_workflow_run(workflow_run_model: WorkflowRunModel, debug_enabled:
ProxyLocation(workflow_run_model.proxy_location) if workflow_run_model.proxy_location else None
),
webhook_callback_url=workflow_run_model.webhook_callback_url,
totp_verification_url=workflow_run_model.totp_verification_url,
created_at=workflow_run_model.created_at,
modified_at=workflow_run_model.modified_at,
)

View File

@@ -32,12 +32,12 @@ class TaskRequest(BaseModel):
description="Starting URL for the task.",
examples=["https://www.geico.com"],
)
# TODO: use HttpUrl instead of str
webhook_callback_url: str | None = Field(
default=None,
description="The URL to call when the task is completed.",
examples=["https://my-webhook.com"],
)
totp_verification_url: str | None = None
navigation_goal: str | None = Field(
default=None,
description="The user's goal for the task.",

View File

@@ -14,6 +14,7 @@ class WorkflowRequestBody(BaseModel):
data: dict[str, Any] | None = None
proxy_location: ProxyLocation | None = None
webhook_callback_url: str | None = None
totp_verification_url: str | None = None
class RunWorkflowResponse(BaseModel):
@@ -49,6 +50,7 @@ class Workflow(BaseModel):
workflow_definition: WorkflowDefinition
proxy_location: ProxyLocation | None = None
webhook_callback_url: str | None = None
totp_verification_url: str | None = None
created_at: datetime
modified_at: datetime
@@ -71,6 +73,7 @@ class WorkflowRun(BaseModel):
status: WorkflowRunStatus
proxy_location: ProxyLocation | None = None
webhook_callback_url: str | None = None
totp_verification_url: str | None = None
created_at: datetime
modified_at: datetime
@@ -96,6 +99,7 @@ class WorkflowRunStatusResponse(BaseModel):
status: WorkflowRunStatus
proxy_location: ProxyLocation | None = None
webhook_callback_url: str | None = None
totp_verification_url: str | None = None
created_at: datetime
modified_at: datetime
parameters: dict[str, Any]

View File

@@ -224,5 +224,6 @@ class WorkflowCreateYAMLRequest(BaseModel):
description: str | None = None
proxy_location: ProxyLocation | None = None
webhook_callback_url: str | None = None
totp_verification_url: str | None = None
workflow_definition: WorkflowDefinitionYAML
is_saved_task: bool = False

View File

@@ -285,6 +285,7 @@ class WorkflowService:
description: str | None = None,
proxy_location: ProxyLocation | None = None,
webhook_callback_url: str | None = None,
totp_verification_url: str | None = None,
workflow_permanent_id: str | None = None,
version: int | None = None,
is_saved_task: bool = False,
@@ -296,6 +297,7 @@ class WorkflowService:
description=description,
proxy_location=proxy_location,
webhook_callback_url=webhook_callback_url,
totp_verification_url=totp_verification_url,
workflow_permanent_id=workflow_permanent_id,
version=version,
is_saved_task=is_saved_task,
@@ -392,6 +394,7 @@ class WorkflowService:
organization_id=organization_id,
proxy_location=workflow_request.proxy_location,
webhook_callback_url=workflow_request.webhook_callback_url,
totp_verification_url=workflow_request.totp_verification_url,
)
async def mark_workflow_run_as_completed(self, workflow_run_id: str) -> None:
@@ -634,6 +637,7 @@ class WorkflowService:
status=workflow_run.status,
proxy_location=workflow_run.proxy_location,
webhook_callback_url=workflow_run.webhook_callback_url,
totp_verification_url=workflow_run.totp_verification_url,
created_at=workflow_run.created_at,
modified_at=workflow_run.modified_at,
parameters=parameters_with_value,
@@ -821,6 +825,7 @@ class WorkflowService:
organization_id=organization_id,
proxy_location=request.proxy_location,
webhook_callback_url=request.webhook_callback_url,
totp_verification_url=request.totp_verification_url,
workflow_permanent_id=workflow_permanent_id,
version=existing_version + 1,
is_saved_task=request.is_saved_task,
@@ -833,6 +838,7 @@ class WorkflowService:
organization_id=organization_id,
proxy_location=request.proxy_location,
webhook_callback_url=request.webhook_callback_url,
totp_verification_url=request.totp_verification_url,
is_saved_task=request.is_saved_task,
)
# Create parameters from the request

View File

@@ -2,16 +2,18 @@ import asyncio
import json
import os
import uuid
from datetime import datetime, timedelta
from typing import Any, Awaitable, Callable, List
import structlog
from deprecation import deprecated
from playwright.async_api import FileChooser, Locator, Page, TimeoutError
from skyvern.constants import REPO_ROOT_DIR
from skyvern.constants import REPO_ROOT_DIR, VERIFICATION_CODE_PLACEHOLDER, VERIFICATION_CODE_POLLING_TIMEOUT_MINS
from skyvern.exceptions import (
EmptySelect,
ErrFoundSelectableElement,
FailedToFetchSecret,
FailToClick,
FailToSelectByIndex,
FailToSelectByLabel,
@@ -33,6 +35,9 @@ from skyvern.forge.sdk.api.files import (
get_number_of_files_in_directory,
get_path_for_workflow_download_directory,
)
from skyvern.forge.sdk.core.aiohttp_helper import aiohttp_post
from skyvern.forge.sdk.core.security import generate_skyvern_signature
from skyvern.forge.sdk.db.enums import OrganizationAuthTokenType
from skyvern.forge.sdk.models import Step
from skyvern.forge.sdk.schemas.tasks import Task
from skyvern.forge.sdk.services.bitwarden import BitwardenConstants
@@ -277,7 +282,9 @@ async def handle_input_text_action(
# before filling text, we need to validate if the element can be filled if it's not one of COMMON_INPUT_TAGS
tag_name = scraped_page.id_to_element_dict[action.element_id]["tagName"].lower()
text = get_actual_value_of_parameter_if_secret(task, action.text)
text = await get_actual_value_of_parameter_if_secret(task, action.text)
if text is None:
return [ActionFailure(FailedToFetchSecret())]
try:
await skyvern_element.input_clear()
@@ -312,7 +319,7 @@ async def handle_upload_file_action(
# After this point if the file_url is a secret, it will be replaced with the actual value
# In order to make sure we don't log the secret value, we log the action with the original value action.file_url
# ************************************************************************************************************** #
file_url = get_actual_value_of_parameter_if_secret(task, action.file_url)
file_url = await get_actual_value_of_parameter_if_secret(task, action.file_url)
if file_url not in str(task.navigation_payload):
LOG.warning(
"LLM might be imagining the file url, which is not in navigation payload",
@@ -665,7 +672,7 @@ ActionHandler.register_action_type(ActionType.TERMINATE, handle_terminate_action
ActionHandler.register_action_type(ActionType.COMPLETE, handle_complete_action)
def get_actual_value_of_parameter_if_secret(task: Task, parameter: str) -> Any:
async def get_actual_value_of_parameter_if_secret(task: Task, parameter: str) -> Any:
"""
Get the actual value of a parameter if it's a secret. If it's not a secret, return the parameter value as is.
@@ -673,6 +680,13 @@ def get_actual_value_of_parameter_if_secret(task: Task, parameter: str) -> Any:
This is only used for InputTextAction, UploadFileAction, and ClickAction (if it has a file_url).
"""
if task.totp_verification_url and task.organization_id and VERIFICATION_CODE_PLACEHOLDER == parameter:
# if parameter is the secret code in the navigation payload,
# fetch the real verification code from totp_verification_url
# do polling every 10 seconds to fetch the verification code
verification_code = await poll_verification_code(task.task_id, task.organization_id, task.totp_verification_url)
return verification_code
if task.workflow_run_id is None:
return parameter
@@ -702,7 +716,7 @@ async def chain_click(
LOG.info("Chain click starts", action=action, locator=locator)
file: list[str] | str = []
if action.file_url:
file_url = get_actual_value_of_parameter_if_secret(task, action.file_url)
file_url = await get_actual_value_of_parameter_if_secret(task, action.file_url)
try:
file = await download_file(file_url)
except Exception:
@@ -1095,7 +1109,6 @@ async def click_listbox_option(
LOG.error(
"Failed to click on the option",
action=action,
locator=locator,
exc_info=True,
)
if "children" in child:
@@ -1108,3 +1121,38 @@ async def get_input_value(tag_name: str, locator: Locator) -> str | None:
return await locator.input_value()
# for span, div, p or other tags:
return await locator.inner_text()
async def poll_verification_code(task_id: str, organization_id: str, url: str) -> str | None:
    """
    Poll ``url`` until it returns a verification code or the polling timeout elapses.

    Each request is a JSON POST of ``{"task_id": task_id}`` signed with the
    organization's API token (``x-skyvern-signature``) and stamped with the
    current Unix time (``x-skyvern-timestamp``). Requests are sent every
    10 seconds for at most VERIFICATION_CODE_POLLING_TIMEOUT_MINS minutes.

    :param task_id: id of the task that needs the code; sent in the request body.
    :param organization_id: organization whose API token signs the request.
    :param url: endpoint that serves the verification code.
    :return: the ``verification_code`` value from the response, or None when
        the organization has no valid API token or the timeout is reached.
    """
    timeout = timedelta(minutes=VERIFICATION_CODE_POLLING_TIMEOUT_MINS)
    # Use the event loop's monotonic clock for the deadline so wall-clock
    # adjustments cannot shorten or extend the polling window.
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout.total_seconds()

    org_token = await app.DATABASE.get_valid_org_auth_token(organization_id, OrganizationAuthTokenType.api)
    if not org_token:
        LOG.error("Failed to get organization token when trying to get verification code")
        return None

    # The body and its signature do not change between polls; build them once.
    request_data = {
        "task_id": task_id,
    }
    payload = json.dumps(request_data)
    signature = generate_skyvern_signature(
        payload=payload,
        api_key=org_token.token,
    )

    while True:
        # check timeout
        if loop.time() > deadline:
            LOG.warning("Timed out polling for verification code", task_id=task_id)
            return None
        # datetime.utcnow().timestamp() would interpret the naive UTC value as
        # local time and skew the epoch on non-UTC hosts; datetime.now() is
        # naive local time, which timestamp() converts correctly.
        timestamp = str(int(datetime.now().timestamp()))
        headers = {
            "x-skyvern-timestamp": timestamp,
            "x-skyvern-signature": signature,
            "Content-Type": "application/json",
        }
        json_resp = await aiohttp_post(url=url, data=request_data, headers=headers, raise_exception=False)
        verification_code = json_resp.get("verification_code", None)
        if verification_code:
            # Never log the code itself: it is a secret credential.
            LOG.info("Got verification code", task_id=task_id)
            return verification_code

        await asyncio.sleep(10)