Make PostHog async (#3786)

This commit is contained in:
Stanislav Novosad
2025-10-21 21:14:02 -06:00
committed by GitHub
parent 1b5fad9743
commit 54918a1b92
7 changed files with 31 additions and 25 deletions

View File

@@ -980,7 +980,7 @@ class ForgeAgent:
scraped_page=scraped_page,
llm_caller=llm_caller,
)
elif engine == RunEngine.ui_tars and not app.EXPERIMENTATION_PROVIDER.is_feature_enabled_cached(
elif engine == RunEngine.ui_tars and not await app.EXPERIMENTATION_PROVIDER.is_feature_enabled_cached(
"DISABLE_UI_TARS_CUA",
task.workflow_run_id or task.task_id,
properties={"organization_id": task.organization_id},
@@ -1374,7 +1374,7 @@ class ForgeAgent:
and complete_verification
and (task.navigation_goal or task.complete_criterion)
):
disable_user_goal_check = app.EXPERIMENTATION_PROVIDER.is_feature_enabled_cached(
disable_user_goal_check = await app.EXPERIMENTATION_PROVIDER.is_feature_enabled_cached(
"DISABLE_USER_GOAL_CHECK",
task.task_id,
properties={"task_url": task.url, "organization_id": task.organization_id},
@@ -1857,7 +1857,7 @@ class ForgeAgent:
try:
screenshot = await browser_state.take_post_action_screenshot(
scrolling_number=scrolling_number,
use_playwright_fullpage=app.EXPERIMENTATION_PROVIDER.is_feature_enabled_cached(
use_playwright_fullpage=await app.EXPERIMENTATION_PROVIDER.is_feature_enabled_cached(
"ENABLE_PLAYWRIGHT_FULLPAGE",
task.workflow_run_id or task.task_id,
properties={"organization_id": task.organization_id},
@@ -2487,7 +2487,7 @@ class ForgeAgent:
if browser_state is not None and await browser_state.get_working_page() is not None:
try:
screenshot = await browser_state.take_fullpage_screenshot(
use_playwright_fullpage=app.EXPERIMENTATION_PROVIDER.is_feature_enabled_cached(
use_playwright_fullpage=await app.EXPERIMENTATION_PROVIDER.is_feature_enabled_cached(
"ENABLE_PLAYWRIGHT_FULLPAGE",
task.workflow_run_id or task.task_id,
properties={"organization_id": task.organization_id},

View File

@@ -12,14 +12,16 @@ class BaseExperimentationProvider(ABC):
payload_map: dict[str, dict[str, str | None]] = {}
@abstractmethod
def is_feature_enabled(self, feature_name: str, distinct_id: str, properties: dict | None = None) -> bool:
async def is_feature_enabled(self, feature_name: str, distinct_id: str, properties: dict | None = None) -> bool:
"""Check if a specific feature is enabled."""
def is_feature_enabled_cached(self, feature_name: str, distinct_id: str, properties: dict | None = None) -> bool:
async def is_feature_enabled_cached(
self, feature_name: str, distinct_id: str, properties: dict | None = None
) -> bool:
if feature_name not in self.result_map:
self.result_map[feature_name] = {}
if distinct_id not in self.result_map[feature_name]:
feature_flag_value = self.is_feature_enabled(feature_name, distinct_id, properties)
feature_flag_value = await self.is_feature_enabled(feature_name, distinct_id, properties)
self.result_map[feature_name][distinct_id] = feature_flag_value
if feature_flag_value:
LOG.info("Feature flag is enabled", flag=feature_name, distinct_id=distinct_id)
@@ -27,30 +29,32 @@ class BaseExperimentationProvider(ABC):
return self.result_map[feature_name][distinct_id]
@abstractmethod
def get_value(self, feature_name: str, distinct_id: str, properties: dict | None = None) -> str | None:
async def get_value(self, feature_name: str, distinct_id: str, properties: dict | None = None) -> str | None:
"""Get the value of a feature."""
@abstractmethod
def get_payload(self, feature_name: str, distinct_id: str, properties: dict | None = None) -> str | None:
async def get_payload(self, feature_name: str, distinct_id: str, properties: dict | None = None) -> str | None:
"""Get the payload for a feature flag if it exists."""
def get_value_cached(self, feature_name: str, distinct_id: str, properties: dict | None = None) -> str | None:
async def get_value_cached(self, feature_name: str, distinct_id: str, properties: dict | None = None) -> str | None:
"""Get the value of a feature."""
if feature_name not in self.variant_map:
self.variant_map[feature_name] = {}
if distinct_id not in self.variant_map[feature_name]:
variant = self.get_value(feature_name, distinct_id, properties)
variant = await self.get_value(feature_name, distinct_id, properties)
self.variant_map[feature_name][distinct_id] = variant
if variant:
LOG.info("Feature is found", flag=feature_name, distinct_id=distinct_id, variant=variant)
return self.variant_map[feature_name][distinct_id]
def get_payload_cached(self, feature_name: str, distinct_id: str, properties: dict | None = None) -> str | None:
async def get_payload_cached(
self, feature_name: str, distinct_id: str, properties: dict | None = None
) -> str | None:
"""Get the payload for a feature flag if it exists."""
if feature_name not in self.payload_map:
self.payload_map[feature_name] = {}
if distinct_id not in self.payload_map[feature_name]:
payload = self.get_payload(feature_name, distinct_id, properties)
payload = await self.get_payload(feature_name, distinct_id, properties)
self.payload_map[feature_name][distinct_id] = payload
if payload:
LOG.info("Feature payload is found", flag=feature_name, distinct_id=distinct_id)
@@ -58,11 +62,11 @@ class BaseExperimentationProvider(ABC):
class NoOpExperimentationProvider(BaseExperimentationProvider):
def is_feature_enabled(self, feature_name: str, distinct_id: str, properties: dict | None = None) -> bool:
async def is_feature_enabled(self, feature_name: str, distinct_id: str, properties: dict | None = None) -> bool:
return False
def get_value(self, feature_name: str, distinct_id: str, properties: dict | None = None) -> str | None:
async def get_value(self, feature_name: str, distinct_id: str, properties: dict | None = None) -> str | None:
return None
def get_payload(self, feature_name: str, distinct_id: str, properties: dict | None = None) -> str | None:
async def get_payload(self, feature_name: str, distinct_id: str, properties: dict | None = None) -> str | None:
return None

View File

@@ -73,12 +73,12 @@ async def collect_experiment_metadata(
for flag in experiment_flags:
try:
# Get the experiment value (already cached by experimentation provider)
value = experimentation_provider.get_value_cached(
value = await experimentation_provider.get_value_cached(
flag, distinct_id, properties={"organization_id": organization_id}
)
# Get the payload if available (already cached by experimentation provider)
payload = experimentation_provider.get_payload_cached(
payload = await experimentation_provider.get_payload_cached(
flag, distinct_id, properties={"organization_id": organization_id}
)

View File

@@ -148,7 +148,9 @@ class WorkflowRunContext:
workflow_run_context.parameters[context_parameter.key] = context_parameter
# Compute once and cache whether secrets should be included in templates
workflow_run_context.include_secrets_in_templates = workflow_run_context._should_include_secrets_in_templates()
workflow_run_context.include_secrets_in_templates = (
await workflow_run_context._should_include_secrets_in_templates()
)
return workflow_run_context
@@ -203,12 +205,12 @@ class WorkflowRunContext:
label = ""
return self.blocks_metadata.get(label, BlockMetadata())
def _should_include_secrets_in_templates(self) -> bool:
async def _should_include_secrets_in_templates(self) -> bool:
"""
Check if secrets should be included in template formatting based on experimentation provider.
This check is done once per workflow run context to avoid repeated calls.
"""
return app.EXPERIMENTATION_PROVIDER.is_feature_enabled_cached(
return await app.EXPERIMENTATION_PROVIDER.is_feature_enabled_cached(
"CODE_BLOCK_ENABLED",
self.workflow_run_id,
properties={"organization_id": self.organization_id},

View File

@@ -351,7 +351,7 @@ class Block(BaseModel, abc.ABC):
else:
try:
screenshot = await browser_state.take_fullpage_screenshot(
use_playwright_fullpage=app.EXPERIMENTATION_PROVIDER.is_feature_enabled_cached(
use_playwright_fullpage=await app.EXPERIMENTATION_PROVIDER.is_feature_enabled_cached(
"ENABLE_PLAYWRIGHT_FULLPAGE",
workflow_run_id,
properties={"organization_id": str(organization_id)},
@@ -659,7 +659,7 @@ class BaseTaskBlock(Block):
try:
# add screenshot artifact for the first task
screenshot = await browser_state.take_fullpage_screenshot(
use_playwright_fullpage=app.EXPERIMENTATION_PROVIDER.is_feature_enabled_cached(
use_playwright_fullpage=await app.EXPERIMENTATION_PROVIDER.is_feature_enabled_cached(
"ENABLE_PLAYWRIGHT_FULLPAGE",
workflow_run_id,
properties={"organization_id": str(organization_id)},

View File

@@ -379,7 +379,7 @@ async def _take_workflow_run_block_screenshot(
LOG.warning("No browser state found when creating workflow_run_block", workflow_run_id=workflow_run_id)
else:
screenshot = await browser_state.take_fullpage_screenshot(
use_playwright_fullpage=app.EXPERIMENTATION_PROVIDER.is_feature_enabled_cached(
use_playwright_fullpage=await app.EXPERIMENTATION_PROVIDER.is_feature_enabled_cached(
"ENABLE_PLAYWRIGHT_FULLPAGE",
workflow_run_id,
properties={"organization_id": str(organization_id)},

View File

@@ -465,7 +465,7 @@ async def run_task_v2_helper(
current_run_id = context.run_id if context and context.run_id else task_v2_id
# task v2 can be nested inside a workflow run, so we need to use the root workflow run id
root_workflow_run_id = context.root_workflow_run_id if context and context.root_workflow_run_id else workflow_run_id
enable_parse_select_in_extract = app.EXPERIMENTATION_PROVIDER.is_feature_enabled_cached(
enable_parse_select_in_extract = await app.EXPERIMENTATION_PROVIDER.is_feature_enabled_cached(
"ENABLE_PARSE_SELECT_IN_EXTRACT",
current_run_id,
properties={"organization_id": organization_id, "task_url": task_v2.url},