Files
python-mcp/tools/ffmpeg.py
AuroraCrimsonRose e471f9bc54 Added many tools
2026-06-03 06:01:06 -05:00

254 lines
7.2 KiB
Python

from __future__ import annotations
from typing import Any
from core.tools.base import BaseTool, ToolContext
from core.tools.registry import registry
from core.events import bus
from core.subprocess import run_command
class FFmpegTool(BaseTool):
name = "ffmpeg"
description = "Media processing using FFmpeg"
# =========================================================
# ROUTER
# =========================================================
def execute(self, payload: dict[str, Any], ctx: ToolContext):
action = str(payload.get("action", "")).strip()
bus.log(
"FFMPEG",
"ffmpeg_execute",
"INFO",
{"action": action}
)
match action:
case "convert":
return self.convert(payload, ctx)
case "extract_audio":
return self.extract_audio(payload, ctx)
case "trim":
return self.trim(payload, ctx)
case "merge":
return self.merge(payload, ctx)
case "probe":
return self.probe(payload)
case "thumbnail":
return self.thumbnail(payload, ctx)
case _:
raise ValueError(f"Unknown ffmpeg action: {action}")
# =========================================================
# CONVERT
# =========================================================
def convert(self, payload: dict[str, Any], ctx: ToolContext):
input_file = payload.get("input")
output_file = payload.get("output")
codec = payload.get("codec") # optional
if not isinstance(input_file, str) or not isinstance(output_file, str):
raise ValueError("input/output must be strings")
cmd = ["ffmpeg", "-y", "-i", input_file]
if isinstance(codec, str):
cmd += ["-c:v", codec]
cmd.append(output_file)
if ctx.dry_run:
return {"dry_run": True, "command": cmd}
result = run_command(cmd=cmd)
return {
"action": "convert",
"status": "ok" if result.get("return_code") == 0 else "error",
"stdout": result.get("stdout", ""),
"stderr": result.get("stderr", "")
}
# =========================================================
# EXTRACT AUDIO
# =========================================================
def extract_audio(self, payload: dict[str, Any], ctx: ToolContext):
input_file = payload.get("input")
output_file = payload.get("output")
if not isinstance(input_file, str) or not isinstance(output_file, str):
raise ValueError("input/output must be strings")
cmd = [
"ffmpeg",
"-y",
"-i", input_file,
"-vn",
"-acodec", "copy",
output_file
]
if ctx.dry_run:
return {"dry_run": True, "command": cmd}
result = run_command(cmd=cmd)
return {
"action": "extract_audio",
"status": "ok" if result.get("return_code") == 0 else "error",
"stdout": result.get("stdout", ""),
"stderr": result.get("stderr", "")
}
# =========================================================
# TRIM
# =========================================================
def trim(self, payload: dict[str, Any], ctx: ToolContext):
input_file = payload.get("input")
output_file = payload.get("output")
start = payload.get("start", "00:00:00")
duration = payload.get("duration")
if not isinstance(input_file, str) or not isinstance(output_file, str):
raise ValueError("input/output must be strings")
cmd = [
"ffmpeg",
"-y",
"-i", input_file,
"-ss", str(start),
]
if duration:
cmd += ["-t", str(duration)]
cmd.append(output_file)
if ctx.dry_run:
return {"dry_run": True, "command": cmd}
result = run_command(cmd=cmd)
return {
"action": "trim",
"status": "ok" if result.get("return_code") == 0 else "error",
"stdout": result.get("stdout", ""),
"stderr": result.get("stderr", "")
}
# =========================================================
# MERGE FILES
# =========================================================
def merge(self, payload: dict[str, Any], ctx: ToolContext):
inputs = payload.get("inputs")
output = payload.get("output")
if not isinstance(inputs, list) or not isinstance(output, str):
raise ValueError("inputs must be list, output must be string")
# ffmpeg concat demuxer style
file_list = "ffmpeg_concat.txt"
with open(file_list, "w", encoding="utf-8") as f:
for item in inputs:
f.write(f"file '{item}'\n")
cmd = [
"ffmpeg",
"-y",
"-f", "concat",
"-safe", "0",
"-i", file_list,
"-c", "copy",
output
]
if ctx.dry_run:
return {"dry_run": True, "command": cmd}
result = run_command(cmd=cmd)
return {
"action": "merge",
"status": "ok" if result.get("return_code") == 0 else "error",
"stdout": result.get("stdout", ""),
"stderr": result.get("stderr", "")
}
# =========================================================
# PROBE
# =========================================================
def probe(self, payload: dict[str, Any]):
input_file = payload.get("input")
if not isinstance(input_file, str):
raise ValueError("input must be string")
cmd = [
"ffprobe",
"-v", "error",
"-show_format",
"-show_streams",
input_file
]
result = run_command(cmd=cmd)
return {
"action": "probe",
"data": result.get("stdout", ""),
"error": result.get("stderr", "")
}
# =========================================================
# THUMBNAIL
# =========================================================
def thumbnail(self, payload: dict[str, Any], ctx: ToolContext):
input_file = payload.get("input")
output_file = payload.get("output")
time = payload.get("time", "00:00:01")
if not isinstance(input_file, str) or not isinstance(output_file, str):
raise ValueError("input/output must be strings")
cmd = [
"ffmpeg",
"-y",
"-ss", str(time),
"-i", input_file,
"-vframes", "1",
output_file
]
if ctx.dry_run:
return {"dry_run": True, "command": cmd}
result = run_command(cmd=cmd)
return {
"action": "thumbnail",
"status": "ok" if result.get("return_code") == 0 else "error",
"stdout": result.get("stdout", ""),
"stderr": result.get("stderr", "")
}
# =========================================================
# REGISTER
# =========================================================
registry.register(FFmpegTool())