add observer cruise creation and completion (#1354)

This commit is contained in:
Shuchang Zheng
2024-12-08 12:43:59 -08:00
committed by GitHub
parent fbc6677f9e
commit bda119027e
6 changed files with 145 additions and 14 deletions

View File

@@ -1,7 +1,7 @@
import ipaddress
from urllib.parse import urlparse
from pydantic import HttpUrl, ValidationError, parse_obj_as
from pydantic import HttpUrl, ValidationError
from skyvern.config import settings
from skyvern.exceptions import InvalidUrl
@@ -27,17 +27,6 @@ def prepend_scheme_and_validate_url(url: str) -> str:
return url
def validate_url(url: str) -> str:
try:
if url:
# Use parse_obj_as to validate the string as an HttpUrl
parse_obj_as(HttpUrl, url)
return url
except ValidationError:
# Handle the validation error
raise InvalidUrl(url=url)
def is_blocked_host(host: str) -> bool:
try:
ip = ipaddress.ip_address(host)

View File

@@ -19,6 +19,8 @@ from skyvern.forge.sdk.db.models import (
BitwardenCreditCardDataParameterModel,
BitwardenLoginCredentialParameterModel,
BitwardenSensitiveInformationParameterModel,
ObserverCruiseModel,
ObserverThoughtModel,
OrganizationAuthTokenModel,
OrganizationModel,
OutputParameterModel,
@@ -50,6 +52,7 @@ from skyvern.forge.sdk.db.utils import (
convert_to_workflow_run_parameter,
)
from skyvern.forge.sdk.models import Step, StepStatus
from skyvern.forge.sdk.schemas.observers import ObserverCruise, ObserverCruiseStatus, ObserverThought
from skyvern.forge.sdk.schemas.organizations import Organization, OrganizationAuthToken
from skyvern.forge.sdk.schemas.task_generations import TaskGeneration
from skyvern.forge.sdk.schemas.tasks import OrderBy, ProxyLocation, SortDirection, Task, TaskStatus
@@ -1729,3 +1732,103 @@ class AgentDB:
)
await session.execute(stmt)
await session.commit()
async def get_observer_cruise(
self, observer_cruise_id: str, organization_id: str | None = None
) -> ObserverCruise | None:
async with self.Session() as session:
if observer_cruise := (
await session.scalars(
select(ObserverCruiseModel)
.filter_by(observer_cruise_id=observer_cruise_id)
.filter_by(organization_id=organization_id)
)
).first():
return ObserverCruise.model_validate(observer_cruise)
return None
async def get_observer_thought(
self, observer_thought_id: str, organization_id: str | None = None
) -> ObserverThought | None:
async with self.Session() as session:
if observer_thought := (
await session.scalars(
select(ObserverThoughtModel)
.filter_by(observer_thought_id=observer_thought_id)
.filter_by(organization_id=organization_id)
)
).first():
return ObserverThought.model_validate(observer_thought)
return None
async def create_observer_cruise(
self,
workflow_run_id: str | None = None,
workflow_id: str | None = None,
prompt: str | None = None,
url: str | None = None,
organization_id: str | None = None,
) -> ObserverCruise:
async with self.Session() as session:
new_observer_cruise = ObserverCruiseModel(
workflow_run_id=workflow_run_id,
workflow_id=workflow_id,
prompt=prompt,
url=url,
organization_id=organization_id,
)
session.add(new_observer_cruise)
await session.commit()
await session.refresh(new_observer_cruise)
return ObserverCruise.model_validate(new_observer_cruise)
async def create_observer_thought(
self,
observer_cruise_id: str,
workflow_run_id: str | None = None,
workflow_id: str | None = None,
workflow_run_block_id: str | None = None,
user_input: str | None = None,
observation: str | None = None,
thought: str | None = None,
answer: str | None = None,
organization_id: str | None = None,
) -> ObserverThought:
async with self.Session() as session:
new_observer_thought = ObserverThoughtModel(
observer_cruise_id=observer_cruise_id,
workflow_run_id=workflow_run_id,
workflow_id=workflow_id,
workflow_run_block_id=workflow_run_block_id,
user_input=user_input,
observation=observation,
thought=thought,
answer=answer,
organization_id=organization_id,
)
session.add(new_observer_thought)
await session.commit()
await session.refresh(new_observer_thought)
return ObserverThought.model_validate(new_observer_thought)
async def update_observer_cruise(
self,
observer_cruise_id: str,
status: ObserverCruiseStatus | None = None,
organization_id: str | None = None,
) -> ObserverCruise:
async with self.Session() as session:
observer_cruise = (
await session.scalars(
select(ObserverCruiseModel)
.filter_by(observer_cruise_id=observer_cruise_id)
.filter_by(organization_id=organization_id)
)
).first()
if observer_cruise:
if status:
observer_cruise.status = status
await session.commit()
await session.refresh(observer_cruise)
return ObserverCruise.model_validate(observer_cruise)
raise NotFoundError(f"ObserverCruise {observer_cruise_id} not found")

View File

@@ -515,6 +515,8 @@ class ObserverCruiseModel(Base):
organization_id = Column(String, ForeignKey("organizations.organization_id"), nullable=True)
workflow_run_id = Column(String, ForeignKey("workflow_runs.workflow_run_id"), nullable=True)
workflow_id = Column(String, ForeignKey("workflows.workflow_id"), nullable=True)
prompt = Column(UnicodeText, nullable=True)
url = Column(String, nullable=True)
class ObserverThoughtModel(Base):

View File

@@ -1,7 +1,7 @@
from datetime import datetime
from enum import StrEnum
from pydantic import BaseModel, ConfigDict
from pydantic import BaseModel, ConfigDict, HttpUrl
class ObserverCruiseStatus(StrEnum):
@@ -23,12 +23,16 @@ class ObserverCruise(BaseModel):
organization_id: str | None = None
workflow_run_id: str | None = None
workflow_id: str | None = None
prompt: str | None = None
url: HttpUrl | None = None
created_at: datetime
modified_at: datetime
class ObserverThought(BaseModel):
model_config = ConfigDict(from_attributes=True)
observer_thought_id: str
observer_cruise_id: str
organization_id: str | None = None