cache get_workflow_script_by_cache_key_value query and re-enable post action execution to generate code (#4385)
This commit is contained in:
@@ -639,5 +639,4 @@ class AgentFunction:
|
|||||||
)
|
)
|
||||||
|
|
||||||
async def post_action_execution(self, action: Action) -> None:
|
async def post_action_execution(self, action: Action) -> None:
|
||||||
return
|
asyncio.create_task(self._post_action_execution(action))
|
||||||
# asyncio.create_task(self._post_action_execution(action))
|
|
||||||
|
|||||||
@@ -441,7 +441,7 @@ async def get_workflow_script_blocks(
|
|||||||
cache_key = block_script_request.cache_key or workflow.cache_key or ""
|
cache_key = block_script_request.cache_key or workflow.cache_key or ""
|
||||||
status = block_script_request.status
|
status = block_script_request.status
|
||||||
|
|
||||||
script = await app.DATABASE.get_workflow_script_by_cache_key_value(
|
script = await workflow_script_service.get_workflow_script_by_cache_key_value(
|
||||||
organization_id=current_org.organization_id,
|
organization_id=current_org.organization_id,
|
||||||
workflow_permanent_id=workflow_permanent_id,
|
workflow_permanent_id=workflow_permanent_id,
|
||||||
workflow_run_id=block_script_request.workflow_run_id,
|
workflow_run_id=block_script_request.workflow_run_id,
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
import base64
|
import base64
|
||||||
|
|
||||||
import structlog
|
import structlog
|
||||||
|
from cachetools import TTLCache
|
||||||
from jinja2.sandbox import SandboxedEnvironment
|
from jinja2.sandbox import SandboxedEnvironment
|
||||||
|
|
||||||
from skyvern.config import settings
|
from skyvern.config import settings
|
||||||
@@ -17,6 +18,23 @@ from skyvern.services import script_service
|
|||||||
LOG = structlog.get_logger()
|
LOG = structlog.get_logger()
|
||||||
jinja_sandbox_env = SandboxedEnvironment()
|
jinja_sandbox_env = SandboxedEnvironment()
|
||||||
|
|
||||||
|
# Cache for workflow scripts - only stores non-None results
|
||||||
|
_workflow_script_cache: TTLCache[tuple, "Script"] = TTLCache(maxsize=128, ttl=60 * 60)
|
||||||
|
|
||||||
|
|
||||||
|
def _make_workflow_script_cache_key(
|
||||||
|
organization_id: str,
|
||||||
|
workflow_permanent_id: str,
|
||||||
|
cache_key_value: str,
|
||||||
|
workflow_run_id: str | None = None,
|
||||||
|
cache_key: str | None = None,
|
||||||
|
statuses: list[ScriptStatus] | None = None,
|
||||||
|
) -> tuple:
|
||||||
|
"""Create a hashable cache key from the function arguments."""
|
||||||
|
# Convert list to tuple for hashability
|
||||||
|
statuses_key = tuple(statuses) if statuses else None
|
||||||
|
return (organization_id, workflow_permanent_id, cache_key_value, workflow_run_id, cache_key, statuses_key)
|
||||||
|
|
||||||
|
|
||||||
async def generate_or_update_pending_workflow_script(
|
async def generate_or_update_pending_workflow_script(
|
||||||
workflow_run: WorkflowRun,
|
workflow_run: WorkflowRun,
|
||||||
@@ -78,11 +96,12 @@ async def get_workflow_script(
|
|||||||
return None, rendered_cache_key_value
|
return None, rendered_cache_key_value
|
||||||
|
|
||||||
# Check if there are existing cached scripts for this workflow + cache_key_value
|
# Check if there are existing cached scripts for this workflow + cache_key_value
|
||||||
existing_script = await app.DATABASE.get_workflow_script_by_cache_key_value(
|
existing_script = await get_workflow_script_by_cache_key_value(
|
||||||
organization_id=workflow.organization_id,
|
organization_id=workflow.organization_id,
|
||||||
workflow_permanent_id=workflow.workflow_permanent_id,
|
workflow_permanent_id=workflow.workflow_permanent_id,
|
||||||
cache_key_value=rendered_cache_key_value,
|
cache_key_value=rendered_cache_key_value,
|
||||||
statuses=[status],
|
statuses=[status],
|
||||||
|
use_cache=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
if existing_script:
|
if existing_script:
|
||||||
@@ -107,6 +126,54 @@ async def get_workflow_script(
|
|||||||
return None, rendered_cache_key_value
|
return None, rendered_cache_key_value
|
||||||
|
|
||||||
|
|
||||||
|
async def get_workflow_script_by_cache_key_value(
|
||||||
|
organization_id: str,
|
||||||
|
workflow_permanent_id: str,
|
||||||
|
cache_key_value: str,
|
||||||
|
workflow_run_id: str | None = None,
|
||||||
|
cache_key: str | None = None,
|
||||||
|
statuses: list[ScriptStatus] | None = None,
|
||||||
|
use_cache: bool = False,
|
||||||
|
) -> Script | None:
|
||||||
|
if use_cache:
|
||||||
|
cache_key_tuple = _make_workflow_script_cache_key(
|
||||||
|
organization_id=organization_id,
|
||||||
|
workflow_permanent_id=workflow_permanent_id,
|
||||||
|
cache_key_value=cache_key_value,
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
cache_key=cache_key,
|
||||||
|
statuses=statuses,
|
||||||
|
)
|
||||||
|
# Check cache first
|
||||||
|
if cache_key_tuple in _workflow_script_cache:
|
||||||
|
return _workflow_script_cache[cache_key_tuple]
|
||||||
|
|
||||||
|
# Cache miss - fetch from database
|
||||||
|
result = await app.DATABASE.get_workflow_script_by_cache_key_value(
|
||||||
|
organization_id=organization_id,
|
||||||
|
workflow_permanent_id=workflow_permanent_id,
|
||||||
|
cache_key_value=cache_key_value,
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
cache_key=cache_key,
|
||||||
|
statuses=statuses,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Only cache non-None results
|
||||||
|
if result is not None:
|
||||||
|
_workflow_script_cache[cache_key_tuple] = result
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
return await app.DATABASE.get_workflow_script_by_cache_key_value(
|
||||||
|
organization_id=organization_id,
|
||||||
|
workflow_permanent_id=workflow_permanent_id,
|
||||||
|
cache_key_value=cache_key_value,
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
cache_key=cache_key,
|
||||||
|
statuses=statuses,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def _load_cached_script_block_sources(
|
async def _load_cached_script_block_sources(
|
||||||
script: Script,
|
script: Script,
|
||||||
organization_id: str,
|
organization_id: str,
|
||||||
|
|||||||
Reference in New Issue
Block a user