OSINT/osint/management/commands/crawl_osint.py
"""
Management command for OSINT crawling from seed websites.
"""
import re
import hashlib
import time
from urllib.parse import urljoin, urlparse
from django.core.management.base import BaseCommand
from django.utils import timezone
from django.db import transaction
import requests
from bs4 import BeautifulSoup
from osint.models import SeedWebsite, OSINTKeyword, CrawledContent, AutoGeneratedReport
from reports.models import ScamReport

class Command(BaseCommand):
    help = 'Crawl seed websites and search for scam-related keywords'

    def add_arguments(self, parser):
        parser.add_argument(
            '--seed-id',
            type=int,
            help='Crawl specific seed website by ID',
        )
        parser.add_argument(
            '--all',
            action='store_true',
            help='Crawl all active seed websites',
        )
        parser.add_argument(
            '--force',
            action='store_true',
            help='Force crawl even if recently crawled',
        )
        parser.add_argument(
            '--max-pages',
            type=int,
            default=50,
            help='Maximum pages to crawl per seed website (default: 50)',
        )
        parser.add_argument(
            '--delay',
            type=float,
            default=1.0,
            help='Delay between requests in seconds (default: 1.0)',
        )
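
    # Seed selection precedence in handle(): an explicit --seed-id wins,
    # then --all; otherwise only seeds whose crawl interval has elapsed
    # (or that were never crawled) are selected, unless --force is given.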
    def handle(self, *args, **options):
        self.stdout.write(self.style.SUCCESS('Starting OSINT crawling...'))

        # Get seed websites to crawl
        if options['seed_id']:
            seeds = list(SeedWebsite.objects.filter(id=options['seed_id'], is_active=True))
        elif options['all'] or options['force']:
            # --force bypasses the crawl-interval check below
            seeds = list(SeedWebsite.objects.filter(is_active=True))
        else:
            # Default: crawl websites whose crawl interval has elapsed.
            # The interval is a per-row field and timedelta() cannot take
            # an F() expression, so the due check is done in Python.
            now = timezone.now()
            seeds = [
                seed for seed in SeedWebsite.objects.filter(is_active=True)
                if seed.last_crawled_at is None
                or seed.last_crawled_at < now - timezone.timedelta(hours=seed.crawl_interval_hours)
            ]

        if not seeds:
            self.stdout.write(self.style.WARNING('No seed websites to crawl.'))
            return

        # Get active keywords
        keywords = OSINTKeyword.objects.filter(is_active=True)
        if not keywords.exists():
            self.stdout.write(self.style.WARNING('No active keywords configured.'))
            return

        self.stdout.write(f'Found {len(seeds)} seed website(s) to crawl')
        self.stdout.write(f'Found {keywords.count()} active keyword(s)')

        total_pages = 0
        total_matches = 0
        for seed in seeds:
            self.stdout.write(f'\nCrawling: {seed.name} ({seed.url})')
            pages, matches = self.crawl_seed(seed, keywords, options)
            total_pages += pages
            total_matches += matches

            # Update seed website stats
            seed.last_crawled_at = timezone.now()
            seed.pages_crawled += pages
            seed.matches_found += matches
            seed.save()

        self.stdout.write(self.style.SUCCESS(
            f'\nCrawling completed! Total pages: {total_pages}, Total matches: {total_matches}'
        ))

    def crawl_seed(self, seed, keywords, options):
        """Crawl a single seed website."""
        max_pages = options['max_pages']
        delay = options['delay']
        pages_crawled = 0
        matches_found = 0

        # Determine allowed domains (default: the seed's own host)
        parsed_base = urlparse(seed.url)
        allowed_domains = seed.allowed_domains if seed.allowed_domains else [parsed_base.netloc]

        # URLs to visit
        visited_urls = set()
        urls_to_visit = [(seed.url, 0)]  # (url, depth)

        session = requests.Session()
        session.headers.update({
            'User-Agent': seed.user_agent or 'Mozilla/5.0 (compatible; OSINTBot/1.0)'
        })
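
        # Breadth-first crawl: the FIFO frontier visits shallower pages first,
        # visited_urls prevents re-fetching, and depth is bounded by
        # seed.crawl_depth. (list.pop(0) is O(n); collections.deque.popleft()
        # would be the constant-time alternative.)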
        while urls_to_visit and pages_crawled < max_pages:
            url, depth = urls_to_visit.pop(0)

            # Skip if already visited or too deep
            if url in visited_urls or depth > seed.crawl_depth:
                continue

            # Check domain
            parsed = urlparse(url)
            if parsed.netloc not in allowed_domains:
                continue

            visited_urls.add(url)

            try:
                # Fetch page
                self.stdout.write(f'  Fetching: {url} (depth: {depth})')
                response = session.get(url, timeout=10, allow_redirects=True)
                response.raise_for_status()

                # Parse content
                soup = BeautifulSoup(response.text, 'lxml')

                # Remove non-content elements before extracting text
                for tag in soup(["script", "style", "meta", "link"]):
                    tag.decompose()

                text_content = soup.get_text(separator=' ', strip=True)
                # get_text() avoids the None that soup.title.string can return
                title = soup.title.get_text(strip=True) if soup.title else ''
                html_content = str(soup)

                # Calculate content hash
                content_hash = hashlib.sha256(text_content.encode('utf-8')).hexdigest()
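
                # Re-crawled pages whose text is unchanged hash to the same
                # digest and are skipped; a changed page is stored as a new row.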
                # Check for duplicates
                if CrawledContent.objects.filter(url=url, content_hash=content_hash).exists():
                    self.stdout.write('    Skipping duplicate content')
                    continue

                # Match keywords
                matched_keywords = []
                match_count = 0
                for keyword_obj in keywords:
                    matches = self.match_keyword(keyword_obj, text_content, url, title)
                    if matches:
                        matched_keywords.append(keyword_obj)
                        match_count += len(matches)

                # Calculate confidence score
                confidence_score = self.calculate_confidence(matched_keywords, match_count)
                has_potential_scam = confidence_score >= 30  # Flagging threshold

                # Save crawled content
                with transaction.atomic():
                    crawled_content = CrawledContent.objects.create(
                        seed_website=seed,
                        url=url,
                        title=title[:500],
                        content=text_content[:10000],  # Limit content size
                        html_content=html_content[:50000],  # Limit HTML size
                        match_count=match_count,
                        confidence_score=confidence_score,
                        has_potential_scam=has_potential_scam,
                        http_status=response.status_code,
                        content_hash=content_hash
                    )
                    crawled_content.matched_keywords.set(matched_keywords)

                pages_crawled += 1

                if has_potential_scam:
                    matches_found += 1
                    self.stdout.write(self.style.WARNING(
                        f'    ⚠ Potential scam detected! Confidence: {confidence_score}%'
                    ))
                    # Create auto-generated report
                    self.create_auto_report(crawled_content, matched_keywords, confidence_score)

                # Extract links for further crawling
                if depth < seed.crawl_depth:
                    for link in soup.find_all('a', href=True):
                        href = link['href']
                        absolute_url = urljoin(url, href)
                        parsed_link = urlparse(absolute_url)
                        # Only follow same-domain links
                        if parsed_link.netloc in allowed_domains:
                            if absolute_url not in visited_urls:
                                urls_to_visit.append((absolute_url, depth + 1))

                # Rate limiting
                time.sleep(delay)

            except requests.RequestException as e:
                self.stdout.write(self.style.ERROR(f'  Error fetching {url}: {e}'))
                continue
            except Exception as e:
                self.stdout.write(self.style.ERROR(f'  Error processing {url}: {e}'))
                continue

        return pages_crawled, matches_found

    def match_keyword(self, keyword_obj, text, url, title):
        """Match a keyword against text content."""
        keyword = keyword_obj.keyword
        flags = 0 if keyword_obj.case_sensitive else re.IGNORECASE
        matches = []

        if keyword_obj.keyword_type == 'exact':
            haystacks = (text, url, title)
            if keyword_obj.case_sensitive:
                if any(keyword in h for h in haystacks):
                    matches.append(keyword)
            else:
                if any(keyword.lower() in h.lower() for h in haystacks):
                    matches.append(keyword)
        elif keyword_obj.keyword_type == 'regex':
            try:
                pattern = re.compile(keyword, flags)
                matches = pattern.findall(text + ' ' + url + ' ' + title)
            except re.error:
                self.stdout.write(self.style.ERROR(f'  Invalid regex: {keyword}'))
        elif keyword_obj.keyword_type == 'phrase':
            # Phrase matching (word boundaries)
            pattern = re.compile(r'\b' + re.escape(keyword) + r'\b', flags)
            matches = pattern.findall(text + ' ' + url + ' ' + title)
        elif keyword_obj.keyword_type == 'domain':
            # Domain pattern matching against the URL only
            pattern = re.compile(keyword, flags)
            matches = pattern.findall(url)
        elif keyword_obj.keyword_type == 'email':
            # Find email addresses, then keep those matching the keyword pattern
            email_pattern = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', flags)
            found_emails = email_pattern.findall(text + ' ' + url)
            pattern = re.compile(keyword, flags)
            matches = [email for email in found_emails if pattern.search(email)]
        elif keyword_obj.keyword_type == 'phone':
            # Find phone-number-like strings, then keep those matching the keyword pattern
            phone_pattern = re.compile(r'[\+]?[(]?[0-9]{1,4}[)]?[-\s\.]?[(]?[0-9]{1,4}[)]?[-\s\.]?[0-9]{1,9}', flags)
            found_phones = phone_pattern.findall(text)
            pattern = re.compile(keyword, flags)
            matches = [phone for phone in found_phones if pattern.search(phone)]

        return matches
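
    # Note: for 'regex' keywords that contain capture groups, re.findall()
    # returns tuples rather than full matches; the caller only uses the
    # result for truthiness and len(), so scoring is unaffected.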

    def calculate_confidence(self, matched_keywords, match_count):
        """Calculate confidence score based on matched keywords."""
        if not matched_keywords:
            return 0

        # Base score: average of the matched keywords' own confidence scores
        base_score = sum(kw.confidence_score for kw in matched_keywords) / len(matched_keywords)

        # Boost for multiple matches
        match_boost = min(match_count * 2, 30)  # Max 30 point boost

        # Boost for multiple different keywords
        keyword_boost = min(len(matched_keywords) * 5, 20)  # Max 20 point boost

        total_score = base_score + match_boost + keyword_boost
        return min(int(total_score), 100)  # Cap at 100
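
    # Worked example: two keywords with confidence scores 40 and 60 matching
    # 5 times in total give base (40 + 60) / 2 = 50, a match boost of
    # min(5 * 2, 30) = 10 and a keyword boost of min(2 * 5, 20) = 10, for a
    # final score of 70 (above the flagging threshold of 30, below the
    # auto-approve threshold of 80).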

    def create_auto_report(self, crawled_content, matched_keywords, confidence_score):
        """Create an auto-generated report from crawled content."""
        # Check if report already exists
        if AutoGeneratedReport.objects.filter(crawled_content=crawled_content).exists():
            return

        # Generate title
        title = f"Potential Scam Detected: {crawled_content.title or crawled_content.url}"
        if len(title) > 500:
            title = title[:497] + '...'

        # Generate description
        description = "Automatically detected potential scam from OSINT crawling.\n\n"
        description += f"Source URL: {crawled_content.url}\n"
        description += f"Matched Keywords: {', '.join(kw.keyword for kw in matched_keywords)}\n"
        description += f"Confidence Score: {confidence_score}%\n\n"

        # Extract relevant snippet
        content_preview = (
            crawled_content.content[:500] + '...'
            if len(crawled_content.content) > 500
            else crawled_content.content
        )
        description += f"Content Preview:\n{content_preview}"

        # Auto-approve only high-confidence matches on keywords flagged for it
        status = 'pending'
        if confidence_score >= 80 and any(kw.auto_approve for kw in matched_keywords):
            status = 'approved'

        # Create auto-generated report
        auto_report = AutoGeneratedReport.objects.create(
            crawled_content=crawled_content,
            title=title,
            description=description,
            source_url=crawled_content.url,
            confidence_score=confidence_score,
            status=status
        )
        auto_report.matched_keywords.set(matched_keywords)

        # If auto-approved, create the actual report
        if status == 'approved':
            self.create_scam_report(auto_report)

    def create_scam_report(self, auto_report):
        """Create actual scam report from auto-generated report."""
        report = ScamReport.objects.create(
            title=auto_report.title,
            description=auto_report.description,
            reported_url=auto_report.source_url,
            scam_type='other',  # Default type, can be updated by moderator
            status='verified',  # Auto-verified via keyword auto-approval
            verification_score=auto_report.confidence_score,
            is_public=True,
            is_anonymous=True,  # System-generated
            is_auto_discovered=True,  # Mark as auto-discovered
        )
        auto_report.report = report
        auto_report.status = 'published'
        auto_report.published_at = timezone.now()
        auto_report.save()

        self.stdout.write(self.style.SUCCESS(
            f'  ✓ Auto-approved and published report: {report.title}'
        ))
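
# Scheduling note: the default (no-flag) invocation only crawls seeds whose
# crawl_interval_hours has elapsed, so a periodic job such as an hourly cron
# entry running `python manage.py crawl_osint` is one way to keep seeds fresh
# without re-crawling them too often.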