Visualization Framework for RAG Performance Evaluation
Make improvement effects measurable with metrics such as Recall@k, MRR, and faithfulness, and learn practical evaluation methods using Ragas, TruLens, and DeepEval.
Solving the Challenge of Unmeasurable Improvement Effects
Many companies that have implemented RAG systems struggle to answer questions like "Has accuracy really improved?" and "Which parts should we improve?" Without quantitative evaluation metrics, both system improvement and ROI justification become difficult. This article presents a practical framework for evaluating RAG system performance from multiple angles and enabling continuous improvement.
Why an Evaluation Framework is Necessary
Traditional Challenges
Typical problems in RAG system evaluation:
1. Subjective Evaluation: Intuitive judgments like "it seems better"
2. Local Optimization: Retrieval accuracy improves while answer quality declines
3. Unclear Improvement Points: Cannot identify what to improve
4. Difficult ROI Justification: Investment effects cannot be demonstrated numerically
The Revolution an Evaluation Framework Brings
With a proper evaluation framework in place:
- Visualization of Improvement Effects: Quantify each component's performance
- Bottleneck Identification: Instantly discover problem areas
- Continuous Improvement: Data-driven optimization cycle
- Investment Decision Basis: Clear, numbers-based decision making
Key Evaluation Metrics
1. Search Accuracy Metrics
Evaluating information retrieval, the first stage of RAG:
```python
import numpy as np
from typing import List, Dict


class RetrievalMetrics:
    """Metrics for evaluating search accuracy"""

    def calculate_recall_at_k(self,
                              retrieved_docs: List[str],
                              relevant_docs: List[str],
                              k: int) -> float:
        """Recall@k: Recall rate of relevant documents in the top k results"""
        retrieved_k = set(retrieved_docs[:k])
        relevant_set = set(relevant_docs)

        if not relevant_set:
            return 0.0

        intersection = retrieved_k.intersection(relevant_set)
        recall = len(intersection) / len(relevant_set)

        return recall

    def calculate_precision_at_k(self,
                                 retrieved_docs: List[str],
                                 relevant_docs: List[str],
                                 k: int) -> float:
        """Precision@k: Precision of relevant documents in the top k results"""
        retrieved_k = retrieved_docs[:k]
        relevant_set = set(relevant_docs)

        if not retrieved_k:
            return 0.0

        hits = sum(1 for doc in retrieved_k if doc in relevant_set)
        precision = hits / len(retrieved_k)

        return precision

    def calculate_mrr(self,
                      retrieved_docs: List[str],
                      relevant_docs: List[str]) -> float:
        """MRR (Mean Reciprocal Rank): reciprocal rank of the first relevant document"""
        relevant_set = set(relevant_docs)

        for rank, doc in enumerate(retrieved_docs, 1):
            if doc in relevant_set:
                return 1.0 / rank

        return 0.0

    def calculate_map(self,
                      queries: List[str],
                      retrieved_results: List[List[str]],
                      relevant_results: List[List[str]]) -> float:
        """MAP (Mean Average Precision): mean of per-query average precision"""
        ap_scores = []

        for retrieved, relevant in zip(retrieved_results, relevant_results):
            if not relevant:
                continue

            relevant_set = set(relevant)
            ap = 0.0
            relevant_count = 0

            for rank, doc in enumerate(retrieved, 1):
                if doc in relevant_set:
                    relevant_count += 1
                    precision = relevant_count / rank
                    ap += precision

            if relevant_count > 0:
                ap /= len(relevant_set)

            ap_scores.append(ap)

        return np.mean(ap_scores) if ap_scores else 0.0

    def calculate_ndcg(self,
                       retrieved_docs: List[str],
                       relevance_scores: Dict[str, float],
                       k: int) -> float:
        """NDCG@k: Normalized Discounted Cumulative Gain"""
        retrieved_k = retrieved_docs[:k]

        # Actual gain scores in retrieved order
        actual_scores = [relevance_scores.get(doc, 0) for doc in retrieved_k]

        # Ideal gain scores in perfect order
        ideal_scores = sorted(relevance_scores.values(), reverse=True)[:k]

        if not ideal_scores or sum(ideal_scores) == 0:
            return 0.0

        # Calculate NDCG
        actual_dcg = self._calculate_dcg(actual_scores)
        ideal_dcg = self._calculate_dcg(ideal_scores)

        return actual_dcg / ideal_dcg if ideal_dcg > 0 else 0.0

    def _calculate_dcg(self, scores: List[float]) -> float:
        """Calculate DCG (Discounted Cumulative Gain)"""
        dcg = 0.0
        for i, score in enumerate(scores, 1):
            dcg += score / np.log2(i + 1)
        return dcg
```
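The snippet below is a minimal usage sketch of the class above for a single query; the document IDs and relevance labels are hypothetical and chosen purely for illustration.

```python
# Toy example with hypothetical document IDs (illustration only)
metrics = RetrievalMetrics()

retrieved = ["doc_3", "doc_7", "doc_1", "doc_9", "doc_2"]  # ranked system output
relevant = ["doc_1", "doc_2", "doc_5"]                     # gold labels

print(metrics.calculate_recall_at_k(retrieved, relevant, k=3))     # 1 of 3 relevant docs in top 3 -> 0.33
print(metrics.calculate_precision_at_k(retrieved, relevant, k=3))  # 1 of 3 retrieved docs relevant -> 0.33
print(metrics.calculate_mrr(retrieved, relevant))                  # first hit at rank 3 -> 0.33
print(metrics.calculate_ndcg(retrieved, {"doc_1": 3.0, "doc_2": 2.0, "doc_5": 1.0}, k=5))
```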
2. Generation Quality Metrics
Evaluating the quality of answers generated by the LLM:
```python
import numpy as np
import torch
from typing import Dict, List
from rouge_score import rouge_scorer
from bert_score import score as bert_score
from transformers import AutoTokenizer, AutoModelForSequenceClassification


class GenerationMetrics:
    """Metrics for evaluating generation quality"""

    def __init__(self):
        self.rouge_scorer = rouge_scorer.RougeScorer(
            ['rouge1', 'rouge2', 'rougeL'], use_stemmer=True
        )
        # BERT model for English
        self.tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
        self.model = AutoModelForSequenceClassification.from_pretrained(
            'bert-base-uncased'
        )

    def calculate_rouge_scores(self,
                               generated: str,
                               reference: str) -> Dict[str, float]:
        """ROUGE score: evaluation based on n-gram overlap"""
        scores = self.rouge_scorer.score(reference, generated)

        return {
            'rouge1_f1': scores['rouge1'].fmeasure,
            'rouge2_f1': scores['rouge2'].fmeasure,
            'rougeL_f1': scores['rougeL'].fmeasure,
            'rouge1_precision': scores['rouge1'].precision,
            'rouge1_recall': scores['rouge1'].recall
        }

    def calculate_bert_score(self,
                             generated: List[str],
                             references: List[str]) -> Dict[str, float]:
        """BERTScore: context-aware semantic similarity"""
        P, R, F1 = bert_score(
            generated,
            references,
            lang='en',
            model_type='bert-base-uncased'
        )

        return {
            'bert_precision': P.mean().item(),
            'bert_recall': R.mean().item(),
            'bert_f1': F1.mean().item()
        }

    def calculate_fluency_score(self, text: str) -> float:
        """Fluency score: evaluate text naturalness"""
        # Simplified proxy: max softmax probability of the classification head
        # (a causal-LM perplexity would be a more rigorous fluency measure)
        inputs = self.tokenizer(text, return_tensors='pt', truncation=True)

        with torch.no_grad():
            outputs = self.model(**inputs)
            fluency_score = torch.softmax(outputs.logits, dim=-1).max().item()

        return fluency_score

    def calculate_coherence_score(self, text: str) -> float:
        """Coherence score: evaluate logical consistency"""
        sentences = text.split('. ')
        if len(sentences) < 2:
            return 1.0

        coherence_scores = []

        for i in range(len(sentences) - 1):
            if not sentences[i] or not sentences[i + 1]:
                continue

            # Calculate semantic similarity between adjacent sentences
            inputs1 = self.tokenizer(sentences[i], return_tensors='pt', truncation=True)
            inputs2 = self.tokenizer(sentences[i + 1], return_tensors='pt', truncation=True)

            with torch.no_grad():
                # Note: simplified - a dedicated sentence-embedding model would be more robust
                emb1 = self.model.bert(**inputs1).last_hidden_state.mean(dim=1)
                emb2 = self.model.bert(**inputs2).last_hidden_state.mean(dim=1)

            # Cosine similarity between adjacent sentence embeddings
            similarity = torch.cosine_similarity(emb1, emb2).item()
            coherence_scores.append(similarity)

        return np.mean(coherence_scores) if coherence_scores else 0.0
```
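For reference, a small usage sketch follows; the strings are invented, and instantiating the class downloads the bert-base-uncased weights on first use.

```python
# Illustrative usage with made-up strings (first run downloads model weights)
gen_metrics = GenerationMetrics()

generated = "The refund policy allows returns within 30 days of purchase."
reference = "Customers may return items within 30 days for a full refund."

print(gen_metrics.calculate_rouge_scores(generated, reference))
print(gen_metrics.calculate_bert_score([generated], [reference]))
print(gen_metrics.calculate_coherence_score(generated + " Receipts are required."))
```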
3. RAG-Specific Evaluation Metrics
Performance evaluation specific to RAG systems:
```python
import re
from typing import List


class RAGSpecificMetrics:
    """RAG-specific evaluation metrics"""

    def __init__(self, llm_client=None):
        self.llm_client = llm_client

    def calculate_faithfulness(self,
                               answer: str,
                               context: str,
                               use_llm: bool = True) -> float:
        """Faithfulness: whether the answer is grounded in the context"""
        if use_llm and self.llm_client:
            prompt = f"""
Evaluate whether the following answer is faithfully based on the context information.

Context: {context}
Answer: {answer}

Evaluation criteria:
1.0: Completely faithful (all information based on context)
0.8: Mostly faithful (slight inference but reasonable)
0.5: Partially faithful (some information inaccurate)
0.0: Unfaithful (contradicts context or unrelated)

Please respond with only the numerical score.
"""
            response = self.llm_client.generate(prompt)
            try:
                return float(response.strip())
            except ValueError:
                return 0.0
        else:
            # Simple rule-based evaluation: lexical overlap with the context
            context_words = set(context.split())
            answer_words = set(answer.split())

            if not answer_words:
                return 0.0

            overlap = len(context_words.intersection(answer_words))
            return min(overlap / len(answer_words), 1.0)

    def calculate_answer_relevance(self,
                                   question: str,
                                   answer: str,
                                   use_llm: bool = True) -> float:
        """Answer relevance: appropriateness of the answer to the question"""
        if use_llm and self.llm_client:
            prompt = f"""
Evaluate the relevance of the answer to the question.

Question: {question}
Answer: {answer}

Evaluation criteria:
1.0: Completely relevant (directly and fully answers the question)
0.7: Highly relevant (answers main parts of question)
0.4: Partially relevant (answers only part)
0.0: Irrelevant (does not answer the question)

Please respond with only the numerical score.
"""
            response = self.llm_client.generate(prompt)
            try:
                return float(response.strip())
            except ValueError:
                return 0.0
        else:
            # Simple keyword matching
            question_words = set(question.lower().split())
            answer_words = set(answer.lower().split())

            overlap = len(question_words.intersection(answer_words))
            return min(overlap / max(len(question_words), 1), 1.0)

    def calculate_context_precision(self,
                                    retrieved_contexts: List[str],
                                    relevant_contexts: List[str]) -> float:
        """Context precision: rank-weighted accuracy of retrieved contexts"""
        if not retrieved_contexts:
            return 0.0

        relevant_set = set(relevant_contexts)
        precision_scores = []

        for i, context in enumerate(retrieved_contexts):
            if context in relevant_set:
                # Rank-aware credit: earlier hits count more
                precision_scores.append(1.0 / (i + 1))

        return sum(precision_scores) / len(retrieved_contexts)

    def calculate_hallucination_score(self,
                                      answer: str,
                                      context: str) -> float:
        """Hallucination score: degree of information absent from the context (lower is better)"""
        # Extract numbers
        answer_numbers = set(re.findall(r'\d+\.?\d*', answer))
        context_numbers = set(re.findall(r'\d+\.?\d*', context))

        # Simple entity extraction (capitalized words)
        answer_entities = set(re.findall(r'[A-Z][a-z]+', answer))
        context_entities = set(re.findall(r'[A-Z][a-z]+', context))

        hallucination_count = 0
        total_count = 0

        # Check number hallucinations
        for num in answer_numbers:
            total_count += 1
            if num not in context_numbers:
                hallucination_count += 1

        # Check entity hallucinations
        for entity in answer_entities:
            total_count += 1
            if entity not in context_entities:
                hallucination_count += 1

        if total_count == 0:
            return 0.0

        # Hallucination score (lower is better)
        return hallucination_count / total_count
```
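The rule-based fallbacks (use_llm=False) make the class easy to try without an LLM client; the strings below are invented for illustration only.

```python
# Rule-based fallback so no LLM client is needed; strings are made up
rag_metrics = RAGSpecificMetrics()

context = "Our support desk is open Monday to Friday from 9:00 to 18:00."
answer = "Support is available Monday to Friday, 9:00 to 18:00, and on Saturdays."

print(rag_metrics.calculate_faithfulness(answer, context, use_llm=False))
print(rag_metrics.calculate_answer_relevance("When is support open?", answer, use_llm=False))
print(rag_metrics.calculate_hallucination_score(answer, context))  # flags tokens like 'Saturdays' missing from the context
```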
Implementation of Evaluation Tools
Automated Evaluation with Ragas
```python
from typing import Dict, List, Optional

import pandas as pd
from datasets import Dataset
from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_precision,
    context_recall,
    answer_correctness,
    answer_similarity
)


class RagasEvaluator:
    """Comprehensive evaluation using Ragas"""

    def __init__(self):
        self.metrics = [
            faithfulness,
            answer_relevancy,
            context_precision,
            context_recall,
            answer_correctness,
            answer_similarity
        ]

    def prepare_dataset(self,
                        questions: List[str],
                        answers: List[str],
                        contexts: List[List[str]],
                        ground_truths: Optional[List[str]] = None) -> Dataset:
        """Prepare evaluation dataset"""
        data = {
            'question': questions,
            'answer': answers,
            'contexts': contexts
        }

        if ground_truths:
            data['ground_truth'] = ground_truths

        df = pd.DataFrame(data)
        return Dataset.from_pandas(df)

    def evaluate_rag_system(self, dataset: Dataset) -> Dict[str, float]:
        """Comprehensive RAG system evaluation"""
        results = evaluate(
            dataset,
            metrics=self.metrics
        )

        # Format results
        evaluation_results = {
            'faithfulness': results['faithfulness'],
            'answer_relevancy': results['answer_relevancy'],
            'context_precision': results['context_precision'],
            'context_recall': results['context_recall'],
            'answer_correctness': results.get('answer_correctness', None),
            'answer_similarity': results.get('answer_similarity', None),
            'overall_score': self._calculate_overall_score(results)
        }

        return evaluation_results

    def _calculate_overall_score(self, results: Dict) -> float:
        """Calculate weighted overall score"""
        weights = {
            'faithfulness': 0.25,
            'answer_relevancy': 0.25,
            'context_precision': 0.20,
            'context_recall': 0.20,
            'answer_correctness': 0.10
        }

        score = 0.0
        total_weight = 0.0

        for metric, weight in weights.items():
            if metric in results and results[metric] is not None:
                score += results[metric] * weight
                total_weight += weight

        return score / total_weight if total_weight > 0 else 0.0

    def generate_report(self, results: Dict[str, float]) -> str:
        """Generate evaluation report"""
        report = "=" * 50 + "\n"
        report += "RAG System Evaluation Report\n"
        report += "=" * 50 + "\n\n"

        for metric, value in results.items():
            if value is not None:
                status = self._get_status(metric, value)
                report += f"{metric:20s}: {value:.3f} [{status}]\n"

        report += "\n" + "-" * 50 + "\n"
        report += "Recommended Improvement Actions:\n"
        report += self._generate_recommendations(results)

        return report

    def _get_status(self, metric: str, value: float) -> str:
        """Determine metric status"""
        thresholds = {
            'excellent': 0.9,
            'good': 0.7,
            'fair': 0.5,
            'poor': 0.0
        }

        if value >= thresholds['excellent']:
            return "Excellent"
        elif value >= thresholds['good']:
            return "Good"
        elif value >= thresholds['fair']:
            return "Needs Improvement"
        else:
            return "Critical"

    def _generate_recommendations(self, results: Dict) -> str:
        """Generate improvement recommendations"""
        recommendations = []

        if results.get('faithfulness', 1.0) < 0.7:
            recommendations.append(
                "- Low faithfulness: consider strengthening grounding and improving prompts"
            )

        if results.get('answer_relevancy', 1.0) < 0.7:
            recommendations.append(
                "- Low answer relevance: consider improving query understanding and adding intent classification"
            )

        if results.get('context_precision', 1.0) < 0.7:
            recommendations.append(
                "- Low context precision: consider improving search algorithms and adding reranking"
            )

        if results.get('context_recall', 1.0) < 0.7:
            recommendations.append(
                "- Low context recall: consider reviewing index strategy and adjusting chunk size"
            )

        return "\n".join(recommendations) if recommendations else "Currently performing well."
```
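A hypothetical end-to-end run might look like the sketch below. Ragas invokes an LLM internally, so an API key (for example OPENAI_API_KEY) must be configured in the environment; the single-row sample data is made up, and exact result keys can vary between Ragas versions.

```python
# Hypothetical end-to-end run on a one-row, made-up dataset
evaluator = RagasEvaluator()

dataset = evaluator.prepare_dataset(
    questions=["What is the refund period?"],
    answers=["Items can be returned within 30 days of purchase."],
    contexts=[["Our policy allows returns within 30 days of purchase."]],
    ground_truths=["Returns are accepted within 30 days."]
)

results = evaluator.evaluate_rag_system(dataset)
print(evaluator.generate_report(results))
```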
Summary and Future Prospects
A RAG evaluation framework is not just a measurement tool; it is the foundation for continuous improvement. With the right metrics, automated evaluation tools, and continuous monitoring, you can understand RAG system performance objectively and make data-driven improvements.
At INDX, we design and implement evaluation frameworks tailored to each client's business requirements, helping maximize the value of RAG systems from "visualization" through "continuous improvement."