Event-Driven Architecture: Building Real-Time Systems with .NET and Azure

Tags: Azure, Event-Driven, .NET, Service Bus, Architecture

Learn how to design and implement event-driven systems using Azure Service Bus, Event Grid, and .NET to build scalable, loosely-coupled real-time applications.

Event-driven architecture (EDA) has become essential for building modern, scalable applications. In my experience developing distributed systems, I've found that EDA enables loose coupling, better scalability, and improved resilience compared to traditional request-response patterns.

Why Event-Driven Architecture?

Traditional request-response architectures, whether a monolith or a chain of synchronously calling services, struggle with:

  • Tight coupling between components
  • Synchronous dependencies that create bottlenecks
  • Scaling challenges when parts of the system have different load patterns
  • Failure cascades when one service goes down

Event-driven systems address these problems by having services communicate through events rather than direct calls, so a producer neither knows who consumes its events nor waits for them.

Core Concepts

Events vs Commands

Events are facts about something that happened:

public class OrderPlacedEvent
{
    public Guid OrderId { get; set; }
    public Guid CustomerId { get; set; }
    public decimal TotalAmount { get; set; }
    public DateTime PlacedAt { get; set; }
}

Commands are requests to do something:

public class ProcessPaymentCommand
{
    public Guid OrderId { get; set; }
    public decimal Amount { get; set; }
    public string PaymentMethod { get; set; }
}
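
Because an event describes something that has already happened, it is often modeled as an immutable type, while a command remains a request that a handler may still reject. The following is a minimal sketch of that idea, assuming C# 9 or later and System.Text.Json; `OrderPlacedRecord` and `EventJson` are hypothetical names introduced for illustration and are not part of the classes above.

```csharp
using System;
using System.Text.Json;

// Hypothetical immutable variant of OrderPlacedEvent. Positional record
// properties are init-only, so a published fact cannot be changed afterwards.
public sealed record OrderPlacedRecord(
    Guid OrderId,
    Guid CustomerId,
    decimal TotalAmount,
    DateTime PlacedAt);

public static class EventJson
{
    // Serialize an event to the UTF-8 bytes that would become a message body.
    public static byte[] ToBytes<T>(T evt) where T : class =>
        JsonSerializer.SerializeToUtf8Bytes(evt);

    // Deserialize a message body back into a typed event.
    public static T FromBytes<T>(byte[] body) where T : class =>
        JsonSerializer.Deserialize<T>(body)
        ?? throw new JsonException($"Body did not contain a {typeof(T).Name}.");
}
```

Records also compare by value, which makes round-trip checks in tests straightforward: serializing an event and reading it back should yield an equal instance.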

Pub/Sub Pattern with Azure Service Bus

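Azure Service Bus topics provide pub/sub messaging: a publisher sends to a topic, and each subscription receives its own copy of every message that matches its filter: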

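// Publisher
// Requires: using System.Text.Json; using Azure.Messaging.ServiceBus;
public class OrderService
{
    private readonly ServiceBusSender _sender;
    private readonly IOrderRepository _orderRepository;

    public OrderService(ServiceBusSender sender, IOrderRepository orderRepository)
    {
        _sender = sender;
        _orderRepository = orderRepository;
    }

    public async Task PlaceOrderAsync(Order order)
    {
        // Business logic to save order
        await _orderRepository.SaveAsync(order);

        // Publish event. The save and the send are not atomic: if the send
        // fails after the save succeeds, the event is lost unless it is retried.
        var orderPlaced = new OrderPlacedEvent
        {
            OrderId = order.Id,
            CustomerId = order.CustomerId,
            TotalAmount = order.Total,
            PlacedAt = DateTime.UtcNow
        };

        var message = new ServiceBusMessage(
            JsonSerializer.SerializeToUtf8Bytes(orderPlaced))
        {
            MessageId = Guid.NewGuid().ToString(),
            ContentType = "application/json",
            Subject = "order.placed"
        };

        await _sender.SendMessageAsync(message);
    }
}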

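// Subscriber - Inventory Service
// Requires: using Azure.Messaging.ServiceBus; using Microsoft.Extensions.Hosting;
public class OrderPlacedHandler : BackgroundService
{
    private readonly ServiceBusProcessor _processor;
    private readonly IInventoryService _inventoryService;
    private readonly ILogger<OrderPlacedHandler> _logger;

    public OrderPlacedHandler(ServiceBusProcessor processor,
        IInventoryService inventoryService, ILogger<OrderPlacedHandler> logger)
    {
        _processor = processor;
        _inventoryService = inventoryService;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _processor.ProcessMessageAsync += MessageHandler;
        _processor.ProcessErrorAsync += ErrorHandler;

        await _processor.StartProcessingAsync(stoppingToken);

        // StartProcessingAsync returns as soon as the processor is running,
        // so keep the service alive until the host shuts down.
        try { await Task.Delay(Timeout.Infinite, stoppingToken); }
        catch (OperationCanceledException) { }

        await _processor.StopProcessingAsync();
    }

    private async Task MessageHandler(ProcessMessageEventArgs args)
    {
        var orderPlaced = args.Message.Body.ToObjectFromJson<OrderPlacedEvent>();

        // Reserve inventory
        await _inventoryService.ReserveStockAsync(orderPlaced.OrderId);

        // Complete the message; an exception above makes the processor abandon it for redelivery
        await args.CompleteMessageAsync(args.Message);
    }

    private Task ErrorHandler(ProcessErrorEventArgs args)
    {
        _logger.LogError(args.Exception, "Service Bus error from {ErrorSource}", args.ErrorSource);
        return Task.CompletedTask;
    }
}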

Real-Time Processing Patterns

Pattern 1: Event Sourcing

Instead of storing current state, store events that led to that state:

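public class Order
{
    // Assumes every event type implements a marker interface IEvent and that
    // OrderPlacedEvent also carries the ordered Items.
    private readonly List<IEvent> _events = new();

    public Guid Id { get; private set; }
    public Guid CustomerId { get; private set; }
    public OrderStatus Status { get; private set; }
    public string TransactionId { get; private set; }

    // Events raised since the aggregate was loaded and not yet persisted
    public IReadOnlyList<IEvent> GetUncommittedEvents() => _events;

    public void PlaceOrder(Guid customerId, List<OrderItem> items)
    {
        var orderPlaced = new OrderPlacedEvent
        {
            OrderId = Guid.NewGuid(),
            CustomerId = customerId,
            Items = items,
            PlacedAt = DateTime.UtcNow
        };

        Apply(orderPlaced);
        _events.Add(orderPlaced);
    }

    public void ApprovePayment(string transactionId)
    {
        var paymentApproved = new PaymentApprovedEvent
        {
            OrderId = this.Id,
            TransactionId = transactionId,
            ApprovedAt = DateTime.UtcNow
        };

        Apply(paymentApproved);
        _events.Add(paymentApproved);
    }

    // State changes happen only inside Apply, so replaying events rebuilds the same state
    private void Apply(OrderPlacedEvent e)
    {
        Id = e.OrderId;
        CustomerId = e.CustomerId;
        Status = OrderStatus.Pending;
    }

    private void Apply(PaymentApprovedEvent e)
    {
        Status = OrderStatus.Confirmed;
        TransactionId = e.TransactionId;
    }
}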
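
The other half of event sourcing is rebuilding an aggregate from its stored history. The sketch below shows one way to replay events in order; it uses simplified, hypothetical types (`IOrderEvent`, `Placed`, `PaymentApproved`, `OrderAggregate`) rather than the classes above, and it leaves out the event store itself.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

public enum OrderStatus { None, Pending, Confirmed }

// Hypothetical, simplified event types for this sketch.
public interface IOrderEvent { Guid OrderId { get; } }
public sealed record Placed(Guid OrderId, Guid CustomerId) : IOrderEvent;
public sealed record PaymentApproved(Guid OrderId, string TransactionId) : IOrderEvent;

public sealed class OrderAggregate
{
    public Guid Id { get; private set; }
    public Guid CustomerId { get; private set; }
    public OrderStatus Status { get; private set; } = OrderStatus.None;
    public string? TransactionId { get; private set; }

    // Number of events applied so far; useful for optimistic concurrency checks.
    public int Version { get; private set; }

    // Rebuild current state by replaying the stored events in order.
    public static OrderAggregate FromHistory(IEnumerable<IOrderEvent> history)
    {
        var order = new OrderAggregate();
        foreach (var e in history)
        {
            order.Apply(e);
            order.Version++;
        }
        return order;
    }

    private void Apply(IOrderEvent e)
    {
        switch (e)
        {
            case Placed p:
                Id = p.OrderId;
                CustomerId = p.CustomerId;
                Status = OrderStatus.Pending;
                break;
            case PaymentApproved a:
                Status = OrderStatus.Confirmed;
                TransactionId = a.TransactionId;
                break;
            default:
                throw new InvalidOperationException($"Unknown event type {e.GetType().Name}");
        }
    }
}
```

Replaying a `Placed` event followed by a `PaymentApproved` event yields an aggregate in the `Confirmed` state at version 2, without any separately stored "current state" row.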

Pattern 2: CQRS (Command Query Responsibility Segregation)

Separate read and write models for better scalability:

// Write Model - Commands
public class CreateOrderCommandHandler
{
    private readonly IOrderRepository _repository;
    private readonly IEventPublisher _publisher;
    
    public async Task<Guid> Handle(CreateOrderCommand command)
    {
        var order = new Order(command.CustomerId, command.Items);
        await _repository.SaveAsync(order);
        
        // Publish events to update read model
        await _publisher.PublishAsync(order.GetUncommittedEvents());
        
        return order.Id;
    }
}

// Read Model - Queries
public class OrderReadModel
{
    public Guid Id { get; set; }
    public string CustomerName { get; set; }
    public decimal Total { get; set; }
    public string Status { get; set; }
    public DateTime PlacedAt { get; set; }
}

public class GetOrderQueryHandler
{
    private readonly IReadOnlyRepository<OrderReadModel> _readRepository;
    
    public async Task<OrderReadModel> Handle(GetOrderQuery query)
    {
        // Fast read from optimized read model
        return await _readRepository.GetByIdAsync(query.OrderId);
    }
}
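
What connects the two models is a projection: a handler that consumes the published events and updates the read store. Here is a minimal sketch, assuming an in-memory dictionary in place of a real read database; `OrderProjection`, `OrderSummary`, and the two `...Notice` event records are hypothetical names for illustration.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical events published by the write side.
public sealed record OrderPlacedNotice(Guid OrderId, string CustomerName, decimal Total, DateTime PlacedAt);
public sealed record PaymentApprovedNotice(Guid OrderId);

// Denormalized row shaped for queries.
public sealed class OrderSummary
{
    public Guid Id { get; init; }
    public string CustomerName { get; init; } = "";
    public decimal Total { get; init; }
    public string Status { get; set; } = "Pending";
    public DateTime PlacedAt { get; init; }
}

public sealed class OrderProjection
{
    // In-memory stand-in for the read database.
    private readonly Dictionary<Guid, OrderSummary> _rows = new();

    public void When(OrderPlacedNotice e)
    {
        // TryAdd ignores a redelivered event, so the row is created only once.
        _rows.TryAdd(e.OrderId, new OrderSummary
        {
            Id = e.OrderId,
            CustomerName = e.CustomerName,
            Total = e.Total,
            PlacedAt = e.PlacedAt
        });
    }

    public void When(PaymentApprovedNotice e)
    {
        // Setting the same status twice has no further effect.
        if (_rows.TryGetValue(e.OrderId, out var row))
        {
            row.Status = "Confirmed";
        }
    }

    public OrderSummary? Find(Guid orderId) =>
        _rows.TryGetValue(orderId, out var row) ? row : null;
}
```

Because the projection runs after the write has committed, the read model is eventually consistent: a query issued immediately after a command may briefly return the previous state.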

Implementing Saga Pattern for Distributed Transactions

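When one business operation spans several services and no distributed transaction is available, a saga coordinates the steps and compensates for completed ones if a later step fails: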

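// Simplified orchestration. SendAsync only confirms that the broker accepted a
// command; a production saga would advance on reply events from each service.
public class OrderSaga
{
    private readonly IServiceBusClient _serviceBus;

    public async Task ProcessOrderAsync(Order order)
    {
        var sagaId = Guid.NewGuid();

        // Compensations for the steps dispatched so far; the most recent is on top
        var compensations = new Stack<Func<Task>>();

        try
        {
            // Step 1: Reserve Inventory
            await _serviceBus.SendAsync(new ReserveInventoryCommand
            {
                SagaId = sagaId,
                OrderId = order.Id,
                Items = order.Items
            });
            compensations.Push(async () => await _serviceBus.SendAsync(
                new ReleaseInventoryCommand { SagaId = sagaId }));

            // Step 2: Process Payment
            await _serviceBus.SendAsync(new ProcessPaymentCommand
            {
                SagaId = sagaId,
                OrderId = order.Id,
                Amount = order.Total
            });
            compensations.Push(async () => await _serviceBus.SendAsync(
                new RefundPaymentCommand { SagaId = sagaId }));

            // Step 3: Ship Order. No compensation is registered for the last step:
            // if this send throws, the shipment was never requested.
            await _serviceBus.SendAsync(new ShipOrderCommand
            {
                SagaId = sagaId,
                OrderId = order.Id
            });
        }
        catch (Exception)
        {
            // Compensating transactions, then surface the failure to the caller
            await CompensateAsync(compensations);
            throw;
        }
    }

    private static async Task CompensateAsync(Stack<Func<Task>> compensations)
    {
        // Undo only the steps that were dispatched, in reverse order
        while (compensations.Count > 0)
        {
            await compensations.Pop()();
        }
    }
}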

Azure Event Grid for Serverless Events

For lightweight, serverless event processing:

// Publishing to Event Grid
public class BlobStorageEventPublisher
{
    private readonly EventGridPublisherClient _client;
    
    public async Task PublishFileUploadedAsync(string fileName, string blobUrl)
    {
        var cloudEvent = new CloudEvent(
            source: "/storage/uploads",
            type: "Storage.BlobUploaded",
            jsonSerializableData: new
            {
                FileName = fileName,
                BlobUrl = blobUrl,
                UploadedAt = DateTime.UtcNow
            });
        
        await _client.SendEventAsync(cloudEvent);
    }
}

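// Azure Function subscriber (in-process model)
// Assumes the Event Grid subscription delivers the CloudEvents 1.0 schema;
// IFileProcessor is an application-specific abstraction.
public class FileUploadedHandler
{
    private readonly IFileProcessor _fileProcessor;

    public FileUploadedHandler(IFileProcessor fileProcessor)
    {
        _fileProcessor = fileProcessor;
    }

    [FunctionName("ProcessUploadedFile")]
    public async Task Run(
        // Bind to CloudEvent because the publisher sends CloudEvents, not EventGridEvent
        [EventGridTrigger] CloudEvent cloudEvent,
        ILogger log)
    {
        var fileData = cloudEvent.Data?.ToObjectFromJson<FileUploadedData>();
        if (fileData is null)
        {
            log.LogWarning("Event {EventId} has no payload", cloudEvent.Id);
            return;
        }

        log.LogInformation("Processing file: {FileName}", fileData.FileName);

        // Process the file (scan, resize, analyze, etc.)
        await _fileProcessor.ProcessAsync(fileData.BlobUrl);
    }
}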

Monitoring and Observability

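Observability is critical for event-driven systems, because a single request fans out across queues and handlers and no single call stack shows what happened: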

public class ObservableMessageHandler
{
    private readonly ILogger _logger;
    private readonly IMetrics _metrics;
    
    public async Task HandleAsync(ServiceBusReceivedMessage message)
    {
        var stopwatch = Stopwatch.StartNew();
        
        try
        {
            await ProcessMessageAsync(message);
            
            _metrics.IncrementCounter("messages.processed", new Dictionary<string, object>
            {
                ["subject"] = message.Subject,
                ["status"] = "success"
            });
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to process message {MessageId}", message.MessageId);
            
            _metrics.IncrementCounter("messages.processed", new Dictionary<string, object>
            {
                ["subject"] = message.Subject,
                ["status"] = "error"
            });
            
            throw;
        }
        finally
        {
            stopwatch.Stop();
            _metrics.RecordValue("message.processing.duration", 
                stopwatch.ElapsedMilliseconds,
                new Dictionary<string, object> { ["subject"] = message.Subject });
        }
    }
}
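
The same counters and timings can be recorded without a custom metrics interface by using the built-in `System.Diagnostics.Metrics` API. The sketch below assumes .NET 6 or later; the meter name `Orders.Messaging` and the `MessageMetrics` wrapper are hypothetical, and an exporter (for example OpenTelemetry) would still be needed to ship the measurements anywhere.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Threading.Tasks;

public sealed class MessageMetrics : IDisposable
{
    // Hypothetical meter name; exporters subscribe to meters by name.
    private readonly Meter _meter = new("Orders.Messaging");
    private readonly Counter<long> _processed;
    private readonly Histogram<double> _duration;

    public MessageMetrics()
    {
        _processed = _meter.CreateCounter<long>("messages.processed");
        _duration = _meter.CreateHistogram<double>("message.processing.duration", unit: "ms");
    }

    // Runs the handler, then records one count and one duration tagged by subject.
    public async Task TrackAsync(string subject, Func<Task> handler)
    {
        var stopwatch = Stopwatch.StartNew();
        var status = "success";
        try
        {
            await handler();
        }
        catch
        {
            status = "error";
            throw; // let the caller decide whether to abandon or dead-letter
        }
        finally
        {
            _processed.Add(1,
                new KeyValuePair<string, object?>("subject", subject),
                new KeyValuePair<string, object?>("status", status));
            _duration.Record(stopwatch.Elapsed.TotalMilliseconds,
                new KeyValuePair<string, object?>("subject", subject));
        }
    }

    public void Dispose() => _meter.Dispose();
}
```

A handler would wrap its work as `await metrics.TrackAsync(message.Subject, () => ProcessMessageAsync(message));`, which keeps timing and error counting out of the business logic.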

Best Practices from Production

1. Idempotency is Critical

Always design handlers to be idempotent:

public class PaymentHandler
{
    public async Task HandleAsync(ProcessPaymentCommand command)
    {
        // Check if already processed
        var existing = await _paymentRepository.GetByOrderIdAsync(command.OrderId);
        if (existing != null)
        {
            _logger.LogInformation("Payment already processed for order {OrderId}", 
                command.OrderId);
            return; // Idempotent - safe to call multiple times
        }
        
        // Process payment
        var payment = await _paymentGateway.ChargeAsync(command);
        await _paymentRepository.SaveAsync(payment);
    }
}
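
A check followed by a write can still charge twice if two deliveries of the same command run concurrently, because both may pass the check before either saves. One common remedy is to claim the key atomically before performing the side effect. The sketch below illustrates this with an in-memory `ConcurrentDictionary` standing in for a database table with a unique constraint; `IdempotencyGuard` is a hypothetical helper, and a real system would persist the claims.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public sealed class IdempotencyGuard
{
    // In-memory stand-in for a table with a unique constraint on the key.
    private readonly ConcurrentDictionary<string, byte> _claimed = new();

    // Runs the action only for the first caller that claims the key.
    // Returns true if the action ran, false if the key was already claimed.
    public async Task<bool> RunOnceAsync(string key, Func<Task> action)
    {
        if (!_claimed.TryAdd(key, 0))
        {
            return false; // duplicate delivery: skip the side effect
        }

        try
        {
            await action();
            return true;
        }
        catch
        {
            // Release the claim so a redelivery can retry the failed work.
            _claimed.TryRemove(key, out _);
            throw;
        }
    }
}
```

In the payment handler, the key could be the order ID or the message ID, for example `await guard.RunOnceAsync(command.OrderId.ToString(), () => ChargeAndSaveAsync(command));`, where `ChargeAndSaveAsync` is a hypothetical method wrapping the gateway call and the repository save.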

2. Dead Letter Queue Handling

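Messages that exceed the subscription's maximum delivery count, or that a handler explicitly dead-letters, are moved to the dead-letter sub-queue. Drain and inspect it regularly:

public async Task ProcessDeadLetterQueueAsync()
{
    // Address the dead-letter sub-queue through receiver options
    await using var receiver = _serviceBusClient.CreateReceiver(
        "orders-topic",
        "inventory-subscription",
        new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter });

    var messages = await receiver.ReceiveMessagesAsync(maxMessages: 10);

    foreach (var message in messages)
    {
        _logger.LogWarning(
            "Dead letter message: {MessageId}, Reason: {Reason}, Description: {Description}",
            message.MessageId,
            message.DeadLetterReason,
            message.DeadLetterErrorDescription);

        // Analyze and potentially retry or alert
        await AnalyzeAndHandleAsync(message);

        // Settle the message so it is not received again from the dead-letter queue
        await receiver.CompleteMessageAsync(message);
    }
}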

3. Circuit Breaker for External Dependencies

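Wrap calls to external dependencies in a circuit breaker so that a failing downstream service is not hammered while it recovers (Polly v7 syntax):

var policy = Policy
    .Handle<HttpRequestException>()
    .CircuitBreakerAsync(
        // The non-generic policy builder names this parameter exceptionsAllowedBeforeBreaking
        exceptionsAllowedBeforeBreaking: 3,
        durationOfBreak: TimeSpan.FromMinutes(1),
        onBreak: (exception, duration) =>
        {
            _logger.LogWarning(exception, "Circuit breaker opened for {Duration}", duration);
        },
        onReset: () =>
        {
            _logger.LogInformation("Circuit breaker reset");
        });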

Performance Considerations

Batch Processing

public class BatchMessageHandler
{
    private const int BatchSize = 100;
    
    public async Task ProcessBatchAsync()
    {
        var messages = await _receiver.ReceiveMessagesAsync(BatchSize);
        
        // Process in parallel
        var tasks = messages.Select(async message =>
        {
            try
            {
                await ProcessMessageAsync(message);
                await _receiver.CompleteMessageAsync(message);
            }
            catch (Exception ex)
            {
                await _receiver.AbandonMessageAsync(message);
            }
        });
        
        await Task.WhenAll(tasks);
    }
}
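
Starting every message in a batch at once can overwhelm a downstream database or API. One way to cap the degree of parallelism is a semaphore, sketched below using only the base class library; `Bounded.ForEachAsync` is a hypothetical helper, and on .NET 6 or later the built-in `Parallel.ForEachAsync` is an alternative.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class Bounded
{
    // Runs body for every item, with at most maxConcurrency bodies in flight.
    public static async Task ForEachAsync<T>(
        IEnumerable<T> items, int maxConcurrency, Func<T, Task> body)
    {
        if (maxConcurrency < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxConcurrency));
        }

        using var gate = new SemaphoreSlim(maxConcurrency);

        var tasks = items.Select(async item =>
        {
            await gate.WaitAsync();   // wait for a free slot
            try
            {
                await body(item);
            }
            finally
            {
                gate.Release();       // free the slot even if body throws
            }
        }).ToList();                  // materialize so every task is started

        await Task.WhenAll(tasks);
    }
}
```

Applied to the batch handler, the per-message lambda would be passed as `body`, for example `await Bounded.ForEachAsync(messages, 10, HandleOneAsync);`, where `HandleOneAsync` is a hypothetical method containing the try/complete/abandon logic. The limit of 10 is an arbitrary illustration and should be tuned against the message lock duration.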

Message Compression

public async Task SendCompressedMessageAsync<T>(T data)
{
    var json = JsonSerializer.SerializeToUtf8Bytes(data);
    
    using var memoryStream = new MemoryStream();
    using (var gzipStream = new GZipStream(memoryStream, CompressionMode.Compress))
    {
        await gzipStream.WriteAsync(json);
    }
    
    var message = new ServiceBusMessage(memoryStream.ToArray())
    {
        ContentType = "application/json+gzip"
    };
    
    await _sender.SendMessageAsync(message);
}
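
The consumer has to reverse the compression before deserializing. The sketch below pairs a compress step with its matching decompress step using only `System.IO.Compression` and System.Text.Json; `GzipJson` is a hypothetical helper name. In a handler, the decompress path would typically be chosen by inspecting the message's `ContentType`.

```csharp
#nullable enable
using System.IO;
using System.IO.Compression;
using System.Text.Json;

public static class GzipJson
{
    // Serialize to JSON, then gzip the bytes.
    public static byte[] Compress<T>(T data)
    {
        var json = JsonSerializer.SerializeToUtf8Bytes(data);

        using var output = new MemoryStream();
        using (var gzip = new GZipStream(output, CompressionMode.Compress))
        {
            gzip.Write(json, 0, json.Length);
        } // disposing the GZipStream flushes the final gzip block

        // ToArray remains valid after the MemoryStream has been closed.
        return output.ToArray();
    }

    // Gunzip the body, then deserialize the JSON.
    public static T? Decompress<T>(byte[] body)
    {
        using var input = new MemoryStream(body);
        using var gzip = new GZipStream(input, CompressionMode.Decompress);
        using var buffer = new MemoryStream();
        gzip.CopyTo(buffer);

        return JsonSerializer.Deserialize<T>(buffer.ToArray());
    }
}
```

Compression adds CPU cost and a fixed gzip header, so it tends to pay off only for larger payloads; very small messages can end up bigger after compression.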

Key Takeaways

Event-driven architecture enables building resilient, scalable systems, but it comes with complexity:

  • Design for idempotency - messages may be delivered multiple times
  • Monitor everything - distributed systems are hard to debug
  • Handle failures gracefully - use dead letter queues and compensating transactions
  • Start simple - don't over-engineer, add complexity as needed
  • Document your events - maintain a schema registry

The investment in EDA pays off when your system needs to scale horizontally and handle millions of events per day.


Questions about event-driven architecture? Let's discuss!