# Streamlit page: scan the Omniscient script directories, run selected
# scripts with a timeout, and show the most recent run results.
# (A comment header is used instead of a module docstring so the page text is
# never rendered by Streamlit.)
import hashlib
import os
import subprocess
import sys

import streamlit as st

# ─── Ensure utils/ is importable ──────────────────────────────
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
UTILS_DIR = os.path.join(BASE_DIR, "utils")
if UTILS_DIR not in sys.path:
    sys.path.insert(0, UTILS_DIR)

# Parent of this file's directory — presumably the directory that contains
# the `utils` package, which the `from utils...` imports below rely on.
UTILS_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if UTILS_PATH not in sys.path:
    sys.path.append(UTILS_PATH)

from utils.docgen import generate_doc  # noqa: E402  (needs sys.path set up above)
from utils.summarizer import summarize_text  # noqa: E402

st.title("🛠️ Local Script Orchestration & Execution")
st.write("Scan, queue, and execute Omniscient scripts with results ingested back into the system.")

# Init state
if "local_scripts" not in st.session_state:
    st.session_state.local_scripts = []
if "script_runs" not in st.session_state:
    st.session_state.script_runs = []

SCAN_DIRS = [
    "/opt/omniscient/bin",
    "/opt/omniscient/scripts",
    "/opt/omniscient/ai",
]

# File suffixes picked up by the scan, mapped to the interpreter that runs them.
_INTERPRETERS = {".sh": "bash", ".py": "python3"}
_SCRIPT_SUFFIXES = tuple(_INTERPRETERS)

# Maximum wall-clock time, in seconds, a script may run before it is aborted.
_RUN_TIMEOUT_SECONDS = 60


def scan_scripts():
    """Walk SCAN_DIRS and describe every ``.sh`` / ``.py`` file found.

    Returns:
        A list of dicts with the keys ``name`` (file name), ``path`` (full
        path), ``sha1``, ``doc`` (result of ``generate_doc``) and ``summary``
        (result of ``summarize_text``).

    Files that cannot be read or processed are reported with ``st.error`` and
    left out of the result; the scan continues with the next file.
    """
    scripts = []
    for scan_dir in SCAN_DIRS:
        if not os.path.isdir(scan_dir):
            continue
        for root, _, files in os.walk(scan_dir):
            for filename in files:
                if not filename.endswith(_SCRIPT_SUFFIXES):
                    continue
                path = os.path.join(root, filename)
                try:
                    # Explicit UTF-8 so the result does not depend on the
                    # host locale; undecodable bytes are dropped.
                    with open(path, "r", encoding="utf-8", errors="ignore") as fh:
                        content = fh.read()
                    # NOTE: the digest covers the decoded text, not the raw
                    # file bytes, so it can differ from `sha1sum` output for
                    # files with undecodable bytes or CRLF line endings.
                    sha1 = hashlib.sha1(content.encode("utf-8")).hexdigest()
                    scripts.append({
                        "name": filename,
                        "path": path,
                        "sha1": sha1,
                        "doc": generate_doc(filename, path, content),
                        "summary": summarize_text(content),
                    })
                except Exception as e:
                    # UI boundary: one bad file must not abort the whole scan.
                    st.error(f"⚠️ Error reading {filename}: {e}")
    return scripts


def run_script(script_path: str):
    """Run a ``.sh`` or ``.py`` script and capture its output.

    The script is started with ``bash`` or ``python3`` (chosen by file
    suffix) and is aborted after 60 seconds. No sandboxing is applied.

    Args:
        script_path: Path of the script to execute.

    Returns:
        On completion, a dict with ``script``, ``stdout``, ``stderr`` and
        ``returncode``. If the suffix is unsupported, or the process could
        not be run (including a timeout), a dict with ``script`` and
        ``error`` instead.
    """
    interpreter = next(
        (cmd for suffix, cmd in _INTERPRETERS.items() if script_path.endswith(suffix)),
        None,
    )
    if interpreter is None:
        return {"script": script_path, "error": "Unsupported script type"}

    try:
        result = subprocess.run(
            [interpreter, script_path],
            capture_output=True,
            text=True,
            timeout=_RUN_TIMEOUT_SECONDS,
        )
    except Exception as e:
        # UI boundary: report launch failures and timeouts instead of raising.
        return {"script": script_path, "error": str(e)}

    return {
        "script": script_path,
        "stdout": result.stdout,
        "stderr": result.stderr,
        "returncode": result.returncode,
    }


# ─── Scan Scripts ─────────────────────────────────────────────
if st.button("🔍 Scan Local Scripts"):
    scripts = scan_scripts()
    st.session_state.local_scripts = scripts
    st.success(f"✅ Found {len(scripts)} scripts.")

# ─── Script Selection ─────────────────────────────────────────
if st.session_state.local_scripts:
    # Select by path rather than by file name: the same file name can exist
    # in more than one scan directory, and the path is what identifies it.
    names_by_path = {s["path"]: s["name"] for s in st.session_state.local_scripts}
    selected = st.multiselect(
        "Select scripts to execute",
        list(names_by_path),
        format_func=lambda p: f"{names_by_path[p]} ({p})",
    )

    if st.button("▶️ Run Selected Scripts"):
        chosen_paths = set(selected)
        # Iterate the scanned list so scripts run in scan order.
        for s in st.session_state.local_scripts:
            if s["path"] not in chosen_paths:
                continue
            run_result = run_script(s["path"])
            st.session_state.script_runs.append(run_result)
            if "error" in run_result:
                st.error(f"❌ {s['name']} failed: {run_result['error']}")
            elif run_result["returncode"] != 0:
                st.error(f"❌ {s['name']} exited with {run_result['returncode']}")
                st.code(run_result["stderr"])
            else:
                st.success(f"✅ {s['name']} ran successfully")
                st.code(run_result["stdout"][:500])  # preview output

# ─── Display Past Runs ────────────────────────────────────────
if st.session_state.script_runs:
    st.subheader("📜 Script Run History")
    for r in st.session_state.script_runs[-5:]:  # show last 5
        if "error" in r:
            # Runs that never produced a return code: show the error instead
            # of a meaningless "Return Code: None".
            st.markdown(f"**Script:** {r.get('script')}  \n**Error:** {r['error']}")
            continue
        # Two trailing spaces before the newline force a Markdown line break.
        st.markdown(f"**Script:** {r.get('script')}  \n**Return Code:** {r.get('returncode')}")
        if r.get("stdout"):
            st.code(r["stdout"][:300])
        if r.get("stderr"):
            st.code(r["stderr"][:300])