#!/usr/bin/env python3
"""
Ultimate Memory MCP Server - Ollama-Powered
Self-hosted embeddings with Ollama for complete privacy and control

Requires: fastmcp>=2.8.1, kuzu>=0.4.0, numpy>=1.26.0
Python 3.11+ required for modern type hints and performance improvements
"""

import asyncio
import json
import logging
import sys
import uuid
import requests
from datetime import datetime
from dataclasses import dataclass
from enum import Enum
import numpy as np
from pathlib import Path
import os

import kuzu
from fastmcp import FastMCP
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Content longer than this many characters gets a generated summary, and
# this is also the length of the truncation fallback for summaries.
SUMMARY_THRESHOLD = 200


class MemoryType(Enum):
    """Closed set of memory categories stored in the `memory_type` column."""

    EPISODIC = "episodic"
    SEMANTIC = "semantic"
    PROCEDURAL = "procedural"


@dataclass
class MemoryNode:
    """In-memory representation of one `Memory` node."""

    id: str
    content: str
    summary: str | None = None
    memory_type: MemoryType = MemoryType.EPISODIC
    confidence_score: float = 1.0
    tags: list[str] | None = None
    retrieval_cues: list[str] | None = None
    embedding: list[float] | None = None
    created_at: datetime | None = None
    access_count: int = 0

    def __post_init__(self):
        # Normalise the optional fields so consumers never have to test for None.
        if self.tags is None:
            self.tags = []
        if self.retrieval_cues is None:
            self.retrieval_cues = []
        if self.created_at is None:
            self.created_at = datetime.now()


@dataclass
class SearchResult:
    """One hit returned by the semantic or keyword search."""

    memory_id: str
    content: str
    similarity_score: float
    memory_type: str
    confidence_score: float
    related_memories: list[dict] | None = None


def _rows_as_dicts(result) -> list[dict]:
    """Materialise a Kuzu query result into a list of ``{column: value}`` dicts.

    Kuzu's Python ``QueryResult`` hands rows back as positional lists through
    ``has_next()`` / ``get_next()``; it cannot be indexed and its rows are not
    keyed by column name. Zipping each row with ``get_column_names()`` gives
    the keyed records the rest of this module works with.
    """
    # Defensive: a multi-statement query may produce a list of results;
    # only the last one carries the rows of interest.
    if isinstance(result, list):
        if not result:
            return []
        result = result[-1]
    columns = result.get_column_names()
    rows = []
    while result.has_next():
        rows.append(dict(zip(columns, result.get_next())))
    return rows


def _fmt_avg(value) -> str:
    """Format an aggregate that is None when there were no rows to average."""
    return "n/a" if value is None else f"{value:.3f}"


class OllamaProvider:
    """Ollama embedding and summary provider"""

    def __init__(
        self,
        base_url: str = "http://localhost:11434",
        embedding_model: str = "nomic-embed-text",
        summary_model: str = "llama3.2:1b",
        timeout: float = 30,
    ):
        """Configure the provider.

        Args:
            base_url: Root URL of the Ollama server (a trailing slash is stripped).
            embedding_model: Model name sent to ``/api/embeddings``.
            summary_model: Model name sent to ``/api/generate`` for summaries.
            timeout: Per-request timeout in seconds for embedding/summary calls.
        """
        self.base_url = base_url.rstrip('/')
        self.embedding_model = embedding_model
        self.summary_model = summary_model
        self.timeout = timeout
        logger.info(f"Ollama provider initialized: {base_url} using {embedding_model}")

    @staticmethod
    def _truncate(content: str) -> str:
        """Fallback "summary": the first SUMMARY_THRESHOLD characters plus an ellipsis."""
        if len(content) > SUMMARY_THRESHOLD:
            return content[:SUMMARY_THRESHOLD] + "..."
        return content

    @staticmethod
    def _with_default_tag(model_name: str) -> str:
        """Return the model name with an explicit ``:latest`` tag if it has none.

        Only the last path segment is inspected so that a registry host with
        a port (``host:5000/ns/model``) is not mistaken for a tag.
        """
        if ":" in model_name.rsplit("/", 1)[-1]:
            return model_name
        return f"{model_name}:latest"

    async def generate_embedding(self, text: str) -> list[float]:
        """Generate embedding using Ollama

        Raises:
            RuntimeError: on a non-200 response or a response without an
                ``embedding`` field. Transport errors from ``requests``
                propagate unchanged.
        """
        try:
            # requests is blocking; run it in a worker thread so the event
            # loop keeps serving other tool calls.
            response = await asyncio.to_thread(
                requests.post,
                f"{self.base_url}/api/embeddings",
                json={
                    "model": self.embedding_model,
                    "prompt": text
                },
                timeout=self.timeout
            )

            if response.status_code != 200:
                raise RuntimeError(f"Ollama API error: {response.status_code} - {response.text}")

            payload = response.json()
            if "embedding" not in payload:
                raise RuntimeError("Ollama API response did not contain an embedding")
            return payload["embedding"]

        except Exception as e:
            logger.error(f"Ollama embedding failed: {e}")
            raise

    async def generate_summary(self, content: str) -> str:
        """Generate summary using Ollama (optional - requires a chat model)

        Never raises: any failure falls back to a truncated copy of *content*.
        """
        try:
            response = await asyncio.to_thread(
                requests.post,
                f"{self.base_url}/api/generate",
                json={
                    "model": self.summary_model,  # Small, fast model by default
                    "prompt": f"Summarize this text in 1-2 sentences:\n\n{content}",
                    "stream": False
                },
                timeout=self.timeout
            )

            if response.status_code == 200:
                return response.json().get("response", "").strip()
            # Fallback to truncation
            return self._truncate(content)

        except Exception as e:
            logger.warning(f"Ollama summary failed, using truncation: {e}")
            return self._truncate(content)

    def check_connection(self) -> tuple[bool, str]:
        """Check if Ollama server is accessible and model is available

        Returns:
            ``(ok, message)``; *message* explains the failure when *ok* is False.
        """
        try:
            # Test server connection
            response = requests.get(f"{self.base_url}/api/tags", timeout=10)
            if response.status_code != 200:
                return False, f"Server error: {response.status_code}"

            # Check if embedding model is available
            data = response.json()
            models = [m['name'] for m in data.get('models', []) if 'name' in m]

            # The tag list reports names with their tag ("name:latest"), while
            # the configured model is usually given without one. Compare both
            # sides with the default tag made explicit.
            wanted = self._with_default_tag(self.embedding_model)
            if wanted not in {self._with_default_tag(name) for name in models}:
                return False, f"Model '{self.embedding_model}' not found. Available: {models}"

            return True, "Connected successfully"

        except requests.exceptions.ConnectionError:
            return False, f"Cannot connect to Ollama server at {self.base_url}"
        except Exception as e:
            return False, f"Connection check failed: {str(e)}"


class MemoryMCPServer:
    """Graph-backed memory store: Kuzu for persistence, Ollama for embeddings."""

    def __init__(self, kuzu_db_path: str, ollama_provider: OllamaProvider):
        self.db_path = Path(kuzu_db_path)
        self.ollama = ollama_provider
        # Both handles stay None until initialize_db() has run.
        self.db: kuzu.Database | None = None
        self.conn: kuzu.Connection | None = None

    def run_query(self, query: str, parameters: dict | None = None) -> list[dict]:
        """Execute a Cypher statement and return its rows as dicts.

        Args:
            query: Cypher text, using ``$name`` placeholders.
            parameters: Values for the placeholders, if any.

        Returns:
            One dict per result row keyed by column name (empty for
            statements that return nothing).

        Raises:
            RuntimeError: if the database has not been initialised.
        """
        if self.conn is None:
            raise RuntimeError("Database not initialized; call initialize_db() first")
        if parameters is None:
            result = self.conn.execute(query)
        else:
            result = self.conn.execute(query, parameters)
        return _rows_as_dicts(result)

    @staticmethod
    def _node_from_record(memory: dict) -> MemoryNode:
        """Build a MemoryNode from the property dict of a returned Memory node."""
        return MemoryNode(
            id=memory['id'],
            content=memory['content'],
            summary=memory['summary'],
            memory_type=MemoryType(memory['memory_type']),
            confidence_score=memory['confidence_score'],
            tags=memory['tags'],
            retrieval_cues=memory['retrieval_cues'],
            created_at=memory['created_at'],
            access_count=memory['access_count']
        )

    async def initialize_db(self):
        """Initialize Kuzu database and create schema"""
        try:
            # Ensure directory exists
            self.db_path.mkdir(parents=True, exist_ok=True)

            self.db = kuzu.Database(str(self.db_path))
            self.conn = kuzu.Connection(self.db)

            # Create schema if it doesn't exist
            await self._create_schema()
            logger.info(f"Kuzu database initialized at {self.db_path}")

        except Exception as e:
            logger.error(f"Failed to initialize database: {e}")
            raise

    async def _create_schema(self):
        """Create the graph schema in Kuzu"""
        schema_queries = [
            # Node tables
            """CREATE NODE TABLE IF NOT EXISTS Memory (
                id STRING,
                content STRING,
                summary STRING,
                memory_type STRING,
                confidence_score DOUBLE,
                created_at TIMESTAMP,
                updated_at TIMESTAMP,
                last_accessed_at TIMESTAMP,
                access_count INT64,
                source_type STRING,
                source_id STRING,
                tags STRING[],
                retrieval_cues STRING[],
                embedding DOUBLE[],
                PRIMARY KEY (id)
            )""",
            """CREATE NODE TABLE IF NOT EXISTS Conversation (
                id STRING,
                title STRING,
                started_at TIMESTAMP,
                last_message_at TIMESTAMP,
                participant_count INT64,
                metadata STRING,
                PRIMARY KEY (id)
            )""",
            """CREATE NODE TABLE IF NOT EXISTS Cluster (
                id STRING,
                name STRING,
                description STRING,
                cluster_embedding DOUBLE[],
                created_at TIMESTAMP,
                updated_at TIMESTAMP,
                PRIMARY KEY (id)
            )""",
            """CREATE NODE TABLE IF NOT EXISTS Topic (
                id STRING,
                name STRING,
                description STRING,
                confidence DOUBLE,
                PRIMARY KEY (id)
            )""",
            # Relationship tables
            """CREATE REL TABLE IF NOT EXISTS RELATES_TO (
                FROM Memory TO Memory,
                relationship_type STRING,
                strength DOUBLE,
                context STRING,
                bidirectional BOOLEAN,
                created_at TIMESTAMP,
                created_by STRING,
                confidence DOUBLE
            )""",
            """CREATE REL TABLE IF NOT EXISTS BELONGS_TO_CONVERSATION (
                FROM Memory TO Conversation,
                sequence_number INT64,
                created_at TIMESTAMP
            )""",
            """CREATE REL TABLE IF NOT EXISTS IN_CLUSTER (
                FROM Memory TO Cluster,
                membership_strength DOUBLE,
                added_at TIMESTAMP
            )""",
            """CREATE REL TABLE IF NOT EXISTS ABOUT_TOPIC (
                FROM Memory TO Topic,
                relevance_score DOUBLE,
                extracted_at TIMESTAMP
            )""",
            """CREATE REL TABLE IF NOT EXISTS CAUSES (
                FROM Memory TO Memory,
                causal_strength DOUBLE,
                mechanism STRING,
                conditions STRING
            )""",
            """CREATE REL TABLE IF NOT EXISTS CONTAINS (
                FROM Memory TO Memory,
                containment_type STRING,
                specificity_level INT64
            )"""
        ]

        for query in schema_queries:
            try:
                self.conn.execute(query)
            except Exception as e:
                # Ignore "already exists" errors; one failing table must not
                # prevent the remaining tables from being created.
                if "already exists" not in str(e).lower():
                    logger.warning(f"Schema creation warning: {e}")

    async def generate_embedding(self, text: str) -> list[float]:
        """Generate embedding using Ollama"""
        return await self.ollama.generate_embedding(text)

    async def generate_summary(self, content: str) -> str:
        """Generate summary using Ollama"""
        return await self.ollama.generate_summary(content)

    def cosine_similarity(self, a: list[float], b: list[float]) -> float:
        """Calculate cosine similarity between two vectors

        Vectors that cannot be compared (different lengths, or either one
        having zero norm) are treated as dissimilar and score 0.0 instead of
        raising or producing NaN.
        """
        a_np = np.asarray(a, dtype=np.float32)
        b_np = np.asarray(b, dtype=np.float32)
        if a_np.shape != b_np.shape:
            return 0.0
        denominator = float(np.linalg.norm(a_np)) * float(np.linalg.norm(b_np))
        if denominator == 0.0:
            return 0.0
        return float(np.dot(a_np, b_np)) / denominator

    async def store_memory(
        self,
        content: str,
        memory_type: MemoryType = MemoryType.EPISODIC,
        source_type: str = "conversation",
        source_id: str | None = None,
        tags: list[str] | None = None,
        retrieval_cues: list[str] | None = None,
        conversation_id: str | None = None
    ) -> str:
        """Store a new memory in the graph database

        Embeds the content, summarises it when it is longer than
        SUMMARY_THRESHOLD characters, creates the Memory node, optionally
        links it to a Conversation node, then links it to highly similar
        existing memories.

        Returns:
            The generated UUID of the new memory.
        """
        try:
            memory_id = str(uuid.uuid4())

            # Generate embedding
            embedding = await self.generate_embedding(content)

            # Generate summary for longer content
            summary = None
            if len(content) > SUMMARY_THRESHOLD:
                summary = await self.generate_summary(content)

            now = datetime.now()

            # Create memory node
            create_query = """
                CREATE (m:Memory {
                    id: $id,
                    content: $content,
                    summary: $summary,
                    memory_type: $memory_type,
                    confidence_score: $confidence_score,
                    created_at: $created_at,
                    updated_at: $created_at,
                    last_accessed_at: $created_at,
                    access_count: 0,
                    source_type: $source_type,
                    source_id: $source_id,
                    tags: $tags,
                    retrieval_cues: $retrieval_cues,
                    embedding: $embedding
                })
            """
            self.run_query(create_query, {
                'id': memory_id,
                'content': content,
                'summary': summary,
                'memory_type': memory_type.value,
                'confidence_score': 1.0,
                'created_at': now,
                'source_type': source_type,
                'source_id': source_id,
                'tags': tags or [],
                'retrieval_cues': retrieval_cues or [],
                'embedding': embedding
            })

            # Link to conversation if provided
            if conversation_id:
                # Create conversation node if it doesn't exist
                conv_query = """
                    MERGE (c:Conversation {id: $conv_id})
                    ON CREATE SET c.started_at = $now, c.last_message_at = $now, c.participant_count = 1
                    ON MATCH SET c.last_message_at = $now
                """
                self.run_query(conv_query, {'conv_id': conversation_id, 'now': now})

                # Create relationship
                rel_query = """
                    MATCH (m:Memory {id: $memory_id}), (c:Conversation {id: $conv_id})
                    CREATE (m)-[:BELONGS_TO_CONVERSATION {
                        sequence_number: 0,
                        created_at: $now
                    }]->(c)
                """
                self.run_query(rel_query, {
                    'memory_id': memory_id,
                    'conv_id': conversation_id,
                    'now': now
                })

            # Auto-generate relationships
            await self._generate_auto_relationships(memory_id, embedding)

            logger.info(f"Stored memory {memory_id}")
            return memory_id

        except Exception as e:
            logger.error(f"Failed to store memory: {e}")
            raise

    async def _generate_auto_relationships(self, memory_id: str, embedding: list[float]):
        """Generate automatic relationships based on similarity

        Best-effort: failures are logged and swallowed so that a problem here
        never undoes an otherwise successful store.
        """
        try:
            # Get all existing memories with embeddings
            query = """
                MATCH (m:Memory)
                WHERE m.id <> $memory_id AND m.embedding IS NOT NULL
                RETURN m.id, m.embedding, m.content
            """
            rows = self.run_query(query, {'memory_id': memory_id})

            # Keep only memories above the high similarity threshold.
            similar = []
            for row in rows:
                score = self.cosine_similarity(embedding, row['m.embedding'])
                if score > 0.8:
                    similar.append((row['m.id'], score))

            # Create relationships for highly similar memories
            rel_query = """
                MATCH (m1:Memory {id: $memory_id}), (m2:Memory {id: $other_id})
                CREATE (m1)-[:RELATES_TO {
                    relationship_type: 'similar_to',
                    strength: $similarity,
                    context: 'auto-generated',
                    bidirectional: true,
                    created_at: $now,
                    created_by: 'system',
                    confidence: $similarity
                }]->(m2)
            """
            now = datetime.now()
            for other_id, score in similar:
                self.run_query(rel_query, {
                    'memory_id': memory_id,
                    'other_id': other_id,
                    'similarity': score,
                    'now': now
                })

        except Exception as e:
            logger.error(f"Auto-relationship generation failed: {e}")

    async def search_memories_semantic(
        self,
        query: str,
        max_results: int = 20,
        similarity_threshold: float = 0.7,
        include_relationships: bool = True,
        memory_types: list[MemoryType] | None = None
    ) -> list[SearchResult]:
        """Search memories using semantic similarity

        Embeds *query*, scores every stored embedding against it in Python,
        and returns the best *max_results* hits at or above
        *similarity_threshold*, best first.
        """
        try:
            query_embedding = await self.generate_embedding(query)

            # Get all memories with embeddings
            cypher_query = """
                MATCH (m:Memory)
                WHERE m.embedding IS NOT NULL
            """

            # Add memory type filter if specified. Every value is passed
            # through the MemoryType enum first, so only the fixed enum
            # strings can ever reach the query text.
            if memory_types:
                type_literals = ", ".join(f"'{MemoryType(mt).value}'" for mt in memory_types)
                cypher_query += f" AND m.memory_type IN [{type_literals}]"

            cypher_query += " RETURN m"

            rows = self.run_query(cypher_query)

            # Calculate similarities in Python
            candidates = []
            for row in rows:
                memory = row['m']
                if not memory.get('embedding'):
                    continue
                similarity = self.cosine_similarity(query_embedding, memory['embedding'])
                if similarity >= similarity_threshold:
                    candidates.append((memory, similarity))

            # Sort by similarity and limit results; a negative limit would
            # otherwise slice from the wrong end.
            candidates.sort(key=lambda pair: pair[1], reverse=True)
            candidates = candidates[:max(0, max_results)]

            rel_query = """
                MATCH (m:Memory {id: $memory_id})-[r:RELATES_TO]->(related:Memory)
                RETURN related.id, related.content, r.relationship_type, r.strength
                ORDER BY r.strength DESC
                LIMIT 5
            """

            # Build search results
            search_results = []
            for memory, similarity in candidates:
                related_memories = []
                if include_relationships:
                    # Get related memories
                    related_memories = [
                        {
                            'memory_id': rel['related.id'],
                            'content': rel['related.content'],
                            'relationship_type': rel['r.relationship_type'],
                            'strength': rel['r.strength']
                        }
                        for rel in self.run_query(rel_query, {'memory_id': memory['id']})
                    ]

                search_results.append(SearchResult(
                    memory_id=memory['id'],
                    content=memory['content'],
                    similarity_score=similarity,
                    memory_type=memory['memory_type'],
                    confidence_score=memory['confidence_score'],
                    related_memories=related_memories
                ))

            return search_results

        except Exception as e:
            logger.error(f"Semantic search failed: {e}")
            raise

    async def search_memories_by_keywords(
        self,
        query: str,
        max_results: int = 20
    ) -> list[SearchResult]:
        """Search memories using keyword matching"""
        try:
            # Simple keyword search using CONTAINS
            cypher_query = """
                MATCH (m:Memory)
                WHERE toLower(m.content) CONTAINS toLower($query)
                   OR ANY(tag IN m.tags WHERE toLower(tag) CONTAINS toLower($query))
                   OR ANY(cue IN m.retrieval_cues WHERE toLower(cue) CONTAINS toLower($query))
                RETURN m
                ORDER BY m.confidence_score DESC, m.created_at DESC
                LIMIT $limit
            """
            rows = self.run_query(cypher_query, {'query': query, 'limit': max_results})

            return [
                SearchResult(
                    memory_id=row['m']['id'],
                    content=row['m']['content'],
                    similarity_score=1.0,  # Keyword match score
                    memory_type=row['m']['memory_type'],
                    confidence_score=row['m']['confidence_score'],
                    related_memories=[]
                )
                for row in rows
            ]

        except Exception as e:
            logger.error(f"Keyword search failed: {e}")
            raise

    async def find_connected_memories(
        self,
        memory_id: str,
        max_depth: int = 3,
        min_strength: float = 0.3
    ) -> list[dict]:
        """Find memories connected through relationships using graph traversal

        Raises:
            ValueError: if *max_depth* is not an integer of at least 1.
        """
        try:
            # The path-length bound is spliced into the query text rather
            # than bound as a parameter, so it must be forced to a plain
            # positive integer first.
            depth_limit = int(max_depth)
            if depth_limit < 1:
                raise ValueError("max_depth must be at least 1")

            cypher_query = f"""
                MATCH path = (start:Memory {{id: $memory_id}})-[:RELATES_TO*1..{depth_limit}]->(connected:Memory)
                WHERE ALL(rel in relationships(path) WHERE rel.strength >= $min_strength)
                WITH connected, path, length(path) as depth
                RETURN DISTINCT connected.id as memory_id,
                       connected.content as content,
                       depth,
                       reduce(strength = 1.0, rel in relationships(path) | strength * rel.strength) as path_strength,
                       [rel in relationships(path) | rel.relationship_type] as relationship_path
                ORDER BY depth, path_strength DESC
            """
            rows = self.run_query(cypher_query, {
                'memory_id': memory_id,
                'min_strength': min_strength
            })

            wanted_keys = ('memory_id', 'content', 'depth', 'path_strength', 'relationship_path')
            return [{key: row[key] for key in wanted_keys} for row in rows]

        except Exception as e:
            logger.error(f"Graph traversal failed: {e}")
            raise

    async def create_relationship(
        self,
        source_memory_id: str,
        target_memory_id: str,
        relationship_type: str,
        strength: float = 1.0,
        context: str | None = None,
        bidirectional: bool = False
    ) -> str:
        """Create a relationship between two memories

        Returns:
            The database id of the forward relationship, as a string.

        Raises:
            ValueError: if either memory id does not exist, in which case the
                MATCH yields nothing and no relationship is created.
        """
        try:
            # One timestamp for both directions so the pair stays consistent.
            now = datetime.now()
            params = {
                'source_id': source_memory_id,
                'target_id': target_memory_id,
                'rel_type': relationship_type,
                'strength': strength,
                'context': context,
                'now': now
            }

            cypher_query = """
                MATCH (m1:Memory {id: $source_id}), (m2:Memory {id: $target_id})
                CREATE (m1)-[r:RELATES_TO {
                    relationship_type: $rel_type,
                    strength: $strength,
                    context: $context,
                    bidirectional: $bidirectional,
                    created_at: $now,
                    created_by: 'user',
                    confidence: 1.0
                }]->(m2)
                RETURN id(r) as relationship_id
            """
            rows = self.run_query(cypher_query, {**params, 'bidirectional': bidirectional})
            if not rows:
                raise ValueError(
                    f"Source memory {source_memory_id} or target memory {target_memory_id} not found"
                )

            relationship_id = str(rows[0]['relationship_id'])

            # Create reverse relationship if bidirectional
            if bidirectional:
                reverse_query = """
                    MATCH (m1:Memory {id: $target_id}), (m2:Memory {id: $source_id})
                    CREATE (m1)-[:RELATES_TO {
                        relationship_type: $rel_type,
                        strength: $strength,
                        context: $context,
                        bidirectional: true,
                        created_at: $now,
                        created_by: 'user',
                        confidence: 1.0
                    }]->(m2)
                """
                self.run_query(reverse_query, params)

            logger.info(f"Created relationship {relationship_id}")
            return relationship_id

        except Exception as e:
            logger.error(f"Failed to create relationship: {e}")
            raise

    async def get_memory_by_id(self, memory_id: str) -> MemoryNode | None:
        """Retrieve a specific memory by ID

        Reading a memory also records the access: ``last_accessed_at`` is
        refreshed and ``access_count`` incremented in the same statement.

        Returns:
            The memory, or None when no memory has that id.
        """
        try:
            # Update access tracking
            update_query = """
                MATCH (m:Memory {id: $memory_id})
                SET m.last_accessed_at = $now, m.access_count = m.access_count + 1
                RETURN m
            """
            rows = self.run_query(update_query, {
                'memory_id': memory_id,
                'now': datetime.now()
            })

            if not rows:
                return None
            return self._node_from_record(rows[0]['m'])

        except Exception as e:
            logger.error(f"Failed to get memory {memory_id}: {e}")
            raise

    async def delete_memory(self, memory_id: str) -> bool:
        """Delete a memory (hard delete in graph DB)

        Returns:
            True when a memory was deleted, False when the id was unknown.
        """
        try:
            cypher_query = """
                MATCH (m:Memory {id: $memory_id})
                DETACH DELETE m
                RETURN count(m) as deleted_count
            """
            rows = self.run_query(cypher_query, {'memory_id': memory_id})
            deleted = bool(rows) and rows[0]['deleted_count'] > 0

            if deleted:
                logger.info(f"Deleted memory {memory_id}")

            return deleted

        except Exception as e:
            logger.error(f"Failed to delete memory {memory_id}: {e}")
            raise

    async def get_conversation_memories(self, conversation_id: str) -> list[MemoryNode]:
        """Get all memories for a conversation"""
        try:
            cypher_query = """
                MATCH (m:Memory)-[b:BELONGS_TO_CONVERSATION]->(c:Conversation {id: $conv_id})
                RETURN m
                ORDER BY b.sequence_number, b.created_at
            """
            rows = self.run_query(cypher_query, {'conv_id': conversation_id})
            return [self._node_from_record(row['m']) for row in rows]

        except Exception as e:
            logger.error(f"Failed to get conversation memories: {e}")
            raise

    def close_db(self):
        """Close database connection

        Safe to call more than once: the handles are cleared after closing.
        """
        if self.conn:
            self.conn.close()
            self.conn = None
        if self.db:
            self.db.close()
            self.db = None


# FastMCP Server Setup
app = FastMCP("Memory Server")

# Global memory server instance
memory_server: MemoryMCPServer | None = None


# NOTE: the docstrings of the @app.tool() functions below are published to
# MCP clients as the tool descriptions, so they are part of the interface.

@app.tool()
async def store_memory(
    content: str,
    memory_type: str = "episodic",
    tags: list[str] | None = None,
    conversation_id: str | None = None
) -> str:
    """Store a new memory with automatic relationship detection

    Args:
        content: The memory content to store
        memory_type: Type of memory (episodic, semantic, procedural)
        tags: Optional tags for categorization
        conversation_id: Optional conversation ID this memory belongs to
    """
    if memory_server is None:
        return "Error: Memory server not initialized"

    try:
        memory_id = await memory_server.store_memory(
            content=content,
            memory_type=MemoryType(memory_type),
            tags=tags,
            conversation_id=conversation_id
        )
        return f"Memory stored successfully with ID: {memory_id}"
    except Exception as e:
        logger.error(f"Tool store_memory failed: {e}")
        return f"Error storing memory: {str(e)}"


@app.tool()
async def search_memories(
    query: str,
    max_results: int = 10,
    search_type: str = "semantic",
    include_relationships: bool = True
) -> str:
    """Search memories using semantic similarity or keywords

    Args:
        query: Search query
        max_results: Maximum number of results to return
        search_type: Type of search ('semantic' or 'keyword')
        include_relationships: Whether to include related memories
    """
    if memory_server is None:
        return "Error: Memory server not initialized"

    try:
        if search_type == "semantic":
            results = await memory_server.search_memories_semantic(
                query=query,
                max_results=max_results,
                include_relationships=include_relationships
            )
        elif search_type == "keyword":
            results = await memory_server.search_memories_by_keywords(
                query=query,
                max_results=max_results
            )
        else:
            return "Error: search_type must be 'semantic' or 'keyword'"

        if not results:
            return "No memories found matching your query."

        parts = ["## Search Results\n\n"]
        for i, result in enumerate(results, 1):
            parts.append(f"**{i}. Memory {result.memory_id}** (Score: {result.similarity_score:.3f})\n")
            parts.append(f"{result.content}\n")
            if result.related_memories:
                parts.append(f"*Related: {len(result.related_memories)} connections*\n")
            parts.append("\n")

        return "".join(parts)

    except Exception as e:
        logger.error(f"Tool search_memories failed: {e}")
        return f"Error searching memories: {str(e)}"


@app.tool()
async def get_memory(memory_id: str) -> str:
    """Retrieve a specific memory by ID

    Args:
        memory_id: The unique identifier of the memory to retrieve
    """
    if memory_server is None:
        return "Error: Memory server not initialized"

    try:
        memory = await memory_server.get_memory_by_id(memory_id)
        if not memory:
            return "Memory not found"

        parts = [
            f"## Memory {memory.id}\n\n",
            f"**Content:** {memory.content}\n",
            f"**Type:** {memory.memory_type.value}\n",
            f"**Confidence:** {memory.confidence_score}\n",
        ]
        if memory.summary:
            parts.append(f"**Summary:** {memory.summary}\n")
        if memory.tags:
            parts.append(f"**Tags:** {', '.join(memory.tags)}\n")
        parts.append(f"**Created:** {memory.created_at}\n")
        parts.append(f"**Access Count:** {memory.access_count}\n")
        return "".join(parts)

    except Exception as e:
        logger.error(f"Tool get_memory failed: {e}")
        return f"Error retrieving memory: {str(e)}"


@app.tool()
async def find_connected_memories(
    memory_id: str,
    max_depth: int = 3,
    min_strength: float = 0.3
) -> str:
    """Find memories connected through relationships

    Args:
        memory_id: Starting memory ID for traversal
        max_depth: Maximum relationship depth to traverse
        min_strength: Minimum relationship strength threshold
    """
    if memory_server is None:
        return "Error: Memory server not initialized"

    try:
        connections = await memory_server.find_connected_memories(
            memory_id=memory_id,
            max_depth=max_depth,
            min_strength=min_strength
        )

        if not connections:
            return "No connected memories found."

        parts = ["## Connected Memories\n\n"]
        for conn in connections:
            parts.append(
                f"**Memory {conn['memory_id']}** "
                f"(Depth: {conn['depth']}, Strength: {conn['path_strength']:.3f})\n"
            )
            parts.append(f"{conn['content']}\n")
            parts.append(f"*Path: {' → '.join(conn['relationship_path'])}*\n\n")

        return "".join(parts)

    except Exception as e:
        logger.error(f"Tool find_connected_memories failed: {e}")
        return f"Error finding connected memories: {str(e)}"


@app.tool()
async def create_relationship(
    source_memory_id: str,
    target_memory_id: str,
    relationship_type: str,
    strength: float = 1.0,
    context: str | None = None,
    bidirectional: bool = False
) -> str:
    """Create a relationship between two memories

    Args:
        source_memory_id: ID of the source memory
        target_memory_id: ID of the target memory
        relationship_type: Type of relationship (e.g., 'causes', 'enables', 'contradicts')
        strength: Relationship strength (0.0 to 1.0)
        context: Optional context where this relationship applies
        bidirectional: Whether the relationship works both ways
    """
    if memory_server is None:
        return "Error: Memory server not initialized"

    try:
        relationship_id = await memory_server.create_relationship(
            source_memory_id=source_memory_id,
            target_memory_id=target_memory_id,
            relationship_type=relationship_type,
            strength=strength,
            context=context,
            bidirectional=bidirectional
        )
        return f"Relationship created successfully with ID: {relationship_id}"
    except Exception as e:
        logger.error(f"Tool create_relationship failed: {e}")
        return f"Error creating relationship: {str(e)}"


@app.tool()
async def get_conversation_memories(conversation_id: str) -> str:
    """Get all memories for a specific conversation

    Args:
        conversation_id: The conversation ID to retrieve memories for
    """
    if memory_server is None:
        return "Error: Memory server not initialized"

    try:
        memories = await memory_server.get_conversation_memories(conversation_id)

        if not memories:
            return "No memories found for this conversation."

        parts = [f"## Conversation Memories ({len(memories)} total)\n\n"]
        for i, memory in enumerate(memories, 1):
            parts.append(f"**{i}. {memory.memory_type.value.title()} Memory**\n")
            parts.append(f"{memory.content}\n")
            if memory.tags:
                parts.append(f"*Tags: {', '.join(memory.tags)}*\n")
            parts.append(f"*Created: {memory.created_at}*\n\n")

        return "".join(parts)

    except Exception as e:
        logger.error(f"Tool get_conversation_memories failed: {e}")
        return f"Error getting conversation memories: {str(e)}"


@app.tool()
async def delete_memory(memory_id: str) -> str:
    """Delete a memory

    Args:
        memory_id: The ID of the memory to delete
    """
    if memory_server is None:
        return "Error: Memory server not initialized"

    try:
        deleted = await memory_server.delete_memory(memory_id)
        return "Memory deleted successfully" if deleted else "Memory not found"
    except Exception as e:
        logger.error(f"Tool delete_memory failed: {e}")
        return f"Error deleting memory: {str(e)}"


@app.tool()
async def analyze_memory_patterns() -> str:
    """Analyze patterns in the memory graph"""
    if memory_server is None:
        return "Error: Memory server not initialized"

    try:
        # Get memory statistics
        stats_query = """
            MATCH (m:Memory)
            RETURN count(m) as total_memories,
                   avg(m.confidence_score) as avg_confidence,
                   collect(DISTINCT m.memory_type) as memory_types
        """
        stats_rows = memory_server.run_query(stats_query)
        stats = stats_rows[0] if stats_rows else {}

        # Get relationship statistics
        rel_stats_query = """
            MATCH ()-[r:RELATES_TO]->()
            RETURN count(r) as total_relationships,
                   avg(r.strength) as avg_strength,
                   collect(DISTINCT r.relationship_type) as relationship_types
        """
        rel_rows = memory_server.run_query(rel_stats_query)
        rel_stats = rel_rows[0] if rel_rows else {}

        # Find most connected memories
        connected_query = """
            MATCH (m:Memory)-[r:RELATES_TO]-()
            RETURN m.id, m.content, count(r) as connection_count
            ORDER BY connection_count DESC
            LIMIT 5
        """
        connected_rows = memory_server.run_query(connected_query)

        # On an empty graph the averages are None and the collections may be
        # missing; render those as "n/a" / empty instead of failing.
        memory_types = [str(t) for t in (stats.get('memory_types') or [])]
        relationship_types = [str(t) for t in (rel_stats.get('relationship_types') or [])]

        parts = [
            "## Memory Graph Analysis\n\n",
            f"**Embedding Provider:** Ollama ({memory_server.ollama.embedding_model})\n",
            f"**Total Memories:** {stats.get('total_memories', 0)}\n",
            f"**Average Confidence:** {_fmt_avg(stats.get('avg_confidence'))}\n",
            f"**Memory Types:** {', '.join(memory_types)}\n\n",
            f"**Total Relationships:** {rel_stats.get('total_relationships', 0)}\n",
            f"**Average Relationship Strength:** {_fmt_avg(rel_stats.get('avg_strength'))}\n",
            f"**Relationship Types:** {', '.join(relationship_types)}\n\n",
            "**Most Connected Memories:**\n",
        ]
        for row in connected_rows:
            snippet = (row['m.content'] or "")[:100]
            parts.append(f"- {snippet}... ({row['connection_count']} connections)\n")

        return "".join(parts)

    except Exception as e:
        logger.error(f"Tool analyze_memory_patterns failed: {e}")
        return f"Error analyzing memory patterns: {str(e)}"


@app.tool()
async def check_ollama_status() -> str:
    """Check Ollama server status and configuration"""
    if memory_server is None:
        return "Error: Memory server not initialized"

    try:
        # check_connection() does a blocking HTTP request; keep it off the
        # event loop.
        connected, message = await asyncio.to_thread(memory_server.ollama.check_connection)

        parts = [
            "## Ollama Status\n\n",
            f"**Server URL:** {memory_server.ollama.base_url}\n",
            f"**Embedding Model:** {memory_server.ollama.embedding_model}\n",
            f"**Connection Status:** {'✅ Connected' if connected else '❌ Failed'}\n",
            f"**Details:** {message}\n",
        ]

        if connected:
            # Test embedding generation
            try:
                test_embedding = await memory_server.generate_embedding("test")
                parts.append(f"**Embedding Test:** ✅ Success ({len(test_embedding)} dimensions)\n")
            except Exception as e:
                parts.append(f"**Embedding Test:** ❌ Failed: {str(e)}\n")

            # Test summary generation
            try:
                test_summary = await memory_server.generate_summary(
                    "This is a test text for summary generation."
                )
                parts.append("**Summary Test:** ✅ Success\n")
                parts.append(f"**Sample Summary:** {test_summary}\n")
            except Exception as e:
                parts.append(f"**Summary Test:** ⚠️ Failed (using truncation): {str(e)}\n")

        return "".join(parts)

    except Exception as e:
        logger.error(f"Tool check_ollama_status failed: {e}")
        return f"Error checking Ollama status: {str(e)}"


async def main():
    """Main entry point

    Reads configuration from the environment, verifies that Ollama is
    reachable, opens the database and serves MCP requests until the server
    stops. The database is closed on the way out.
    """
    global memory_server

    # Configuration from environment
    kuzu_db_path = os.getenv('KUZU_DB_PATH', './memory_graph_db')
    ollama_base_url = os.getenv('OLLAMA_BASE_URL', 'http://localhost:11434')
    ollama_model = os.getenv('OLLAMA_EMBEDDING_MODEL', 'nomic-embed-text')

    try:
        # Create Ollama provider
        ollama_provider = OllamaProvider(ollama_base_url, ollama_model)

        # Check connection
        connected, message = ollama_provider.check_connection()
        if not connected:
            logger.error(f"Ollama connection failed: {message}")
            # Human-readable hints go to stderr: on the stdio transport,
            # stdout is reserved for protocol messages.
            print(f"❌ Ollama connection failed: {message}", file=sys.stderr)
            print("\n💡 Quick fixes:", file=sys.stderr)
            print("1. Start Ollama server: ollama serve", file=sys.stderr)
            print(f"2. Pull the model: ollama pull {ollama_model}", file=sys.stderr)
            print(f"3. Check URL: {ollama_base_url}", file=sys.stderr)
            return

        logger.info(f"Ollama connected successfully using {ollama_model}")

        # Initialize memory server
        memory_server = MemoryMCPServer(kuzu_db_path, ollama_provider)
        await memory_server.initialize_db()

        # Run the FastMCP server. run() is the synchronous entry point and
        # starts its own event loop; from inside a coroutine the async
        # variant has to be awaited instead.
        logger.info("Starting Ultimate Memory MCP Server with Ollama")
        await app.run_async()

    except Exception as e:
        logger.error(f"Failed to start server: {e}")
        raise
    finally:
        if memory_server:
            memory_server.close_db()


if __name__ == "__main__":
    asyncio.run(main())