DETERMINATOR / src /services /neo4j_service.py
annabossler's picture
Add Neo4j knowledge graph integration
4b34235
raw
history blame
4.44 kB
"""Neo4j Knowledge Graph Service for Drug Repurposing"""
from neo4j import GraphDatabase
from typing import List, Dict, Optional, Any
import os
from dotenv import load_dotenv
import logging
load_dotenv()
logger = logging.getLogger(__name__)
class Neo4jService:
def __init__(self):
self.uri = os.getenv("NEO4J_URI", "bolt://localhost:7687")
self.user = os.getenv("NEO4J_USER", "neo4j")
self.password = os.getenv("NEO4J_PASSWORD")
self.database = os.getenv("NEO4J_DATABASE", "neo4j")
if not self.password:
logger.warning("⚠️ NEO4J_PASSWORD not set")
self.driver = None
return
try:
self.driver = GraphDatabase.driver(self.uri, auth=(self.user, self.password))
self.driver.verify_connectivity()
logger.info(f"✅ Neo4j connected: {self.uri} (db: {self.database})")
except Exception as e:
logger.error(f"❌ Neo4j connection failed: {e}")
self.driver = None
def is_connected(self) -> bool:
return self.driver is not None
def close(self):
if self.driver:
self.driver.close()
def ingest_search_results(self, disease_name: str, papers: List[Dict[str, Any]],
drugs_mentioned: List[str] = None) -> Dict[str, int]:
if not self.driver:
return {"error": "Neo4j not connected"}
stats = {"papers": 0, "drugs": 0, "relationships": 0, "errors": 0}
try:
with self.driver.session(database=self.database) as session:
session.run("MERGE (d:Disease {name: $name})", name=disease_name)
for paper in papers:
try:
paper_id = paper.get('id') or paper.get('url', '')
if not paper_id:
continue
session.run("""
MERGE (p:Paper {paper_id: $id})
SET p.title = $title,
p.abstract = $abstract,
p.url = $url,
p.source = $source,
p.updated_at = datetime()
""",
id=paper_id,
title=str(paper.get('title', ''))[:500],
abstract=str(paper.get('abstract', ''))[:2000],
url=str(paper.get('url', ''))[:500],
source=str(paper.get('source', ''))[:100])
session.run("""
MATCH (p:Paper {paper_id: $id})
MATCH (d:Disease {name: $disease})
MERGE (p)-[r:ABOUT]->(d)
""", id=paper_id, disease=disease_name)
stats['papers'] += 1
stats['relationships'] += 1
except Exception as e:
stats['errors'] += 1
if drugs_mentioned:
for drug in drugs_mentioned:
try:
session.run("MERGE (d:Drug {name: $name})", name=drug)
session.run("""
MATCH (drug:Drug {name: $drug})
MATCH (disease:Disease {name: $disease})
MERGE (drug)-[r:POTENTIAL_TREATMENT]->(disease)
""", drug=drug, disease=disease_name)
stats['drugs'] += 1
stats['relationships'] += 1
except Exception as e:
stats['errors'] += 1
logger.info(f"�� Neo4j ingestion: {stats['papers']} papers, {stats['drugs']} drugs")
except Exception as e:
logger.error(f"Neo4j ingestion error: {e}")
stats['errors'] += 1
return stats
_neo4j_service = None
def get_neo4j_service() -> Optional[Neo4jService]:
global _neo4j_service
if _neo4j_service is None:
_neo4j_service = Neo4jService()
return _neo4j_service if _neo4j_service and _neo4j_service.is_connected() else None