Engineering

Building a Batch Notification System with MongoDB, Django, Celery, and SendGrid for Developers

Anjali Arya
July 25, 2024
Discover how to design and implement a robust batched notification system using MongoDB, Django, Celery, and SendGrid. Learn key strategies for handling race conditions, ensuring single execution, managing retries, and implementing advanced features like item counting and user-specific batch windows.
TABLE OF CONTENTS

What are Batch Notifications?

Batched notifications group multiple notifications into a single message delivered within a specific time window. Unlike traditional systems where each event triggers a separate notification, batched notifications reduce noise and increase engagement.

For example, in a document collaboration app, instead of sending an email for each comment, a batched system sends one email with all comments within a time frame. You can actually build a batched notification mechanism in your collaborative app directly via here How to Batch Notifications for your Social Media/ Collaborative Application?

Key Benefits:

  • Reduced Notification Noise: Fewer, more informative notifications.
  • Increased Engagement: Higher likelihood of user interaction.
  • Improved Retention: Happier users due to fewer interruptions.

Designing a Batched Notification System

There are two main approaches to designing a batched notification system:

  1. Batch on Write: Accumulate notifications into batches as events occur.
  2. Batch on Read: Periodically batch unsent notifications.

The choice between these approaches affects system performance and scalability.

Batch on Write

In this method, notifications are accumulated as events occur, optimizing lookups.

Batch on Read

In this method, batches are created periodically by querying unsent notifications. This can be less efficient as data volume grows.

Our Approach: Batch on Write

Given the scalability benefits, we'll use the batch on write approach, even though it requires more initial effort.

Database Design with MongoDB

We'll use MongoDB, a flexible NoSQL database, to model our system. The collections will be:

  • notifications: Tracks individual notifications.
  • notification_batches: Tracks batched notifications.
  • notification_batch_notifications: Links individual notifications to batches.

MongoDB Schema

 
    # notifications collection
    {
      "_id": ObjectId(),
      "type": String,
      "actor_id": ObjectId(),
      "recipient_id": ObjectId(),
      "object_id": ObjectId(),
      "object_type": String,
      "inserted_at": DateTime
    }

    # notification_batches collection
    {
      "_id": ObjectId(),
      "type": String,
      "recipient_id": ObjectId(),
      "batch_key": String,
      "object_id": ObjectId(),
      "object_type": String,
      "closes_at": DateTime,
      "processed_at": DateTime,
      "inserted_at": DateTime
    }

    # notification_batch_notifications collection
    {
      "notification_batch_id": ObjectId(),
      "notification_id": ObjectId(),
      "inserted_at": DateTime
    }

    

Batching Notifications with Django

We'll use Django for our web framework. Here's how to batch notifications:

  1. Generate a batch_key.
  2. Create a notification entry.
  3. Find or create an open batch.
  4. Add the notification to the batch.

Django Code Example

 
    # models.py

    from django.db import models
    from bson import ObjectId

    class Notification(models.Model):
        _id = models.CharField(max_length=24, primary_key=True, default=lambda: str(ObjectId()))
        type = models.CharField(max_length=255)
        actor_id = models.CharField(max_length=24)
        recipient_id = models.CharField(max_length=24)
        object_id = models.CharField(max_length=24)
        object_type = models.CharField(max_length=255)
        inserted_at = models.DateTimeField(auto_now_add=True)

    class NotificationBatch(models.Model):
        _id = models.CharField(max_length=24, primary_key=True, default=lambda: str(ObjectId()))
        type = models.CharField(max_length=255)
        recipient_id = models.CharField(max_length=24)
        batch_key = models.CharField(max_length=255)
        object_id = models.CharField(max_length=24)
        object_type = models.CharField(max_length=255)
        closes_at = models.DateTimeField()
        processed_at = models.DateTimeField(null=True, blank=True)
        inserted_at = models.DateTimeField(auto_now_add=True)

    class NotificationBatchNotification(models.Model):
        notification_batch_id = models.CharField(max_length=24)
        notification_id = models.CharField(max_length=24)
        inserted_at = models.DateTimeField(auto_now_add=True)

    

Adding a Notification to a Batch

 
    # views.py

    from datetime import datetime, timedelta
    from .models import Notification, NotificationBatch, NotificationBatchNotification

    def add_notification_to_batch(comment, user, document):
        recipients = get_recipients(document, user)

        batch_key = f"document:{document.id}:comments"
        batch_window = timedelta(minutes=5)

        for recipient in recipients:
            notification = Notification.objects.create(
                type="new-comment",
                actor_id=str(user.id),
                recipient_id=str(recipient.id),
                object_id=str(comment.id),
                object_type="comment"
            )

            batch = NotificationBatch.objects.filter(
                recipient_id=str(recipient.id),
                batch_key=batch_key,
                closes_at__gte=datetime.now()
            ).first()

            if not batch:
                batch = NotificationBatch.objects.create(
                    type="new-comment",
                    recipient_id=str(recipient.id),
                    batch_key=batch_key,
                    object_id=str(document.id),
                    object_type="document",
                    closes_at=datetime.now() + batch_window
                )

            NotificationBatchNotification.objects.create(
                notification_batch_id=str(batch.id),
                notification_id=str(notification.id)
            )

    def get_recipients(document, user):
        # Example function to get document collaborators excluding the comment creator
        return document.collaborators.exclude(id=user.id)

    

Flushing Closed Batches with Celery

We'll use Celery to handle the periodic task of flushing closed batches and sending notifications. Incase you want to read more about how we handle concurrent DB writes using Celery and Redis, you can check How Redis Solved Our Challenges with Dynamic Task Scheduling and Concurrent Execution? [Developer's Guide]

Celery Task Example

 
    # tasks.py

    from celery import shared_task
    from datetime import datetime
    from .models import NotificationBatch
    from .notifications import send_batch_notification

    @shared_task
    def flush_closed_batches():
        batches = NotificationBatch.objects.filter(
            closes_at__lte=datetime.now(),
            processed_at__isnull=True
        )

        for batch in batches:
            send_batch_notification(batch)
            batch.processed_at = datetime.now()
            batch.save()

    # notifications.py

    from django.template.loader import render_to_string
    from sendgrid import SendGridAPIClient
    from sendgrid.helpers.mail import Mail

    def send_batch_notification(batch):
        notifications = NotificationBatchNotification.objects.filter(notification_batch_id=batch.id)
        context = {"notifications": notifications, "batch": batch}

        subject = f"New comments on {batch.object_id}"
        message = render_to_string("email_template.html", context)
        recipient = batch.recipient_id

        send_email(subject, message, recipient)

    def send_email(subject, message, recipient):
        sg = SendGridAPIClient('SENDGRID_API_KEY')
        email = Mail(
            from_email='no-reply@example.com',
            to_emails=recipient,
            subject=subject,
            html_content=message
        )
        sg.send(email)

    

Generating Notification Messages

We use a template to generate the notification message from a batch.

Email Template Example

    
    <!-- email_template.html -->
    
    <h1>Comments for {{ "{{ batch.object_id }}" }}</h1>
    
    <% for notification in notifications %>
      <p>
        <strong>{{ "{{ notification.actor_id }}" }}</strong> said at {{ "{{ notification.inserted_at }}" }}:
      </p>
      <blockquote>
        <p>{{ "{{ notification.object_id }}" }}</p>
      </blockquote>
    <% endfor %>

Preparing Your Batched Notification Engine for Production

Building a robust and scalable batched notification engine involves several key considerations and advanced features to ensure smooth operation and optimal user experience. Here’s how to prepare your system for production:

Key Considerations for Production-Ready Batch Notifications

  • Race Conditions:
    • Ensure Atomic Operations: Utilize database transactions and locking mechanisms to avoid creating duplicate batches. In MongoDB, you can use the findAndModify operation to perform atomic updates.
    • Example in Django:
 
    from django.db import transaction

    @transaction.atomic
    def create_batch_if_not_exists(recipient_id, batch_key, batch_window):
        batch = NotificationBatch.objects.select_for_update().filter(
            recipient_id=recipient_id,
            batch_key=batch_key,
            closes_at__gt=timezone.now()
        ).first()
        if not batch:
            batch = NotificationBatch.objects.create(
                recipient_id=recipient_id,
                batch_key=batch_key,
                closes_at=timezone.now() + timedelta(seconds=batch_window)
            )
        return batch

    

  • Single Execution:
    • Mark Batches as Processed: Ensure each batch is processed only once by marking it as processed after sending notifications. This can be done by updating the processed_at field.
    • Example in Celery:
 
    @celery.task
    def flush_batch(batch_id):
        batch = NotificationBatch.objects.get(id=batch_id)
        if batch.processed_at:
            return
        send_notification(batch)
        batch.processed_at = timezone.now()
        batch.save()


    

  • Retries:
    • Handle Failures Gracefully: Implement retry logic in Celery to handle temporary failures. Use exponential backoff and logging to manage retries.
    • Example:
 
    @celery.task(bind=True, max_retries=5)
    def send_notification(self, batch):
        try:
            # Code to send notification
        except Exception as e:
            self.retry(exc=e, countdown=2 ** self.request.retries)


    

Advanced Features for Enhanced Batch Notifications

  • Item Count:
    • Track Total Items: Keep a count of the total number of items in each batch to provide more informative notifications, such as “You have 5 new comments.”
    • Example in Django:
 
    class NotificationBatch(models.Model):
        recipient = models.ForeignKey(User, on_delete=models.CASCADE)
        batch_key = models.CharField(max_length=255)
        closes_at = models.DateTimeField()
        processed_at = models.DateTimeField(null=True, blank=True)
        item_count = models.IntegerField(default=0)

    def add_notification_to_batch(batch, notification):
        batch.item_count += 1
        batch.save()
        # Add notification to batch


    

  • Early Flushing:
    • Flush Based on Activity Thresholds: Implement logic to flush batches early when a certain number of notifications is reached, ensuring timely updates for high-activity users.
    • Example:
 
    def check_and_flush_batch(batch):
        if batch.item_count >= MAX_ITEMS_PER_BATCH:
            flush_batch(batch.id)


    

  • User-Specific Windows:
    • Customizable Batch Windows: Allow users to set their preferred batch window durations, offering flexibility in how frequently they receive notifications.
    • Example in Django:
 
    class UserProfile(models.Model):
        user = models.OneToOneField(User, on_delete=models.CASCADE)
        batch_window = models.IntegerField(default=300)  # Default to 5 minutes

    def get_user_batch_window(user):
        return user.profile.batch_window


    

  • Partitioned Cron Jobs:
    • Distribute Tasks for High Volume: For applications with a large number of users, distribute cron jobs to handle the load effectively. This can be achieved by partitioning the data and processing in parallel.
    • Example in Celery:
 
    @celery.task
    def flush_batches():
        batch_ids = NotificationBatch.objects.filter(
            closes_at__lte=timezone.now(), processed_at__isnull=True
        ).values_list('id', flat=True)
        for batch_id in batch_ids:
            flush_batch.delay(batch_id)


    

By implementing these considerations and advanced features, you can build a production-ready batched notification system that is both scalable and efficient, leading to better user engagement and satisfaction.

Building Batching May Not Seem Complex, Though an Experienced Developer Will Certainly Understand the Overheads in Maintaining it.

If you find the tasks described above daunting for your team, you're not alone. That's why we created SuprSend.

Here's how we differentiate your notifications building process for batched notifications.

Feature/Step Manual Batch Notification System SuprSend Batch Notification Integration
Notification Batching Manual configuration required to group notifications. Developers need to write custom code to batch events and send consolidated notifications. Automatically batches multiple triggers into a single output, sending one consolidated notification.
Batch Window Management Developers must define and manage the batch window manually, often requiring custom logic to handle fixed and dynamic windows. Offers both fixed and dynamic batch windows. Dynamic windows can be set based on event properties, simplifying per-user customization.
Batch Key Management Custom implementation needed to create unique batches for different scenarios. Allows defining unique batches using the batch_key, making it easy to create multiple batches per user.
Retain Batch Events Developers must implement logic to limit and manage event data within a batch. Provides built-in functionality to retain a specific number of events, configurable between 2 and 100.
Template Integration Manual creation of templates and handling of batched event variables. Simplifies template creation with $batched_events and $batched_event_count variables, providing easy integration with notification content.
Database Management Requires developers to design and manage database schemas for notifications and batches, usually involving multiple collections or tables. Abstracts the complexity of database management, providing a streamlined approach to handle batch notifications.
Cron Job and Task Scheduling Developers need to configure cron jobs and manage job queues manually, often using tools like Celery. Eliminates the need for manual cron job configuration, handling task scheduling automatically.
Error Handling and Retries Custom logic needed for handling race conditions, ensuring atomic operations, and implementing retry mechanisms. Built-in error handling, ensuring atomic operations with tools like findAndModify in MongoDB, and automatic retry logic for failed tasks.
Scalability and Performance Requires careful planning and optimization to scale, including partitioned cron jobs for high-volume applications. Designed for scalability, automatically managing high volumes and distributing tasks efficiently.
User-Specific Customization Developers must implement custom logic to support user-specific batch windows and preferences. Supports per-user batch windows with dynamic configuration based on event properties, making customization straightforward.
Integration and Maintenance High initial effort and ongoing maintenance to ensure system reliability and performance. Lowers integration effort and reduces maintenance overhead, providing a robust solution with minimal setup.

Example Use Case: Batching Comments in a Collaboration App

Manual Approach:

  • Write custom code to batch comments within a specified time window.
  • Design database schemas and manage entries for notifications and batches.
  • Configure cron jobs to periodically check and send batched notifications.
  • Implement error handling, retries, and ensure atomic operations.

SuprSend Approach:

  • Define a batch node within the SuprSend workflow.
  • Set a fixed or dynamic batch window based on event properties.
  • Use built-in batch variables in notification templates.
  • SuprSend handles the batching, database management, scheduling, and error handling automatically.

You can rely on SuprSend to orchestrate notifications across various channels, manage user preferences, and centralize your notification templates for team-wide visibility.

Practical Demonstration on a Demo Application (Deployed on Github)

We built a practical demo application to demonstrate how you can integrate the batching mechanism in your notification without any development. You can read more here: How to Batch Notifications for your Social Media/ Collaborative Application? (suprsend.com)

It contains the Github link as well.

What are batch notifications on facebook?

Batch notifications on Facebook refer to a feature where multiple notifications are grouped or batched together into a single notification event. This grouping helps reduce the number of individual notifications a user receives, especially when there are multiple related activities or updates within a short period.

For example, instead of receiving separate notifications for each comment, like, or mention on a post, Facebook may batch these notifications into a single notification to improve user experience and reduce notification overload.

Written by:
Anjali Arya
Product & Analytics, SuprSend
Get a powerful notification engine with SuprSend
Build smart notifications across channels in minutes with a single API and frontend components
Implement a powerful stack for your notifications
By clicking “Accept All Cookies”, you agree to the storing of cookies on your device to enhance site navigation, analyze site usage, and assist in our marketing efforts. View our Privacy Policy for more information.