backend: normalize returns for evals (items, total); ensure title to … (#2329)
This commit is contained in:
@@ -650,7 +650,11 @@ class AgentDB:
|
|||||||
try:
|
try:
|
||||||
async with self.Session() as session:
|
async with self.Session() as session:
|
||||||
db_page = page - 1 # offset logic is 0 based
|
db_page = page - 1 # offset logic is 0 based
|
||||||
query = select(TaskModel).filter(TaskModel.organization_id == organization_id)
|
query = (
|
||||||
|
select(TaskModel, WorkflowRunModel.workflow_permanent_id)
|
||||||
|
.join(WorkflowRunModel, TaskModel.workflow_run_id == WorkflowRunModel.workflow_run_id, isouter=True)
|
||||||
|
.filter(TaskModel.organization_id == organization_id)
|
||||||
|
)
|
||||||
if task_status:
|
if task_status:
|
||||||
query = query.filter(TaskModel.status.in_(task_status))
|
query = query.filter(TaskModel.status.in_(task_status))
|
||||||
if workflow_run_id:
|
if workflow_run_id:
|
||||||
@@ -665,8 +669,42 @@ class AgentDB:
|
|||||||
.limit(page_size)
|
.limit(page_size)
|
||||||
.offset(db_page * page_size)
|
.offset(db_page * page_size)
|
||||||
)
|
)
|
||||||
tasks = (await session.scalars(query)).all()
|
|
||||||
return [convert_to_task(task, debug_enabled=self.debug_enabled) for task in tasks]
|
results = (await session.execute(query)).all()
|
||||||
|
|
||||||
|
return [
|
||||||
|
convert_to_task(task, debug_enabled=self.debug_enabled, workflow_permanent_id=workflow_permanent_id)
|
||||||
|
for task, workflow_permanent_id in results
|
||||||
|
]
|
||||||
|
except SQLAlchemyError:
|
||||||
|
LOG.error("SQLAlchemyError", exc_info=True)
|
||||||
|
raise
|
||||||
|
except Exception:
|
||||||
|
LOG.error("UnexpectedError", exc_info=True)
|
||||||
|
raise
|
||||||
|
|
||||||
|
async def get_tasks_count(
|
||||||
|
self,
|
||||||
|
organization_id: str,
|
||||||
|
task_status: list[TaskStatus] | None = None,
|
||||||
|
workflow_run_id: str | None = None,
|
||||||
|
only_standalone_tasks: bool = False,
|
||||||
|
application: str | None = None,
|
||||||
|
) -> int:
|
||||||
|
try:
|
||||||
|
async with self.Session() as session:
|
||||||
|
count_query = (
|
||||||
|
select(func.count()).select_from(TaskModel).filter(TaskModel.organization_id == organization_id)
|
||||||
|
)
|
||||||
|
if task_status:
|
||||||
|
count_query = count_query.filter(TaskModel.status.in_(task_status))
|
||||||
|
if workflow_run_id:
|
||||||
|
count_query = count_query.filter(TaskModel.workflow_run_id == workflow_run_id)
|
||||||
|
if only_standalone_tasks:
|
||||||
|
count_query = count_query.filter(TaskModel.workflow_run_id.is_(None))
|
||||||
|
if application:
|
||||||
|
count_query = count_query.filter(TaskModel.application == application)
|
||||||
|
return (await session.execute(count_query)).scalar_one()
|
||||||
except SQLAlchemyError:
|
except SQLAlchemyError:
|
||||||
LOG.error("SQLAlchemyError", exc_info=True)
|
LOG.error("SQLAlchemyError", exc_info=True)
|
||||||
raise
|
raise
|
||||||
@@ -1527,6 +1565,25 @@ class AgentDB:
|
|||||||
LOG.error("SQLAlchemyError", exc_info=True)
|
LOG.error("SQLAlchemyError", exc_info=True)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
async def get_workflow_runs_count(
|
||||||
|
self,
|
||||||
|
organization_id: str,
|
||||||
|
status: list[WorkflowRunStatus] | None = None,
|
||||||
|
) -> int:
|
||||||
|
try:
|
||||||
|
async with self.Session() as session:
|
||||||
|
count_query = (
|
||||||
|
select(func.count())
|
||||||
|
.select_from(WorkflowRunModel)
|
||||||
|
.filter(WorkflowRunModel.organization_id == organization_id)
|
||||||
|
)
|
||||||
|
if status:
|
||||||
|
count_query = count_query.filter(WorkflowRunModel.status.in_(status))
|
||||||
|
return (await session.execute(count_query)).scalar_one()
|
||||||
|
except SQLAlchemyError:
|
||||||
|
LOG.error("SQLAlchemyError", exc_info=True)
|
||||||
|
raise
|
||||||
|
|
||||||
async def get_workflow_runs_for_workflow_permanent_id(
|
async def get_workflow_runs_for_workflow_permanent_id(
|
||||||
self,
|
self,
|
||||||
workflow_permanent_id: str,
|
workflow_permanent_id: str,
|
||||||
|
|||||||
@@ -58,7 +58,7 @@ def _custom_json_serializer(*args, **kwargs) -> str:
|
|||||||
return json.dumps(*args, default=pydantic.json.pydantic_encoder, **kwargs)
|
return json.dumps(*args, default=pydantic.json.pydantic_encoder, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
def convert_to_task(task_obj: TaskModel, debug_enabled: bool = False) -> Task:
|
def convert_to_task(task_obj: TaskModel, debug_enabled: bool = False, workflow_permanent_id: str | None = None) -> Task:
|
||||||
if debug_enabled:
|
if debug_enabled:
|
||||||
LOG.debug("Converting TaskModel to Task", task_id=task_obj.task_id)
|
LOG.debug("Converting TaskModel to Task", task_id=task_obj.task_id)
|
||||||
task = Task(
|
task = Task(
|
||||||
@@ -83,6 +83,7 @@ def convert_to_task(task_obj: TaskModel, debug_enabled: bool = False) -> Task:
|
|||||||
proxy_location=(ProxyLocation(task_obj.proxy_location) if task_obj.proxy_location else None),
|
proxy_location=(ProxyLocation(task_obj.proxy_location) if task_obj.proxy_location else None),
|
||||||
extracted_information_schema=task_obj.extracted_information_schema,
|
extracted_information_schema=task_obj.extracted_information_schema,
|
||||||
workflow_run_id=task_obj.workflow_run_id,
|
workflow_run_id=task_obj.workflow_run_id,
|
||||||
|
workflow_permanent_id=workflow_permanent_id,
|
||||||
order=task_obj.order,
|
order=task_obj.order,
|
||||||
retry=task_obj.retry,
|
retry=task_obj.retry,
|
||||||
max_steps_per_run=task_obj.max_steps_per_run,
|
max_steps_per_run=task_obj.max_steps_per_run,
|
||||||
|
|||||||
@@ -226,6 +226,7 @@ class Task(TaskBase):
|
|||||||
)
|
)
|
||||||
organization_id: str | None = None
|
organization_id: str | None = None
|
||||||
workflow_run_id: str | None = None
|
workflow_run_id: str | None = None
|
||||||
|
workflow_permanent_id: str | None = None
|
||||||
order: int | None = None
|
order: int | None = None
|
||||||
retry: int | None = None
|
retry: int | None = None
|
||||||
max_steps_per_run: int | None = None
|
max_steps_per_run: int | None = None
|
||||||
|
|||||||
@@ -677,6 +677,16 @@ class WorkflowService:
|
|||||||
organization_id=organization_id, page=page, page_size=page_size, status=status
|
organization_id=organization_id, page=page, page_size=page_size, status=status
|
||||||
)
|
)
|
||||||
|
|
||||||
|
async def get_workflow_runs_count(
|
||||||
|
self,
|
||||||
|
organization_id: str,
|
||||||
|
status: list[WorkflowRunStatus] | None = None,
|
||||||
|
) -> int:
|
||||||
|
return await app.DATABASE.get_workflow_runs_count(
|
||||||
|
organization_id=organization_id,
|
||||||
|
status=status,
|
||||||
|
)
|
||||||
|
|
||||||
async def get_workflow_runs_for_workflow_permanent_id(
|
async def get_workflow_runs_for_workflow_permanent_id(
|
||||||
self,
|
self,
|
||||||
workflow_permanent_id: str,
|
workflow_permanent_id: str,
|
||||||
|
|||||||
Reference in New Issue
Block a user