import requests
import json
import math
from typing import Dict, Any, Optional
def update_raptor_rag_node(
    flow_name: str,
    node_id: str,
    config: Dict[str, Any],
    api_token: str,
    timeout: float = 30.0,
) -> Dict[str, Any]:
    """Update a RAPTOR RAG node's configuration via the flow API.

    Sends a PATCH request with ``{"config": config}`` to the node endpoint
    of the given flow.

    Args:
        flow_name: Flow subdomain used to build the API host
            (``https://<flow_name>.flows.graphorlm.com``).
        node_id: Identifier of the RAPTOR RAG node to update.
        config: Configuration payload, e.g. ``{"topK": 30, "max_level": 4}``.
        api_token: Bearer token for the Authorization header.
        timeout: Request timeout in seconds (new, backward-compatible
            parameter — without it a stalled server hangs the caller
            indefinitely, as requests has no default timeout).

    Returns:
        Parsed JSON response body.

    Raises:
        requests.exceptions.HTTPError: On a non-2xx response.
        requests.exceptions.Timeout: If the request exceeds ``timeout``.
    """
    url = f"https://{flow_name}.flows.graphorlm.com/raptor-rag/{node_id}"
    headers = {
        "Authorization": f"Bearer {api_token}",
        "Content-Type": "application/json",
    }
    payload = {"config": config}
    response = requests.patch(url, headers=headers, json=payload, timeout=timeout)
    response.raise_for_status()
    return response.json()
class RaptorRagHierarchicalOptimizer:
    """Advanced RAPTOR RAG configuration optimizer with tree analysis capabilities.

    Derives tuned ``topK`` / ``max_level`` settings from collection size,
    document complexity, and a retrieval priority; estimates the resulting
    tree's structural complexity, memory, and timing characteristics; and can
    push the configuration to the flow API via :func:`update_raptor_rag_node`.
    """

    def __init__(self, flow_name: str, api_token: str):
        """Store flow identity and credentials for later API updates.

        Args:
            flow_name: Flow subdomain (e.g. ``my-rag-pipeline``).
            api_token: Bearer token used when updating node configuration.
        """
        self.flow_name = flow_name
        self.api_token = api_token
        self.base_url = f"https://{flow_name}.flows.graphorlm.com"

    def calculate_optimal_configuration(
        self,
        document_count: int,
        document_complexity: str = "medium",
        priority: str = "balanced"
    ) -> Dict[str, Any]:
        """Calculate optimal RAPTOR tree configuration based on document characteristics.

        Args:
            document_count: Number of documents in the collection.
            document_complexity: ``low`` / ``medium`` / ``high`` /
                ``very_high``; unknown values fall back to ``medium``.
            priority: ``precision`` / ``balanced`` / ``coverage`` /
                ``exploration``; unknown values fall back to ``balanced``.

        Returns:
            Dict with ``topK`` (int in [5, 100], or None for unlimited),
            ``max_level`` (int clamped to [2, 8]), and an
            ``optimization_context`` recording the inputs plus tree-size
            estimates.
        """
        # Base configurations per retrieval priority; "exploration" uses an
        # unlimited Top K (None).
        priority_configs = {
            "precision": {"base_topk": 15, "base_max_level": 3},
            "balanced": {"base_topk": 25, "base_max_level": 4},
            "coverage": {"base_topk": 40, "base_max_level": 5},
            "exploration": {"base_topk": None, "base_max_level": 6}
        }
        base_config = priority_configs.get(priority, priority_configs["balanced"])
        # Complexity scales Top K multiplicatively and shifts tree depth.
        complexity_multipliers = {
            "low": {"topk_mult": 0.8, "level_adj": -1},
            "medium": {"topk_mult": 1.0, "level_adj": 0},
            "high": {"topk_mult": 1.3, "level_adj": 1},
            "very_high": {"topk_mult": 1.6, "level_adj": 2}
        }
        complexity = complexity_multipliers.get(document_complexity, complexity_multipliers["medium"])
        if base_config["base_topk"] is not None:
            adjusted_topk = int(base_config["base_topk"] * complexity["topk_mult"])
            # Larger collections warrant broader retrieval; tiny ones narrower.
            if document_count > 2000:
                adjusted_topk = min(adjusted_topk * 1.4, 80)
            elif document_count > 1000:
                adjusted_topk = min(adjusted_topk * 1.2, 60)
            elif document_count < 200:
                adjusted_topk = max(adjusted_topk * 0.8, 10)
            # Clamp to the supported range and coerce back to int: the
            # document-count multipliers above yield floats, and topK must be
            # an integer result count (fixes a bug where a float topK could
            # leak into the returned configuration).
            adjusted_topk = int(max(5, min(100, adjusted_topk)))
        else:
            adjusted_topk = None
        adjusted_max_level = base_config["base_max_level"] + complexity["level_adj"]
        # Very large collections benefit from one extra abstraction level;
        # very small ones from one fewer.
        if document_count > 1500:
            adjusted_max_level = min(adjusted_max_level + 1, 8)
        elif document_count < 100:
            adjusted_max_level = max(adjusted_max_level - 1, 2)
        adjusted_max_level = max(2, min(8, adjusted_max_level))
        return {
            "topK": adjusted_topk,
            "max_level": adjusted_max_level,
            "optimization_context": {
                "document_count": document_count,
                "document_complexity": document_complexity,
                "priority": priority,
                "estimated_tree_nodes": self._estimate_tree_nodes(adjusted_max_level),
                "estimated_clusters": self._estimate_clusters(document_count, adjusted_max_level)
            }
        }

    def _estimate_tree_nodes(self, max_level: int) -> int:
        """Estimate total nodes in a RAPTOR tree of ``max_level`` levels.

        Models the tree as binary, i.e. 2**max_level - 1 nodes.  Uses integer
        powers (``2 ** level``) rather than ``math.pow``, which returned a
        float despite this method's ``int`` annotation.
        """
        return sum(2 ** level for level in range(max_level))

    def _estimate_clusters(self, document_count: int, max_level: int) -> int:
        """Estimate number of clusters across all levels.

        Assumes ~15 chunks per document and a ~3:1 reduction per level, with
        at least one cluster at every level.
        """
        base_chunks = document_count * 15  # Estimated chunks per document
        total_clusters = 0
        current_level_nodes = base_chunks
        for _ in range(max_level - 1):
            clusters_at_level = max(current_level_nodes // 3, 1)
            total_clusters += clusters_at_level
            current_level_nodes = clusters_at_level
        return total_clusters

    def analyze_tree_performance(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze expected performance for given configuration.

        Args:
            config: Dict with ``topK`` (int or None) and ``max_level``
                (defaults to 3 when absent).

        Returns:
            Dict of sub-analyses (complexity, scope, memory, processing,
            hierarchy quality) plus lists of warnings and suggestions.
        """
        topk = config.get("topK")
        max_level = config.get("max_level", 3)
        analysis = {
            "tree_complexity": self._analyze_tree_complexity(max_level),
            "retrieval_scope": self._analyze_retrieval_scope(topk),
            "memory_requirements": self._estimate_memory_requirements(max_level),
            "processing_characteristics": self._analyze_processing_characteristics(max_level, topk),
            "hierarchy_quality": self._assess_hierarchy_quality(max_level),
            "performance_warnings": [],
            "optimization_suggestions": []
        }
        # Generate warnings and suggestions.  Note the truthiness checks on
        # topk deliberately skip both None (unlimited) and 0.
        if max_level > 6:
            analysis["performance_warnings"].append("Very deep tree may cause performance issues")
            analysis["optimization_suggestions"].append("Consider reducing max_level to 5-6 for better performance")
        if topk and topk > 70:
            analysis["performance_warnings"].append("High Top K may impact traversal efficiency")
            analysis["optimization_suggestions"].append("Consider reducing Top K for faster hierarchical retrieval")
        if max_level < 3:
            analysis["optimization_suggestions"].append("Increasing max_level may improve hierarchical benefits")
        if topk and topk < 10:
            analysis["optimization_suggestions"].append("Low Top K may miss relevant hierarchical content")
        return analysis

    def _analyze_tree_complexity(self, max_level: int) -> Dict[str, Any]:
        """Analyze tree structural complexity.

        Thresholds (15 / 63 / 255 nodes) correspond to full binary trees of
        4, 6, and 8 levels respectively.
        """
        estimated_nodes = self._estimate_tree_nodes(max_level)
        if estimated_nodes < 15:
            complexity_level = "Low"
            description = "Simple hierarchical structure"
        elif estimated_nodes < 63:
            complexity_level = "Medium"
            description = "Balanced multi-level hierarchy"
        elif estimated_nodes < 255:
            complexity_level = "High"
            description = "Complex hierarchical abstraction"
        else:
            complexity_level = "Very High"
            description = "Highly complex multi-level tree"
        return {
            "level": complexity_level,
            "description": description,
            "estimated_nodes": estimated_nodes,
            "max_levels": max_level
        }

    def _analyze_retrieval_scope(self, topk: Optional[int]) -> Dict[str, Any]:
        """Analyze retrieval coverage scope for a Top K value (None = unlimited)."""
        if topk is None:
            return {
                "scope": "Unlimited",
                "description": "Complete hierarchical coverage",
                "coverage_type": "Exhaustive"
            }
        elif topk <= 15:
            return {
                "scope": "Focused",
                "description": "High-precision hierarchical retrieval",
                "coverage_type": "Selective"
            }
        elif topk <= 35:
            return {
                "scope": "Balanced",
                "description": "Good balance of precision and coverage",
                "coverage_type": "Moderate"
            }
        else:
            return {
                "scope": "Comprehensive",
                "description": "Broad hierarchical content coverage",
                "coverage_type": "Extensive"
            }

    def _estimate_memory_requirements(self, max_level: int) -> Dict[str, Any]:
        """Estimate memory requirements for tree processing.

        Heuristic: 150MB baseline at 2 levels, growing 1.6x per extra level.
        """
        base_memory_mb = 150
        level_multiplier = math.pow(1.6, max_level - 2)
        estimated_mb = int(base_memory_mb * level_multiplier)
        if estimated_mb < 300:
            requirement_level = "Low"
        elif estimated_mb < 800:
            requirement_level = "Medium"
        elif estimated_mb < 2000:
            requirement_level = "High"
        else:
            requirement_level = "Very High"
        return {
            "estimated_mb": estimated_mb,
            "requirement_level": requirement_level,
            "scaling_factor": f"{level_multiplier:.2f}x base"
        }

    def _analyze_processing_characteristics(self, max_level: int, topk: Optional[int]) -> Dict[str, Any]:
        """Analyze processing time and resource characteristics.

        Construction scales ~1.4x per level past 2; retrieval scales with
        log10(topK) (a fixed factor of 3 when topK is unlimited) and depth.
        """
        base_construction_time = 45  # seconds
        level_factor = math.pow(1.4, max_level - 2)
        construction_time = int(base_construction_time * level_factor)
        base_retrieval_time = 2  # seconds
        topk_factor = math.log10(topk) if topk else 3
        retrieval_time = int(base_retrieval_time * topk_factor * (max_level / 3))
        return {
            "tree_construction_time_estimate": f"~{construction_time}s",
            "retrieval_time_estimate": f"~{retrieval_time}s",
            "clustering_intensity": "High" if max_level > 4 else "Medium" if max_level > 2 else "Low",
            "summarization_intensity": "High" if max_level > 5 else "Medium" if max_level > 3 else "Low"
        }

    def _assess_hierarchy_quality(self, max_level: int) -> Dict[str, Any]:
        """Assess expected hierarchy quality from the tree depth alone."""
        if max_level >= 5:
            quality = "Excellent"
            description = "Rich multi-level abstractions with deep hierarchical insights"
        elif max_level >= 4:
            quality = "Good"
            description = "Well-structured hierarchy with good abstraction levels"
        elif max_level >= 3:
            quality = "Standard"
            description = "Basic hierarchical structure with moderate abstractions"
        else:
            quality = "Limited"
            description = "Minimal hierarchical benefits with shallow abstractions"
        return {
            "quality_level": quality,
            "description": description,
            "abstraction_depth": max_level,
            "hierarchy_richness": f"{max_level} levels of abstraction"
        }

    def update_with_optimization(
        self,
        node_id: str,
        document_count: int,
        document_complexity: str = "medium",
        priority: str = "balanced"
    ) -> Dict[str, Any]:
        """Update node configuration with optimization.

        Computes the optimal configuration, prints a human-readable report
        (configuration, performance analysis, warnings, suggestions), then
        PATCHes the node via :func:`update_raptor_rag_node`.

        Returns:
            Dict with the API ``update_result``, the ``applied_config``, the
            ``optimization_context``, and the ``performance_analysis``.

        Raises:
            Exception: Re-raises whatever the API update raises, after
                printing a failure message.
        """
        print(f"🌳 Optimizing RAPTOR RAG configuration...")
        print(f"   Document Count: {document_count}")
        print(f"   Document Complexity: {document_complexity}")
        print(f"   Priority: {priority}")
        # Calculate optimal configuration
        optimal_config = self.calculate_optimal_configuration(
            document_count, document_complexity, priority
        )
        config_to_update = {
            "topK": optimal_config["topK"],
            "max_level": optimal_config["max_level"]
        }
        print(f"\n📊 Optimal Configuration:")
        print(f"   Top K: {config_to_update['topK'] or 'unlimited'}")
        print(f"   Max Level: {config_to_update['max_level']}")
        print(f"   Estimated Tree Nodes: {optimal_config['optimization_context']['estimated_tree_nodes']}")
        print(f"   Estimated Clusters: {optimal_config['optimization_context']['estimated_clusters']}")
        # Analyze performance
        performance_analysis = self.analyze_tree_performance(config_to_update)
        print(f"\n⚡ Performance Analysis:")
        print(f"   Tree Complexity: {performance_analysis['tree_complexity']['level']}")
        print(f"   Retrieval Scope: {performance_analysis['retrieval_scope']['scope']}")
        print(f"   Memory Requirements: {performance_analysis['memory_requirements']['requirement_level']} ({performance_analysis['memory_requirements']['estimated_mb']}MB)")
        if performance_analysis["performance_warnings"]:
            print(f"\n⚠️ Performance Warnings:")
            for warning in performance_analysis["performance_warnings"]:
                print(f"   - {warning}")
        if performance_analysis["optimization_suggestions"]:
            print(f"\n💡 Optimization Suggestions:")
            for suggestion in performance_analysis["optimization_suggestions"]:
                print(f"   - {suggestion}")
        # Update configuration
        try:
            result = update_raptor_rag_node(
                self.flow_name, node_id, config_to_update, self.api_token
            )
            print(f"\n✅ Configuration updated successfully!")
            print(f"   Node ID: {result['node_id']}")
            return {
                "update_result": result,
                "applied_config": config_to_update,
                "optimization_context": optimal_config["optimization_context"],
                "performance_analysis": performance_analysis
            }
        except Exception as e:
            print(f"\n❌ Configuration update failed: {str(e)}")
            raise
# Advanced usage examples
def demonstrate_raptor_optimization():
    """Run three example optimizations against differently-shaped collections.

    Each scenario prints its header and then applies the optimizer to a
    RAPTOR RAG node: a large/high-complexity collection, a small specialized
    one, and a research-exploration setup.
    """
    optimizer = RaptorRagHierarchicalOptimizer("my-rag-pipeline", "YOUR_API_TOKEN")
    # (header, node_id, document_count, document_complexity, priority)
    scenarios = [
        ("=== Large Document Collection Optimization ===",
         "raptor-rag-1748287628685", 2500, "high", "coverage"),
        ("\n=== Specialized Collection Optimization ===",
         "raptor-rag-1748287628686", 150, "medium", "precision"),
        ("\n=== Research Exploration Optimization ===",
         "raptor-rag-1748287628687", 800, "very_high", "exploration"),
    ]
    for header, node_id, doc_count, complexity, priority in scenarios:
        print(header)
        optimizer.update_with_optimization(
            node_id=node_id,
            document_count=doc_count,
            document_complexity=complexity,
            priority=priority,
        )
def main() -> None:
    """Demo entry point: one direct node update, then the optimizer examples.

    Catches HTTP errors from the flow API and prints a diagnostic for the
    common 400 (bad configuration) and 404 (node not found) cases.
    """
    try:
        # Simple configuration update
        result = update_raptor_rag_node(
            "my-rag-pipeline",
            "raptor-rag-1748287628685",
            {"topK": 30, "max_level": 4},
            "YOUR_API_TOKEN"
        )
        print(f"✅ Configuration updated: {result}")
        # Advanced optimization
        demonstrate_raptor_optimization()
    except requests.exceptions.HTTPError as e:
        print(f"❌ Error: {e}")
        # e.response can be None for a manually-constructed HTTPError, so
        # guard before dereferencing it.
        if e.response is not None and e.response.status_code == 400:
            error_detail = e.response.json().get("detail", "Invalid configuration")
            print(f"Configuration error: {error_detail}")
        elif e.response is not None and e.response.status_code == 404:
            print("RAPTOR RAG node not found")


# Guard the demo so importing this module no longer fires network requests
# as a side effect (the original ran the try-block at import time).
if __name__ == "__main__":
    main()