Part 1: Project Setup & Configuration
Part 2: Database Models তৈরি
Part 3: Utility Functions
Part 4: Middleware তৈরি
Part 5: Serializers
Part 6: Views & API Endpoints
Part 7: URL Configuration
Part 8: Admin Panel Setup
ধাপ ১.১: Virtual Environment তৈরি
# নতুন ফোল্ডার তৈরি করুন
mkdir fraud_detection_project
cd fraud_detection_project
# Virtual environment তৈরি করুন
python3 -m venv venv
# Linux/Mac:
source venv/bin/activate
ধাপ ১.২: requirements.txt তৈরি করুন
install Library
pip install Django djangorestframework django-cors-headers python-decouple requests user-agents pillow
Django==4.2.7
djangorestframework==3.14.0
django-cors-headers==4.3.1
python-decouple==3.8
requests==2.31.0
user-agents==2.2.0
pillow==10.1.0
pip install -r requirements.txt
ধাপ ১.৩: Django Project তৈরি করুন
# Project তৈরি
django-admin startproject config .
# App তৈরি
python manage.py startapp frauddetect
ধাপ ১.৪: .env ফাইল তৈরি করুন
SECRET_KEY=django-insecure-your-secret-key-here-change-in-production
DEBUG=True
ধাপ ১.৫: config/settings.py সম্পূর্ণ কনফিগারেশন
# config/settings.py -- Django settings for the fraud-detection project.
from pathlib import Path
from decouple import config

# Two levels above this file.
BASE_DIR = Path(__file__).resolve().parent.parent

# SECRET_KEY and DEBUG are looked up through python-decouple's config();
# the defaults below are used when no value is supplied.
# NOTE(review): both defaults are development-only values -- supply real
# values before deploying.
SECRET_KEY = config('SECRET_KEY', default='django-insecure-change-this-in-production')
DEBUG = config('DEBUG', default=True, cast=bool)
# NOTE(review): '*' accepts any Host header; restrict this for production.
ALLOWED_HOSTS = ['*']
# ============================================
# INSTALLED APPS
# ============================================
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    # Third Party Apps
    'rest_framework',
    'corsheaders',
    # Local Apps
    'frauddetect',
]
# ============================================
# MIDDLEWARE
# ============================================
MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'corsheaders.middleware.CorsMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
    # Our custom middleware; listed after AuthenticationMiddleware because it
    # reads request.user.
    'frauddetect.middleware.DeviceFingerprintMiddleware',
]
ROOT_URLCONF = 'config.urls'
TEMPLATES = [
    {
        'BACKEND': 'django.template.backends.django.DjangoTemplates',
        'DIRS': [],
        'APP_DIRS': True,
        'OPTIONS': {
            'context_processors': [
                'django.template.context_processors.debug',
                'django.template.context_processors.request',
                'django.contrib.auth.context_processors.auth',
                'django.contrib.messages.context_processors.messages',
            ],
        },
    },
]
WSGI_APPLICATION = 'config.wsgi.application'
# ============================================
# DATABASE
# ============================================
# SQLite file stored in the project root.
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': BASE_DIR / 'db.sqlite3',
    }
}
# ============================================
# PASSWORD VALIDATION
# ============================================
AUTH_PASSWORD_VALIDATORS = [
    {'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator'},
    {'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator'},
    {'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator'},
    {'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator'},
]
# ============================================
# INTERNATIONALIZATION
# ============================================
LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'Asia/Dhaka'
USE_I18N = True
# Datetimes are stored timezone-aware; code that needs the local hour must
# convert explicitly.
USE_TZ = True
STATIC_URL = 'static/'
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'
# ============================================
# REST FRAMEWORK CONFIGURATION
# ============================================
# Session authentication only, every endpoint requires login by default,
# list responses are paginated 50 per page.
REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'rest_framework.authentication.SessionAuthentication',
    ],
    'DEFAULT_PERMISSION_CLASSES': [
        'rest_framework.permissions.IsAuthenticated',
    ],
    'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.PageNumberPagination',
    'PAGE_SIZE': 50,
}
# ============================================
# CORS SETTINGS
# ============================================
# Local front-end origins on port 3000.
CORS_ALLOWED_ORIGINS = [
    "http://localhost:3000",
    "http://127.0.0.1:3000",
]
CORS_ALLOW_CREDENTIALS = True
# ============================================
# FRAUD DETECTION SETTINGS
# ============================================
FRAUD_SETTINGS = {
    'MAX_LOGIN_ATTEMPTS': 5,  # maximum login attempts
    'LOGIN_ATTEMPT_WINDOW': 300,  # 5 minutes
    'HIGH_AMOUNT_THRESHOLD': 100000,  # more than 1 lakh (100,000) = suspicious
    'BUSINESS_HOURS_START': 8,  # 8 AM
    'BUSINESS_HOURS_END': 18,  # 6 PM
    'MAX_DAILY_TRANSACTIONS': 50,  # maximum transactions per day
    'MAX_TRANSACTION_AMOUNT_DAILY': 500000,
    'VELOCITY_CHECK_WINDOW': 3600,  # 1 hour
    'MAX_TRANSACTIONS_PER_HOUR': 10,  # at most 10 per hour
}
# ============================================
# COUNTRY RISK LEVELS
# ============================================
# High-risk countries
HIGH_RISK_COUNTRIES = ['YE', 'SY', 'IQ', 'SD', 'SO', 'LY', 'AF', 'IR', 'NG', 'PK', 'BD']
# Medium-risk countries
MEDIUM_RISK_COUNTRIES = ['EG', 'JO', 'MA', 'TN', 'TR', 'IN', 'CN', 'BR', 'MX']
# Low-risk countries (safe)
LOW_RISK_COUNTRIES = ['SA', 'AE', 'KW', 'QA', 'BH', 'OM', 'US', 'CA', 'UK', 'GB', 'DE', 'FR']
এই পার্টে আমরা সব Database Tables/Models তৈরি করবো।
ধাপ ২.১: frauddetect/models.py
from django.db import models
from django.contrib.auth.models import User
from django.utils import timezone
from datetime import timedelta
import hashlib
import json
# ============================================
# 📱 MODEL 1: DEVICE (ডিভাইস ট্র্যাকিং)
# ============================================
class Device(models.Model):
    """A device (browser/OS fingerprint) seen for a user.

    One row per (user, fingerprint_hash) pair, so logins by the same person
    from different devices can be told apart.
    """

    STATUS_CHOICES = [
        ('normal', 'Normal'),
        ('suspicious', 'Suspicious'),
        ('blocked', 'Blocked'),
    ]

    # Owner of the device.
    user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='devices')

    # Fingerprint: 64 characters fits a SHA-256 hex digest.
    fingerprint_hash = models.CharField(max_length=64, db_index=True)
    device_fingerprint = models.TextField(null=True, blank=True)

    # Device details.
    device_type = models.CharField(max_length=50, null=True, blank=True)  # mobile/desktop/tablet
    device_name = models.CharField(max_length=100, null=True, blank=True)
    os_name = models.CharField(max_length=50, null=True, blank=True)  # Windows/Android/iOS
    os_version = models.CharField(max_length=50, null=True, blank=True)
    browser_name = models.CharField(max_length=50, null=True, blank=True)  # Chrome/Firefox
    browser_version = models.CharField(max_length=50, null=True, blank=True)

    # Last known location.
    last_ip = models.GenericIPAddressField(null=True, blank=True)
    last_country_code = models.CharField(max_length=2, null=True, blank=True)
    last_city = models.CharField(max_length=100, null=True, blank=True)

    # Status flags.
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='normal')
    is_trusted = models.BooleanField(default=False)  # whether the device is trusted
    is_blocked = models.BooleanField(default=False)  # whether the device is blocked
    risk_score = models.IntegerField(default=0)  # risk score (0-100)

    # Timestamps.
    first_seen_at = models.DateTimeField(auto_now_add=True)
    last_seen_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'devices'
        # The device middleware runs get_or_create(user=..., fingerprint_hash=...)
        # on every authenticated request. Without a database-level uniqueness
        # guarantee, parallel first requests from a new device can insert
        # duplicate rows, after which the lookup raises MultipleObjectsReturned.
        # Adding this constraint requires a new migration.
        constraints = [
            models.UniqueConstraint(
                fields=['user', 'fingerprint_hash'],
                name='unique_device_per_user_fingerprint',
            ),
        ]
        indexes = [
            models.Index(fields=['fingerprint_hash', 'user']),
            models.Index(fields=['last_ip']),
        ]

    def __str__(self):
        return f"{self.user.username} - {self.device_name or 'Unknown Device'}"
# ============================================
# 🔐 MODEL 2: LOGIN EVENT (লগইন ইভেন্ট)
# ============================================
class LoginEvent(models.Model):
    """One login attempt; successful, failed and blocked attempts are all kept."""

    STATUS_CHOICES = [
        ('success', 'Success'),
        ('failed', 'Failed'),
        ('blocked', 'Blocked'),
    ]

    # The user may be absent; the submitted username is stored separately so
    # failed attempts can still be recorded.
    user = models.ForeignKey(
        User,
        on_delete=models.CASCADE,
        null=True,
        blank=True,
    )
    username = models.CharField(max_length=150)
    device = models.ForeignKey(
        Device,
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
    )

    # Attempt details.
    status = models.CharField(max_length=20, choices=STATUS_CHOICES)
    ip_address = models.GenericIPAddressField()
    country_code = models.CharField(max_length=2, null=True, blank=True)
    city = models.CharField(max_length=100, null=True, blank=True)

    # Risk assessment.
    is_suspicious = models.BooleanField(default=False)
    risk_score = models.IntegerField(default=0)
    risk_reasons = models.JSONField(default=list)  # reasons the attempt was flagged

    # Extra information.
    user_agent = models.TextField(null=True, blank=True)
    attempt_time = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = 'login_events'
        ordering = ['-attempt_time']  # newest first
        indexes = [
            models.Index(fields=['username', 'attempt_time']),
            models.Index(fields=['ip_address']),
        ]

    def __str__(self):
        return f"{self.username} - {self.status} at {self.attempt_time}"
# ============================================
# 💰 MODEL 3: TRANSACTION (লেনদেন)
# ============================================
class Transaction(models.Model):
    """A financial transaction together with its risk assessment."""

    STATUS_CHOICES = [
        ('pending', 'Pending'),    # awaiting a decision
        ('approved', 'Approved'),
        ('rejected', 'Rejected'),
        ('flagged', 'Flagged'),    # marked as suspicious
    ]

    user = models.ForeignKey(
        User,
        on_delete=models.CASCADE,
        related_name='transactions',
    )
    device = models.ForeignKey(
        Device,
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
    )

    # Transaction details.
    external_txn_id = models.CharField(max_length=100, unique=True)  # external transaction ID
    amount = models.DecimalField(max_digits=15, decimal_places=2)
    currency = models.CharField(max_length=3, default='SAR')
    description = models.TextField(null=True, blank=True)
    beneficiary = models.CharField(max_length=255, null=True, blank=True)  # recipient

    # Workflow status.
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='pending')

    # Risk assessment.
    risk_score = models.IntegerField(default=0)  # 0-100
    risk_level = models.CharField(max_length=20, default='low')  # low/medium/high
    is_suspicious = models.BooleanField(default=False)

    # Metadata.
    raw_payload = models.JSONField(null=True, blank=True)
    ip_address = models.GenericIPAddressField(null=True, blank=True)

    # Timestamps.
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    approved_at = models.DateTimeField(null=True, blank=True)

    class Meta:
        db_table = 'transactions'
        ordering = ['-created_at']  # newest first
        indexes = [
            models.Index(fields=['user', 'created_at']),
            models.Index(fields=['status']),
        ]

    def __str__(self):
        return f"TXN-{self.external_txn_id} - {self.amount} {self.currency}"
# ============================================
# 🚨 MODEL 4: FRAUD EVENT (জালিয়াতি ইভেন্ট)
# ============================================
class FraudEvent(models.Model):
    """A record written when suspicious activity is detected."""

    SEVERITY_CHOICES = [
        ('low', 'Low'),
        ('medium', 'Medium'),
        ('high', 'High'),
        ('critical', 'Critical'),
    ]

    # The transaction is optional; the user is mandatory.
    transaction = models.ForeignKey(
        Transaction,
        on_delete=models.CASCADE,
        null=True,
        blank=True,
    )
    user = models.ForeignKey(User, on_delete=models.CASCADE)

    # Detection details.
    rule_id = models.CharField(max_length=50, null=True, blank=True)  # which rule was violated
    triggered_rules = models.JSONField(default=list)  # every rule that fired
    risk_score = models.IntegerField()
    severity = models.CharField(max_length=20, choices=SEVERITY_CHOICES)

    # Description.
    description = models.TextField()
    recommendations = models.TextField(null=True, blank=True)  # suggested follow-up

    # Resolution status.
    is_resolved = models.BooleanField(default=False)
    resolved_by = models.ForeignKey(
        User,
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='resolved_frauds',
    )
    resolved_at = models.DateTimeField(null=True, blank=True)
    resolution_notes = models.TextField(null=True, blank=True)

    # Timestamp.
    detected_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = 'fraud_events'
        ordering = ['-detected_at']  # newest first
        indexes = [
            models.Index(fields=['user', 'detected_at']),
            models.Index(fields=['severity']),
        ]

    def __str__(self):
        return f"Fraud Event - {self.severity} - {self.user.username}"
# ============================================
# 📊 MODEL 5: RISK PROFILE (ঝুঁকি প্রোফাইল)
# ============================================
class RiskProfile(models.Model):
    """Aggregate risk profile; at most one per user (one-to-one)."""

    user = models.OneToOneField(
        User,
        on_delete=models.CASCADE,
        related_name='risk_profile',
    )

    # Overall risk.
    overall_risk_score = models.IntegerField(default=0)
    risk_level = models.CharField(max_length=20, default='low')

    # Statistics.
    total_transactions = models.IntegerField(default=0)
    total_amount = models.DecimalField(max_digits=15, decimal_places=2, default=0)
    suspicious_events_count = models.IntegerField(default=0)
    failed_login_count = models.IntegerField(default=0)

    # Behavioural patterns.
    avg_transaction_amount = models.DecimalField(max_digits=15, decimal_places=2, default=0)
    usual_login_hours = models.JSONField(default=list)  # hours the user usually logs in
    usual_countries = models.JSONField(default=list)  # countries the user usually logs in from
    trusted_devices_count = models.IntegerField(default=0)

    # Status flags.
    is_monitored = models.BooleanField(default=False)  # under monitoring
    is_blocked = models.BooleanField(default=False)  # blocked

    # Timestamps.
    last_reviewed_at = models.DateTimeField(null=True, blank=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'risk_profiles'

    def __str__(self):
        return f"{self.user.username} - Risk: {self.risk_level}"
# ============================================
# 🚫 MODEL 6: IP BLOCKLIST (ব্লক করা IP)
# ============================================
class IPBlocklist(models.Model):
    """A blocked IP address, with an optional expiry time."""

    ip_address = models.GenericIPAddressField(unique=True)
    reason = models.TextField()
    blocked_by = models.ForeignKey(
        User,
        on_delete=models.SET_NULL,
        null=True,
    )
    is_active = models.BooleanField(default=True)
    created_at = models.DateTimeField(auto_now_add=True)
    # When the block expires; may be left empty.
    expires_at = models.DateTimeField(null=True, blank=True)

    class Meta:
        db_table = 'ip_blocklist'

    def __str__(self):
        return f"Blocked IP: {self.ip_address}"
# ============================================
# 📝 MODEL 7: SYSTEM LOG (সিস্টেম লগ)
# ============================================
class SystemLog(models.Model):
    """A log entry for any kind of system activity."""

    LOG_TYPE_CHOICES = [
        ('login', 'Login'),
        ('transaction', 'Transaction'),
        ('fraud_alert', 'Fraud Alert'),
        ('security', 'Security'),
        ('system', 'System'),
    ]
    LEVEL_CHOICES = [
        ('info', 'Info'),
        ('warning', 'Warning'),
        ('error', 'Error'),
        ('critical', 'Critical'),
    ]

    log_type = models.CharField(max_length=50, choices=LOG_TYPE_CHOICES)
    level = models.CharField(max_length=20, choices=LEVEL_CHOICES, default='info')
    message = models.TextField()
    # The log row is kept (user set to NULL) when the user is deleted.
    user = models.ForeignKey(
        User,
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
    )
    ip_address = models.GenericIPAddressField(null=True, blank=True)
    metadata = models.JSONField(null=True, blank=True)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = 'system_logs'
        ordering = ['-created_at']  # newest first
        indexes = [
            models.Index(fields=['log_type', 'created_at']),
            models.Index(fields=['level']),
        ]

    def __str__(self):
        return f"{self.log_type} - {self.level} - {self.created_at}"


# ধাপ ২.২: Migration চালান  (Step 2.2: run the migrations)
python manage.py makemigrations frauddetect
python manage.py migrate
Part 2 সম্পন্ন!
এখন আপনার ডাটাবেসে ৭টি টেবিল তৈরি হয়েছে:
devices – ডিভাইস ট্র্যাকিং
login_events – লগইন ইভেন্ট
transactions – লেনদেন
fraud_events – জালিয়াতি ইভেন্ট
risk_profiles – ঝুঁকি প্রোফাইল
ip_blocklist – ব্লক করা IP
system_logs – সিস্টেম লগ
ধাপ ৩.১: frauddetect/utils.py তৈরি করুন
import hashlib
import requests
from django.conf import settings
from django.utils import timezone
from datetime import timedelta
from django.db import models
def calculate_device_fingerprint(request):
    """Return a SHA-256 hex digest identifying the requesting device.

    The digest is computed over the User-Agent, Accept-Language and
    Accept-Encoding headers joined with '|'; a missing header counts as an
    empty string.
    """
    header_keys = (
        'HTTP_USER_AGENT',
        'HTTP_ACCEPT_LANGUAGE',
        'HTTP_ACCEPT_ENCODING',
    )
    joined = '|'.join(request.META.get(key, '') for key in header_keys)
    digest = hashlib.sha256(joined.encode())
    return digest.hexdigest()
def get_client_ip(request):
    """Return the client IP address for ``request``.

    The first entry of the ``X-Forwarded-For`` header is used when present
    (with several proxies, the first entry is the original client);
    otherwise ``REMOTE_ADDR`` is returned. A header whose first entry is
    blank (e.g. ``"   "`` or ``", 10.0.0.1"``) is ignored so an empty string
    is never returned as an address.

    NOTE(review): ``X-Forwarded-For`` is supplied by the client unless a
    trusted proxy overwrites it, so the value can be spoofed. This matters
    because the result feeds the IP blocklist check -- confirm the
    deployment sits behind a proxy that sets the header.

    Returns:
        str | None: the IP address, or None when neither source is set.
    """
    forwarded = request.META.get('HTTP_X_FORWARDED_FOR') or ''
    first_hop = forwarded.split(',')[0].strip()
    if first_hop:
        return first_hop
    return request.META.get('REMOTE_ADDR')
def get_country_risk_level(country_code):
    """Map a country code to a risk tier.

    The code is upper-cased and looked up in the HIGH/MEDIUM/LOW country
    lists from settings, in that order. A missing code, or one found in
    none of the lists, is rated 'medium' with score 20.

    Returns:
        dict: keys 'level', 'score' and 'reason'.
    """
    if not country_code:
        return {'level': 'medium', 'score': 20, 'reason': 'Unknown Country'}

    country = country_code.upper()

    if country in settings.HIGH_RISK_COUNTRIES:
        return {
            'level': 'high',
            'score': 30,
            'reason': f'High-Risk Country ({country})'
        }
    if country in settings.MEDIUM_RISK_COUNTRIES:
        return {
            'level': 'medium',
            'score': 15,
            'reason': f'Medium-Risk Country ({country})'
        }
    if country in settings.LOW_RISK_COUNTRIES:
        return {
            'level': 'low',
            'score': 5,
            'reason': f'Low-Risk Country ({country})'
        }
    # Not in any configured list.
    return {
        'level': 'medium',
        'score': 20,
        'reason': f'Unknown Country ({country})'
    }
def get_geo_location(ip_address):
    """Look up the geographic location of an IP address via ipapi.co.

    When the location cannot be determined (no IP, network error, non-200
    response, unparsable body, or an error payload) the country code is
    reported as ``None`` rather than a real country. Previously every
    failure was reported as 'SA', which is in the low-risk list, so a failed
    lookup lowered the risk score; ``None`` lets
    ``get_country_risk_level`` apply its "Unknown Country" rating instead.

    Args:
        ip_address: IP address string; may be None or empty.

    Returns:
        dict: 'country_code' (str or None), 'city' (str, 'Unknown' when
        missing), 'latitude' and 'longitude' (value from the API or None).
    """
    import logging

    logger = logging.getLogger(__name__)
    unknown = {
        'country_code': None,
        'city': 'Unknown',
        'latitude': None,
        'longitude': None
    }

    if not ip_address:
        # Nothing to look up; avoid requesting ".../None/json/".
        return unknown

    try:
        response = requests.get(
            f'https://ipapi.co/{ip_address}/json/',
            timeout=3
        )
        if response.status_code == 200:
            data = response.json()
            # NOTE(review): ipapi.co appears to flag failures such as
            # reserved addresses with an "error" key in the body -- confirm
            # against the API documentation.
            if isinstance(data, dict) and not data.get('error'):
                return {
                    'country_code': data.get('country_code') or None,
                    'city': data.get('city') or 'Unknown',
                    'latitude': data.get('latitude'),
                    'longitude': data.get('longitude'),
                }
    except (requests.RequestException, ValueError) as exc:
        # Best effort: a failed lookup must not break the request.
        logger.warning("Geo location lookup failed for %s: %s", ip_address, exc)

    return unknown
def check_velocity(user, check_type='login', window_minutes=60):
    """Report whether a user has reached the activity limit in a time window.

    Counts the user's LoginEvent rows (``check_type == 'login'``) or
    Transaction rows (any other value) from the last ``window_minutes``
    minutes and compares the count with the matching limit from
    ``settings.FRAUD_SETTINGS``: ``MAX_LOGIN_ATTEMPTS`` for logins,
    ``MAX_TRANSACTIONS_PER_HOUR`` for transactions. Previously the
    transaction limit was applied to logins as well.

    Args:
        user: User object.
        check_type: 'login' or 'transaction'.
        window_minutes: size of the look-back window in minutes.

    Returns:
        bool: True when the count is at or above the limit.
    """
    # Imported here, as in the original, rather than at module level.
    from .models import LoginEvent, Transaction

    window_start = timezone.now() - timedelta(minutes=window_minutes)
    limits = settings.FRAUD_SETTINGS

    if check_type == 'login':
        count = LoginEvent.objects.filter(
            user=user,
            attempt_time__gte=window_start
        ).count()
        max_allowed = limits['MAX_LOGIN_ATTEMPTS']
    else:
        count = Transaction.objects.filter(
            user=user,
            created_at__gte=window_start
        ).count()
        max_allowed = limits['MAX_TRANSACTIONS_PER_HOUR']

    return count >= max_allowed
def is_business_hours():
    """Report whether the current local time is within business hours.

    The hour is taken in the project's configured time zone;
    ``timezone.now()`` is in UTC when ``USE_TZ`` is enabled, so its hour
    would be offset from local office hours. The window is half-open,
    ``BUSINESS_HOURS_START <= hour < BUSINESS_HOURS_END``, so with an end of
    18 the last business minute is 17:59 (the previous inclusive comparison
    also counted 18:00-18:59).

    Returns:
        bool: True when the current local hour is inside business hours.
    """
    current_hour = timezone.localtime().hour
    start = settings.FRAUD_SETTINGS['BUSINESS_HOURS_START']
    end = settings.FRAUD_SETTINGS['BUSINESS_HOURS_END']
    return start <= current_hour < end
def check_ip_blocklist(ip_address):
    """Report whether ``ip_address`` has an active, unexpired blocklist entry.

    An entry counts when ``is_active`` is true and ``expires_at`` is either
    empty or still in the future.

    Returns:
        bool: True when the address is blocked.
    """
    from .models import IPBlocklist

    # Either no expiry at all, or an expiry that has not passed yet.
    still_valid = (
        models.Q(expires_at__isnull=True)
        | models.Q(expires_at__gt=timezone.now())
    )
    active_entries = IPBlocklist.objects.filter(
        ip_address=ip_address,
        is_active=True
    )
    return active_entries.filter(still_valid).exists()
def calculate_transaction_risk(transaction):
    """Score a transaction against the fraud rules.

    Each rule that fires adds points and appends a label to the list of
    triggered rules:

    * FR-01 (+40): amount above ``HIGH_AMOUNT_THRESHOLD``.
    * FR-02 (+20): created outside business hours.
    * FR-03 (+30): the user reached the hourly transaction limit.
    * FR-04 (+15): a device is attached and it is not trusted.

    The rule points can add up to 105, so the total is capped at 100 to
    stay within the 0-100 range documented on the ``risk_score`` model
    fields. The level is 'high' from 70, 'medium' from 40, otherwise 'low'.

    Args:
        transaction: Transaction object.

    Returns:
        dict: 'risk_score' (int, 0-100), 'risk_level' (str) and
        'triggered_rules' (list of str).
    """
    risk_score = 0
    triggered_rules = []

    # Rule FR-01: high-amount transaction.
    if transaction.amount > settings.FRAUD_SETTINGS['HIGH_AMOUNT_THRESHOLD']:
        risk_score += 40
        triggered_rules.append('FR-01: High Amount Transaction (>100,000)')

    # Rule FR-02: transaction outside business hours.
    if not is_business_hours():
        risk_score += 20
        triggered_rules.append('FR-02: Outside Business Hours')

    # Rule FR-03: too many transactions in the last hour.
    if check_velocity(transaction.user, 'transaction', 60):
        risk_score += 30
        triggered_rules.append('FR-03: Too Many Transactions in Short Time')

    # Rule FR-04: transaction from a device that is not trusted.
    if transaction.device and not transaction.device.is_trusted:
        risk_score += 15
        triggered_rules.append('FR-04: Untrusted Device')

    # Keep the score inside the documented 0-100 range.
    risk_score = min(risk_score, 100)

    if risk_score >= 70:
        risk_level = 'high'
    elif risk_score >= 40:
        risk_level = 'medium'
    else:
        risk_level = 'low'

    return {
        'risk_score': risk_score,
        'risk_level': risk_level,
        'triggered_rules': triggered_rules
    }
def calculate_login_risk(request, user=None):
    """Assess the risk of a login attempt.

    Adds the country score for the client's geolocated IP, +50 when the IP
    is on the blocklist and +25 when ``user`` is given and has reached the
    login velocity limit in the last 60 minutes. The total can add up to
    105, so it is capped at 100 for consistency with the 0-100 risk scores
    used elsewhere.

    NOTE(review): the 60-minute window is hard-coded here while
    ``FRAUD_SETTINGS['LOGIN_ATTEMPT_WINDOW']`` is 300 seconds -- confirm
    which window is intended.

    Args:
        request: the incoming HTTP request.
        user: the user attempting to log in, if known.

    Returns:
        dict: 'risk_score' (int, 0-100), 'risk_reasons' (list of str),
        'is_suspicious' (True from a score of 30) and 'geo' (the
        geolocation dict).
    """
    risk_score = 0
    risk_reasons = []

    ip = get_client_ip(request)
    geo = get_geo_location(ip)

    # Country risk: the score always counts; the reason is recorded only
    # for countries that are not low-risk.
    country_risk = get_country_risk_level(geo['country_code'])
    risk_score += country_risk['score']
    if country_risk['level'] != 'low':
        risk_reasons.append(country_risk['reason'])

    # IP blocklist check.
    if check_ip_blocklist(ip):
        risk_score += 50
        risk_reasons.append('Blocked IP Address')

    # Velocity check (only possible when the user is known).
    if user and check_velocity(user, 'login', 60):
        risk_score += 25
        risk_reasons.append('Too Many Login Attempts')

    # Keep the score inside the 0-100 range.
    risk_score = min(risk_score, 100)

    return {
        'risk_score': risk_score,
        'risk_reasons': risk_reasons,
        'is_suspicious': risk_score >= 30,
        'geo': geo
    }


# ধাপ ৪.১: frauddetect/middleware.py তৈরি করুন  (Step 4.1: create frauddetect/middleware.py)
from django.utils.deprecation import MiddlewareMixin
from django.utils import timezone
from .models import Device
from .utils import calculate_device_fingerprint, get_client_ip
class DeviceFingerprintMiddleware(MiddlewareMixin):
    """Track the device behind each request.

    For every request it sets ``request.client_ip``. For authenticated
    users it also:

    1. computes the device fingerprint hash from the request,
    2. fetches the matching Device row for the user, or creates it,
    3. refreshes ``last_seen_at`` / ``last_ip`` on an existing row,
    4. attaches the row as ``request.device`` and the hash as
       ``request.device_fingerprint``.

    For anonymous requests ``request.device`` and
    ``request.device_fingerprint`` are set to None.
    """

    def process_request(self, request):
        ip_address = get_client_ip(request)
        request.client_ip = ip_address

        # Devices are tracked for authenticated users only.
        if not request.user.is_authenticated:
            request.device = None
            request.device_fingerprint = None
            return None

        fingerprint_hash = calculate_device_fingerprint(request)
        request.device_fingerprint = fingerprint_hash

        # ``defaults`` may only name real Device fields. The previous
        # 'user_agent' entry is not a field on Device, so creating a row for
        # a new device failed.
        device, created = Device.objects.get_or_create(
            user=request.user,
            fingerprint_hash=fingerprint_hash,
            defaults={
                'last_ip': ip_address,
                'device_fingerprint': fingerprint_hash,
            }
        )

        if not created:
            # Known device: refresh the tracking columns only.
            device.last_seen_at = timezone.now()
            device.last_ip = ip_address
            device.save(update_fields=['last_seen_at', 'last_ip'])

        request.device = device
        return None


# ধাপ ৫.১: frauddetect/serializers.py তৈরি করুন  (Step 5.1: create frauddetect/serializers.py)
from rest_framework import serializers
from django.contrib.auth.models import User
from .models import (
Device,
LoginEvent,
Transaction,
FraudEvent,
RiskProfile,
SystemLog,
IPBlocklist
)
# ============================================
# User Serializer
# ============================================
class UserSerializer(serializers.ModelSerializer):
    """Basic user information: id, username, email and names."""

    class Meta:
        model = User
        fields = (
            'id',
            'username',
            'email',
            'first_name',
            'last_name',
        )
# ============================================
# Device Serializer
# ============================================
class DeviceSerializer(serializers.ModelSerializer):
    """All Device fields, with the owner nested as read-only user info."""

    user = UserSerializer(read_only=True)

    class Meta:
        model = Device
        fields = '__all__'
# ============================================
# Login Event Serializer
# ============================================
class LoginEventSerializer(serializers.ModelSerializer):
    """All LoginEvent fields, with nested read-only user and device."""

    user = UserSerializer(read_only=True)
    device = DeviceSerializer(read_only=True)

    class Meta:
        model = LoginEvent
        fields = '__all__'
# ============================================
# Transaction Serializer
# ============================================
class TransactionSerializer(serializers.ModelSerializer):
    """All Transaction fields, including the risk assessment.

    User and device are nested read-only; the risk and status fields are
    read-only because the system generates them, not the client.
    """

    user = UserSerializer(read_only=True)
    device = DeviceSerializer(read_only=True)

    class Meta:
        model = Transaction
        fields = '__all__'
        # Clients cannot submit these; the system sets them.
        read_only_fields = (
            'risk_score',
            'risk_level',
            'is_suspicious',
            'status',
            'approved_at',
        )
class TransactionCreateSerializer(serializers.ModelSerializer):
    """Input serializer for creating a transaction (limited field set)."""

    class Meta:
        model = Transaction
        fields = (
            'external_txn_id',
            'amount',
            'currency',
            'description',
            'beneficiary',
            'raw_payload',
        )
# ============================================
# Fraud Event Serializer
# ============================================
class FraudEventSerializer(serializers.ModelSerializer):
    """All FraudEvent fields, with nested read-only related objects."""

    user = UserSerializer(read_only=True)
    transaction = TransactionSerializer(read_only=True)
    resolved_by = UserSerializer(read_only=True)

    class Meta:
        model = FraudEvent
        fields = '__all__'
class FraudEventResolveSerializer(serializers.Serializer):
    """Payload for resolving a fraud event: optional, possibly blank notes."""

    notes = serializers.CharField(
        required=False,
        allow_blank=True,
    )
# ============================================
# Risk Profile Serializer
# ============================================
class RiskProfileSerializer(serializers.ModelSerializer):
    """All RiskProfile fields, with the user nested read-only."""

    user = UserSerializer(read_only=True)

    class Meta:
        model = RiskProfile
        fields = '__all__'
# ============================================
# System Log Serializer
# ============================================
class SystemLogSerializer(serializers.ModelSerializer):
    """All SystemLog fields, with the user nested read-only."""

    user = UserSerializer(read_only=True)

    class Meta:
        model = SystemLog
        fields = '__all__'
# ============================================
# IP Blocklist Serializer
# ============================================
class IPBlocklistSerializer(serializers.ModelSerializer):
    """All IPBlocklist fields, with the blocking user nested read-only."""

    blocked_by = UserSerializer(read_only=True)

    class Meta:
        model = IPBlocklist
        fields = '__all__'
class IPBlocklistCreateSerializer(serializers.ModelSerializer):
    """Input serializer for adding an IP block entry."""

    class Meta:
        model = IPBlocklist
        fields = (
            'ip_address',
            'reason',
            'expires_at',
        )
# ============================================
# Dashboard Statistics Serializer
# ============================================
class DashboardStatsSerializer(serializers.Serializer):
    """Dashboard overview statistics (not backed by a model)."""

    # Transaction counters.
    total_transactions = serializers.IntegerField()
    suspicious_transactions = serializers.IntegerField()

    # Fraud event counters.
    total_fraud_events = serializers.IntegerField()
    unresolved_fraud_events = serializers.IntegerField()

    # Blocklist and user counters.
    blocked_ips = serializers.IntegerField()
    high_risk_users = serializers.IntegerField()

    # Today's activity.
    transactions_today = serializers.IntegerField()
    total_amount_today = serializers.DecimalField(
        max_digits=15,
        decimal_places=2,
    )


# ধাপ ৬.১: frauddetect/views.py তৈরি করুন  (Step 6.1: create frauddetect/views.py)
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated, IsAdminUser
from rest_framework.views import APIView
from django.contrib.auth import authenticate, login
from django.utils import timezone
from django.db.models import Sum, Count, Q
from datetime import timedelta
from .models import (
Device,
LoginEvent,
Transaction,
FraudEvent,
RiskProfile,
SystemLog,
IPBlocklist
)
from .serializers import (
DeviceSerializer,
LoginEventSerializer,
TransactionSerializer,
TransactionCreateSerializer,
FraudEventSerializer,
FraudEventResolveSerializer,
RiskProfileSerializer,
SystemLogSerializer,
IPBlocklistSerializer,
IPBlocklistCreateSerializer,
DashboardStatsSerializer
)
from .utils import (
get_client_ip,
get_geo_location,
get_country_risk_level,
calculate_transaction_risk,
check_velocity,
check_ip_blocklist
)
# ============================================
# 📱 DEVICE VIEW SET
# ============================================
class DeviceViewSet(viewsets.ReadOnlyModelViewSet):
    """View and manage user devices.

    Endpoints:
    - GET  /api/devices/             - list devices
    - GET  /api/devices/{id}/        - a single device
    - POST /api/devices/{id}/trust/  - mark a device as trusted
    - POST /api/devices/{id}/block/  - block a device (admin only)

    Staff users see every device; other users see only their own.
    """

    serializer_class = DeviceSerializer
    permission_classes = [IsAuthenticated]

    def get_queryset(self):
        # Admins see everything, regular users only their own devices.
        if self.request.user.is_staff:
            return Device.objects.all().select_related('user')
        return Device.objects.filter(user=self.request.user)

    @action(detail=True, methods=['post'])
    def trust(self, request, pk=None):
        """Mark a device as trusted.

        Allowed for the device owner or a staff user. A blocked device is
        refused: previously this action reset ``status`` to 'normal' while
        ``is_blocked`` stayed True, and let a user re-trust a device an
        admin had blocked.
        """
        device = self.get_object()

        # Only the owner or an admin.
        if device.user != request.user and not request.user.is_staff:
            return Response(
                {'error': 'Permission denied'},
                status=status.HTTP_403_FORBIDDEN
            )

        if device.is_blocked:
            return Response(
                {'error': 'Blocked devices cannot be trusted'},
                status=status.HTTP_400_BAD_REQUEST
            )

        device.is_trusted = True
        device.status = 'normal'
        # Write only the changed columns so this action does not bump the
        # auto-updated last_seen_at timestamp.
        device.save(update_fields=['is_trusted', 'status'])

        SystemLog.objects.create(
            log_type='security',
            level='info',
            message=f"Device {device.id} marked as trusted",
            user=request.user,
            ip_address=get_client_ip(request)
        )
        return Response({
            'message': 'Device trusted successfully',
            'device': DeviceSerializer(device).data
        })

    @action(detail=True, methods=['post'])
    def block(self, request, pk=None):
        """Block a device (admin only).

        Blocking also clears ``is_trusted``; otherwise a blocked device
        would keep passing the untrusted-device fraud rule.
        """
        if not request.user.is_staff:
            return Response(
                {'error': 'Admin only'},
                status=status.HTTP_403_FORBIDDEN
            )

        device = self.get_object()
        device.is_blocked = True
        device.is_trusted = False
        device.status = 'blocked'
        # Write only the changed columns so this action does not bump the
        # auto-updated last_seen_at timestamp.
        device.save(update_fields=['is_blocked', 'is_trusted', 'status'])

        SystemLog.objects.create(
            log_type='security',
            level='warning',
            message=f"Device {device.id} blocked by {request.user.username}",
            user=request.user,
            ip_address=get_client_ip(request)
        )
        return Response({
            'message': 'Device blocked successfully',
            'device': DeviceSerializer(device).data
        })
# ============================================
# 🔐 LOGIN EVENT VIEW SET
# ============================================
class LoginEventViewSet(viewsets.ReadOnlyModelViewSet):
    """Read-only access to the login history.

    Endpoints:
    - GET /api/login-events/       - list login events
    - GET /api/login-events/{id}/  - a single event

    Staff users see every event; other users see only their own.
    """

    serializer_class = LoginEventSerializer
    permission_classes = [IsAuthenticated]

    def get_queryset(self):
        events = LoginEvent.objects.select_related('user', 'device')
        if not self.request.user.is_staff:
            events = events.filter(user=self.request.user)
        return events

    @action(detail=False, methods=['get'])
    def suspicious(self, request):
        """Return the first 50 suspicious login events visible to the caller."""
        flagged = self.get_queryset().filter(is_suspicious=True)[:50]
        payload = self.get_serializer(flagged, many=True).data
        return Response(payload)
# ============================================
# 💰 TRANSACTION VIEW SET (🔥 মূল অংশ)
# ============================================
class TransactionViewSet(viewsets.ModelViewSet):
    """
    Transaction management - the core of the fraud-detection flow.

    Endpoints:
    - GET  /api/transactions/               - list transactions
    - POST /api/transactions/               - create a transaction (with fraud check)
    - GET  /api/transactions/{id}/          - retrieve a single transaction
    - POST /api/transactions/{id}/approve/  - approve (staff only)
    - POST /api/transactions/{id}/reject/   - reject (staff only)

    Staff users see every transaction; other users only see their own.
    The generic update/partial_update/destroy endpoints inherited from
    ``ModelViewSet`` are restricted to staff so that a regular user cannot
    edit or delete a transaction after it has been risk-assessed.
    """

    serializer_class = TransactionSerializer
    permission_classes = [IsAuthenticated]

    # A risk score at or above this value marks the transaction as
    # suspicious, sets its status to 'flagged' and records a FraudEvent.
    SUSPICIOUS_SCORE_THRESHOLD = 40
    # A suspicious transaction at or above this value gets severity 'high';
    # below it the severity is 'medium'.
    HIGH_SEVERITY_SCORE_THRESHOLD = 70

    def get_queryset(self):
        """Return all transactions for staff, otherwise only the caller's."""
        queryset = Transaction.objects.select_related('user', 'device')
        if self.request.user.is_staff:
            return queryset
        return queryset.filter(user=self.request.user)

    def get_serializer_class(self):
        """Use the dedicated input serializer for the ``create`` action."""
        if self.action == 'create':
            return TransactionCreateSerializer
        return TransactionSerializer

    def get_permissions(self):
        """Require staff for the generic write endpoints.

        Without this, any authenticated user could PUT/PATCH/DELETE their
        own transactions through the endpoints ``ModelViewSet`` provides.
        """
        if self.action in ('update', 'partial_update', 'destroy'):
            return [IsAdminUser()]
        return super().get_permissions()

    def create(self, request, *args, **kwargs):
        """
        Create a new transaction and run fraud detection on it.

        Returns 403 when the client IP is on the blocklist, otherwise 201
        with the serialized transaction, the raw risk assessment and a
        human-readable message. Validation errors are raised by the
        serializer (``raise_exception=True``).
        """
        # Imported locally under an alias so the module cannot be confused
        # with (or shadowed by) a transaction model instance.
        from django.db import transaction as db_transaction

        # Step 1: IP blocklist check
        ip = get_client_ip(request)
        if check_ip_blocklist(ip):
            return Response(
                {'error': 'Your IP address is blocked'},
                status=status.HTTP_403_FORBIDDEN
            )

        # Step 2: validate input. get_serializer() resolves to
        # TransactionCreateSerializer for this action and also passes the
        # request context to the serializer.
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        # Steps 3-7 run atomically: if risk scoring or any follow-up write
        # fails, no un-assessed transaction is left behind in the database.
        with db_transaction.atomic():
            # Step 3: create the transaction
            txn = serializer.save(
                user=request.user,
                device=getattr(request, 'device', None),
                ip_address=ip
            )

            # Step 4: run fraud detection
            risk_result = calculate_transaction_risk(txn)
            risk_score = risk_result['risk_score']
            risk_level = risk_result['risk_level']
            triggered_rules = risk_result['triggered_rules']

            # Step 5: store the risk assessment on the transaction
            txn.risk_score = risk_score
            txn.risk_level = risk_level
            txn.is_suspicious = risk_score >= self.SUSPICIOUS_SCORE_THRESHOLD
            # Suspicious transactions are flagged for review; others stay pending.
            txn.status = 'flagged' if txn.is_suspicious else 'pending'
            txn.save()

            # Step 6: record a fraud event for suspicious transactions
            if txn.is_suspicious:
                if risk_score >= self.HIGH_SEVERITY_SCORE_THRESHOLD:
                    severity = 'high'
                else:
                    severity = 'medium'
                # str() keeps the join safe if a rule identifier is not a string.
                rules_text = ', '.join(str(rule) for rule in triggered_rules)
                FraudEvent.objects.create(
                    transaction=txn,
                    user=request.user,
                    triggered_rules=triggered_rules,
                    risk_score=risk_score,
                    severity=severity,
                    description=(
                        f"Suspicious transaction detected. "
                        f"Amount: {txn.amount}. "
                        f"Rules triggered: {rules_text}"
                    )
                )

            # Step 7: write the audit log entry
            SystemLog.objects.create(
                log_type='transaction',
                level='warning' if txn.is_suspicious else 'info',
                message=f"Transaction {txn.external_txn_id} created. Risk: {risk_level}",
                user=request.user,
                ip_address=ip,
                metadata={
                    'amount': str(txn.amount),
                    'risk_score': risk_score,
                    'triggered_rules': triggered_rules
                }
            )

        # Step 8: respond
        if txn.is_suspicious:
            message = 'Transaction flagged for review'
        else:
            message = 'Transaction pending'
        return Response(
            {
                'transaction': TransactionSerializer(txn).data,
                'risk_assessment': risk_result,
                'message': message
            },
            status=status.HTTP_201_CREATED
        )

    @action(detail=True, methods=['post'])
    def approve(self, request, pk=None):
        """Approve a transaction (staff only) and log the decision."""
        if not request.user.is_staff:
            return Response(
                {'error': 'Admin only'},
                status=status.HTTP_403_FORBIDDEN
            )
        txn = self.get_object()
        txn.status = 'approved'
        txn.approved_at = timezone.now()
        txn.save()
        # Audit log
        SystemLog.objects.create(
            log_type='transaction',
            level='info',
            message=f"Transaction {txn.external_txn_id} approved by {request.user.username}",
            user=request.user,
            ip_address=get_client_ip(request)
        )
        return Response({
            'message': 'Transaction approved',
            'transaction': TransactionSerializer(txn).data
        })

    @action(detail=True, methods=['post'])
    def reject(self, request, pk=None):
        """Reject a transaction (staff only) and log the decision."""
        if not request.user.is_staff:
            return Response(
                {'error': 'Admin only'},
                status=status.HTTP_403_FORBIDDEN
            )
        txn = self.get_object()
        txn.status = 'rejected'
        txn.save()
        # Audit log
        SystemLog.objects.create(
            log_type='transaction',
            level='warning',
            message=f"Transaction {txn.external_txn_id} rejected by {request.user.username}",
            user=request.user,
            ip_address=get_client_ip(request)
        )
        return Response({
            'message': 'Transaction rejected',
            'transaction': TransactionSerializer(txn).data
        })

    @action(detail=False, methods=['get'])
    def flagged(self, request):
        """Return only the visible transactions whose status is 'flagged'."""
        queryset = self.get_queryset().filter(status='flagged')
        serializer = self.get_serializer(queryset, many=True)
        return Response(serializer.data)
# ============================================
# 🚨 FRAUD EVENT VIEW SET
# ============================================
class FraudEventViewSet(viewsets.ReadOnlyModelViewSet):
    """
    View and resolve fraud events.

    Endpoints:
    - GET  /api/fraud-events/               - list events
    - GET  /api/fraud-events/{id}/          - retrieve a single event
    - POST /api/fraud-events/{id}/resolve/  - mark an event as resolved

    Staff users see every event; other users only see their own.
    """

    serializer_class = FraudEventSerializer
    permission_classes = [IsAuthenticated]

    def get_queryset(self):
        """Return all events for staff, otherwise only the caller's events."""
        events = FraudEvent.objects.select_related(
            'user', 'transaction', 'resolved_by'
        )
        current_user = self.request.user
        if not current_user.is_staff:
            events = events.filter(user=current_user)
        return events

    @action(detail=True, methods=['post'], permission_classes=[IsAdminUser])
    def resolve(self, request, pk=None):
        """Mark a fraud event as resolved, storing who resolved it and when."""
        event = self.get_object()

        resolve_input = FraudEventResolveSerializer(data=request.data)
        resolve_input.is_valid(raise_exception=True)
        notes = resolve_input.validated_data.get('notes', '')

        resolver = request.user
        event.is_resolved = True
        event.resolved_by = resolver
        event.resolved_at = timezone.now()
        event.resolution_notes = notes
        event.save()

        # Audit log
        SystemLog.objects.create(
            log_type='fraud_alert',
            level='info',
            message=f"Fraud event {event.id} resolved by {request.user.username}",
            user=resolver,
            ip_address=get_client_ip(request)
        )

        body = {
            'message': 'Fraud event resolved',
            'event': FraudEventSerializer(event).data
        }
        return Response(body)

    @action(detail=False, methods=['get'])
    def unresolved(self, request):
        """Return only the visible events that are not yet resolved."""
        open_events = self.get_queryset().filter(is_resolved=False)
        payload = self.get_serializer(open_events, many=True).data
        return Response(payload)
# ============================================
# 📊 RISK PROFILE VIEW SET
# ============================================
class RiskProfileViewSet(viewsets.ReadOnlyModelViewSet):
    """
    Read-only access to user risk profiles.

    Staff users see every profile; other users only see their own.
    """

    serializer_class = RiskProfileSerializer
    permission_classes = [IsAuthenticated]

    def get_queryset(self):
        """Return all profiles for staff, otherwise only the caller's profile."""
        queryset = RiskProfile.objects.select_related('user')
        if self.request.user.is_staff:
            return queryset
        return queryset.filter(user=self.request.user)

    @action(detail=False, methods=['get'])
    def high_risk(self, request):
        """Return the profiles whose risk level is 'high' (staff only)."""
        if not request.user.is_staff:
            return Response(
                {'error': 'Admin only'},
                status=status.HTTP_403_FORBIDDEN
            )
        # Build on get_queryset() so this action shares the same
        # select_related('user') as the list endpoint. The caller is staff
        # at this point, so get_queryset() applies no per-user filter and
        # the result set is the same as querying the model directly.
        queryset = self.get_queryset().filter(risk_level='high')
        serializer = self.get_serializer(queryset, many=True)
        return Response(serializer.data)
# ============================================
# 📝 SYSTEM LOG VIEW SET
# ============================================
class SystemLogViewSet(viewsets.ReadOnlyModelViewSet):
    """
    Read-only access to system logs (staff only).

    Supports optional filtering through the ``type`` and ``level`` query
    parameters.
    """

    serializer_class = SystemLogSerializer
    permission_classes = [IsAdminUser]
    queryset = SystemLog.objects.select_related('user')

    def get_queryset(self):
        """Apply the optional ``type`` / ``level`` query-string filters."""
        logs = super().get_queryset()
        params = self.request.query_params
        # (query parameter name, model field it filters on)
        for param_name, field_name in (('type', 'log_type'), ('level', 'level')):
            wanted = params.get(param_name)
            # Missing or empty parameters leave the queryset untouched.
            if wanted:
                logs = logs.filter(**{field_name: wanted})
        return logs
# ============================================
# 🚫 IP BLOCKLIST VIEW SET
# ============================================
class IPBlocklistViewSet(viewsets.ModelViewSet):
    """
    Manage the IP blocklist (staff only).
    """

    serializer_class = IPBlocklistSerializer
    permission_classes = [IsAdminUser]
    queryset = IPBlocklist.objects.all()

    def get_serializer_class(self):
        """Use the dedicated input serializer for the ``create`` action."""
        creating = self.action == 'create'
        return IPBlocklistCreateSerializer if creating else IPBlocklistSerializer

    def perform_create(self, serializer):
        """Save the entry with the requesting user as blocker, then log it."""
        request = self.request
        serializer.save(blocked_by=request.user)

        # Audit log
        blocked_ip = serializer.validated_data['ip_address']
        SystemLog.objects.create(
            log_type='security',
            level='warning',
            message=f"IP {blocked_ip} blocked",
            user=request.user,
            ip_address=get_client_ip(request)
        )
# ============================================
# 📈 DASHBOARD VIEW
# ============================================
class DashboardView(APIView):
    """
    Dashboard statistics (staff only).

    GET returns aggregate counters for transactions, fraud events, blocked
    IPs and high-risk users, plus today's transaction count and amount.
    """

    permission_classes = [IsAdminUser]

    def get(self, request):
        """Collect the dashboard counters and return them serialized."""
        # The ``created_at__date`` lookup compares dates in the current time
        # zone when USE_TZ is enabled, so "today" must be computed in that
        # same zone; ``timezone.now().date()`` would be the UTC date.
        # With USE_TZ disabled now() is naive and is used as-is.
        now = timezone.now()
        if timezone.is_aware(now):
            today = timezone.localtime(now).date()
        else:
            today = now.date()

        transactions_today = Transaction.objects.filter(created_at__date=today)

        stats = {
            'total_transactions': Transaction.objects.count(),
            'suspicious_transactions': Transaction.objects.filter(is_suspicious=True).count(),
            'total_fraud_events': FraudEvent.objects.count(),
            'unresolved_fraud_events': FraudEvent.objects.filter(is_resolved=False).count(),
            'blocked_ips': IPBlocklist.objects.filter(is_active=True).count(),
            'high_risk_users': RiskProfile.objects.filter(risk_level='high').count(),
            'transactions_today': transactions_today.count(),
            # Sum() yields None when there are no rows; report 0 instead.
            'total_amount_today': transactions_today.aggregate(
                total=Sum('amount')
            )['total'] or 0,
        }
        serializer = DashboardStatsSerializer(stats)
        return Response(serializer.data)

# Step 7.1: Create frauddetect/urls.py
from django.urls import path, include
from rest_framework.routers import DefaultRouter
from . import views
# Router তৈরি
router = DefaultRouter()
router.register(r'devices', views.DeviceViewSet, basename='device')
router.register(r'login-events', views.LoginEventViewSet, basename='loginevent')
router.register(r'transactions', views.TransactionViewSet, basename='transaction')
router.register(r'fraud-events', views.FraudEventViewSet, basename='fraudevent')
router.register(r'risk-profiles', views.RiskProfileViewSet, basename='riskprofile')
router.register(r'system-logs', views.SystemLogViewSet, basename='systemlog')
router.register(r'ip-blocklist', views.IPBlocklistViewSet, basename='ipblocklist')
urlpatterns = [
path('', include(router.urls)),
path('dashboard/', views.DashboardView.as_view(), name='dashboard'),
]

ধাপ ৭.২: config/urls.py আপডেট করুন
from django.contrib import admin
from django.urls import path, include
urlpatterns = [
path('admin/', admin.site.urls),
path('api/', include('frauddetect.urls')),
path('api-auth/', include('rest_framework.urls')),
]

API Endpoints:
- /api/devices/
- /api/login-events/
- /api/transactions/
- /api/fraud-events/
- /api/risk-profiles/
- /api/system-logs/
- /api/ip-blocklist/
- /api/dashboard/
ধাপ ৮.১: frauddetect/admin.py
from django.contrib import admin
from django.utils.html import format_html
from .models import (
Device,
LoginEvent,
Transaction,
FraudEvent,
RiskProfile,
IPBlocklist,
SystemLog
)
@admin.register(Device)
class DeviceAdmin(admin.ModelAdmin):
    """Admin configuration for Device with a colour-coded status column."""

    # Display colour per device status; unknown statuses render in gray.
    STATUS_COLORS = {
        'normal': 'green',
        'suspicious': 'orange',
        'blocked': 'red'
    }

    list_display = [
        'id', 'user', 'device_name', 'status_badge',
        'is_trusted', 'last_country_code', 'last_seen_at'
    ]
    list_filter = ['status', 'is_trusted', 'is_blocked', 'last_country_code']
    search_fields = ['user__username', 'fingerprint_hash', 'last_ip']
    readonly_fields = ['fingerprint_hash', 'first_seen_at', 'last_seen_at']

    @admin.display(description='Status')
    def status_badge(self, obj):
        """Render the upper-cased status as bold, coloured HTML."""
        badge_color = self.STATUS_COLORS.get(obj.status, 'gray')
        label = obj.status.upper()
        return format_html(
            '<span style="color: {}; font-weight: bold;">{}</span>',
            badge_color, label
        )
@admin.register(LoginEvent)
class LoginEventAdmin(admin.ModelAdmin):
    """Admin configuration for LoginEvent with a colour-coded status column."""

    # Display colour per login status; unknown statuses render in gray.
    STATUS_COLORS = {
        'success': 'green',
        'failed': 'red',
        'blocked': 'darkred'
    }

    list_display = [
        'id', 'username', 'status_badge', 'is_suspicious',
        'ip_address', 'country_code', 'attempt_time'
    ]
    list_filter = ['status', 'is_suspicious', 'country_code', 'attempt_time']
    search_fields = ['username', 'ip_address']
    readonly_fields = ['attempt_time']

    @admin.display(description='Status')
    def status_badge(self, obj):
        """Render the upper-cased status as bold, coloured HTML."""
        badge_color = self.STATUS_COLORS.get(obj.status, 'gray')
        label = obj.status.upper()
        return format_html(
            '<span style="color: {}; font-weight: bold;">{}</span>',
            badge_color, label
        )
@admin.register(Transaction)
class TransactionAdmin(admin.ModelAdmin):
    """Admin configuration for Transaction with status and risk badges."""

    # Display colour per transaction status; unknown values render in gray.
    STATUS_COLORS = {
        'pending': 'blue',
        'approved': 'green',
        'rejected': 'red',
        'flagged': 'orange'
    }
    # Display colour per risk level; unknown values render in gray.
    RISK_COLORS = {
        'low': 'green',
        'medium': 'orange',
        'high': 'red'
    }

    list_display = [
        'id', 'external_txn_id', 'user', 'amount',
        'status_badge', 'risk_badge', 'is_suspicious', 'created_at'
    ]
    list_filter = ['status', 'risk_level', 'is_suspicious', 'created_at']
    search_fields = ['external_txn_id', 'user__username', 'beneficiary']
    readonly_fields = ['created_at', 'updated_at', 'risk_score', 'risk_level']

    @admin.display(description='Status')
    def status_badge(self, obj):
        """Render the upper-cased status as bold, coloured HTML."""
        badge_color = self.STATUS_COLORS.get(obj.status, 'gray')
        label = obj.status.upper()
        return format_html(
            '<span style="color: {}; font-weight: bold;">{}</span>',
            badge_color, label
        )

    @admin.display(description='Risk')
    def risk_badge(self, obj):
        """Render the upper-cased risk level with its score in parentheses."""
        badge_color = self.RISK_COLORS.get(obj.risk_level, 'gray')
        label = obj.risk_level.upper()
        return format_html(
            '<span style="color: {}; font-weight: bold;">{} ({})</span>',
            badge_color, label, obj.risk_score
        )
@admin.register(FraudEvent)
class FraudEventAdmin(admin.ModelAdmin):
    """Admin configuration for FraudEvent with a colour-coded severity column."""

    # Display colour per severity; unknown severities render in gray.
    SEVERITY_COLORS = {
        'low': 'blue',
        'medium': 'orange',
        'high': 'red',
        'critical': 'darkred'
    }

    list_display = [
        'id', 'user', 'severity_badge', 'risk_score',
        'is_resolved', 'detected_at'
    ]
    list_filter = ['severity', 'is_resolved', 'detected_at']
    search_fields = ['user__username', 'description']
    readonly_fields = ['detected_at']

    @admin.display(description='Severity')
    def severity_badge(self, obj):
        """Render the upper-cased severity as bold, coloured HTML."""
        badge_color = self.SEVERITY_COLORS.get(obj.severity, 'gray')
        label = obj.severity.upper()
        return format_html(
            '<span style="color: {}; font-weight: bold;">{}</span>',
            badge_color, label
        )
@admin.register(RiskProfile)
class RiskProfileAdmin(admin.ModelAdmin):
    """Admin configuration for RiskProfile with a colour-coded risk column."""

    # Display colour per risk level; unknown levels render in gray.
    RISK_COLORS = {
        'low': 'green',
        'medium': 'orange',
        'high': 'red'
    }

    list_display = [
        'id', 'user', 'risk_badge', 'overall_risk_score',
        'total_transactions', 'suspicious_events_count',
        'is_monitored', 'is_blocked'
    ]
    list_filter = ['risk_level', 'is_monitored', 'is_blocked']
    search_fields = ['user__username']

    @admin.display(description='Risk Level')
    def risk_badge(self, obj):
        """Render the upper-cased risk level as bold, coloured HTML."""
        badge_color = self.RISK_COLORS.get(obj.risk_level, 'gray')
        label = obj.risk_level.upper()
        return format_html(
            '<span style="color: {}; font-weight: bold;">{}</span>',
            badge_color, label
        )
@admin.register(IPBlocklist)
class IPBlocklistAdmin(admin.ModelAdmin):
    """Admin configuration for IPBlocklist entries."""

    list_display = [
        'id', 'ip_address', 'is_active',
        'blocked_by', 'created_at', 'expires_at',
    ]
    list_filter = ['is_active', 'created_at']
    search_fields = ['ip_address', 'reason']
@admin.register(SystemLog)
class SystemLogAdmin(admin.ModelAdmin):
    """Admin configuration for SystemLog with a level badge and short message."""

    # Display colour per log level; unknown levels render in gray.
    LEVEL_COLORS = {
        'info': 'blue',
        'warning': 'orange',
        'error': 'red',
        'critical': 'darkred'
    }

    list_display = ['id', 'log_type', 'level_badge', 'short_message', 'user', 'created_at']
    list_filter = ['log_type', 'level', 'created_at']
    search_fields = ['message', 'user__username']
    readonly_fields = ['created_at']

    @admin.display(description='Level')
    def level_badge(self, obj):
        """Render the upper-cased log level as bold, coloured HTML."""
        badge_color = self.LEVEL_COLORS.get(obj.level, 'gray')
        label = obj.level.upper()
        return format_html(
            '<span style="color: {}; font-weight: bold;">{}</span>',
            badge_color, label
        )

    @admin.display(description='Message')
    def short_message(self, obj):
        """Return the message, cut to 50 characters plus '...' when longer."""
        text = obj.message
        if len(text) > 50:
            return text[:50] + '...'
        return text