task v2 refactor part 10: ObserverTask -> TaskV2 in backend code (#1839)

This commit is contained in:
Shuchang Zheng
2025-02-27 20:19:02 -08:00
committed by GitHub
parent 5c5464b187
commit 14689b53e4
21 changed files with 313 additions and 332 deletions

View File

@@ -22,8 +22,6 @@ from skyvern.forge.sdk.db.models import (
BitwardenSensitiveInformationParameterModel,
CredentialModel,
CredentialParameterModel,
ObserverCruiseModel,
ObserverThoughtModel,
OrganizationAuthTokenModel,
OrganizationBitwardenCollectionModel,
OrganizationModel,
@@ -33,6 +31,8 @@ from skyvern.forge.sdk.db.models import (
TaskGenerationModel,
TaskModel,
TaskRunModel,
TaskV2Model,
ThoughtModel,
TOTPCodeModel,
WorkflowModel,
WorkflowParameterModel,
@@ -68,7 +68,7 @@ from skyvern.forge.sdk.schemas.organizations import Organization, OrganizationAu
from skyvern.forge.sdk.schemas.persistent_browser_sessions import PersistentBrowserSession
from skyvern.forge.sdk.schemas.task_generations import TaskGeneration
from skyvern.forge.sdk.schemas.task_runs import TaskRun, TaskRunType
from skyvern.forge.sdk.schemas.task_v2 import ObserverTask, ObserverTaskStatus, ObserverThought, ObserverThoughtType
from skyvern.forge.sdk.schemas.task_v2 import TaskV2, TaskV2Status, Thought, ThoughtType
from skyvern.forge.sdk.schemas.tasks import OrderBy, ProxyLocation, SortDirection, Task, TaskStatus
from skyvern.forge.sdk.schemas.totp_codes import TOTPCode
from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock
@@ -211,7 +211,7 @@ class AgentDB:
workflow_run_id: str | None = None,
workflow_run_block_id: str | None = None,
task_v2_id: str | None = None,
observer_thought_id: str | None = None,
thought_id: str | None = None,
ai_suggestion_id: str | None = None,
organization_id: str | None = None,
) -> Artifact:
@@ -226,7 +226,7 @@ class AgentDB:
workflow_run_id=workflow_run_id,
workflow_run_block_id=workflow_run_block_id,
observer_cruise_id=task_v2_id,
observer_thought_id=observer_thought_id,
observer_thought_id=thought_id,
ai_suggestion_id=ai_suggestion_id,
organization_id=organization_id,
)
@@ -893,7 +893,7 @@ class AgentDB:
step_id: str | None = None,
workflow_run_id: str | None = None,
workflow_run_block_id: str | None = None,
observer_thought_id: str | None = None,
thought_id: str | None = None,
task_v2_id: str | None = None,
organization_id: str | None = None,
) -> list[Artifact]:
@@ -911,8 +911,8 @@ class AgentDB:
query = query.filter_by(workflow_run_id=workflow_run_id)
if workflow_run_block_id is not None:
query = query.filter_by(workflow_run_block_id=workflow_run_block_id)
if observer_thought_id is not None:
query = query.filter_by(observer_thought_id=observer_thought_id)
if thought_id is not None:
query = query.filter_by(observer_thought_id=thought_id)
if task_v2_id is not None:
query = query.filter_by(observer_cruise_id=task_v2_id)
if organization_id is not None:
@@ -937,7 +937,7 @@ class AgentDB:
step_id: str | None = None,
workflow_run_id: str | None = None,
workflow_run_block_id: str | None = None,
observer_thought_id: str | None = None,
thought_id: str | None = None,
task_v2_id: str | None = None,
organization_id: str | None = None,
) -> Artifact | None:
@@ -947,7 +947,7 @@ class AgentDB:
step_id=step_id,
workflow_run_id=workflow_run_id,
workflow_run_block_id=workflow_run_block_id,
observer_thought_id=observer_thought_id,
thought_id=thought_id,
task_v2_id=task_v2_id,
organization_id=organization_id,
)
@@ -2128,24 +2128,24 @@ class AgentDB:
await session.execute(stmt)
await session.commit()
async def get_task_v2(self, task_v2_id: str, organization_id: str | None = None) -> ObserverTask | None:
async def get_task_v2(self, task_v2_id: str, organization_id: str | None = None) -> TaskV2 | None:
async with self.Session() as session:
if task_v2 := (
await session.scalars(
select(ObserverCruiseModel)
select(TaskV2Model)
.filter_by(observer_cruise_id=task_v2_id)
.filter_by(organization_id=organization_id)
)
).first():
return ObserverTask.model_validate(task_v2)
return TaskV2.model_validate(task_v2)
return None
async def delete_observer_thoughts_for_cruise(self, task_v2_id: str, organization_id: str | None = None) -> None:
async def delete_thoughts(self, task_v2_id: str, organization_id: str | None = None) -> None:
async with self.Session() as session:
stmt = delete(ObserverThoughtModel).where(
stmt = delete(ThoughtModel).where(
and_(
ObserverThoughtModel.observer_cruise_id == task_v2_id,
ObserverThoughtModel.organization_id == organization_id,
ThoughtModel.observer_cruise_id == task_v2_id,
ThoughtModel.organization_id == organization_id,
)
)
await session.execute(stmt)
@@ -2155,49 +2155,47 @@ class AgentDB:
self,
workflow_run_id: str,
organization_id: str | None = None,
) -> ObserverTask | None:
) -> TaskV2 | None:
async with self.Session() as session:
if task_v2 := (
await session.scalars(
select(ObserverCruiseModel)
select(TaskV2Model)
.filter_by(organization_id=organization_id)
.filter_by(workflow_run_id=workflow_run_id)
)
).first():
return ObserverTask.model_validate(task_v2)
return TaskV2.model_validate(task_v2)
return None
async def get_observer_thought(
self, observer_thought_id: str, organization_id: str | None = None
) -> ObserverThought | None:
async def get_thought(self, thought_id: str, organization_id: str | None = None) -> Thought | None:
async with self.Session() as session:
if observer_thought := (
if thought := (
await session.scalars(
select(ObserverThoughtModel)
.filter_by(observer_thought_id=observer_thought_id)
select(ThoughtModel)
.filter_by(observer_thought_id=thought_id)
.filter_by(organization_id=organization_id)
)
).first():
return ObserverThought.model_validate(observer_thought)
return Thought.model_validate(thought)
return None
async def get_observer_thoughts(
async def get_thoughts(
self,
task_v2_id: str,
observer_thought_types: list[ObserverThoughtType] | None = None,
thought_types: list[ThoughtType] | None = None,
organization_id: str | None = None,
) -> list[ObserverThought]:
) -> list[Thought]:
async with self.Session() as session:
query = (
select(ObserverThoughtModel)
select(ThoughtModel)
.filter_by(observer_cruise_id=task_v2_id)
.filter_by(organization_id=organization_id)
.order_by(ObserverThoughtModel.created_at)
.order_by(ThoughtModel.created_at)
)
if observer_thought_types:
query = query.filter(ObserverThoughtModel.observer_thought_type.in_(observer_thought_types))
observer_thoughts = (await session.scalars(query)).all()
return [ObserverThought.model_validate(thought) for thought in observer_thoughts]
if thought_types:
query = query.filter(ThoughtModel.observer_thought_type.in_(thought_types))
thoughts = (await session.scalars(query)).all()
return [Thought.model_validate(thought) for thought in thoughts]
async def create_task_v2(
self,
@@ -2211,9 +2209,9 @@ class AgentDB:
totp_identifier: str | None = None,
totp_verification_url: str | None = None,
webhook_callback_url: str | None = None,
) -> ObserverTask:
) -> TaskV2:
async with self.Session() as session:
new_task_v2 = ObserverCruiseModel(
new_task_v2 = TaskV2Model(
workflow_run_id=workflow_run_id,
workflow_id=workflow_id,
workflow_permanent_id=workflow_permanent_id,
@@ -2228,9 +2226,9 @@ class AgentDB:
session.add(new_task_v2)
await session.commit()
await session.refresh(new_task_v2)
return ObserverTask.model_validate(new_task_v2)
return TaskV2.model_validate(new_task_v2)
async def create_observer_thought(
async def create_thought(
self,
task_v2_id: str,
workflow_run_id: str | None = None,
@@ -2241,16 +2239,16 @@ class AgentDB:
observation: str | None = None,
thought: str | None = None,
answer: str | None = None,
observer_thought_scenario: str | None = None,
observer_thought_type: str = ObserverThoughtType.plan,
thought_scenario: str | None = None,
thought_type: str = ThoughtType.plan,
output: dict[str, Any] | None = None,
input_token_count: int | None = None,
output_token_count: int | None = None,
thought_cost: float | None = None,
organization_id: str | None = None,
) -> ObserverThought:
) -> Thought:
async with self.Session() as session:
new_observer_thought = ObserverThoughtModel(
new_thought = ThoughtModel(
observer_cruise_id=task_v2_id,
workflow_run_id=workflow_run_id,
workflow_id=workflow_id,
@@ -2260,22 +2258,22 @@ class AgentDB:
observation=observation,
thought=thought,
answer=answer,
observer_thought_scenario=observer_thought_scenario,
observer_thought_type=observer_thought_type,
observer_thought_scenario=thought_scenario,
observer_thought_type=thought_type,
output=output,
input_token_count=input_token_count,
output_token_count=output_token_count,
thought_cost=thought_cost,
organization_id=organization_id,
)
session.add(new_observer_thought)
session.add(new_thought)
await session.commit()
await session.refresh(new_observer_thought)
return ObserverThought.model_validate(new_observer_thought)
await session.refresh(new_thought)
return Thought.model_validate(new_thought)
async def update_observer_thought(
async def update_thought(
self,
observer_thought_id: str,
thought_id: str,
workflow_run_block_id: str | None = None,
workflow_run_id: str | None = None,
workflow_id: str | None = None,
@@ -2288,47 +2286,47 @@ class AgentDB:
output_token_count: int | None = None,
thought_cost: float | None = None,
organization_id: str | None = None,
) -> ObserverThought:
) -> Thought:
async with self.Session() as session:
observer_thought = (
thought_obj = (
await session.scalars(
select(ObserverThoughtModel)
.filter_by(observer_thought_id=observer_thought_id)
select(ThoughtModel)
.filter_by(observer_thought_id=thought_id)
.filter_by(organization_id=organization_id)
)
).first()
if observer_thought:
if thought_obj:
if workflow_run_block_id:
observer_thought.workflow_run_block_id = workflow_run_block_id
thought_obj.workflow_run_block_id = workflow_run_block_id
if workflow_run_id:
observer_thought.workflow_run_id = workflow_run_id
thought_obj.workflow_run_id = workflow_run_id
if workflow_id:
observer_thought.workflow_id = workflow_id
thought_obj.workflow_id = workflow_id
if workflow_permanent_id:
observer_thought.workflow_permanent_id = workflow_permanent_id
thought_obj.workflow_permanent_id = workflow_permanent_id
if observation:
observer_thought.observation = observation
thought_obj.observation = observation
if thought:
observer_thought.thought = thought
thought_obj.thought = thought
if answer:
observer_thought.answer = answer
thought_obj.answer = answer
if output:
observer_thought.output = output
thought_obj.output = output
if input_token_count:
observer_thought.input_token_count = input_token_count
thought_obj.input_token_count = input_token_count
if output_token_count:
observer_thought.output_token_count = output_token_count
thought_obj.output_token_count = output_token_count
if thought_cost:
observer_thought.thought_cost = thought_cost
thought_obj.thought_cost = thought_cost
await session.commit()
await session.refresh(observer_thought)
return ObserverThought.model_validate(observer_thought)
raise NotFoundError(f"ObserverThought {observer_thought_id}")
await session.refresh(thought_obj)
return Thought.model_validate(thought_obj)
raise NotFoundError(f"Thought {thought_id}")
async def update_task_v2(
self,
task_v2_id: str,
status: ObserverTaskStatus | None = None,
status: TaskV2Status | None = None,
workflow_run_id: str | None = None,
workflow_id: str | None = None,
workflow_permanent_id: str | None = None,
@@ -2337,11 +2335,11 @@ class AgentDB:
summary: str | None = None,
output: dict[str, Any] | None = None,
organization_id: str | None = None,
) -> ObserverTask:
) -> TaskV2:
async with self.Session() as session:
task_v2 = (
await session.scalars(
select(ObserverCruiseModel)
select(TaskV2Model)
.filter_by(observer_cruise_id=task_v2_id)
.filter_by(organization_id=organization_id)
)
@@ -2365,7 +2363,7 @@ class AgentDB:
task_v2.output = output
await session.commit()
await session.refresh(task_v2)
return ObserverTask.model_validate(task_v2)
return TaskV2.model_validate(task_v2)
raise NotFoundError(f"TaskV2 {task_v2_id} not found")
async def create_workflow_run_block(