add endpoint and query for fetching script blocks for a wpid (#3164)
This commit is contained in:
@@ -3819,6 +3819,24 @@ class AgentDB:
|
|||||||
).all()
|
).all()
|
||||||
return [convert_to_script_file(script_file) for script_file in script_files]
|
return [convert_to_script_file(script_file) for script_file in script_files]
|
||||||
|
|
||||||
|
async def get_script_file_by_id(
|
||||||
|
self,
|
||||||
|
script_revision_id: str,
|
||||||
|
file_id: str,
|
||||||
|
organization_id: str,
|
||||||
|
) -> ScriptFile | None:
|
||||||
|
async with self.Session() as session:
|
||||||
|
script_file = (
|
||||||
|
await session.scalars(
|
||||||
|
select(ScriptFileModel)
|
||||||
|
.filter_by(script_revision_id=script_revision_id)
|
||||||
|
.filter_by(file_id=file_id)
|
||||||
|
.filter_by(organization_id=organization_id)
|
||||||
|
)
|
||||||
|
).first()
|
||||||
|
|
||||||
|
return convert_to_script_file(script_file) if script_file else None
|
||||||
|
|
||||||
async def get_script_block(
|
async def get_script_block(
|
||||||
self,
|
self,
|
||||||
script_block_id: str,
|
script_block_id: str,
|
||||||
@@ -3887,6 +3905,7 @@ class AgentDB:
|
|||||||
organization_id: str,
|
organization_id: str,
|
||||||
workflow_permanent_id: str,
|
workflow_permanent_id: str,
|
||||||
cache_key_value: str,
|
cache_key_value: str,
|
||||||
|
cache_key: str | None = None,
|
||||||
) -> list[Script]:
|
) -> list[Script]:
|
||||||
"""Get latest script versions linked to a workflow by a specific cache_key_value."""
|
"""Get latest script versions linked to a workflow by a specific cache_key_value."""
|
||||||
try:
|
try:
|
||||||
@@ -3900,6 +3919,9 @@ class AgentDB:
|
|||||||
.where(WorkflowScriptModel.deleted_at.is_(None))
|
.where(WorkflowScriptModel.deleted_at.is_(None))
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if cache_key:
|
||||||
|
ws_script_ids_subquery = ws_script_ids_subquery.where(WorkflowScriptModel.cache_key == cache_key)
|
||||||
|
|
||||||
# Latest version per script_id within the org and not deleted
|
# Latest version per script_id within the org and not deleted
|
||||||
latest_versions_subquery = (
|
latest_versions_subquery = (
|
||||||
select(
|
select(
|
||||||
|
|||||||
@@ -9,7 +9,14 @@ from skyvern.forge.sdk.executor.factory import AsyncExecutorFactory
|
|||||||
from skyvern.forge.sdk.routes.routers import base_router
|
from skyvern.forge.sdk.routes.routers import base_router
|
||||||
from skyvern.forge.sdk.schemas.organizations import Organization
|
from skyvern.forge.sdk.schemas.organizations import Organization
|
||||||
from skyvern.forge.sdk.services import org_auth_service
|
from skyvern.forge.sdk.services import org_auth_service
|
||||||
from skyvern.schemas.scripts import CreateScriptRequest, CreateScriptResponse, DeployScriptRequest, Script
|
from skyvern.schemas.scripts import (
|
||||||
|
CreateScriptRequest,
|
||||||
|
CreateScriptResponse,
|
||||||
|
DeployScriptRequest,
|
||||||
|
Script,
|
||||||
|
ScriptBlocksRequest,
|
||||||
|
ScriptBlocksResponse,
|
||||||
|
)
|
||||||
from skyvern.services import script_service
|
from skyvern.services import script_service
|
||||||
|
|
||||||
LOG = structlog.get_logger()
|
LOG = structlog.get_logger()
|
||||||
@@ -261,3 +268,154 @@ async def run_script(
|
|||||||
organization_id=current_org.organization_id,
|
organization_id=current_org.organization_id,
|
||||||
background_tasks=background_tasks,
|
background_tasks=background_tasks,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@base_router.post(
|
||||||
|
"/scripts/{workflow_permanent_id}/blocks",
|
||||||
|
include_in_schema=False,
|
||||||
|
response_model=ScriptBlocksResponse,
|
||||||
|
)
|
||||||
|
async def get_workflow_script_blocks(
|
||||||
|
workflow_permanent_id: str,
|
||||||
|
block_script_request: ScriptBlocksRequest,
|
||||||
|
current_org: Organization = Depends(org_auth_service.get_current_org),
|
||||||
|
) -> ScriptBlocksResponse:
|
||||||
|
empty = ScriptBlocksResponse(blocks={})
|
||||||
|
cache_key_value = block_script_request.cache_key_value
|
||||||
|
|
||||||
|
workflow = await app.DATABASE.get_workflow_by_permanent_id(
|
||||||
|
workflow_permanent_id=workflow_permanent_id,
|
||||||
|
organization_id=current_org.organization_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not workflow:
|
||||||
|
raise HTTPException(status_code=404, detail="Workflow not found")
|
||||||
|
|
||||||
|
cache_key = block_script_request.cache_key or workflow.cache_key or ""
|
||||||
|
|
||||||
|
scripts = await app.DATABASE.get_workflow_scripts_by_cache_key_value(
|
||||||
|
organization_id=current_org.organization_id,
|
||||||
|
workflow_permanent_id=workflow_permanent_id,
|
||||||
|
cache_key_value=cache_key_value,
|
||||||
|
cache_key=cache_key,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not scripts:
|
||||||
|
LOG.info(
|
||||||
|
"No scripts found for workflow",
|
||||||
|
workflow_permanent_id=workflow_permanent_id,
|
||||||
|
organization_id=current_org.organization_id,
|
||||||
|
cache_key_value=cache_key_value,
|
||||||
|
cache_key=cache_key,
|
||||||
|
)
|
||||||
|
return empty
|
||||||
|
|
||||||
|
first_script = scripts[0]
|
||||||
|
|
||||||
|
script_blocks = await app.DATABASE.get_script_blocks_by_script_revision_id(
|
||||||
|
script_revision_id=first_script.script_revision_id,
|
||||||
|
organization_id=current_org.organization_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not script_blocks:
|
||||||
|
LOG.info(
|
||||||
|
"No script block found for workflow",
|
||||||
|
workflow_permanent_id=workflow_permanent_id,
|
||||||
|
organization_id=current_org.organization_id,
|
||||||
|
script_revision_id=first_script.script_revision_id,
|
||||||
|
)
|
||||||
|
return empty
|
||||||
|
|
||||||
|
result: dict[str, str] = {}
|
||||||
|
|
||||||
|
# TODO(jdo): make concurrent to speed up
|
||||||
|
for script_block in script_blocks:
|
||||||
|
script_file_id = script_block.script_file_id
|
||||||
|
|
||||||
|
if not script_file_id:
|
||||||
|
LOG.info(
|
||||||
|
"No script file ID found for script block",
|
||||||
|
workflow_permanent_id=workflow_permanent_id,
|
||||||
|
organization_id=current_org.organization_id,
|
||||||
|
script_revision_id=first_script.script_revision_id,
|
||||||
|
block_label=script_block.script_block_label,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
script_file = await app.DATABASE.get_script_file_by_id(
|
||||||
|
script_revision_id=first_script.script_revision_id,
|
||||||
|
file_id=script_file_id,
|
||||||
|
organization_id=current_org.organization_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not script_file:
|
||||||
|
LOG.info(
|
||||||
|
"No script file found for script block",
|
||||||
|
workflow_permanent_id=workflow_permanent_id,
|
||||||
|
organization_id=current_org.organization_id,
|
||||||
|
script_revision_id=first_script.script_revision_id,
|
||||||
|
block_label=script_block.script_block_label,
|
||||||
|
script_file_id=script_file_id,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
artifact_id = script_file.artifact_id
|
||||||
|
|
||||||
|
if not artifact_id:
|
||||||
|
LOG.info(
|
||||||
|
"No artifact ID found for script file",
|
||||||
|
workflow_permanent_id=workflow_permanent_id,
|
||||||
|
organization_id=current_org.organization_id,
|
||||||
|
script_revision_id=first_script.script_revision_id,
|
||||||
|
block_label=script_block.script_block_label,
|
||||||
|
script_file_id=script_file_id,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
artifact = await app.DATABASE.get_artifact_by_id(
|
||||||
|
artifact_id,
|
||||||
|
current_org.organization_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not artifact:
|
||||||
|
LOG.error(
|
||||||
|
"No artifact found for script file",
|
||||||
|
workflow_permanent_id=workflow_permanent_id,
|
||||||
|
organization_id=current_org.organization_id,
|
||||||
|
script_revision_id=first_script.script_revision_id,
|
||||||
|
block_label=script_block.script_block_label,
|
||||||
|
script_file_id=script_file_id,
|
||||||
|
artifact_id=artifact_id,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
data = await app.STORAGE.retrieve_artifact(artifact)
|
||||||
|
|
||||||
|
if not data:
|
||||||
|
LOG.error(
|
||||||
|
"No data found for artifact",
|
||||||
|
workflow_permanent_id=workflow_permanent_id,
|
||||||
|
organization_id=current_org.organization_id,
|
||||||
|
block_label=script_block.script_block_label,
|
||||||
|
script_revision_id=script_block.script_revision_id,
|
||||||
|
file_id=script_file_id,
|
||||||
|
artifact_id=artifact_id,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
decoded_data = data.decode("utf-8")
|
||||||
|
result[script_block.script_block_label] = decoded_data
|
||||||
|
except UnicodeDecodeError:
|
||||||
|
LOG.error(
|
||||||
|
"File content is not valid UTF-8 text",
|
||||||
|
workflow_permanent_id=workflow_permanent_id,
|
||||||
|
organization_id=current_org.organization_id,
|
||||||
|
block_label=script_block.script_block_label,
|
||||||
|
script_revision_id=script_block.script_revision_id,
|
||||||
|
file_id=script_file_id,
|
||||||
|
artifact_id=artifact_id,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
return ScriptBlocksResponse(blocks=result)
|
||||||
|
|||||||
@@ -137,3 +137,12 @@ class ScriptBlock(BaseModel):
|
|||||||
created_at: datetime
|
created_at: datetime
|
||||||
modified_at: datetime
|
modified_at: datetime
|
||||||
deleted_at: datetime | None = None
|
deleted_at: datetime | None = None
|
||||||
|
|
||||||
|
|
||||||
|
class ScriptBlocksResponse(BaseModel):
|
||||||
|
blocks: dict[str, str]
|
||||||
|
|
||||||
|
|
||||||
|
class ScriptBlocksRequest(BaseModel):
|
||||||
|
cache_key_value: str
|
||||||
|
cache_key: str | None = None
|
||||||
|
|||||||
Reference in New Issue
Block a user