Spaces:
Runtime error
Runtime error
| import sys, os | |
| BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | |
| UTILS_DIR = os.path.join(BASE_DIR, "utils") | |
| if UTILS_DIR not in sys.path: | |
| sys.path.insert(0, UTILS_DIR) | |
| import streamlit as st | |
| import sys, os, subprocess, hashlib | |
| # ─── Ensure utils/ is importable ────────────────────────────── | |
| UTILS_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) | |
| if UTILS_PATH not in sys.path: | |
| sys.path.append(UTILS_PATH) | |
| from utils.docgen import generate_doc | |
| from utils.summarizer import summarize_text | |
# Page header shown at the top of this Streamlit page.
st.title("🛠️ Local Script Orchestration & Execution")
st.write("Scan, queue, and execute Omniscient scripts with results ingested back into the system.")

# Seed the per-session containers the first time the page runs:
#   local_scripts -> entries produced by the last scan
#   script_runs   -> result dicts of every execution in this session
for _state_key in ("local_scripts", "script_runs"):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = []

# Directories that are walked recursively when looking for scripts.
SCAN_DIRS = [
    "/opt/omniscient/bin",
    "/opt/omniscient/scripts",
    "/opt/omniscient/ai",
]
def scan_scripts():
    """Collect metadata for every ``.sh``/``.py`` file under ``SCAN_DIRS``.

    Each directory in ``SCAN_DIRS`` is walked recursively; directories that
    do not exist are skipped. For every matching file the raw bytes are
    hashed and the decoded text is handed to ``generate_doc`` and
    ``summarize_text``.

    A file that cannot be read or processed is reported through
    ``st.error`` and left out of the result; the scan then continues with
    the next file.

    Returns:
        list[dict]: One entry per processed script with the keys ``name``,
        ``path``, ``sha1``, ``doc`` and ``summary``.
    """
    scripts = []
    for scan_dir in SCAN_DIRS:
        if not os.path.exists(scan_dir):
            continue
        for root, dirs, files in os.walk(scan_dir):
            # os.walk yields entries in arbitrary order; sorting in place
            # (dirs) and per directory (files) keeps the listing stable
            # from one scan to the next.
            dirs.sort()
            for fname in sorted(files):
                if not fname.endswith((".sh", ".py")):
                    continue
                path = os.path.join(root, fname)
                try:
                    with open(path, "rb") as fh:
                        raw = fh.read()
                    # Hash the bytes as stored on disk. Hashing text that
                    # was decoded with errors="ignore" and newline
                    # translation gives a digest that does not identify
                    # the actual file and varies with the locale.
                    sha1 = hashlib.sha1(raw).hexdigest()
                    content = raw.decode("utf-8", errors="ignore")
                    # Same newline normalisation text-mode reading applied.
                    content = content.replace("\r\n", "\n").replace("\r", "\n")
                    scripts.append({
                        "name": fname,
                        "path": path,
                        "sha1": sha1,
                        "doc": generate_doc(fname, path, content),
                        "summary": summarize_text(content),
                    })
                except Exception as e:
                    # Best-effort scan: one bad file must not abort the
                    # whole listing, so report it and move on.
                    st.error(f"⚠️ Error reading {fname}: {e}")
    return scripts
def run_script(script_path: str, timeout: float = 60):
    """Run a ``.sh`` or ``.py`` script in a subprocess and capture its output.

    ``.sh`` files are started with ``bash`` and ``.py`` files with
    ``python3``. The command is passed as an argument list, so no shell
    parsing is applied to ``script_path``.

    Args:
        script_path: Path of the script to execute.
        timeout: Seconds to wait before the run is abandoned.

    Returns:
        dict: Always contains ``script``. When the process ran to
        completion it also has ``stdout``, ``stderr`` and ``returncode``.
        Otherwise (unsupported extension, launch failure, timeout, ...)
        it has ``error`` with a description. No exception is raised.
    """
    interpreters = ((".sh", "bash"), (".py", "python3"))
    try:
        interpreter = next(
            (cmd for suffix, cmd in interpreters if script_path.endswith(suffix)),
            None,
        )
        if interpreter is None:
            # Keep the same shape as every other failure so callers can
            # always tell which script the entry belongs to.
            return {"script": script_path, "error": "Unsupported script type"}
        result = subprocess.run(
            [interpreter, script_path],
            capture_output=True,
            text=True,
            # Undecodable output is kept with replacement characters
            # instead of turning a finished run into an error.
            errors="replace",
            timeout=timeout,
        )
        return {
            "script": script_path,
            "stdout": result.stdout,
            "stderr": result.stderr,
            "returncode": result.returncode,
        }
    except Exception as e:
        # UI boundary: any failure (missing interpreter, timeout, bad
        # path value) is reported to the caller as data.
        return {"script": script_path, "error": str(e)}
# ─── Scan Scripts ─────────────────────────────────────────────
# A click triggers a fresh scan whose result replaces the list kept in
# session state, then reports how many scripts were found.
scan_requested = st.button("🔍 Scan Local Scripts")
if scan_requested:
    st.session_state.local_scripts = scan_scripts()
    found_count = len(st.session_state.local_scripts)
    st.success(f"✅ Found {found_count} scripts.")
# ─── Script Selection ─────────────────────────────────────────
if st.session_state.local_scripts:
    # Key the choices on the full path, not the bare file name: two scan
    # directories may hold files with the same name, and selecting by name
    # would run every one of them.
    scripts_by_path = {s["path"]: s for s in st.session_state.local_scripts}
    selected = st.multiselect(
        "Select scripts to execute",
        list(scripts_by_path),
        format_func=lambda p: f"{scripts_by_path[p]['name']} ({p})",
    )
    if st.button("▶️ Run Selected Scripts"):
        # Execute in scan order, one script at a time, recording every
        # result in the session history before reporting it.
        for path, s in scripts_by_path.items():
            if path not in selected:
                continue
            run_result = run_script(path)
            st.session_state.script_runs.append(run_result)
            if "error" in run_result:
                # The script could not be run at all.
                st.error(f"❌ {s['name']} failed: {run_result['error']}")
            elif run_result["returncode"] != 0:
                st.error(f"❌ {s['name']} exited with {run_result['returncode']}")
                st.code(run_result["stderr"])
            else:
                st.success(f"✅ {s['name']} ran successfully")
                st.code(run_result["stdout"][:500])  # preview output
# ─── Display Past Runs ────────────────────────────────────────
if st.session_state.script_runs:
    st.subheader("📜 Script Run History")
    for r in st.session_state.script_runs[-5:]:  # show last 5
        # Markdown needs two trailing spaces before the newline to render
        # a line break between the two fields.
        if "error" in r:
            # Runs that never produced a return code carry an error
            # message instead; show it rather than "Return Code: None".
            st.markdown(f"**Script:** {r.get('script')}  \n**Error:** {r['error']}")
            continue
        st.markdown(f"**Script:** {r.get('script')}  \n**Return Code:** {r.get('returncode')}")
        # Only the first 300 characters of each stream are previewed.
        if r.get("stdout"):
            st.code(r["stdout"][:300])
        if r.get("stderr"):
            st.code(r["stderr"][:300])