Event-Driven Architecture: Building Real-Time Systems with .NET and Azure
Learn how to design and implement event-driven systems using Azure Service Bus, Event Grid, and .NET to build scalable, loosely-coupled real-time applications.
Event-driven architecture (EDA) has become essential for building modern, scalable applications. In my experience developing distributed systems, I've found that EDA enables loose coupling, better scalability, and improved resilience compared to traditional request-response patterns.
Why Event-Driven Architecture?
Traditional monolithic architectures struggle with:
- Tight coupling between components
- Synchronous dependencies that create bottlenecks
- Scaling challenges when parts of the system have different load patterns
- Failure cascades when one service goes down
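Event-driven systems address these problems by having services communicate through events instead of direct calls: a producer publishes what happened and does not need to know which consumers react, or whether they are available at that moment.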
Core Concepts
Events vs Commands
Events are facts about something that happened:
public class OrderPlacedEvent
{
public Guid OrderId { get; set; }
public Guid CustomerId { get; set; }
public decimal TotalAmount { get; set; }
public DateTime PlacedAt { get; set; }
}
Commands are requests to do something:
public class ProcessPaymentCommand
{
public Guid OrderId { get; set; }
public decimal Amount { get; set; }
public string PaymentMethod { get; set; }
}
Pub/Sub Pattern with Azure Service Bus
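Azure Service Bus topics implement pub/sub: each message published to a topic is copied to every subscription, so each consuming service processes its own copy independently of the others: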
// Publisher
public class OrderService
{
private readonly ServiceBusSender _sender;
private readonly IOrderRepository _orderRepository;
public async Task PlaceOrderAsync(Order order)
{
// Business logic to save order
await _orderRepository.SaveAsync(order);
// Publish event
var orderPlaced = new OrderPlacedEvent
{
OrderId = order.Id,
CustomerId = order.CustomerId,
TotalAmount = order.Total,
PlacedAt = DateTime.UtcNow
};
var message = new ServiceBusMessage(
JsonSerializer.SerializeToUtf8Bytes(orderPlaced))
{
MessageId = Guid.NewGuid().ToString(),
ContentType = "application/json",
Subject = "order.placed"
};
await _sender.SendMessageAsync(message);
}
}
// Subscriber - Inventory Service
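public class OrderPlacedHandler : BackgroundService
{
private readonly ServiceBusProcessor _processor;
private readonly ILogger<OrderPlacedHandler> _logger;
// BackgroundService (not IHostedService) is what declares the overridable ExecuteAsync
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_processor.ProcessMessageAsync += MessageHandler;
_processor.ProcessErrorAsync += ErrorHandler;
await _processor.StartProcessingAsync(stoppingToken);
// StartProcessingAsync returns once the processor is running,
// so wait for shutdown before stopping it
try { await Task.Delay(Timeout.Infinite, stoppingToken); }
catch (OperationCanceledException) { }
await _processor.StopProcessingAsync();
}
private async Task MessageHandler(ProcessMessageEventArgs args)
{
var orderPlaced = args.Message.Body.ToObjectFromJson<OrderPlacedEvent>();
// Reserve inventory
await _inventoryService.ReserveStockAsync(orderPlaced.OrderId);
// Complete the message
await args.CompleteMessageAsync(args.Message);
}
private Task ErrorHandler(ProcessErrorEventArgs args)
{
_logger.LogError(args.Exception, "Service Bus error from {ErrorSource}", args.ErrorSource);
return Task.CompletedTask;
}
}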
Real-Time Processing Patterns
Pattern 1: Event Sourcing
Instead of storing current state, store events that led to that state:
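public class Order
{
private readonly List<IEvent> _events = new();
// State is only ever changed by the Apply methods below
public Guid Id { get; private set; }
public Guid CustomerId { get; private set; }
public OrderStatus Status { get; private set; }
public string TransactionId { get; private set; }
// Events raised but not yet persisted or published
public IReadOnlyList<IEvent> GetUncommittedEvents() => _events;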
public void PlaceOrder(Guid customerId, List<OrderItem> items)
{
var orderPlaced = new OrderPlacedEvent
{
OrderId = Guid.NewGuid(),
CustomerId = customerId,
Items = items,
PlacedAt = DateTime.UtcNow
};
Apply(orderPlaced);
_events.Add(orderPlaced);
}
public void ApprovePayment(string transactionId)
{
var paymentApproved = new PaymentApprovedEvent
{
OrderId = this.Id,
TransactionId = transactionId,
ApprovedAt = DateTime.UtcNow
};
Apply(paymentApproved);
_events.Add(paymentApproved);
}
private void Apply(OrderPlacedEvent e)
{
Id = e.OrderId;
CustomerId = e.CustomerId;
Status = OrderStatus.Pending;
}
private void Apply(PaymentApprovedEvent e)
{
Status = OrderStatus.Confirmed;
TransactionId = e.TransactionId;
}
}
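Storing events only pays off if the current state can be rebuilt from them. Here is a minimal sketch of that replay step. The event records and the `OrderState.Replay` helper are hypothetical names for illustration, simplified from the classes above, and the status is a plain string to keep the sample self-contained.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified event types for illustration only.
public interface IOrderEvent { }
public record OrderPlaced(Guid OrderId, Guid CustomerId) : IOrderEvent;
public record PaymentApproved(Guid OrderId, string TransactionId) : IOrderEvent;

public class OrderState
{
    public Guid Id { get; private set; }
    public Guid CustomerId { get; private set; }
    public string Status { get; private set; } = "None";
    public string TransactionId { get; private set; } = "";

    // Rebuild state by applying every stored event in its original order.
    public static OrderState Replay(IEnumerable<IOrderEvent> history)
    {
        var state = new OrderState();
        foreach (var e in history)
        {
            state.Apply(e);
        }
        return state;
    }

    private void Apply(IOrderEvent e)
    {
        switch (e)
        {
            case OrderPlaced placed:
                Id = placed.OrderId;
                CustomerId = placed.CustomerId;
                Status = "Pending";
                break;
            case PaymentApproved approved:
                Status = "Confirmed";
                TransactionId = approved.TransactionId;
                break;
            default:
                throw new InvalidOperationException($"Unknown event type: {e.GetType().Name}");
        }
    }
}
```

Loading an order then becomes "read its event stream, call `Replay`". For long streams you would typically add snapshots so that replay starts from a recent checkpoint instead of the first event.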
Pattern 2: CQRS (Command Query Responsibility Segregation)
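Separate the write model (commands that change state) from the read model (queries over a denormalized view) so each side can be optimized and scaled independently. The read model is updated from published events, so it is eventually consistent with the write side: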
// Write Model - Commands
public class CreateOrderCommandHandler
{
private readonly IOrderRepository _repository;
private readonly IEventPublisher _publisher;
public async Task<Guid> Handle(CreateOrderCommand command)
{
var order = new Order(command.CustomerId, command.Items);
await _repository.SaveAsync(order);
// Publish events to update read model
await _publisher.PublishAsync(order.GetUncommittedEvents());
return order.Id;
}
}
// Read Model - Queries
public class OrderReadModel
{
public Guid Id { get; set; }
public string CustomerName { get; set; }
public decimal Total { get; set; }
public string Status { get; set; }
public DateTime PlacedAt { get; set; }
}
public class GetOrderQueryHandler
{
private readonly IReadOnlyRepository<OrderReadModel> _readRepository;
public async Task<OrderReadModel> Handle(GetOrderQuery query)
{
// Fast read from optimized read model
return await _readRepository.GetByIdAsync(query.OrderId);
}
}
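The missing piece between the two models is the projection that consumes events and keeps the read model current. A minimal sketch follows, assuming an in-memory dictionary as the read store and hypothetical notification records; in a real system the store would be a database or cache and the events would arrive from the bus.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event shapes carrying just what the read model needs.
public record OrderPlacedNotification(Guid OrderId, string CustomerName, decimal Total, DateTime PlacedAt);
public record PaymentApprovedNotification(Guid OrderId);

// Denormalized view, mirroring the OrderReadModel shown above.
public class OrderSummary
{
    public Guid Id { get; set; }
    public string CustomerName { get; set; } = "";
    public decimal Total { get; set; }
    public string Status { get; set; } = "";
    public DateTime PlacedAt { get; set; }
}

// In-memory stand-in for the read store.
public class OrderSummaryProjection
{
    private readonly Dictionary<Guid, OrderSummary> _store = new();

    public void On(OrderPlacedNotification e)
    {
        // Upsert keyed by order id, so a redelivered event does not create a second row.
        _store[e.OrderId] = new OrderSummary
        {
            Id = e.OrderId,
            CustomerName = e.CustomerName,
            Total = e.Total,
            Status = "Pending",
            PlacedAt = e.PlacedAt
        };
    }

    public void On(PaymentApprovedNotification e)
    {
        // Ignore approvals for orders this projection has not seen yet.
        if (_store.TryGetValue(e.OrderId, out var summary))
        {
            summary.Status = "Confirmed";
        }
    }

    public bool TryGet(Guid id, out OrderSummary summary) => _store.TryGetValue(id, out summary);
}
```

Queries then read `OrderSummary` rows directly, with no joins or business logic on the read path.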
Implementing Saga Pattern for Distributed Transactions
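When a business operation spans multiple services, a saga coordinates it as a sequence of local transactions, each paired with a compensating action that undoes it if a later step fails: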
public class OrderSaga
{
private readonly IServiceBusClient _serviceBus;
public async Task ProcessOrderAsync(Order order)
{
var sagaId = Guid.NewGuid();
try
{
// Step 1: Reserve Inventory
await _serviceBus.SendAsync(new ReserveInventoryCommand
{
SagaId = sagaId,
OrderId = order.Id,
Items = order.Items
});
// Step 2: Process Payment
await _serviceBus.SendAsync(new ProcessPaymentCommand
{
SagaId = sagaId,
OrderId = order.Id,
Amount = order.Total
});
// Step 3: Ship Order
await _serviceBus.SendAsync(new ShipOrderCommand
{
SagaId = sagaId,
OrderId = order.Id
});
}
catch (Exception)
{
// Only failures to send a command land here; failures inside the
// downstream services have to be reported back as events
await CompensateAsync(sagaId, order);
}
}
private async Task CompensateAsync(Guid sagaId, Order order)
{
// Rollback in reverse order
await _serviceBus.SendAsync(new CancelShipmentCommand { SagaId = sagaId });
await _serviceBus.SendAsync(new RefundPaymentCommand { SagaId = sagaId });
await _serviceBus.SendAsync(new ReleaseInventoryCommand { SagaId = sagaId });
}
}
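One detail worth making explicit is that only the steps that actually completed should be compensated, most recent first. The sketch below shows that bookkeeping in isolation. `SagaStep` and `SagaRunner` are hypothetical names, and the steps are plain delegates rather than Service Bus commands.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical step type: an action plus the compensation that undoes it.
public record SagaStep(string Name, Func<Task> Execute, Func<Task> Compensate);

public static class SagaRunner
{
    // Runs steps in order. If one throws, compensates the steps that
    // already completed, most recent first, and returns false.
    public static async Task<bool> RunAsync(IEnumerable<SagaStep> steps)
    {
        var completed = new Stack<SagaStep>();
        try
        {
            foreach (var step in steps)
            {
                await step.Execute();
                completed.Push(step);
            }
            return true;
        }
        catch (Exception)
        {
            // The failed step never reached the stack, so it is not compensated.
            while (completed.Count > 0)
            {
                await completed.Pop().Compensate();
            }
            return false;
        }
    }
}
```

In a message-driven saga, each `Execute` would send a command and wait for the corresponding success or failure event, and the list of completed steps would be persisted so the saga can resume after a restart.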
Azure Event Grid for Serverless Events
Event Grid is a good fit for lightweight, push-based event delivery to serverless handlers such as Azure Functions:
// Publishing to Event Grid
public class BlobStorageEventPublisher
{
private readonly EventGridPublisherClient _client;
public async Task PublishFileUploadedAsync(string fileName, string blobUrl)
{
var cloudEvent = new CloudEvent(
source: "/storage/uploads",
type: "Storage.BlobUploaded",
jsonSerializableData: new
{
FileName = fileName,
BlobUrl = blobUrl,
UploadedAt = DateTime.UtcNow
});
await _client.SendEventAsync(cloudEvent);
}
}
// Azure Function subscriber
public class FileUploadedHandler
{
[FunctionName("ProcessUploadedFile")]
public async Task Run(
[EventGridTrigger] CloudEvent cloudEvent,
ILogger log)
{
// The publisher above sends CloudEvents, so bind to CloudEvent rather than EventGridEvent
var fileData = cloudEvent.Data.ToObjectFromJson<FileUploadedData>();
log.LogInformation("Processing file: {FileName}", fileData.FileName);
// Process the file (scan, resize, analyze, etc.)
await _fileProcessor.ProcessAsync(fileData.BlobUrl);
}
}
Monitoring and Observability
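Because a single request fans out across queues and services, every handler needs logs, counters, and timings so that a failure can be traced back to the message that caused it: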
public class ObservableMessageHandler
{
private readonly ILogger _logger;
private readonly IMetrics _metrics;
public async Task HandleAsync(ServiceBusReceivedMessage message)
{
var stopwatch = Stopwatch.StartNew();
try
{
await ProcessMessageAsync(message);
_metrics.IncrementCounter("messages.processed", new Dictionary<string, object>
{
["subject"] = message.Subject,
["status"] = "success"
});
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to process message {MessageId}", message.MessageId);
_metrics.IncrementCounter("messages.processed", new Dictionary<string, object>
{
["subject"] = message.Subject,
["status"] = "error"
});
throw;
}
finally
{
stopwatch.Stop();
_metrics.RecordValue("message.processing.duration",
stopwatch.ElapsedMilliseconds,
new Dictionary<string, object> { ["subject"] = message.Subject });
}
}
}
Best Practices from Production
1. Idempotency is Critical
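Service Bus delivers at least once, so a handler can receive the same message more than once (for example when a lock expires or the process crashes before completing the message). Design every handler to be idempotent: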
public class PaymentHandler
{
public async Task HandleAsync(ProcessPaymentCommand command)
{
// Check if already processed
var existing = await _paymentRepository.GetByOrderIdAsync(command.OrderId);
if (existing != null)
{
_logger.LogInformation("Payment already processed for order {OrderId}",
command.OrderId);
return; // Idempotent - safe to call multiple times
}
// Process payment
var payment = await _paymentGateway.ChargeAsync(command);
await _paymentRepository.SaveAsync(payment);
}
}
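A check followed by a save can still race if two copies of the same message are processed concurrently, since both may pass the check before either saves. One way to close that gap is to claim the key atomically before doing the work. The sketch below uses a `ConcurrentDictionary` as a stand-in for a durable store with a unique constraint; `IdempotentExecutor` is a hypothetical helper, not part of any SDK.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// In-memory stand-in for a durable store with a unique key on the message or order id.
public class IdempotentExecutor
{
    private readonly ConcurrentDictionary<string, bool> _claimed = new();

    // Runs the action only for the first caller that claims the key.
    // Returns true if the action ran, false if the key was a duplicate.
    public async Task<bool> RunOnceAsync(string key, Func<Task> action)
    {
        // TryAdd is atomic: exactly one concurrent caller wins the claim.
        if (!_claimed.TryAdd(key, true))
        {
            return false;
        }

        try
        {
            await action();
            return true;
        }
        catch
        {
            // Release the claim so a redelivery can retry the work.
            _claimed.TryRemove(key, out _);
            throw;
        }
    }
}
```

With a real database, the same idea is usually an insert into a "processed messages" table inside the transaction that performs the work, so the claim and the side effect commit or roll back together.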
2. Dead Letter Queue Handling
public async Task ProcessDeadLetterQueueAsync()
{
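// Target the subscription's dead-letter sub-queue through receiver options
var receiver = _serviceBusClient.CreateReceiver(
"orders-topic",
"inventory-subscription",
new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter });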
var messages = await receiver.ReceiveMessagesAsync(maxMessages: 10);
foreach (var message in messages)
{
_logger.LogWarning(
"Dead letter message: {MessageId}, Reason: {Reason}, Description: {Description}",
message.MessageId,
message.DeadLetterReason,
message.DeadLetterErrorDescription);
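// Analyze and potentially retry or alert
await AnalyzeAndHandleAsync(message);
// Remove the message from the dead letter queue once it has been handled
await receiver.CompleteMessageAsync(message);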
}
}
3. Circuit Breaker for External Dependencies
var policy = Policy
.Handle<HttpRequestException>()
.CircuitBreakerAsync(
exceptionsAllowedBeforeBreaking: 3,
durationOfBreak: TimeSpan.FromMinutes(1),
onBreak: (exception, duration) =>
{
_logger.LogWarning("Circuit breaker opened for {Duration}", duration);
},
onReset: () =>
{
_logger.LogInformation("Circuit breaker reset");
});
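To make the breaker's states concrete, here is a hand-rolled sketch of the closed, open, and half-open transitions. This is not how Polly is implemented, `SimpleCircuitBreaker` is a hypothetical class, and it is deliberately not thread-safe. The clock is injected so the behavior can be exercised without waiting.

```csharp
using System;

// Hypothetical minimal breaker: opens after N consecutive failures,
// rejects calls while open, then allows a trial call once the break has elapsed.
public class SimpleCircuitBreaker
{
    private readonly int _threshold;
    private readonly TimeSpan _breakDuration;
    private readonly Func<DateTime> _clock;
    private int _failures;
    private bool _open;
    private DateTime _openedAt;

    public SimpleCircuitBreaker(int threshold, TimeSpan breakDuration, Func<DateTime> clock)
    {
        _threshold = threshold;
        _breakDuration = breakDuration;
        _clock = clock;
    }

    public T Execute<T>(Func<T> action)
    {
        if (_open && _clock() - _openedAt < _breakDuration)
        {
            // Open: fail fast without calling the dependency.
            throw new InvalidOperationException("Circuit is open");
        }

        // Closed, or half-open because the break has elapsed: try the call.
        try
        {
            var result = action();
            _failures = 0;
            _open = false;
            return result;
        }
        catch
        {
            _failures++;
            // A failed trial call, or reaching the threshold, (re)opens the circuit.
            if (_open || _failures >= _threshold)
            {
                _open = true;
                _openedAt = _clock();
            }
            throw;
        }
    }
}
```

The point of failing fast is that handlers stop piling up on a dependency that is already struggling, and messages can be abandoned or deferred until the circuit closes again.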
Performance Considerations
Batch Processing
public class BatchMessageHandler
{
private const int BatchSize = 100;
public async Task ProcessBatchAsync()
{
var messages = await _receiver.ReceiveMessagesAsync(BatchSize);
// Process in parallel
var tasks = messages.Select(async message =>
{
try
{
await ProcessMessageAsync(message);
await _receiver.CompleteMessageAsync(message);
}
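catch (Exception ex)
{
_logger.LogError(ex, "Failed to process message {MessageId}", message.MessageId);
// Release the lock so the message can be redelivered
await _receiver.AbandonMessageAsync(message);
}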
});
await Task.WhenAll(tasks);
}
}
Message Compression
public async Task SendCompressedMessageAsync<T>(T data)
{
var json = JsonSerializer.SerializeToUtf8Bytes(data);
using var memoryStream = new MemoryStream();
using (var gzipStream = new GZipStream(memoryStream, CompressionMode.Compress))
{
await gzipStream.WriteAsync(json);
}
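var message = new ServiceBusMessage(memoryStream.ToArray())
{
ContentType = "application/json",
// Tell consumers the body is gzip-compressed; the property name is a convention, not a Service Bus feature
ApplicationProperties = { ["Content-Encoding"] = "gzip" }
};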
await _sender.SendMessageAsync(message);
}
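Compression only helps if the consumer reverses it before deserializing. A minimal round-trip sketch using only the base class library follows. `GzipJson` is a hypothetical helper name, and the byte arrays stand in for the Service Bus message body.

```csharp
using System.IO;
using System.IO.Compression;
using System.Text.Json;

public static class GzipJson
{
    // Serialize to JSON, then gzip the UTF-8 bytes.
    public static byte[] Compress<T>(T data)
    {
        var json = JsonSerializer.SerializeToUtf8Bytes(data);
        using var output = new MemoryStream();
        using (var gzip = new GZipStream(output, CompressionMode.Compress))
        {
            gzip.Write(json, 0, json.Length);
        }
        // The gzip stream must be disposed first so the trailer is flushed.
        return output.ToArray();
    }

    // Gunzip the payload, then deserialize the JSON bytes.
    public static T Decompress<T>(byte[] payload)
    {
        using var input = new MemoryStream(payload);
        using var gzip = new GZipStream(input, CompressionMode.Decompress);
        using var json = new MemoryStream();
        gzip.CopyTo(json);
        return JsonSerializer.Deserialize<T>(json.ToArray());
    }
}
```

Compression tends to pay off for large, repetitive JSON payloads. For small messages the gzip header and CPU cost can outweigh the savings, so it is worth measuring before enabling it everywhere.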
Key Takeaways
Event-driven architecture enables building resilient, scalable systems, but it comes with complexity:
- Design for idempotency - messages may be delivered multiple times
- Monitor everything - distributed systems are hard to debug
- Handle failures gracefully - use dead letter queues and compensating transactions
- Start simple - don't over-engineer, add complexity as needed
- Document your events - maintain a schema registry
The investment in EDA pays off when your system needs to scale horizontally and handle millions of events per day.
Questions about event-driven architecture? Let's discuss!