147 lines
3.7 KiB
Python
147 lines
3.7 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
from typing import Any, Callable
|
|
from collections.abc import Awaitable
|
|
from datetime import datetime
|
|
import asyncio
|
|
import uuid
|
|
|
|
|
|
# =========================
|
|
# EVENT MODEL (GRAPH-AWARE)
|
|
# =========================
|
|
|
|
@dataclass
|
|
class Event:
|
|
name: str
|
|
source: str
|
|
level: str = "INFO"
|
|
data: dict[str, Any] | None = None
|
|
|
|
event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
|
|
trace_id: str = field(default_factory=lambda: str(uuid.uuid4()))
|
|
parent_event_id: str | None = None
|
|
|
|
meta: dict[str, Any] = field(default_factory=dict)
|
|
|
|
timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
|
|
|
|
|
|
# =========================
|
|
# EVENT BUS
|
|
# =========================
|
|
|
|
class EventBus:
|
|
def __init__(self):
|
|
self._subscribers: list[Callable[[Event], Any]] = []
|
|
self._async_subscribers: list[Callable[[Event], Awaitable[Any]]] = []
|
|
|
|
self._current_trace: str | None = None
|
|
self._current_parent: str | None = None
|
|
|
|
# -------------------------
|
|
# SUBSCRIBE
|
|
# -------------------------
|
|
|
|
def subscribe(self, fn: Callable[[Event], Any]):
|
|
self._subscribers.append(fn)
|
|
|
|
def subscribe_async(self, fn: Callable[[Event], Awaitable[Any]]):
|
|
self._async_subscribers.append(fn)
|
|
|
|
# -------------------------
|
|
# TRACE CONTROL
|
|
# -------------------------
|
|
|
|
def start_trace(self) -> str:
|
|
trace_id = str(uuid.uuid4())
|
|
self._current_trace = trace_id
|
|
self._current_parent = None
|
|
return trace_id
|
|
|
|
def set_parent(self, event_id: str | None):
|
|
self._current_parent = event_id
|
|
|
|
# -------------------------
|
|
# EMIT (CORE SAFE PATH)
|
|
# -------------------------
|
|
|
|
def emit(self, event: Event):
|
|
# inject trace
|
|
if not event.trace_id:
|
|
event.trace_id = self._current_trace or event.trace_id
|
|
|
|
event.parent_event_id = event.parent_event_id or self._current_parent
|
|
|
|
# advance causal chain
|
|
self._current_parent = event.event_id
|
|
|
|
# sync subscribers
|
|
for fn in self._subscribers:
|
|
try:
|
|
fn(event)
|
|
except Exception:
|
|
pass
|
|
|
|
# async subscribers (SAFE FIX)
|
|
if self._async_subscribers:
|
|
try:
|
|
loop = asyncio.get_running_loop()
|
|
except RuntimeError:
|
|
return # no loop available
|
|
|
|
for fn in self._async_subscribers:
|
|
try:
|
|
coro = fn(event)
|
|
|
|
# IMPORTANT FIX:
|
|
# ensure it's actually awaitable before scheduling
|
|
if asyncio.iscoroutine(coro):
|
|
loop.create_task(coro)
|
|
|
|
except Exception:
|
|
pass
|
|
|
|
# -------------------------
|
|
# STRICT ASYNC EMIT
|
|
# -------------------------
|
|
|
|
async def emit_async(self, event: Event):
|
|
for fn in self._subscribers:
|
|
try:
|
|
fn(event)
|
|
except Exception:
|
|
pass
|
|
|
|
await asyncio.gather(
|
|
*(fn(event) for fn in self._async_subscribers),
|
|
return_exceptions=True
|
|
)
|
|
|
|
# -------------------------
|
|
# LOG CONVENIENCE
|
|
# -------------------------
|
|
|
|
def log(
|
|
self,
|
|
source: str,
|
|
name: str,
|
|
level: str = "INFO",
|
|
data: dict[str, Any] | None = None
|
|
):
|
|
self.emit(
|
|
Event(
|
|
name=name,
|
|
source=source,
|
|
level=level,
|
|
data=data or {}
|
|
)
|
|
)
|
|
|
|
|
|
# =========================
|
|
# SINGLETON
|
|
# =========================
|
|
|
|
bus = EventBus() |
