Files
Iliyan Angelov c67067a2a4 Mail
2025-09-14 23:24:25 +03:00

300 lines
11 KiB
Python

from rest_framework import generics, status, permissions, filters
from rest_framework.decorators import api_view, permission_classes
from rest_framework.response import Response
from rest_framework.views import APIView
from django_filters.rest_framework import DjangoFilterBackend
from django.db import models
from django.db.models import Q, Count
from django.shortcuts import get_object_or_404
from django_ratelimit.decorators import ratelimit
from django.http import HttpResponse
import csv
import io
from datetime import datetime
from .models import Contact, ContactGroup, ContactInteraction, ContactImport
from .serializers import (
ContactSerializer, ContactCreateSerializer, ContactGroupSerializer,
ContactInteractionSerializer, ContactImportSerializer, ContactBulkActionSerializer,
ContactSearchSerializer
)
from .tasks import process_contact_import
class ContactGroupListCreateView(generics.ListCreateAPIView):
    """Collection endpoint for the requesting user's contact groups.

    Requires authentication and serializes with ``ContactGroupSerializer``.

    NOTE(review): unlike ``ContactListCreateView`` there is no
    ``perform_create`` assigning the owner here -- presumably the serializer
    sets ``user`` itself; confirm.
    """

    permission_classes = [permissions.IsAuthenticated]
    serializer_class = ContactGroupSerializer

    def get_queryset(self):
        """Return only the groups whose ``user`` is the current requester."""
        owner = self.request.user
        return ContactGroup.objects.filter(user=owner)
class ContactGroupDetailView(generics.RetrieveUpdateDestroyAPIView):
    """Single-object endpoint (retrieve/update/delete) for a contact group."""

    permission_classes = [permissions.IsAuthenticated]
    serializer_class = ContactGroupSerializer

    def get_queryset(self):
        """Limit object lookups to groups owned by the requesting user."""
        owned_groups = ContactGroup.objects.filter(user=self.request.user)
        return owned_groups
class ContactListCreateView(generics.ListCreateAPIView):
    """Collection endpoint for contacts: filterable listing plus creation.

    Listing supports exact-match filters, free-text search and ordering via
    the configured filter backends; results default to name order.
    """

    permission_classes = [permissions.IsAuthenticated]

    # Filtering / searching / ordering configuration consumed by the backends.
    filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
    filterset_fields = ['group', 'is_favorite', 'is_blocked', 'company', 'city', 'country']
    search_fields = ['first_name', 'last_name', 'email', 'company', 'phone']
    ordering_fields = ['first_name', 'last_name', 'email', 'created_at']
    ordering = ['first_name', 'last_name']

    def get_queryset(self):
        """Return the requester's contacts with their group joined in."""
        own_contacts = Contact.objects.filter(user=self.request.user)
        return own_contacts.select_related('group')

    def get_serializer_class(self):
        """Use the create serializer for POST, the standard one otherwise."""
        creating = self.request.method == 'POST'
        return ContactCreateSerializer if creating else ContactSerializer

    def perform_create(self, serializer):
        """Persist the new contact with the requester as its owner."""
        owner = self.request.user
        serializer.save(user=owner)
class ContactDetailView(generics.RetrieveUpdateDestroyAPIView):
    """Single-object endpoint (retrieve/update/delete) for a contact."""

    permission_classes = [permissions.IsAuthenticated]
    serializer_class = ContactSerializer

    def get_queryset(self):
        """Return the requester's contacts with their group joined in."""
        own_contacts = Contact.objects.filter(user=self.request.user)
        return own_contacts.select_related('group')
class ContactSearchView(generics.ListAPIView):
    """List the requester's contacts narrowed by optional query parameters.

    Recognised parameters:
      * ``q``           -- substring matched against name, email, company, phone
      * ``group``       -- group id, exact match
      * ``is_favorite`` / ``is_blocked`` -- ``'true'`` (any case) means True,
        any other supplied value means False
      * ``company`` / ``city`` / ``country`` -- case-insensitive substring
    """

    serializer_class = ContactSerializer
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Build the filtered queryset from the request's query string."""
        params = self.request.query_params
        results = Contact.objects.filter(user=self.request.user)

        # Free-text term: OR together a substring match on each text column.
        term = params.get('q', '')
        if term:
            text_match = Q()
            for column in ('first_name', 'last_name', 'email', 'company', 'phone'):
                text_match |= Q(**{f'{column}__icontains': term})
            results = results.filter(text_match)

        group_id = params.get('group')
        if group_id:
            results = results.filter(group_id=group_id)

        # Boolean flags apply whenever the parameter is present, even if empty.
        for flag in ('is_favorite', 'is_blocked'):
            raw = params.get(flag)
            if raw is not None:
                results = results.filter(**{flag: raw.lower() == 'true'})

        # Remaining text filters are skipped when absent or empty.
        for column in ('company', 'city', 'country'):
            fragment = params.get(column)
            if fragment:
                results = results.filter(**{f'{column}__icontains': fragment})

        return results.select_related('group')
class ContactBulkActionView(APIView):
    """Apply one action to a set of the requester's contacts in a single call."""

    permission_classes = [permissions.IsAuthenticated]

    def post(self, request):
        """Validate the payload, run the requested action, and report success.

        Returns the serializer errors with HTTP 400 when validation fails.
        Only contacts owned by the requester are touched; ids belonging to
        other users are silently excluded by the queryset filter.
        """
        serializer = ContactBulkActionSerializer(data=request.data, context={'request': request})
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        payload = serializer.validated_data
        contact_ids = payload['contact_ids']
        action = payload['action']
        group_id = payload.get('group_id')
        selected = Contact.objects.filter(id__in=contact_ids, user=request.user)

        # Actions that are a plain field update, keyed by action name.
        field_updates = {
            'remove_from_group': {'group': None},
            'mark_favorite': {'is_favorite': True},
            'mark_unfavorite': {'is_favorite': False},
            'block': {'is_blocked': True},
            'unblock': {'is_blocked': False},
        }

        if action == 'add_to_group':
            # The target group must also belong to the requester, else 404.
            target_group = get_object_or_404(ContactGroup, id=group_id, user=request.user)
            selected.update(group=target_group)
        elif action == 'delete':
            selected.delete()
        elif action in field_updates:
            selected.update(**field_updates[action])
        # Any other action falls through and is still reported as completed.

        return Response({'message': f'Bulk action {action} completed successfully'})
class ContactInteractionListCreateView(generics.ListCreateAPIView):
    """Collection endpoint for the interactions of one contact.

    The contact is identified by the ``contact_id`` URL keyword argument.
    """

    permission_classes = [permissions.IsAuthenticated]
    serializer_class = ContactInteractionSerializer

    def get_queryset(self):
        """Return the contact's interactions, newest ``date`` first.

        Interactions are returned only when the contact belongs to the
        requesting user.
        """
        scoped = ContactInteraction.objects.filter(
            contact_id=self.kwargs.get('contact_id'),
            contact__user=self.request.user,
        )
        return scoped.order_by('-date')

    def perform_create(self, serializer):
        """Attach the new interaction to the URL's contact and the requester.

        Responds with 404 when the contact does not exist or is not owned by
        the requesting user.
        """
        requester = self.request.user
        parent_contact = get_object_or_404(
            Contact,
            id=self.kwargs.get('contact_id'),
            user=requester,
        )
        serializer.save(contact=parent_contact, created_by=requester)
class ContactInteractionDetailView(generics.RetrieveUpdateDestroyAPIView):
    """Single-object endpoint (retrieve/update/delete) for an interaction."""

    permission_classes = [permissions.IsAuthenticated]
    serializer_class = ContactInteractionSerializer

    def get_queryset(self):
        """Limit lookups to interactions on the requesting user's contacts."""
        requester = self.request.user
        return ContactInteraction.objects.filter(contact__user=requester)
class ContactImportView(generics.CreateAPIView):
    """Import contacts from CSV file.

    Creates a ``ContactImport`` record for the requester and hands its id to
    the ``process_contact_import`` background task.
    """

    serializer_class = ContactImportSerializer
    permission_classes = [permissions.IsAuthenticated]

    def perform_create(self, serializer):
        """Save the import for the requester and enqueue its processing.

        The task is scheduled with ``transaction.on_commit`` so that, when
        the request runs inside a database transaction, the worker cannot
        pick the job up before the ``ContactImport`` row is committed and
        visible to it. With no open transaction the callback runs
        immediately, matching the previous behaviour.
        """
        # Function-local import: keeps this change self-contained without
        # touching the module's import block.
        from django.db import transaction

        import_obj = serializer.save(user=self.request.user)
        import_id = import_obj.id
        # Process import asynchronously, but only once the row is durable.
        transaction.on_commit(lambda: process_contact_import.delay(import_id))
class ContactImportDetailView(generics.RetrieveAPIView):
    """Read-only endpoint exposing one contact import record (its status)."""

    permission_classes = [permissions.IsAuthenticated]
    serializer_class = ContactImportSerializer

    def get_queryset(self):
        """Limit lookups to imports started by the requesting user."""
        requester = self.request.user
        return ContactImport.objects.filter(user=requester)
@api_view(['GET'])
@permission_classes([permissions.IsAuthenticated])
def contact_stats(request):
    """Get contact statistics.

    Returns a JSON object with the requester's total, favorite and blocked
    contact counts, the number of groups, the number of interactions, and a
    per-group list of ``{'name', 'contact_count'}`` entries.
    """
    user = request.user

    # One query with conditional aggregates instead of three separate COUNTs.
    contact_totals = Contact.objects.filter(user=user).aggregate(
        total=Count('pk'),
        favorite=Count('pk', filter=Q(is_favorite=True)),
        blocked=Count('pk', filter=Q(is_blocked=True)),
    )

    owned_groups = ContactGroup.objects.filter(user=user)
    per_group = owned_groups.annotate(
        contact_count=Count('contacts')
    ).values('name', 'contact_count')

    stats = {
        'total_contacts': contact_totals['total'],
        'favorite_contacts': contact_totals['favorite'],
        'blocked_contacts': contact_totals['blocked'],
        'total_groups': owned_groups.count(),
        # NOTE(review): despite the key name, no date window is applied here;
        # this counts every interaction on the user's contacts.
        'recent_interactions': ContactInteraction.objects.filter(
            contact__user=user
        ).count(),
        # Materialise the lazy queryset so the payload is plain data,
        # independent of the renderer's queryset handling.
        'group_stats': list(per_group),
    }
    return Response(stats)
@api_view(['POST'])
@permission_classes([permissions.IsAuthenticated])
def export_contacts(request):
    """Export contacts to CSV.

    Reads the optional ``format`` field from the request body (default
    ``'csv'``). For ``'csv'`` it returns an attachment named
    ``contacts.csv`` containing a header row plus one row per contact owned
    by the requester; any other format yields HTTP 400.
    """
    format_type = request.data.get('format', 'csv')
    # Reject unsupported formats up front, before any response is built.
    if format_type != 'csv':
        return Response({'error': 'Unsupported format'}, status=status.HTTP_400_BAD_REQUEST)

    # Create CSV response
    response = HttpResponse(content_type='text/csv')
    response['Content-Disposition'] = 'attachment; filename="contacts.csv"'
    writer = csv.writer(response)
    writer.writerow([
        'First Name', 'Last Name', 'Email', 'Phone', 'Company', 'Job Title',
        'Address Line 1', 'Address Line 2', 'City', 'State', 'Postal Code',
        'Country', 'Website', 'LinkedIn', 'Twitter', 'Facebook', 'Notes',
        'Birthday', 'Group', 'Is Favorite', 'Is Blocked'
    ])

    # Model attributes written verbatim, in header order up to 'Birthday'.
    plain_columns = (
        'first_name', 'last_name', 'email', 'phone', 'company', 'job_title',
        'address_line1', 'address_line2', 'city', 'state', 'postal_code',
        'country', 'website', 'linkedin', 'twitter', 'facebook', 'notes',
        'birthday',
    )

    contacts = Contact.objects.filter(user=request.user).select_related('group')
    # iterator() streams rows instead of caching every model instance on the
    # queryset, which keeps memory flat for large address books.
    for contact in contacts.iterator():
        row = [getattr(contact, column) for column in plain_columns]
        row.append(contact.group.name if contact.group else '')
        row.append(contact.is_favorite)
        row.append(contact.is_blocked)
        writer.writerow(row)
    return response
@api_view(['GET'])
@permission_classes([permissions.IsAuthenticated])
def contact_suggestions(request):
    """Suggest up to ten of the requester's contacts by partial email.

    The ``email`` query parameter is matched case-insensitively as a
    substring of each contact's email. A missing or empty parameter yields
    an empty suggestion list.
    """
    needle = request.query_params.get('email', '')
    matches = []
    if needle:
        # Find contacts with similar email addresses
        candidates = Contact.objects.filter(user=request.user, email__icontains=needle)
        rows = candidates.values('id', 'first_name', 'last_name', 'email', 'company')
        matches = list(rows[:10])
    return Response({'suggestions': matches})