Workflow: YAML interface (#123)

This commit is contained in:
Kerem Yilmaz
2024-03-24 22:55:38 -07:00
committed by GitHub
parent cf4749c1d5
commit 0b5456a4c6
8 changed files with 306 additions and 16 deletions

View File

@@ -44,6 +44,8 @@ repos:
- types-requests - types-requests
- types-cachetools - types-cachetools
- alembic - alembic
- "sqlalchemy[mypy]"
- types-PyYAML
exclude: | exclude: |
(?x)( (?x)(
^tests.*| ^tests.*|

View File

@@ -689,21 +689,31 @@ class AgentDB:
title: str | None = None, title: str | None = None,
description: str | None = None, description: str | None = None,
workflow_definition: dict[str, Any] | None = None, workflow_definition: dict[str, Any] | None = None,
) -> Workflow | None: ) -> Workflow:
async with self.Session() as session: try:
workflow = (await session.scalars(select(WorkflowModel).filter_by(workflow_id=workflow_id))).first() async with self.Session() as session:
if workflow: if workflow := await session.scalars(select(WorkflowModel).filter_by(workflow_id=workflow_id).first()):
if title: if title:
workflow.title = title workflow.title = title
if description: if description:
workflow.description = description workflow.description = description
if workflow_definition: if workflow_definition:
workflow.workflow_definition = workflow_definition workflow.workflow_definition = workflow_definition
await session.commit() await session.commit()
await session.refresh(workflow) await session.refresh(workflow)
return convert_to_workflow(workflow, self.debug_enabled) return convert_to_workflow(workflow, self.debug_enabled)
LOG.error("Workflow not found, nothing to update", workflow_id=workflow_id) else:
return None raise NotFoundError("Workflow not found")
except SQLAlchemyError:
LOG.error("SQLAlchemyError", exc_info=True)
raise
except NotFoundError:
LOG.error("No workflow found to update", workflow_id=workflow_id)
LOG.error("NotFoundError", exc_info=True)
raise
except Exception:
LOG.error("UnexpectedError", exc_info=True)
raise
async def create_workflow_run( async def create_workflow_run(
self, workflow_id: str, proxy_location: ProxyLocation | None = None, webhook_callback_url: str | None = None self, workflow_id: str, proxy_location: ProxyLocation | None = None, webhook_callback_url: str | None = None

View File

@@ -1,6 +1,7 @@
from typing import Annotated, Any from typing import Annotated, Any
import structlog import structlog
import yaml
from fastapi import APIRouter, BackgroundTasks, Depends, Header, HTTPException, Query, Request, Response, status from fastapi import APIRouter, BackgroundTasks, Depends, Header, HTTPException, Query, Request, Response, status
from fastapi.responses import ORJSONResponse from fastapi.responses import ORJSONResponse
from pydantic import BaseModel from pydantic import BaseModel
@@ -25,9 +26,11 @@ from skyvern.forge.sdk.services import org_auth_service
from skyvern.forge.sdk.settings_manager import SettingsManager from skyvern.forge.sdk.settings_manager import SettingsManager
from skyvern.forge.sdk.workflow.models.workflow import ( from skyvern.forge.sdk.workflow.models.workflow import (
RunWorkflowResponse, RunWorkflowResponse,
Workflow,
WorkflowRequestBody, WorkflowRequestBody,
WorkflowRunStatusResponse, WorkflowRunStatusResponse,
) )
from skyvern.forge.sdk.workflow.models.yaml import WorkflowCreateYAMLRequest
base_router = APIRouter() base_router = APIRouter()
@@ -446,3 +449,30 @@ async def get_workflow_run(
return await app.WORKFLOW_SERVICE.build_workflow_run_status_response( return await app.WORKFLOW_SERVICE.build_workflow_run_status_response(
workflow_id=workflow_id, workflow_run_id=workflow_run_id, organization_id=current_org.organization_id workflow_id=workflow_id, workflow_run_id=workflow_run_id, organization_id=current_org.organization_id
) )
@base_router.post(
"/workflows",
openapi_extra={
"requestBody": {
"content": {"application/x-yaml": {"schema": WorkflowCreateYAMLRequest.model_json_schema()}},
"required": True,
},
},
response_model=Workflow,
)
async def create_workflow(
request: Request,
current_org: Organization = Depends(org_auth_service.get_current_org),
) -> Workflow:
analytics.capture("skyvern-oss-agent-workflow-create")
raw_yaml = await request.body()
try:
workflow_yaml = yaml.safe_load(raw_yaml)
except yaml.YAMLError:
raise HTTPException(status_code=422, detail="Invalid YAML")
workflow_create_request = WorkflowCreateYAMLRequest.model_validate(workflow_yaml)
return await app.WORKFLOW_SERVICE.create_workflow_from_request(
organization_id=current_org.organization_id, request=workflow_create_request
)

View File

@@ -60,6 +60,12 @@ class WorkflowRunContext:
""" """
return self.values[key] return self.values[key]
def has_parameter(self, key: str) -> bool:
return key in self.parameters
def has_value(self, key: str) -> bool:
return key in self.values
def set_value(self, key: str, value: Any) -> None: def set_value(self, key: str, value: Any) -> None:
self.values[key] = value self.values[key] = value

View File

@@ -21,3 +21,11 @@ class OutputParameterKeyCollisionError(BaseWorkflowException):
elif retry_count == 0: elif retry_count == 0:
message += " Max duplicate retries reached, aborting." message += " Max duplicate retries reached, aborting."
super().__init__(message) super().__init__(message)
class WorkflowDefinitionHasDuplicateParameterKeys(BaseWorkflowException):
def __init__(self, duplicate_keys: set[str]) -> None:
super().__init__(
f"WorkflowDefinition has parameters with duplicate keys. Each parameter needs to have a unique "
f"key. Duplicate key(s): {','.join(duplicate_keys)}"
)

View File

@@ -110,6 +110,17 @@ class TaskBlock(Block):
will_retry = True will_retry = True
workflow_run = await app.WORKFLOW_SERVICE.get_workflow_run(workflow_run_id=workflow_run_id) workflow_run = await app.WORKFLOW_SERVICE.get_workflow_run(workflow_run_id=workflow_run_id)
workflow = await app.WORKFLOW_SERVICE.get_workflow(workflow_id=workflow_run.workflow_id) workflow = await app.WORKFLOW_SERVICE.get_workflow(workflow_id=workflow_run.workflow_id)
# if the task url is parameterized, we need to get the value from the workflow run context
if self.url and workflow_run_context.has_parameter(self.url) and workflow_run_context.has_value(self.url):
task_url_parameter_value = workflow_run_context.get_value(self.url)
if task_url_parameter_value:
LOG.info(
"Task URL is parameterized, using parameter value",
task_url_parameter_value=task_url_parameter_value,
task_url_parameter_key=self.url,
)
self.url = task_url_parameter_value
# TODO (kerem) we should always retry on terminated. We should make a distinction between retriable and # TODO (kerem) we should always retry on terminated. We should make a distinction between retriable and
# non-retryable terminations # non-retryable terminations
while will_retry: while will_retry:

View File

@@ -0,0 +1,112 @@
import abc
from typing import Annotated, Any, Literal
from pydantic import BaseModel, Field
from skyvern.forge.sdk.workflow.models.block import BlockType
from skyvern.forge.sdk.workflow.models.parameter import ParameterType, WorkflowParameterType
class ParameterYAML(BaseModel, abc.ABC):
parameter_type: ParameterType
key: str
description: str | None = None
class AWSSecretParameterYAML(ParameterYAML):
# There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error:
# Parameter 1 of Literal[...] cannot be of type "Any"
# This pattern already works in block.py but since the ParameterType is not defined in this file, mypy is not able
# to infer the type of the parameter_type attribute.
parameter_type: Literal[ParameterType.AWS_SECRET] = ParameterType.AWS_SECRET # type: ignore
aws_key: str
class WorkflowParameterYAML(ParameterYAML):
# There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error:
# Parameter 1 of Literal[...] cannot be of type "Any"
# This pattern already works in block.py but since the ParameterType is not defined in this file, mypy is not able
# to infer the type of the parameter_type attribute.
parameter_type: Literal[ParameterType.WORKFLOW] = ParameterType.WORKFLOW # type: ignore
workflow_parameter_type: WorkflowParameterType
default_value: str | int | float | bool | dict | list | None = None
class ContextParameterYAML(ParameterYAML):
# There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error:
# Parameter 1 of Literal[...] cannot be of type "Any"
# This pattern already works in block.py but since the ParameterType is not defined in this file, mypy is not able
# to infer the type of the parameter_type attribute.
parameter_type: Literal[ParameterType.CONTEXT] = ParameterType.CONTEXT # type: ignore
source_workflow_parameter_key: str
class OutputParameterYAML(ParameterYAML):
# There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error:
# Parameter 1 of Literal[...] cannot be of type "Any"
# This pattern already works in block.py but since the ParameterType is not defined in this file, mypy is not able
# to infer the type of the parameter_type attribute.
parameter_type: Literal[ParameterType.OUTPUT] = ParameterType.OUTPUT # type: ignore
class BlockYAML(BaseModel, abc.ABC):
block_type: BlockType
label: str
output_parameter_key: str | None = None
class TaskBlockYAML(BlockYAML):
# There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error:
# Parameter 1 of Literal[...] cannot be of type "Any"
# This pattern already works in block.py but since the BlockType is not defined in this file, mypy is not able
# to infer the type of the parameter_type attribute.
block_type: Literal[BlockType.TASK] = BlockType.TASK # type: ignore
url: str | None = None
title: str = "Untitled Task"
navigation_goal: str | None = None
data_extraction_goal: str | None = None
data_schema: dict[str, Any] | None = None
error_code_mapping: dict[str, str] | None = None
max_retries: int = 0
parameter_keys: list[str] | None = None
class ForLoopBlockYAML(BlockYAML):
# There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error:
# Parameter 1 of Literal[...] cannot be of type "Any"
# This pattern already works in block.py but since the BlockType is not defined in this file, mypy is not able
# to infer the type of the parameter_type attribute.
block_type: Literal[BlockType.FOR_LOOP] = BlockType.FOR_LOOP # type: ignore
loop_over_parameter_key: str
loop_block: BlockYAML
class CodeBlockYAML(BlockYAML):
# There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error:
# Parameter 1 of Literal[...] cannot be of type "Any"
# This pattern already works in block.py but since the BlockType is not defined in this file, mypy is not able
# to infer the type of the parameter_type attribute.
block_type: Literal[BlockType.CODE] = BlockType.CODE # type: ignore
code: str
parameter_keys: list[str] | None = None
PARAMETER_YAML_SUBCLASSES = AWSSecretParameterYAML | WorkflowParameterYAML | ContextParameterYAML | OutputParameterYAML
PARAMETER_YAML_TYPES = Annotated[PARAMETER_YAML_SUBCLASSES, Field(discriminator="parameter_type")]
BLOCK_YAML_SUBCLASSES = TaskBlockYAML | ForLoopBlockYAML | CodeBlockYAML
BLOCK_YAML_TYPES = Annotated[BLOCK_YAML_SUBCLASSES, Field(discriminator="block_type")]
class WorkflowDefinitionYAML(BaseModel):
parameters: list[PARAMETER_YAML_TYPES]
blocks: list[BLOCK_YAML_TYPES]
class WorkflowCreateYAMLRequest(BaseModel):
title: str
description: str | None = None
workflow_definition: WorkflowDefinitionYAML

View File

@@ -20,9 +20,13 @@ from skyvern.forge.sdk.core.security import generate_skyvern_signature
from skyvern.forge.sdk.core.skyvern_context import SkyvernContext from skyvern.forge.sdk.core.skyvern_context import SkyvernContext
from skyvern.forge.sdk.models import Step from skyvern.forge.sdk.models import Step
from skyvern.forge.sdk.schemas.tasks import Task, TaskStatus from skyvern.forge.sdk.schemas.tasks import Task, TaskStatus
from skyvern.forge.sdk.workflow.exceptions import WorkflowDefinitionHasDuplicateParameterKeys
from skyvern.forge.sdk.workflow.models.block import BlockType, BlockTypeVar, CodeBlock, ForLoopBlock, TaskBlock
from skyvern.forge.sdk.workflow.models.parameter import ( from skyvern.forge.sdk.workflow.models.parameter import (
AWSSecretParameter, AWSSecretParameter,
OutputParameter, OutputParameter,
Parameter,
ParameterType,
WorkflowParameter, WorkflowParameter,
WorkflowParameterType, WorkflowParameterType,
) )
@@ -36,6 +40,7 @@ from skyvern.forge.sdk.workflow.models.workflow import (
WorkflowRunStatus, WorkflowRunStatus,
WorkflowRunStatusResponse, WorkflowRunStatusResponse,
) )
from skyvern.forge.sdk.workflow.models.yaml import BLOCK_YAML_TYPES, WorkflowCreateYAMLRequest
from skyvern.webeye.browser_factory import BrowserState from skyvern.webeye.browser_factory import BrowserState
LOG = structlog.get_logger() LOG = structlog.get_logger()
@@ -252,7 +257,7 @@ class WorkflowService:
title: str | None = None, title: str | None = None,
description: str | None = None, description: str | None = None,
workflow_definition: WorkflowDefinition | None = None, workflow_definition: WorkflowDefinition | None = None,
) -> Workflow | None: ) -> Workflow:
if workflow_definition: if workflow_definition:
workflow_definition.validate() workflow_definition.validate()
return await app.DATABASE.update_workflow( return await app.DATABASE.update_workflow(
@@ -604,3 +609,109 @@ class WorkflowService:
await self.persist_har_data(browser_state, last_step, workflow, workflow_run) await self.persist_har_data(browser_state, last_step, workflow, workflow_run)
await self.persist_tracing_data(browser_state, last_step, workflow_run) await self.persist_tracing_data(browser_state, last_step, workflow_run)
async def create_workflow_from_request(self, organization_id: str, request: WorkflowCreateYAMLRequest) -> Workflow:
LOG.info("Creating workflow from request", organization_id=organization_id, title=request.title)
try:
workflow = await self.create_workflow(
organization_id=organization_id,
title=request.title,
description=request.description,
workflow_definition=WorkflowDefinition(blocks=[]),
)
# Create parameters from the request
parameters = {}
duplicate_parameter_keys = set()
for parameter in request.workflow_definition.parameters:
if parameter.key in parameters:
LOG.error(f"Duplicate parameter key {parameter.key}")
duplicate_parameter_keys.add(parameter.key)
continue
if parameter.parameter_type == ParameterType.AWS_SECRET:
parameters[parameter.key] = await self.create_aws_secret_parameter(
workflow_id=workflow.workflow_id,
aws_key=parameter.aws_key,
key=parameter.key,
description=parameter.description,
)
elif parameter.parameter_type == ParameterType.WORKFLOW:
parameters[parameter.key] = await self.create_workflow_parameter(
workflow_id=workflow.workflow_id,
workflow_parameter_type=parameter.workflow_parameter_type,
key=parameter.key,
default_value=parameter.default_value,
description=parameter.description,
)
elif parameter.parameter_type == ParameterType.OUTPUT:
parameters[parameter.key] = await self.create_output_parameter(
workflow_id=workflow.workflow_id,
key=parameter.key,
description=parameter.description,
)
if duplicate_parameter_keys:
raise WorkflowDefinitionHasDuplicateParameterKeys(duplicate_keys=duplicate_parameter_keys)
# Create blocks from the request
block_label_mapping = {}
blocks = []
for block_yaml in request.workflow_definition.blocks:
block = await self.block_yaml_to_block(block_yaml, parameters)
blocks.append(block)
block_label_mapping[block.label] = block
# Set the blocks for the workflow definition
workflow_definition = WorkflowDefinition(blocks=blocks)
workflow = await self.update_workflow(
workflow_id=workflow.workflow_id,
workflow_definition=workflow_definition,
)
LOG.info(
f"Created workflow from request, title: {request.title}",
parameter_keys=[parameter.key for parameter in parameters.values()],
block_labels=[block.label for block in blocks],
organization_id=organization_id,
title=request.title,
workflow_id=workflow.workflow_id,
)
return workflow
except Exception as e:
LOG.exception(f"Failed to create workflow from request, title: {request.title}")
raise e
@staticmethod
async def block_yaml_to_block(block_yaml: BLOCK_YAML_TYPES, parameters: dict[str, Parameter]) -> BlockTypeVar:
output_parameter = parameters.get(block_yaml.output_parameter_key) if block_yaml.output_parameter_key else None
if block_yaml.block_type == BlockType.TASK:
task_block_parameters = (
[parameters[parameter_key] for parameter_key in block_yaml.parameter_keys]
if block_yaml.parameter_keys
else []
)
return TaskBlock(
label=block_yaml.label,
url=block_yaml.url,
title=block_yaml.title,
parameters=task_block_parameters,
output_parameter=output_parameter,
navigation_goal=block_yaml.navigation_goal,
data_extraction_goal=block_yaml.data_extraction_goal,
data_schema=block_yaml.data_schema,
error_code_mapping=block_yaml.error_code_mapping,
max_retries=block_yaml.max_retries,
)
elif block_yaml.block_type == BlockType.FOR_LOOP:
return ForLoopBlock(
label=block_yaml.label,
loop_over_parameter_key=parameters[block_yaml.loop_over_parameter_key],
loop_block=WorkflowService.block_yaml_to_block(block_yaml.loop_block, parameters),
output_parameter=output_parameter,
)
elif block_yaml.block_type == BlockType.CODE:
return CodeBlock(
label=block_yaml.label,
code=block_yaml.code,
parameters=[parameters[parameter_key] for parameter_key in block_yaml.parameter_keys]
if block_yaml.parameter_keys
else [],
output_parameter=output_parameter,
)
raise ValueError(f"Invalid block type {block_yaml.block_type}")