diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index e339d07f..f82a4108 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -64,6 +64,7 @@ from skyvern.webeye.actions.actions import ( CompleteAction, CompleteVerifyResult, DecisiveAction, + ExtractAction, ReloadPageAction, UserDefinedError, WebAction, @@ -721,19 +722,7 @@ class ForgeAgent: using_cached_action_plan = False if not task.navigation_goal and not isinstance(task_block, ValidationBlock): - actions = [ - CompleteAction( - reasoning="Task has no navigation goal.", - data_extraction_goal=task.data_extraction_goal, - organization_id=task.organization_id, - task_id=task.task_id, - workflow_run_id=task.workflow_run_id, - step_id=step.step_id, - step_order=step.order, - action_order=0, - confidence_float=1.0, - ) - ] + actions = [await self.create_extract_action(task, step, scraped_page)] elif ( task_block and task_block.cache_actions @@ -1039,6 +1028,21 @@ class ForgeAgent: ) detailed_agent_step_output.actions_and_results.append((complete_action, complete_results)) await self.record_artifacts_after_action(task, step, browser_state) + + # if the last action is complete and is successful, check if there's a data extraction goal + # if task has navigation goal and extraction goal at the same time, handle ExtractAction before marking step as completed + if ( + task.navigation_goal + and task.data_extraction_goal + and self.step_has_completed_goal(detailed_agent_step_output) + ): + working_page = await browser_state.must_get_working_page() + extract_action = await self.create_extract_action(task, step, scraped_page) + extract_results = await ActionHandler.handle_action( + scraped_page, task, step, working_page, extract_action + ) + detailed_agent_step_output.actions_and_results.append((extract_action, extract_results)) + # If no action errors return the agent state and output completed_step = await self.update_step( step=step, @@ -1490,6 +1494,7 @@ class ForgeAgent: """ Find the last 
successful ScrapeAction for the task and return the extracted information. """ + # TODO: make sure we can get extracted information with the ExtractAction change steps = await app.DATABASE.get_task_steps( task_id=task.task_id, organization_id=task.organization_id, @@ -1500,7 +1505,7 @@ class ForgeAgent: if not step.output or not step.output.actions_and_results: continue for action, action_results in step.output.actions_and_results: - if action.action_type != ActionType.COMPLETE: + if action.action_type != ActionType.EXTRACT: continue for action_result in action_results: @@ -2197,3 +2202,43 @@ class ForgeAgent: organization_id=task.organization_id, errors=task_errors, ) + + @staticmethod + async def create_extract_action(task: Task, step: Step, scraped_page: ScrapedPage) -> ExtractAction: + context = skyvern_context.ensure_context() + # generate reasoning by prompt llm to think briefly what data to extract + prompt = prompt_engine.load_prompt( + "data-extraction-summary", + data_extraction_goal=task.data_extraction_goal, + data_extraction_schema=task.extracted_information_schema, + current_url=scraped_page.url, + local_datetime=datetime.now(context.tz_info).isoformat(), + ) + + data_extraction_summary_resp = await app.SECONDARY_LLM_API_HANDLER( + prompt=prompt, + step=step, + screenshots=scraped_page.screenshots, + ) + return ExtractAction( + reasoning=data_extraction_summary_resp.get("summary", "Extracting information from the page"), + data_extraction_goal=task.data_extraction_goal, + organization_id=task.organization_id, + task_id=task.task_id, + workflow_run_id=task.workflow_run_id, + step_id=step.step_id, + step_order=step.order, + action_order=0, + confidence_float=1.0, + ) + + @staticmethod + def step_has_completed_goal(detailed_agent_step_output: DetailedAgentStepOutput) -> bool: + if not detailed_agent_step_output.actions_and_results: + return False + + last_action, last_action_results = detailed_agent_step_output.actions_and_results[-1] + if 
last_action.action_type not in [ActionType.COMPLETE, ActionType.EXTRACT]:
+            return False
+
+        return any(action_result.success for action_result in last_action_results)
diff --git a/skyvern/forge/prompts/skyvern/data-extraction-summary.j2 b/skyvern/forge/prompts/skyvern/data-extraction-summary.j2
new file mode 100644
index 00000000..55fc7792
--- /dev/null
+++ b/skyvern/forge/prompts/skyvern/data-extraction-summary.j2
@@ -0,0 +1,23 @@
+You are an AI assistant to help the user extract data from websites. Given a goal to extract information from a web page{% if data_extraction_schema%} and the output schema of the data you're going to extract{% endif %}, summarize what data you're going to extract from the page so that the user has a clear overview of your plan.
+
+Reply in JSON format with the following keys:
+{
+    "summary": str, // Summary of the data you will extract within one sentence. Be precise and concise.
+}
+
+The URL of the page you're on right now is `{{ current_url }}`.
+
+Data extraction goal:
+```
+{{ data_extraction_goal }}
+```{% if data_extraction_schema %}
+
+Data extraction schema:
+```
+{{ data_extraction_schema }}
+```{% endif %}
+
+Current datetime, ISO format:
+```
+{{ local_datetime }}
+```
\ No newline at end of file
diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py
index 5d889216..bc3788c8 100644
--- a/skyvern/forge/sdk/db/client.py
+++ b/skyvern/forge/sdk/db/client.py
@@ -2010,6 +2010,10 @@ class AgentDB:
         prompt: str | None = None,
         url: str | None = None,
         organization_id: str | None = None,
+        proxy_location: ProxyLocation | None = None,
+        totp_identifier: str | None = None,
+        totp_verification_url: str | None = None,
+        webhook_callback_url: str | None = None,
     ) -> ObserverTask:
         async with self.Session() as session:
             new_observer_cruise = ObserverCruiseModel(
@@ -2018,6 +2022,10 @@ class AgentDB:
                 workflow_permanent_id=workflow_permanent_id,
                 prompt=prompt,
                 url=url,
+                proxy_location=proxy_location,
+                
totp_identifier=totp_identifier, + totp_verification_url=totp_verification_url, + webhook_callback_url=webhook_callback_url, organization_id=organization_id, ) session.add(new_observer_cruise) diff --git a/skyvern/forge/sdk/models.py b/skyvern/forge/sdk/models.py index 642ad08f..556bd222 100644 --- a/skyvern/forge/sdk/models.py +++ b/skyvern/forge/sdk/models.py @@ -87,6 +87,9 @@ class Step(BaseModel): raise ValueError(f"cant_set_is_last_to_false({self.step_id})") def is_goal_achieved(self) -> bool: + # TODO: now we also consider a step has achieved the goal if the task doesn't have a navigation goal + # and the data extraction is successful + if self.status != StepStatus.completed: return False # TODO (kerem): Remove this check once we have backfilled all the steps @@ -94,14 +97,14 @@ class Step(BaseModel): return False # Check if there is a successful complete action - for action, action_results in self.output.actions_and_results: - if action.action_type != ActionType.COMPLETE: - continue + if not self.output.actions_and_results: + return False - if any(action_result.success for action_result in action_results): - return True + last_action, last_action_results = self.output.actions_and_results[-1] + if last_action.action_type not in [ActionType.COMPLETE, ActionType.EXTRACT]: + return False - return False + return any(action_result.success for action_result in last_action_results) def is_success(self) -> bool: if self.status != StepStatus.completed: diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index 5a0fab34..5d491e19 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -1147,6 +1147,10 @@ async def observer_task( organization=organization, user_prompt=data.user_prompt, user_url=str(data.url) if data.url else None, + totp_identifier=data.totp_identifier, + totp_verification_url=data.totp_verification_url, + webhook_callback_url=data.webhook_callback_url, + 
proxy_location=data.proxy_location, ) except LLMProviderError: LOG.error("LLM failure to initialize observer cruise", exc_info=True) diff --git a/skyvern/forge/sdk/schemas/observers.py b/skyvern/forge/sdk/schemas/observers.py index 3a8ffb46..758a5189 100644 --- a/skyvern/forge/sdk/schemas/observers.py +++ b/skyvern/forge/sdk/schemas/observers.py @@ -112,6 +112,7 @@ class ObserverTaskRequest(BaseModel): webhook_callback_url: str | None = None totp_verification_url: str | None = None totp_identifier: str | None = None + proxy_location: ProxyLocation | None = None @field_validator("url", "webhook_callback_url", "totp_verification_url") @classmethod diff --git a/skyvern/forge/sdk/services/observer_service.py b/skyvern/forge/sdk/services/observer_service.py index 19424e54..3ae14774 100644 --- a/skyvern/forge/sdk/services/observer_service.py +++ b/skyvern/forge/sdk/services/observer_service.py @@ -80,11 +80,21 @@ def _generate_data_extraction_schema_for_loop(loop_values_key: str) -> dict: async def initialize_observer_cruise( - organization: Organization, user_prompt: str, user_url: str | None = None + organization: Organization, + user_prompt: str, + user_url: str | None = None, + proxy_location: ProxyLocation | None = None, + totp_identifier: str | None = None, + totp_verification_url: str | None = None, + webhook_callback_url: str | None = None, ) -> ObserverTask: observer_cruise = await app.DATABASE.create_observer_cruise( prompt=user_prompt, organization_id=organization.organization_id, + totp_verification_url=totp_verification_url, + totp_identifier=totp_identifier, + webhook_callback_url=webhook_callback_url, + proxy_location=proxy_location, ) # set observer cruise id in context context = skyvern_context.current() @@ -117,7 +127,9 @@ async def initialize_observer_cruise( # create workflow and workflow run max_steps_override = 10 try: - new_workflow = await app.WORKFLOW_SERVICE.create_empty_workflow(organization, metadata.workflow_title) + new_workflow = await 
app.WORKFLOW_SERVICE.create_empty_workflow( + organization, metadata.workflow_title, proxy_location=proxy_location + ) workflow_run = await app.WORKFLOW_SERVICE.setup_workflow_run( request_id=None, workflow_request=WorkflowRequestBody(), diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 29e60bfb..b519d683 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -1693,7 +1693,9 @@ class WorkflowService: raise ValueError(f"Invalid block type {block_yaml.block_type}") - async def create_empty_workflow(self, organization: Organization, title: str) -> Workflow: + async def create_empty_workflow( + self, organization: Organization, title: str, proxy_location: ProxyLocation | None = None + ) -> Workflow: """ Create a blank workflow with no blocks """ @@ -1704,6 +1706,7 @@ class WorkflowService: parameters=[], blocks=[], ), + proxy_location=proxy_location, ) return await app.WORKFLOW_SERVICE.create_workflow_from_request( organization=organization, diff --git a/skyvern/webeye/actions/actions.py b/skyvern/webeye/actions/actions.py index aa96b1ba..790da6da 100644 --- a/skyvern/webeye/actions/actions.py +++ b/skyvern/webeye/actions/actions.py @@ -27,6 +27,8 @@ class ActionType(StrEnum): COMPLETE = "complete" RELOAD_PAGE = "reload_page" + EXTRACT = "extract" + def is_web_action(self) -> bool: return self in [ ActionType.CLICK, @@ -248,6 +250,12 @@ class CompleteAction(DecisiveAction): data_extraction_goal: str | None = None +class ExtractAction(Action): + action_type: ActionType = ActionType.EXTRACT + data_extraction_goal: str | None = None + data_extraction_schema: dict[str, Any] | None = None + + class ScrapeResult(BaseModel): """ Scraped response from a webpage, including: diff --git a/skyvern/webeye/actions/handler.py b/skyvern/webeye/actions/handler.py index 4d46c2b1..3465cc0e 100644 --- a/skyvern/webeye/actions/handler.py +++ b/skyvern/webeye/actions/handler.py @@ -53,6 +53,7 @@ from 
skyvern.exceptions import ( from skyvern.forge import app from skyvern.forge.prompts import prompt_engine from skyvern.forge.sdk.api.files import download_file, get_download_dir, list_files_in_directory +from skyvern.forge.sdk.api.llm.exceptions import LLMProviderError from skyvern.forge.sdk.core.aiohttp_helper import aiohttp_post from skyvern.forge.sdk.core.security import generate_skyvern_signature from skyvern.forge.sdk.core.skyvern_context import ensure_context @@ -309,6 +310,9 @@ class ActionHandler: action=action, ) actions_result.append(ActionFailure(e)) + except LLMProviderError as e: + LOG.exception("LLM error in action handler", action=action, exc_info=True) + actions_result.append(ActionFailure(e)) except Exception as e: LOG.exception("Unhandled exception in action handler", action=action) actions_result.append(ActionFailure(e)) @@ -1318,15 +1322,28 @@ async def handle_complete_action( ) action.verified = True + return [ActionSuccess()] + + +async def handle_extract_action( + action: actions.ExtractAction, + page: Page, + scraped_page: ScrapedPage, + task: Task, + step: Step, +) -> list[ActionResult]: extracted_data = None - if action.data_extraction_goal: + if task.data_extraction_goal: scrape_action_result = await extract_information_for_navigation_goal( scraped_page=scraped_page, task=task, step=step, ) extracted_data = scrape_action_result.scraped_data - return [ActionSuccess(data=extracted_data)] + return [ActionSuccess(data=extracted_data)] + else: + LOG.warning("No data extraction goal, skipping extract action", step_id=step.step_id) + return [ActionFailure(exception=Exception("No data extraction goal"))] ActionHandler.register_action_type(ActionType.SOLVE_CAPTCHA, handle_solve_captcha_action) @@ -1339,6 +1356,7 @@ ActionHandler.register_action_type(ActionType.SELECT_OPTION, handle_select_optio ActionHandler.register_action_type(ActionType.WAIT, handle_wait_action) ActionHandler.register_action_type(ActionType.TERMINATE, handle_terminate_action) 
ActionHandler.register_action_type(ActionType.COMPLETE, handle_complete_action) +ActionHandler.register_action_type(ActionType.EXTRACT, handle_extract_action) async def get_actual_value_of_parameter_if_secret(task: Task, parameter: str) -> Any: