108 lines
2.7 KiB
Python
108 lines
2.7 KiB
Python
from collections import deque
|
|
from typing import Optional, Any, Callable, Iterator
|
|
|
|
from core.events import Event, bus
|
|
|
|
|
|
class EventStore:
    """Bounded in-memory log of the events delivered by the global ``bus``.

    Every captured event is stamped with a monotonically increasing sequence
    number in ``event.meta["cursor"]`` and appended to ``buffer``, a ``deque``
    capped at ``max_size`` entries; once the cap is reached each new event
    pushes the oldest one out.
    """

    def __init__(self, max_size: int = 10_000):
        """Create the buffer and subscribe ``_capture`` to the bus.

        Args:
            max_size: Maximum number of events retained in ``buffer``.
        """
        # Retained events, oldest first.
        self.buffer: deque[Event] = deque(maxlen=max_size)
        # Sequence number that the next captured event will receive.
        self._cursor: int = 0
        # Event currently being re-emitted by ``replay`` (None otherwise).
        self._replayed: Optional[Event] = None

        bus.subscribe(self._capture)

    # =========================
    # CAPTURE
    # =========================

    def _capture(self, event: "Event"):
        """Stamp *event* with the next cursor and append it to the buffer."""
        # An event that ``replay`` is re-emitting right now is already stored.
        # Capturing it again would append the same object a second time and
        # overwrite the cursor of the stored entry, breaking cursor order.
        if self._replayed is not None and event is self._replayed:
            return

        event.meta["cursor"] = self._cursor
        self.buffer.append(event)
        self._cursor += 1

    # =========================
    # QUERY
    # =========================

    def query(
        self,
        source: Optional[str] = None,
        level: Optional[str] = None,
        name: Optional[str] = None,
        start_cursor: Optional[int] = None,
        end_cursor: Optional[int] = None,
    ) -> "list[Event]":
        """Return the buffered events matching every filter that is given.

        Args:
            source: Keep only events whose ``source`` equals this value.
            level: Keep only events whose ``level`` equals this value.
            name: Keep only events whose ``name`` equals this value.
            start_cursor: Keep only events whose cursor is >= this value.
            end_cursor: Keep only events whose cursor is <= this value.

        Returns:
            Matching events in buffer order (oldest first). A filter left as
            ``None`` is not applied.
        """
        check_cursor = start_cursor is not None or end_cursor is not None

        def matches(event: "Event") -> bool:
            if source is not None and event.source != source:
                return False
            if level is not None and event.level != level:
                return False
            if name is not None and event.name != name:
                return False
            if check_cursor:
                # Events without a stamped cursor are treated as cursor -1.
                cursor = event.meta.get("cursor", -1)
                if start_cursor is not None and cursor < start_cursor:
                    return False
                if end_cursor is not None and cursor > end_cursor:
                    return False
            return True

        # Filter a snapshot in a single pass.
        return [event for event in list(self.buffer) if matches(event)]

    # =========================
    # PAGINATION
    # =========================

    def page(self, cursor: int = 0, limit: int = 100) -> dict[str, Any]:
        """Return up to *limit* events starting at sequence number *cursor*.

        *cursor* is interpreted as the sequence number stamped by
        ``_capture``, not as a position in the buffer, so paging stays correct
        after old events have been evicted. A cursor older than the oldest
        retained event starts at the oldest retained event.

        Args:
            cursor: Sequence number of the first event wanted.
            limit: Maximum number of events to return; values below zero are
                treated as zero.

        Returns:
            A dict with ``cursor`` (the value passed in), ``next_cursor`` (the
            cursor to pass to fetch the following page), ``count`` and
            ``events``.
        """
        snapshot = list(self.buffer)

        # The buffer is only appended to by ``_capture``, which bumps
        # ``_cursor`` once per append, so it holds exactly the sequence
        # numbers [oldest, self._cursor).
        oldest = self._cursor - len(snapshot)
        start = max(cursor, oldest)
        offset = start - oldest
        sliced = snapshot[offset:offset + max(limit, 0)]

        # With nothing returned there is no progress: hand the cursor back.
        next_cursor = start + len(sliced) if sliced else cursor

        return {
            "cursor": cursor,
            "next_cursor": next_cursor,
            "count": len(sliced),
            "events": sliced,
        }

    # =========================
    # REPLAY
    # =========================

    def replay(
        self,
        source: Optional[str] = None,
        level: Optional[str] = None,
        name: Optional[str] = None,
        handler: "Optional[Callable[[Event], Any]]" = None,
    ):
        """Feed the matching buffered events, oldest first, to a consumer.

        Args:
            source: Optional ``source`` filter, as in ``query``.
            level: Optional ``level`` filter, as in ``query``.
            name: Optional ``name`` filter, as in ``query``.
            handler: Callable invoked with each event. When omitted, each
                event is re-emitted on the global ``bus`` instead.
        """
        for event in self.query(source=source, level=level, name=name):
            if handler is not None:
                handler(event)
                continue

            # This store is itself a bus subscriber; mark the event so that
            # ``_capture`` does not store it a second time.
            # NOTE(review): this only helps if ``bus.emit`` delivers to
            # subscribers synchronously - confirm against core.events.
            self._replayed = event
            try:
                bus.emit(event)
            finally:
                self._replayed = None

    # =========================
    # REVERSE DEBUG
    # =========================

    def replay_reverse(self, handler: "Callable[[Event], Any]"):
        """Call *handler* with every buffered event, newest first.

        Iterates over a snapshot so that a handler which causes new events to
        be captured does not mutate the deque while it is being iterated.
        """
        for event in reversed(list(self.buffer)):
            handler(event)

    # =========================
    # STREAM TAIL
    # =========================

    def tail(self, last_n: int = 50) -> "Iterator[Event]":
        """Yield the *last_n* most recent events, oldest of them first.

        A non-positive *last_n* yields nothing (a plain ``[-0:]`` slice would
        return the whole buffer).
        """
        if last_n <= 0:
            return
        yield from list(self.buffer)[-last_n:]
|
|
|
|
|
|
# Module-level singleton. Constructing it runs EventStore.__init__, which
# subscribes the store to the global bus as an import-time side effect.
event_store = EventStore()
