From 3f92c50a8f96260ceb60062122f5e5ba471d4556 Mon Sep 17 00:00:00 2001 From: Kerem Yilmaz Date: Mon, 12 Aug 2024 19:36:24 +0300 Subject: [PATCH] Screen streaming under docker environment (#674) Co-authored-by: Shuchang Zheng --- .gitignore | 3 +- Dockerfile | 5 +- Dockerfile.ui | 1 + docker-compose.yml | 4 +- entrypoint-skyvern.sh | 21 +++- run_streaming.py | 38 +++++++ run_streaming.sh | 8 ++ skyvern/config.py | 3 + skyvern/forge/api_app.py | 8 +- skyvern/forge/sdk/artifact/storage/base.py | 8 ++ skyvern/forge/sdk/artifact/storage/local.py | 18 +++ skyvern/forge/sdk/routes/streaming.py | 115 ++++++++++++++++++++ 12 files changed, 222 insertions(+), 10 deletions(-) create mode 100644 run_streaming.py create mode 100755 run_streaming.sh create mode 100644 skyvern/forge/sdk/routes/streaming.py diff --git a/.gitignore b/.gitignore index b278a4a2..4bc94d70 100644 --- a/.gitignore +++ b/.gitignore @@ -165,6 +165,7 @@ traces/ *.pkl har/ postgres-data +files/ # Streamlit ignores **/secrets*.toml @@ -172,4 +173,4 @@ postgres-data ## Frontend node_modules .env.backup -.env.old \ No newline at end of file +.env.old diff --git a/Dockerfile b/Dockerfile index 87123311..3d1de87a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -13,7 +13,7 @@ RUN pip install --no-cache-dir --upgrade -r requirements.txt RUN pip install --no-cache-dir streamlit RUN playwright install-deps RUN playwright install -RUN apt-get install -y xauth && apt-get clean +RUN apt-get install -y xauth x11-apps netpbm && apt-get clean COPY . /app @@ -29,6 +29,3 @@ COPY ./entrypoint-streamlit.sh /app/entrypoint-streamlit.sh RUN chmod +x /app/entrypoint-streamlit.sh CMD [ "/bin/bash", "/app/entrypoint-skyvern.sh" ] - - - diff --git a/Dockerfile.ui b/Dockerfile.ui index 9f887a3f..e415829c 100644 --- a/Dockerfile.ui +++ b/Dockerfile.ui @@ -6,6 +6,7 @@ COPY ./entrypoint-skyvernui.sh /app/entrypoint-skyvernui.sh RUN npm install ENV VITE_API_BASE_URL=http://localhost:8000/api/v1 +ENV VITE_WSS_BASE_URL=ws://localhost:8000/api/v1 ENV VITE_ARTIFACT_API_BASE_URL=http://localhost:9090 CMD [ "/bin/bash", "/app/entrypoint-skyvernui.sh" ] diff --git a/docker-compose.yml b/docker-compose.yml index 25e85dd4..98948277 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -31,6 +31,7 @@ services: - ./videos:/data/videos - ./har:/data/har - ./.streamlit:/app/.streamlit + - ./files:/tmp environment: - DATABASE_STRING=postgresql+psycopg://skyvern:skyvern@postgres:5432/skyvern - BROWSER_TYPE=chromium-headful @@ -66,7 +67,8 @@ services: - ./videos:/data/videos - ./har:/data/har - ./.streamlit:/app/.streamlit - # environment: + environment: + - VITE_WSS_BASE_URL=ws://localhost:8000/api/v1 # - VITE_API_BASE_URL= # - VITE_SKYVERN_API_KEY= depends_on: diff --git a/entrypoint-skyvern.sh b/entrypoint-skyvern.sh index 50385118..7ab0caa8 100644 --- a/entrypoint-skyvern.sh +++ b/entrypoint-skyvern.sh @@ -16,5 +16,24 @@ if [ ! -f ".streamlit/secrets.toml" ]; then echo ".streamlit/secrets.toml file updated with organization details." fi +_kill_xvfb_on_term() { + kill -TERM $xvfb +} + +# Setup a trap to catch SIGTERM and relay it to child processes +trap _kill_xvfb_on_term TERM + +echo "Starting Xvfb..." +# delete the lock file if any +rm -f /tmp/.X99-lock +# Set display environment variable +export DISPLAY=:99 +# Start Xvfb +Xvfb :99 -screen 0 1920x1080x16 & +xvfb=$! + +DISPLAY=:99 xterm 2>/dev/null & +python run_streaming.py > /dev/null & + # Run the command and pass in all three arguments -xvfb-run python -m skyvern.forge +python -m skyvern.forge diff --git a/run_streaming.py b/run_streaming.py new file mode 100644 index 00000000..c33123f1 --- /dev/null +++ b/run_streaming.py @@ -0,0 +1,38 @@ +import asyncio +import subprocess + +import structlog +import typer + +from skyvern.forge import app +from skyvern.forge.sdk.settings_manager import SettingsManager + +INTERVAL = 1 +LOG = structlog.get_logger() + + +async def run() -> None: + file_name = "skyvern_screenshot.png" + png_file_path = f"{SettingsManager.get_settings().STREAMING_FILE_BASE_PATH}/{file_name}" + + while True: + # run subprocess to take screenshot + subprocess.run( + f"xwd -root | xwdtopnm 2>/dev/null | pnmtopng > {png_file_path}", shell=True, env={"DISPLAY": ":99"} + ) + + # upload screenshot to S3 + try: + await app.STORAGE.save_streaming_file("placeholder_org", file_name) + except Exception: + LOG.info("Failed to save screenshot") + + await asyncio.sleep(INTERVAL) + + +def main() -> None: + asyncio.run(run()) + + +if __name__ == "__main__": + typer.run(main) diff --git a/run_streaming.sh b/run_streaming.sh new file mode 100755 index 00000000..ea055dc7 --- /dev/null +++ b/run_streaming.sh @@ -0,0 +1,8 @@ +#!/bin/bash + +echo "Starting streaming..." + +while true; do + xwd -root | xwdtopnm | pnmtopng > /tmp/skyvern_screenshot.png + sleep 1 +done diff --git a/skyvern/config.py b/skyvern/config.py index e89fdc5c..68f99496 100644 --- a/skyvern/config.py +++ b/skyvern/config.py @@ -63,6 +63,9 @@ class Settings(BaseSettings): # Workflow constant parameters WORKFLOW_DOWNLOAD_DIRECTORY_PARAMETER_KEY: str = "SKYVERN_DOWNLOAD_DIRECTORY" + # streaming settings + STREAMING_FILE_BASE_PATH: str = "/tmp" + ##################### # Bitwarden Configs # ##################### diff --git a/skyvern/forge/api_app.py b/skyvern/forge/api_app.py index 1c41e73b..ecfb94c7 100644 --- a/skyvern/forge/api_app.py +++ b/skyvern/forge/api_app.py @@ -3,7 +3,7 @@ from datetime import datetime from typing import Awaitable, Callable import structlog -from fastapi import APIRouter, FastAPI, Response, status +from fastapi import FastAPI, Response, status from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse from pydantic import ValidationError @@ -17,6 +17,7 @@ from skyvern.forge.sdk.core import skyvern_context from skyvern.forge.sdk.core.skyvern_context import SkyvernContext from skyvern.forge.sdk.db.exceptions import NotFoundError from skyvern.forge.sdk.routes.agent_protocol import base_router +from skyvern.forge.sdk.routes.streaming import websocket_router from skyvern.forge.sdk.settings_manager import SettingsManager from skyvern.scheduler import SCHEDULER @@ -30,7 +31,7 @@ class ExecutionDatePlugin(Plugin): return datetime.now() -def get_agent_app(router: APIRouter = base_router) -> FastAPI: +def get_agent_app() -> FastAPI: """ Start the agent server. """ @@ -46,7 +47,8 @@ def get_agent_app(router: APIRouter = base_router) -> FastAPI: allow_headers=["*"], ) - app.include_router(router, prefix="/api/v1") + app.include_router(base_router, prefix="/api/v1") + app.include_router(websocket_router, prefix="/api/v1/stream") app.add_middleware( RawContextMiddleware, diff --git a/skyvern/forge/sdk/artifact/storage/base.py b/skyvern/forge/sdk/artifact/storage/base.py index 0690eb7b..4bd5d383 100644 --- a/skyvern/forge/sdk/artifact/storage/base.py +++ b/skyvern/forge/sdk/artifact/storage/base.py @@ -51,3 +51,11 @@ class BaseStorage(ABC): @abstractmethod async def store_artifact_from_path(self, artifact: Artifact, path: str) -> None: pass + + @abstractmethod + async def save_streaming_file(self, organization_id: str, file_name: str) -> None: + pass + + @abstractmethod + async def get_streaming_file(self, organization_id: str, file_name: str, use_default: bool = True) -> bytes | None: + pass diff --git a/skyvern/forge/sdk/artifact/storage/local.py b/skyvern/forge/sdk/artifact/storage/local.py index 950b953d..1f4096a2 100644 --- a/skyvern/forge/sdk/artifact/storage/local.py +++ b/skyvern/forge/sdk/artifact/storage/local.py @@ -67,6 +67,24 @@ class LocalStorage(BaseStorage): async def get_share_links(self, artifacts: list[Artifact]) -> list[str]: return [artifact.uri for artifact in artifacts] + async def save_streaming_file(self, organization_id: str, file_name: str) -> None: + return + + async def get_streaming_file(self, organization_id: str, file_name: str, use_default: bool = True) -> bytes | None: + file_path = Path(f"{SettingsManager.get_settings().STREAMING_FILE_BASE_PATH}/skyvern_screenshot.png") + if not use_default: + file_path = Path(f"{SettingsManager.get_settings().STREAMING_FILE_BASE_PATH}/{organization_id}/{file_name}") + try: + with open(file_path, "rb") as f: + return f.read() + except Exception: + LOG.exception( + "Failed to retrieve streaming file.", + organization_id=organization_id, + file_name=file_name, + ) + return None + @staticmethod def _parse_uri_to_path(uri: str) -> str: parsed_uri = urlparse(uri) diff --git a/skyvern/forge/sdk/routes/streaming.py b/skyvern/forge/sdk/routes/streaming.py new file mode 100644 index 00000000..ad815788 --- /dev/null +++ b/skyvern/forge/sdk/routes/streaming.py @@ -0,0 +1,115 @@ +import asyncio +import base64 +from datetime import datetime + +import structlog +from fastapi import APIRouter, WebSocket, WebSocketDisconnect +from pydantic import ValidationError +from websockets.exceptions import ConnectionClosedOK + +from skyvern.forge import app +from skyvern.forge.sdk.schemas.tasks import TaskStatus +from skyvern.forge.sdk.services.org_auth_service import get_current_org + +LOG = structlog.get_logger() +websocket_router = APIRouter() +STREAMING_TIMEOUT = 300 + + +@websocket_router.websocket("/tasks/{task_id}") +async def task_stream( + websocket: WebSocket, + task_id: str, + apikey: str | None = None, + token: str | None = None, +) -> None: + try: + await websocket.accept() + if not token and not apikey: + await websocket.send_text("No valid credential provided") + return + except ConnectionClosedOK: + LOG.info("ConnectionClosedOK error. Streaming won't start") + return + + try: + organization = await get_current_org(x_api_key=apikey, authorization=token) + organization_id = organization.organization_id + except Exception: + try: + await websocket.send_text("Invalid credential provided") + except ConnectionClosedOK: + LOG.info("ConnectionClosedOK error while sending invalid credential message") + return + + LOG.info("Started task streaming", task_id=task_id, organization_id=organization_id) + # timestamp last time when streaming activity happens + last_activity_timestamp = datetime.utcnow() + + try: + while True: + # if no activity for 5 minutes, close the connection + if (datetime.utcnow() - last_activity_timestamp).total_seconds() > STREAMING_TIMEOUT: + LOG.info( + "No activity for 5 minutes. Closing connection", task_id=task_id, organization_id=organization_id + ) + await websocket.send_json( + { + "task_id": task_id, + "status": "timeout", + } + ) + return + + task = await app.DATABASE.get_task(task_id=task_id, organization_id=organization_id) + if not task: + LOG.info("Task not found. Closing connection", task_id=task_id, organization_id=organization_id) + await websocket.send_json( + { + "task_id": task_id, + "status": "not_found", + } + ) + return + if task.status.is_final(): + LOG.info( + "Task is in a final state. Closing connection", + task_status=task.status, + task_id=task_id, + organization_id=organization_id, + ) + await websocket.send_json( + { + "task_id": task_id, + "status": task.status, + } + ) + return + + if task.status == TaskStatus.running: + file_name = f"{task_id}.png" + screenshot = await app.STORAGE.get_streaming_file(organization_id, file_name) + if screenshot: + encoded_screenshot = base64.b64encode(screenshot).decode("utf-8") + await websocket.send_json( + { + "task_id": task_id, + "status": task.status, + "screenshot": encoded_screenshot, + } + ) + last_activity_timestamp = datetime.utcnow() + await asyncio.sleep(2) + + except ValidationError as e: + await websocket.send_text(f"Invalid data: {e}") + except WebSocketDisconnect: + LOG.info("WebSocket connection closed") + except ConnectionClosedOK: + LOG.info("ConnectionClosedOK error while streaming", exc_info=True) + return + except Exception: + LOG.warning("Error while streaming", exc_info=True) + return + LOG.info("WebSocket connection closed successfully") + return