From fd5fdb9d3281fd7565a88c24ebf9ed3d3f586df0 Mon Sep 17 00:00:00 2001 From: Kerem Yilmaz Date: Fri, 16 Aug 2024 08:25:10 +0300 Subject: [PATCH] add Request context to async_executor (#709) --- poetry.lock | 39 +++++++++++++++++++- pyproject.toml | 1 + skyvern/forge/sdk/executor/async_executor.py | 6 ++- skyvern/forge/sdk/routes/agent_protocol.py | 4 ++ 4 files changed, 47 insertions(+), 3 deletions(-) diff --git a/poetry.lock b/poetry.lock index 79898cc6..4c069c34 100644 --- a/poetry.lock +++ b/poetry.lock @@ -3068,6 +3068,7 @@ files = [ {file = "lxml-5.2.1-cp37-cp37m-musllinux_1_2_x86_64.whl", hash = "sha256:9e2addd2d1866fe112bc6f80117bcc6bc25191c5ed1bfbcf9f1386a884252ae8"}, {file = "lxml-5.2.1-cp37-cp37m-win32.whl", hash = "sha256:f51969bac61441fd31f028d7b3b45962f3ecebf691a510495e5d2cd8c8092dbd"}, {file = "lxml-5.2.1-cp37-cp37m-win_amd64.whl", hash = "sha256:b0b58fbfa1bf7367dde8a557994e3b1637294be6cf2169810375caf8571a085c"}, + {file = "lxml-5.2.1-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:3e183c6e3298a2ed5af9d7a356ea823bccaab4ec2349dc9ed83999fd289d14d5"}, {file = "lxml-5.2.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:804f74efe22b6a227306dd890eecc4f8c59ff25ca35f1f14e7482bbce96ef10b"}, {file = "lxml-5.2.1-cp38-cp38-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:08802f0c56ed150cc6885ae0788a321b73505d2263ee56dad84d200cab11c07a"}, {file = "lxml-5.2.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0f8c09ed18ecb4ebf23e02b8e7a22a05d6411911e6fabef3a36e4f371f4f2585"}, @@ -5247,7 +5248,6 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, - {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"}, {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, @@ -6191,6 +6191,30 @@ files = [ [package.dependencies] mpmath = ">=0.19" +[[package]] +name = "temporalio" +version = "1.6.0" +description = "Temporal.io Python SDK" +optional = false +python-versions = "<4.0,>=3.8" +files = [ + {file = "temporalio-1.6.0-cp38-abi3-macosx_10_9_x86_64.whl", hash = "sha256:50207806c5b9d701226ed2aed1fce44c688225ab9a370b014b06e51872b98ea7"}, + {file = "temporalio-1.6.0-cp38-abi3-macosx_11_0_arm64.whl", hash = "sha256:499253385dd3ca1827d34a05ae61350d54040e0d6a11502f04cbafa7b35be114"}, + {file = "temporalio-1.6.0-cp38-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:8fb097b97f833483cd500af2460a0996f812e8019327d893844a21b1c7cd9868"}, + {file = "temporalio-1.6.0-cp38-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:6b25d451170ecdf8443f1ed09f75ea708e8679c26636e7aa326bc89bd6bd0c84"}, + {file = "temporalio-1.6.0-cp38-abi3-win_amd64.whl", hash = "sha256:b5ae0bea0665a0bc87d80e7d18870b32eec631694abc0610ee39235e99cc304b"}, + {file = "temporalio-1.6.0.tar.gz", hash = "sha256:a6f24ea91eb1dd1345c68f4ceb21dd2a11a84cda0d6d963d6e570a0c156a80f0"}, +] + +[package.dependencies] +protobuf = ">=3.20" +types-protobuf = ">=3.20" +typing-extensions = ">=4.2.0,<5.0.0" + +[package.extras] +grpc = ["grpcio (>=1.59.0,<2.0.0)"] +opentelemetry = ["opentelemetry-api (>=1.11.1,<2.0.0)", "opentelemetry-sdk (>=1.11.1,<2.0.0)"] + [[package]] name = "tenacity" version = "8.3.0" @@ -6567,6 +6591,17 @@ dev = ["autoflake (>=1.3.1,<2.0.0)", "flake8 (>=3.8.3,<4.0.0)", "pre-commit (>=2 doc = ["cairosvg (>=2.5.2,<3.0.0)", "mdx-include (>=1.4.1,<2.0.0)", "mkdocs (>=1.1.2,<2.0.0)", "mkdocs-material (>=8.1.4,<9.0.0)", "pillow (>=9.3.0,<10.0.0)"] test = ["black (>=22.3.0,<23.0.0)", "coverage (>=6.2,<7.0)", "isort (>=5.0.6,<6.0.0)", "mypy (==0.971)", "pytest (>=4.4.0,<8.0.0)", "pytest-cov (>=2.10.0,<5.0.0)", "pytest-sugar (>=0.9.4,<0.10.0)", "pytest-xdist (>=1.32.0,<4.0.0)", "rich (>=10.11.0,<14.0.0)", "shellingham (>=1.3.0,<2.0.0)"] +[[package]] +name = "types-protobuf" +version = "5.27.0.20240626" +description = "Typing stubs for protobuf" +optional = false +python-versions = ">=3.8" +files = [ + {file = "types-protobuf-5.27.0.20240626.tar.gz", hash = "sha256:683ba14043bade6785e3f937a7498f243b37881a91ac8d81b9202ecf8b191e9c"}, + {file = "types_protobuf-5.27.0.20240626-py3-none-any.whl", hash = "sha256:688e8f7e8d9295db26bc560df01fb731b27a25b77cbe4c1ce945647f7024f5c1"}, +] + [[package]] name = "types-python-dateutil" version = "2.9.0.20240316" @@ -7277,4 +7312,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = "^3.11,<3.12" -content-hash = "7b3fcdfead53d86754979b22e477654873d315f7381f69268e81f285bb9559b1" +content-hash = "797ed2e712b89db31580d7f9022044bb0ab0d4bea6a3a6b25c5b8ffb0b39165c" diff --git a/pyproject.toml b/pyproject.toml index a264df9b..c4d221bc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,6 +59,7 @@ stripe = "^9.7.0" tldextract = "^5.1.2" websockets = "^12.0" email-validator = "^2.2.0" +temporalio = "^1.6.0" [tool.poetry.group.dev.dependencies] isort = "^5.13.2" diff --git a/skyvern/forge/sdk/executor/async_executor.py b/skyvern/forge/sdk/executor/async_executor.py index e28cd258..805a3956 100644 --- a/skyvern/forge/sdk/executor/async_executor.py +++ b/skyvern/forge/sdk/executor/async_executor.py @@ -1,7 +1,7 @@ import abc import structlog -from fastapi import BackgroundTasks +from fastapi import BackgroundTasks, Request from skyvern.exceptions import OrganizationNotFound from skyvern.forge import app @@ -16,6 +16,7 @@ class AsyncExecutor(abc.ABC): @abc.abstractmethod async def execute_task( self, + request: Request | None, background_tasks: BackgroundTasks, task_id: str, organization_id: str, @@ -28,6 +29,7 @@ class AsyncExecutor(abc.ABC): @abc.abstractmethod async def execute_workflow( self, + request: Request | None, background_tasks: BackgroundTasks, organization_id: str, workflow_id: str, @@ -42,6 +44,7 @@ class AsyncExecutor(abc.ABC): class BackgroundTaskExecutor(AsyncExecutor): async def execute_task( self, + request: Request | None, background_tasks: BackgroundTasks, task_id: str, organization_id: str, @@ -83,6 +86,7 @@ class BackgroundTaskExecutor(AsyncExecutor): async def execute_workflow( self, + request: Request | None, background_tasks: BackgroundTasks, organization_id: str, workflow_id: str, diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index 66abfc79..f4a1cc46 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -110,6 +110,7 @@ async def check_server_status() -> Response: include_in_schema=False, ) async def create_agent_task( + request: Request, background_tasks: BackgroundTasks, task: TaskRequest, current_org: Organization = Depends(org_auth_service.get_current_org), @@ -123,6 +124,7 @@ async def create_agent_task( if x_max_steps_override: LOG.info("Overriding max steps per run", max_steps_override=x_max_steps_override) await AsyncExecutorFactory.get_executor().execute_task( + request=request, background_tasks=background_tasks, task_id=created_task.task_id, organization_id=current_org.organization_id, @@ -518,6 +520,7 @@ async def get_task_actions( include_in_schema=False, ) async def execute_workflow( + request: Request, background_tasks: BackgroundTasks, workflow_id: str, # this is the workflow_permanent_id workflow_request: WorkflowRequestBody, @@ -540,6 +543,7 @@ async def execute_workflow( if x_max_steps_override: LOG.info("Overriding max steps per run", max_steps_override=x_max_steps_override) await AsyncExecutorFactory.get_executor().execute_workflow( + request=request, background_tasks=background_tasks, organization_id=current_org.organization_id, workflow_id=workflow_run.workflow_id,