from __future__ import annotations from typing import Any from urllib.parse import urljoin, urlparse import urllib.request import re from core.tools.base import BaseTool, ToolContext from core.tools.registry import registry from core.events import bus class CrawlerTool(BaseTool): """ Lightweight safe web crawler. Designed for: - page fetching - link extraction - basic text scraping - bounded crawling """ name = "crawler" description = "Fetch and crawl web pages safely" # ========================= # EXECUTE # ========================= def execute(self, payload: dict[str, Any], ctx: ToolContext): action = str(payload.get("action", "fetch")).strip() bus.log( "CRAWLER", "crawler_execute", "INFO", {"action": action} ) match action: case "fetch": return self.fetch(payload) case "links": return self.extract_links(payload) case "crawl": return self.crawl(payload) case _: raise ValueError(f"Unknown crawler action: {action}") # ========================= # FETCH PAGE # ========================= def fetch(self, payload: dict[str, Any]): url = payload.get("url") if not isinstance(url, str): raise ValueError("url must be string") req = urllib.request.Request( url, headers={"User-Agent": "MCP-Crawler/1.0"} ) try: with urllib.request.urlopen(req, timeout=6) as resp: html = resp.read().decode("utf-8", errors="ignore") text = self._strip_html(html) return { "url": url, "text": text[:5000], "length": len(text) } except Exception as e: return { "url": url, "error": str(e) } # ========================= # EXTRACT LINKS # ========================= def extract_links(self, payload: dict[str, Any]): url = payload.get("url") if not isinstance(url, str): raise ValueError("url must be string") try: req = urllib.request.Request(url) with urllib.request.urlopen(req, timeout=6) as resp: html = resp.read().decode("utf-8", errors="ignore") links = re.findall(r'href=["\'](.*?)["\']', html) normalized = [] for link in links: normalized.append(urljoin(url, link)) return { "url": url, "links": normalized[:200], "count": len(normalized) } except Exception as e: return { "url": url, "error": str(e) } # ========================= # SIMPLE CRAWL (DEPTH 1–2) # ========================= def crawl(self, payload: dict[str, Any]): start_url = payload.get("url") depth = payload.get("depth", 1) if not isinstance(start_url, str): raise ValueError("url must be string") if not isinstance(depth, int): depth = 1 visited = set() results = [] def safe_fetch(u: str): try: req = urllib.request.Request(u) with urllib.request.urlopen(req, timeout=5) as resp: return resp.read().decode("utf-8", errors="ignore") except Exception: return "" def crawl_url(url: str, d: int): if d < 0 or url in visited: return visited.add(url) html = safe_fetch(url) text = self._strip_html(html) results.append({ "url": url, "text": text[:2000] }) if d == 0: return links = re.findall(r'href=["\'](.*?)["\']', html) for link in links[:20]: full = urljoin(url, link) # safety: stay same domain if urlparse(full).netloc == urlparse(start_url).netloc: crawl_url(full, d - 1) crawl_url(start_url, depth) return { "start": start_url, "depth": depth, "pages": results, "count": len(results) } # ========================= # HTML STRIPPER # ========================= def _strip_html(self, html: str) -> str: html = re.sub(r".*?", "", html, flags=re.S) html = re.sub(r".*?", "", html, flags=re.S) html = re.sub(r"<.*?>", " ", html) html = re.sub(r"\s+", " ", html) return html.strip() # ========================= # REGISTER # ========================= registry.register(CrawlerTool())