from __future__ import annotations from typing import Any import time import json from core.tools.base import BaseTool, ToolContext from core.tools.registry import registry from core.events import bus from tools.memory import MemoryTool class ReflectionTool(BaseTool): """ Analyzes stored memory and extracts insights. """ name = "reflection" description = "Analyze memory and extract insights or patterns" # ========================= # EXECUTE # ========================= def execute(self, payload: dict[str, Any], ctx: ToolContext): action = str(payload.get("action", "reflect")).strip() bus.log( "REFLECTION", "reflection_execute", "INFO", {"action": action} ) match action: case "reflect": return self.reflect(payload, ctx) case "summarize_failures": return self.summarize_failures(payload, ctx) case "summarize_successes": return self.summarize_successes(payload, ctx) case "detect_loops": return self.detect_loops(payload, ctx) case _: raise ValueError(f"Unknown reflection action: {action}") # ========================= # CORE REFLECTION # ========================= def reflect(self, payload: dict[str, Any], ctx: ToolContext): memory_tool = self._get_memory() memory = memory_tool._load() insights = [] for item in memory: entry = item.get("entry", {}) text = json.dumps(entry).lower() if "error" in text or "failed" in text: insights.append({ "type": "failure_pattern", "id": item.get("id"), "note": "Failure-related memory detected" }) if "success" in text or "ok" in text: insights.append({ "type": "success_pattern", "id": item.get("id"), "note": "Success-related memory detected" }) reflection = { "timestamp": time.time(), "insights": insights, "total_memory": len(memory), "insight_count": len(insights) } # store reflection back into memory memory_tool.add({ "entry": { "type": "reflection", "data": reflection } }, ctx) # type: ignore return reflection # ========================= # FAILURE ANALYSIS # ========================= def summarize_failures(self, payload: dict[str, Any], ctx: ToolContext): memory_tool = self._get_memory() memory = memory_tool._load() failures = [ item for item in memory if "error" in json.dumps(item.get("entry", {})).lower() or "fail" in json.dumps(item.get("entry", {})).lower() ] return { "count": len(failures), "failures": failures } # ========================= # SUCCESS ANALYSIS # ========================= def summarize_successes(self, payload: dict[str, Any], ctx: ToolContext): memory_tool = self._get_memory() memory = memory_tool._load() successes = [ item for item in memory if "success" in json.dumps(item.get("entry", {})).lower() or "ok" in json.dumps(item.get("entry", {})).lower() ] return { "count": len(successes), "successes": successes } # ========================= # LOOP DETECTION # ========================= def detect_loops(self, payload: dict[str, Any], ctx: ToolContext): memory_tool = self._get_memory() memory = memory_tool._load() seen = {} loops = [] for item in memory: key = json.dumps(item.get("entry", {}), sort_keys=True) if key in seen: loops.append({ "original_id": seen[key], "duplicate_id": item.get("id") }) else: seen[key] = item.get("id") return { "loop_count": len(loops), "loops": loops } # ========================= # HELPERS # ========================= def _get_memory(self) -> MemoryTool: for tool in registry.all_tools(): if tool.name == "memory": return tool # type: ignore raise RuntimeError("Memory tool not found") # ========================= # REGISTER # ========================= registry.register(ReflectionTool())