workflow run block db + schema transformation code (#1418)

This commit is contained in:
Shuchang Zheng
2024-12-20 07:40:32 -08:00
committed by GitHub
parent a12776e630
commit 8b75586fb1
4 changed files with 96 additions and 1 deletions

View File

@@ -30,6 +30,7 @@ from skyvern.forge.sdk.db.models import (
TOTPCodeModel,
WorkflowModel,
WorkflowParameterModel,
WorkflowRunBlockModel,
WorkflowRunModel,
WorkflowRunOutputParameterModel,
WorkflowRunParameterModel,
@@ -48,6 +49,7 @@ from skyvern.forge.sdk.db.utils import (
convert_to_workflow,
convert_to_workflow_parameter,
convert_to_workflow_run,
convert_to_workflow_run_block,
convert_to_workflow_run_output_parameter,
convert_to_workflow_run_parameter,
)
@@ -58,6 +60,8 @@ from skyvern.forge.sdk.schemas.organizations import Organization, OrganizationAu
from skyvern.forge.sdk.schemas.task_generations import TaskGeneration
from skyvern.forge.sdk.schemas.tasks import OrderBy, ProxyLocation, SortDirection, Task, TaskStatus
from skyvern.forge.sdk.schemas.totp_codes import TOTPCode
from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock
from skyvern.forge.sdk.workflow.models.block import BlockStatus, BlockType
from skyvern.forge.sdk.workflow.models.parameter import (
AWSSecretParameter,
BitwardenCreditCardDataParameter,
@@ -1984,3 +1988,63 @@ class AgentDB:
await session.refresh(observer_cruise)
return ObserverCruise.model_validate(observer_cruise)
raise NotFoundError(f"ObserverCruise {observer_cruise_id} not found")
async def create_workflow_run_block(
self,
workflow_run_id: str,
parent_workflow_run_block_id: str | None = None,
organization_id: str | None = None,
task_id: str | None = None,
label: str | None = None,
block_type: BlockType | None = None,
status: BlockStatus = BlockStatus.running,
output: dict | list | str | None = None,
continue_on_failure: bool = False,
) -> WorkflowRunBlock:
async with self.Session() as session:
new_workflow_run_block = WorkflowRunBlockModel(
workflow_run_id=workflow_run_id,
parent_workflow_run_block_id=parent_workflow_run_block_id,
organization_id=organization_id,
task_id=task_id,
label=label,
block_type=block_type,
status=status,
output=output,
continue_on_failure=continue_on_failure,
)
session.add(new_workflow_run_block)
await session.commit()
await session.refresh(new_workflow_run_block)
task = None
if task_id:
task = await self.get_task(task_id, organization_id=organization_id)
return convert_to_workflow_run_block(new_workflow_run_block, task=task)
async def update_workflow_run_block(
self,
workflow_run_block_id: str,
status: BlockStatus | None = None,
output: dict | list | str | None = None,
) -> WorkflowRunBlock:
async with self.Session() as session:
workflow_run_block = (
await session.scalars(
select(WorkflowRunBlockModel).filter_by(workflow_run_block_id=workflow_run_block_id)
)
).first()
if workflow_run_block:
if status:
workflow_run_block.status = status
if output:
workflow_run_block.output = output
await session.commit()
await session.refresh(workflow_run_block)
else:
raise NotFoundError(f"WorkflowRunBlock {workflow_run_block_id} not found")
task = None
task_id = workflow_run_block.task_id
if task_id:
task = await self.get_task(task_id, organization_id=workflow_run_block.organization_id)
return convert_to_workflow_run_block(workflow_run_block, task=task)