""" OSINT (Open Source Intelligence) integration models. """ from django.db import models from django.contrib.auth import get_user_model from reports.models import ScamReport User = get_user_model() class OSINTTask(models.Model): """ Background tasks for OSINT data collection. """ TASK_TYPE_CHOICES = [ ('domain_analysis', 'Domain Analysis'), ('url_analysis', 'URL Analysis'), ('email_analysis', 'Email Analysis'), ('phone_analysis', 'Phone Analysis'), ('whois_lookup', 'WHOIS Lookup'), ('dns_lookup', 'DNS Lookup'), ('ssl_check', 'SSL Certificate Check'), ('archive_check', 'Archive Check'), ('business_registry', 'Business Registry Check'), ('social_media', 'Social Media Check'), ] STATUS_CHOICES = [ ('pending', 'Pending'), ('running', 'Running'), ('completed', 'Completed'), ('failed', 'Failed'), ('cancelled', 'Cancelled'), ] report = models.ForeignKey( ScamReport, on_delete=models.CASCADE, related_name='osint_tasks' ) task_type = models.CharField( max_length=50, choices=TASK_TYPE_CHOICES ) status = models.CharField( max_length=20, choices=STATUS_CHOICES, default='pending' ) parameters = models.JSONField( default=dict, help_text='Task parameters (e.g., URL, email, phone)' ) result = models.JSONField( default=dict, blank=True, help_text='Task result data' ) error_message = models.TextField(blank=True) created_at = models.DateTimeField(auto_now_add=True) started_at = models.DateTimeField(null=True, blank=True) completed_at = models.DateTimeField(null=True, blank=True) retry_count = models.IntegerField(default=0) class Meta: db_table = 'osint_osinttask' verbose_name = 'OSINT Task' verbose_name_plural = 'OSINT Tasks' ordering = ['-created_at'] indexes = [ models.Index(fields=['status', 'created_at']), models.Index(fields=['report', 'task_type']), ] def __str__(self): return f"{self.get_task_type_display()} for Report #{self.report.id} - {self.get_status_display()}" class OSINTResult(models.Model): """ OSINT investigation results. """ DATA_TYPE_CHOICES = [ ('whois', 'WHOIS Data'), ('dns', 'DNS Records'), ('ssl', 'SSL Certificate'), ('archive', 'Archive Data'), ('email', 'Email Data'), ('phone', 'Phone Data'), ('business', 'Business Registry Data'), ('social', 'Social Media Data'), ('reputation', 'Reputation Data'), ] report = models.ForeignKey( ScamReport, on_delete=models.CASCADE, related_name='osint_results' ) source = models.CharField( max_length=100, help_text='OSINT source/service name' ) data_type = models.CharField( max_length=50, choices=DATA_TYPE_CHOICES ) raw_data = models.JSONField( default=dict, help_text='Raw data from OSINT source' ) processed_data = models.JSONField( default=dict, blank=True, help_text='Processed/cleaned data' ) confidence_level = models.IntegerField( default=0, help_text='Confidence level (0-100)' ) is_verified = models.BooleanField( default=False, help_text='Manually verified by moderator' ) collected_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) class Meta: db_table = 'osint_osintresult' verbose_name = 'OSINT Result' verbose_name_plural = 'OSINT Results' ordering = ['-collected_at'] indexes = [ models.Index(fields=['report', 'data_type']), models.Index(fields=['confidence_level', 'is_verified']), ] def __str__(self): return f"{self.get_data_type_display()} from {self.source} for Report #{self.report.id}" class OSINTConfiguration(models.Model): """ Configuration for OSINT services and APIs. """ service_name = models.CharField(max_length=100, unique=True) api_key = models.CharField( max_length=255, blank=True, help_text='Encrypted API key' ) api_url = models.URLField(blank=True) is_active = models.BooleanField(default=True) rate_limit = models.IntegerField( default=100, help_text='Requests per hour' ) configuration = models.JSONField( default=dict, blank=True, help_text='Additional configuration' ) created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) class Meta: db_table = 'osint_osintconfiguration' verbose_name = 'OSINT Configuration' verbose_name_plural = 'OSINT Configurations' def __str__(self): return f"{self.service_name} ({'Active' if self.is_active else 'Inactive'})" class SeedWebsite(models.Model): """ Seed websites for OSINT crawling. """ PRIORITY_CHOICES = [ ('high', 'High'), ('medium', 'Medium'), ('low', 'Low'), ] url = models.URLField( max_length=500, help_text='Base URL to crawl' ) name = models.CharField( max_length=200, help_text='Friendly name for this seed website' ) description = models.TextField( blank=True, help_text='Description of the website' ) is_active = models.BooleanField( default=True, help_text='Enable/disable crawling for this website' ) priority = models.CharField( max_length=10, choices=PRIORITY_CHOICES, default='medium', help_text='Crawling priority' ) crawl_depth = models.IntegerField( default=2, help_text='Maximum depth to crawl (0 = only this page, 1 = this page + direct links, etc.)' ) crawl_interval_hours = models.IntegerField( default=24, help_text='Hours between crawls' ) allowed_domains = models.JSONField( default=list, blank=True, help_text='List of allowed domains to crawl (empty = same domain only)' ) user_agent = models.CharField( max_length=255, blank=True, default='Mozilla/5.0 (compatible; OSINTBot/1.0)', help_text='User agent string for requests' ) last_crawled_at = models.DateTimeField(null=True, blank=True) pages_crawled = models.IntegerField(default=0) matches_found = models.IntegerField(default=0) created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) created_by = models.ForeignKey( User, on_delete=models.SET_NULL, null=True, related_name='created_seed_websites' ) class Meta: db_table = 'osint_seedwebsite' verbose_name = 'Seed Website' verbose_name_plural = 'Seed Websites' ordering = ['-priority', '-last_crawled_at'] indexes = [ models.Index(fields=['is_active', 'priority']), models.Index(fields=['last_crawled_at']), ] def __str__(self): return f"{self.name} ({self.url})" class OSINTKeyword(models.Model): """ Keywords and patterns to search for during OSINT crawling. """ TYPE_CHOICES = [ ('exact', 'Exact Match'), ('regex', 'Regular Expression'), ('phrase', 'Phrase Match'), ('domain', 'Domain Pattern'), ('email', 'Email Pattern'), ('phone', 'Phone Pattern'), ] keyword = models.CharField( max_length=500, help_text='Keyword, phrase, or regex pattern to search for' ) name = models.CharField( max_length=200, help_text='Friendly name for this keyword' ) description = models.TextField( blank=True, help_text='Description of what this keyword detects' ) keyword_type = models.CharField( max_length=20, choices=TYPE_CHOICES, default='phrase', help_text='Type of matching to perform' ) is_active = models.BooleanField( default=True, help_text='Enable/disable this keyword' ) case_sensitive = models.BooleanField( default=False, help_text='Case sensitive matching' ) confidence_score = models.IntegerField( default=50, help_text='Default confidence score (0-100) when this keyword matches' ) auto_approve = models.BooleanField( default=False, help_text='Auto-approve reports matching this keyword (requires high confidence)' ) created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) created_by = models.ForeignKey( User, on_delete=models.SET_NULL, null=True, related_name='created_keywords' ) class Meta: db_table = 'osint_keyword' verbose_name = 'OSINT Keyword' verbose_name_plural = 'OSINT Keywords' ordering = ['-is_active', 'name'] indexes = [ models.Index(fields=['is_active', 'keyword_type']), ] def __str__(self): return f"{self.name} ({self.keyword_type})" class CrawledContent(models.Model): """ Content crawled from seed websites. """ seed_website = models.ForeignKey( SeedWebsite, on_delete=models.CASCADE, related_name='crawled_contents' ) url = models.URLField( max_length=1000, help_text='URL of the crawled page' ) title = models.CharField( max_length=500, blank=True, help_text='Page title' ) content = models.TextField( help_text='Crawled page content' ) html_content = models.TextField( blank=True, help_text='Raw HTML content' ) matched_keywords = models.ManyToManyField( OSINTKeyword, blank=True, related_name='matched_contents', help_text='Keywords that matched this content' ) match_count = models.IntegerField( default=0, help_text='Number of keyword matches found' ) confidence_score = models.IntegerField( default=0, help_text='Calculated confidence score based on matches' ) has_potential_scam = models.BooleanField( default=False, help_text='Flagged as potential scam based on keyword matches' ) crawled_at = models.DateTimeField(auto_now_add=True) http_status = models.IntegerField( null=True, blank=True, help_text='HTTP status code' ) content_hash = models.CharField( max_length=64, blank=True, help_text='SHA256 hash of content for deduplication' ) class Meta: db_table = 'osint_crawledcontent' verbose_name = 'Crawled Content' verbose_name_plural = 'Crawled Contents' ordering = ['-crawled_at', '-confidence_score'] indexes = [ models.Index(fields=['seed_website', 'crawled_at']), models.Index(fields=['has_potential_scam', 'confidence_score']), models.Index(fields=['content_hash']), ] unique_together = [['url', 'content_hash']] def __str__(self): return f"{self.title or self.url} - {self.match_count} matches" class AutoGeneratedReport(models.Model): """ Automatically generated scam reports from OSINT crawling. """ STATUS_CHOICES = [ ('pending', 'Pending Review'), ('approved', 'Approved'), ('rejected', 'Rejected'), ('published', 'Published'), ] crawled_content = models.OneToOneField( CrawledContent, on_delete=models.CASCADE, related_name='auto_report' ) report = models.ForeignKey( ScamReport, on_delete=models.SET_NULL, null=True, blank=True, related_name='auto_generated_reports', help_text='Linked scam report (created when approved)' ) title = models.CharField( max_length=500, help_text='Auto-generated report title' ) description = models.TextField( help_text='Auto-generated report description' ) source_url = models.URLField( max_length=1000, help_text='Source URL where scam was found' ) matched_keywords = models.ManyToManyField( OSINTKeyword, related_name='generated_reports' ) confidence_score = models.IntegerField( default=0, help_text='Confidence score (0-100)' ) status = models.CharField( max_length=20, choices=STATUS_CHOICES, default='pending', help_text='Review status' ) review_notes = models.TextField( blank=True, help_text='Notes from moderator/admin review' ) reviewed_by = models.ForeignKey( User, on_delete=models.SET_NULL, null=True, blank=True, related_name='reviewed_auto_reports' ) reviewed_at = models.DateTimeField(null=True, blank=True) published_at = models.DateTimeField(null=True, blank=True) created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) class Meta: db_table = 'osint_autogeneratedreport' verbose_name = 'Auto-Generated Report' verbose_name_plural = 'Auto-Generated Reports' ordering = ['-created_at', '-confidence_score'] indexes = [ models.Index(fields=['status', 'confidence_score']), models.Index(fields=['created_at']), ] def __str__(self): return f"{self.title} - {self.get_status_display()}"