Spaces:
Runtime error
Runtime error
| import asyncio | |
| from mcp.server import Server | |
| from utils.file_utils import normalize_log_line, keyword_search | |
| from utils.summarizer import summarize_text | |
| from utils.docgen import generate_doc | |
| # MCP server instance named "omniscient"; the entry point at the bottom of this file runs it over stdio. | |
| server = Server("omniscient") | |
| # ─── Expose Tools ───────────────────────────────────────────── | |
async def summarize_file(content: str) -> str:
    """Return a summary of the given text or log content.

    The work is delegated entirely to ``summarize_text`` (imported from
    ``utils.summarizer``); this coroutine adds no processing of its own.

    Args:
        content: Raw text of the file to summarize.

    Returns:
        Whatever ``summarize_text`` produces for ``content``.
    """
    summary = summarize_text(content)
    return summary
async def analyze_log(content: str, query: str = None) -> dict:
    """Normalize log text line by line and optionally search it.

    Each line of ``content`` is passed through ``normalize_log_line``. The
    returned report always carries line statistics and a preview; keyword
    matches are added only when a non-empty ``query`` is supplied.

    Args:
        content: Raw log text; split into lines with ``str.splitlines``.
        query: Optional search term. ``None`` or an empty string skips the
            search step entirely.

    Returns:
        A dict with the keys ``total_lines`` (number of normalized lines),
        ``unique_entries`` (number of distinct normalized lines) and
        ``preview`` (the first 20 normalized lines). When ``query`` is
        truthy, ``matches`` holds the first 20 results of ``keyword_search``
        run over the newline-joined normalized text.
    """
    cleaned = []
    for raw_line in content.splitlines():
        cleaned.append(normalize_log_line(raw_line))

    report = dict(
        total_lines=len(cleaned),
        unique_entries=len(set(cleaned)),
        preview=cleaned[:20],
    )

    # Nothing to search for: the statistics alone are the answer.
    if not query:
        return report

    hits = keyword_search("\n".join(cleaned), query)
    report["matches"] = hits[:20]
    return report
async def generate_script_doc(name: str, content: str) -> str:
    """Produce README-style documentation for a script.

    Delegates to ``generate_doc`` (imported from ``utils.docgen``).

    Args:
        name: Forwarded as the first argument to ``generate_doc``.
        content: Forwarded as the third argument to ``generate_doc``.

    Returns:
        The value returned by ``generate_doc``.
    """
    # NOTE(review): the fixed second argument "mcp" presumably tags the
    # origin of the request; confirm against utils.docgen.generate_doc.
    documentation = generate_doc(name, "mcp", content)
    return documentation
| # ─── Health Check ───────────────────────────────────────────── | |
async def health() -> dict:
    """Report that the MCP backend is up.

    Returns:
        A dict with ``ok`` set to ``True`` and a fixed human-readable
        ``message``.
    """
    status = dict(ok=True, message="Omniscient MCP alive")
    return status
| # ─── Run Server ─────────────────────────────────────────────── | |
if __name__ == "__main__":
    # Imported here so that merely importing this module stays free of
    # transport set-up; only direct execution needs the stdio transport.
    from mcp.server.stdio import stdio_server

    async def _serve_stdio() -> None:
        """Run ``server`` over stdin/stdout until the client disconnects."""
        # The Server class exposes ``run(read_stream, write_stream, options)``
        # rather than a ``run_stdio()`` helper, so the stdio streams are
        # opened explicitly and handed to it.
        async with stdio_server() as (read_stream, write_stream):
            await server.run(
                read_stream,
                write_stream,
                server.create_initialization_options(),
            )

    # NOTE(review): no handler registration for the coroutines defined above
    # is visible in this file; confirm they are registered with ``server``,
    # otherwise clients will not see any tools.
    asyncio.run(_serve_stdio())