ধাপ ১.১: Virtual Environment তৈরি

# নতুন ফোল্ডার তৈরি করুন
mkdir fraud_detection_project
cd fraud_detection_project

# Virtual environment তৈরি করুন
python3 -m venv venv

# Linux/Mac:
source venv/bin/activate


ধাপ ১.২: requirements.txt তৈরি করুন

প্রয়োজনীয় Library ইনস্টল করুন:

# python-decouple is needed by config/settings.py (`from decouple import config`)
pip install Django djangorestframework django-cors-headers python-decouple requests user-agents pillow

Django==4.2.7
djangorestframework==3.14.0
django-cors-headers==4.3.1
python-decouple==3.8
requests==2.31.0
user-agents==2.2.0
pillow==10.1.0

pip install -r requirements.txt

ধাপ ১.৩: Django Project তৈরি করুন

# Project তৈরি
django-admin startproject config .

# App তৈরি
python manage.py startapp frauddetect

ধাপ ১.৪: .env ফাইল তৈরি করুন

SECRET_KEY=django-insecure-your-secret-key-here-change-in-production
DEBUG=True

ধাপ ১.৫: config/settings.py সম্পূর্ণ কনফিগারেশন

from pathlib import Path
from decouple import Csv, config

# Project root: two directory levels above this settings file.
BASE_DIR = Path(__file__).resolve().parent.parent

# Read from the environment / .env file. The fallback key is a development
# placeholder only; always set SECRET_KEY explicitly outside local development.
SECRET_KEY = config('SECRET_KEY', default='django-insecure-change-this-in-production')

# Fail closed: debug mode must be switched on explicitly (the tutorial's .env
# sets DEBUG=True), so a missing .env never exposes debug pages.
DEBUG = config('DEBUG', default=False, cast=bool)

# Comma-separated host list, e.g. ALLOWED_HOSTS=example.com,api.example.com.
# A wildcard ('*') would disable Django's Host-header validation entirely.
ALLOWED_HOSTS = config('ALLOWED_HOSTS', default='localhost,127.0.0.1', cast=Csv())

# ============================================
# INSTALLED APPS
# ============================================
# Django contrib apps first, then third-party packages, then project apps.
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',

    # Third Party Apps
    'rest_framework',
    'corsheaders',

    # Local Apps
    'frauddetect',
]

# ============================================
# MIDDLEWARE
# ============================================
# Order matters: each request passes through this list top to bottom.
MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'corsheaders.middleware.CorsMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
    'frauddetect.middleware.DeviceFingerprintMiddleware',  # our custom middleware; reads request.user, so it is listed after AuthenticationMiddleware
]

# Module holding the project's root URL patterns.
ROOT_URLCONF = 'config.urls'

# Django template engine; templates are discovered inside each app (APP_DIRS).
TEMPLATES = [
    {
        'BACKEND': 'django.template.backends.django.DjangoTemplates',
        'DIRS': [],
        'APP_DIRS': True,
        'OPTIONS': {
            'context_processors': [
                'django.template.context_processors.debug',
                'django.template.context_processors.request',
                'django.contrib.auth.context_processors.auth',
                'django.contrib.messages.context_processors.messages',
            ],
        },
    },
]

# Dotted path to the WSGI application object.
WSGI_APPLICATION = 'config.wsgi.application'

# ============================================
# DATABASE
# ============================================
# Single SQLite database file stored in the project root.
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': BASE_DIR / 'db.sqlite3',
    }
}

# ============================================
# PASSWORD VALIDATION
# ============================================
# Django's four built-in password validators, all with default options.
AUTH_PASSWORD_VALIDATORS = [
    {'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator'},
    {'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator'},
    {'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator'},
    {'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator'},
]

# ============================================
# INTERNATIONALIZATION
# ============================================
LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'Asia/Dhaka'
USE_I18N = True
# Timezone-aware datetimes are enabled; TIME_ZONE above is the local zone.
USE_TZ = True

STATIC_URL = 'static/'
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'

# ============================================
# REST FRAMEWORK CONFIGURATION
# ============================================
# Session-authenticated API; every endpoint requires a logged-in user unless a
# view overrides permission_classes. List responses are paginated, 50 per page.
REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'rest_framework.authentication.SessionAuthentication',
    ],
    'DEFAULT_PERMISSION_CLASSES': [
        'rest_framework.permissions.IsAuthenticated',
    ],
    'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.PageNumberPagination',
    'PAGE_SIZE': 50,
}

# ============================================
# CORS SETTINGS
# ============================================
# Browser origins allowed to call the API (a local frontend on port 3000).
CORS_ALLOWED_ORIGINS = [
    "http://localhost:3000",
    "http://127.0.0.1:3000",
]
# Allow cookies (the session cookie) on cross-origin requests.
CORS_ALLOW_CREDENTIALS = True

# ============================================
# 🔥 FRAUD DETECTION SETTINGS
# ============================================
# Tunable thresholds read by frauddetect/utils.py via settings.FRAUD_SETTINGS.
FRAUD_SETTINGS = {
    'MAX_LOGIN_ATTEMPTS': 5,              # maximum login attempts
    'LOGIN_ATTEMPT_WINDOW': 300,          # 5 minutes (expressed in seconds)
    'HIGH_AMOUNT_THRESHOLD': 100000,      # amounts above 100,000 are treated as suspicious
    'BUSINESS_HOURS_START': 8,            # 8 AM
    'BUSINESS_HOURS_END': 18,             # 6 PM
    'MAX_DAILY_TRANSACTIONS': 50,         # maximum transactions per day
    'MAX_TRANSACTION_AMOUNT_DAILY': 500000,  # presumably the maximum total amount per day -- TODO confirm
    'VELOCITY_CHECK_WINDOW': 3600,        # 1 hour (expressed in seconds)
    'MAX_TRANSACTIONS_PER_HOUR': 10,      # at most 10 per hour
}

# ============================================
# 🌍 COUNTRY RISK LEVELS
# ============================================
# Two-letter country codes, matched case-insensitively by get_country_risk_level().
# High-risk countries
HIGH_RISK_COUNTRIES = ['YE', 'SY', 'IQ', 'SD', 'SO', 'LY', 'AF', 'IR', 'NG', 'PK', 'BD']

# Medium-risk countries
MEDIUM_RISK_COUNTRIES = ['EG', 'JO', 'MA', 'TN', 'TR', 'IN', 'CN', 'BR', 'MX']

# Low-risk (safe) countries
# NOTE(review): 'UK' is not an ISO 3166-1 alpha-2 code ('GB' is) -- confirm it is intentional.
LOW_RISK_COUNTRIES = ['SA', 'AE', 'KW', 'QA', 'BH', 'OM', 'US', 'CA', 'UK', 'GB', 'DE', 'FR']

এই পার্টে আমরা সব Database Tables/Models তৈরি করবো।

ধাপ ২.১: frauddetect/models.py

from django.db import models
from django.contrib.auth.models import User
from django.utils import timezone
from datetime import timedelta
import hashlib
import json


# ============================================
# 📱 MODEL 1: DEVICE (ডিভাইস ট্র্যাকিং)
# ============================================
class Device(models.Model):
    """
    A device (identified by a fingerprint hash) that a user has been seen on.

    Lets logins by the same person from different devices be told apart and
    carries per-device trust, block and risk state.
    """
    STATUS_CHOICES = [
        ('normal', 'Normal'),
        ('suspicious', 'Suspicious'),
        ('blocked', 'Blocked'),
    ]

    # Owner of the device; deleting the user deletes their devices.
    user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='devices')

    # Unique fingerprint of the device (hash); 64 chars fits a SHA-256 hex digest.
    fingerprint_hash = models.CharField(max_length=64, db_index=True)
    device_fingerprint = models.TextField(null=True, blank=True)

    # Device details
    device_type = models.CharField(max_length=50, null=True, blank=True)  # mobile/desktop/tablet
    device_name = models.CharField(max_length=100, null=True, blank=True)
    os_name = models.CharField(max_length=50, null=True, blank=True)      # Windows/Android/iOS
    os_version = models.CharField(max_length=50, null=True, blank=True)
    browser_name = models.CharField(max_length=50, null=True, blank=True)  # Chrome/Firefox
    browser_version = models.CharField(max_length=50, null=True, blank=True)

    # Location details from the most recent sighting
    last_ip = models.GenericIPAddressField(null=True, blank=True)
    last_country_code = models.CharField(max_length=2, null=True, blank=True)
    last_city = models.CharField(max_length=100, null=True, blank=True)

    # Status
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='normal')
    is_trusted = models.BooleanField(default=False)  # whether the device is trusted
    is_blocked = models.BooleanField(default=False)  # whether the device has been blocked
    risk_score = models.IntegerField(default=0)      # risk score (0-100)

    # Timestamps: set on creation / refreshed on every save
    first_seen_at = models.DateTimeField(auto_now_add=True)
    last_seen_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'devices'
        indexes = [
            models.Index(fields=['fingerprint_hash', 'user']),
            models.Index(fields=['last_ip']),
        ]

    def __str__(self) -> str:
        """Return "<username> - <device name>", falling back to 'Unknown Device'."""
        return f"{self.user.username} - {self.device_name or 'Unknown Device'}"


# ============================================
# 🔐 MODEL 2: LOGIN EVENT (লগইন ইভেন্ট)
# ============================================
class LoginEvent(models.Model):
    """
    One row per login attempt.

    Both successful and failed (or blocked) attempts are recorded.
    """
    STATUS_CHOICES = [
        ('success', 'Success'),
        ('failed', 'Failed'),
        ('blocked', 'Blocked'),
    ]

    # Nullable: an attempt may not correspond to an existing user.
    user = models.ForeignKey(User, on_delete=models.CASCADE, null=True, blank=True)
    username = models.CharField(max_length=150)  # the submitted username is kept for failed logins
    device = models.ForeignKey(Device, on_delete=models.SET_NULL, null=True, blank=True)

    # Login details
    status = models.CharField(max_length=20, choices=STATUS_CHOICES)
    ip_address = models.GenericIPAddressField()
    country_code = models.CharField(max_length=2, null=True, blank=True)
    city = models.CharField(max_length=100, null=True, blank=True)

    # Risk assessment
    is_suspicious = models.BooleanField(default=False)
    risk_score = models.IntegerField(default=0)
    risk_reasons = models.JSONField(default=list)  # reasons the attempt was considered suspicious

    # Additional information
    user_agent = models.TextField(null=True, blank=True)
    attempt_time = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = 'login_events'
        ordering = ['-attempt_time']  # newest first
        indexes = [
            models.Index(fields=['username', 'attempt_time']),
            models.Index(fields=['ip_address']),
        ]

    def __str__(self) -> str:
        """Return "<username> - <status> at <attempt time>"."""
        return f"{self.username} - {self.status} at {self.attempt_time}"


# ============================================
# 💰 MODEL 3: TRANSACTION (লেনদেন)
# ============================================
class Transaction(models.Model):
    """
    A financial transaction together with its risk assessment.
    """
    STATUS_CHOICES = [
        ('pending', 'Pending'),      # awaiting a decision
        ('approved', 'Approved'),    # approved
        ('rejected', 'Rejected'),    # rejected
        ('flagged', 'Flagged'),      # marked as suspicious
    ]

    user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='transactions')
    # Kept (set to NULL) if the device row is deleted.
    device = models.ForeignKey(Device, on_delete=models.SET_NULL, null=True, blank=True)

    # Transaction details
    external_txn_id = models.CharField(max_length=100, unique=True)  # external transaction ID
    amount = models.DecimalField(max_digits=15, decimal_places=2)    # amount
    currency = models.CharField(max_length=3, default='SAR')         # currency code
    description = models.TextField(null=True, blank=True)            # free-text description
    beneficiary = models.CharField(max_length=255, null=True, blank=True)  # recipient

    # Status
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='pending')

    # Risk assessment
    risk_score = models.IntegerField(default=0)           # 0-100
    risk_level = models.CharField(max_length=20, default='low')  # low/medium/high
    is_suspicious = models.BooleanField(default=False)

    # Metadata
    raw_payload = models.JSONField(null=True, blank=True)
    ip_address = models.GenericIPAddressField(null=True, blank=True)

    # Timestamps
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    approved_at = models.DateTimeField(null=True, blank=True)

    class Meta:
        db_table = 'transactions'
        ordering = ['-created_at']  # newest first
        indexes = [
            models.Index(fields=['user', 'created_at']),
            models.Index(fields=['status']),
        ]

    def __str__(self) -> str:
        """Return "TXN-<external id> - <amount> <currency>"."""
        return f"TXN-{self.external_txn_id} - {self.amount} {self.currency}"


# ============================================
# 🚨 MODEL 4: FRAUD EVENT (জালিয়াতি ইভেন্ট)
# ============================================
class FraudEvent(models.Model):
    """
    A record of detected suspicious activity, with its resolution state.
    """
    SEVERITY_CHOICES = [
        ('low', 'Low'),
        ('medium', 'Medium'),
        ('high', 'High'),
        ('critical', 'Critical'),
    ]

    # Optional: an event does not have to be tied to a transaction.
    transaction = models.ForeignKey(Transaction, on_delete=models.CASCADE, null=True, blank=True)
    user = models.ForeignKey(User, on_delete=models.CASCADE)

    # Fraud details
    rule_id = models.CharField(max_length=50, null=True, blank=True)  # which rule was violated
    triggered_rules = models.JSONField(default=list)  # list of all triggered rules
    risk_score = models.IntegerField()
    severity = models.CharField(max_length=20, choices=SEVERITY_CHOICES)

    # Description
    description = models.TextField()
    recommendations = models.TextField(null=True, blank=True)  # suggested follow-up action

    # Resolution status
    is_resolved = models.BooleanField(default=False)
    resolved_by = models.ForeignKey(
        User, on_delete=models.SET_NULL, 
        null=True, blank=True, 
        related_name='resolved_frauds'
    )
    resolved_at = models.DateTimeField(null=True, blank=True)
    resolution_notes = models.TextField(null=True, blank=True)

    # Timestamp
    detected_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = 'fraud_events'
        ordering = ['-detected_at']  # newest first
        indexes = [
            models.Index(fields=['user', 'detected_at']),
            models.Index(fields=['severity']),
        ]

    def __str__(self) -> str:
        """Return "Fraud Event - <severity> - <username>"."""
        return f"Fraud Event - {self.severity} - {self.user.username}"


# ============================================
# 📊 MODEL 5: RISK PROFILE (ঝুঁকি প্রোফাইল)
# ============================================
class RiskProfile(models.Model):
    """
    The overall risk profile of a user (exactly one profile per user).
    """
    user = models.OneToOneField(User, on_delete=models.CASCADE, related_name='risk_profile')

    # Overall risk
    overall_risk_score = models.IntegerField(default=0)
    risk_level = models.CharField(max_length=20, default='low')

    # Statistics
    total_transactions = models.IntegerField(default=0)
    total_amount = models.DecimalField(max_digits=15, decimal_places=2, default=0)
    suspicious_events_count = models.IntegerField(default=0)
    failed_login_count = models.IntegerField(default=0)

    # Behavioural patterns
    avg_transaction_amount = models.DecimalField(max_digits=15, decimal_places=2, default=0)
    usual_login_hours = models.JSONField(default=list)    # hours at which the user usually logs in
    usual_countries = models.JSONField(default=list)       # countries the user usually logs in from
    trusted_devices_count = models.IntegerField(default=0)

    # Status
    is_monitored = models.BooleanField(default=False)  # whether the user is under monitoring
    is_blocked = models.BooleanField(default=False)    # whether the user has been blocked

    # Timestamps
    last_reviewed_at = models.DateTimeField(null=True, blank=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'risk_profiles'

    def __str__(self) -> str:
        """Return "<username> - Risk: <risk level>"."""
        return f"{self.user.username} - Risk: {self.risk_level}"


# ============================================
# 🚫 MODEL 6: IP BLOCKLIST (ব্লক করা IP)
# ============================================
class IPBlocklist(models.Model):
    """
    List of blocked IP addresses (one row per address).
    """
    ip_address = models.GenericIPAddressField(unique=True)
    reason = models.TextField()
    # Set to NULL if the blocking user is deleted.
    blocked_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True)
    is_active = models.BooleanField(default=True)

    created_at = models.DateTimeField(auto_now_add=True)
    expires_at = models.DateTimeField(null=True, blank=True)  # when the block expires; NULL means no expiry

    class Meta:
        db_table = 'ip_blocklist'

    def __str__(self) -> str:
        """Return "Blocked IP: <address>"."""
        return f"Blocked IP: {self.ip_address}"


# ============================================
# 📝 MODEL 7: SYSTEM LOG (সিস্টেম লগ)
# ============================================
class SystemLog(models.Model):
    """
    General-purpose log of system activity of every kind.
    """
    LOG_TYPE_CHOICES = [
        ('login', 'Login'),
        ('transaction', 'Transaction'),
        ('fraud_alert', 'Fraud Alert'),
        ('security', 'Security'),
        ('system', 'System'),
    ]

    LEVEL_CHOICES = [
        ('info', 'Info'),
        ('warning', 'Warning'),
        ('error', 'Error'),
        ('critical', 'Critical'),
    ]

    log_type = models.CharField(max_length=50, choices=LOG_TYPE_CHOICES)
    level = models.CharField(max_length=20, choices=LEVEL_CHOICES, default='info')
    message = models.TextField()
    # Log entries survive deletion of the user (the reference becomes NULL).
    user = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True)
    ip_address = models.GenericIPAddressField(null=True, blank=True)
    metadata = models.JSONField(null=True, blank=True)

    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = 'system_logs'
        ordering = ['-created_at']  # newest first
        indexes = [
            models.Index(fields=['log_type', 'created_at']),
            models.Index(fields=['level']),
        ]

    def __str__(self) -> str:
        """Return "<log type> - <level> - <created at>"."""
        return f"{self.log_type} - {self.level} - {self.created_at}"

ধাপ ২.২: Migration চালান

python manage.py makemigrations frauddetect
python manage.py migrate

Part 2 সম্পন্ন!

এখন আপনার ডাটাবেসে ৭টি টেবিল তৈরি হয়েছে:

  • devices – ডিভাইস ট্র্যাকিং
  • login_events – লগইন ইভেন্ট
  • transactions – লেনদেন
  • fraud_events – জালিয়াতি ইভেন্ট
  • risk_profiles – ঝুঁকি প্রোফাইল
  • ip_blocklist – ব্লক করা IP
  • system_logs – সিস্টেম লগ

ধাপ ৩.১: frauddetect/utils.py তৈরি করুন

import hashlib
import requests
from django.conf import settings
from django.utils import timezone
from datetime import timedelta
from django.db import models


def calculate_device_fingerprint(request):
    """
    Build a device fingerprint from a request's headers.

    The User-Agent, Accept-Language and Accept-Encoding headers (each
    defaulting to an empty string when absent) are joined with '|' and hashed,
    so requests sending the same three headers map to the same fingerprint.

    Args:
        request: object exposing a ``META`` mapping of request headers.

    Returns:
        str: SHA-256 hex digest of the joined header values.
    """
    header_keys = (
        'HTTP_USER_AGENT',
        'HTTP_ACCEPT_LANGUAGE',
        'HTTP_ACCEPT_ENCODING',
    )
    raw = '|'.join(request.META.get(key, '') for key in header_keys)
    digest = hashlib.sha256()
    digest.update(raw.encode())
    return digest.hexdigest()


def get_client_ip(request):
    """
    Extract the client IP address from a request.

    When an X-Forwarded-For header is present (and non-empty), its first
    comma-separated entry is used, stripped of surrounding whitespace;
    otherwise REMOTE_ADDR is returned.

    NOTE(review): X-Forwarded-For is supplied by the client unless a trusted
    proxy overwrites it -- confirm the deployment sits behind such a proxy.

    Args:
        request: object exposing a ``META`` mapping of request headers.

    Returns:
        str | None: the client IP, or None when neither value is available.
    """
    forwarded_chain = request.META.get('HTTP_X_FORWARDED_FOR')
    if not forwarded_chain:
        return request.META.get('REMOTE_ADDR')

    # With several proxies in the chain, the first entry is the original client.
    first_entry, _, _ = forwarded_chain.partition(',')
    return first_entry.strip()


def get_country_risk_level(country_code):
    """
    Map a country code to a risk tier.

    The code is upper-cased and looked up, in order, in the
    HIGH_RISK_COUNTRIES, MEDIUM_RISK_COUNTRIES and LOW_RISK_COUNTRIES
    settings. A missing/empty code, or one found in none of the lists, is
    rated medium with a score of 20.

    Args:
        country_code: two-letter country code, or a falsy value if unknown.

    Returns:
        dict: ``level`` ('high'/'medium'/'low'), ``score`` (int) and a
        human-readable ``reason``.
    """
    if not country_code:
        return {'level': 'medium', 'score': 20, 'reason': 'Unknown Country'}

    code = country_code.upper()

    if code in settings.HIGH_RISK_COUNTRIES:
        level, score, reason = 'high', 30, f'High-Risk Country ({code})'
    elif code in settings.MEDIUM_RISK_COUNTRIES:
        level, score, reason = 'medium', 15, f'Medium-Risk Country ({code})'
    elif code in settings.LOW_RISK_COUNTRIES:
        level, score, reason = 'low', 5, f'Low-Risk Country ({code})'
    else:
        level, score, reason = 'medium', 20, f'Unknown Country ({code})'

    return {'level': level, 'score': score, 'reason': reason}


def get_geo_location(ip_address):
    """
    Resolve an IP address to a geographic location via the free ipapi.co API.

    The address is validated before it is placed in the request URL, and
    non-public addresses (loopback, private ranges, ...) are never sent to the
    API. Whenever the location cannot be determined the country code is
    ``None`` rather than a real country, so a failed lookup is scored as
    "Unknown Country" instead of silently passing as a low-risk one.

    Args:
        ip_address: IPv4/IPv6 address as a string (may be None).

    Returns:
        dict: ``country_code`` (str or None), ``city`` (str, 'Unknown' when
        not available), ``latitude`` and ``longitude`` (values from the API,
        or None).
    """
    # Function-scope imports keep this block self-contained.
    import ipaddress
    import logging

    logger = logging.getLogger(__name__)
    unknown_location = {
        'country_code': None,
        'city': 'Unknown',
        'latitude': None,
        'longitude': None,
    }

    # The IP may originate from a client-controlled header, so never
    # interpolate it into the URL without parsing it first.
    try:
        parsed_ip = ipaddress.ip_address(str(ip_address).strip())
    except ValueError:
        logger.warning('Geo lookup skipped: %r is not a valid IP address', ip_address)
        return unknown_location

    # Loopback/private/reserved addresses have no public geolocation.
    if not parsed_ip.is_global:
        return unknown_location

    try:
        response = requests.get(
            f'https://ipapi.co/{parsed_ip}/json/',
            timeout=3
        )
        if response.status_code != 200:
            logger.warning('Geo lookup for %s returned HTTP %s', parsed_ip, response.status_code)
            return unknown_location
        data = response.json()
    except (requests.RequestException, ValueError) as exc:
        # Network failure, timeout or a non-JSON body: best-effort, keep going.
        logger.warning('Geo location error for %s: %s', parsed_ip, exc)
        return unknown_location

    # NOTE(review): ipapi.co appears to signal failures with an "error" key in
    # a 200 response -- treat such payloads as unknown; confirm against the API docs.
    if not isinstance(data, dict) or data.get('error'):
        return unknown_location

    return {
        'country_code': data.get('country_code') or None,
        'city': data.get('city') or 'Unknown',
        'latitude': data.get('latitude'),
        'longitude': data.get('longitude'),
    }


def check_velocity(user, check_type='login', window_minutes=60):
    """
    Velocity check: report whether a user performed too many actions recently.

    Counts the user's LoginEvent rows (``check_type='login'``) or Transaction
    rows (any other value) created within the last ``window_minutes`` and
    compares the count with the limit configured for that kind of action:
    ``MAX_LOGIN_ATTEMPTS`` for logins, ``MAX_TRANSACTIONS_PER_HOUR`` for
    transactions.

    Args:
        user: User object whose activity is counted.
        check_type: 'login' or 'transaction'.
        window_minutes: size of the look-back window, in minutes.

    Returns:
        bool: True when the count has reached or exceeded the limit.
    """
    # Imported lazily, as in the rest of this module.
    from .models import LoginEvent, Transaction

    time_threshold = timezone.now() - timedelta(minutes=window_minutes)
    fraud_settings = settings.FRAUD_SETTINGS

    if check_type == 'login':
        count = LoginEvent.objects.filter(
            user=user,
            attempt_time__gte=time_threshold
        ).count()
        # Logins have their own limit; the transaction limit does not apply.
        max_allowed = fraud_settings['MAX_LOGIN_ATTEMPTS']
    else:
        count = Transaction.objects.filter(
            user=user,
            created_at__gte=time_threshold
        ).count()
        max_allowed = fraud_settings['MAX_TRANSACTIONS_PER_HOUR']

    return count >= max_allowed


def is_business_hours():
    """
    Check whether the current local time falls within business hours.

    The hour is taken in the project's current time zone (``TIME_ZONE``), not
    in UTC: with ``USE_TZ = True`` ``timezone.now()`` is a UTC datetime, so its
    ``.hour`` would be offset from the local wall clock. The window is
    half-open: ``BUSINESS_HOURS_START <= hour < BUSINESS_HOURS_END``, so with
    an end of 18 the last business minute is 17:59.

    Returns:
        bool: True when the current local hour is inside business hours.
    """
    # localtime() with no argument converts now() to the current time zone.
    current_hour = timezone.localtime().hour
    start = settings.FRAUD_SETTINGS['BUSINESS_HOURS_START']
    end = settings.FRAUD_SETTINGS['BUSINESS_HOURS_END']

    return start <= current_hour < end


def check_ip_blocklist(ip_address):
    """
    Report whether an IP address is currently on the blocklist.

    An entry counts only while it is active and has either no expiry time or
    an expiry time still in the future.

    Args:
        ip_address: the IP address to look up.

    Returns:
        bool: True when a matching, active, unexpired entry exists.
    """
    from .models import IPBlocklist

    # Not expired: no expiry set, or the expiry lies in the future.
    still_valid = (
        models.Q(expires_at__isnull=True)
        | models.Q(expires_at__gt=timezone.now())
    )
    active_entries = IPBlocklist.objects.filter(
        still_valid,
        ip_address=ip_address,
        is_active=True,
    )
    return active_entries.exists()


def calculate_transaction_risk(transaction):
    """
    Core fraud-detection logic: score a transaction against the fraud rules.

    Each triggered rule adds a fixed number of points; the total is capped at
    100 (the documented range of the risk score) and mapped to a level:
    >= 70 is 'high', >= 40 is 'medium', anything lower is 'low'.

    Args:
        transaction: Transaction object; its ``amount``, ``user`` and
            ``device`` attributes are read.

    Returns:
        dict: ``risk_score`` (int, 0-100), ``risk_level`` and
        ``triggered_rules`` (list of rule descriptions).
    """
    fraud_settings = settings.FRAUD_SETTINGS
    risk_score = 0
    triggered_rules = []

    # ============================================
    # Rule FR-01: High Amount Transaction
    # Amounts above the configured threshold are suspicious
    # ============================================
    high_amount_threshold = fraud_settings['HIGH_AMOUNT_THRESHOLD']
    if transaction.amount > high_amount_threshold:
        risk_score += 40
        # Report the configured threshold; with the default of 100000 this
        # renders exactly as "(>100,000)".
        triggered_rules.append(
            f'FR-01: High Amount Transaction (>{high_amount_threshold:,})'
        )

    # ============================================
    # Rule FR-02: Outside Business Hours
    # Transaction made outside office hours
    # ============================================
    if not is_business_hours():
        risk_score += 20
        triggered_rules.append('FR-02: Outside Business Hours')

    # ============================================
    # Rule FR-03: Velocity Check
    # Too many transactions within the velocity window
    # ============================================
    # VELOCITY_CHECK_WINDOW is in seconds; check_velocity expects minutes.
    # Falls back to one hour when the setting is absent.
    velocity_window_minutes = fraud_settings.get('VELOCITY_CHECK_WINDOW', 3600) / 60
    if check_velocity(transaction.user, 'transaction', velocity_window_minutes):
        risk_score += 30
        triggered_rules.append('FR-03: Too Many Transactions in Short Time')

    # ============================================
    # Rule FR-04: Untrusted Device
    # Transaction from a device that is not marked trusted
    # ============================================
    if transaction.device and not transaction.device.is_trusted:
        risk_score += 15
        triggered_rules.append('FR-04: Untrusted Device')

    # All four rules together add up to 105; keep the score on the 0-100 scale.
    risk_score = min(risk_score, 100)

    # ============================================
    # Determine Risk Level
    # ============================================
    if risk_score >= 70:
        risk_level = 'high'
    elif risk_score >= 40:
        risk_level = 'medium'
    else:
        risk_level = 'low'

    return {
        'risk_score': risk_score,
        'risk_level': risk_level,
        'triggered_rules': triggered_rules
    }


def calculate_login_risk(request, user=None):
    """
    Assess the risk of a login attempt.

    Adds up the country risk of the client's IP, 50 points if the IP is on
    the blocklist and 25 points if the user exceeded the login velocity limit
    within the configured ``LOGIN_ATTEMPT_WINDOW``. The total is capped at 100.

    Args:
        request: the incoming HTTP request (used to find the client IP).
        user: the User attempting to log in, or None if unknown; the velocity
            check is skipped without a user.

    Returns:
        dict: ``risk_score`` (int, 0-100), ``risk_reasons`` (list of str),
        ``is_suspicious`` (True from a score of 30) and ``geo`` (the
        location dict returned by get_geo_location).
    """
    risk_score = 0
    risk_reasons = []

    ip = get_client_ip(request)
    geo = get_geo_location(ip)

    # Country risk: always scored, but only reported when not low-risk.
    country_risk = get_country_risk_level(geo['country_code'])
    risk_score += country_risk['score']
    if country_risk['level'] != 'low':
        risk_reasons.append(country_risk['reason'])

    # IP blocklist check
    if check_ip_blocklist(ip):
        risk_score += 50
        risk_reasons.append('Blocked IP Address')

    # Velocity check. LOGIN_ATTEMPT_WINDOW is in seconds; check_velocity
    # expects minutes. Falls back to one hour when the setting is absent.
    login_window_minutes = settings.FRAUD_SETTINGS.get('LOGIN_ATTEMPT_WINDOW', 3600) / 60
    if user and check_velocity(user, 'login', login_window_minutes):
        risk_score += 25
        risk_reasons.append('Too Many Login Attempts')

    # The three factors can add up to 105; keep the score on the 0-100 scale.
    risk_score = min(risk_score, 100)

    return {
        'risk_score': risk_score,
        'risk_reasons': risk_reasons,
        'is_suspicious': risk_score >= 30,
        'geo': geo
    }

ধাপ ৪.১: frauddetect/middleware.py তৈরি করুন

from django.utils.deprecation import MiddlewareMixin
from django.utils import timezone
from .models import Device
from .utils import calculate_device_fingerprint, get_client_ip


class DeviceFingerprintMiddleware(MiddlewareMixin):
    """
    Tracks the device fingerprint on every request.

    For each request it:
    1. derives the device fingerprint and client IP from the request;
    2. for authenticated users, updates the matching Device row if it exists;
    3. creates the Device row if it is new;
    4. attaches ``device``, ``device_fingerprint`` and ``client_ip`` to the
       request object for later use.
    """

    def process_request(self, request):
        """
        Attach device information to ``request``.

        Always returns None so that request processing continues.
        """
        # request.user is set by AuthenticationMiddleware; treat a request
        # without it as anonymous instead of raising AttributeError.
        user = getattr(request, 'user', None)

        # Devices are tracked for authenticated users only.
        if user is not None and user.is_authenticated:
            fingerprint_hash = calculate_device_fingerprint(request)
            ip_address = get_client_ip(request)

            # Attach to the request for later use
            request.device_fingerprint = fingerprint_hash
            request.client_ip = ip_address

            # Find or create the device. Only real Device fields may appear in
            # ``defaults``: the model has no ``user_agent`` field, and passing
            # one makes get_or_create raise FieldError for every new device.
            device, created = Device.objects.get_or_create(
                user=user,
                fingerprint_hash=fingerprint_hash,
                defaults={
                    'last_ip': ip_address,
                    'device_fingerprint': fingerprint_hash,
                }
            )

            if not created:
                # Already known: refresh last-seen time and IP only.
                device.last_seen_at = timezone.now()
                device.last_ip = ip_address
                device.save(update_fields=['last_seen_at', 'last_ip'])

            # Attach the device to the request
            request.device = device
        else:
            request.device = None
            request.device_fingerprint = None
            request.client_ip = get_client_ip(request)

        return None

ধাপ ৫.১: frauddetect/serializers.py তৈরি করুন

from rest_framework import serializers
from django.contrib.auth.models import User
from .models import (
    Device, 
    LoginEvent, 
    Transaction, 
    FraudEvent, 
    RiskProfile, 
    SystemLog,
    IPBlocklist
)


# ============================================
# User Serializer
# ============================================
class UserSerializer(serializers.ModelSerializer):
    """Basic user information: id, username, email, first and last name."""

    class Meta:
        model = User
        # Explicit allow-list of the User fields that are serialized.
        fields = ['id', 'username', 'email', 'first_name', 'last_name']


# ============================================
# Device Serializer
# ============================================
class DeviceSerializer(serializers.ModelSerializer):
    """All Device fields, with the owning user nested (read-only)."""
    user = UserSerializer(read_only=True)

    class Meta:
        model = Device
        fields = '__all__'


# ============================================
# Login Event Serializer
# ============================================
class LoginEventSerializer(serializers.ModelSerializer):
    """All LoginEvent fields, with user and device nested (both read-only)."""
    user = UserSerializer(read_only=True)
    device = DeviceSerializer(read_only=True)

    class Meta:
        model = LoginEvent
        fields = '__all__'


# ============================================
# Transaction Serializer
# ============================================
class TransactionSerializer(serializers.ModelSerializer):
    """All Transaction fields, including the risk assessment; user and device are nested (read-only)."""
    user = UserSerializer(read_only=True)
    device = DeviceSerializer(read_only=True)

    class Meta:
        model = Transaction
        fields = '__all__'
        # The client cannot send these fields; the system generates them.
        read_only_fields = [
            'risk_score', 
            'risk_level', 
            'is_suspicious',
            'status',
            'approved_at'
        ]


class TransactionCreateSerializer(serializers.ModelSerializer):
    """Input serializer for creating a transaction; exposes only the client-supplied fields."""

    class Meta:
        model = Transaction
        # User, device, status and risk fields are deliberately not listed here.
        fields = [
            'external_txn_id',
            'amount',
            'currency',
            'description',
            'beneficiary',
            'raw_payload'
        ]


# ============================================
# Fraud Event Serializer
# ============================================
class FraudEventSerializer(serializers.ModelSerializer):
    """All FraudEvent fields, with user, transaction and resolver nested (read-only)."""
    user = UserSerializer(read_only=True)
    transaction = TransactionSerializer(read_only=True)
    resolved_by = UserSerializer(read_only=True)

    class Meta:
        model = FraudEvent
        fields = '__all__'


class FraudEventResolveSerializer(serializers.Serializer):
    """Input for resolving a fraud event: an optional, possibly blank, ``notes`` text."""
    notes = serializers.CharField(required=False, allow_blank=True)


# ============================================
# Risk Profile Serializer
# ============================================
class RiskProfileSerializer(serializers.ModelSerializer):
    """All RiskProfile fields, with the user nested (read-only)."""
    user = UserSerializer(read_only=True)

    class Meta:
        model = RiskProfile
        fields = '__all__'


# ============================================
# System Log Serializer
# ============================================
class SystemLogSerializer(serializers.ModelSerializer):
    """All SystemLog fields, with the user nested (read-only)."""
    user = UserSerializer(read_only=True)

    class Meta:
        model = SystemLog
        fields = '__all__'


# ============================================
# IP Blocklist Serializer
# ============================================
class IPBlocklistSerializer(serializers.ModelSerializer):
    """All IPBlocklist fields, with the blocking user nested (read-only)."""
    blocked_by = UserSerializer(read_only=True)

    class Meta:
        model = IPBlocklist
        fields = '__all__'


class IPBlocklistCreateSerializer(serializers.ModelSerializer):
    """Input serializer for creating an IP block entry: address, reason and optional expiry."""

    class Meta:
        model = IPBlocklist
        # blocked_by and is_active are not accepted from the client here.
        fields = ['ip_address', 'reason', 'expires_at']


# ============================================
# Dashboard Statistics Serializer
# ============================================
class DashboardStatsSerializer(serializers.Serializer):
    """Dashboard overview statistics (plain serializer, not bound to a model)."""
    total_transactions = serializers.IntegerField()
    suspicious_transactions = serializers.IntegerField()
    total_fraud_events = serializers.IntegerField()
    unresolved_fraud_events = serializers.IntegerField()
    blocked_ips = serializers.IntegerField()
    high_risk_users = serializers.IntegerField()
    transactions_today = serializers.IntegerField()
    # Same precision as the Transaction.amount model field.
    total_amount_today = serializers.DecimalField(max_digits=15, decimal_places=2)

ধাপ ৬.১: frauddetect/views.py তৈরি করুন

from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated, IsAdminUser
from rest_framework.views import APIView
from django.contrib.auth import authenticate, login
from django.utils import timezone
from django.db.models import Sum, Count, Q
from datetime import timedelta

from .models import (
    Device, 
    LoginEvent, 
    Transaction, 
    FraudEvent, 
    RiskProfile, 
    SystemLog,
    IPBlocklist
)
from .serializers import (
    DeviceSerializer, 
    LoginEventSerializer, 
    TransactionSerializer,
    TransactionCreateSerializer,
    FraudEventSerializer, 
    FraudEventResolveSerializer,
    RiskProfileSerializer, 
    SystemLogSerializer,
    IPBlocklistSerializer,
    IPBlocklistCreateSerializer,
    DashboardStatsSerializer
)
from .utils import (
    get_client_ip, 
    get_geo_location, 
    get_country_risk_level,
    calculate_transaction_risk, 
    check_velocity, 
    check_ip_blocklist
)


# ============================================
# 📱 DEVICE VIEW SET
# ============================================
class DeviceViewSet(viewsets.ReadOnlyModelViewSet):
    """
    View and manage user devices.

    Endpoints:
    - GET /api/devices/ - list devices
    - GET /api/devices/{id}/ - retrieve one device
    - POST /api/devices/{id}/trust/ - mark a device as trusted
    - POST /api/devices/{id}/block/ - block a device (staff only)

    Staff users see every device; other users only see their own.
    """
    serializer_class = DeviceSerializer
    permission_classes = [IsAuthenticated]

    def get_queryset(self):
        """Return all devices for staff, otherwise only the caller's devices."""
        queryset = Device.objects.select_related('user')
        if self.request.user.is_staff:
            return queryset
        return queryset.filter(user=self.request.user)

    @action(detail=True, methods=['post'])
    def trust(self, request, pk=None):
        """Mark a device as trusted.

        Owners may trust their own devices, but only staff may trust a device
        that is currently blocked; otherwise a user could undo an admin block
        by resetting the device status to ``normal``.
        """
        device = self.get_object()
        is_staff = request.user.is_staff

        # get_queryset() already scopes non-staff users to their own devices;
        # this explicit check is kept as defense in depth.
        if device.user != request.user and not is_staff:
            return Response(
                {'error': 'Permission denied'},
                status=status.HTTP_403_FORBIDDEN
            )

        if device.is_blocked and not is_staff:
            return Response(
                {'error': 'Blocked devices can only be trusted by an admin'},
                status=status.HTTP_403_FORBIDDEN
            )

        device.is_trusted = True
        # Keep the flag and the status in agreement: a trusted device with
        # status 'normal' must not remain flagged as blocked.
        device.is_blocked = False
        device.status = 'normal'
        device.save()

        SystemLog.objects.create(
            log_type='security',
            level='info',
            message=f"Device {device.id} marked as trusted",
            user=request.user,
            ip_address=get_client_ip(request)
        )

        return Response({
            'message': 'Device trusted successfully',
            'device': DeviceSerializer(device).data
        })

    @action(detail=True, methods=['post'])
    def block(self, request, pk=None):
        """Block a device (staff only) and record the action in the system log."""
        if not request.user.is_staff:
            return Response(
                {'error': 'Admin only'},
                status=status.HTTP_403_FORBIDDEN
            )

        device = self.get_object()
        device.is_blocked = True
        # A blocked device must not keep its trusted flag.
        device.is_trusted = False
        device.status = 'blocked'
        device.save()

        SystemLog.objects.create(
            log_type='security',
            level='warning',
            message=f"Device {device.id} blocked by {request.user.username}",
            user=request.user,
            ip_address=get_client_ip(request)
        )

        return Response({
            'message': 'Device blocked successfully',
            'device': DeviceSerializer(device).data
        })


# ============================================
# 🔐 LOGIN EVENT VIEW SET
# ============================================
class LoginEventViewSet(viewsets.ReadOnlyModelViewSet):
    """
    Read-only access to the login history.

    Endpoints:
    - GET /api/login-events/ - list login events
    - GET /api/login-events/{id}/ - retrieve one event
    - GET /api/login-events/suspicious/ - first 50 suspicious events
    """
    serializer_class = LoginEventSerializer
    permission_classes = [IsAuthenticated]

    def get_queryset(self):
        """Return every event for staff, otherwise only the caller's events."""
        events = LoginEvent.objects.select_related('user', 'device')
        current_user = self.request.user
        return events if current_user.is_staff else events.filter(user=current_user)

    @action(detail=False, methods=['get'])
    def suspicious(self, request):
        """Return at most 50 login events flagged as suspicious."""
        flagged_events = self.get_queryset().filter(is_suspicious=True)[:50]
        payload = self.get_serializer(flagged_events, many=True).data
        return Response(payload)


# ============================================
# 💰 TRANSACTION VIEW SET (🔥 মূল অংশ)
# ============================================
class TransactionViewSet(viewsets.ModelViewSet):
    """
    Transaction management - the core of the fraud-detection flow.

    Endpoints:
    - GET /api/transactions/ - list transactions
    - POST /api/transactions/ - create a transaction (runs the fraud check)
    - GET /api/transactions/{id}/ - retrieve one transaction
    - POST /api/transactions/{id}/approve/ - approve (staff only)
    - POST /api/transactions/{id}/reject/ - reject (staff only)
    - GET /api/transactions/flagged/ - list flagged transactions

    Staff users see every transaction; other users only see their own.
    """
    serializer_class = TransactionSerializer
    permission_classes = [IsAuthenticated]

    # Expose only the documented verbs. Without this, ModelViewSet would also
    # accept PUT/PATCH/DELETE, letting any authenticated user edit or delete
    # their own (possibly flagged) transactions.
    http_method_names = ['get', 'post', 'head', 'options']

    # Risk score at or above which a transaction is treated as suspicious.
    SUSPICIOUS_SCORE = 40
    # Risk score at or above which the fraud event is 'high' instead of 'medium'.
    HIGH_SEVERITY_SCORE = 70
    # Statuses from which a staff decision (approve/reject) is still allowed.
    REVIEWABLE_STATUSES = ('pending', 'flagged')

    def get_queryset(self):
        """Return every transaction for staff, otherwise only the caller's."""
        queryset = Transaction.objects.select_related('user', 'device')
        if self.request.user.is_staff:
            return queryset
        return queryset.filter(user=self.request.user)

    def get_serializer_class(self):
        """Use the input serializer for creation, the full one everywhere else."""
        if self.action == 'create':
            return TransactionCreateSerializer
        return TransactionSerializer

    def create(self, request, *args, **kwargs):
        """Create a transaction and run fraud detection on it.

        Returns 403 when the client IP is blocklisted, otherwise 201 with the
        serialized transaction, the raw risk assessment and a status message.
        Validation errors are raised by the serializer.
        """
        # Function-scope import under an alias so it cannot be confused with
        # the Transaction model instance handled below.
        from django.db import transaction as db_transaction

        # Step 1: refuse blocklisted client IPs before touching the database.
        ip = get_client_ip(request)
        if check_ip_blocklist(ip):
            return Response(
                {'error': 'Your IP address is blocked'},
                status=status.HTTP_403_FORBIDDEN
            )

        # Step 2: validate input (get_serializer also passes the request context).
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        # Steps 3-7 run atomically: if risk scoring or logging fails, no
        # unscored transaction row is left behind.
        with db_transaction.atomic():
            # Step 3: persist the transaction.
            txn = serializer.save(
                user=request.user,
                device=getattr(request, 'device', None),
                ip_address=ip
            )

            # Step 4: run fraud detection.
            risk_result = calculate_transaction_risk(txn)
            score = risk_result['risk_score']
            triggered_rules = risk_result['triggered_rules']

            # Step 5: store the assessment; suspicious transactions are
            # flagged for review, the rest stay pending.
            txn.risk_score = score
            txn.risk_level = risk_result['risk_level']
            txn.is_suspicious = score >= self.SUSPICIOUS_SCORE
            txn.status = 'flagged' if txn.is_suspicious else 'pending'
            txn.save()

            # Step 6: record a fraud event for suspicious transactions.
            if txn.is_suspicious:
                severity = 'high' if score >= self.HIGH_SEVERITY_SCORE else 'medium'
                rules_text = ', '.join(map(str, triggered_rules))
                FraudEvent.objects.create(
                    transaction=txn,
                    user=request.user,
                    triggered_rules=triggered_rules,
                    risk_score=score,
                    severity=severity,
                    description=f"Suspicious transaction detected. Amount: {txn.amount}. Rules triggered: {rules_text}"
                )

            # Step 7: audit log.
            SystemLog.objects.create(
                log_type='transaction',
                level='warning' if txn.is_suspicious else 'info',
                message=f"Transaction {txn.external_txn_id} created. Risk: {risk_result['risk_level']}",
                user=request.user,
                ip_address=ip,
                metadata={
                    'amount': str(txn.amount),
                    'risk_score': score,
                    'triggered_rules': triggered_rules
                }
            )

        # Step 8: respond.
        return Response(
            {
                'transaction': TransactionSerializer(txn).data,
                'risk_assessment': risk_result,
                'message': 'Transaction flagged for review' if txn.is_suspicious else 'Transaction pending'
            },
            status=status.HTTP_201_CREATED
        )

    def _review(self, request, new_status, log_level):
        """Apply a staff decision ('approved' or 'rejected') to one transaction.

        Returns 403 for non-staff callers and 409 when the transaction has
        already been decided; otherwise saves, logs and returns the result.
        """
        if not request.user.is_staff:
            return Response(
                {'error': 'Admin only'},
                status=status.HTTP_403_FORBIDDEN
            )

        txn = self.get_object()
        # A decision is final: do not flip approved <-> rejected afterwards.
        if txn.status not in self.REVIEWABLE_STATUSES:
            return Response(
                {'error': f'Transaction is already {txn.status}'},
                status=status.HTTP_409_CONFLICT
            )

        txn.status = new_status
        if new_status == 'approved':
            txn.approved_at = timezone.now()
        txn.save()

        SystemLog.objects.create(
            log_type='transaction',
            level=log_level,
            message=f"Transaction {txn.external_txn_id} {new_status} by {request.user.username}",
            user=request.user,
            ip_address=get_client_ip(request)
        )

        return Response({
            'message': f'Transaction {new_status}',
            'transaction': TransactionSerializer(txn).data
        })

    @action(detail=True, methods=['post'])
    def approve(self, request, pk=None):
        """Approve a pending or flagged transaction (staff only)."""
        return self._review(request, 'approved', 'info')

    @action(detail=True, methods=['post'])
    def reject(self, request, pk=None):
        """Reject a pending or flagged transaction (staff only)."""
        return self._review(request, 'rejected', 'warning')

    @action(detail=False, methods=['get'])
    def flagged(self, request):
        """List the visible transactions whose status is 'flagged'."""
        queryset = self.get_queryset().filter(status='flagged')
        serializer = self.get_serializer(queryset, many=True)
        return Response(serializer.data)


# ============================================
# 🚨 FRAUD EVENT VIEW SET
# ============================================
class FraudEventViewSet(viewsets.ReadOnlyModelViewSet):
    """
    View and resolve fraud events.

    Endpoints:
    - GET /api/fraud-events/ - list events
    - GET /api/fraud-events/{id}/ - retrieve one event
    - POST /api/fraud-events/{id}/resolve/ - resolve an event (staff only)
    - GET /api/fraud-events/unresolved/ - list unresolved events

    Staff users see every event; other users only see their own.
    """
    serializer_class = FraudEventSerializer
    permission_classes = [IsAuthenticated]

    def get_queryset(self):
        """Return every event for staff, otherwise only the caller's events."""
        queryset = FraudEvent.objects.select_related(
            'user', 'transaction', 'resolved_by'
        )
        if self.request.user.is_staff:
            return queryset
        return queryset.filter(user=self.request.user)

    @action(detail=True, methods=['post'], permission_classes=[IsAdminUser])
    def resolve(self, request, pk=None):
        """Mark a fraud event as resolved, storing who resolved it and when.

        Accepts optional ``notes``. Returns 409 when the event is already
        resolved so the original resolver, timestamp and notes are preserved.
        """
        event = self.get_object()

        # Resolving twice would silently overwrite the audit trail.
        if event.is_resolved:
            return Response(
                {'error': 'Fraud event is already resolved'},
                status=status.HTTP_409_CONFLICT
            )

        serializer = FraudEventResolveSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        event.is_resolved = True
        event.resolved_by = request.user
        event.resolved_at = timezone.now()
        event.resolution_notes = serializer.validated_data.get('notes', '')
        event.save()

        SystemLog.objects.create(
            log_type='fraud_alert',
            level='info',
            message=f"Fraud event {event.id} resolved by {request.user.username}",
            user=request.user,
            ip_address=get_client_ip(request)
        )

        return Response({
            'message': 'Fraud event resolved',
            'event': FraudEventSerializer(event).data
        })

    @action(detail=False, methods=['get'])
    def unresolved(self, request):
        """List the visible fraud events that are not resolved yet."""
        queryset = self.get_queryset().filter(is_resolved=False)
        serializer = self.get_serializer(queryset, many=True)
        return Response(serializer.data)


# ============================================
# 📊 RISK PROFILE VIEW SET
# ============================================
class RiskProfileViewSet(viewsets.ReadOnlyModelViewSet):
    """
    Read-only access to user risk profiles.

    Staff users see every profile; other users only see their own.
    The extra ``high_risk`` list action is restricted to staff.
    """
    serializer_class = RiskProfileSerializer
    permission_classes = [IsAuthenticated]

    def get_queryset(self):
        """Return every profile for staff, otherwise only the caller's profile."""
        queryset = RiskProfile.objects.select_related('user')
        if self.request.user.is_staff:
            return queryset
        return queryset.filter(user=self.request.user)

    @action(detail=False, methods=['get'])
    def high_risk(self, request):
        """List profiles whose risk level is 'high' (staff only)."""
        if not request.user.is_staff:
            return Response(
                {'error': 'Admin only'},
                status=status.HTTP_403_FORBIDDEN
            )

        # Reuse get_queryset(): for staff it is the unfiltered queryset with
        # select_related('user'), which avoids one extra user query per row
        # when the serializer renders the nested user.
        queryset = self.get_queryset().filter(risk_level='high')
        serializer = self.get_serializer(queryset, many=True)
        return Response(serializer.data)


# ============================================
# 📝 SYSTEM LOG VIEW SET
# ============================================
class SystemLogViewSet(viewsets.ReadOnlyModelViewSet):
    """
    Read-only access to system logs (staff only).

    Optional query parameters:
    - ``type``: keep only entries with this ``log_type``
    - ``level``: keep only entries with this ``level``
    """
    serializer_class = SystemLogSerializer
    permission_classes = [IsAdminUser]
    queryset = SystemLog.objects.select_related('user')

    def get_queryset(self):
        """Apply the optional ``type`` and ``level`` filters from the query string."""
        logs = super().get_queryset()
        params = self.request.query_params

        # (query-string key, model field) pairs; empty or missing values are ignored.
        for param_name, field_name in (('type', 'log_type'), ('level', 'level')):
            wanted = params.get(param_name)
            if wanted:
                logs = logs.filter(**{field_name: wanted})

        return logs


# ============================================
# 🚫 IP BLOCKLIST VIEW SET
# ============================================
class IPBlocklistViewSet(viewsets.ModelViewSet):
    """
    Manage the IP blocklist (staff only).

    Creation uses the reduced input serializer; every other action uses the
    full serializer with the nested ``blocked_by`` user.
    """
    serializer_class = IPBlocklistSerializer
    permission_classes = [IsAdminUser]
    # blocked_by is rendered as a nested user, so fetch it in the same query
    # instead of issuing one extra query per blocklist entry.
    queryset = IPBlocklist.objects.select_related('blocked_by')

    def get_serializer_class(self):
        """Use the input serializer for creation, the full one everywhere else."""
        if self.action == 'create':
            return IPBlocklistCreateSerializer
        return IPBlocklistSerializer

    def perform_create(self, serializer):
        """Save the entry with the requesting user as ``blocked_by`` and log it."""
        entry = serializer.save(blocked_by=self.request.user)

        SystemLog.objects.create(
            log_type='security',
            level='warning',
            message=f"IP {entry.ip_address} blocked",
            user=self.request.user,
            ip_address=get_client_ip(self.request)
        )


# ============================================
# 📈 DASHBOARD VIEW
# ============================================
class DashboardView(APIView):
    """
    Dashboard statistics (staff only).

    GET returns transaction, fraud-event, blocklist and risk-profile counters
    plus today's transaction count and total amount.
    """
    permission_classes = [IsAdminUser]

    def get(self, request):
        """Collect the counters and return them via DashboardStatsSerializer."""
        # The __date lookup compares in the active time zone when time zone
        # support is on, so "today" must be the local date, not the UTC date.
        now = timezone.now()
        today = timezone.localtime(now).date() if timezone.is_aware(now) else now.date()

        # One query for both of today's figures instead of two.
        today_totals = Transaction.objects.filter(
            created_at__date=today
        ).aggregate(count=Count('id'), total=Sum('amount'))

        stats = {
            'total_transactions': Transaction.objects.count(),
            'suspicious_transactions': Transaction.objects.filter(is_suspicious=True).count(),
            'total_fraud_events': FraudEvent.objects.count(),
            'unresolved_fraud_events': FraudEvent.objects.filter(is_resolved=False).count(),
            'blocked_ips': IPBlocklist.objects.filter(is_active=True).count(),
            'high_risk_users': RiskProfile.objects.filter(risk_level='high').count(),
            'transactions_today': today_totals['count'],
            # Sum() yields None when there are no rows today.
            'total_amount_today': today_totals['total'] or 0,
        }

        serializer = DashboardStatsSerializer(stats)
        return Response(serializer.data)

ধাপ ৭.১: frauddetect/urls.py তৈরি করুন

from django.urls import path, include
from rest_framework.routers import DefaultRouter
from . import views

# Build the router from a table: one (URL prefix, viewset, basename) row per resource
router = DefaultRouter()
_ROUTES = (
    (r'devices', views.DeviceViewSet, 'device'),
    (r'login-events', views.LoginEventViewSet, 'loginevent'),
    (r'transactions', views.TransactionViewSet, 'transaction'),
    (r'fraud-events', views.FraudEventViewSet, 'fraudevent'),
    (r'risk-profiles', views.RiskProfileViewSet, 'riskprofile'),
    (r'system-logs', views.SystemLogViewSet, 'systemlog'),
    (r'ip-blocklist', views.IPBlocklistViewSet, 'ipblocklist'),
)
for _prefix, _viewset, _basename in _ROUTES:
    router.register(_prefix, _viewset, basename=_basename)

# Router-generated routes first, then the standalone dashboard endpoint
urlpatterns = [
    path('', include(router.urls)),
    path('dashboard/', views.DashboardView.as_view(), name='dashboard'),
]

ধাপ ৭.২: config/urls.py আপডেট করুন

from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    # Django admin site
    path('admin/', admin.site.urls),
    # Everything declared in frauddetect/urls.py is served under /api/
    path('api/', include('frauddetect.urls')),
    # URL patterns bundled with Django REST framework (rest_framework.urls)
    path('api-auth/', include('rest_framework.urls')),
]

API Endpoints:

  • /api/devices/
  • /api/login-events/
  • /api/transactions/
  • /api/fraud-events/
  • /api/risk-profiles/
  • /api/system-logs/
  • /api/ip-blocklist/
  • /api/dashboard/

ধাপ ৮.১: frauddetect/admin.py তৈরি করুন

from django.contrib import admin
from django.utils.html import format_html
from .models import (
    Device, 
    LoginEvent, 
    Transaction, 
    FraudEvent, 
    RiskProfile, 
    IPBlocklist, 
    SystemLog
)


def _colored_label(color, text):
    """Return ``text`` as bold, colored markup; both arguments are HTML-escaped."""
    return format_html(
        '<span style="color: {}; font-weight: bold;">{}</span>',
        color, text
    )


def _choice_badge(colors, value):
    """Render ``value`` upper-cased in its mapped color (gray when unmapped).

    An empty or missing value is shown as '-' instead of raising.
    """
    label = value.upper() if value else '-'
    return _colored_label(colors.get(value, 'gray'), label)


@admin.register(Device)
class DeviceAdmin(admin.ModelAdmin):
    """Admin changelist for devices with a colored status column."""
    STATUS_COLORS = {
        'normal': 'green',
        'suspicious': 'orange',
        'blocked': 'red',
    }

    list_display = [
        'id', 'user', 'device_name', 'status_badge',
        'is_trusted', 'last_country_code', 'last_seen_at'
    ]
    list_filter = ['status', 'is_trusted', 'is_blocked', 'last_country_code']
    search_fields = ['user__username', 'fingerprint_hash', 'last_ip']
    readonly_fields = ['fingerprint_hash', 'first_seen_at', 'last_seen_at']

    @admin.display(description='Status', ordering='status')
    def status_badge(self, obj):
        """Colored rendering of the device status."""
        return _choice_badge(self.STATUS_COLORS, obj.status)


@admin.register(LoginEvent)
class LoginEventAdmin(admin.ModelAdmin):
    """Admin changelist for login events with a colored status column."""
    STATUS_COLORS = {
        'success': 'green',
        'failed': 'red',
        'blocked': 'darkred',
    }

    list_display = [
        'id', 'username', 'status_badge', 'is_suspicious',
        'ip_address', 'country_code', 'attempt_time'
    ]
    list_filter = ['status', 'is_suspicious', 'country_code', 'attempt_time']
    search_fields = ['username', 'ip_address']
    readonly_fields = ['attempt_time']

    @admin.display(description='Status', ordering='status')
    def status_badge(self, obj):
        """Colored rendering of the login attempt status."""
        return _choice_badge(self.STATUS_COLORS, obj.status)


@admin.register(Transaction)
class TransactionAdmin(admin.ModelAdmin):
    """Admin changelist for transactions with colored status and risk columns."""
    STATUS_COLORS = {
        'pending': 'blue',
        'approved': 'green',
        'rejected': 'red',
        'flagged': 'orange',
    }
    RISK_COLORS = {
        'low': 'green',
        'medium': 'orange',
        'high': 'red',
    }

    list_display = [
        'id', 'external_txn_id', 'user', 'amount',
        'status_badge', 'risk_badge', 'is_suspicious', 'created_at'
    ]
    list_filter = ['status', 'risk_level', 'is_suspicious', 'created_at']
    search_fields = ['external_txn_id', 'user__username', 'beneficiary']
    # Risk fields are computed by the fraud check, so they are not editable here.
    readonly_fields = ['created_at', 'updated_at', 'risk_score', 'risk_level']

    @admin.display(description='Status', ordering='status')
    def status_badge(self, obj):
        """Colored rendering of the transaction status."""
        return _choice_badge(self.STATUS_COLORS, obj.status)

    @admin.display(description='Risk', ordering='risk_score')
    def risk_badge(self, obj):
        """Colored 'LEVEL (score)' rendering of the risk assessment."""
        level = obj.risk_level.upper() if obj.risk_level else '-'
        return _colored_label(
            self.RISK_COLORS.get(obj.risk_level, 'gray'),
            f'{level} ({obj.risk_score})'
        )


@admin.register(FraudEvent)
class FraudEventAdmin(admin.ModelAdmin):
    """Admin changelist for fraud events with a colored severity column."""
    SEVERITY_COLORS = {
        'low': 'blue',
        'medium': 'orange',
        'high': 'red',
        'critical': 'darkred',
    }

    list_display = [
        'id', 'user', 'severity_badge', 'risk_score',
        'is_resolved', 'detected_at'
    ]
    list_filter = ['severity', 'is_resolved', 'detected_at']
    search_fields = ['user__username', 'description']
    readonly_fields = ['detected_at']

    @admin.display(description='Severity', ordering='severity')
    def severity_badge(self, obj):
        """Colored rendering of the event severity."""
        return _choice_badge(self.SEVERITY_COLORS, obj.severity)


@admin.register(RiskProfile)
class RiskProfileAdmin(admin.ModelAdmin):
    """Admin changelist for risk profiles with a colored risk-level column."""
    RISK_COLORS = {
        'low': 'green',
        'medium': 'orange',
        'high': 'red',
    }

    list_display = [
        'id', 'user', 'risk_badge', 'overall_risk_score',
        'total_transactions', 'suspicious_events_count',
        'is_monitored', 'is_blocked'
    ]
    list_filter = ['risk_level', 'is_monitored', 'is_blocked']
    search_fields = ['user__username']

    @admin.display(description='Risk Level', ordering='risk_level')
    def risk_badge(self, obj):
        """Colored rendering of the profile's risk level."""
        return _choice_badge(self.RISK_COLORS, obj.risk_level)


@admin.register(IPBlocklist)
class IPBlocklistAdmin(admin.ModelAdmin):
    """Admin changelist for blocked IP addresses."""
    list_display = ['id', 'ip_address', 'is_active', 'blocked_by', 'created_at', 'expires_at']
    list_filter = ['is_active', 'created_at']
    search_fields = ['ip_address', 'reason']


@admin.register(SystemLog)
class SystemLogAdmin(admin.ModelAdmin):
    """Admin changelist for system logs with a colored level and a shortened message."""
    LEVEL_COLORS = {
        'info': 'blue',
        'warning': 'orange',
        'error': 'red',
        'critical': 'darkred',
    }
    # Messages longer than this are cut and suffixed with '...'.
    MESSAGE_PREVIEW_LENGTH = 50

    list_display = ['id', 'log_type', 'level_badge', 'short_message', 'user', 'created_at']
    list_filter = ['log_type', 'level', 'created_at']
    search_fields = ['message', 'user__username']
    readonly_fields = ['created_at']

    @admin.display(description='Level', ordering='level')
    def level_badge(self, obj):
        """Colored rendering of the log level."""
        return _choice_badge(self.LEVEL_COLORS, obj.level)

    @admin.display(description='Message')
    def short_message(self, obj):
        """Return the message, truncated to the preview length when longer."""
        message = obj.message
        limit = self.MESSAGE_PREVIEW_LENGTH
        return f'{message[:limit]}...' if len(message) > limit else message

Leave a Comment