omniscientframework / pages /ScriptOrchestrator.py
NexusInstruments's picture
Update pages/ScriptOrchestrator.py
9c37488 verified
import sys, os
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
UTILS_DIR = os.path.join(BASE_DIR, "utils")
if UTILS_DIR not in sys.path:
sys.path.insert(0, UTILS_DIR)
import streamlit as st
import sys, os, subprocess, hashlib
# ─── Ensure utils/ is importable ──────────────────────────────
UTILS_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if UTILS_PATH not in sys.path:
sys.path.append(UTILS_PATH)
from utils.docgen import generate_doc
from utils.summarizer import summarize_text
st.title("🛠️ Local Script Orchestration & Execution")
st.write("Scan, queue, and execute Omniscient scripts with results ingested back into the system.")
# Init state
if "local_scripts" not in st.session_state:
st.session_state.local_scripts = []
if "script_runs" not in st.session_state:
st.session_state.script_runs = []
SCAN_DIRS = [
"/opt/omniscient/bin",
"/opt/omniscient/scripts",
"/opt/omniscient/ai"
]
def scan_scripts():
scripts = []
for d in SCAN_DIRS:
if os.path.exists(d):
for root, _, files in os.walk(d):
for f in files:
if f.endswith((".sh", ".py")):
path = os.path.join(root, f)
try:
with open(path, "r", errors="ignore") as fh:
content = fh.read()
sha1 = hashlib.sha1(content.encode()).hexdigest()
doc = generate_doc(f, path, content)
summary = summarize_text(content)
scripts.append({
"name": f,
"path": path,
"sha1": sha1,
"doc": doc,
"summary": summary
})
except Exception as e:
st.error(f"⚠️ Error reading {f}: {e}")
return scripts
def run_script(script_path: str):
"""Execute a script safely and capture output"""
try:
if script_path.endswith(".sh"):
result = subprocess.run(["bash", script_path], capture_output=True, text=True, timeout=60)
elif script_path.endswith(".py"):
result = subprocess.run(["python3", script_path], capture_output=True, text=True, timeout=60)
else:
return {"error": "Unsupported script type"}
return {
"script": script_path,
"stdout": result.stdout,
"stderr": result.stderr,
"returncode": result.returncode
}
except Exception as e:
return {"script": script_path, "error": str(e)}
# ─── Scan Scripts ─────────────────────────────────────────────
if st.button("🔍 Scan Local Scripts"):
scripts = scan_scripts()
st.session_state.local_scripts = scripts
st.success(f"✅ Found {len(scripts)} scripts.")
# ─── Script Selection ─────────────────────────────────────────
if st.session_state.local_scripts:
script_names = [s["name"] for s in st.session_state.local_scripts]
selected = st.multiselect("Select scripts to execute", script_names)
if st.button("▶️ Run Selected Scripts"):
for s in st.session_state.local_scripts:
if s["name"] in selected:
run_result = run_script(s["path"])
st.session_state.script_runs.append(run_result)
if "error" in run_result:
st.error(f"❌ {s['name']} failed: {run_result['error']}")
elif run_result["returncode"] != 0:
st.error(f"❌ {s['name']} exited with {run_result['returncode']}")
st.code(run_result["stderr"])
else:
st.success(f"✅ {s['name']} ran successfully")
st.code(run_result["stdout"][:500]) # preview output
# ─── Display Past Runs ────────────────────────────────────────
if st.session_state.script_runs:
st.subheader("📜 Script Run History")
for r in st.session_state.script_runs[-5:]: # show last 5
st.markdown(f"**Script:** {r.get('script')} \n**Return Code:** {r.get('returncode')}")
if r.get("stdout"):
st.code(r["stdout"][:300])
if r.get("stderr"):
st.code(r["stderr"][:300])