shu/get rid off observer cruise part3 (#1658)
This commit is contained in:
@@ -176,7 +176,7 @@ class BackgroundTaskExecutor(AsyncExecutor):
|
|||||||
|
|
||||||
if background_tasks:
|
if background_tasks:
|
||||||
background_tasks.add_task(
|
background_tasks.add_task(
|
||||||
observer_service.run_observer_cruise,
|
observer_service.run_observer_task,
|
||||||
organization=organization,
|
organization=organization,
|
||||||
observer_cruise_id=observer_cruise_id,
|
observer_cruise_id=observer_cruise_id,
|
||||||
max_iterations_override=max_iterations_override,
|
max_iterations_override=max_iterations_override,
|
||||||
|
|||||||
@@ -1152,7 +1152,7 @@ async def observer_task(
|
|||||||
LOG.info("Overriding max iterations for observer", max_iterations_override=x_max_iterations_override)
|
LOG.info("Overriding max iterations for observer", max_iterations_override=x_max_iterations_override)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
observer_task = await observer_service.initialize_observer_cruise(
|
observer_task = await observer_service.initialize_observer_task(
|
||||||
organization=organization,
|
organization=organization,
|
||||||
user_prompt=data.user_prompt,
|
user_prompt=data.user_prompt,
|
||||||
user_url=str(data.url) if data.url else None,
|
user_url=str(data.url) if data.url else None,
|
||||||
|
|||||||
@@ -85,7 +85,7 @@ def _generate_data_extraction_schema_for_loop(loop_values_key: str) -> dict:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
async def initialize_observer_cruise(
|
async def initialize_observer_task(
|
||||||
organization: Organization,
|
organization: Organization,
|
||||||
user_prompt: str,
|
user_prompt: str,
|
||||||
user_url: str | None = None,
|
user_url: str | None = None,
|
||||||
@@ -95,7 +95,7 @@ async def initialize_observer_cruise(
|
|||||||
webhook_callback_url: str | None = None,
|
webhook_callback_url: str | None = None,
|
||||||
publish_workflow: bool = False,
|
publish_workflow: bool = False,
|
||||||
) -> ObserverTask:
|
) -> ObserverTask:
|
||||||
observer_cruise = await app.DATABASE.create_observer_cruise(
|
observer_task = await app.DATABASE.create_observer_cruise(
|
||||||
prompt=user_prompt,
|
prompt=user_prompt,
|
||||||
organization_id=organization.organization_id,
|
organization_id=organization.organization_id,
|
||||||
totp_verification_url=totp_verification_url,
|
totp_verification_url=totp_verification_url,
|
||||||
@@ -106,10 +106,10 @@ async def initialize_observer_cruise(
|
|||||||
# set observer cruise id in context
|
# set observer cruise id in context
|
||||||
context = skyvern_context.current()
|
context = skyvern_context.current()
|
||||||
if context:
|
if context:
|
||||||
context.observer_cruise_id = observer_cruise.observer_cruise_id
|
context.observer_cruise_id = observer_task.observer_cruise_id
|
||||||
|
|
||||||
observer_thought = await app.DATABASE.create_observer_thought(
|
observer_thought = await app.DATABASE.create_observer_thought(
|
||||||
observer_cruise_id=observer_cruise.observer_cruise_id,
|
observer_cruise_id=observer_task.observer_cruise_id,
|
||||||
organization_id=organization.organization_id,
|
organization_id=organization.organization_id,
|
||||||
observer_thought_type=ObserverThoughtType.metadata,
|
observer_thought_type=ObserverThoughtType.metadata,
|
||||||
observer_thought_scenario=ObserverThoughtScenario.generate_metadata,
|
observer_thought_scenario=ObserverThoughtScenario.generate_metadata,
|
||||||
@@ -173,8 +173,8 @@ async def initialize_observer_cruise(
|
|||||||
|
|
||||||
# update oserver cruise
|
# update oserver cruise
|
||||||
try:
|
try:
|
||||||
observer_cruise = await app.DATABASE.update_observer_cruise(
|
observer_task = await app.DATABASE.update_observer_cruise(
|
||||||
observer_cruise_id=observer_cruise.observer_cruise_id,
|
observer_cruise_id=observer_task.observer_cruise_id,
|
||||||
workflow_run_id=workflow_run.workflow_run_id,
|
workflow_run_id=workflow_run.workflow_run_id,
|
||||||
workflow_id=new_workflow.workflow_id,
|
workflow_id=new_workflow.workflow_id,
|
||||||
workflow_permanent_id=new_workflow.workflow_permanent_id,
|
workflow_permanent_id=new_workflow.workflow_permanent_id,
|
||||||
@@ -190,10 +190,10 @@ async def initialize_observer_cruise(
|
|||||||
)
|
)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
return observer_cruise
|
return observer_task
|
||||||
|
|
||||||
|
|
||||||
async def run_observer_cruise(
|
async def run_observer_task(
|
||||||
organization: Organization,
|
organization: Organization,
|
||||||
observer_cruise_id: str,
|
observer_cruise_id: str,
|
||||||
request_id: str | None = None,
|
request_id: str | None = None,
|
||||||
@@ -210,14 +210,14 @@ async def run_observer_cruise(
|
|||||||
organization_id=organization_id,
|
organization_id=organization_id,
|
||||||
exc_info=True,
|
exc_info=True,
|
||||||
)
|
)
|
||||||
return await mark_observer_cruise_as_failed(observer_cruise_id, organization_id=organization_id)
|
return await mark_observer_task_as_failed(observer_cruise_id, organization_id=organization_id)
|
||||||
if not observer_task:
|
if not observer_task:
|
||||||
LOG.error("Observer cruise not found", observer_cruise_id=observer_cruise_id, organization_id=organization_id)
|
LOG.error("Observer cruise not found", observer_cruise_id=observer_cruise_id, organization_id=organization_id)
|
||||||
raise ObserverCruiseNotFound(observer_cruise_id=observer_cruise_id)
|
raise ObserverCruiseNotFound(observer_cruise_id=observer_cruise_id)
|
||||||
|
|
||||||
workflow, workflow_run = None, None
|
workflow, workflow_run = None, None
|
||||||
try:
|
try:
|
||||||
workflow, workflow_run, observer_task = await run_observer_cruise_helper(
|
workflow, workflow_run, observer_task = await run_observer_task_helper(
|
||||||
organization=organization,
|
organization=organization,
|
||||||
observer_task=observer_task,
|
observer_task=observer_task,
|
||||||
request_id=request_id,
|
request_id=request_id,
|
||||||
@@ -226,7 +226,7 @@ async def run_observer_cruise(
|
|||||||
)
|
)
|
||||||
except OperationalError:
|
except OperationalError:
|
||||||
LOG.error("Database error when running observer cruise", exc_info=True)
|
LOG.error("Database error when running observer cruise", exc_info=True)
|
||||||
observer_task = await mark_observer_cruise_as_failed(
|
observer_task = await mark_observer_task_as_failed(
|
||||||
observer_cruise_id,
|
observer_cruise_id,
|
||||||
workflow_run_id=observer_task.workflow_run_id,
|
workflow_run_id=observer_task.workflow_run_id,
|
||||||
failure_reason="Database error when running task 2.0",
|
failure_reason="Database error when running task 2.0",
|
||||||
@@ -235,7 +235,7 @@ async def run_observer_cruise(
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
LOG.error("Failed to run observer cruise", exc_info=True)
|
LOG.error("Failed to run observer cruise", exc_info=True)
|
||||||
failure_reason = f"Failed to run task 2.0: {str(e)}"
|
failure_reason = f"Failed to run task 2.0: {str(e)}"
|
||||||
observer_task = await mark_observer_cruise_as_failed(
|
observer_task = await mark_observer_task_as_failed(
|
||||||
observer_cruise_id,
|
observer_cruise_id,
|
||||||
workflow_run_id=observer_task.workflow_run_id,
|
workflow_run_id=observer_task.workflow_run_id,
|
||||||
failure_reason=failure_reason,
|
failure_reason=failure_reason,
|
||||||
@@ -257,7 +257,7 @@ async def run_observer_cruise(
|
|||||||
return observer_task
|
return observer_task
|
||||||
|
|
||||||
|
|
||||||
async def run_observer_cruise_helper(
|
async def run_observer_task_helper(
|
||||||
organization: Organization,
|
organization: Organization,
|
||||||
observer_task: ObserverTask,
|
observer_task: ObserverTask,
|
||||||
request_id: str | None = None,
|
request_id: str | None = None,
|
||||||
@@ -419,7 +419,7 @@ async def run_observer_cruise_helper(
|
|||||||
iteration=i,
|
iteration=i,
|
||||||
workflow_run_id=workflow_run_id,
|
workflow_run_id=workflow_run_id,
|
||||||
)
|
)
|
||||||
observer_task = await _summarize_observer_cruise(
|
observer_task = await _summarize_observer_task(
|
||||||
observer_task=observer_task,
|
observer_task=observer_task,
|
||||||
task_history=task_history,
|
task_history=task_history,
|
||||||
context=context,
|
context=context,
|
||||||
@@ -610,7 +610,7 @@ async def run_observer_cruise_helper(
|
|||||||
workflow_run_id=workflow_run_id,
|
workflow_run_id=workflow_run_id,
|
||||||
completion_resp=completion_resp,
|
completion_resp=completion_resp,
|
||||||
)
|
)
|
||||||
observer_task = await _summarize_observer_cruise(
|
observer_task = await _summarize_observer_task(
|
||||||
observer_task=observer_task,
|
observer_task=observer_task,
|
||||||
task_history=task_history,
|
task_history=task_history,
|
||||||
context=context,
|
context=context,
|
||||||
@@ -623,7 +623,7 @@ async def run_observer_cruise_helper(
|
|||||||
max_iterations=max_iterations,
|
max_iterations=max_iterations,
|
||||||
workflow_run_id=workflow_run_id,
|
workflow_run_id=workflow_run_id,
|
||||||
)
|
)
|
||||||
observer_task = await mark_observer_cruise_as_failed(
|
observer_task = await mark_observer_task_as_failed(
|
||||||
observer_cruise_id=observer_cruise_id,
|
observer_cruise_id=observer_cruise_id,
|
||||||
workflow_run_id=workflow_run_id,
|
workflow_run_id=workflow_run_id,
|
||||||
# TODO: add a better failure reason with LLM
|
# TODO: add a better failure reason with LLM
|
||||||
@@ -1087,7 +1087,7 @@ async def get_observer_cruise(observer_cruise_id: str, organization_id: str | No
|
|||||||
return await app.DATABASE.get_observer_cruise(observer_cruise_id, organization_id=organization_id)
|
return await app.DATABASE.get_observer_cruise(observer_cruise_id, organization_id=organization_id)
|
||||||
|
|
||||||
|
|
||||||
async def mark_observer_cruise_as_failed(
|
async def mark_observer_task_as_failed(
|
||||||
observer_cruise_id: str,
|
observer_cruise_id: str,
|
||||||
workflow_run_id: str | None = None,
|
workflow_run_id: str | None = None,
|
||||||
failure_reason: str | None = None,
|
failure_reason: str | None = None,
|
||||||
@@ -1102,11 +1102,11 @@ async def mark_observer_cruise_as_failed(
|
|||||||
await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed(
|
await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed(
|
||||||
workflow_run_id, failure_reason=failure_reason or "Skyvern task 2.0 failed"
|
workflow_run_id, failure_reason=failure_reason or "Skyvern task 2.0 failed"
|
||||||
)
|
)
|
||||||
await send_observer_cruise_webhook(observer_task)
|
await send_observer_task_webhook(observer_task)
|
||||||
return observer_task
|
return observer_task
|
||||||
|
|
||||||
|
|
||||||
async def mark_observer_cruise_as_completed(
|
async def mark_observer_task_as_completed(
|
||||||
observer_cruise_id: str,
|
observer_cruise_id: str,
|
||||||
workflow_run_id: str | None = None,
|
workflow_run_id: str | None = None,
|
||||||
organization_id: str | None = None,
|
organization_id: str | None = None,
|
||||||
@@ -1123,7 +1123,7 @@ async def mark_observer_cruise_as_completed(
|
|||||||
if workflow_run_id:
|
if workflow_run_id:
|
||||||
await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id)
|
await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id)
|
||||||
|
|
||||||
await send_observer_cruise_webhook(observer_task)
|
await send_observer_task_webhook(observer_task)
|
||||||
return observer_task
|
return observer_task
|
||||||
|
|
||||||
|
|
||||||
@@ -1196,7 +1196,7 @@ def _get_extracted_data_from_block_result(
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
async def _summarize_observer_cruise(
|
async def _summarize_observer_task(
|
||||||
observer_task: ObserverTask,
|
observer_task: ObserverTask,
|
||||||
task_history: list[dict],
|
task_history: list[dict],
|
||||||
context: SkyvernContext,
|
context: SkyvernContext,
|
||||||
@@ -1234,7 +1234,7 @@ async def _summarize_observer_cruise(
|
|||||||
output=observer_summary_resp,
|
output=observer_summary_resp,
|
||||||
)
|
)
|
||||||
|
|
||||||
return await mark_observer_cruise_as_completed(
|
return await mark_observer_task_as_completed(
|
||||||
observer_cruise_id=observer_task.observer_cruise_id,
|
observer_cruise_id=observer_task.observer_cruise_id,
|
||||||
workflow_run_id=observer_task.workflow_run_id,
|
workflow_run_id=observer_task.workflow_run_id,
|
||||||
organization_id=observer_task.organization_id,
|
organization_id=observer_task.organization_id,
|
||||||
@@ -1243,7 +1243,7 @@ async def _summarize_observer_cruise(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
async def send_observer_cruise_webhook(observer_task: ObserverTask) -> None:
|
async def send_observer_task_webhook(observer_task: ObserverTask) -> None:
|
||||||
if not observer_task.webhook_callback_url:
|
if not observer_task.webhook_callback_url:
|
||||||
return
|
return
|
||||||
organization_id = observer_task.organization_id
|
organization_id = observer_task.organization_id
|
||||||
|
|||||||
Reference in New Issue
Block a user