Files
python-mcp/core/event_store.py
AuroraCrimsonRose cc64e8d41e Initial commit
2026-05-27 15:07:22 -05:00

108 lines
2.7 KiB
Python

from collections import deque
from typing import Optional, Any, Callable, Iterator
from core.events import Event, bus
class EventStore:
def __init__(self, max_size: int = 10_000):
self.buffer: deque[Event] = deque(maxlen=max_size)
self._cursor: int = 0
bus.subscribe(self._capture)
# =========================
# CAPTURE
# =========================
def _capture(self, event: Event):
event.meta["cursor"] = self._cursor
self.buffer.append(event)
self._cursor += 1
# =========================
# QUERY
# =========================
def query(
self,
source: Optional[str] = None,
level: Optional[str] = None,
name: Optional[str] = None,
start_cursor: Optional[int] = None,
end_cursor: Optional[int] = None,
) -> list[Event]:
events = list(self.buffer)
if source is not None:
events = [e for e in events if e.source == source]
if level is not None:
events = [e for e in events if e.level == level]
if name is not None:
events = [e for e in events if e.name == name]
def cursor_of(e: Event) -> int:
return e.meta.get("cursor", -1)
if start_cursor is not None:
events = [e for e in events if cursor_of(e) >= start_cursor]
if end_cursor is not None:
events = [e for e in events if cursor_of(e) <= end_cursor]
return events
# =========================
# PAGINATION
# =========================
def page(self, cursor: int = 0, limit: int = 100) -> dict[str, Any]:
events = list(self.buffer)
sliced = events[cursor:cursor + limit]
return {
"cursor": cursor,
"next_cursor": cursor + len(sliced),
"count": len(sliced),
"events": sliced
}
# =========================
# REPLAY
# =========================
def replay(
self,
source: Optional[str] = None,
level: Optional[str] = None,
name: Optional[str] = None,
handler: Optional[Callable[[Event], Any]] = None,
):
for event in self.query(source, level, name):
if handler:
handler(event)
else:
bus.emit(event)
# =========================
# REVERSE DEBUG
# =========================
def replay_reverse(self, handler: Callable[[Event], Any]):
for event in reversed(self.buffer):
handler(event)
# =========================
# STREAM TAIL
# =========================
def tail(self, last_n: int = 50) -> Iterator[Event]:
yield from list(self.buffer)[-last_n:]
# singleton
event_store = EventStore()