Files
python-mcp/tools/pid.py
AuroraCrimsonRose e471f9bc54 Added many tools
2026-06-03 06:01:06 -05:00

166 lines
4.5 KiB
Python

from __future__ import annotations
from typing import Any
from core.tools.base import BaseTool, ToolContext
from core.tools.registry import registry
from core.events import bus
from core.subprocess import run_command
class PidTool(BaseTool):
"""
Process inspection and management tool.
Provides visibility into running system processes and optional control.
"""
name = "pid"
description = "Process listing, lookup, and management"
# =========================================================
# EXECUTE
# =========================================================
def execute(self, payload: dict[str, Any], ctx: ToolContext):
action = str(payload.get("action", "list")).strip()
bus.log(
"PID",
"pid_execute",
"INFO",
{"action": action}
)
match action:
case "list":
return self.list_processes(payload)
case "find":
return self.find_process(payload)
case "details":
return self.process_details(payload)
case "kill":
return self.kill_process(payload, ctx)
case _:
raise ValueError(f"Unknown pid action: {action}")
# =========================================================
# LIST PROCESSES
# =========================================================
def list_processes(self, payload: dict[str, Any]):
limit = payload.get("limit", 50)
result = run_command(
cmd=["tasklist"],
)
if result.get("return_code") != 0:
return {
"status": "error",
"stderr": result.get("stderr", "")
}
lines = result.get("stdout", "").splitlines()
processes = []
for line in lines[3:]: # skip header rows
parts = line.split()
if len(parts) < 2:
continue
processes.append({
"name": parts[0],
"pid": parts[1]
})
if len(processes) >= limit:
break
return {
"count": len(processes),
"processes": processes
}
# =========================================================
# FIND PROCESS
# =========================================================
def find_process(self, payload: dict[str, Any]):
name = payload.get("name")
if not isinstance(name, str):
raise ValueError("name must be string")
result = run_command(
cmd=["tasklist", "/FI", f"IMAGENAME eq {name}"],
)
return {
"name": name,
"raw": result.get("stdout", ""),
"stderr": result.get("stderr", "")
}
# =========================================================
# PROCESS DETAILS
# =========================================================
def process_details(self, payload: dict[str, Any]):
pid = payload.get("pid")
if not isinstance(pid, int):
raise ValueError("pid must be int")
result = run_command(
cmd=["wmic", "process", "where", f"ProcessId={pid}", "get", "ProcessId,Name,CommandLine"],
)
return {
"pid": pid,
"raw": result.get("stdout", ""),
"stderr": result.get("stderr", "")
}
# =========================================================
# KILL PROCESS
# =========================================================
def kill_process(self, payload: dict[str, Any], ctx: ToolContext):
pid = payload.get("pid")
force = payload.get("force", False)
if not isinstance(pid, int):
raise ValueError("pid must be int")
if ctx.dry_run:
return {
"dry_run": True,
"pid": pid,
"force": force,
"message": "Would terminate process"
}
cmd = ["taskkill", "/PID", str(pid)]
if force:
cmd.append("/F")
result = run_command(cmd)
return {
"pid": pid,
"status": "success" if result.get("return_code") == 0 else "error",
"stdout": result.get("stdout", ""),
"stderr": result.get("stderr", "")
}
# =========================================================
# REGISTER TOOL
# =========================================================
registry.register(PidTool())