"""
|
|
Management command for OSINT crawling from seed websites.
|
|
"""
|
|
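# Example invocation (assuming this module is saved as
# osint/management/commands/crawl_osint.py -- the command name is taken from
# the file name, so adjust it to match the real path):
#
#   python manage.py crawl_osint --all --max-pages 20 --delay 2.0
#   python manage.py crawl_osint --seed-id 3
#
# The default mode (no flags) only crawls seeds whose crawl interval has
# elapsed, so it suits a periodic scheduler, e.g. an hourly cron entry:
#
#   0 * * * * cd /path/to/project && python manage.py crawl_osint
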
import re
import hashlib
import time
from datetime import timedelta
from urllib.parse import urljoin, urlparse

import requests
from bs4 import BeautifulSoup

from django.core.management.base import BaseCommand
from django.db import transaction
from django.utils import timezone

from osint.models import SeedWebsite, OSINTKeyword, CrawledContent, AutoGeneratedReport
from reports.models import ScamReport


class Command(BaseCommand):
    help = 'Crawl seed websites and search for scam-related keywords'

    def add_arguments(self, parser):
        parser.add_argument(
            '--seed-id',
            type=int,
            help='Crawl specific seed website by ID',
        )
        parser.add_argument(
            '--all',
            action='store_true',
            help='Crawl all active seed websites',
        )
        parser.add_argument(
            '--force',
            action='store_true',
            help='Force crawl even if recently crawled',
        )
        parser.add_argument(
            '--max-pages',
            type=int,
            default=50,
            help='Maximum pages to crawl per seed website (default: 50)',
        )
        parser.add_argument(
            '--delay',
            type=float,
            default=1.0,
            help='Delay between requests in seconds (default: 1.0)',
        )

    def handle(self, *args, **options):
        self.stdout.write(self.style.SUCCESS('Starting OSINT crawling...'))

        # Get seed websites to crawl
        if options['seed_id']:
            seeds = SeedWebsite.objects.filter(id=options['seed_id'], is_active=True)
        elif options['all'] or options['force']:
            # --force behaves like --all: crawl every active seed regardless of
            # how recently it was crawled.
            seeds = SeedWebsite.objects.filter(is_active=True)
        else:
            # Default: crawl only the seeds that are due. The crawl interval is
            # stored per seed, so the due check is evaluated in Python and the
            # queryset is rebuilt from the matching primary keys.
            now = timezone.now()
            due_ids = [
                seed.pk
                for seed in SeedWebsite.objects.filter(is_active=True)
                if seed.last_crawled_at is None
                or seed.last_crawled_at < now - timedelta(hours=seed.crawl_interval_hours)
            ]
            seeds = SeedWebsite.objects.filter(pk__in=due_ids)

        if not seeds.exists():
            self.stdout.write(self.style.WARNING('No seed websites to crawl.'))
            return

        # Get active keywords
        keywords = OSINTKeyword.objects.filter(is_active=True)
        if not keywords.exists():
            self.stdout.write(self.style.WARNING('No active keywords configured.'))
            return

        self.stdout.write(f'Found {seeds.count()} seed website(s) to crawl')
        self.stdout.write(f'Found {keywords.count()} active keyword(s)')

        total_pages = 0
        total_matches = 0

        for seed in seeds:
            self.stdout.write(f'\nCrawling: {seed.name} ({seed.url})')
            pages, matches = self.crawl_seed(seed, keywords, options)
            total_pages += pages
            total_matches += matches

            # Update seed website stats
            seed.last_crawled_at = timezone.now()
            seed.pages_crawled += pages
            seed.matches_found += matches
            seed.save()

        self.stdout.write(self.style.SUCCESS(
            f'\nCrawling completed! Total pages: {total_pages}, Total matches: {total_matches}'
        ))

    def crawl_seed(self, seed, keywords, options):
        """Crawl a single seed website."""
        max_pages = options['max_pages']
        delay = options['delay']
        pages_crawled = 0
        matches_found = 0

        # Parse base URL
        parsed_base = urlparse(seed.url)
        base_domain = f"{parsed_base.scheme}://{parsed_base.netloc}"

        # Determine allowed domains
        allowed_domains = seed.allowed_domains if seed.allowed_domains else [parsed_base.netloc]

        # URLs to visit
        visited_urls = set()
        urls_to_visit = [(seed.url, 0)]  # (url, depth)

        session = requests.Session()
        session.headers.update({
            'User-Agent': seed.user_agent or 'Mozilla/5.0 (compatible; OSINTBot/1.0)'
        })

        while urls_to_visit and pages_crawled < max_pages:
            url, depth = urls_to_visit.pop(0)

            # Skip if already visited or too deep
            if url in visited_urls or depth > seed.crawl_depth:
                continue

            # Check domain
            parsed = urlparse(url)
            if parsed.netloc not in allowed_domains:
                continue

            visited_urls.add(url)

            try:
                # Fetch page
                self.stdout.write(f'  Fetching: {url} (depth: {depth})')
                response = session.get(url, timeout=10, allow_redirects=True)
                response.raise_for_status()

                # Parse content
                soup = BeautifulSoup(response.text, 'lxml')

                # Remove non-content elements before extracting text
                for script in soup(["script", "style", "meta", "link"]):
                    script.decompose()

                text_content = soup.get_text(separator=' ', strip=True)
                title = soup.title.get_text(strip=True) if soup.title else ''
                html_content = str(soup)

                # Calculate content hash
                content_hash = hashlib.sha256(text_content.encode('utf-8')).hexdigest()

                # Check for duplicates
                if CrawledContent.objects.filter(url=url, content_hash=content_hash).exists():
                    self.stdout.write('  Skipping duplicate content')
                    continue

                # Match keywords
                matched_keywords = []
                match_count = 0

                for keyword_obj in keywords:
                    matches = self.match_keyword(keyword_obj, text_content, url, title)
                    if matches:
                        matched_keywords.append(keyword_obj)
                        match_count += len(matches)

                # Calculate confidence score
                confidence_score = self.calculate_confidence(matched_keywords, match_count)
                has_potential_scam = confidence_score >= 30  # Threshold

                # Save crawled content
                with transaction.atomic():
                    crawled_content = CrawledContent.objects.create(
                        seed_website=seed,
                        url=url,
                        title=title[:500],
                        content=text_content[:10000],  # Limit content size
                        html_content=html_content[:50000],  # Limit HTML size
                        match_count=match_count,
                        confidence_score=confidence_score,
                        has_potential_scam=has_potential_scam,
                        http_status=response.status_code,
                        content_hash=content_hash,
                    )
                    crawled_content.matched_keywords.set(matched_keywords)

                pages_crawled += 1

                if has_potential_scam:
                    matches_found += 1
                    self.stdout.write(self.style.WARNING(
                        f'  ⚠ Potential scam detected! Confidence: {confidence_score}%'
                    ))

                    # Create auto-generated report
                    self.create_auto_report(crawled_content, matched_keywords, confidence_score)

                # Extract links for further crawling
                if depth < seed.crawl_depth:
                    for link in soup.find_all('a', href=True):
                        absolute_url = urljoin(url, link['href'])
                        parsed_link = urlparse(absolute_url)

                        # Only follow links on allowed domains that have not been visited yet
                        if parsed_link.netloc in allowed_domains and absolute_url not in visited_urls:
                            urls_to_visit.append((absolute_url, depth + 1))

                # Rate limiting
                time.sleep(delay)

            except requests.RequestException as e:
                self.stdout.write(self.style.ERROR(f'  Error fetching {url}: {e}'))
                continue
            except Exception as e:
                self.stdout.write(self.style.ERROR(f'  Error processing {url}: {e}'))
                continue

        return pages_crawled, matches_found

    def match_keyword(self, keyword_obj, text, url, title):
        """Match a keyword against text content."""
        keyword = keyword_obj.keyword
        flags = 0 if keyword_obj.case_sensitive else re.IGNORECASE

        matches = []

        if keyword_obj.keyword_type == 'exact':
            if keyword_obj.case_sensitive:
                if keyword in text or keyword in url or keyword in title:
                    matches.append(keyword)
            else:
                if (keyword.lower() in text.lower()
                        or keyword.lower() in url.lower()
                        or keyword.lower() in title.lower()):
                    matches.append(keyword)

        elif keyword_obj.keyword_type == 'regex':
            try:
                pattern = re.compile(keyword, flags)
                matches = pattern.findall(text + ' ' + url + ' ' + title)
            except re.error:
                self.stdout.write(self.style.ERROR(f'  Invalid regex: {keyword}'))

        elif keyword_obj.keyword_type == 'phrase':
            # Phrase matching (word boundaries)
            pattern = re.compile(r'\b' + re.escape(keyword) + r'\b', flags)
            matches = pattern.findall(text + ' ' + url + ' ' + title)

        elif keyword_obj.keyword_type == 'domain':
            # Domain pattern matching against the URL
            pattern = re.compile(keyword, flags)
            matches = pattern.findall(url)

        elif keyword_obj.keyword_type == 'email':
            # Extract e-mail addresses, then keep those matching the keyword pattern
            email_pattern = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', flags)
            found_emails = email_pattern.findall(text + ' ' + url)
            pattern = re.compile(keyword, flags)
            matches = [email for email in found_emails if pattern.search(email)]

        elif keyword_obj.keyword_type == 'phone':
            # Extract phone-number-like strings, then keep those matching the keyword pattern
            phone_pattern = re.compile(r'[\+]?[(]?[0-9]{1,4}[)]?[-\s\.]?[(]?[0-9]{1,4}[)]?[-\s\.]?[0-9]{1,9}', flags)
            found_phones = phone_pattern.findall(text)
            pattern = re.compile(keyword, flags)
            matches = [phone for phone in found_phones if pattern.search(phone)]

        return matches

    def calculate_confidence(self, matched_keywords, match_count):
        """Calculate confidence score based on matched keywords."""
        if not matched_keywords:
            return 0

        # Base score: average of the matched keywords' own confidence scores
        base_score = sum(kw.confidence_score for kw in matched_keywords) / len(matched_keywords)

        # Boost for multiple matches
        match_boost = min(match_count * 2, 30)  # Max 30 point boost

        # Boost for multiple different keywords
        keyword_boost = min(len(matched_keywords) * 5, 20)  # Max 20 point boost

        total_score = base_score + match_boost + keyword_boost
        return min(int(total_score), 100)  # Cap at 100

    def create_auto_report(self, crawled_content, matched_keywords, confidence_score):
        """Create an auto-generated report from crawled content."""
        # Check if a report already exists for this content
        if AutoGeneratedReport.objects.filter(crawled_content=crawled_content).exists():
            return

        # Generate title
        title = f"Potential Scam Detected: {crawled_content.title or crawled_content.url}"
        if len(title) > 500:
            title = title[:497] + '...'

        # Generate description
        description = "Automatically detected potential scam from OSINT crawling.\n\n"
        description += f"Source URL: {crawled_content.url}\n"
        description += f"Matched Keywords: {', '.join(kw.name for kw in matched_keywords)}\n"
        description += f"Confidence Score: {confidence_score}%\n\n"

        # Extract a relevant snippet of the crawled text
        content_preview = (
            crawled_content.content[:500] + '...'
            if len(crawled_content.content) > 500
            else crawled_content.content
        )
        description += f"Content Preview:\n{content_preview}"

        # Determine whether the report should be auto-approved
        status = 'pending'
        if confidence_score >= 80 and any(kw.auto_approve for kw in matched_keywords):
            status = 'approved'

        # Create auto-generated report
        auto_report = AutoGeneratedReport.objects.create(
            crawled_content=crawled_content,
            title=title,
            description=description,
            source_url=crawled_content.url,
            confidence_score=confidence_score,
            status=status,
        )
        auto_report.matched_keywords.set(matched_keywords)

        # If auto-approved, create the actual report
        if status == 'approved':
            self.create_scam_report(auto_report)

    def create_scam_report(self, auto_report):
        """Create the actual scam report from an auto-generated report."""
        report = ScamReport.objects.create(
            title=auto_report.title,
            description=auto_report.description,
            reported_url=auto_report.source_url,
            scam_type='other',  # Default type, can be updated by a moderator
            status='verified',  # Auto-verified because it passed the auto-approval rule
            verification_score=auto_report.confidence_score,
            is_public=True,
            is_anonymous=True,  # System-generated
            is_auto_discovered=True,  # Mark as auto-discovered
        )

        auto_report.report = report
        auto_report.status = 'published'
        auto_report.published_at = timezone.now()
        auto_report.save()

        self.stdout.write(self.style.SUCCESS(
            f'  ✓ Auto-approved and published report: {report.title}'
        ))