Files
Iliyan Angelov c67067a2a4 Mail
2025-09-14 23:24:25 +03:00

138 lines
5.3 KiB
Python

from celery import shared_task
import csv
import io
import logging
from django.utils import timezone
from django.core.files.base import ContentFile
from .models import Contact, ContactGroup, ContactImport
logger = logging.getLogger(__name__)
@shared_task
def process_contact_import(import_id):
"""Process contact import from CSV file."""
try:
import_obj = ContactImport.objects.get(id=import_id)
import_obj.status = 'processing'
import_obj.save()
# Read CSV file
csv_file = import_obj.file.read().decode('utf-8')
csv_reader = csv.DictReader(io.StringIO(csv_file))
total_contacts = 0
imported_contacts = 0
failed_contacts = 0
error_log = []
for row in csv_reader:
total_contacts += 1
try:
# Extract contact data
first_name = row.get('First Name', '').strip()
last_name = row.get('Last Name', '').strip()
email = row.get('Email', '').strip()
if not email:
error_log.append(f"Row {total_contacts}: Email is required")
failed_contacts += 1
continue
# Check if contact already exists
if Contact.objects.filter(user=import_obj.user, email=email).exists():
error_log.append(f"Row {total_contacts}: Contact with email {email} already exists")
failed_contacts += 1
continue
# Get or create group
group = None
group_name = row.get('Group', '').strip()
if group_name:
group, created = ContactGroup.objects.get_or_create(
user=import_obj.user,
name=group_name,
defaults={'description': f'Imported from {import_obj.filename}'}
)
# Create contact
contact = Contact.objects.create(
user=import_obj.user,
group=group,
first_name=first_name,
last_name=last_name,
email=email,
phone=row.get('Phone', '').strip(),
company=row.get('Company', '').strip(),
job_title=row.get('Job Title', '').strip(),
address_line1=row.get('Address Line 1', '').strip(),
address_line2=row.get('Address Line 2', '').strip(),
city=row.get('City', '').strip(),
state=row.get('State', '').strip(),
postal_code=row.get('Postal Code', '').strip(),
country=row.get('Country', '').strip(),
website=row.get('Website', '').strip(),
linkedin=row.get('LinkedIn', '').strip(),
twitter=row.get('Twitter', '').strip(),
facebook=row.get('Facebook', '').strip(),
notes=row.get('Notes', '').strip(),
is_favorite=row.get('Is Favorite', '').lower() == 'true',
is_blocked=row.get('Is Blocked', '').lower() == 'true',
)
imported_contacts += 1
logger.info(f"Imported contact {contact.id} for user {import_obj.user.id}")
except Exception as e:
error_log.append(f"Row {total_contacts}: {str(e)}")
failed_contacts += 1
logger.error(f"Error importing contact row {total_contacts}: {str(e)}")
# Update import status
import_obj.status = 'completed'
import_obj.total_contacts = total_contacts
import_obj.imported_contacts = imported_contacts
import_obj.failed_contacts = failed_contacts
import_obj.error_log = '\n'.join(error_log)
import_obj.completed_at = timezone.now()
import_obj.save()
logger.info(f"Contact import {import_id} completed: {imported_contacts} imported, {failed_contacts} failed")
except Exception as e:
logger.error(f"Failed to process contact import {import_id}: {str(e)}")
try:
import_obj = ContactImport.objects.get(id=import_id)
import_obj.status = 'failed'
import_obj.error_log = str(e)
import_obj.save()
except ContactImport.DoesNotExist:
pass
@shared_task
def cleanup_contact_imports():
"""Clean up old contact import files."""
try:
from django.utils import timezone
from datetime import timedelta
# Delete import files older than 7 days
cutoff_date = timezone.now() - timedelta(days=7)
old_imports = ContactImport.objects.filter(
created_at__lt=cutoff_date,
status__in=['completed', 'failed']
)
count = old_imports.count()
for import_obj in old_imports:
if import_obj.file:
import_obj.file.delete()
import_obj.delete()
logger.info(f"Cleaned up {count} old contact import files")
except Exception as e:
logger.error(f"Failed to cleanup contact imports: {str(e)}")