117 lines
2.8 KiB
Python
117 lines
2.8 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
import urllib.request
|
|
import urllib.parse
|
|
import re
|
|
|
|
from core.tools.base import BaseTool, ToolContext
|
|
from core.tools.registry import registry
|
|
from core.events import bus
|
|
|
|
|
|
class SearchTool(BaseTool):
|
|
"""
|
|
Lightweight web search tool using DuckDuckGo HTML endpoint.
|
|
|
|
Designed for:
|
|
- query → results
|
|
- agent retrieval step before crawling
|
|
"""
|
|
|
|
name = "search"
|
|
description = "Web search (DuckDuckGo HTML scraping)"
|
|
|
|
# =========================
|
|
# EXECUTE
|
|
# =========================
|
|
|
|
def execute(self, payload: dict[str, Any], ctx: ToolContext):
|
|
action = str(payload.get("action", "search")).strip()
|
|
|
|
bus.log(
|
|
"SEARCH",
|
|
"search_execute",
|
|
"INFO",
|
|
{"action": action}
|
|
)
|
|
|
|
match action:
|
|
case "search":
|
|
return self.search(payload)
|
|
|
|
case _:
|
|
raise ValueError(f"Unknown search action: {action}")
|
|
|
|
# =========================
|
|
# SEARCH
|
|
# =========================
|
|
|
|
def search(self, payload: dict[str, Any]):
|
|
query = payload.get("query")
|
|
limit = payload.get("limit", 5)
|
|
|
|
if not isinstance(query, str):
|
|
raise ValueError("query must be string")
|
|
if not isinstance(limit, int):
|
|
limit = 5
|
|
|
|
encoded = urllib.parse.quote(query)
|
|
|
|
url = f"https://duckduckgo.com/html/?q={encoded}"
|
|
|
|
req = urllib.request.Request(
|
|
url,
|
|
headers={
|
|
"User-Agent": "MCP-Search/1.0"
|
|
}
|
|
)
|
|
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=6) as resp:
|
|
html = resp.read().decode("utf-8", errors="ignore")
|
|
|
|
results = self._parse_results(html)
|
|
|
|
return {
|
|
"query": query,
|
|
"results": results[:limit],
|
|
"count": len(results)
|
|
}
|
|
|
|
except Exception as e:
|
|
return {
|
|
"query": query,
|
|
"error": str(e)
|
|
}
|
|
|
|
# =========================
|
|
# PARSER
|
|
# =========================
|
|
|
|
def _parse_results(self, html: str) -> list[dict[str, Any]]:
|
|
"""
|
|
DuckDuckGo HTML parsing (lightweight heuristic).
|
|
"""
|
|
|
|
results = []
|
|
|
|
# Extract result blocks
|
|
links = re.findall(r'<a rel="nofollow" class="result__a" href="(.*?)".*?>(.*?)</a>', html)
|
|
|
|
for url, title in links:
|
|
clean_title = re.sub("<.*?>", "", title)
|
|
|
|
results.append({
|
|
"title": clean_title,
|
|
"url": url,
|
|
})
|
|
|
|
return results
|
|
|
|
|
|
# =========================
|
|
# REGISTER
|
|
# =========================
|
|
|
|
# Register a SearchTool instance with the tool registry when this module is imported.
registry.register(SearchTool())
