469 lines
14 KiB
Python
469 lines
14 KiB
Python
"""
|
|
OSINT (Open Source Intelligence) integration models.
|
|
"""
|
|
from django.db import models
|
|
from django.contrib.auth import get_user_model
|
|
from reports.models import ScamReport
|
|
|
|
User = get_user_model()
|
|
|
|
|
|
class OSINTTask(models.Model):
    """
    Background tasks for OSINT data collection.

    Each task belongs to one ``ScamReport`` and stores its type, status,
    JSON parameters, JSON result, an error message, timestamps and a
    retry counter.
    """

    # (stored value, human-readable label) pairs accepted by ``task_type``.
    TASK_TYPE_CHOICES = [
        ('domain_analysis', 'Domain Analysis'),
        ('url_analysis', 'URL Analysis'),
        ('email_analysis', 'Email Analysis'),
        ('phone_analysis', 'Phone Analysis'),
        ('whois_lookup', 'WHOIS Lookup'),
        ('dns_lookup', 'DNS Lookup'),
        ('ssl_check', 'SSL Certificate Check'),
        ('archive_check', 'Archive Check'),
        ('business_registry', 'Business Registry Check'),
        ('social_media', 'Social Media Check'),
    ]

    # (stored value, human-readable label) pairs accepted by ``status``.
    STATUS_CHOICES = [
        ('pending', 'Pending'),
        ('running', 'Running'),
        ('completed', 'Completed'),
        ('failed', 'Failed'),
        ('cancelled', 'Cancelled'),
    ]

    # Deleting the report deletes its tasks (CASCADE); reverse accessor is
    # ``report.osint_tasks``.
    report = models.ForeignKey(
        ScamReport,
        on_delete=models.CASCADE,
        related_name='osint_tasks'
    )
    task_type = models.CharField(
        max_length=50,
        choices=TASK_TYPE_CHOICES
    )
    # New tasks start as 'pending'.
    status = models.CharField(
        max_length=20,
        choices=STATUS_CHOICES,
        default='pending'
    )
    # ``default=dict`` passes the callable, so each row gets its own dict.
    parameters = models.JSONField(
        default=dict,
        help_text='Task parameters (e.g., URL, email, phone)'
    )
    result = models.JSONField(
        default=dict,
        blank=True,
        help_text='Task result data'
    )
    error_message = models.TextField(blank=True)
    # Set automatically when the row is first created.
    created_at = models.DateTimeField(auto_now_add=True)
    # Nullable timestamps; nothing in this model sets them automatically.
    started_at = models.DateTimeField(null=True, blank=True)
    completed_at = models.DateTimeField(null=True, blank=True)
    retry_count = models.IntegerField(default=0)

    class Meta:
        db_table = 'osint_osinttask'
        verbose_name = 'OSINT Task'
        verbose_name_plural = 'OSINT Tasks'
        # Newest tasks first by default.
        ordering = ['-created_at']
        indexes = [
            models.Index(fields=['status', 'created_at']),
            models.Index(fields=['report', 'task_type']),
        ]

    def __str__(self):
        """Return '<task type label> for Report #<report id> - <status label>'."""
        # ``report_id`` is the foreign-key value stored on this row, so
        # building the label does not fetch the related ScamReport
        # (``self.report.id`` would load it on every call).
        return (
            f"{self.get_task_type_display()} for Report #{self.report_id}"
            f" - {self.get_status_display()}"
        )
|
|
|
|
|
|
class OSINTResult(models.Model):
    """
    OSINT investigation results.

    Each result belongs to one ``ScamReport`` and stores the source name,
    the data type, the raw and processed JSON payloads, a confidence level
    and a manual-verification flag.
    """

    # (stored value, human-readable label) pairs accepted by ``data_type``.
    DATA_TYPE_CHOICES = [
        ('whois', 'WHOIS Data'),
        ('dns', 'DNS Records'),
        ('ssl', 'SSL Certificate'),
        ('archive', 'Archive Data'),
        ('email', 'Email Data'),
        ('phone', 'Phone Data'),
        ('business', 'Business Registry Data'),
        ('social', 'Social Media Data'),
        ('reputation', 'Reputation Data'),
    ]

    # Deleting the report deletes its results (CASCADE); reverse accessor is
    # ``report.osint_results``.
    report = models.ForeignKey(
        ScamReport,
        on_delete=models.CASCADE,
        related_name='osint_results'
    )
    source = models.CharField(
        max_length=100,
        help_text='OSINT source/service name'
    )
    data_type = models.CharField(
        max_length=50,
        choices=DATA_TYPE_CHOICES
    )
    # ``default=dict`` passes the callable, so each row gets its own dict.
    raw_data = models.JSONField(
        default=dict,
        help_text='Raw data from OSINT source'
    )
    processed_data = models.JSONField(
        default=dict,
        blank=True,
        help_text='Processed/cleaned data'
    )
    # NOTE(review): the 0-100 range is stated only in help_text; no
    # validator on this field enforces it.
    confidence_level = models.IntegerField(
        default=0,
        help_text='Confidence level (0-100)'
    )
    is_verified = models.BooleanField(
        default=False,
        help_text='Manually verified by moderator'
    )
    # Set once on creation / refreshed on every save, respectively.
    collected_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'osint_osintresult'
        verbose_name = 'OSINT Result'
        verbose_name_plural = 'OSINT Results'
        # Most recently collected results first by default.
        ordering = ['-collected_at']
        indexes = [
            models.Index(fields=['report', 'data_type']),
            models.Index(fields=['confidence_level', 'is_verified']),
        ]

    def __str__(self):
        """Return '<data type label> from <source> for Report #<report id>'."""
        # ``report_id`` is the foreign-key value stored on this row, so
        # building the label does not fetch the related ScamReport
        # (``self.report.id`` would load it on every call).
        return (
            f"{self.get_data_type_display()} from {self.source}"
            f" for Report #{self.report_id}"
        )
|
|
|
|
|
|
class OSINTConfiguration(models.Model):
    """
    Per-service settings for OSINT services and APIs.

    One row per ``service_name`` (the column is unique).
    """

    service_name = models.CharField(max_length=100, unique=True)
    # NOTE(review): help_text says the key is encrypted, but this is a plain
    # CharField; any encryption must happen outside this model - confirm.
    api_key = models.CharField(max_length=255, blank=True, help_text='Encrypted API key')
    api_url = models.URLField(blank=True)
    is_active = models.BooleanField(default=True)
    rate_limit = models.IntegerField(default=100, help_text='Requests per hour')
    # Callable default: every row receives a fresh dict.
    configuration = models.JSONField(
        default=dict,
        blank=True,
        help_text='Additional configuration',
    )
    # Set once on creation / refreshed on every save, respectively.
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'osint_osintconfiguration'
        verbose_name = 'OSINT Configuration'
        verbose_name_plural = 'OSINT Configurations'

    def __str__(self):
        """Return '<service name> (Active)' or '<service name> (Inactive)'."""
        state = 'Active' if self.is_active else 'Inactive'
        return f"{self.service_name} ({state})"
|
|
|
|
|
|
class SeedWebsite(models.Model):
    """
    Seed websites for OSINT crawling.

    Stores the base URL, crawl settings (priority, depth, interval, allowed
    domains, user agent) and crawl counters for each seed site.
    """

    # (stored value, human-readable label) pairs accepted by ``priority``.
    PRIORITY_CHOICES = [
        ('high', 'High'),
        ('medium', 'Medium'),
        ('low', 'Low'),
    ]

    url = models.URLField(
        max_length=500,
        help_text='Base URL to crawl'
    )
    name = models.CharField(
        max_length=200,
        help_text='Friendly name for this seed website'
    )
    description = models.TextField(
        blank=True,
        help_text='Description of the website'
    )
    is_active = models.BooleanField(
        default=True,
        help_text='Enable/disable crawling for this website'
    )
    priority = models.CharField(
        max_length=10,
        choices=PRIORITY_CHOICES,
        default='medium',
        help_text='Crawling priority'
    )
    crawl_depth = models.IntegerField(
        default=2,
        help_text='Maximum depth to crawl (0 = only this page, 1 = this page + direct links, etc.)'
    )
    crawl_interval_hours = models.IntegerField(
        default=24,
        help_text='Hours between crawls'
    )
    # ``default=list`` passes the callable, so each row gets its own list.
    allowed_domains = models.JSONField(
        default=list,
        blank=True,
        help_text='List of allowed domains to crawl (empty = same domain only)'
    )
    user_agent = models.CharField(
        max_length=255,
        blank=True,
        default='Mozilla/5.0 (compatible; OSINTBot/1.0)',
        help_text='User agent string for requests'
    )
    # Null until a value is written; nothing in this model sets it.
    last_crawled_at = models.DateTimeField(null=True, blank=True)
    pages_crawled = models.IntegerField(default=0)
    matches_found = models.IntegerField(default=0)
    # Set once on creation / refreshed on every save, respectively.
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    # Deleting the user keeps the seed website and nulls this reference.
    created_by = models.ForeignKey(
        User,
        on_delete=models.SET_NULL,
        null=True,
        related_name='created_seed_websites'
    )

    class Meta:
        db_table = 'osint_seedwebsite'
        verbose_name = 'Seed Website'
        verbose_name_plural = 'Seed Websites'
        # ``priority`` is a CharField, so ordering by '-priority' compares the
        # strings alphabetically (medium, low, high) and lists high-priority
        # sites last. Rank the values explicitly instead: high, medium, low,
        # then any unexpected value.
        ordering = [
            models.Case(
                models.When(priority='high', then=models.Value(0)),
                models.When(priority='medium', then=models.Value(1)),
                models.When(priority='low', then=models.Value(2)),
                default=models.Value(3),
                output_field=models.IntegerField(),
            ),
            '-last_crawled_at',
        ]
        indexes = [
            models.Index(fields=['is_active', 'priority']),
            models.Index(fields=['last_crawled_at']),
        ]

    def __str__(self):
        """Return '<name> (<url>)'."""
        return f"{self.name} ({self.url})"
|
|
|
|
|
|
class OSINTKeyword(models.Model):
    """
    A keyword, phrase or pattern to look for during OSINT crawling.

    ``keyword_type`` selects one of the matching modes listed in
    ``TYPE_CHOICES``.
    """

    # (stored value, human-readable label) pairs accepted by ``keyword_type``.
    TYPE_CHOICES = [
        ('exact', 'Exact Match'),
        ('regex', 'Regular Expression'),
        ('phrase', 'Phrase Match'),
        ('domain', 'Domain Pattern'),
        ('email', 'Email Pattern'),
        ('phone', 'Phone Pattern'),
    ]

    keyword = models.CharField(
        max_length=500,
        help_text='Keyword, phrase, or regex pattern to search for',
    )
    name = models.CharField(max_length=200, help_text='Friendly name for this keyword')
    description = models.TextField(
        blank=True,
        help_text='Description of what this keyword detects',
    )
    keyword_type = models.CharField(
        max_length=20,
        choices=TYPE_CHOICES,
        default='phrase',
        help_text='Type of matching to perform',
    )
    is_active = models.BooleanField(default=True, help_text='Enable/disable this keyword')
    case_sensitive = models.BooleanField(default=False, help_text='Case sensitive matching')
    # NOTE(review): the 0-100 range is stated only in help_text; no
    # validator on this field enforces it.
    confidence_score = models.IntegerField(
        default=50,
        help_text='Default confidence score (0-100) when this keyword matches',
    )
    auto_approve = models.BooleanField(
        default=False,
        help_text='Auto-approve reports matching this keyword (requires high confidence)',
    )
    # Set once on creation / refreshed on every save, respectively.
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    # Deleting the user keeps the keyword and nulls this reference.
    created_by = models.ForeignKey(
        User,
        on_delete=models.SET_NULL,
        null=True,
        related_name='created_keywords',
    )

    class Meta:
        db_table = 'osint_keyword'
        verbose_name = 'OSINT Keyword'
        verbose_name_plural = 'OSINT Keywords'
        # Descending ``is_active`` first, then ascending ``name``.
        ordering = ['-is_active', 'name']
        indexes = [
            models.Index(fields=['is_active', 'keyword_type']),
        ]

    def __str__(self):
        """Return '<name> (<keyword_type stored value>)'."""
        return "{} ({})".format(self.name, self.keyword_type)
|
|
|
|
|
|
class CrawledContent(models.Model):
    """
    A page crawled from a seed website.

    Holds the page URL, title, text and raw HTML, the keywords that matched
    it, and the match count, confidence score and scam flag.
    """

    # Deleting the seed website deletes its crawled pages (CASCADE); reverse
    # accessor is ``seed_website.crawled_contents``.
    seed_website = models.ForeignKey(
        SeedWebsite,
        on_delete=models.CASCADE,
        related_name='crawled_contents',
    )
    url = models.URLField(max_length=1000, help_text='URL of the crawled page')
    title = models.CharField(max_length=500, blank=True, help_text='Page title')
    content = models.TextField(help_text='Crawled page content')
    html_content = models.TextField(blank=True, help_text='Raw HTML content')
    matched_keywords = models.ManyToManyField(
        OSINTKeyword,
        blank=True,
        related_name='matched_contents',
        help_text='Keywords that matched this content',
    )
    match_count = models.IntegerField(
        default=0,
        help_text='Number of keyword matches found',
    )
    confidence_score = models.IntegerField(
        default=0,
        help_text='Calculated confidence score based on matches',
    )
    has_potential_scam = models.BooleanField(
        default=False,
        help_text='Flagged as potential scam based on keyword matches',
    )
    # Set automatically when the row is first created.
    crawled_at = models.DateTimeField(auto_now_add=True)
    http_status = models.IntegerField(
        null=True,
        blank=True,
        help_text='HTTP status code',
    )
    # 64 characters is the length of a hex-encoded SHA-256 digest; the field
    # may be left blank.
    content_hash = models.CharField(
        max_length=64,
        blank=True,
        help_text='SHA256 hash of content for deduplication',
    )

    class Meta:
        db_table = 'osint_crawledcontent'
        verbose_name = 'Crawled Content'
        verbose_name_plural = 'Crawled Contents'
        # Newest crawl first, then higher confidence first.
        ordering = ['-crawled_at', '-confidence_score']
        indexes = [
            models.Index(fields=['seed_website', 'crawled_at']),
            models.Index(fields=['has_potential_scam', 'confidence_score']),
            models.Index(fields=['content_hash']),
        ]
        # A given (url, content_hash) pair may be stored only once.
        unique_together = [['url', 'content_hash']]

    def __str__(self):
        """Return '<title, or url when the title is empty> - <match_count> matches'."""
        label = self.title if self.title else self.url
        return f"{label} - {self.match_count} matches"
|
|
|
|
|
|
class AutoGeneratedReport(models.Model):
    """
    Scam report generated automatically from OSINT crawling.

    Tied one-to-one to the ``CrawledContent`` it came from, and optionally
    linked to a ``ScamReport``. Carries review status, notes and timestamps.
    """

    # (stored value, human-readable label) pairs accepted by ``status``.
    STATUS_CHOICES = [
        ('pending', 'Pending Review'),
        ('approved', 'Approved'),
        ('rejected', 'Rejected'),
        ('published', 'Published'),
    ]

    # At most one auto report per crawled page; deleting the page deletes
    # the auto report (CASCADE). Reverse accessor: ``content.auto_report``.
    crawled_content = models.OneToOneField(
        CrawledContent,
        on_delete=models.CASCADE,
        related_name='auto_report',
    )
    # Optional link; deleting the ScamReport nulls this reference.
    report = models.ForeignKey(
        ScamReport,
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='auto_generated_reports',
        help_text='Linked scam report (created when approved)',
    )
    title = models.CharField(max_length=500, help_text='Auto-generated report title')
    description = models.TextField(help_text='Auto-generated report description')
    source_url = models.URLField(
        max_length=1000,
        help_text='Source URL where scam was found',
    )
    matched_keywords = models.ManyToManyField(
        OSINTKeyword,
        related_name='generated_reports',
    )
    # NOTE(review): the 0-100 range is stated only in help_text; no
    # validator on this field enforces it.
    confidence_score = models.IntegerField(
        default=0,
        help_text='Confidence score (0-100)',
    )
    # New auto reports start as 'pending'.
    status = models.CharField(
        max_length=20,
        choices=STATUS_CHOICES,
        default='pending',
        help_text='Review status',
    )
    review_notes = models.TextField(
        blank=True,
        help_text='Notes from moderator/admin review',
    )
    # Deleting the reviewing user keeps the report and nulls this reference.
    reviewed_by = models.ForeignKey(
        User,
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='reviewed_auto_reports',
    )
    # Nullable timestamps; nothing in this model sets them automatically.
    reviewed_at = models.DateTimeField(null=True, blank=True)
    published_at = models.DateTimeField(null=True, blank=True)
    # Set once on creation / refreshed on every save, respectively.
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'osint_autogeneratedreport'
        verbose_name = 'Auto-Generated Report'
        verbose_name_plural = 'Auto-Generated Reports'
        # Newest first, then higher confidence first.
        ordering = ['-created_at', '-confidence_score']
        indexes = [
            models.Index(fields=['status', 'confidence_score']),
            models.Index(fields=['created_at']),
        ]

    def __str__(self):
        """Return '<title> - <status label>'."""
        status_label = self.get_status_display()
        return f"{self.title} - {status_label}"
|