Microservices Communication Patterns in .NET
.NET, Microservices, Architecture, gRPC, Message Queues
Comprehensive guide to implementing effective communication patterns between microservices using .NET, covering synchronous HTTP, asynchronous messaging, and gRPC.
Building distributed systems requires careful consideration of how services communicate. Here's what I've learned implementing microservices at scale.
Communication Styles Overview
Synchronous (Request-Response)
- REST APIs (HTTP/HTTPS)
- gRPC
- GraphQL
When to use: Real-time queries, user-facing operations
Asynchronous (Event-Driven)
- Message queues (RabbitMQ, Azure Service Bus)
- Event streams (Kafka, Event Hubs)
When to use: Background processing, eventual consistency, high-throughput scenarios
Pattern 1: REST APIs with HTTP Client
Basic Implementation
public class ProductService
{
private readonly HttpClient _httpClient;
public ProductService(HttpClient httpClient)
{
_httpClient = httpClient;
}
public async Task<Product> GetProductAsync(int id)
{
var response = await _httpClient.GetAsync($"/api/products/{id}");
response.EnsureSuccessStatusCode();
return await response.Content.ReadFromJsonAsync<Product>();
}
}
// Registration with Polly for resilience
builder.Services.AddHttpClient<ProductService>(client =>
{
client.BaseAddress = new Uri("https://product-service");
client.Timeout = TimeSpan.FromSeconds(30);
})
.AddPolicyHandler(GetRetryPolicy())
.AddPolicyHandler(GetCircuitBreakerPolicy());
static IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
return HttpPolicyExtensions
.HandleTransientHttpError()
.WaitAndRetryAsync(
retryCount: 3,
sleepDurationProvider: retryAttempt =>
TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
onRetry: (outcome, timespan, retryCount, context) =>
{
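// Message template instead of string interpolation, so the values stay structured
Log.Warning("Retry {RetryCount} after {DelaySeconds}s", retryCount, timespan.TotalSeconds);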
});
}
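The registration above references GetCircuitBreakerPolicy() without showing it. Here is a minimal sketch, assuming the Polly and Polly.Extensions.Http packages that the retry policy already relies on. The thresholds (5 consecutive failures, a 30 second break) and the ResiliencePolicies class name are illustrative assumptions, not values from a real deployment:

```csharp
using System;
using System.Net.Http;
using Polly;
using Polly.Extensions.Http;

public static class ResiliencePolicies
{
    // Opens the circuit after 5 consecutive transient failures
    // (HttpRequestException, HTTP 5xx, or HTTP 408) and rejects calls
    // for 30 seconds before letting a trial request through.
    // Both numbers are assumptions chosen for illustration.
    public static IAsyncPolicy<HttpResponseMessage> GetCircuitBreakerPolicy()
    {
        return HttpPolicyExtensions
            .HandleTransientHttpError()
            .CircuitBreakerAsync(
                handledEventsAllowedBeforeBreaking: 5,
                durationOfBreak: TimeSpan.FromSeconds(30));
    }
}
```

Registration order matters: the first policy added is the outermost one, so with retry added before the circuit breaker, every retry attempt passes through the breaker. Once the circuit is open, the breaker throws BrokenCircuitException, which the retry policy does not handle, so callers fail fast instead of waiting out the backoff.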
Service Discovery with Consul
public class ServiceDiscoveryHttpClientHandler : DelegatingHandler
{
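private readonly IConsulClient _consulClient;
public ServiceDiscoveryHttpClientHandler(IConsulClient consulClient)
{
_consulClient = consulClient;
}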
protected override async Task<HttpResponseMessage> SendAsync(
HttpRequestMessage request,
CancellationToken cancellationToken)
{
var serviceName = request.RequestUri.Host;
var serviceEntry = await DiscoverServiceAsync(serviceName);
var uriBuilder = new UriBuilder(request.RequestUri)
{
Host = serviceEntry.Address,
Port = serviceEntry.Port
};
request.RequestUri = uriBuilder.Uri;
return await base.SendAsync(request, cancellationToken);
}
// ServiceEntry.Service is an AgentService, which carries the Address and Port used above
private async Task<AgentService> DiscoverServiceAsync(string serviceName)
{
var services = await _consulClient.Health.Service(
serviceName,
tag: "",
passingOnly: true);
var service = services.Response
.OrderBy(_ => Guid.NewGuid())
.FirstOrDefault()?.Service;
return service ?? throw new ServiceNotFoundException(serviceName);
}
}
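The handler above picks a healthy instance at random. If you'd rather spread calls evenly, round-robin is one common alternative. A minimal sketch, where RoundRobinSelector is a hypothetical helper (not part of the Consul client) that cycles through whatever instance list discovery returned:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper: hands out instances in rotation.
public sealed class RoundRobinSelector
{
    private int _counter = -1;

    public T Next<T>(IReadOnlyList<T> instances)
    {
        if (instances is null || instances.Count == 0)
            throw new InvalidOperationException("No healthy instances available.");

        // Interlocked keeps the counter correct under concurrent requests;
        // the uint cast keeps the index non-negative after int overflow.
        var ticket = (uint)Interlocked.Increment(ref _counter);
        return instances[(int)(ticket % (uint)instances.Count)];
    }
}
```

The selector would be registered as a singleton so the counter survives across requests. Because the healthy set can change between calls, the rotation is only approximately even, which is usually good enough for client-side load balancing.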
Pattern 2: gRPC for Internal Service Communication
Why gRPC?
- Performance: Binary protocol, smaller payloads
- Type safety: Contract-first with .proto files
- Streaming: Bidirectional streaming support
- Language agnostic: Works across different tech stacks
Proto Definition
syntax = "proto3";
option csharp_namespace = "OrderService.Grpc";
service OrderService {
rpc GetOrder (GetOrderRequest) returns (OrderResponse);
rpc CreateOrder (CreateOrderRequest) returns (OrderResponse);
rpc StreamOrders (StreamOrdersRequest) returns (stream OrderResponse);
}
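message GetOrderRequest {
string order_id = 1;
}
// Referenced by StreamOrders above; the client and server both read customer_id
message StreamOrdersRequest {
string customer_id = 1;
}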
message CreateOrderRequest {
string customer_id = 1;
repeated OrderItem items = 2;
}
message OrderResponse {
string order_id = 1;
string customer_id = 2;
double total_amount = 3;
string status = 4;
}
message OrderItem {
string product_id = 1;
int32 quantity = 2;
double price = 3;
}
gRPC Server Implementation
public class OrderGrpcService : OrderService.OrderServiceBase
{
private readonly IOrderRepository _repository;
public OrderGrpcService(IOrderRepository repository)
{
_repository = repository;
}
public override async Task<OrderResponse> GetOrder(
GetOrderRequest request,
ServerCallContext context)
{
var order = await _repository.GetByIdAsync(request.OrderId);
if (order == null)
{
throw new RpcException(new Status(
StatusCode.NotFound,
$"Order {request.OrderId} not found"));
}
return new OrderResponse
{
OrderId = order.Id.ToString(),
CustomerId = order.CustomerId.ToString(),
TotalAmount = (double)order.Total,
Status = order.Status.ToString()
};
}
public override async Task StreamOrders(
StreamOrdersRequest request,
IServerStreamWriter<OrderResponse> responseStream,
ServerCallContext context)
{
await foreach (var order in _repository.GetOrdersStreamAsync(
request.CustomerId,
context.CancellationToken))
{
await responseStream.WriteAsync(new OrderResponse
{
OrderId = order.Id.ToString(),
TotalAmount = (double)order.Total
});
}
}
}
// Startup registration
builder.Services.AddGrpc(options =>
{
options.MaxReceiveMessageSize = 16 * 1024 * 1024; // 16 MB
options.EnableDetailedErrors = builder.Environment.IsDevelopment();
});
app.MapGrpcService<OrderGrpcService>();
gRPC Client
public class OrderClient
{
private readonly OrderService.OrderServiceClient _client;
public OrderClient(OrderService.OrderServiceClient client)
{
_client = client;
}
// Returns null when the server reports NotFound
public async Task<OrderResponse?> GetOrderAsync(string orderId)
{
try
{
return await _client.GetOrderAsync(
new GetOrderRequest { OrderId = orderId });
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.NotFound)
{
return null;
}
}
public async Task ProcessOrderStreamAsync(string customerId)
{
using var call = _client.StreamOrders(
new StreamOrdersRequest { CustomerId = customerId });
await foreach (var order in call.ResponseStream.ReadAllAsync())
{
Console.WriteLine($"Received order: {order.OrderId}");
}
}
}
// Client registration
builder.Services.AddGrpcClient<OrderService.OrderServiceClient>(options =>
{
options.Address = new Uri("https://order-service:5001");
})
.ConfigurePrimaryHttpMessageHandler(() => new SocketsHttpHandler
{
PooledConnectionIdleTimeout = Timeout.InfiniteTimeSpan,
KeepAlivePingDelay = TimeSpan.FromSeconds(60),
KeepAlivePingTimeout = TimeSpan.FromSeconds(30),
EnableMultipleHttp2Connections = true
});
Pattern 3: Asynchronous Messaging
Azure Service Bus Implementation
// Publisher
public class OrderPlacedPublisher
{
private readonly ServiceBusSender _sender;
public OrderPlacedPublisher(ServiceBusSender sender)
{
_sender = sender;
}
public async Task PublishOrderPlacedAsync(Order order)
{
var orderPlaced = new OrderPlacedEvent
{
OrderId = order.Id,
CustomerId = order.CustomerId,
Items = order.Items.Select(i => new OrderItemDto
{
ProductId = i.ProductId,
Quantity = i.Quantity,
Price = i.Price
}).ToList(),
Total = order.Total,
PlacedAt = DateTime.UtcNow
};
var message = new ServiceBusMessage
{
MessageId = Guid.NewGuid().ToString(),
Subject = "order.placed",
ContentType = "application/json",
Body = BinaryData.FromObjectAsJson(orderPlaced),
// Application properties let subscription filters route on these values without reading the body
ApplicationProperties =
{
["CustomerId"] = order.CustomerId.ToString(),
["OrderTotal"] = order.Total
}
};
await _sender.SendMessageAsync(message);
}
}
// Subscriber
public class InventoryService : BackgroundService
{
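private readonly ServiceBusProcessor _processor;
private readonly IInventoryManager _inventoryManager;
private readonly ILogger<InventoryService> _logger;
public InventoryService(
ServiceBusProcessor processor,
IInventoryManager inventoryManager,
ILogger<InventoryService> logger)
{
_processor = processor;
_inventoryManager = inventoryManager;
_logger = logger;
}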
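protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_processor.ProcessMessageAsync += MessageHandler;
_processor.ProcessErrorAsync += ErrorHandler;
// Returns once the processor has started; messages are handled in the background
await _processor.StartProcessingAsync(stoppingToken);
}
public override async Task StopAsync(CancellationToken cancellationToken)
{
// Stop receiving before the host shuts down so in-flight messages can settle
await _processor.StopProcessingAsync(cancellationToken);
await base.StopAsync(cancellationToken);
}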
private async Task MessageHandler(ProcessMessageEventArgs args)
{
var orderPlaced = args.Message.Body.ToObjectFromJson<OrderPlacedEvent>();
try
{
// Reserve inventory for order
await _inventoryManager.ReserveAsync(orderPlaced);
// Complete message
await args.CompleteMessageAsync(args.Message);
}
catch (InsufficientInventoryException ex)
{
// Dead letter message
await args.DeadLetterMessageAsync(
args.Message,
"InsufficientInventory",
ex.Message);
}
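catch (Exception ex)
{
// Abandon message for retry; it is redelivered until the max delivery count is reached
_logger.LogWarning(ex, "Abandoning message {MessageId} for retry", args.Message.MessageId);
await args.AbandonMessageAsync(args.Message);
}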
}
private Task ErrorHandler(ProcessErrorEventArgs args)
{
_logger.LogError(args.Exception,
"Error processing message in {EntityPath}",
args.EntityPath);
return Task.CompletedTask;
}
}
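Abandoned messages are redelivered, so the handler above can see the same event more than once and should not reserve inventory twice. A minimal sketch of de-duplicating by message ID follows. ProcessedMessageStore and IdempotentHandler are hypothetical names, and the in-memory dictionary is only a stand-in for a durable store (a database table with a unique key, for instance), which is what a real service would need:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical in-memory store; state is lost on restart and is not
// shared across service instances.
public sealed class ProcessedMessageStore
{
    private readonly ConcurrentDictionary<string, bool> _seen = new();

    // Returns true only for the first caller that claims a given message ID.
    public bool TryBegin(string messageId) => _seen.TryAdd(messageId, true);

    // Releases the claim so a redelivery can retry after a failure.
    public void Release(string messageId) => _seen.TryRemove(messageId, out _);
}

public sealed class IdempotentHandler
{
    private readonly ProcessedMessageStore _store = new();

    // Runs 'work' at most once per message ID; returns false for duplicates.
    public async Task<bool> HandleOnceAsync(string messageId, Func<Task> work)
    {
        if (!_store.TryBegin(messageId))
            return false; // duplicate delivery: skip the side effect

        try
        {
            await work();
            return true;
        }
        catch
        {
            _store.Release(messageId); // let the retry run the work again
            throw;
        }
    }
}
```

In the subscriber, the call to ReserveAsync would go inside HandleOnceAsync, keyed on args.Message.MessageId, with the message completed whether or not the work actually ran. Since the publisher assigns a fresh GUID per send, this only catches redeliveries of the same message. Keying on the order ID instead would also catch the same event being published twice.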
Pattern 4: API Gateway
Using Ocelot
// ocelot.json
{
"Routes": [
{
"DownstreamPathTemplate": "/api/products/{id}",
"DownstreamScheme": "https",
"DownstreamHostAndPorts": [
{
"Host": "product-service",
"Port": 443
}
],
"UpstreamPathTemplate": "/products/{id}",
"UpstreamHttpMethod": [ "GET" ],
"AuthenticationOptions": {
"AuthenticationProviderKey": "Bearer",
"AllowedScopes": []
},
"RateLimitOptions": {
"EnableRateLimiting": true,
"Period": "1s",
"PeriodTimespan": 1,
"Limit": 10
}
}
],
"GlobalConfiguration": {
"RequestIdKey": "OcRequestId",
"AdministrationPath": "/administration"
}
}
// Program.cs
builder.Configuration.AddJsonFile("ocelot.json");
builder.Services.AddOcelot()
.AddCacheManager(x => x.WithDictionaryHandle());
// Await instead of blocking with .Wait()
await app.UseOcelot();
Pattern 5: Saga Pattern for Distributed Transactions
public class OrderSagaOrchestrator
{
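private readonly IServiceBus _serviceBus;
private readonly ISagaRepository _sagaRepository;
public OrderSagaOrchestrator(IServiceBus serviceBus, ISagaRepository sagaRepository)
{
_serviceBus = serviceBus;
_sagaRepository = sagaRepository;
}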
public async Task CreateOrderAsync(CreateOrderCommand command)
{
var saga = new OrderSaga
{
SagaId = Guid.NewGuid(),
OrderId = command.OrderId,
Status = SagaStatus.Started
};
await _sagaRepository.SaveAsync(saga);
try
{
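// This flow assumes SendAsync completes only once the receiving service has finished the step.
// With fire-and-forget commands, advance the saga from reply events instead.
// Step 1: Reserve Inventory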
await _serviceBus.SendAsync(new ReserveInventoryCommand
{
SagaId = saga.SagaId,
OrderId = command.OrderId,
Items = command.Items
});
saga.Status = SagaStatus.InventoryReserved;
await _sagaRepository.UpdateAsync(saga);
// Step 2: Process Payment
await _serviceBus.SendAsync(new ProcessPaymentCommand
{
SagaId = saga.SagaId,
OrderId = command.OrderId,
Amount = command.TotalAmount
});
saga.Status = SagaStatus.PaymentProcessed;
await _sagaRepository.UpdateAsync(saga);
// Step 3: Confirm Order
await _serviceBus.SendAsync(new ConfirmOrderCommand
{
SagaId = saga.SagaId,
OrderId = command.OrderId
});
saga.Status = SagaStatus.Completed;
await _sagaRepository.UpdateAsync(saga);
}
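catch (Exception)
{
// Capture how far the saga got before overwriting the status;
// compensation depends on the last completed step, not on Failed.
var lastCompletedStep = saga.Status;
saga.Status = SagaStatus.Failed;
await _sagaRepository.UpdateAsync(saga);
// Trigger compensating transactions
await CompensateAsync(saga, lastCompletedStep);
throw;
}
}
private async Task CompensateAsync(OrderSaga saga, SagaStatus lastCompletedStep)
{
// Roll back in reverse order. Assumes SagaStatus members are declared in step order:
// Started < InventoryReserved < PaymentProcessed < Completed.
if (lastCompletedStep >= SagaStatus.PaymentProcessed)
{
await _serviceBus.SendAsync(new RefundPaymentCommand
{
SagaId = saga.SagaId
});
}
if (lastCompletedStep >= SagaStatus.InventoryReserved)
{
await _serviceBus.SendAsync(new ReleaseInventoryCommand
{
SagaId = saga.SagaId
});
}
}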
}
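Which compensations to run depends on the last step that completed, not on the terminal failed state. That rule is easier to test when pulled out as a pure function. A minimal sketch, where SagaStep and CompensationPlanner are hypothetical names and the enum ordering is an assumption the comparison relies on:

```csharp
using System;
using System.Collections.Generic;

// Assumed ordering: each member is declared after the step that precedes it.
public enum SagaStep
{
    Started = 0,
    InventoryReserved = 1,
    PaymentProcessed = 2,
    Completed = 3
}

public static class CompensationPlanner
{
    // Returns the compensating actions for a saga whose last completed
    // step was 'lastCompletedStep', most recent step first.
    public static IReadOnlyList<string> Plan(SagaStep lastCompletedStep)
    {
        var actions = new List<string>();
        if (lastCompletedStep >= SagaStep.PaymentProcessed)
            actions.Add("RefundPayment");
        if (lastCompletedStep >= SagaStep.InventoryReserved)
            actions.Add("ReleaseInventory");
        return actions;
    }
}
```

A saga that fails while reserving inventory has nothing to undo, one that fails during payment releases inventory only, and one that fails at confirmation refunds the payment and then releases inventory.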
Monitoring and Observability
Distributed Tracing with OpenTelemetry
builder.Services.AddOpenTelemetry()
.WithTracing(tracing => tracing
.AddAspNetCoreInstrumentation()
.AddHttpClientInstrumentation()
.AddGrpcClientInstrumentation()
.AddSource("OrderService")
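// The dedicated Jaeger exporter package is deprecated; Jaeger ingests OTLP directly.
// 4317 is the OTLP gRPC port; the "jaeger" host name comes from the original setup.
.AddOtlpExporter(options =>
{
options.Endpoint = new Uri("http://jaeger:4317");
}));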
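// Usage in code
// The source name must match the one registered with AddSource("OrderService")
private static readonly ActivitySource ActivitySource = new("OrderService");
using var activity = ActivitySource.StartActivity("ProcessOrder");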
activity?.SetTag("order.id", orderId);
activity?.SetTag("customer.id", customerId);
try
{
await ProcessOrderAsync(orderId);
activity?.SetStatus(ActivityStatusCode.Ok);
}
catch (Exception ex)
{
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
throw;
}
Best Practices Checklist
- ✅ Use circuit breakers for external calls
- ✅ Implement timeouts on all service calls
- ✅ Version your APIs (URL or header-based)
- ✅ Use correlation IDs for request tracing
- ✅ Implement health checks for each service
- ✅ Monitor message queue depths and processing times
- ✅ Design for idempotency in async handlers
- ✅ Use service discovery for dynamic environments
- ✅ Implement rate limiting to prevent cascade failures
- ✅ Log structured data for better observability
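For the correlation ID item, one way to propagate an ID on outgoing HTTP calls is a DelegatingHandler. This is a minimal sketch: the X-Correlation-ID header name is a common convention rather than a standard, and the Func<string> that supplies the current ID is a hypothetical hook you would wire to your own request context:

```csharp
using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

// Adds a correlation ID header to outgoing requests unless one is already set.
public sealed class CorrelationIdHandler : DelegatingHandler
{
    public const string HeaderName = "X-Correlation-ID"; // assumed header name

    private readonly Func<string> _currentId;

    // 'currentId' returns the ID of the request currently being served.
    public CorrelationIdHandler(Func<string> currentId) => _currentId = currentId;

    protected override Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request, CancellationToken cancellationToken)
    {
        // Respect an ID set explicitly by the caller.
        if (!request.Headers.Contains(HeaderName))
            request.Headers.TryAddWithoutValidation(HeaderName, _currentId());

        return base.SendAsync(request, cancellationToken);
    }
}
```

With IHttpClientFactory, the handler can be attached through AddHttpMessageHandler on the same builder that configures ProductService. OpenTelemetry's HTTP instrumentation already propagates trace context between services, so a separate correlation ID is mostly useful for log searches and for callers that sit outside the tracing setup.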
Common Pitfalls
1. Chatty Communication
// ❌ Bad: Multiple sequential calls
var customer = await _customerService.GetAsync(customerId);
var orders = await _orderService.GetByCustomerAsync(customerId);
var products = await _productService.GetByIdsAsync(productIds);
// ✅ Good: Batch or aggregate data
var customerData = await _customerService.GetCustomerWithOrdersAsync(customerId);
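When no aggregate endpoint exists and the calls don't depend on each other, starting them together at least removes the sequential latency. A minimal sketch, where Aggregator is a hypothetical helper and the two delegates stand in for calls such as the customer and order lookups above:

```csharp
using System;
using System.Threading.Tasks;

public static class Aggregator
{
    // Starts both calls before awaiting either, so total latency is roughly
    // that of the slower call rather than the sum of the two.
    public static async Task<(TA First, TB Second)> FetchBothAsync<TA, TB>(
        Func<Task<TA>> first, Func<Task<TB>> second)
    {
        var firstTask = first();
        var secondTask = second();
        await Task.WhenAll(firstTask, secondTask);
        return (await firstTask, await secondTask);
    }
}
```

This still makes the same number of network calls, so it eases latency but not load. A batch or aggregate endpoint remains the better fix when you control the downstream service.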
2. No Fallback Strategy
// ✅ Always have a fallback
public async Task<Product> GetProductAsync(int id)
{
try
{
return await _productService.GetAsync(id);
}
catch (HttpRequestException)
{
// Return cached data or default
return await _cache.GetOrDefaultAsync(id);
}
}
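A cache fallback only helps if something populated the cache earlier. A minimal sketch of a "last known good" wrapper that refreshes the cached value on every success and serves it when the remote call fails. LastKnownGood is a hypothetical type, and the in-memory dictionary stands in for whatever cache you actually use:

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Net.Http;
using System.Threading.Tasks;

// Hypothetical wrapper: remembers the last good value per key and serves it
// when the remote call fails with HttpRequestException.
public sealed class LastKnownGood<TKey, TValue> where TKey : notnull
{
    private readonly ConcurrentDictionary<TKey, TValue> _cache = new();

    public async Task<TValue?> GetAsync(TKey key, Func<TKey, Task<TValue>> fetch)
    {
        try
        {
            var value = await fetch(key);
            _cache[key] = value; // refresh on every success
            return value;
        }
        catch (HttpRequestException)
        {
            // Stale data if we have it, otherwise default (null for reference types).
            return _cache.TryGetValue(key, out var cached) ? cached : default;
        }
    }
}
```

Entries here never expire, so a real implementation would bound the cache and decide how stale a value may be before returning nothing is the better answer.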
3. Ignoring Message Ordering
// Use session-enabled queues for ordered processing
await _sender.SendMessageAsync(new ServiceBusMessage
{
Body = BinaryData.FromObjectAsJson(orderEvent),
// FIFO is guaranteed only within a session; the queue or subscription must have sessions enabled
SessionId = orderId.ToString()
});
Key Takeaways
- Choose sync for real-time needs, async for scalability
- gRPC excels for internal service-to-service communication
- Message queues provide resilience and decoupling
- Implement retries (for idempotent operations) and circuit breakers on every remote call
- Monitor everything - distributed systems are complex
Building microservices means deciding, service by service, how much consistency to trade for availability, because network partitions between services are a given (CAP theorem). There's no one-size-fits-all solution.
Questions about microservices architecture? Let's connect!