Browser recording: events to blocks (#4195)
This commit is contained in:
3
skyvern/services/browser_recording/__init__.py
Normal file
3
skyvern/services/browser_recording/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from .service import BrowserSessionRecordingService
|
||||
|
||||
__all__ = ["BrowserSessionRecordingService"]
|
||||
491
skyvern/services/browser_recording/service.py
Normal file
491
skyvern/services/browser_recording/service.py
Normal file
@@ -0,0 +1,491 @@
|
||||
import asyncio
|
||||
import base64
|
||||
import json
|
||||
import pathlib
|
||||
import typing as t
|
||||
import zlib
|
||||
|
||||
import structlog
|
||||
|
||||
import skyvern.services.browser_recording.state_machines as sm
|
||||
from skyvern.client.types.workflow_definition_yaml_blocks_item import (
|
||||
WorkflowDefinitionYamlBlocksItem_Action,
|
||||
WorkflowDefinitionYamlBlocksItem_GotoUrl,
|
||||
WorkflowDefinitionYamlBlocksItem_Wait,
|
||||
)
|
||||
from skyvern.client.types.workflow_definition_yaml_parameters_item import WorkflowDefinitionYamlParametersItem_Workflow
|
||||
from skyvern.forge import app
|
||||
from skyvern.forge.prompts import prompt_engine
|
||||
from skyvern.services.browser_recording.types import (
|
||||
Action,
|
||||
ActionBlockable,
|
||||
ActionKind,
|
||||
ActionUrlChange,
|
||||
ActionWait,
|
||||
ExfiltratedCdpEvent,
|
||||
ExfiltratedConsoleEvent,
|
||||
ExfiltratedEvent,
|
||||
OutputBlock,
|
||||
)
|
||||
|
||||
LOG = structlog.get_logger(__name__)
|
||||
|
||||
# avoid decompression bombs
|
||||
MAX_BASE64_SIZE = 14 * 1024 * 1024 # ~10MB compressed + base64 overhead
|
||||
|
||||
|
||||
class Processor:
|
||||
"""
|
||||
Process browser session recordings into workflow definition blocks.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
browser_session_id: str,
|
||||
organization_id: str,
|
||||
workflow_permanent_id: str,
|
||||
) -> None:
|
||||
self.browser_session_id = browser_session_id
|
||||
self.organization_id = organization_id
|
||||
self.workflow_permanent_id = workflow_permanent_id
|
||||
|
||||
@property
|
||||
def class_name(self) -> str:
|
||||
return self.__class__.__name__
|
||||
|
||||
@property
|
||||
def identity(self) -> dict[str, str]:
|
||||
return dict(
|
||||
browser_session_id=self.browser_session_id,
|
||||
organization_id=self.organization_id,
|
||||
workflow_permanent_id=self.workflow_permanent_id,
|
||||
)
|
||||
|
||||
def decompress(self, base64_payload: str) -> bytes | None:
|
||||
"""
|
||||
Decode a base64 string, decompress it using gzip, and return it.
|
||||
"""
|
||||
|
||||
if len(base64_payload) > MAX_BASE64_SIZE:
|
||||
LOG.warning(f"{self.class_name}: base64 payload too large: {len(base64_payload)} bytes", **self.identity)
|
||||
return None
|
||||
|
||||
try:
|
||||
# base64 decode -> gzip binary data
|
||||
#
|
||||
# NOTE(llm): The data sent from btoa() is technically a "non-standard"
|
||||
# Base64, but Python's standard decoder is usually robust enough to
|
||||
# handle it.
|
||||
compressed_data: bytes = base64.b64decode(base64_payload)
|
||||
except Exception as ex:
|
||||
LOG.warning(f"{self.class_name} failed to decode Base64 payload", exc_info=ex, **self.identity)
|
||||
return None
|
||||
|
||||
try:
|
||||
# gzip decompression -> bytes
|
||||
#
|
||||
# NOTE(llm): We use zlib.decompress with wbits=16 + zlib.MAX_WBITS (31).
|
||||
# This tells zlib to automatically detect and handle Gzip headers,
|
||||
# which is essential since the browser used CompressionStream('gzip').
|
||||
# Using zlib is often faster than the higher-level gzip module for this
|
||||
# purpose.
|
||||
decompressed_bytes: bytes = zlib.decompress(compressed_data, wbits=16 + zlib.MAX_WBITS)
|
||||
except zlib.error as e:
|
||||
LOG.warning(f"{self.class_name} decompression error: {e}", **self.identity)
|
||||
# Log the error, maybe log the first few characters of the payload for debugging
|
||||
return None
|
||||
|
||||
return decompressed_bytes
|
||||
|
||||
def serialize(self, decompressed_bytes: bytes | None) -> list[dict[str, t.Any]]:
|
||||
"""
|
||||
Convert decompressed bytes into a list of events (Python list/dictionary).
|
||||
"""
|
||||
if not decompressed_bytes:
|
||||
LOG.warning(f"{self.class_name} No decompressed bytes to serialize", **self.identity)
|
||||
return []
|
||||
|
||||
try:
|
||||
# bytes -> JSON string
|
||||
json_string: str = decompressed_bytes.decode("utf-8")
|
||||
except Exception as e:
|
||||
LOG.warning(f"{self.class_name} decode error: {e}", **self.identity)
|
||||
return []
|
||||
|
||||
try:
|
||||
# JSON string -> list of dicts
|
||||
events_list: list[dict[str, t.Any]] = json.loads(json_string)
|
||||
except Exception as e:
|
||||
LOG.warning(f"{self.class_name} JSON parsing error: {e}", **self.identity)
|
||||
return []
|
||||
|
||||
if not isinstance(events_list, list):
|
||||
LOG.warning(f"{self.class_name} Expected a list of events, got:", type(events_list), **self.identity)
|
||||
return []
|
||||
|
||||
return events_list
|
||||
|
||||
def reify(self, events_list: list[dict[str, t.Any]]) -> list[ExfiltratedEvent]:
|
||||
"""
|
||||
Convert a list of event dictionaries into a list of `ExfiltratedEvent`s.
|
||||
"""
|
||||
|
||||
if not events_list:
|
||||
LOG.warning(f"{self.class_name} No events to reify", **self.identity)
|
||||
return []
|
||||
|
||||
reified_events: list[ExfiltratedEvent] = []
|
||||
for event in events_list:
|
||||
if event.get("source") == "cdp":
|
||||
try:
|
||||
reified_event = ExfiltratedCdpEvent(**event)
|
||||
except Exception as e:
|
||||
LOG.warning(f"{self.class_name} Failed to reify CDP event: {e}", **self.identity)
|
||||
continue
|
||||
elif event.get("source") == "console":
|
||||
try:
|
||||
reified_event = ExfiltratedConsoleEvent(**event)
|
||||
except Exception as e:
|
||||
LOG.warning(f"{self.class_name} Failed to reify console event: {e}", **self.identity)
|
||||
continue
|
||||
else:
|
||||
LOG.error(f"{self.class_name} Unknown event source: {event.get('source')}", **self.identity)
|
||||
continue
|
||||
reified_events.append(reified_event)
|
||||
|
||||
return reified_events
|
||||
|
||||
def compressed_chunks_to_events(self, compressed_chunks: list[str]) -> list[ExfiltratedEvent]:
|
||||
"""
|
||||
Convert a list of base64 encoded and compressed (gzip) event strings into
|
||||
a list of `ExfiltratedEvent`s.
|
||||
"""
|
||||
all_events: list[ExfiltratedEvent] = []
|
||||
|
||||
for compressed_chunk in compressed_chunks:
|
||||
decompressed = self.decompress(compressed_chunk)
|
||||
serialized = self.serialize(decompressed)
|
||||
reified = self.reify(serialized)
|
||||
all_events.extend(reified)
|
||||
|
||||
return all_events
|
||||
|
||||
def events_to_actions(
|
||||
self,
|
||||
events: list[ExfiltratedEvent],
|
||||
machines: list[sm.StateMachine] | None = None,
|
||||
) -> list[Action]:
|
||||
"""
|
||||
Convert a list of `ExfiltratedEvent`s into `Action`s.
|
||||
"""
|
||||
actions: list[Action] = []
|
||||
|
||||
machines = machines or [
|
||||
sm.Click(),
|
||||
sm.Hover(),
|
||||
sm.InputText(),
|
||||
sm.UrlChange(),
|
||||
sm.Wait(),
|
||||
]
|
||||
|
||||
for event in events:
|
||||
for machine in machines:
|
||||
action = machine.tick(event, actions)
|
||||
|
||||
if not action:
|
||||
continue
|
||||
|
||||
allow_action = True
|
||||
|
||||
for m in machines:
|
||||
if not m.on_action(action, actions):
|
||||
allow_action = False
|
||||
LOG.debug(
|
||||
f"{self.class_name} action vetoed by state machine {m.__class__.__name__}",
|
||||
action=action,
|
||||
**self.identity,
|
||||
)
|
||||
|
||||
if allow_action:
|
||||
actions.append(action)
|
||||
else:
|
||||
# if an action was vetoed, we do not allow further processing
|
||||
# of this event through subsequent state machines
|
||||
break
|
||||
|
||||
return actions
|
||||
|
||||
def dedupe_block_labels(self, suspects: list[OutputBlock]) -> list[OutputBlock]:
|
||||
"""
|
||||
Detect if any block labels are duplicated, and, if so, rename them for
|
||||
uniqueness.
|
||||
"""
|
||||
|
||||
blocks: list[OutputBlock] = []
|
||||
labels: set[str] = set()
|
||||
|
||||
for block in suspects:
|
||||
if block.label not in labels:
|
||||
labels.add(block.label)
|
||||
blocks.append(block)
|
||||
continue
|
||||
else:
|
||||
original_label = block.label
|
||||
count = 0
|
||||
while True:
|
||||
new_label = f"{original_label}_{count}"
|
||||
if new_label not in labels:
|
||||
cls = block.__class__
|
||||
data = block.model_dump() | {"label": new_label}
|
||||
new_block = cls(**data)
|
||||
blocks.append(new_block)
|
||||
labels.add(new_label)
|
||||
break
|
||||
count += 1
|
||||
|
||||
return blocks
|
||||
|
||||
async def actions_to_blocks(self, actions: list[Action]) -> list[OutputBlock]:
|
||||
"""
|
||||
Convert a list of `Action` objects into workflow definition (YAML) blocks.
|
||||
"""
|
||||
tasks: list[asyncio.Task] = []
|
||||
|
||||
for action in actions:
|
||||
action_kind = action.kind.value
|
||||
|
||||
match action.kind:
|
||||
case ActionKind.CLICK | ActionKind.HOVER | ActionKind.INPUT_TEXT:
|
||||
task = asyncio.create_task(self.create_action_block(action))
|
||||
tasks.append(task)
|
||||
case ActionKind.URL_CHANGE:
|
||||
task = asyncio.create_task(self.create_url_block(action))
|
||||
tasks.append(task)
|
||||
case ActionKind.WAIT:
|
||||
task = asyncio.create_task(self.create_wait_block(action))
|
||||
tasks.append(task)
|
||||
case _:
|
||||
LOG.warning(
|
||||
f"{self.class_name} Unknown action kind: {action_kind}",
|
||||
action=action,
|
||||
**self.identity,
|
||||
)
|
||||
continue
|
||||
|
||||
blocks: list[OutputBlock] = await asyncio.gather(*tasks)
|
||||
|
||||
blocks = self.dedupe_block_labels(blocks)
|
||||
|
||||
return blocks
|
||||
|
||||
def blocks_to_parameters(self, blocks: list[OutputBlock]) -> list[WorkflowDefinitionYamlParametersItem_Workflow]:
|
||||
"""
|
||||
Convert a list of workflow definition (YAML) blocks into workflow definition (YAML) parameters.
|
||||
"""
|
||||
parameter_names: set[str] = set()
|
||||
|
||||
for block in blocks:
|
||||
if isinstance(block, WorkflowDefinitionYamlBlocksItem_Action):
|
||||
for param_name in block.parameter_keys or []:
|
||||
parameter_names.add(param_name)
|
||||
|
||||
parameters: list[WorkflowDefinitionYamlParametersItem_Workflow] = []
|
||||
|
||||
for param_name in parameter_names:
|
||||
parameter = WorkflowDefinitionYamlParametersItem_Workflow(
|
||||
key=param_name,
|
||||
workflow_parameter_type="string",
|
||||
default_value="",
|
||||
description="",
|
||||
)
|
||||
parameters.append(parameter)
|
||||
|
||||
return parameters
|
||||
|
||||
async def create_action_block(self, action: ActionBlockable) -> WorkflowDefinitionYamlBlocksItem_Action:
|
||||
"""
|
||||
Create a YAML action block from an `ActionBlockable`.
|
||||
"""
|
||||
|
||||
DEFAULT_BLOCK_TITLE = "Browser Action"
|
||||
|
||||
if action.kind == ActionKind.INPUT_TEXT:
|
||||
prompt_name = "recording-action-block-prompt-input-text"
|
||||
else:
|
||||
prompt_name = "recording-action-block-prompt"
|
||||
|
||||
metadata_prompt = prompt_engine.load_prompt(
|
||||
prompt_name,
|
||||
action=action,
|
||||
)
|
||||
|
||||
metadata_response = await app.LLM_API_HANDLER(
|
||||
prompt=metadata_prompt,
|
||||
prompt_name=prompt_name,
|
||||
organization_id=self.organization_id,
|
||||
)
|
||||
|
||||
block_label: str = metadata_response.get("block_label", None) or "act"
|
||||
title: str = metadata_response.get("title", None) or DEFAULT_BLOCK_TITLE
|
||||
navigation_goal: str = metadata_response.get("prompt", "")
|
||||
parameter_name: dict | None = metadata_response.get("parameter_name", None)
|
||||
|
||||
block = WorkflowDefinitionYamlBlocksItem_Action(
|
||||
label=block_label,
|
||||
title=title,
|
||||
navigation_goal=navigation_goal,
|
||||
error_code_mapping=None,
|
||||
parameters=[parameter_name] if parameter_name else [], # sic(jdo): the frontend requires this
|
||||
parameter_keys=[parameter_name.get("key")] if parameter_name else [],
|
||||
)
|
||||
|
||||
return block
|
||||
|
||||
async def create_url_block(self, action: ActionUrlChange) -> WorkflowDefinitionYamlBlocksItem_GotoUrl:
|
||||
"""
|
||||
Create a YAML goto URL block from an `ActionUrlChange`.
|
||||
"""
|
||||
|
||||
prompt_name = "recording-go-to-url-block-prompt"
|
||||
|
||||
metadata_prompt = prompt_engine.load_prompt(
|
||||
prompt_name,
|
||||
action=action,
|
||||
)
|
||||
|
||||
metadata_response = await app.LLM_API_HANDLER(
|
||||
prompt=metadata_prompt,
|
||||
prompt_name=prompt_name,
|
||||
organization_id=self.organization_id,
|
||||
)
|
||||
|
||||
block_label: str = metadata_response.get("block_label", None) or "goto_url"
|
||||
|
||||
block = WorkflowDefinitionYamlBlocksItem_GotoUrl(
|
||||
label=block_label,
|
||||
url=action.url,
|
||||
)
|
||||
|
||||
return block
|
||||
|
||||
async def create_wait_block(self, action: ActionWait) -> WorkflowDefinitionYamlBlocksItem_Wait:
|
||||
"""
|
||||
Create a YAML wait block from an `ActionWait`.
|
||||
"""
|
||||
|
||||
prompt_name = "recording-wait-block-prompt"
|
||||
|
||||
metadata_prompt = prompt_engine.load_prompt(
|
||||
prompt_name,
|
||||
action=action,
|
||||
)
|
||||
|
||||
metadata_response = await app.LLM_API_HANDLER(
|
||||
prompt=metadata_prompt,
|
||||
prompt_name=prompt_name,
|
||||
organization_id=self.organization_id,
|
||||
)
|
||||
|
||||
block_label: str = metadata_response.get("block_label", None) or "wait"
|
||||
|
||||
block = WorkflowDefinitionYamlBlocksItem_Wait(
|
||||
label=block_label,
|
||||
wait_sec=int(max(action.duration_ms / 1000.0, ActionWait.MIN_DURATION_THRESHOLD_MS / 1000.0)),
|
||||
)
|
||||
|
||||
return block
|
||||
|
||||
async def process(
|
||||
self, compressed_chunks: list[str]
|
||||
) -> tuple[list[OutputBlock], list[WorkflowDefinitionYamlParametersItem_Workflow]]:
|
||||
"""
|
||||
Process the compressed browser session recording into workflow definition blocks.
|
||||
"""
|
||||
|
||||
events = self.compressed_chunks_to_events(compressed_chunks)
|
||||
actions = self.events_to_actions(events)
|
||||
blocks = await self.actions_to_blocks(actions)
|
||||
parameters = self.blocks_to_parameters(blocks)
|
||||
|
||||
return blocks, parameters
|
||||
|
||||
|
||||
class BrowserSessionRecordingService:
|
||||
async def process_recording(
|
||||
self,
|
||||
browser_session_id: str,
|
||||
organization_id: str,
|
||||
workflow_permanent_id: str,
|
||||
compressed_chunks: list[str],
|
||||
) -> tuple[list[OutputBlock], list[WorkflowDefinitionYamlParametersItem_Workflow]]:
|
||||
"""
|
||||
Process compressed browser session recording events into workflow definition blocks.
|
||||
"""
|
||||
processor = Processor(
|
||||
browser_session_id,
|
||||
organization_id,
|
||||
workflow_permanent_id,
|
||||
)
|
||||
|
||||
return await processor.process(compressed_chunks)
|
||||
|
||||
|
||||
async def smoke() -> None:
|
||||
with open(pathlib.Path("/path/to/uncompressed/events.json")) as f:
|
||||
raw_events: list[dict] = json.load(f)
|
||||
|
||||
events: list[ExfiltratedEvent] = []
|
||||
|
||||
for i, raw_event in enumerate(raw_events):
|
||||
if not isinstance(raw_event, dict):
|
||||
LOG.debug(f"~ skipping non-dict event: {raw_event}")
|
||||
continue
|
||||
if raw_event.get("source") == "cdp":
|
||||
try:
|
||||
event = ExfiltratedCdpEvent(**raw_event)
|
||||
except Exception:
|
||||
LOG.exception(f"{i} Failed to parse exfiltrated CDP event")
|
||||
LOG.debug(f"~ raw event: {json.dumps(raw_event, sort_keys=True, indent=2)}")
|
||||
continue
|
||||
events.append(event)
|
||||
elif raw_event.get("source") == "console":
|
||||
event = ExfiltratedConsoleEvent(**raw_event)
|
||||
events.append(event)
|
||||
|
||||
LOG.debug(f"{len(events)} events.")
|
||||
|
||||
my_local_org_id = "o_389844905020748346"
|
||||
processor = Processor("pbs_123", my_local_org_id, "wpid_123")
|
||||
actions = processor.events_to_actions(events)
|
||||
|
||||
LOG.debug(f"{len(actions)} actions:")
|
||||
|
||||
for action in actions:
|
||||
id = action.target.sky_id if action.target.sky_id else action.target.id
|
||||
text = ",".join(action.target.texts or [])
|
||||
LOG.debug(f" {action.kind} [{id}] [{text}] @ {action.url}")
|
||||
|
||||
blocks = await processor.actions_to_blocks(actions)
|
||||
|
||||
LOG.debug(f"{len(blocks)} blocks:")
|
||||
|
||||
for block in blocks:
|
||||
LOG.debug(f" {block.label}")
|
||||
|
||||
if isinstance(block, WorkflowDefinitionYamlBlocksItem_Action):
|
||||
LOG.debug(f" title: {block.title}")
|
||||
LOG.debug(f" nav goal: {block.navigation_goal}")
|
||||
|
||||
if isinstance(block, WorkflowDefinitionYamlBlocksItem_GotoUrl):
|
||||
LOG.debug(f" url: {block.url}")
|
||||
|
||||
if isinstance(block, WorkflowDefinitionYamlBlocksItem_Wait):
|
||||
LOG.debug(f" wait sec: {block.wait_sec}")
|
||||
|
||||
|
||||
# if __name__ == "__main__":
|
||||
# from skyvern.forge.forge_app_initializer import start_forge_app
|
||||
|
||||
# start_forge_app()
|
||||
|
||||
# asyncio.run(smoke())
|
||||
@@ -0,0 +1,15 @@
|
||||
from .click import StateMachineClick as Click
|
||||
from .hover import StateMachineHover as Hover
|
||||
from .input_text import StateMachineInputText as InputText
|
||||
from .state_machine import StateMachine
|
||||
from .url_change import StateMachineUrlChange as UrlChange
|
||||
from .wait import StateMachineWait as Wait
|
||||
|
||||
__all__ = [
|
||||
"Click",
|
||||
"Hover",
|
||||
"InputText",
|
||||
"StateMachine",
|
||||
"UrlChange",
|
||||
"Wait",
|
||||
]
|
||||
89
skyvern/services/browser_recording/state_machines/click.py
Normal file
89
skyvern/services/browser_recording/state_machines/click.py
Normal file
@@ -0,0 +1,89 @@
|
||||
import typing as t
|
||||
|
||||
import structlog
|
||||
|
||||
from skyvern.services.browser_recording.types import (
|
||||
Action,
|
||||
ActionClick,
|
||||
ActionKind,
|
||||
ActionTarget,
|
||||
EventTarget,
|
||||
ExfiltratedEvent,
|
||||
Mouse,
|
||||
MousePosition,
|
||||
)
|
||||
|
||||
from .state_machine import StateMachine
|
||||
|
||||
LOG = structlog.get_logger()
|
||||
|
||||
|
||||
class StateMachineClick(StateMachine):
|
||||
state: t.Literal["void"] = "void"
|
||||
target: EventTarget | None = None
|
||||
timestamp: float | None = None
|
||||
mouse: MousePosition | None = None
|
||||
url: str | None = None
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.reset()
|
||||
|
||||
def tick(self, event: ExfiltratedEvent, current_actions: list[Action]) -> ActionClick | None:
|
||||
if event.source != "console":
|
||||
return None
|
||||
|
||||
if event.params.type != "click":
|
||||
if event.params.mousePosition:
|
||||
if event.params.mousePosition.xp is not None and event.params.mousePosition.yp is not None:
|
||||
self.mouse = event.params.mousePosition
|
||||
return None
|
||||
|
||||
LOG.debug(f"~ click detected [{event.params.target.skyId or event.params.target.id}]")
|
||||
|
||||
self.target = event.params.target
|
||||
self.timestamp = event.params.timestamp
|
||||
self.url = event.params.url
|
||||
|
||||
if event.params.mousePosition:
|
||||
self.mouse = event.params.mousePosition
|
||||
|
||||
return self.emit(event)
|
||||
|
||||
def emit(self, event: ExfiltratedEvent) -> ActionClick | None:
|
||||
if not self.target:
|
||||
LOG.debug("~ cannot emit click, missing target; resetting")
|
||||
self.reset()
|
||||
return None
|
||||
|
||||
xp = (self.mouse.xp or -1) if self.mouse else None
|
||||
yp = (self.mouse.yp or -1) if self.mouse else None
|
||||
|
||||
LOG.debug("~ emitting click action", exfiltrated_event=event)
|
||||
|
||||
action_target = ActionTarget(
|
||||
class_name=self.target.className,
|
||||
id=self.target.id,
|
||||
mouse=Mouse(xp=xp, yp=yp),
|
||||
sky_id=self.target.skyId,
|
||||
tag_name=self.target.tagName,
|
||||
texts=self.target.text,
|
||||
)
|
||||
|
||||
action = ActionClick(
|
||||
kind=ActionKind.CLICK.value,
|
||||
target=action_target,
|
||||
timestamp_start=self.timestamp,
|
||||
timestamp_end=self.timestamp,
|
||||
url=self.url,
|
||||
)
|
||||
|
||||
self.reset()
|
||||
|
||||
return action
|
||||
|
||||
def reset(self) -> None:
|
||||
self.state = "void"
|
||||
self.target = None
|
||||
self.timestamp = None
|
||||
self.mouse = None
|
||||
self.url = None
|
||||
154
skyvern/services/browser_recording/state_machines/hover.py
Normal file
154
skyvern/services/browser_recording/state_machines/hover.py
Normal file
@@ -0,0 +1,154 @@
|
||||
import typing as t
|
||||
|
||||
import structlog
|
||||
|
||||
from skyvern.services.browser_recording.types import (
|
||||
Action,
|
||||
ActionHover,
|
||||
ActionKind,
|
||||
ActionTarget,
|
||||
EventTarget,
|
||||
ExfiltratedEvent,
|
||||
Mouse,
|
||||
MousePosition,
|
||||
target_has_changed,
|
||||
)
|
||||
|
||||
from .state_machine import StateMachine
|
||||
|
||||
LOG = structlog.get_logger()
|
||||
|
||||
|
||||
class StateMachineHover(StateMachine):
|
||||
state: t.Literal["mouseenter", "next-event"] = "mouseenter"
|
||||
target: EventTarget | None = None
|
||||
timestamp_start: float | None = None
|
||||
timestamp_end: float | None = None
|
||||
mouse: MousePosition | None = None
|
||||
# --
|
||||
threshold_ms: int = ActionHover.DURATION_THRESHOLD_MS
|
||||
|
||||
def __init__(self, threshold_ms: int | None = None) -> None:
|
||||
self.threshold_ms = max(
|
||||
threshold_ms or ActionHover.DURATION_THRESHOLD_MS,
|
||||
ActionHover.MIN_DURATION_THRESHOLD_MS,
|
||||
)
|
||||
|
||||
self.reset()
|
||||
|
||||
def tick(self, event: ExfiltratedEvent, current_actions: list[Action]) -> ActionHover | None:
|
||||
if event.source != "console":
|
||||
return None
|
||||
|
||||
match self.state:
|
||||
case "mouseenter":
|
||||
if event.params.mousePosition:
|
||||
if event.params.mousePosition.xp is not None and event.params.mousePosition.yp is not None:
|
||||
self.mouse = event.params.mousePosition
|
||||
|
||||
if event.params.type != "mouseenter":
|
||||
return None
|
||||
|
||||
text = ",".join(event.params.target.text or [])
|
||||
LOG.debug(
|
||||
f"~ hover: mouseenter detected [{event.params.target.skyId or event.params.target.id}] [{text}]"
|
||||
)
|
||||
|
||||
self.target = event.params.target
|
||||
self.state = "next-event"
|
||||
self.timestamp_start = event.params.timestamp
|
||||
|
||||
case "next-event":
|
||||
if event.params.type == "mouseenter":
|
||||
if target_has_changed(self.target, event.params.target):
|
||||
self.timestamp_start = event.params.timestamp
|
||||
self.target = event.params.target
|
||||
return None
|
||||
|
||||
event_end = event.params.timestamp
|
||||
|
||||
if not self.timestamp_start:
|
||||
LOG.debug("~ missing hover start timestamp, resetting")
|
||||
self.reset()
|
||||
|
||||
return None
|
||||
|
||||
duration_ms = int(event_end - self.timestamp_start)
|
||||
|
||||
if event.params.type == "mousemove":
|
||||
target = event.params.target
|
||||
|
||||
if not target_has_changed(self.target, target):
|
||||
return None
|
||||
|
||||
if duration_ms < self.threshold_ms:
|
||||
LOG.debug(f"~ hover duration {duration_ms}ms below threshold {self.threshold_ms}ms, resetting")
|
||||
self.reset()
|
||||
|
||||
return None
|
||||
|
||||
self.timestamp_end = event.params.timestamp
|
||||
LOG.debug(
|
||||
f"~ hover duration {duration_ms}ms meets threshold {self.threshold_ms}ms, emitting",
|
||||
start=self.timestamp_start,
|
||||
end=self.timestamp_end,
|
||||
)
|
||||
|
||||
# dedupe consecutive hover actions on same target
|
||||
if current_actions:
|
||||
last_action = current_actions[-1]
|
||||
|
||||
if last_action.kind == ActionKind.HOVER and self.target:
|
||||
target_id = self.target.skyId or self.target.id
|
||||
last_action_target_id = last_action.target.sky_id or last_action.target.id
|
||||
|
||||
if target_id:
|
||||
if target_id == last_action_target_id:
|
||||
LOG.debug("~ hover: duplicate hover action - skipping", target=self.target)
|
||||
self.reset()
|
||||
|
||||
return None
|
||||
|
||||
return self.emit(event)
|
||||
|
||||
return None
|
||||
|
||||
def emit(self, event: ExfiltratedEvent) -> ActionHover | None:
|
||||
if not self.target:
|
||||
LOG.debug("~ cannot emit hover, missing target; resetting")
|
||||
self.reset()
|
||||
|
||||
return None
|
||||
|
||||
xp = (self.mouse.xp or -1) if self.mouse else None
|
||||
yp = (self.mouse.yp or -1) if self.mouse else None
|
||||
|
||||
LOG.debug("~ emitting hover action", exfiltrated_event=event)
|
||||
|
||||
action_target = ActionTarget(
|
||||
class_name=self.target.className,
|
||||
id=self.target.id,
|
||||
mouse=Mouse(xp=xp, yp=yp),
|
||||
sky_id=self.target.skyId,
|
||||
tag_name=self.target.tagName,
|
||||
texts=self.target.text,
|
||||
)
|
||||
|
||||
action = ActionHover(
|
||||
kind=ActionKind.HOVER.value,
|
||||
target=action_target,
|
||||
timestamp_start=self.timestamp_start or -1,
|
||||
timestamp_end=self.timestamp_end or -1,
|
||||
url=event.params.url,
|
||||
)
|
||||
|
||||
self.reset()
|
||||
|
||||
return action
|
||||
|
||||
def reset(self) -> None:
|
||||
self.state = "mouseenter"
|
||||
self.target = None
|
||||
self.timestamp_start = None
|
||||
self.timestamp_end = None
|
||||
self.mouse = None
|
||||
156
skyvern/services/browser_recording/state_machines/input_text.py
Normal file
156
skyvern/services/browser_recording/state_machines/input_text.py
Normal file
@@ -0,0 +1,156 @@
|
||||
import typing as t
|
||||
|
||||
import structlog
|
||||
|
||||
from skyvern.services.browser_recording.types import (
|
||||
Action,
|
||||
ActionInputText,
|
||||
ActionKind,
|
||||
ActionTarget,
|
||||
EventTarget,
|
||||
ExfiltratedEvent,
|
||||
Mouse,
|
||||
MousePosition,
|
||||
target_has_changed,
|
||||
)
|
||||
|
||||
from .state_machine import StateMachine
|
||||
|
||||
LOG = structlog.get_logger()
|
||||
|
||||
|
||||
class StateMachineInputText(StateMachine):
|
||||
state: t.Literal["focus", "keydown", "blur"] = "focus"
|
||||
target: EventTarget | None = None
|
||||
timestamp_start: float | None = None
|
||||
mouse: MousePosition | None = None
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.reset()
|
||||
|
||||
def tick(self, event: ExfiltratedEvent, current_actions: list[Action]) -> ActionInputText | None:
|
||||
if event.source != "console":
|
||||
return None
|
||||
|
||||
match self.state:
|
||||
case "focus":
|
||||
if event.params.type != "focus":
|
||||
if event.params.mousePosition:
|
||||
if event.params.mousePosition.xp is not None and event.params.mousePosition.yp is not None:
|
||||
self.mouse = event.params.mousePosition
|
||||
|
||||
return None
|
||||
|
||||
LOG.debug(f"~ focus detected [{event.params.target.skyId or event.params.target.id}]")
|
||||
|
||||
if event.params.type == "focus":
|
||||
self.target = event.params.target
|
||||
self.state = "keydown"
|
||||
self.timestamp_start = event.params.timestamp
|
||||
|
||||
case "keydown":
|
||||
if event.params.type != "keydown":
|
||||
return None
|
||||
|
||||
if self.update_target(event.params.target):
|
||||
return None
|
||||
|
||||
LOG.debug(f"~ initial keydown detected '{event.params.key}'")
|
||||
|
||||
self.state = "blur"
|
||||
case "blur":
|
||||
if event.params.type == "keydown":
|
||||
LOG.debug(f"~ input-text: subsequent keydown detected '{event.params.key}'")
|
||||
|
||||
if event.params.key == "Enter":
|
||||
return self.emit(event)
|
||||
|
||||
return None
|
||||
|
||||
if event.params.type != "blur":
|
||||
return None
|
||||
|
||||
if self.update_target(event.params.target):
|
||||
return None
|
||||
|
||||
LOG.debug("~ blur detected")
|
||||
|
||||
return self.emit(event)
|
||||
|
||||
return None
|
||||
|
||||
def update_target(self, event_target: EventTarget) -> bool:
|
||||
if target_has_changed(self.target, event_target):
|
||||
self.target = event_target
|
||||
|
||||
LOG.debug("~ input-text target changed, resetting state machine")
|
||||
|
||||
self.reset()
|
||||
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def emit(self, event: ExfiltratedEvent) -> ActionInputText | None:
|
||||
if not self.target:
|
||||
LOG.debug("~ cannot emit, missing target or mouse; resetting")
|
||||
|
||||
self.reset()
|
||||
return None
|
||||
|
||||
xp = (self.mouse.xp or -1) if self.mouse else None
|
||||
yp = (self.mouse.yp or -1) if self.mouse else None
|
||||
|
||||
LOG.debug("~ emitting input text action", exfiltrated_event=event)
|
||||
|
||||
input_value = event.params.target.value
|
||||
|
||||
if input_value is None:
|
||||
LOG.debug("~ cannot emit, missing input value; resetting")
|
||||
|
||||
self.reset()
|
||||
return None
|
||||
|
||||
action_target = ActionTarget(
|
||||
class_name=self.target.className,
|
||||
id=self.target.id,
|
||||
mouse=Mouse(xp=xp, yp=yp),
|
||||
sky_id=self.target.skyId,
|
||||
tag_name=self.target.tagName,
|
||||
texts=self.target.text,
|
||||
)
|
||||
|
||||
action = ActionInputText(
|
||||
kind=ActionKind.INPUT_TEXT.value,
|
||||
target=action_target,
|
||||
timestamp_start=self.timestamp_start,
|
||||
timestamp_end=event.params.timestamp,
|
||||
url=event.params.url,
|
||||
input_value=str(input_value),
|
||||
)
|
||||
|
||||
self.reset()
|
||||
|
||||
return action
|
||||
|
||||
def on_action(self, action: Action, current_actions: list[Action]) -> bool:
|
||||
if action.kind == ActionKind.CLICK:
|
||||
# NOTE(jdo): skipping self.reset here; a focus event on an element can often be followed by a
|
||||
# click event, and the identity doesn't always match due to nesting. I think a more precise
|
||||
# check would be to:
|
||||
# - check identity match for click and focus; if not matching:
|
||||
# - ask the browser via cdp if there is a nesting relation between the two elements
|
||||
# - if yes, allow and carry on, otherwise reset
|
||||
# That's another round trip. It's likely pretty fast, tho. For now, we'll just assume a click does
|
||||
# not invalidate the state of the input text state machine.
|
||||
return True
|
||||
|
||||
self.reset()
|
||||
|
||||
return True
|
||||
|
||||
def reset(self) -> None:
|
||||
self.state = "focus"
|
||||
self.target = None
|
||||
self.mouse = None
|
||||
self.timestamp_start = None
|
||||
@@ -0,0 +1,30 @@
|
||||
from skyvern.services.browser_recording.types import (
|
||||
Action,
|
||||
ExfiltratedEvent,
|
||||
)
|
||||
|
||||
|
||||
class StateMachine:
|
||||
"""
|
||||
A minimal, concrete StateMachineProtocol.
|
||||
"""
|
||||
|
||||
state: str
|
||||
|
||||
def tick(self, event: ExfiltratedEvent, current_actions: list[Action]) -> Action | None:
|
||||
return None
|
||||
|
||||
def on_action(self, action: Action, current_actions: list[Action]) -> bool:
|
||||
"""
|
||||
Optional callback when an action is emitted by _any_ state machine.
|
||||
Default is that a state machine resets.
|
||||
|
||||
Return `True` (the default) to allow the action to proceed; return `False`
|
||||
to veto the action.
|
||||
"""
|
||||
self.reset()
|
||||
|
||||
return True
|
||||
|
||||
def reset(self) -> None:
|
||||
pass
|
||||
@@ -0,0 +1,91 @@
|
||||
import typing as t
|
||||
|
||||
import structlog
|
||||
|
||||
from skyvern.services.browser_recording.types import (
|
||||
Action,
|
||||
ActionKind,
|
||||
ActionTarget,
|
||||
ActionUrlChange,
|
||||
ExfiltratedEvent,
|
||||
Mouse,
|
||||
)
|
||||
|
||||
from .state_machine import StateMachine
|
||||
|
||||
LOG = structlog.get_logger()
|
||||
|
||||
|
||||
class StateMachineUrlChange(StateMachine):
|
||||
state: t.Literal["void"] = "void"
|
||||
last_url: str | None = None
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.reset()
|
||||
|
||||
def tick(self, event: ExfiltratedEvent, current_actions: list[Action]) -> ActionUrlChange | None:
|
||||
if event.source != "cdp":
|
||||
return None
|
||||
|
||||
if not event.event_name.startswith("nav:"):
|
||||
return None
|
||||
|
||||
if event.event_name == "nav:frame_started_navigating":
|
||||
url = event.params.url
|
||||
|
||||
if url == self.last_url:
|
||||
LOG.debug("~ ignoring navigation to same URL", url=url)
|
||||
return None
|
||||
else:
|
||||
if event.params.frame:
|
||||
self.last_url = event.params.frame.url
|
||||
elif event.params.url:
|
||||
self.last_url = event.params.url
|
||||
|
||||
return None
|
||||
|
||||
if not url:
|
||||
return None
|
||||
|
||||
self.last_url = url
|
||||
|
||||
LOG.debug("~ emitting URL change action", url=url)
|
||||
|
||||
action_target = ActionTarget(
|
||||
class_name=None,
|
||||
id=None,
|
||||
mouse=Mouse(xp=None, yp=None),
|
||||
sky_id=None,
|
||||
tag_name=None,
|
||||
texts=[],
|
||||
)
|
||||
|
||||
return ActionUrlChange(
|
||||
kind=ActionKind.URL_CHANGE.value,
|
||||
target=action_target,
|
||||
timestamp_start=event.timestamp,
|
||||
timestamp_end=event.timestamp,
|
||||
url=url,
|
||||
)
|
||||
|
||||
def on_action(self, action: Action, current_actions: list[Action]) -> bool:
|
||||
if action.kind != ActionKind.URL_CHANGE:
|
||||
return True
|
||||
|
||||
if not current_actions:
|
||||
return True
|
||||
|
||||
last_action = current_actions[-1]
|
||||
|
||||
if last_action.kind != ActionKind.URL_CHANGE:
|
||||
return True
|
||||
|
||||
if last_action.url == action.url:
|
||||
LOG.debug("~ vetoing duplicate URL change action", url=action.url)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def reset(self) -> None:
|
||||
self.state = "void"
|
||||
self.last_url = None
|
||||
78
skyvern/services/browser_recording/state_machines/wait.py
Normal file
78
skyvern/services/browser_recording/state_machines/wait.py
Normal file
@@ -0,0 +1,78 @@
|
||||
import typing as t
|
||||
|
||||
import structlog
|
||||
|
||||
from skyvern.services.browser_recording.types import (
|
||||
Action,
|
||||
ActionKind,
|
||||
ActionTarget,
|
||||
ActionWait,
|
||||
ExfiltratedEvent,
|
||||
Mouse,
|
||||
)
|
||||
|
||||
from .state_machine import StateMachine
|
||||
|
||||
LOG = structlog.get_logger()
|
||||
|
||||
|
||||
class StateMachineWait(StateMachine):
|
||||
state: t.Literal["void"] = "void"
|
||||
last_event_timestamp: float | None = None
|
||||
threshold_ms: int = ActionWait.MIN_DURATION_THRESHOLD_MS
|
||||
|
||||
def __init__(self, threshold_ms: int | None = None) -> None:
|
||||
self.threshold_ms = max(
|
||||
threshold_ms or ActionWait.MIN_DURATION_THRESHOLD_MS,
|
||||
ActionWait.MIN_DURATION_THRESHOLD_MS,
|
||||
)
|
||||
|
||||
self.reset()
|
||||
|
||||
def tick(self, event: ExfiltratedEvent, current_actions: list[Action]) -> ActionWait | None:
|
||||
if event.source != "console":
|
||||
return None
|
||||
|
||||
if self.last_event_timestamp is not None:
|
||||
duration_ms = int(event.params.timestamp - self.last_event_timestamp)
|
||||
|
||||
if duration_ms >= self.threshold_ms:
|
||||
LOG.debug("~ emitting wait action", duration_ms=duration_ms)
|
||||
|
||||
action_target = ActionTarget(
|
||||
class_name=None,
|
||||
id=None,
|
||||
mouse=Mouse(xp=None, yp=None),
|
||||
sky_id=None,
|
||||
tag_name=None,
|
||||
texts=[],
|
||||
)
|
||||
|
||||
action = ActionWait(
|
||||
kind=ActionKind.WAIT.value,
|
||||
target=action_target,
|
||||
timestamp_start=self.last_event_timestamp,
|
||||
timestamp_end=event.params.timestamp,
|
||||
url=event.params.url,
|
||||
duration_ms=duration_ms,
|
||||
)
|
||||
|
||||
self.reset()
|
||||
|
||||
return action
|
||||
|
||||
self.last_event_timestamp = event.params.timestamp
|
||||
|
||||
return None
|
||||
|
||||
def on_action(self, action: Action, current_actions: list[Action]) -> bool:
|
||||
if action.kind == ActionKind.HOVER:
|
||||
return True
|
||||
|
||||
self.reset()
|
||||
|
||||
return True
|
||||
|
||||
def reset(self) -> None:
|
||||
self.state = "void"
|
||||
self.last_event_timestamp = None
|
||||
243
skyvern/services/browser_recording/types.py
Normal file
243
skyvern/services/browser_recording/types.py
Normal file
@@ -0,0 +1,243 @@
|
||||
"""
|
||||
A types module for browser recording actions and events.
|
||||
"""
|
||||
|
||||
import enum
|
||||
import typing as t
|
||||
from typing import Literal
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from skyvern.client.types.workflow_definition_yaml_blocks_item import (
|
||||
WorkflowDefinitionYamlBlocksItem_Action,
|
||||
WorkflowDefinitionYamlBlocksItem_GotoUrl,
|
||||
WorkflowDefinitionYamlBlocksItem_Wait,
|
||||
)
|
||||
|
||||
|
||||
class ActionKind(enum.StrEnum):
|
||||
CLICK = "click"
|
||||
HOVER = "hover"
|
||||
INPUT_TEXT = "input_text"
|
||||
URL_CHANGE = "url_change"
|
||||
WAIT = "wait"
|
||||
|
||||
|
||||
class ActionBase(BaseModel):
|
||||
kind: ActionKind
|
||||
# --
|
||||
target: "ActionTarget"
|
||||
timestamp_start: float
|
||||
timestamp_end: float
|
||||
url: str
|
||||
|
||||
|
||||
class ActionClick(ActionBase):
|
||||
kind: t.Literal[ActionKind.CLICK]
|
||||
|
||||
|
||||
class ActionHover(ActionBase):
|
||||
kind: t.Literal[ActionKind.HOVER]
|
||||
# --
|
||||
DURATION_THRESHOLD_MS: t.ClassVar[int] = 2000
|
||||
MIN_DURATION_THRESHOLD_MS: t.ClassVar[int] = 1000
|
||||
|
||||
|
||||
class ActionInputText(ActionBase):
|
||||
kind: t.Literal[ActionKind.INPUT_TEXT]
|
||||
# --
|
||||
input_value: str
|
||||
|
||||
|
||||
class ActionUrlChange(ActionBase):
|
||||
kind: t.Literal[ActionKind.URL_CHANGE]
|
||||
|
||||
|
||||
class ActionWait(ActionBase):
|
||||
kind: t.Literal[ActionKind.WAIT]
|
||||
# --
|
||||
duration_ms: int
|
||||
MIN_DURATION_THRESHOLD_MS: t.ClassVar[int] = 5000
|
||||
|
||||
|
||||
Action = ActionClick | ActionHover | ActionInputText | ActionUrlChange | ActionWait
|
||||
|
||||
ActionBlockable = ActionClick | ActionHover | ActionInputText
|
||||
|
||||
|
||||
class ActionTarget(BaseModel):
|
||||
class_name: str | None = None
|
||||
id: str | None = None
|
||||
mouse: "Mouse"
|
||||
sky_id: str | None = None
|
||||
tag_name: str | None = None
|
||||
texts: list[str] = []
|
||||
|
||||
|
||||
class Mouse(BaseModel):
|
||||
xp: float | None = None
|
||||
"""
|
||||
0 to 1.0 inclusive, percentage across the viewport
|
||||
"""
|
||||
yp: float | None = None
|
||||
"""
|
||||
0 to 1.0 inclusive, percentage down the viewport
|
||||
"""
|
||||
|
||||
|
||||
OutputBlock = t.Union[
|
||||
WorkflowDefinitionYamlBlocksItem_Action,
|
||||
WorkflowDefinitionYamlBlocksItem_GotoUrl,
|
||||
WorkflowDefinitionYamlBlocksItem_Wait,
|
||||
]
|
||||
|
||||
|
||||
class TargetInfo(BaseModel):
|
||||
attached: bool | None = None
|
||||
browserContextId: str | None = None
|
||||
canAccessOpener: bool | None = None
|
||||
targetId: str | None = None
|
||||
title: str | None = None
|
||||
type: str | None = None
|
||||
url: str | None = None
|
||||
|
||||
|
||||
class CdpEventFrame(BaseModel):
|
||||
url: str | None = None
|
||||
|
||||
class Config:
|
||||
extra = "allow"
|
||||
|
||||
|
||||
class ExfiltratedEventCdpParams(BaseModel):
|
||||
# target_info_changed events
|
||||
targetInfo: TargetInfo | None = None
|
||||
|
||||
# frame_requested_navigation events
|
||||
disposition: str | None = None
|
||||
frameId: str | None = None
|
||||
reason: str | None = None
|
||||
url: str | None = None
|
||||
|
||||
# frame_navigated events
|
||||
frame: CdpEventFrame | None = None
|
||||
|
||||
|
||||
class EventTarget(BaseModel):
|
||||
className: str | None = None
|
||||
id: str | None = None
|
||||
isHtml: bool = False
|
||||
isSvg: bool = False
|
||||
innerText: str | None = None
|
||||
skyId: str | None = None
|
||||
tagName: str | None = None
|
||||
text: list[str] = []
|
||||
value: str | int | None = None
|
||||
|
||||
|
||||
class MousePosition(BaseModel):
|
||||
xa: float | None = None
|
||||
ya: float | None = None
|
||||
xp: float | None = None
|
||||
yp: float | None = None
|
||||
|
||||
|
||||
class BoundingRect(BaseModel):
|
||||
bottom: float
|
||||
height: float
|
||||
left: float
|
||||
right: float
|
||||
top: float
|
||||
width: float
|
||||
x: float
|
||||
y: float
|
||||
|
||||
|
||||
class Scroll(BaseModel):
|
||||
clientHeight: float
|
||||
clientWidth: float
|
||||
scrollHeight: float
|
||||
scrollLeft: float
|
||||
scrollTop: float
|
||||
scrollWidth: float
|
||||
|
||||
|
||||
class ActiveElement(BaseModel):
|
||||
boundingRect: BoundingRect | None = None
|
||||
className: str | None = None
|
||||
id: str | None = None
|
||||
scroll: Scroll | None = None
|
||||
tagName: str | None = None
|
||||
|
||||
|
||||
class Window(BaseModel):
|
||||
height: float
|
||||
scrollX: float
|
||||
scrollY: float
|
||||
width: float
|
||||
|
||||
|
||||
class ExfiltratedEventConsoleParams(BaseModel):
|
||||
activeElement: ActiveElement
|
||||
code: str | None = None
|
||||
inputValue: str | None = None
|
||||
key: str | None = None
|
||||
mousePosition: MousePosition
|
||||
target: EventTarget
|
||||
timestamp: float
|
||||
type: str
|
||||
url: str
|
||||
window: Window
|
||||
|
||||
|
||||
class ExfiltratedCdpEvent(BaseModel):
|
||||
kind: Literal["exfiltrated-event"]
|
||||
event_name: str
|
||||
params: ExfiltratedEventCdpParams
|
||||
source: Literal["cdp"]
|
||||
timestamp: float
|
||||
|
||||
|
||||
class ExfiltratedConsoleEvent(BaseModel):
|
||||
kind: Literal["exfiltrated-event"]
|
||||
event_name: str
|
||||
params: ExfiltratedEventConsoleParams
|
||||
source: Literal["console"]
|
||||
timestamp: float
|
||||
|
||||
|
||||
ExfiltratedEvent = ExfiltratedCdpEvent | ExfiltratedConsoleEvent
|
||||
|
||||
|
||||
class StateMachineProtocol(t.Protocol):
|
||||
state: str
|
||||
|
||||
def tick(self, event: ExfiltratedEvent, current_actions: list[Action]) -> Action | None: ...
|
||||
|
||||
def on_action(self, action: Action, current_actions: list[Action]) -> bool: ...
|
||||
|
||||
def reset(self) -> None: ...
|
||||
|
||||
|
||||
# -- guards, predicates, etc.
|
||||
|
||||
|
||||
def target_has_changed(current_target: EventTarget | None, event_target: EventTarget) -> bool:
|
||||
if not current_target:
|
||||
return False
|
||||
|
||||
if not event_target.skyId and not event_target.id:
|
||||
return True # sic: we cannot compare, so assume changed
|
||||
|
||||
if not current_target.skyId and not current_target.id:
|
||||
return True # sic: we cannot compare, so assume changed
|
||||
|
||||
if current_target.id and event_target.id:
|
||||
if current_target.id != event_target.id:
|
||||
return True
|
||||
|
||||
if current_target.skyId and event_target.skyId:
|
||||
if current_target.skyId != event_target.skyId:
|
||||
return True
|
||||
|
||||
return False
|
||||
Reference in New Issue
Block a user