Modern applications often need to broadcast events across multiple services.
Think of an order being placed in an e-commerce system. The order service publishes an event, and multiple subscribers react independently:
- the billing service charges the customer,
- the email service sends a receipt,
- the analytics service tracks the order.
This is the essence of the Publish/Subscribe (Pub/Sub) pattern.
In this post we’ll explore what it is, why it matters, and three implementations in .NET:
- MassTransit with RabbitMQ (for distributed systems)
- Reactive Extensions (Rx) (for in-memory event streams)
- BlockingCollection (for producer/consumer pipelines)
What is the Pub/Sub Pattern
The Pub/Sub pattern decouples senders from receivers.
- The Publisher creates a message (e.g., OrderSubmitted) and sends it to a broker or channel.
- The Broker delivers that message to all interested Subscribers.
- Each Subscriber receives a copy of the message and processes it independently.
Benefits
- Decoupling — publishers do not know subscribers
- Scalability — multiple subscribers can process in parallel
- Flexibility — add or remove subscribers without changing the publisher
Why it matters
This pattern has been around for decades in middleware like IBM MQ, JMS, RabbitMQ, and Kafka.
It is fundamental in:
Event-driven microservices
- Real-time systems (chat apps, trading platforms)
- Notification and alerting systems
- Data pipelines and analytics
- IoT device communication
- At its core, Pub/Sub enables loose coupling and asynchronous communication.
⚙️ Hosted Services in ASP.NET Core
A hosted service is a background task that runs with your app host.
In .NET you typically inherit from BackgroundService and implement ExecuteAsync.
In the MassTransit demo below, a PublisherService is implemented as a hosted service to periodically publish messages.
1️⃣ Pub/Sub with MassTransit and RabbitMQ
appsettings.json
{
"RabbitMq": { "Host": "localhost", "Username": "guest", "Password": "guest" },
"PublishIntervalSeconds": 5
}
OrderSubmitted.cs
namespace MassTransitDemo;
public record OrderSubmitted(Guid OrderId, string CustomerEmail, decimal Total);
OrderSubmittedConsumer.cs
using MassTransit;
using Microsoft.Extensions.Logging;
namespace MassTransitDemo;
public class OrderSubmittedConsumer(ILogger<OrderSubmittedConsumer> logger) : IConsumer<OrderSubmitted>
{
public Task Consume(ConsumeContext<OrderSubmitted> context)
{
var m = context.Message;
logger.LogInformation("Consumed OrderSubmitted, OrderId {OrderId}, Email {Email}, Total {Total}",
m.OrderId, m.CustomerEmail, m.Total);
return Task.CompletedTask;
}
}
PublisherService.cs
using MassTransit;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace MassTransitDemo;
public class PublisherOptions
{
public int PublishIntervalSeconds { get; set; } = 5;
}
public class PublisherService(
ILogger<PublisherService> logger,
IPublishEndpoint publishEndpoint,
IOptions<PublisherOptions> options) : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var delay = TimeSpan.FromSeconds(options.Value.PublishIntervalSeconds);
logger.LogInformation("Publisher started, interval {Seconds}s", options.Value.PublishIntervalSeconds);
while (!stoppingToken.IsCancellationRequested)
{
var msg = new OrderSubmitted(Guid.NewGuid(), "*Emails are not allowed*", Random.Shared.Next(50, 300));
await publishEndpoint.Publish(msg, stoppingToken);
logger.LogInformation("Published OrderSubmitted, OrderId {OrderId}", msg.OrderId);
await Task.Delay(delay, stoppingToken);
}
logger.LogInformation("Publisher stopping");
}
}
Program.cs
using MassTransit;
using MassTransitDemo;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
var builder = Host.CreateApplicationBuilder(args);
builder.Configuration.AddJsonFile("appsettings.json", optional: true, reloadOnChange: true);
builder.Services.Configure<PublisherOptions>(builder.Configuration);
builder.Services.AddMassTransit(mt =>
{
mt.AddConsumer<OrderSubmittedConsumer>();
mt.UsingRabbitMq((ctx, cfg) =>
{
var section = builder.Configuration.GetSection("RabbitMq");
cfg.Host(section["Host"], h =>
{
h.Username(section["Username"]);
h.Password(section["Password"]);
});
cfg.ReceiveEndpoint("masstransitdemo-order-submitted", e =>
{
e.ConfigureConsumer<OrderSubmittedConsumer>(ctx);
});
cfg.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)));
cfg.UseInMemoryOutbox();
});
});
builder.Services.AddHostedService<PublisherService>();
builder.Logging.ClearProviders();
builder.Logging.AddSimpleConsole(o =>
{
o.SingleLine = true;
o.TimestampFormat = "HH:mm:ss ";
});
await builder.Build().RunAsync();
Run RabbitMQ
docker run -it --rm -p 5672:5672 -p 15672:15672 rabbitmq:3-management
# UI at http://localhost:15672 user guest pass guest
MassTransit is a distributed application framework for message brokers like RabbitMQ and Azure Service Bus. It takes care of serialization, consumer lifecycle, retries, and endpoint setup.
2️⃣ Pub/Sub with Reactive Extensions (Rx)
Reactive Extensions (Rx)
is a library for composing asynchronous and event based programs with observable sequences.
- Uses IObservable (event source) and IObserver (subscriber)
- Provides LINQ-style operators (Where, Select, Buffer) for event streams
- Perfect for in-memory event pipelines and UI scenarios
⚠️ Limitation: Rx is in-memory only. Events vanish when the app stops.
Program.cs
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
public record OrderSubmitted(Guid OrderId, string CustomerEmail, decimal Total);
class Program
{
static void Main()
{
var bus = new Subject<OrderSubmitted>();
// Subscriber A
var subA = bus.Subscribe(order =>
Console.WriteLine($"[Email] Send receipt to {order.CustomerEmail}"));
// Subscriber B with filter and transform
var subB = bus
.Where(o => o.Total >= 100)
.Select(o => new { o.OrderId, Vat = o.Total * 0.24m })
.Subscribe(x =>
Console.WriteLine($"[Analytics] Order {x.OrderId}, VAT {x.Vat:F2}"));
// Publish a few messages
bus.OnNext(new OrderSubmitted(Guid.NewGuid(), "*Emails are not allowed*", 79.90m));
bus.OnNext(new OrderSubmitted(Guid.NewGuid(), "*Emails are not allowed*", 149.00m));
bus.OnNext(new OrderSubmitted(Guid.NewGuid(), "*Emails are not allowed*", 220.00m));
bus.OnCompleted();
subA.Dispose();
subB.Dispose();
}
}
3️⃣ Pub/Sub with BlockingCollection
BlockingCollection is a thread safe producer and consumer collection in .NET.
- Supports blocking (consumers wait until data is available)
- Supports bounding (limit capacity to avoid flooding)
- Useful for background workers, pipelines, or batch processors
Program.cs
using System.Collections.Concurrent;
public record OrderSubmitted(Guid OrderId, string CustomerEmail, decimal Total);
class Program
{
static async Task Main()
{
using var queue = new BlockingCollection<OrderSubmitted>(boundedCapacity: 100);
// Subscriber A
var emailTask = Task.Run(() =>
{
foreach (var order in queue.GetConsumingEnumerable())
Console.WriteLine($"[Email] Send receipt to {order.CustomerEmail}");
});
// Subscriber B
var analyticsTask = Task.Run(() =>
{
foreach (var order in queue.GetConsumingEnumerable())
Console.WriteLine($"[Analytics] Track order {order.OrderId}, total {order.Total}");
});
// Publisher
foreach (var price in new[] { 49.50m, 129.00m, 250.00m, 15.00m })
{
queue.Add(new OrderSubmitted(Guid.NewGuid(), $"user{price}@example.com", price));
}
queue.CompleteAdding();
await Task.WhenAll(emailTask, analyticsTask);
}
}
⚠️ Limitation: Only works inside a single process. No distribution.
References
Microsoft Docs: Worker Services
Microsoft Docs: Publisher-Subscriber Pattern
✅ With these three approaches, you now have the full spectrum of Pub/Sub in .NET:
- Lightweight in-memory streams with Rx and BlockingCollection
- Robust distributed messaging with MassTransit and RabbitMQ
- The Pub/Sub pattern has lasted decades because it makes systems more modular, more scalable, and more resilient to change.