import asyncio from mcp.server import Server from utils.file_utils import normalize_log_line, keyword_search from utils.summarizer import summarize_text from utils.docgen import generate_doc server = Server("omniscient") # ─── Expose Tools ───────────────────────────────────────────── @server.tool() async def summarize_file(content: str) -> str: """Summarize a text or log file.""" return summarize_text(content) @server.tool() async def analyze_log(content: str, query: str = None) -> dict: """Normalize and search logs.""" normalized = [normalize_log_line(line) for line in content.splitlines()] result = { "total_lines": len(normalized), "unique_entries": len(set(normalized)), "preview": normalized[:20] } if query: result["matches"] = keyword_search("\n".join(normalized), query)[:20] return result @server.tool() async def generate_script_doc(name: str, content: str) -> str: """Generate README-style documentation for a script.""" return generate_doc(name, "mcp", content) # ─── Health Check ───────────────────────────────────────────── @server.tool() async def health() -> dict: """Check MCP backend status.""" return {"ok": True, "message": "Omniscient MCP alive"} # ─── Run Server ─────────────────────────────────────────────── if __name__ == "__main__": asyncio.run(server.run_stdio())