mcp-ultimate-memory/memory_mcp_server.py
Ryan Malloy d1bb9cbf56 🧠 Initial commit: Ultimate Memory MCP Server with Multi-Provider Support
🚀 Features:
- FastMCP 2.8.1+ integration with modern Python 3.11+ features
- Kuzu graph database for intelligent memory relationships
- Multi-provider embedding support (OpenAI, Ollama, Sentence Transformers)
- Automatic relationship detection via semantic similarity
- Graph traversal for connected memory discovery
- 9 MCP tools for comprehensive memory operations

🦙 Self-Hosted Focus:
- Ollama provider for complete privacy and control
- Zero external dependencies for sacred trust applications
- Production-ready with comprehensive testing
- Interactive setup script with provider selection

📦 Complete Package:
- memory_mcp_server.py (1,010 lines) - Main FastMCP server
- Comprehensive test suite and examples
- Detailed documentation including Ollama setup guide
- MCP client configuration examples
- Interactive setup script

🎯 Perfect for LLM memory systems requiring:
- Privacy-first architecture
- Intelligent relationship modeling
- Graph-based memory exploration
- Self-hosted deployment capabilities
2025-06-23 22:34:12 -06:00

1126 lines
40 KiB
Python

#!/usr/bin/env python3
"""
Ultimate Memory MCP Server - Ollama-Powered
Self-hosted embeddings with Ollama for complete privacy and control
Requires: fastmcp>=2.8.1, kuzu>=0.4.0, numpy>=1.26.0, requests, python-dotenv
Python 3.11+ required for modern type hints and performance improvements
"""
import asyncio
import json
import logging
import uuid
import requests
from datetime import datetime
from dataclasses import dataclass
from enum import Enum
import numpy as np
from pathlib import Path
import os
import kuzu
from fastmcp import FastMCP
from dotenv import load_dotenv
# Load variables from a .env file (if one is present) into the process
# environment so that the os.getenv() lookups in main() can see them.
load_dotenv()
# Configure the root logger at INFO level and create this module's logger,
# which every class and tool below uses for diagnostics.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MemoryType(Enum):
    """Closed set of memory categories accepted by the server.

    The string value of each member is what gets written to the
    ``memory_type`` property of a ``Memory`` node and what MCP clients pass
    to the ``store_memory`` tool.
    """

    EPISODIC = "episodic"
    SEMANTIC = "semantic"
    PROCEDURAL = "procedural"
@dataclass
class MemoryNode:
    """Plain-Python view of one stored memory.

    ``tags``, ``retrieval_cues`` and ``created_at`` accept ``None`` so callers
    can pass database values straight through; ``__post_init__`` replaces a
    ``None`` with an empty list / the current time.
    """

    id: str
    content: str
    summary: str | None = None
    memory_type: MemoryType = MemoryType.EPISODIC
    confidence_score: float = 1.0
    tags: list[str] | None = None
    retrieval_cues: list[str] | None = None
    embedding: list[float] | None = None
    created_at: datetime | None = None
    access_count: int = 0

    def __post_init__(self):
        """Fill in the defaults that cannot be expressed as shared literals."""
        # A fresh list per instance, so instances never share one default list.
        for list_field in ("tags", "retrieval_cues"):
            if getattr(self, list_field) is None:
                setattr(self, list_field, [])
        if self.created_at is None:
            self.created_at = datetime.now()
@dataclass
class SearchResult:
memory_id: str
content: str
similarity_score: float
memory_type: str
confidence_score: float
related_memories: list[dict] | None = None
class OllamaProvider:
    """Ollama embedding and summary provider.

    Talks to a self-hosted Ollama server over its HTTP API using ``requests``;
    the blocking calls made from coroutines are pushed to a worker thread.

    Args:
        base_url: Root URL of the Ollama server; a trailing slash is stripped.
        embedding_model: Model name sent to ``/api/embeddings``.
        summary_model: Model name sent to ``/api/generate`` for summaries.
        request_timeout: Timeout in seconds for embedding and summary requests.
    """

    # Content longer than this is cut (and suffixed with "...") when no model
    # summary is available.
    SUMMARY_FALLBACK_CHARS = 200

    def __init__(
        self,
        base_url: str = "http://localhost:11434",
        embedding_model: str = "nomic-embed-text",
        summary_model: str = "llama3.2:1b",
        request_timeout: float = 30,
    ):
        self.base_url = base_url.rstrip('/')
        self.embedding_model = embedding_model
        self.summary_model = summary_model
        self.request_timeout = request_timeout
        logger.info(f"Ollama provider initialized: {base_url} using {embedding_model}")

    @classmethod
    def _truncate(cls, content: str) -> str:
        """Return ``content`` cut to the fallback length, marking any cut with '...'."""
        limit = cls.SUMMARY_FALLBACK_CHARS
        return content[:limit] + "..." if len(content) > limit else content

    @staticmethod
    def _model_available(wanted: str, available: list[str]) -> bool:
        """Tell whether ``wanted`` names one of the ``available`` models.

        A name without an explicit tag is treated as ``<name>:latest`` on both
        sides, so a configured ``nomic-embed-text`` matches a listed
        ``nomic-embed-text:latest``.
        """
        def with_tag(name: str) -> str:
            # Only look for the tag after the last '/', so a registry
            # host:port prefix is not mistaken for a tag separator.
            return name if ":" in name.rsplit("/", 1)[-1] else f"{name}:latest"

        target = with_tag(wanted)
        return any(with_tag(name) == target for name in available)

    async def generate_embedding(self, text: str) -> list[float]:
        """Generate an embedding for ``text`` using Ollama.

        Raises:
            RuntimeError: The server answered with a non-200 status or with a
                body that contains no embedding.
            Exception: Any transport error from ``requests`` is logged and
                re-raised unchanged.
        """
        try:
            response = await asyncio.to_thread(
                requests.post,
                f"{self.base_url}/api/embeddings",
                json={
                    "model": self.embedding_model,
                    "prompt": text
                },
                timeout=self.request_timeout
            )
            if response.status_code != 200:
                raise RuntimeError(f"Ollama API error: {response.status_code} - {response.text}")
            embedding = response.json().get("embedding")
            if not embedding:
                # Fail with a clear message instead of a bare KeyError.
                raise RuntimeError(
                    f"Ollama API returned no embedding for model '{self.embedding_model}'"
                )
            return embedding
        except Exception as e:
            logger.error(f"Ollama embedding failed: {e}")
            raise

    async def generate_summary(self, content: str) -> str:
        """Generate a 1-2 sentence summary using Ollama (requires a chat model).

        Never raises: on any failure, a non-200 answer, or an empty model
        response the content is truncated instead.
        """
        try:
            response = await asyncio.to_thread(
                requests.post,
                f"{self.base_url}/api/generate",
                json={
                    "model": self.summary_model,
                    "prompt": f"Summarize this text in 1-2 sentences:\n\n{content}",
                    "stream": False
                },
                timeout=self.request_timeout
            )
            if response.status_code == 200:
                summary = response.json().get("response", "").strip()
                if summary:
                    return summary
            # Non-200 or empty answer: fall back to truncation.
            return self._truncate(content)
        except Exception as e:
            logger.warning(f"Ollama summary failed, using truncation: {e}")
            return self._truncate(content)

    def check_connection(self) -> tuple[bool, str]:
        """Check that the Ollama server is reachable and the embedding model is installed.

        Returns:
            ``(ok, message)``; ``message`` explains the failure when ``ok`` is
            false. This call blocks on network I/O.
        """
        try:
            response = requests.get(f"{self.base_url}/api/tags", timeout=10)
            if response.status_code != 200:
                return False, f"Server error: {response.status_code}"
            data = response.json()
            models = [m['name'] for m in data.get('models', [])]
            if not self._model_available(self.embedding_model, models):
                return False, f"Model '{self.embedding_model}' not found. Available: {models}"
            return True, "Connected successfully"
        except requests.exceptions.ConnectionError:
            return False, f"Cannot connect to Ollama server at {self.base_url}"
        except Exception as e:
            return False, f"Connection check failed: {str(e)}"
class MemoryMCPServer:
    """Graph-backed memory store: Kuzu for persistence, Ollama for embeddings.

    Memories are stored as ``Memory`` nodes that carry their own embedding.
    Similarity search and automatic linking are computed in Python over every
    stored embedding. Call :meth:`initialize_db` before any other method and
    :meth:`close_db` when finished.
    """

    # Cosine similarity above which a newly stored memory is linked to an
    # existing one with a 'similar_to' relationship.
    AUTO_RELATION_THRESHOLD = 0.8

    def __init__(self, kuzu_db_path: str, ollama_provider: OllamaProvider):
        """Record configuration only; the database is opened by initialize_db()."""
        self.db_path = Path(kuzu_db_path)
        self.ollama = ollama_provider
        self.db: kuzu.Database | None = None
        self.conn: kuzu.Connection | None = None

    # ------------------------------------------------------------------
    # Low-level query helpers
    # ------------------------------------------------------------------
    def _execute(self, query: str, params: dict | None = None):
        """Run ``query`` on the open connection and return the raw Kuzu result.

        Raises:
            RuntimeError: The database has not been initialized.
        """
        if self.conn is None:
            raise RuntimeError("Database not initialized; call initialize_db() first")
        if params is None:
            return self.conn.execute(query)
        return self.conn.execute(query, params)

    def _query(self, query: str, params: dict | None = None) -> list[dict]:
        """Run ``query`` and return every row as a ``{column_name: value}`` dict.

        A Kuzu ``QueryResult`` is a cursor, not a list, so rows are pulled
        with ``has_next``/``get_next`` and paired with ``get_column_names``.
        """
        result = self._execute(query, params)
        columns = result.get_column_names()
        rows = []
        while result.has_next():
            rows.append(dict(zip(columns, result.get_next())))
        return rows

    @staticmethod
    def _to_memory_node(memory: dict) -> MemoryNode:
        """Build a MemoryNode from a ``Memory`` node returned by a query.

        The embedding is not copied into the MemoryNode.
        """
        return MemoryNode(
            id=memory['id'],
            content=memory['content'],
            summary=memory['summary'],
            memory_type=MemoryType(memory['memory_type']),
            confidence_score=memory['confidence_score'],
            tags=memory['tags'],
            retrieval_cues=memory['retrieval_cues'],
            created_at=memory['created_at'],
            access_count=memory['access_count']
        )

    # ------------------------------------------------------------------
    # Setup
    # ------------------------------------------------------------------
    async def initialize_db(self):
        """Open (or create) the Kuzu database and make sure the schema exists.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            # Only the parent directory is created here; Kuzu is left to
            # create the database path itself.
            # NOTE(review): newer Kuzu releases appear to store the database
            # as a single file and reject a pre-existing directory - confirm
            # against the pinned Kuzu version.
            self.db_path.parent.mkdir(parents=True, exist_ok=True)
            self.db = kuzu.Database(str(self.db_path))
            self.conn = kuzu.Connection(self.db)
            await self._create_schema()
            logger.info(f"Kuzu database initialized at {self.db_path}")
        except Exception as e:
            logger.error(f"Failed to initialize database: {e}")
            raise

    async def _create_schema(self):
        """Create the node and relationship tables if they do not exist yet.

        A failing statement is logged as a warning and the remaining
        statements still run; "already exists" errors are ignored silently.
        """
        if self.conn is None:
            raise RuntimeError("Database not initialized; call initialize_db() first")
        schema_queries = [
            # Node tables
            """CREATE NODE TABLE IF NOT EXISTS Memory (
                id STRING,
                content STRING,
                summary STRING,
                memory_type STRING,
                confidence_score DOUBLE,
                created_at TIMESTAMP,
                updated_at TIMESTAMP,
                last_accessed_at TIMESTAMP,
                access_count INT64,
                source_type STRING,
                source_id STRING,
                tags STRING[],
                retrieval_cues STRING[],
                embedding DOUBLE[],
                PRIMARY KEY (id)
            )""",
            """CREATE NODE TABLE IF NOT EXISTS Conversation (
                id STRING,
                title STRING,
                started_at TIMESTAMP,
                last_message_at TIMESTAMP,
                participant_count INT64,
                metadata STRING,
                PRIMARY KEY (id)
            )""",
            """CREATE NODE TABLE IF NOT EXISTS Cluster (
                id STRING,
                name STRING,
                description STRING,
                cluster_embedding DOUBLE[],
                created_at TIMESTAMP,
                updated_at TIMESTAMP,
                PRIMARY KEY (id)
            )""",
            """CREATE NODE TABLE IF NOT EXISTS Topic (
                id STRING,
                name STRING,
                description STRING,
                confidence DOUBLE,
                PRIMARY KEY (id)
            )""",
            # Relationship tables
            """CREATE REL TABLE IF NOT EXISTS RELATES_TO (
                FROM Memory TO Memory,
                relationship_type STRING,
                strength DOUBLE,
                context STRING,
                bidirectional BOOLEAN,
                created_at TIMESTAMP,
                created_by STRING,
                confidence DOUBLE
            )""",
            """CREATE REL TABLE IF NOT EXISTS BELONGS_TO_CONVERSATION (
                FROM Memory TO Conversation,
                sequence_number INT64,
                created_at TIMESTAMP
            )""",
            """CREATE REL TABLE IF NOT EXISTS IN_CLUSTER (
                FROM Memory TO Cluster,
                membership_strength DOUBLE,
                added_at TIMESTAMP
            )""",
            """CREATE REL TABLE IF NOT EXISTS ABOUT_TOPIC (
                FROM Memory TO Topic,
                relevance_score DOUBLE,
                extracted_at TIMESTAMP
            )""",
            """CREATE REL TABLE IF NOT EXISTS CAUSES (
                FROM Memory TO Memory,
                causal_strength DOUBLE,
                mechanism STRING,
                conditions STRING
            )""",
            """CREATE REL TABLE IF NOT EXISTS CONTAINS (
                FROM Memory TO Memory,
                containment_type STRING,
                specificity_level INT64
            )"""
        ]
        for query in schema_queries:
            try:
                self.conn.execute(query)
            except Exception as e:
                # Ignore "already exists" errors
                if "already exists" not in str(e).lower():
                    logger.warning(f"Schema creation warning: {e}")

    # ------------------------------------------------------------------
    # Embedding helpers
    # ------------------------------------------------------------------
    async def generate_embedding(self, text: str) -> list[float]:
        """Generate embedding using Ollama."""
        return await self.ollama.generate_embedding(text)

    async def generate_summary(self, content: str) -> str:
        """Generate summary using Ollama."""
        return await self.ollama.generate_summary(content)

    def cosine_similarity(self, a: list[float], b: list[float]) -> float:
        """Calculate cosine similarity between two vectors.

        Returns 0.0 when either vector has zero length, so callers never get
        NaN. Vectors of different dimensions make NumPy raise ``ValueError``.
        """
        a_np = np.array(a, dtype=np.float32)
        b_np = np.array(b, dtype=np.float32)
        denominator = np.linalg.norm(a_np) * np.linalg.norm(b_np)
        if denominator == 0:
            return 0.0
        return float(np.dot(a_np, b_np) / denominator)

    # ------------------------------------------------------------------
    # Writes
    # ------------------------------------------------------------------
    async def store_memory(
        self,
        content: str,
        memory_type: MemoryType = MemoryType.EPISODIC,
        source_type: str = "conversation",
        source_id: str | None = None,
        tags: list[str] | None = None,
        retrieval_cues: list[str] | None = None,
        conversation_id: str | None = None
    ) -> str:
        """Store a new memory in the graph database.

        The content is embedded, summarized when longer than 200 characters,
        written as a ``Memory`` node, optionally attached to a conversation,
        and finally linked to highly similar existing memories.

        Returns:
            The generated UUID of the new memory.

        Raises:
            Exception: Any failure (embedding or database) is logged and
                re-raised; a failure of the automatic linking step is not.
        """
        try:
            memory_id = str(uuid.uuid4())
            embedding = await self.generate_embedding(content)
            summary = None
            if len(content) > 200:
                summary = await self.generate_summary(content)
            now = datetime.now()
            create_query = """
                CREATE (m:Memory {
                    id: $id,
                    content: $content,
                    summary: $summary,
                    memory_type: $memory_type,
                    confidence_score: $confidence_score,
                    created_at: $created_at,
                    updated_at: $created_at,
                    last_accessed_at: $created_at,
                    access_count: 0,
                    source_type: $source_type,
                    source_id: $source_id,
                    tags: $tags,
                    retrieval_cues: $retrieval_cues,
                    embedding: $embedding
                })
            """
            self._execute(create_query, {
                'id': memory_id,
                'content': content,
                'summary': summary,
                'memory_type': memory_type.value,
                'confidence_score': 1.0,
                'created_at': now,
                'source_type': source_type,
                'source_id': source_id,
                'tags': tags or [],
                'retrieval_cues': retrieval_cues or [],
                'embedding': embedding
            })
            if conversation_id:
                self._link_to_conversation(memory_id, conversation_id, now)
            await self._generate_auto_relationships(memory_id, embedding)
            logger.info(f"Stored memory {memory_id}")
            return memory_id
        except Exception as e:
            logger.error(f"Failed to store memory: {e}")
            raise

    def _link_to_conversation(self, memory_id: str, conversation_id: str, now: datetime) -> None:
        """Create the conversation node if needed and attach the memory to it."""
        conv_query = """
            MERGE (c:Conversation {id: $conv_id})
            ON CREATE SET c.started_at = $now, c.last_message_at = $now, c.participant_count = 1
            ON MATCH SET c.last_message_at = $now
        """
        self._execute(conv_query, {'conv_id': conversation_id, 'now': now})
        rel_query = """
            MATCH (m:Memory {id: $memory_id}), (c:Conversation {id: $conv_id})
            CREATE (m)-[:BELONGS_TO_CONVERSATION {
                sequence_number: 0,
                created_at: $now
            }]->(c)
        """
        self._execute(rel_query, {
            'memory_id': memory_id,
            'conv_id': conversation_id,
            'now': now
        })

    async def _generate_auto_relationships(self, memory_id: str, embedding: list[float]):
        """Link ``memory_id`` to every stored memory that is highly similar to it.

        Best effort: any failure is logged and swallowed so that storing the
        memory itself still succeeds.
        """
        try:
            rows = self._query(
                """
                MATCH (m:Memory)
                WHERE m.id <> $memory_id AND m.embedding IS NOT NULL
                RETURN m.id, m.embedding
                """,
                {'memory_id': memory_id}
            )
            rel_query = """
                MATCH (m1:Memory {id: $memory_id}), (m2:Memory {id: $other_id})
                CREATE (m1)-[:RELATES_TO {
                    relationship_type: 'similar_to',
                    strength: $similarity,
                    context: 'auto-generated',
                    bidirectional: true,
                    created_at: $now,
                    created_by: 'system',
                    confidence: $similarity
                }]->(m2)
            """
            now = datetime.now()
            for row in rows:
                similarity = self.cosine_similarity(embedding, row['m.embedding'])
                if similarity <= self.AUTO_RELATION_THRESHOLD:
                    continue
                self._execute(rel_query, {
                    'memory_id': memory_id,
                    'other_id': row['m.id'],
                    'similarity': similarity,
                    'now': now
                })
        except Exception as e:
            logger.error(f"Auto-relationship generation failed: {e}")

    # ------------------------------------------------------------------
    # Search
    # ------------------------------------------------------------------
    async def search_memories_semantic(
        self,
        query: str,
        max_results: int = 20,
        similarity_threshold: float = 0.7,
        include_relationships: bool = True,
        memory_types: list[MemoryType] | None = None
    ) -> list[SearchResult]:
        """Search memories using semantic similarity.

        Every stored embedding is compared with the query embedding in
        Python; hits at or above ``similarity_threshold`` are returned best
        first, at most ``max_results`` of them. With
        ``include_relationships`` each hit also carries up to five outgoing
        ``RELATES_TO`` neighbours, strongest first.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            query_embedding = await self.generate_embedding(query)
            cypher_query = """
                MATCH (m:Memory)
                WHERE m.embedding IS NOT NULL
            """
            if memory_types:
                # The values come from the closed MemoryType enum, so
                # interpolating them into the query text is safe.
                type_list = [mt.value for mt in memory_types]
                cypher_query += f" AND m.memory_type IN {type_list}"
            cypher_query += " RETURN m"

            candidates = []
            for row in self._query(cypher_query):
                memory = row['m']
                if not memory['embedding']:
                    continue
                similarity = self.cosine_similarity(query_embedding, memory['embedding'])
                if similarity >= similarity_threshold:
                    candidates.append((memory, similarity))
            candidates.sort(key=lambda pair: pair[1], reverse=True)

            rel_query = """
                MATCH (m:Memory {id: $memory_id})-[r:RELATES_TO]->(related:Memory)
                RETURN related.id, related.content, r.relationship_type, r.strength
                ORDER BY r.strength DESC
                LIMIT 5
            """
            search_results = []
            for memory, similarity in candidates[:max_results]:
                related_memories = []
                if include_relationships:
                    related_memories = [
                        {
                            'memory_id': rel['related.id'],
                            'content': rel['related.content'],
                            'relationship_type': rel['r.relationship_type'],
                            'strength': rel['r.strength']
                        }
                        for rel in self._query(rel_query, {'memory_id': memory['id']})
                    ]
                search_results.append(SearchResult(
                    memory_id=memory['id'],
                    content=memory['content'],
                    similarity_score=similarity,
                    memory_type=memory['memory_type'],
                    confidence_score=memory['confidence_score'],
                    related_memories=related_memories
                ))
            return search_results
        except Exception as e:
            logger.error(f"Semantic search failed: {e}")
            raise

    async def search_memories_by_keywords(
        self,
        query: str,
        max_results: int = 20
    ) -> list[SearchResult]:
        """Search memories whose content, tags or retrieval cues contain ``query``.

        Matching is case-insensitive substring matching done by the database.
        Every hit gets a ``similarity_score`` of 1.0 and no related memories.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            cypher_query = """
                MATCH (m:Memory)
                WHERE toLower(m.content) CONTAINS toLower($query)
                OR ANY(tag IN m.tags WHERE toLower(tag) CONTAINS toLower($query))
                OR ANY(cue IN m.retrieval_cues WHERE toLower(cue) CONTAINS toLower($query))
                RETURN m
                ORDER BY m.confidence_score DESC, m.created_at DESC
                LIMIT $limit
            """
            rows = self._query(cypher_query, {'query': query, 'limit': max_results})
            return [
                SearchResult(
                    memory_id=row['m']['id'],
                    content=row['m']['content'],
                    similarity_score=1.0,  # Keyword match score
                    memory_type=row['m']['memory_type'],
                    confidence_score=row['m']['confidence_score'],
                    related_memories=[]
                )
                for row in rows
            ]
        except Exception as e:
            logger.error(f"Keyword search failed: {e}")
            raise

    async def find_connected_memories(
        self,
        memory_id: str,
        max_depth: int = 3,
        min_strength: float = 0.3
    ) -> list[dict]:
        """Find memories reachable from ``memory_id`` over ``RELATES_TO`` edges.

        Only paths of 1..``max_depth`` hops whose every edge has at least
        ``min_strength`` are followed. Each returned dict has ``memory_id``,
        ``content``, ``depth``, ``path_strength`` (product of edge strengths)
        and ``relationship_path`` (edge types along the path).

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            # The hop bound has to be spliced into the query text, so force
            # it to a positive integer before interpolating it.
            depth_limit = max(1, int(max_depth))
            cypher_query = f"""
                MATCH path = (start:Memory {{id: $memory_id}})-[:RELATES_TO*1..{depth_limit}]->(connected:Memory)
                WHERE ALL(rel in relationships(path) WHERE rel.strength >= $min_strength)
                WITH connected, path, length(path) as depth
                RETURN DISTINCT connected.id as memory_id,
                connected.content as content,
                depth,
                reduce(strength = 1.0, rel in relationships(path) | strength * rel.strength) as path_strength,
                [rel in relationships(path) | rel.relationship_type] as relationship_path
                ORDER BY depth, path_strength DESC
            """
            rows = self._query(cypher_query, {
                'memory_id': memory_id,
                'min_strength': min_strength
            })
            wanted = ('memory_id', 'content', 'depth', 'path_strength', 'relationship_path')
            return [{key: row[key] for key in wanted} for row in rows]
        except Exception as e:
            logger.error(f"Graph traversal failed: {e}")
            raise

    async def create_relationship(
        self,
        source_memory_id: str,
        target_memory_id: str,
        relationship_type: str,
        strength: float = 1.0,
        context: str | None = None,
        bidirectional: bool = False
    ) -> str:
        """Create a ``RELATES_TO`` edge from source to target.

        With ``bidirectional`` a mirrored edge from target to source is
        created as well.

        Returns:
            The database's internal id of the forward edge, as a string.

        Raises:
            ValueError: Either memory id does not exist, so no edge was created.
            Exception: Any other failure is logged and re-raised.
        """
        try:
            now = datetime.now()
            shared_params = {
                'source_id': source_memory_id,
                'target_id': target_memory_id,
                'rel_type': relationship_type,
                'strength': strength,
                'context': context,
                'now': now
            }
            cypher_query = """
                MATCH (m1:Memory {id: $source_id}), (m2:Memory {id: $target_id})
                CREATE (m1)-[r:RELATES_TO {
                    relationship_type: $rel_type,
                    strength: $strength,
                    context: $context,
                    bidirectional: $bidirectional,
                    created_at: $now,
                    created_by: 'user',
                    confidence: 1.0
                }]->(m2)
                RETURN id(r) as relationship_id
            """
            rows = self._query(cypher_query, {**shared_params, 'bidirectional': bidirectional})
            if not rows:
                # The MATCH found nothing, so the CREATE never ran.
                raise ValueError(
                    f"Memory {source_memory_id} or {target_memory_id} not found"
                )
            relationship_id = str(rows[0]['relationship_id'])
            if bidirectional:
                reverse_query = """
                    MATCH (m1:Memory {id: $target_id}), (m2:Memory {id: $source_id})
                    CREATE (m1)-[:RELATES_TO {
                        relationship_type: $rel_type,
                        strength: $strength,
                        context: $context,
                        bidirectional: true,
                        created_at: $now,
                        created_by: 'user',
                        confidence: 1.0
                    }]->(m2)
                """
                # This query has no $bidirectional placeholder, so only the
                # shared parameters are passed.
                self._execute(reverse_query, shared_params)
            logger.info(f"Created relationship {relationship_id}")
            return relationship_id
        except Exception as e:
            logger.error(f"Failed to create relationship: {e}")
            raise

    # ------------------------------------------------------------------
    # Reads / deletes
    # ------------------------------------------------------------------
    async def get_memory_by_id(self, memory_id: str) -> MemoryNode | None:
        """Retrieve a memory by id, bumping its access count and timestamp.

        Returns:
            The memory, or ``None`` when no memory has that id.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            update_query = """
                MATCH (m:Memory {id: $memory_id})
                SET m.last_accessed_at = $now, m.access_count = m.access_count + 1
                RETURN m
            """
            rows = self._query(update_query, {
                'memory_id': memory_id,
                'now': datetime.now()
            })
            if not rows:
                return None
            return self._to_memory_node(rows[0]['m'])
        except Exception as e:
            logger.error(f"Failed to get memory {memory_id}: {e}")
            raise

    async def delete_memory(self, memory_id: str) -> bool:
        """Delete a memory and all of its relationships (hard delete).

        Returns:
            ``True`` if the memory existed and was deleted, ``False`` otherwise.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            params = {'memory_id': memory_id}
            # Check existence first so the return value does not depend on
            # aggregating over a node that has just been deleted.
            rows = self._query(
                "MATCH (m:Memory {id: $memory_id}) RETURN count(m) as found_count",
                params
            )
            if not rows or not rows[0]['found_count']:
                return False
            self._execute("MATCH (m:Memory {id: $memory_id}) DETACH DELETE m", params)
            logger.info(f"Deleted memory {memory_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to delete memory {memory_id}: {e}")
            raise

    async def get_conversation_memories(self, conversation_id: str) -> list[MemoryNode]:
        """Get all memories attached to a conversation, in stored order.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            cypher_query = """
                MATCH (m:Memory)-[b:BELONGS_TO_CONVERSATION]->(c:Conversation {id: $conv_id})
                RETURN m
                ORDER BY b.sequence_number, b.created_at
            """
            rows = self._query(cypher_query, {'conv_id': conversation_id})
            return [self._to_memory_node(row['m']) for row in rows]
        except Exception as e:
            logger.error(f"Failed to get conversation memories: {e}")
            raise

    def close_db(self):
        """Close the connection and the database; safe to call more than once."""
        if self.conn:
            self.conn.close()
            self.conn = None
        if self.db:
            self.db.close()
            self.db = None
# FastMCP Server Setup
# The application object on which the @app.tool() decorators below register
# their tools, and which main() runs.
app = FastMCP("Memory Server")
# Global memory server instance
# Stays None until main() assigns it; every tool checks for None first and
# returns an error string while the server is not initialized.
memory_server: MemoryMCPServer | None = None
@app.tool()
async def store_memory(
    content: str,
    memory_type: str = "episodic",
    tags: list[str] | None = None,
    conversation_id: str | None = None
) -> str:
    """Store a new memory with automatic relationship detection

    Args:
        content: The memory content to store
        memory_type: Type of memory (episodic, semantic, procedural)
        tags: Optional tags for categorization
        conversation_id: Optional conversation ID this memory belongs to
    """
    if memory_server is None:
        return "Error: Memory server not initialized"
    try:
        # An unknown memory_type makes MemoryType(...) raise ValueError, which
        # is reported to the client through the handler below.
        new_id = await memory_server.store_memory(
            content=content,
            memory_type=MemoryType(memory_type),
            tags=tags,
            conversation_id=conversation_id,
        )
    except Exception as exc:
        logger.error(f"Tool store_memory failed: {exc}")
        return f"Error storing memory: {str(exc)}"
    else:
        return f"Memory stored successfully with ID: {new_id}"
@app.tool()
async def search_memories(
query: str,
max_results: int = 10,
search_type: str = "semantic",
include_relationships: bool = True
) -> str:
"""Search memories using semantic similarity or keywords
Args:
query: Search query
max_results: Maximum number of results to return
search_type: Type of search ('semantic' or 'keyword')
include_relationships: Whether to include related memories
"""
if not memory_server:
return "Error: Memory server not initialized"
try:
if search_type == "semantic":
results = await memory_server.search_memories_semantic(
query=query,
max_results=max_results,
include_relationships=include_relationships
)
elif search_type == "keyword":
results = await memory_server.search_memories_by_keywords(
query=query,
max_results=max_results
)
else:
return "Error: search_type must be 'semantic' or 'keyword'"
if not results:
return "No memories found matching your query."
response = "## Search Results\n\n"
for i, result in enumerate(results, 1):
response += f"**{i}. Memory {result.memory_id}** (Score: {result.similarity_score:.3f})\n"
response += f"{result.content}\n"
if result.related_memories:
response += f"*Related: {len(result.related_memories)} connections*\n"
response += "\n"
return response
except Exception as e:
logger.error(f"Tool search_memories failed: {e}")
return f"Error searching memories: {str(e)}"
@app.tool()
async def get_memory(memory_id: str) -> str:
    """Retrieve a specific memory by ID

    Args:
        memory_id: The unique identifier of the memory to retrieve
    """
    if memory_server is None:
        return "Error: Memory server not initialized"
    try:
        found = await memory_server.get_memory_by_id(memory_id)
        if not found:
            return "Memory not found"
        # Summary and tags lines are emitted only when they have content.
        fragments = [
            f"## Memory {found.id}\n\n",
            f"**Content:** {found.content}\n",
            f"**Type:** {found.memory_type.value}\n",
            f"**Confidence:** {found.confidence_score}\n",
        ]
        if found.summary:
            fragments.append(f"**Summary:** {found.summary}\n")
        if found.tags:
            fragments.append(f"**Tags:** {', '.join(found.tags)}\n")
        fragments.append(f"**Created:** {found.created_at}\n")
        fragments.append(f"**Access Count:** {found.access_count}\n")
        return "".join(fragments)
    except Exception as exc:
        logger.error(f"Tool get_memory failed: {exc}")
        return f"Error retrieving memory: {str(exc)}"
@app.tool()
async def find_connected_memories(
    memory_id: str,
    max_depth: int = 3,
    min_strength: float = 0.3
) -> str:
    """Find memories connected through relationships

    Args:
        memory_id: Starting memory ID for traversal
        max_depth: Maximum relationship depth to traverse
        min_strength: Minimum relationship strength threshold
    """
    if not memory_server:
        return "Error: Memory server not initialized"
    try:
        connections = await memory_server.find_connected_memories(
            memory_id=memory_id,
            max_depth=max_depth,
            min_strength=min_strength
        )
        if not connections:
            return "No connected memories found."
        parts = ["## Connected Memories\n\n"]
        for conn in connections:
            # Separate the hops with an arrow; joining on the empty string ran
            # the relationship types together into one unreadable word.
            path = " -> ".join(str(step) for step in (conn['relationship_path'] or []))
            parts.append(
                f"**Memory {conn['memory_id']}** "
                f"(Depth: {conn['depth']}, Strength: {conn['path_strength']:.3f})\n"
            )
            parts.append(f"{conn['content']}\n")
            parts.append(f"*Path: {path}*\n\n")
        return "".join(parts)
    except Exception as e:
        logger.error(f"Tool find_connected_memories failed: {e}")
        return f"Error finding connected memories: {str(e)}"
@app.tool()
async def create_relationship(
    source_memory_id: str,
    target_memory_id: str,
    relationship_type: str,
    strength: float = 1.0,
    context: str | None = None,
    bidirectional: bool = False
) -> str:
    """Create a relationship between two memories

    Args:
        source_memory_id: ID of the source memory
        target_memory_id: ID of the target memory
        relationship_type: Type of relationship (e.g., 'causes', 'enables', 'contradicts')
        strength: Relationship strength (0.0 to 1.0)
        context: Optional context where this relationship applies
        bidirectional: Whether the relationship works both ways
    """
    if memory_server is None:
        return "Error: Memory server not initialized"
    # Arguments are forwarded unchanged to the server method.
    edge_args = {
        "source_memory_id": source_memory_id,
        "target_memory_id": target_memory_id,
        "relationship_type": relationship_type,
        "strength": strength,
        "context": context,
        "bidirectional": bidirectional,
    }
    try:
        new_edge_id = await memory_server.create_relationship(**edge_args)
    except Exception as exc:
        logger.error(f"Tool create_relationship failed: {exc}")
        return f"Error creating relationship: {str(exc)}"
    return f"Relationship created successfully with ID: {new_edge_id}"
@app.tool()
async def get_conversation_memories(conversation_id: str) -> str:
    """Get all memories for a specific conversation

    Args:
        conversation_id: The conversation ID to retrieve memories for
    """
    if memory_server is None:
        return "Error: Memory server not initialized"
    try:
        found = await memory_server.get_conversation_memories(conversation_id)
        if not found:
            return "No memories found for this conversation."
        # One markdown entry per memory, numbered from 1 in returned order.
        fragments = [f"## Conversation Memories ({len(found)} total)\n\n"]
        for position, item in enumerate(found, 1):
            fragments.append(f"**{position}. {item.memory_type.value.title()} Memory**\n")
            fragments.append(f"{item.content}\n")
            if item.tags:
                fragments.append(f"*Tags: {', '.join(item.tags)}*\n")
            fragments.append(f"*Created: {item.created_at}*\n\n")
        return "".join(fragments)
    except Exception as exc:
        logger.error(f"Tool get_conversation_memories failed: {exc}")
        return f"Error getting conversation memories: {str(exc)}"
@app.tool()
async def delete_memory(memory_id: str) -> str:
    """Delete a memory

    Args:
        memory_id: The ID of the memory to delete
    """
    if memory_server is None:
        return "Error: Memory server not initialized"
    try:
        was_deleted = await memory_server.delete_memory(memory_id)
    except Exception as exc:
        logger.error(f"Tool delete_memory failed: {exc}")
        return f"Error deleting memory: {str(exc)}"
    # The server reports False when no memory had that id.
    return "Memory deleted successfully" if was_deleted else "Memory not found"
@app.tool()
async def analyze_memory_patterns() -> str:
    """Analyze patterns in the memory graph"""
    if not memory_server:
        return "Error: Memory server not initialized"

    def fetch_rows(query: str) -> list[dict]:
        # A Kuzu QueryResult is a cursor, not a list: pull the rows out and
        # key each value by its column name.
        result = memory_server.conn.execute(query)
        columns = result.get_column_names()
        rows = []
        while result.has_next():
            rows.append(dict(zip(columns, result.get_next())))
        return rows

    def fmt_avg(value) -> str:
        # avg() over zero rows yields NULL, which cannot be formatted as a float.
        return "n/a" if value is None else f"{value:.3f}"

    try:
        # Get memory statistics
        stats_rows = fetch_rows("""
            MATCH (m:Memory)
            RETURN
            count(m) as total_memories,
            avg(m.confidence_score) as avg_confidence,
            collect(DISTINCT m.memory_type) as memory_types
        """)
        stats = stats_rows[0] if stats_rows else {}
        # Get relationship statistics
        rel_rows = fetch_rows("""
            MATCH ()-[r:RELATES_TO]->()
            RETURN
            count(r) as total_relationships,
            avg(r.strength) as avg_strength,
            collect(DISTINCT r.relationship_type) as relationship_types
        """)
        rel_stats = rel_rows[0] if rel_rows else {}
        # Find most connected memories
        connected_rows = fetch_rows("""
            MATCH (m:Memory)-[r:RELATES_TO]-()
            RETURN m.id, m.content, count(r) as connection_count
            ORDER BY connection_count DESC
            LIMIT 5
        """)
        parts = [
            "## Memory Graph Analysis\n\n",
            f"**Embedding Provider:** Ollama ({memory_server.ollama.embedding_model})\n",
            f"**Total Memories:** {stats.get('total_memories', 0)}\n",
            f"**Average Confidence:** {fmt_avg(stats.get('avg_confidence'))}\n",
            f"**Memory Types:** {', '.join(stats.get('memory_types') or [])}\n\n",
            f"**Total Relationships:** {rel_stats.get('total_relationships', 0)}\n",
            f"**Average Relationship Strength:** {fmt_avg(rel_stats.get('avg_strength'))}\n",
            f"**Relationship Types:** {', '.join(rel_stats.get('relationship_types') or [])}\n\n",
            "**Most Connected Memories:**\n",
        ]
        for record in connected_rows:
            parts.append(
                f"- {record['m.content'][:100]}... ({record['connection_count']} connections)\n"
            )
        return "".join(parts)
    except Exception as e:
        logger.error(f"Tool analyze_memory_patterns failed: {e}")
        return f"Error analyzing memory patterns: {str(e)}"
@app.tool()
async def check_ollama_status() -> str:
    """Check Ollama server status and configuration"""
    if not memory_server:
        return "Error: Memory server not initialized"
    try:
        # check_connection() does a blocking HTTP request; run it in a worker
        # thread so the event loop keeps serving other requests meanwhile.
        connected, message = await asyncio.to_thread(memory_server.ollama.check_connection)
        parts = [
            "## Ollama Status\n\n",
            f"**Server URL:** {memory_server.ollama.base_url}\n",
            f"**Embedding Model:** {memory_server.ollama.embedding_model}\n",
            f"**Connection Status:** {'✅ Connected' if connected else '❌ Failed'}\n",
            f"**Details:** {message}\n",
        ]
        if connected:
            # Test embedding generation
            try:
                test_embedding = await memory_server.generate_embedding("test")
                parts.append(f"**Embedding Test:** ✅ Success ({len(test_embedding)} dimensions)\n")
            except Exception as e:
                parts.append(f"**Embedding Test:** ❌ Failed: {str(e)}\n")
            # Test summary generation
            try:
                test_summary = await memory_server.generate_summary(
                    "This is a test text for summary generation."
                )
                parts.append("**Summary Test:** ✅ Success\n")
                parts.append(f"**Sample Summary:** {test_summary}\n")
            except Exception as e:
                parts.append(f"**Summary Test:** ⚠️ Failed (using truncation): {str(e)}\n")
        return "".join(parts)
    except Exception as e:
        logger.error(f"Tool check_ollama_status failed: {e}")
        return f"Error checking Ollama status: {str(e)}"
async def main():
    """Main entry point.

    Reads configuration from the environment (``KUZU_DB_PATH``,
    ``OLLAMA_BASE_URL``, ``OLLAMA_EMBEDDING_MODEL``), verifies that Ollama is
    reachable, opens the database and then serves MCP requests until the
    server stops. The database is closed on the way out.

    Raises:
        Exception: Any startup or runtime failure is logged and re-raised.
    """
    import sys  # local import: only needed for the stderr diagnostics below

    global memory_server
    # Configuration from environment
    kuzu_db_path = os.getenv('KUZU_DB_PATH', './memory_graph_db')
    ollama_base_url = os.getenv('OLLAMA_BASE_URL', 'http://localhost:11434')
    ollama_model = os.getenv('OLLAMA_EMBEDDING_MODEL', 'nomic-embed-text')
    try:
        ollama_provider = OllamaProvider(ollama_base_url, ollama_model)
        connected, message = ollama_provider.check_connection()
        if not connected:
            logger.error(f"Ollama connection failed: {message}")
            # Hints go to stderr: with the stdio transport, stdout is reserved
            # for protocol messages.
            print(f"❌ Ollama connection failed: {message}", file=sys.stderr)
            print("\n💡 Quick fixes:", file=sys.stderr)
            print("1. Start Ollama server: ollama serve", file=sys.stderr)
            print(f"2. Pull the model: ollama pull {ollama_model}", file=sys.stderr)
            print(f"3. Check URL: {ollama_base_url}", file=sys.stderr)
            return
        logger.info(f"Ollama connected successfully using {ollama_model}")
        # Initialize memory server
        memory_server = MemoryMCPServer(kuzu_db_path, ollama_provider)
        await memory_server.initialize_db()
        # Run the FastMCP server. run() is the synchronous entry point and
        # starts its own event loop; inside a coroutine the awaitable
        # run_async() has to be used instead.
        logger.info("Starting Ultimate Memory MCP Server with Ollama")
        await app.run_async()
    except Exception as e:
        logger.error(f"Failed to start server: {e}")
        raise
    finally:
        if memory_server:
            memory_server.close_db()
# Script entry point: run the async main() on a fresh event loop when the
# module is executed directly (not when it is imported).
if __name__ == "__main__":
    asyncio.run(main())