156 lines
4.0 KiB
Python
156 lines
4.0 KiB
Python
from __future__ import annotations

from itertools import islice
from pathlib import Path
from typing import Any

from core.events import bus
from core.safety import safety
from core.tools.base import BaseTool, ToolContext
from core.tools.registry import registry
|
|
|
|
|
|
class FilesystemTool(BaseTool):
    """Tool that dispatches a small set of filesystem actions.

    Each action is selected by ``payload["action"]`` and operates on
    ``payload["path"]``, which is always passed through
    ``safety.validate_path`` before the filesystem is touched.

    Supported actions: ``read_file``, ``write_file``, ``list_dir``,
    ``exists`` and ``mkdir``.
    """

    name = "filesystem"
    description = "Safe filesystem operations"

    # Largest file, in bytes on disk, that read_file will return.
    MAX_READ_BYTES = 5_000_000
    # Largest number of directory entries that list_dir will return.
    MAX_LIST_ENTRIES = 5000

    # =========================
    # EXECUTE ROUTER
    # =========================

    def execute(self, payload: dict[str, Any], ctx: ToolContext):
        """Log the requested action and dispatch to the matching handler.

        Args:
            payload: Request data; ``payload["action"]`` names the handler.
                The value is coerced to ``str`` and stripped of whitespace.
            ctx: Tool context, forwarded unchanged to the handler.

        Returns:
            Whatever the selected handler returns.

        Raises:
            ValueError: If the action is missing or not one of the
                supported names.
        """
        action = str(payload.get("action", "")).strip()

        bus.log(
            "FILESYSTEM",
            "filesystem_execute",
            "INFO",
            {"action": action},
        )

        handlers = {
            "read_file": self.read_file,
            "write_file": self.write_file,
            "list_dir": self.list_dir,
            "exists": self.exists,
            "mkdir": self.mkdir,
        }

        handler = handlers.get(action)
        if handler is None:
            raise ValueError(f"Unknown filesystem action: {action}")

        return handler(payload, ctx)

    # =========================
    # PATH HELPERS
    # =========================

    def _get_path(self, payload: dict[str, Any]) -> Path:
        """Extract ``payload["path"]`` and run it through the safety layer.

        Returns:
            The result of ``safety.validate_path`` for the given string.
            NOTE(review): the handlers use it as a ``pathlib.Path``; what
            validation is performed is defined in ``core.safety`` — confirm.

        Raises:
            ValueError: If ``path`` is missing or is not a string.
        """
        path_value = payload.get("path")
        if not isinstance(path_value, str):
            raise ValueError("path must be string")

        return safety.validate_path(path_value)

    # =========================
    # READ FILE
    # =========================

    def read_file(self, payload: dict[str, Any], ctx: ToolContext) -> dict[str, Any]:
        """Return the UTF-8 text content of the file at ``payload["path"]``.

        Returns:
            ``{"path": <str>, "content": <str>}``.

        Raises:
            ValueError: If the path is invalid or the file is larger than
                ``MAX_READ_BYTES`` bytes on disk.
            OSError: If the file cannot be stat'ed or read.
            UnicodeDecodeError: If the file is not valid UTF-8.
        """
        path = self._get_path(payload)

        # Enforce the limit from the on-disk size *before* reading, so an
        # oversized file is rejected without being loaded into memory.
        if path.stat().st_size > self.MAX_READ_BYTES:
            raise ValueError("File exceeds read size limit")

        data = path.read_text(encoding="utf-8")

        return {"path": str(path), "content": data}

    # =========================
    # WRITE FILE
    # =========================

    def write_file(self, payload: dict[str, Any], ctx: ToolContext) -> dict[str, Any]:
        """Write ``payload["content"]`` as UTF-8 to ``payload["path"]``.

        Missing parent directories are created. If the target already
        exists, a best-effort byte-for-byte copy is first written next to it
        with ``.bak`` appended to the file name; a failed backup does not
        abort the write.

        Returns:
            ``{"ok": True, "path": <str>, "bytes_written": <int>,
            "backup": <str | None>}`` where ``backup`` is the backup file
            path, or ``None`` when no backup was made (target did not exist
            or the copy failed).

        Raises:
            ValueError: If the path is invalid or ``content`` is not a string.
            OSError: If the directory or file cannot be written.
        """
        path = self._get_path(payload)

        content = payload.get("content")
        if not isinstance(content, str):
            raise ValueError("content must be string")

        # NOTE(review): presumably raises when the write is not permitted —
        # confirm against core.safety.
        safety.check_file_write(path, content)

        path.parent.mkdir(parents=True, exist_ok=True)

        backup_path = None
        if path.exists():
            candidate = path.with_suffix(path.suffix + ".bak")
            try:
                # Copy raw bytes so the backup is exact even when the
                # existing file is not valid UTF-8.
                candidate.write_bytes(path.read_bytes())
            except OSError:
                # Backup is best-effort: the write proceeds, and the missing
                # backup is reported to the caller through the result.
                backup_path = None
            else:
                backup_path = candidate

        path.write_text(content, encoding="utf-8")

        return {
            "ok": True,
            "path": str(path),
            "bytes_written": len(content.encode("utf-8")),
            "backup": str(backup_path) if backup_path is not None else None,
        }

    # =========================
    # LIST DIRECTORY
    # =========================

    def list_dir(self, payload: dict[str, Any], ctx: ToolContext) -> dict[str, Any]:
        """List up to ``MAX_LIST_ENTRIES`` entries of a directory.

        Returns:
            ``{"path": <str>, "count": <int>, "entries": [...],
            "truncated": <bool>}``. Each entry has ``name``, ``path``,
            ``is_dir`` and ``is_file``. ``truncated`` is ``True`` when the
            directory holds more entries than were returned.

        Raises:
            ValueError: If the path is invalid.
            OSError: If the path cannot be iterated as a directory.
        """
        path = self._get_path(payload)

        # Fetch one entry beyond the cap so truncation can be detected
        # without walking the rest of the directory.
        items = list(islice(path.iterdir(), self.MAX_LIST_ENTRIES + 1))
        truncated = len(items) > self.MAX_LIST_ENTRIES

        entries = [
            {
                "name": item.name,
                "path": str(item),
                "is_dir": item.is_dir(),
                "is_file": item.is_file(),
            }
            for item in items[: self.MAX_LIST_ENTRIES]
        ]

        return {
            "path": str(path),
            "count": len(entries),
            "entries": entries,
            "truncated": truncated,
        }

    # =========================
    # EXISTS
    # =========================

    def exists(self, payload: dict[str, Any], ctx: ToolContext) -> dict[str, Any]:
        """Report whether the path exists and whether it is a file or directory.

        Returns:
            ``{"path": <str>, "exists": <bool>, "is_dir": <bool>,
            "is_file": <bool>}``.

        Raises:
            ValueError: If the path is invalid.
        """
        path = self._get_path(payload)

        return {
            "path": str(path),
            "exists": path.exists(),
            "is_dir": path.is_dir(),
            "is_file": path.is_file(),
        }

    # =========================
    # MKDIR
    # =========================

    def mkdir(self, payload: dict[str, Any], ctx: ToolContext) -> dict[str, Any]:
        """Create the directory and any missing parents.

        An already existing directory is not an error.

        Returns:
            ``{"ok": True, "path": <str>}``.

        Raises:
            ValueError: If the path is invalid.
            OSError: If the directory cannot be created (for example when
                the path exists and is a regular file).
        """
        path = self._get_path(payload)

        path.mkdir(parents=True, exist_ok=True)

        return {
            "ok": True,
            "path": str(path),
        }
|
|
|
|
|
|
# Register a FilesystemTool instance with the tool registry at import time.
registry.register(FilesystemTool())
