Django

⌘K
  1. Home
  2. Django
  3. Django তে কিভাবে কাজ করতে...
  4. Celery
  5. ৬. প্রাকটিক্যাল করি

৬. প্রাকটিক্যাল করি

একটি Django প্রজেক্ট ও একটি এপ্লিকেশন বানাই

Create a Django Project:

django-admin startproject project_name

Navigate to the Project Directory:

cd project_name

Create a Django App:

python manage.py startapp app_name

setings.py

INSTALLED_APPS = [
  'app_name',

]

python manage.py migrate 

app_name/views.py

# app_name/views.py

from django.http import HttpResponse

def index(request):
    return HttpResponse("Hello from my view!")

app_name/urls.py

# app_name/urls.py

from django.urls import path

from . import views

urlpatterns = [

path('', views.index, name='index'),

]

project urls.py

# project_name/urls.py

from django.contrib import admin

from django.urls import path, include # Import the include function

urlpatterns = [

path('admin/', admin.site.urls),

path('', include('app_name.urls')), # Include the app's URLs

]

Start the Development Server To Check

python manage.py runserver

celery Configure

রেডিস ইনস্টল করি

এই লিংক থেকে রেডিস ডাউনলোড করে ইনস্টল করি link

প্রয়োজনীয় লাইব্রেরি ইনস্টল করি

pip install  celery redis django-celery-results django-celery-beat

Settings.py

INSTALLED_APPS = [
   'django_celery_results',
   'django_celery_beat'

]

python manage.py migrate 

প্রজেক্ট ডিরেক্টরির যেখানে settings.py ফাইল আছে সেখানে celery.py নামে একটি ফাইল তৈরী করি।

from __future__ import absolute_import, unicode_literals
import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project_name.settings')

app = Celery('project_name')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()


@app.task(bind=True, ignore_result=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

প্রজেক্ট এর settings.py ডিরেক্টরি তে init.py তে celery যোগ করি

from .celery import app as celery_app

__all__ = ('celery_app',) 

setings.py



# Celery settings
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'
CELERY_ENABLE_UTC = True

app ফোল্ডারে tasks.py নামে একটি ফাইল তৈরী করি

# app_name/tasks.py


from celery import shared_task
import time

@shared_task
def print_numbers():
    for i in range(1, 11):
        print(f"Number: {i}")
        time.sleep(1)  # Sleep for 1 second between each print


    

app ফোল্ডারে views.py নামে ফাইলটি নিচের মত করি

from django.shortcuts import render
from django.http import HttpResponse  # Import HttpResponse

from .tasks import print_numbers


def print_numbers_view(request):
    # Use Celery task to print numbers asynchronously
    print_numbers.delay()

    return HttpResponse  ('email_sent.html')

urls.py


from django.urls import path

from . import views

urlpatterns = [

path('', views.print_numbers_view, name='print_numbers_view'),

]

Celery Beat Schedule

CELERY_BEAT_SCHEDULE = {
    'print_numbers': {
        'task': 'app_name.tasks.print_numbers',
        'schedule': 300.0,  # Run every 5 minutes (300 seconds)
    },
}

প্রজেক্ট ফোল্ডারে cmd ওপেন করে নিচের কমান্ড রান করি

celery -A project_name worker -l info 
celery -A project_name beat -l info

How can we help?