423 lines
17 KiB
Python
423 lines
17 KiB
Python
import logging
|
|
from typing import Dict, List, Any, Optional
|
|
from django.db.models import Q, Count
|
|
from django.utils import timezone
|
|
# from sklearn.feature_extraction.text import TfidfVectorizer
|
|
# from sklearn.metrics.pairwise import cosine_similarity
|
|
import re
|
|
|
|
from ..models import KnowledgeBaseArticle, KnowledgeBaseUsage
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class KnowledgeBaseSearchService:
|
|
"""Service for searching and discovering knowledge base articles"""
|
|
|
|
def __init__(self):
|
|
self.model_version = "v1.0"
|
|
self.min_similarity_threshold = 0.1
|
|
|
|
def search(
|
|
self,
|
|
query: str,
|
|
article_types: Optional[List[str]] = None,
|
|
categories: Optional[List[str]] = None,
|
|
difficulty_levels: Optional[List[str]] = None,
|
|
limit: int = 20,
|
|
offset: int = 0
|
|
) -> Dict[str, Any]:
|
|
"""Search knowledge base articles with various filters"""
|
|
|
|
try:
|
|
# Build base queryset
|
|
queryset = KnowledgeBaseArticle.objects.filter(status='PUBLISHED')
|
|
|
|
# Apply filters
|
|
if article_types:
|
|
queryset = queryset.filter(article_type__in=article_types)
|
|
|
|
if categories:
|
|
queryset = queryset.filter(category__in=categories)
|
|
|
|
if difficulty_levels:
|
|
queryset = queryset.filter(difficulty_level__in=difficulty_levels)
|
|
|
|
# Get all matching articles for similarity calculation
|
|
all_articles = list(queryset)
|
|
|
|
if not all_articles:
|
|
return {
|
|
'results': [],
|
|
'total_count': 0,
|
|
'query': query,
|
|
'filters': {
|
|
'article_types': article_types,
|
|
'categories': categories,
|
|
'difficulty_levels': difficulty_levels
|
|
}
|
|
}
|
|
|
|
# Calculate similarity scores
|
|
articles_with_scores = self._calculate_similarity_scores(query, all_articles)
|
|
|
|
# Sort by relevance (combination of similarity and popularity)
|
|
articles_with_scores.sort(
|
|
key=lambda x: (x['similarity_score'] * 0.7) + (x['popularity_score'] * 0.3),
|
|
reverse=True
|
|
)
|
|
|
|
# Apply pagination
|
|
paginated_articles = articles_with_scores[offset:offset + limit]
|
|
|
|
# Format results
|
|
results = []
|
|
for article_data in paginated_articles:
|
|
article = article_data['article']
|
|
results.append({
|
|
'id': str(article.id),
|
|
'title': article.title,
|
|
'slug': article.slug,
|
|
'summary': article.summary,
|
|
'article_type': article.article_type,
|
|
'category': article.category,
|
|
'subcategory': article.subcategory,
|
|
'tags': article.tags,
|
|
'difficulty_level': article.difficulty_level,
|
|
'view_count': article.view_count,
|
|
'created_at': article.created_at.isoformat(),
|
|
'updated_at': article.updated_at.isoformat(),
|
|
'author': article.author.username if article.author else None,
|
|
'similarity_score': article_data['similarity_score'],
|
|
'relevance_score': article_data['relevance_score'],
|
|
'popularity_score': article_data['popularity_score'],
|
|
'matching_keywords': article_data['matching_keywords']
|
|
})
|
|
|
|
return {
|
|
'results': results,
|
|
'total_count': len(articles_with_scores),
|
|
'query': query,
|
|
'filters': {
|
|
'article_types': article_types,
|
|
'categories': categories,
|
|
'difficulty_levels': difficulty_levels
|
|
},
|
|
'pagination': {
|
|
'limit': limit,
|
|
'offset': offset,
|
|
'has_more': (offset + limit) < len(articles_with_scores)
|
|
}
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to search knowledge base: {str(e)}")
|
|
raise
|
|
|
|
def find_related_articles(
|
|
self,
|
|
article_id: str,
|
|
limit: int = 5
|
|
) -> List[Dict[str, Any]]:
|
|
"""Find articles related to a specific article"""
|
|
|
|
try:
|
|
article = KnowledgeBaseArticle.objects.get(id=article_id)
|
|
|
|
# Find articles with similar categories, tags, or content
|
|
related_articles = KnowledgeBaseArticle.objects.filter(
|
|
status='PUBLISHED'
|
|
).exclude(id=article_id).filter(
|
|
Q(category=article.category) |
|
|
Q(subcategory=article.subcategory) |
|
|
Q(tags__overlap=article.tags) |
|
|
Q(article_type=article.article_type)
|
|
).distinct()
|
|
|
|
if not related_articles.exists():
|
|
return []
|
|
|
|
# Calculate similarity scores
|
|
article_text = f"{article.title} {article.summary} {' '.join(article.tags)}"
|
|
articles_with_scores = []
|
|
|
|
for related_article in related_articles:
|
|
related_text = f"{related_article.title} {related_article.summary} {' '.join(related_article.tags)}"
|
|
similarity = self._calculate_text_similarity(article_text, related_text)
|
|
|
|
if similarity >= self.min_similarity_threshold:
|
|
articles_with_scores.append({
|
|
'article': related_article,
|
|
'similarity_score': similarity
|
|
})
|
|
|
|
# Sort by similarity and return top matches
|
|
articles_with_scores.sort(key=lambda x: x['similarity_score'], reverse=True)
|
|
|
|
results = []
|
|
for article_data in articles_with_scores[:limit]:
|
|
article = article_data['article']
|
|
results.append({
|
|
'id': str(article.id),
|
|
'title': article.title,
|
|
'slug': article.slug,
|
|
'summary': article.summary,
|
|
'article_type': article.article_type,
|
|
'category': article.category,
|
|
'similarity_score': article_data['similarity_score']
|
|
})
|
|
|
|
return results
|
|
|
|
except KnowledgeBaseArticle.DoesNotExist:
|
|
raise ValueError(f"Article with ID {article_id} not found")
|
|
except Exception as e:
|
|
logger.error(f"Failed to find related articles: {str(e)}")
|
|
raise
|
|
|
|
def suggest_articles_for_incident(
|
|
self,
|
|
incident_title: str,
|
|
incident_description: str,
|
|
incident_category: str,
|
|
limit: int = 5
|
|
) -> List[Dict[str, Any]]:
|
|
"""Suggest knowledge base articles for an incident"""
|
|
|
|
try:
|
|
# Build search query from incident data
|
|
search_query = f"{incident_title} {incident_description} {incident_category}"
|
|
|
|
# Search for relevant articles
|
|
search_results = self.search(
|
|
query=search_query,
|
|
categories=[incident_category] if incident_category else None,
|
|
limit=limit * 2 # Get more results to filter
|
|
)
|
|
|
|
# Filter and rank results
|
|
relevant_articles = []
|
|
for result in search_results['results']:
|
|
# Boost score for category matches
|
|
category_boost = 0.2 if result['category'] == incident_category else 0.0
|
|
|
|
# Boost score for runbooks and troubleshooting guides
|
|
type_boost = 0.1 if result['article_type'] in ['RUNBOOK', 'TROUBLESHOOTING'] else 0.0
|
|
|
|
final_score = result['similarity_score'] + category_boost + type_boost
|
|
|
|
if final_score >= self.min_similarity_threshold:
|
|
relevant_articles.append({
|
|
**result,
|
|
'final_score': final_score
|
|
})
|
|
|
|
# Sort by final score and return top matches
|
|
relevant_articles.sort(key=lambda x: x['final_score'], reverse=True)
|
|
|
|
return relevant_articles[:limit]
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to suggest articles for incident: {str(e)}")
|
|
raise
|
|
|
|
def get_popular_articles(
|
|
self,
|
|
category: Optional[str] = None,
|
|
article_type: Optional[str] = None,
|
|
limit: int = 10
|
|
) -> List[Dict[str, Any]]:
|
|
"""Get popular articles based on view count and recent activity"""
|
|
|
|
try:
|
|
queryset = KnowledgeBaseArticle.objects.filter(status='PUBLISHED')
|
|
|
|
if category:
|
|
queryset = queryset.filter(category=category)
|
|
|
|
if article_type:
|
|
queryset = queryset.filter(article_type=article_type)
|
|
|
|
# Get articles ordered by popularity (view count + recent activity)
|
|
popular_articles = queryset.order_by('-view_count', '-updated_at')[:limit]
|
|
|
|
results = []
|
|
for article in popular_articles:
|
|
results.append({
|
|
'id': str(article.id),
|
|
'title': article.title,
|
|
'slug': article.slug,
|
|
'summary': article.summary,
|
|
'article_type': article.article_type,
|
|
'category': article.category,
|
|
'view_count': article.view_count,
|
|
'updated_at': article.updated_at.isoformat(),
|
|
'is_featured': article.is_featured
|
|
})
|
|
|
|
return results
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get popular articles: {str(e)}")
|
|
raise
|
|
|
|
def get_articles_due_for_review(self) -> List[Dict[str, Any]]:
|
|
"""Get articles that are due for review"""
|
|
|
|
try:
|
|
due_articles = KnowledgeBaseArticle.objects.filter(
|
|
next_review_due__lt=timezone.now(),
|
|
status='PUBLISHED'
|
|
).order_by('next_review_due')
|
|
|
|
results = []
|
|
for article in due_articles:
|
|
results.append({
|
|
'id': str(article.id),
|
|
'title': article.title,
|
|
'slug': article.slug,
|
|
'category': article.category,
|
|
'last_reviewed': article.last_reviewed.isoformat() if article.last_reviewed else None,
|
|
'next_review_due': article.next_review_due.isoformat(),
|
|
'maintainer': article.maintainer.username if article.maintainer else None,
|
|
'days_overdue': (timezone.now() - article.next_review_due).days
|
|
})
|
|
|
|
return results
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get articles due for review: {str(e)}")
|
|
raise
|
|
|
|
def _calculate_similarity_scores(
|
|
self,
|
|
query: str,
|
|
articles: List[KnowledgeBaseArticle]
|
|
) -> List[Dict[str, Any]]:
|
|
"""Calculate similarity scores for articles against a query"""
|
|
|
|
if not articles:
|
|
return []
|
|
|
|
# Prepare texts for similarity calculation
|
|
query_text = self._preprocess_text(query)
|
|
article_texts = []
|
|
|
|
for article in articles:
|
|
article_text = f"{article.title} {article.summary} {' '.join(article.tags)} {' '.join(article.search_keywords)}"
|
|
article_texts.append(self._preprocess_text(article_text))
|
|
|
|
# Calculate similarity using simple keyword matching (fallback)
|
|
try:
|
|
similarities = [self._calculate_keyword_similarity(query, article_text) for article_text in article_texts]
|
|
except Exception as e:
|
|
logger.warning(f"Failed to calculate similarity: {str(e)}")
|
|
similarities = [0.0] * len(article_texts)
|
|
|
|
# Prepare results with additional scoring
|
|
results = []
|
|
for i, article in enumerate(articles):
|
|
similarity_score = float(similarities[i])
|
|
popularity_score = self._calculate_popularity_score(article)
|
|
relevance_score = (similarity_score * 0.7) + (popularity_score * 0.3)
|
|
|
|
matching_keywords = self._find_matching_keywords(query, article)
|
|
|
|
results.append({
|
|
'article': article,
|
|
'similarity_score': similarity_score,
|
|
'popularity_score': popularity_score,
|
|
'relevance_score': relevance_score,
|
|
'matching_keywords': matching_keywords
|
|
})
|
|
|
|
return results
|
|
|
|
def _calculate_text_similarity(self, text1: str, text2: str) -> float:
|
|
"""Calculate text similarity using simple keyword matching (fallback)"""
|
|
try:
|
|
if not text1.strip() or not text2.strip():
|
|
return 0.0
|
|
|
|
# Simple keyword-based similarity as fallback
|
|
words1 = set(text1.lower().split())
|
|
words2 = set(text2.lower().split())
|
|
|
|
if not words1 or not words2:
|
|
return 0.0
|
|
|
|
intersection = words1.intersection(words2)
|
|
union = words1.union(words2)
|
|
|
|
return len(intersection) / len(union) if union else 0.0
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Failed to calculate text similarity: {str(e)}")
|
|
return 0.0
|
|
|
|
def _calculate_keyword_similarity(self, query: str, article_text: str) -> float:
|
|
"""Fallback similarity calculation using keyword matching"""
|
|
query_words = set(self._extract_keywords(query.lower()))
|
|
article_words = set(self._extract_keywords(article_text.lower()))
|
|
|
|
if not query_words or not article_words:
|
|
return 0.0
|
|
|
|
intersection = query_words.intersection(article_words)
|
|
union = query_words.union(article_words)
|
|
|
|
return len(intersection) / len(union) if union else 0.0
|
|
|
|
def _calculate_popularity_score(self, article: KnowledgeBaseArticle) -> float:
|
|
"""Calculate popularity score based on views and recency"""
|
|
# Normalize view count (assuming max views is around 1000)
|
|
view_score = min(article.view_count / 1000.0, 1.0)
|
|
|
|
# Calculate recency score (more recent = higher score)
|
|
days_since_update = (timezone.now() - article.updated_at).days
|
|
recency_score = max(0, 1 - (days_since_update / 365.0)) # Decay over a year
|
|
|
|
# Featured articles get a boost
|
|
featured_boost = 0.1 if article.is_featured else 0.0
|
|
|
|
return (view_score * 0.6) + (recency_score * 0.3) + featured_boost
|
|
|
|
def _preprocess_text(self, text: str) -> str:
|
|
"""Preprocess text for similarity calculation"""
|
|
# Convert to lowercase
|
|
text = text.lower()
|
|
|
|
# Remove special characters but keep spaces
|
|
text = re.sub(r'[^\w\s]', ' ', text)
|
|
|
|
# Remove extra whitespace
|
|
text = re.sub(r'\s+', ' ', text).strip()
|
|
|
|
return text
|
|
|
|
def _extract_keywords(self, text: str) -> List[str]:
|
|
"""Extract keywords from text"""
|
|
# Simple keyword extraction - in production, you might use more sophisticated methods
|
|
words = text.split()
|
|
|
|
# Filter out common stop words
|
|
stop_words = {
|
|
'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for',
|
|
'of', 'with', 'by', 'is', 'are', 'was', 'were', 'be', 'been', 'have',
|
|
'has', 'had', 'do', 'does', 'did', 'will', 'would', 'could', 'should'
|
|
}
|
|
|
|
keywords = [word for word in words if len(word) > 2 and word not in stop_words]
|
|
return keywords
|
|
|
|
def _find_matching_keywords(self, query: str, article: KnowledgeBaseArticle) -> List[str]:
|
|
"""Find keywords that match between query and article"""
|
|
query_keywords = set(self._extract_keywords(query.lower()))
|
|
|
|
# Check article title, summary, tags, and search keywords
|
|
article_text = f"{article.title} {article.summary} {' '.join(article.tags)} {' '.join(article.search_keywords)}"
|
|
article_keywords = set(self._extract_keywords(article_text.lower()))
|
|
|
|
matching_keywords = list(query_keywords.intersection(article_keywords))
|
|
return matching_keywords[:5] # Return top 5 matches
|