Files
python-mcp/core/events.py
AuroraCrimsonRose cc64e8d41e Initial commit
2026-05-27 15:07:22 -05:00

147 lines
3.7 KiB
Python

from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Callable
from collections.abc import Awaitable
from datetime import datetime
import asyncio
import uuid
# =========================
# EVENT MODEL (GRAPH-AWARE)
# =========================
@dataclass
class Event:
name: str
source: str
level: str = "INFO"
data: dict[str, Any] | None = None
event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
trace_id: str = field(default_factory=lambda: str(uuid.uuid4()))
parent_event_id: str | None = None
meta: dict[str, Any] = field(default_factory=dict)
timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
# =========================
# EVENT BUS
# =========================
class EventBus:
def __init__(self):
self._subscribers: list[Callable[[Event], Any]] = []
self._async_subscribers: list[Callable[[Event], Awaitable[Any]]] = []
self._current_trace: str | None = None
self._current_parent: str | None = None
# -------------------------
# SUBSCRIBE
# -------------------------
def subscribe(self, fn: Callable[[Event], Any]):
self._subscribers.append(fn)
def subscribe_async(self, fn: Callable[[Event], Awaitable[Any]]):
self._async_subscribers.append(fn)
# -------------------------
# TRACE CONTROL
# -------------------------
def start_trace(self) -> str:
trace_id = str(uuid.uuid4())
self._current_trace = trace_id
self._current_parent = None
return trace_id
def set_parent(self, event_id: str | None):
self._current_parent = event_id
# -------------------------
# EMIT (CORE SAFE PATH)
# -------------------------
def emit(self, event: Event):
# inject trace
if not event.trace_id:
event.trace_id = self._current_trace or event.trace_id
event.parent_event_id = event.parent_event_id or self._current_parent
# advance causal chain
self._current_parent = event.event_id
# sync subscribers
for fn in self._subscribers:
try:
fn(event)
except Exception:
pass
# async subscribers (SAFE FIX)
if self._async_subscribers:
try:
loop = asyncio.get_running_loop()
except RuntimeError:
return # no loop available
for fn in self._async_subscribers:
try:
coro = fn(event)
# IMPORTANT FIX:
# ensure it's actually awaitable before scheduling
if asyncio.iscoroutine(coro):
loop.create_task(coro)
except Exception:
pass
# -------------------------
# STRICT ASYNC EMIT
# -------------------------
async def emit_async(self, event: Event):
for fn in self._subscribers:
try:
fn(event)
except Exception:
pass
await asyncio.gather(
*(fn(event) for fn in self._async_subscribers),
return_exceptions=True
)
# -------------------------
# LOG CONVENIENCE
# -------------------------
def log(
self,
source: str,
name: str,
level: str = "INFO",
data: dict[str, Any] | None = None
):
self.emit(
Event(
name=name,
source=source,
level=level,
data=data or {}
)
)
# =========================
# SINGLETON
# =========================
bus = EventBus()