Files
python-mcp/tools/intelligent_search.py
AuroraCrimsonRose e471f9bc54 Added many tools
2026-06-03 06:01:06 -05:00

183 lines
4.1 KiB
Python

from __future__ import annotations
from typing import Any
from urllib.parse import urlparse
from core.tools.base import BaseTool, ToolContext
from core.tools.registry import registry
from core.events import bus
class IntelligentSearchTool(BaseTool):
"""
Intelligent wrapper over basic search results.
Enhances:
- ranking
- deduplication
- best-result selection
"""
name = "intelligent_search"
description = "Rerank and filter search results for best relevance"
# =========================
# EXECUTE
# =========================
def execute(self, payload: dict[str, Any], ctx: ToolContext):
action = str(payload.get("action", "rank")).strip()
bus.log(
"SEARCH",
"intelligent_search_execute",
"INFO",
{"action": action}
)
match action:
case "rank":
return self.rank(payload)
case "best":
return self.best(payload)
case _:
raise ValueError(f"Unknown action: {action}")
# =========================
# RANK RESULTS
# =========================
def rank(self, payload: dict[str, Any]):
results = payload.get("results")
query = payload.get("query", "")
if not isinstance(results, list):
raise ValueError("results must be list")
scored = []
for r in results:
if not isinstance(r, dict):
continue
title = r.get("title", "")
url = r.get("url", "")
score = self._score(query, title, url)
scored.append({
"title": title,
"url": url,
"score": score
})
scored.sort(key=lambda x: x["score"], reverse=True)
return {
"query": query,
"ranked": scored
}
# =========================
# BEST RESULT ONLY
# =========================
def best(self, payload: dict[str, Any]):
results = payload.get("results")
query = payload.get("query", "")
if not isinstance(results, list):
raise ValueError("results must be list")
best_item = None
best_score = -1
seen_domains = set()
for r in results:
if not isinstance(r, dict):
continue
title = r.get("title", "")
url = r.get("url", "")
domain = self._domain(url)
# simple dedupe
if domain in seen_domains:
continue
seen_domains.add(domain)
score = self._score(query, title, url)
if score > best_score:
best_score = score
best_item = {
"title": title,
"url": url,
"score": score
}
return {
"query": query,
"best": best_item
}
# =========================
# SCORING FUNCTION
# =========================
def _score(self, query: str, title: str, url: str) -> float:
"""
Lightweight heuristic ranking system.
Replace later with LLM scoring if desired.
"""
q = query.lower()
t = title.lower()
u = url.lower()
score = 0.0
# keyword overlap
for word in q.split():
if word in t:
score += 2.0
if word in u:
score += 1.0
# title boost
if q in t:
score += 5.0
# HTTPS boost
if url.startswith("https"):
score += 0.5
# domain quality heuristic
domain = self._domain(url)
if domain.endswith(".edu") or domain.endswith(".org"):
score += 1.5
return score
# =========================
# DOMAIN HELPERS
# =========================
def _domain(self, url: str) -> str:
try:
return urlparse(url).netloc.lower()
except Exception:
return ""
# =========================
# REGISTER
# =========================
# Runs at import time: instantiates the tool and passes the instance to
# ``registry.register``.
registry.register(IntelligentSearchTool())