# omniscientframework / mcp_server.py
# NexusInstruments's picture
# Create mcp_server.py
# 7105ca4 verified
import asyncio
from typing import Optional

from mcp.server import Server

from utils.docgen import generate_doc
from utils.file_utils import normalize_log_line, keyword_search
from utils.summarizer import summarize_text
# Shared server instance named "omniscient"; the tool functions below register
# themselves on it through the ``server.tool()`` decorator.
# NOTE(review): in the official MCP Python SDK the ``tool()`` decorator and a
# stdio runner live on ``mcp.server.fastmcp.FastMCP`` rather than on the
# low-level ``mcp.server.Server`` -- confirm that the installed ``mcp`` version
# really provides ``Server.tool()`` and ``Server.run_stdio()``.
server = Server("omniscient")
# ─── Expose Tools ─────────────────────────────────────────────
@server.tool()
async def summarize_file(content: str) -> str:
    """Summarize the text of a file or log.

    Args:
        content: Full text to summarize; passed unchanged to
            ``summarize_text``.

    Returns:
        Whatever ``summarize_text`` returns for ``content``.
    """
    summary = summarize_text(content)
    return summary
@server.tool()
async def analyze_log(content: str, query: Optional[str] = None) -> dict:
    """Normalize every line of a log and optionally search it for a keyword.

    Args:
        content: Raw log text; it is split into lines with
            ``str.splitlines`` and each line goes through
            ``normalize_log_line``.
        query: Keyword handed to ``keyword_search``. When it is omitted,
            ``None``, or an empty string, no search is performed.

    Returns:
        A dict with ``total_lines`` (number of normalized lines),
        ``unique_entries`` (number of distinct normalized lines) and
        ``preview`` (the first 20 normalized lines). When a query is given,
        ``matches`` additionally holds the first 20 items of what
        ``keyword_search`` returns for the normalized lines joined with
        newlines.
    """
    normalized = [normalize_log_line(line) for line in content.splitlines()]
    result = {
        "total_lines": len(normalized),
        "unique_entries": len(set(normalized)),
        "preview": normalized[:20],
    }
    # A falsy query (None or "") means "no search": the key is left out
    # entirely rather than being set to an empty result.
    if query:
        result["matches"] = keyword_search("\n".join(normalized), query)[:20]
    return result
@server.tool()
async def generate_script_doc(name: str, content: str) -> str:
    """Generate README-style documentation for a script.

    Args:
        name: Script name, forwarded as the first argument of
            ``generate_doc``.
        content: Script text, forwarded as the third argument of
            ``generate_doc``.

    Returns:
        The value returned by ``generate_doc(name, "mcp", content)``.
    """
    documentation = generate_doc(name, "mcp", content)
    return documentation
# ─── Health Check ─────────────────────────────────────────────
@server.tool()
async def health() -> dict:
    """Report that the MCP backend is up.

    Returns:
        A dict with ``ok`` set to ``True`` and a fixed ``message`` string.
    """
    status = {"ok": True}
    status["message"] = "Omniscient MCP alive"
    return status
# ─── Run Server ───────────────────────────────────────────────
if __name__ == "__main__":
    # Create the stdio-serving awaitable first, then drive it to completion
    # on a fresh event loop.
    stdio_session = server.run_stdio()
    asyncio.run(stdio_session)