একটি Django প্রজেক্ট ও একটি এপ্লিকেশন বানাই
Create a Django Project:
django-admin startproject project_name
Navigate to the Project Directory:
cd project_name
Create a Django App:
python manage.py startapp app_name
settings.py
INSTALLED_APPS = [
'app_name',
]
python manage.py migrate
app_name/views.py
# app_name/views.py
from django.http import HttpResponse
def index(request):
    """Return an ``HttpResponse`` containing a fixed greeting.

    Args:
        request: The incoming request object passed in by Django; it is
            not inspected by this view.

    Returns:
        An ``HttpResponse`` whose body is ``"Hello from my view!"``.
    """
    return HttpResponse("Hello from my view!")
app_name/urls.py
# app_name/urls.py
from django.urls import path
from . import views
urlpatterns = [
path('', views.index, name='index'),
]
project urls.py
# project_name/urls.py
from django.contrib import admin
from django.urls import path, include # Import the include function
urlpatterns = [
path('admin/', admin.site.urls),
path('', include('app_name.urls')), # Include the app's URLs
]
Start the Development Server To Check
python manage.py runserver
celery Configure
রেডিস ইনস্টল করি
এই লিংক থেকে রেডিস ডাউনলোড করে ইনস্টল করি link
প্রয়োজনীয় লাইব্রেরি ইনস্টল করি
pip install celery redis django-celery-results django-celery-beat
settings.py
INSTALLED_APPS = [
'django_celery_results',
'django_celery_beat'
]
python manage.py migrate
প্রজেক্ট ডিরেক্টরির যেখানে settings.py ফাইল আছে সেখানে celery.py নামে একটি ফাইল তৈরী করি।
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project_name.settings')
app = Celery('project_name')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django apps.
app.autodiscover_tasks()
@app.task(bind=True, ignore_result=True)
def debug_task(self):
    """Print the current task request; useful for checking the Celery setup.

    The task is registered with ``bind=True``, so it receives the task
    instance as ``self`` and can read ``self.request``. It is registered
    with ``ignore_result=True`` and returns nothing.
    """
    print(f'Request: {self.request!r}')
প্রজেক্টের যে ডিরেক্টরিতে settings.py আছে, সেখানকার __init__.py ফাইলে celery যোগ করি
from .celery import app as celery_app
__all__ = ('celery_app',)
settings.py
# Celery settings
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'
CELERY_ENABLE_UTC = True
app ফোল্ডারে tasks.py নামে একটি ফাইল তৈরী করি
# app_name/tasks.py
from celery import shared_task
import time
@shared_task
def print_numbers():
    """Print the numbers 1 through 10, pausing one second after each.

    Declared with ``shared_task`` so that it can be run as a Celery task.
    The function takes no arguments and returns nothing; its only output
    is the printed lines.
    """
    for i in range(1, 11):
        print(f"Number: {i}")
        time.sleep(1)  # Sleep for 1 second between each print
app ফোল্ডারে views.py নামে ফাইলটি নিচের মত করি
from django.shortcuts import render
from django.http import HttpResponse # Import HttpResponse
from .tasks import print_numbers
def print_numbers_view(request):
    """Queue the ``print_numbers`` task and return a response right away.

    Args:
        request: The incoming request object passed in by Django; it is
            not inspected by this view.

    Returns:
        An ``HttpResponse`` whose body is the text ``'email_sent.html'``.
    """
    # Use Celery task to print numbers asynchronously
    print_numbers.delay()
    # NOTE(review): HttpResponse sends this string as the response body; it
    # does not render a template called 'email_sent.html' - confirm that this
    # is the intended response for a number-printing task.
    return HttpResponse('email_sent.html')


# urls.py
from django.urls import path
from . import views
urlpatterns = [
path('', views.print_numbers_view, name='print_numbers_view'),
]
Celery Beat Schedule
CELERY_BEAT_SCHEDULE = {
'print_numbers': {
'task': 'app_name.tasks.print_numbers',
'schedule': 300.0, # Run every 5 minutes (300 seconds)
},
}
প্রজেক্ট ফোল্ডারে দুটি আলাদা cmd ওপেন করে নিচের কমান্ড দুটি আলাদাভাবে রান করি (worker এবং beat দুটোই একসাথে চালু থাকতে হবে)
celery -A project_name worker -l info
celery -A project_name beat -l info