From 5f4089edf55c32578ee39b884dc652289db15ada Mon Sep 17 00:00:00 2001 From: Kerem Yilmaz Date: Mon, 30 Sep 2024 09:56:34 -0700 Subject: [PATCH] workflow run streaming API (#887) Co-authored-by: Shuchang Zheng --- skyvern/forge/sdk/routes/streaming.py | 132 ++++++++++++++++++++++++++ 1 file changed, 132 insertions(+) diff --git a/skyvern/forge/sdk/routes/streaming.py b/skyvern/forge/sdk/routes/streaming.py index a5df5e99..e11a5428 100644 --- a/skyvern/forge/sdk/routes/streaming.py +++ b/skyvern/forge/sdk/routes/streaming.py @@ -10,6 +10,7 @@ from websockets.exceptions import ConnectionClosedOK from skyvern.forge import app from skyvern.forge.sdk.schemas.tasks import TaskStatus from skyvern.forge.sdk.services.org_auth_service import get_current_org +from skyvern.forge.sdk.workflow.models.workflow import WorkflowRunStatus LOG = structlog.get_logger() websocket_router = APIRouter() @@ -116,3 +117,134 @@ async def task_stream( return LOG.info("WebSocket connection closed successfully", task_id=task_id, organization_id=organization_id) return + + +@websocket_router.websocket("/workflow_runs/{workflow_run_id}") +async def workflow_run_streaming( + websocket: WebSocket, + workflow_run_id: str, + apikey: str | None = None, + token: str | None = None, +) -> None: + try: + await websocket.accept() + if not token and not apikey: + await websocket.send_text("No valid credential provided") + return + except ConnectionClosedOK: + LOG.info("WofklowRun Streaming: ConnectionClosedOK error. Streaming won't start") + return + + try: + organization = await get_current_org(x_api_key=apikey, authorization=token) + organization_id = organization.organization_id + except Exception: + LOG.exception("WofklowRun Streaming: Error while getting organization", workflow_run_id=workflow_run_id) + try: + await websocket.send_text("Invalid credential provided") + except ConnectionClosedOK: + LOG.info("WofklowRun Streaming: ConnectionClosedOK error while sending invalid credential message") + return + + LOG.info( + "WofklowRun Streaming: Started workflow run streaming", + workflow_run_id=workflow_run_id, + organization_id=organization_id, + ) + # timestamp last time when streaming activity happens + last_activity_timestamp = datetime.utcnow() + + try: + while True: + # if no activity for 5 minutes, close the connection + if (datetime.utcnow() - last_activity_timestamp).total_seconds() > STREAMING_TIMEOUT: + LOG.info( + "WofklowRun Streaming: No activity for 5 minutes. Closing connection", + workflow_run_id=workflow_run_id, + organization_id=organization_id, + ) + await websocket.send_json( + { + "workflow_run_id": workflow_run_id, + "status": "timeout", + } + ) + return + + workflow_run = await app.DATABASE.get_workflow_run(workflow_run_id=workflow_run_id) + if not workflow_run or workflow_run.organization_id != organization_id: + LOG.info( + "WofklowRun Streaming: Workflow not found", + workflow_run_id=workflow_run_id, + organization_id=organization_id, + ) + await websocket.send_json( + { + "workflow_run_id": workflow_run_id, + "status": "not_found", + } + ) + return + if workflow_run.status in [ + WorkflowRunStatus.completed, + WorkflowRunStatus.failed, + WorkflowRunStatus.terminated, + ]: + LOG.info( + "Workflow run is in a final state. Closing connection", + workflow_run_status=workflow_run.status, + workflow_run_id=workflow_run_id, + organization_id=organization_id, + ) + await websocket.send_json( + { + "workflow_run_id": workflow_run_id, + "status": workflow_run.status, + } + ) + return + + if workflow_run.status == WorkflowRunStatus.running: + file_name = f"{workflow_run_id}.png" + screenshot = await app.STORAGE.get_streaming_file(organization_id, file_name) + if screenshot: + encoded_screenshot = base64.b64encode(screenshot).decode("utf-8") + await websocket.send_json( + { + "workflow_run_id": workflow_run_id, + "status": workflow_run.status, + "screenshot": encoded_screenshot, + } + ) + last_activity_timestamp = datetime.utcnow() + await asyncio.sleep(2) + + except ValidationError as e: + await websocket.send_text(f"Invalid data: {e}") + except WebSocketDisconnect: + LOG.info( + "WofklowRun Streaming: WebSocket connection closed", + workflow_run_id=workflow_run_id, + organization_id=organization_id, + ) + except ConnectionClosedOK: + LOG.info( + "WofklowRun Streaming: ConnectionClosedOK error while streaming", + workflow_run_id=workflow_run_id, + organization_id=organization_id, + ) + return + except Exception: + LOG.warning( + "WofklowRun Streaming: Error while streaming", + workflow_run_id=workflow_run_id, + organization_id=organization_id, + exc_info=True, + ) + return + LOG.info( + "WofklowRun Streaming: WebSocket connection closed successfully", + workflow_run_id=workflow_run_id, + organization_id=organization_id, + ) + return