Initial commit

This commit is contained in:
AuroraCrimsonRose
2026-05-27 15:07:22 -05:00
commit cc64e8d41e
31 changed files with 2354 additions and 0 deletions

16
tools/__init__.py Normal file
View File

@@ -0,0 +1,16 @@
"""
Tool bootstrap.
Importing this module forces
tool registration into registry.
"""
from tools.ping import PingTool
from tools.filesystem import FilesystemTool
from tools.subprocess import SubprocessTool
__all__ = [
"PingTool",
"FilesystemTool",
"SubprocessTool",
]

0
tools/bochs.py Normal file
View File

0
tools/cmake.py Normal file
View File

22
tools/discovery.py Normal file
View File

@@ -0,0 +1,22 @@
from __future__ import annotations
import importlib
import pkgutil
import tools as tools_pkg
def load_all_tools():
"""
Explicit tool loader.
Imports all modules inside /tools so they register
themselves into the registry.
"""
for module in pkgutil.iter_modules(
tools_pkg.__path__
):
importlib.import_module(
f"tools.{module.name}"
)

0
tools/docker.py Normal file
View File

0
tools/ffmpeg.py Normal file
View File

225
tools/filesystem.py Normal file
View File

@@ -0,0 +1,225 @@
from __future__ import annotations
from pathlib import Path
from typing import Any
from core.events import bus
from core.safety import safety
from core.tools.base import BaseTool, ToolContext
from core.tools.registry import registry
class FilesystemTool(BaseTool):
name = "filesystem"
description = "Safe filesystem operations"
# =========================
# EXECUTE
# =========================
def execute(
self,
payload: dict[str, Any],
ctx: ToolContext
):
action = str(payload.get("action", "")).strip()
bus.log(
"FILESYSTEM",
"filesystem_execute",
"INFO",
{
"action": action
}
)
match action:
case "read_file":
return self.read_file(payload)
case "write_file":
return self.write_file(payload)
case "list_dir":
return self.list_dir(payload)
case "exists":
return self.exists(payload)
case "mkdir":
return self.mkdir(payload)
case _:
raise ValueError(
f"Unknown filesystem action: {action}"
)
# =========================
# READ FILE
# =========================
def read_file(
self,
payload: dict[str, Any]
):
path_value = payload.get("path")
if not isinstance(path_value, str):
raise ValueError("path must be string")
path = safety.validate_path(path_value)
return {
"path": str(path),
"content": path.read_text(
encoding="utf-8"
)
}
# =========================
# WRITE FILE
# =========================
def write_file(
self,
payload: dict[str, Any]
):
path_value = payload.get("path")
content_value = payload.get("content")
if not isinstance(path_value, str):
raise ValueError("path must be string")
if not isinstance(content_value, str):
raise ValueError(
"content must be string"
)
path = safety.validate_path(path_value)
safety.check_file_write(
path,
content_value
)
path.parent.mkdir(
parents=True,
exist_ok=True
)
path.write_text(
content_value,
encoding="utf-8"
)
return {
"ok": True,
"path": str(path),
"bytes_written": len(
content_value.encode("utf-8")
)
}
# =========================
# LIST DIRECTORY
# =========================
def list_dir(
self,
payload: dict[str, Any]
):
path_value = payload.get(
"path",
"."
)
if not isinstance(path_value, str):
raise ValueError(
"path must be string"
)
path = safety.validate_path(
path_value
)
entries: list[dict[str, Any]] = []
for item in path.iterdir():
entries.append(
{
"name": item.name,
"path": str(item),
"is_dir": item.is_dir(),
"is_file": item.is_file()
}
)
return {
"path": str(path),
"count": len(entries),
"entries": entries
}
# =========================
# EXISTS
# =========================
def exists(
self,
payload: dict[str, Any]
):
path_value = payload.get("path")
if not isinstance(path_value, str):
raise ValueError(
"path must be string"
)
path = safety.validate_path(
path_value
)
return {
"path": str(path),
"exists": path.exists(),
"is_dir": path.is_dir(),
"is_file": path.is_file()
}
# =========================
# MKDIR
# =========================
def mkdir(
self,
payload: dict[str, Any]
):
path_value = payload.get("path")
if not isinstance(path_value, str):
raise ValueError(
"path must be string"
)
path = safety.validate_path(
path_value
)
path.mkdir(
parents=True,
exist_ok=True
)
return {
"ok": True,
"path": str(path)
}
# =========================
# SELF REGISTER
# =========================
registry.register(
FilesystemTool()
)

0
tools/git.py Normal file
View File

50
tools/gitea.py Normal file
View File

@@ -0,0 +1,50 @@
import requests
from core.config import GITEA_URL, GITEA_TOKEN
from core.logging_core import logger
HEADERS = {
"Authorization": f"token {GITEA_TOKEN}",
"Content-Type": "application/json"
}
def create_repo(name: str, private: bool = True):
url = f"{GITEA_URL}/api/v1/user/repos"
payload = {
"name": name,
"private": private,
"auto_init": True
}
r = requests.post(url, json=payload, headers=HEADERS)
logger.info(f"[GITEA] create_repo={name} status={r.status_code}")
return r.json()
import subprocess
from core.subprocess import run_command
def git_push(repo_path: str, message: str):
cmds = [
["git", "-C", repo_path, "add", "."],
["git", "-C", repo_path, "commit", "-m", message],
["git", "-C", repo_path, "push"]
]
results = [run_command(c) for c in cmds]
return results
def create_or_update_file(owner, repo, path, content, message):
url = f"{GITEA_URL}/api/v1/repos/{owner}/{repo}/contents/{path}"
payload = {
"content": content.encode("utf-8").decode("utf-8"),
"message": message
}
r = requests.post(url, json=payload, headers=HEADERS)
logger.info(f"[GITEA] file={path} status={r.status_code}")
return r.json()

45
tools/ping.py Normal file
View File

@@ -0,0 +1,45 @@
from __future__ import annotations
from core.tools.base import BaseTool, ToolContext
from core.tools.registry import registry
from core.events import bus
class PingTool(BaseTool):
name = "ping"
# optional metadata (future MCP auto-binding)
description = "Health check"
# -------------------------
# EXECUTE
# -------------------------
def execute(
self,
payload: dict[str, str],
ctx: ToolContext
):
message = payload.get("message", "pong")
bus.log(
"TOOLS",
"ping_execute",
"INFO",
{
"message": message
}
)
return {
"status": "ok",
"echo": message,
"tool": self.name
}
# =========================
# SELF REGISTER
# =========================
registry.register(PingTool())

0
tools/qemu.py Normal file
View File

66
tools/subprocess.py Normal file
View File

@@ -0,0 +1,66 @@
from __future__ import annotations
from typing import Any
from core.subprocess import run_command
from core.tools.base import BaseTool, ToolContext
from core.tools.registry import registry
class SubprocessTool(BaseTool):
name = "subprocess"
description = "Run a subprocess command safely"
# =========================
# EXECUTE
# =========================
def execute(
self,
payload: dict[str, Any],
ctx: ToolContext
):
cmd = payload.get("cmd")
if not isinstance(cmd, list):
raise ValueError(
"cmd must be list[str]"
)
cwd = payload.get("cwd")
if cwd is not None and not isinstance(
cwd,
str
):
raise ValueError(
"cwd must be string"
)
timeout = payload.get(
"timeout",
60
)
if not isinstance(
timeout,
int
):
raise ValueError(
"timeout must be int"
)
return run_command(
cmd=cmd,
cwd=cwd,
timeout=timeout
)
# =========================
# SELF REGISTER
# =========================
registry.register(
SubprocessTool()
)

0
tools/venv.py Normal file
View File