136 lines
3.7 KiB
Python
136 lines
3.7 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
from core.tools.base import BaseTool, ToolContext
|
|
from core.tools.registry import registry
|
|
from core.events import bus
|
|
from core.subprocess import run_command
|
|
|
|
|
|
class PwshTool(BaseTool):
|
|
"""
|
|
PowerShell execution tool.
|
|
|
|
Provides controlled execution of PowerShell commands and scripts.
|
|
"""
|
|
|
|
name = "pwsh"
|
|
description = "Execute PowerShell commands safely"
|
|
|
|
# =========================================================
|
|
# ROUTER
|
|
# =========================================================
|
|
|
|
def execute(self, payload: dict[str, Any], ctx: ToolContext):
|
|
action = str(payload.get("action", "run")).strip()
|
|
|
|
bus.log(
|
|
"PWSH",
|
|
"pwsh_execute",
|
|
"INFO",
|
|
{"action": action}
|
|
)
|
|
|
|
match action:
|
|
case "run":
|
|
return self.run(payload, ctx)
|
|
|
|
case "script":
|
|
return self.run_script(payload, ctx)
|
|
|
|
case "check":
|
|
return self.check_pwsh(payload)
|
|
|
|
case _:
|
|
raise ValueError(f"Unknown pwsh action: {action}")
|
|
|
|
# =========================================================
|
|
# RUN COMMAND
|
|
# =========================================================
|
|
|
|
def run(self, payload: dict[str, Any], ctx: ToolContext):
|
|
command = payload.get("command")
|
|
cwd = payload.get("cwd")
|
|
|
|
if not isinstance(command, str):
|
|
raise ValueError("command must be string")
|
|
|
|
if cwd is not None and not isinstance(cwd, str):
|
|
raise ValueError("cwd must be string")
|
|
|
|
cmd = ["pwsh", "-NoProfile", "-Command", command]
|
|
|
|
if ctx.dry_run:
|
|
return {
|
|
"dry_run": True,
|
|
"command": cmd,
|
|
"cwd": cwd
|
|
}
|
|
|
|
result = run_command(
|
|
cmd=cmd,
|
|
cwd=cwd
|
|
)
|
|
|
|
return {
|
|
"action": "run",
|
|
"status": "ok" if result.get("return_code") == 0 else "error",
|
|
"stdout": result.get("stdout", ""),
|
|
"stderr": result.get("stderr", "")
|
|
}
|
|
|
|
# =========================================================
|
|
# RUN SCRIPT
|
|
# =========================================================
|
|
|
|
def run_script(self, payload: dict[str, Any], ctx: ToolContext):
|
|
script = payload.get("script")
|
|
cwd = payload.get("cwd")
|
|
|
|
if not isinstance(script, str):
|
|
raise ValueError("script must be string")
|
|
|
|
cmd = ["pwsh", "-NoProfile", "-Command", script]
|
|
|
|
if ctx.dry_run:
|
|
return {
|
|
"dry_run": True,
|
|
"script_preview": script[:500]
|
|
}
|
|
|
|
result = run_command(
|
|
cmd=cmd,
|
|
cwd=cwd
|
|
)
|
|
|
|
return {
|
|
"action": "script",
|
|
"status": "ok" if result.get("return_code") == 0 else "error",
|
|
"stdout": result.get("stdout", ""),
|
|
"stderr": result.get("stderr", "")
|
|
}
|
|
|
|
# =========================================================
|
|
# CHECK POWERSHELL
|
|
# =========================================================
|
|
|
|
def check_pwsh(self, payload: dict[str, Any]):
|
|
"""Check if PowerShell is available."""
|
|
result = run_command(
|
|
cmd=["pwsh", "-Command", "$PSVersionTable.PSVersion"]
|
|
)
|
|
|
|
return {
|
|
"action": "check",
|
|
"available": result.get("return_code") == 0,
|
|
"version_output": result.get("stdout", ""),
|
|
"error": result.get("stderr", "")
|
|
}
|
|
|
|
|
|
# =========================================================
|
|
# REGISTER TOOL
|
|
# =========================================================
|
|
|
|
registry.register(PwshTool()) |
