add functionality to cache task_run (#1755)

This commit is contained in:
Shuchang Zheng
2025-02-11 14:47:41 +08:00
committed by GitHub
parent 8c43e6b70e
commit defd761e58
7 changed files with 127 additions and 18 deletions

View File

@@ -108,7 +108,7 @@ async def _retrieve_action_plan(task: Task, step: Step, scraped_page: ScrapedPag
LOG.info("Found cached actions to execute", actions=cached_actions_to_execute)
actions_queries: list[tuple[Action, str | None]] = []
actions_queries: list[Action] = []
for idx, cached_action in enumerate(cached_actions_to_execute):
updated_action = cached_action.model_copy()
updated_action.status = ActionStatus.pending
@@ -135,7 +135,7 @@ async def _retrieve_action_plan(task: Task, step: Step, scraped_page: ScrapedPag
"All elements with either no hash or multiple hashes should have been already filtered out"
)
actions_queries.append((updated_action, updated_action.intention))
actions_queries.append(updated_action)
# Check for unsupported actions before personalizing the actions
# Classify the supported actions into two groups:
@@ -155,10 +155,12 @@ async def _retrieve_action_plan(task: Task, step: Step, scraped_page: ScrapedPag
async def personalize_actions(
task: Task,
step: Step,
actions_queries: list[tuple[Action, str | None]],
actions_queries: list[Action],
scraped_page: ScrapedPage,
) -> list[Action]:
queries_and_answers: dict[str, str | None] = {query: None for _, query in actions_queries if query}
queries_and_answers: dict[str, str | None] = {
action.intention: None for action in actions_queries if action.intention
}
answered_queries: dict[str, str] = {}
if queries_and_answers:
@@ -168,9 +170,13 @@ async def personalize_actions(
)
personalized_actions = []
for action, query in actions_queries:
for action in actions_queries:
query = action.intention
if query and (personalized_answer := answered_queries.get(query)):
personalized_actions.append(personalize_action(action, query, personalized_answer))
current_personized_actions = await personalize_action(
action, query, personalized_answer, task, step, scraped_page
)
personalized_actions.extend(current_personized_actions)
else:
personalized_actions.append(action)
@@ -198,24 +204,49 @@ async def get_user_detail_answers(
raise e
def personalize_action(action: Action, query: str, answer: str) -> Action:
async def personalize_action(
action: Action,
query: str,
answer: str,
task: Task,
step: Step,
scraped_page: ScrapedPage,
) -> list[Action]:
action.intention = query
action.response = answer
if action.action_type == ActionType.INPUT_TEXT:
action.text = answer
elif action.action_type == ActionType.UPLOAD_FILE:
action.file_url = answer
elif action.action_type == ActionType.CLICK:
# TODO: we only use cached action.intention. send the intention, navigation payload + navigation goal, html
# to small llm and make a decision of which elements to click. Not clicking anything is also an option here
return [action]
elif action.action_type == ActionType.SELECT_OPTION:
# TODO: send the selection action with the original/previous option value. Our current selection agent
# is already able to handle it
return [action]
elif action.action_type in [
ActionType.COMPLETE,
ActionType.WAIT,
ActionType.TERMINATE,
ActionType.SOLVE_CAPTCHA,
]:
return [action]
else:
raise CachedActionPlanError(
f"Unsupported action type for personalization, fallback to no-cache mode: {action.action_type}"
)
return action
return [action]
def check_for_unsupported_actions(actions_queries: list[tuple[Action, str | None]]) -> None:
def check_for_unsupported_actions(actions_queries: list[Action]) -> None:
supported_actions = [ActionType.INPUT_TEXT, ActionType.WAIT, ActionType.CLICK, ActionType.COMPLETE]
supported_actions_with_query = [ActionType.INPUT_TEXT]
for action, query in actions_queries:
for action in actions_queries:
query = action.intention
if action.action_type not in supported_actions:
raise CachedActionPlanError(
f"This action type does not support caching: {action.action_type}, fallback to no-cache mode"

View File

@@ -282,6 +282,15 @@ class ScrapedPage(BaseModel):
self.url = refreshed_page.url
return self
async def generate_scraped_page_without_screenshots(self) -> Self:
    """Re-scrape this page's URL without capturing screenshots.

    Convenience wrapper around ``scrape_website`` that reuses this
    ScrapedPage's stored browser state, URL, element-tree cleanup
    function, and scrape-exclude filter, passing
    ``take_screenshots=False`` so the expensive split-screenshot step
    is skipped.

    Returns:
        A freshly scraped page (``Self``) built from the re-scrape;
        the current instance is not mutated.
    """
    # Delegate to the module-level scraper; take_screenshots=False is the
    # entire point of this helper (used by the cached-action-plan flow,
    # which needs a fresh element tree but no screenshots).
    return await scrape_website(
        browser_state=self._browser_state,
        url=self.url,
        cleanup_element_tree=self._clean_up_func,
        scrape_exclude=self._scrape_exclude,
        take_screenshots=False,
    )
async def scrape_website(
browser_state: BrowserState,
@@ -289,6 +298,7 @@ async def scrape_website(
cleanup_element_tree: CleanupElementTreeFunc,
num_retry: int = 0,
scrape_exclude: ScrapeExcludeFunc | None = None,
take_screenshots: bool = True,
) -> ScrapedPage:
"""
************************************************************************************************
@@ -318,6 +328,7 @@ async def scrape_website(
url=url,
cleanup_element_tree=cleanup_element_tree,
scrape_exclude=scrape_exclude,
take_screenshots=take_screenshots,
)
except Exception as e:
# NOTE: MAX_SCRAPING_RETRIES is set to 0 in both staging and production
@@ -386,6 +397,7 @@ async def scrape_web_unsafe(
url: str,
cleanup_element_tree: CleanupElementTreeFunc,
scrape_exclude: ScrapeExcludeFunc | None = None,
take_screenshots: bool = True,
) -> ScrapedPage:
"""
Asynchronous function that performs web scraping without any built-in error handling. This function is intended
@@ -410,7 +422,9 @@ async def scrape_web_unsafe(
LOG.info("Waiting for 5 seconds before scraping the website.")
await asyncio.sleep(5)
screenshots = await SkyvernFrame.take_split_screenshots(page=page, url=url, draw_boxes=True)
screenshots = []
if take_screenshots:
screenshots = await SkyvernFrame.take_split_screenshots(page=page, url=url, draw_boxes=True)
elements, element_tree = await get_interactable_element_tree(page, scrape_exclude)
element_tree = await cleanup_element_tree(page, url, copy.deepcopy(element_tree))