Building a Pub/Sub System in .NET: MassTransit, Reactive Extensions, and BlockingCollection


Modern applications often need to broadcast events across multiple services.
Think of an order being placed in an e-commerce system. The order service publishes an event, and multiple subscribers react independently:

  • the billing service charges the customer,
  • the email service sends a receipt,
  • the analytics service tracks the order.

This is the essence of the Publish/Subscribe (Pub/Sub) pattern.

In this post we’ll explore what it is, why it matters, and three implementations in .NET:

  • MassTransit with RabbitMQ (for distributed systems)
  • Reactive Extensions (Rx) (for in-memory event streams)
  • BlockingCollection (for producer/consumer pipelines)

What is the Pub/Sub Pattern?

The Pub/Sub pattern decouples senders from receivers.

  • The Publisher creates a message (e.g., OrderSubmitted) and sends it to a broker or channel.
  • The Broker delivers that message to all interested Subscribers.
  • Each Subscriber receives a copy of the message and processes it independently.
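To make the three roles concrete, here is a minimal in-process sketch. `InProcessBroker` and `BrokerDemo` are hypothetical names invented for illustration; they are not part of MassTransit, Rx, or the BCL. A real broker adds routing, persistence, and delivery across processes, none of which this sketch attempts.

```csharp
using System;
using System.Collections.Generic;

public record OrderSubmitted(Guid OrderId, string CustomerEmail, decimal Total);

// Hypothetical in-process broker: keeps a list of handlers per message type
// and hands every published message to each of them.
public class InProcessBroker
{
    private readonly Dictionary<Type, List<Action<object>>> _handlers = new();

    public void Subscribe<T>(Action<T> handler) where T : class
    {
        if (!_handlers.TryGetValue(typeof(T), out var list))
        {
            list = new List<Action<object>>();
            _handlers[typeof(T)] = list;
        }
        list.Add(message => handler((T)message));
    }

    // Delivers the message to every subscriber of T and returns how many received it.
    public int Publish<T>(T message) where T : class
    {
        if (!_handlers.TryGetValue(typeof(T), out var list))
            return 0;

        foreach (var handler in list)
            handler(message);
        return list.Count;
    }
}

public static class BrokerDemo
{
    public static void Main()
    {
        var broker = new InProcessBroker();

        // Two independent subscribers; the publisher knows neither of them.
        broker.Subscribe<OrderSubmitted>(o => Console.WriteLine($"[Billing] Charge order {o.OrderId}"));
        broker.Subscribe<OrderSubmitted>(o => Console.WriteLine($"[Email] Send receipt to {o.CustomerEmail}"));

        var delivered = broker.Publish(new OrderSubmitted(Guid.NewGuid(), "customer@example.com", 120m));
        Console.WriteLine($"Delivered to {delivered} subscribers"); // Delivered to 2 subscribers
    }
}
```

The sketch is synchronous and not thread-safe, and the publisher only ever talks to the broker, which is the property the rest of this post builds on.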

Benefits

  • Decoupling — publishers do not know subscribers
  • Scalability — multiple subscribers can process in parallel
  • Flexibility — add or remove subscribers without changing the publisher

Why it matters

This pattern has been around for decades in middleware like IBM MQ, JMS, RabbitMQ, and Kafka.
It is fundamental in:

  • Event-driven microservices
  • Real-time systems (chat apps, trading platforms)
  • Notification and alerting systems
  • Data pipelines and analytics
  • IoT device communication

At its core, Pub/Sub enables loose coupling and asynchronous communication.

⚙️ Hosted Services in ASP.NET Core

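A hosted service is a background task whose lifetime is managed by the .NET host: it starts when the host starts and is asked to stop when the host shuts down.
You typically inherit from BackgroundService and override ExecuteAsync, which receives a CancellationToken that is signaled on shutdown.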

In the MassTransit demo below, a PublisherService is implemented as a hosted service to periodically publish messages.

1️⃣ Pub/Sub with MassTransit and RabbitMQ

appsettings.json

    {
      "RabbitMq": { "Host": "localhost", "Username": "guest", "Password": "guest" },
      "PublishIntervalSeconds": 5
    }

OrderSubmitted.cs

    namespace MassTransitDemo;

    public record OrderSubmitted(Guid OrderId, string CustomerEmail, decimal Total);

OrderSubmittedConsumer.cs

    using MassTransit;
    using Microsoft.Extensions.Logging;

    namespace MassTransitDemo;

    public class OrderSubmittedConsumer(ILogger<OrderSubmittedConsumer> logger) : IConsumer<OrderSubmitted>
    {
        public Task Consume(ConsumeContext<OrderSubmitted> context)
        {
            var m = context.Message;
            logger.LogInformation("Consumed OrderSubmitted, OrderId {OrderId}, Email {Email}, Total {Total}",
                m.OrderId, m.CustomerEmail, m.Total);
            return Task.CompletedTask;
        }
    }

PublisherService.cs

    using MassTransit;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;
    using Microsoft.Extensions.Options;

    namespace MassTransitDemo;

    public class PublisherOptions
    {
        public int PublishIntervalSeconds { get; set; } = 5;
    }

    public class PublisherService(
        ILogger<PublisherService> logger,
        IPublishEndpoint publishEndpoint,
        IOptions<PublisherOptions> options) : BackgroundService
    {
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var delay = TimeSpan.FromSeconds(options.Value.PublishIntervalSeconds);
            logger.LogInformation("Publisher started, interval {Seconds}s", options.Value.PublishIntervalSeconds);

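            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    // Placeholder address; the total is a random whole amount between 50 and 299.
                    var msg = new OrderSubmitted(Guid.NewGuid(), "customer@example.com", Random.Shared.Next(50, 300));
                    await publishEndpoint.Publish(msg, stoppingToken);
                    logger.LogInformation("Published OrderSubmitted, OrderId {OrderId}", msg.OrderId);
                    await Task.Delay(delay, stoppingToken);
                }
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Publish or Task.Delay was interrupted by host shutdown; fall through to the final log line.
            }

            logger.LogInformation("Publisher stopping");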
        }
    }

Program.cs

    using MassTransit;
    using MassTransitDemo;
    using Microsoft.Extensions.Configuration;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;

    var builder = Host.CreateApplicationBuilder(args);

    builder.Configuration.AddJsonFile("appsettings.json", optional: true, reloadOnChange: true);
    // PublishIntervalSeconds sits at the configuration root, so bind the root.
    builder.Services.Configure<PublisherOptions>(builder.Configuration);

    builder.Services.AddMassTransit(mt =>
    {
        mt.AddConsumer<OrderSubmittedConsumer>();

        mt.UsingRabbitMq((ctx, cfg) =>
        {
            var section = builder.Configuration.GetSection("RabbitMq");
            cfg.Host(section["Host"], h =>
            {
                h.Username(section["Username"]);
                h.Password(section["Password"]);
            });

            // Bus-level middleware is registered before the receive endpoint
            // so that it applies to the endpoint configured below.
            cfg.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)));
            cfg.UseInMemoryOutbox();

            cfg.ReceiveEndpoint("masstransitdemo-order-submitted", e =>
            {
                e.ConfigureConsumer<OrderSubmittedConsumer>(ctx);
            });
        });
    });

    builder.Services.AddHostedService<PublisherService>();

    builder.Logging.ClearProviders();
    builder.Logging.AddSimpleConsole(o =>
    {
        o.SingleLine = true;
        o.TimestampFormat = "HH:mm:ss ";
    });

    await builder.Build().RunAsync();

Run RabbitMQ

    docker run -it --rm -p 5672:5672 -p 15672:15672 rabbitmq:3-management
    # UI at http://localhost:15672  user guest  pass guest

MassTransit is a distributed application framework for message brokers like RabbitMQ and Azure Service Bus. It takes care of serialization, consumer lifecycle, retries, and endpoint setup.

2️⃣ Pub/Sub with Reactive Extensions (Rx)

Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs with observable sequences.

  • Uses IObservable<T> (event source) and IObserver<T> (subscriber)
  • Provides LINQ-style operators (Where, Select, Buffer) for event streams
  • Perfect for in-memory event pipelines and UI scenarios

⚠️ Limitation: Rx is in-memory only. Events vanish when the app stops.
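The IObservable<T> and IObserver<T> interfaces live in the base class library, so the contract can be illustrated without the Rx package. The following is a simplified sketch, not how System.Reactive implements `Subject<T>`: `SimpleSubject<T>`, `CollectingObserver<T>`, and `ObserverDemo` are hypothetical names, and the code is deliberately not thread-safe. It also shows why in-memory events are easy to miss: an observer only sees values pushed while it is subscribed.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified stand-in for a subject, built only on BCL interfaces.
public class SimpleSubject<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        return new Unsubscriber(_observers, observer);
    }

    // Push a value to every current observer. Iterating over a copy lets an
    // observer unsubscribe while it is being notified.
    public void OnNext(T value)
    {
        foreach (var observer in _observers.ToArray())
            observer.OnNext(value);
    }

    // Disposing the subscription removes the observer from the list.
    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<T>> _observers;
        private readonly IObserver<T> _observer;

        public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
        {
            _observers = observers;
            _observer = observer;
        }

        public void Dispose() => _observers.Remove(_observer);
    }
}

// Observer that simply records what it receives.
public class CollectingObserver<T> : IObserver<T>
{
    public List<T> Received { get; } = new();
    public void OnNext(T value) => Received.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class ObserverDemo
{
    public static void Main()
    {
        var subject = new SimpleSubject<int>();
        var early = new CollectingObserver<int>();
        var late = new CollectingObserver<int>();

        var earlySubscription = subject.Subscribe(early);
        subject.OnNext(1);            // only "early" is subscribed
        subject.Subscribe(late);
        subject.OnNext(2);            // both receive this value
        earlySubscription.Dispose();
        subject.OnNext(3);            // only "late" is still subscribed

        Console.WriteLine($"early: {early.Received.Count}, late: {late.Received.Count}"); // early: 2, late: 2
    }
}
```

Rx layers operators, schedulers, and completion and error handling on top of this contract; the Rx example that follows uses the real `Subject<T>`.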

Program.cs

    using System;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    public record OrderSubmitted(Guid OrderId, string CustomerEmail, decimal Total);

    class Program
    {
        static void Main()
        {
            var bus = new Subject<OrderSubmitted>();

            // Subscriber A
            var subA = bus.Subscribe(order =>
                Console.WriteLine($"[Email] Send receipt to {order.CustomerEmail}"));

            // Subscriber B with filter and transform
            var subB = bus
                .Where(o => o.Total >= 100)
                .Select(o => new { o.OrderId, Vat = o.Total * 0.24m })
                .Subscribe(x =>
                    Console.WriteLine($"[Analytics] Order {x.OrderId}, VAT {x.Vat:F2}"));

            // Publish a few messages
            bus.OnNext(new OrderSubmitted(Guid.NewGuid(), "anna@example.com", 79.90m));
            bus.OnNext(new OrderSubmitted(Guid.NewGuid(), "ben@example.com", 149.00m));
            bus.OnNext(new OrderSubmitted(Guid.NewGuid(), "chris@example.com", 220.00m));

            bus.OnCompleted();
            subA.Dispose();
            subB.Dispose();
        }
    }

3️⃣ Pub/Sub with BlockingCollection

BlockingCollection<T> is a thread-safe producer/consumer collection in .NET.

  • Supports blocking (consumers wait until data is available)
  • Supports bounding (limit capacity to avoid flooding)
  • Useful for background workers, pipelines, or batch processors
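Bounding and blocking are easiest to see with a tiny capacity. The sketch below assumes a capacity of 2 purely for illustration; `BoundingDemo` and `FillWithoutBlocking` are hypothetical names. It uses `TryAdd` and `TryTake`, which report failure instead of waiting, so the example cannot hang.

```csharp
using System;
using System.Collections.Concurrent;

public static class BoundingDemo
{
    // Tries to add `attempts` items without waiting; returns how many were accepted.
    public static int FillWithoutBlocking(BlockingCollection<int> queue, int attempts)
    {
        var accepted = 0;
        for (var i = 0; i < attempts; i++)
        {
            // TryAdd returns false immediately when the bounded capacity is reached.
            // Add would block the producer instead, which is the back-pressure effect.
            if (queue.TryAdd(i))
                accepted++;
        }
        return accepted;
    }

    public static void Main()
    {
        using var queue = new BlockingCollection<int>(boundedCapacity: 2);

        Console.WriteLine(FillWithoutBlocking(queue, 5)); // 2

        // Taking an item frees a slot, so the next TryAdd succeeds.
        var first = queue.Take();                         // 0 (the default backing store is a FIFO queue)
        Console.WriteLine(queue.TryAdd(99));              // True

        // Drain the two remaining items.
        queue.Take();
        queue.Take();

        // With nothing left, TryTake waits up to the timeout and then gives up.
        var gotItem = queue.TryTake(out _, TimeSpan.FromMilliseconds(50));
        Console.WriteLine(gotItem);                       // False
    }
}
```

In a real pipeline the producer would normally call `Add` and let it block, so a slow consumer naturally throttles a fast producer.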

Program.cs

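    using System;
    using System.Collections.Concurrent;
    using System.Threading.Tasks;

    public record OrderSubmitted(Guid OrderId, string CustomerEmail, decimal Total);

    class Program
    {
        static async Task Main()
        {
            // One queue per subscriber. Consumers that share a single BlockingCollection
            // compete for items, so each order would reach only one of them.
            using var emailQueue = new BlockingCollection<OrderSubmitted>(boundedCapacity: 100);
            using var analyticsQueue = new BlockingCollection<OrderSubmitted>(boundedCapacity: 100);

            // Subscriber A
            var emailTask = Task.Run(() =>
            {
                foreach (var order in emailQueue.GetConsumingEnumerable())
                    Console.WriteLine($"[Email] Send receipt to {order.CustomerEmail}");
            });

            // Subscriber B
            var analyticsTask = Task.Run(() =>
            {
                foreach (var order in analyticsQueue.GetConsumingEnumerable())
                    Console.WriteLine($"[Analytics] Track order {order.OrderId}, total {order.Total}");
            });

            // Publisher: fan each order out to every subscriber queue
            foreach (var price in new[] { 49.50m, 129.00m, 250.00m, 15.00m })
            {
                var order = new OrderSubmitted(Guid.NewGuid(), $"user{price}@example.com", price);
                emailQueue.Add(order);
                analyticsQueue.Add(order);
            }

            // Signal that no more items will arrive so the consuming loops can finish.
            emailQueue.CompleteAdding();
            analyticsQueue.CompleteAdding();
            await Task.WhenAll(emailTask, analyticsTask);
        }
    }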

⚠️ Limitation: Only works inside a single process. No distribution.

✅ With these three approaches, you now have the full spectrum of Pub/Sub in .NET:

  • Lightweight in-memory streams with Rx and BlockingCollection
  • Robust distributed messaging with MassTransit and RabbitMQ

The Pub/Sub pattern has lasted decades because it makes systems more modular, more scalable, and more resilient to change.


