Back to Deep Dives

Microservices Communication Patterns in .NET

.NETMicroservicesArchitecturegRPCMessage Queues

Comprehensive guide to implementing effective communication patterns between microservices using .NET, covering synchronous HTTP, asynchronous messaging, and gRPC.

Microservices Communication Patterns in .NET

Building distributed systems requires careful consideration of how services communicate. Here's what I've learned implementing microservices at scale.

Communication Styles Overview

Synchronous (Request-Response)

  • REST APIs (HTTP/HTTPS)
  • gRPC
  • GraphQL

When to use: Real-time queries, user-facing operations

Asynchronous (Event-Driven)

  • Message queues (RabbitMQ, Azure Service Bus)
  • Event streams (Kafka, Event Hubs)

When to use: Background processing, eventual consistency, high-throughput scenarios

Pattern 1: REST APIs with HTTP Client

Basic Implementation

public class ProductService
{
    private readonly HttpClient _httpClient;
    
    public ProductService(HttpClient httpClient)
    {
        _httpClient = httpClient;
    }
    
    public async Task<Product> GetProductAsync(int id)
    {
        var response = await _httpClient.GetAsync($"/api/products/{id}");
        response.EnsureSuccessStatusCode();
        
        return await response.Content.ReadFromJsonAsync<Product>();
    }
}

// Registration with Polly for resilience
builder.Services.AddHttpClient<ProductService>(client =>
{
    client.BaseAddress = new Uri("https://product-service");
    client.Timeout = TimeSpan.FromSeconds(30);
})
.AddPolicyHandler(GetRetryPolicy())
.AddPolicyHandler(GetCircuitBreakerPolicy());

static IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
    return HttpPolicyExtensions
        .HandleTransientHttpError()
        .WaitAndRetryAsync(
            retryCount: 3,
            sleepDurationProvider: retryAttempt => 
                TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
            onRetry: (outcome, timespan, retryCount, context) =>
            {
                Log.Warning($"Retry {retryCount} after {timespan.TotalSeconds}s");
            });
}

Service Discovery with Consul

public class ServiceDiscoveryHttpClientHandler : DelegatingHandler
{
    private readonly IConsulClient _consulClient;
    
    protected override async Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request,
        CancellationToken cancellationToken)
    {
        var serviceName = request.RequestUri.Host;
        var serviceEntry = await DiscoverServiceAsync(serviceName);
        
        var uriBuilder = new UriBuilder(request.RequestUri)
        {
            Host = serviceEntry.Address,
            Port = serviceEntry.Port
        };
        request.RequestUri = uriBuilder.Uri;
        
        return await base.SendAsync(request, cancellationToken);
    }
    
    private async Task<ServiceEntry> DiscoverServiceAsync(string serviceName)
    {
        var services = await _consulClient.Health.Service(
            serviceName, 
            tag: "", 
            passingOnly: true);
        
        var service = services.Response
            .OrderBy(_ => Guid.NewGuid())
            .FirstOrDefault()?.Service;
            
        return service ?? throw new ServiceNotFoundException(serviceName);
    }
}

Pattern 2: gRPC for Internal Service Communication

Why gRPC?

  • Performance: Binary protocol, smaller payloads
  • Type safety: Contract-first with .proto files
  • Streaming: Bidirectional streaming support
  • Language agnostic: Works across different tech stacks

Proto Definition

syntax = "proto3";

option csharp_namespace = "OrderService.Grpc";

service OrderService {
  rpc GetOrder (GetOrderRequest) returns (OrderResponse);
  rpc CreateOrder (CreateOrderRequest) returns (OrderResponse);
  rpc StreamOrders (StreamOrdersRequest) returns (stream OrderResponse);
}

message GetOrderRequest {
  string order_id = 1;
}

message CreateOrderRequest {
  string customer_id = 1;
  repeated OrderItem items = 2;
}

message OrderResponse {
  string order_id = 1;
  string customer_id = 2;
  double total_amount = 3;
  string status = 4;
}

message OrderItem {
  string product_id = 1;
  int32 quantity = 2;
  double price = 3;
}

gRPC Server Implementation

public class OrderGrpcService : OrderService.OrderServiceBase
{
    private readonly IOrderRepository _repository;
    
    public override async Task<OrderResponse> GetOrder(
        GetOrderRequest request, 
        ServerCallContext context)
    {
        var order = await _repository.GetByIdAsync(request.OrderId);
        
        if (order == null)
        {
            throw new RpcException(new Status(
                StatusCode.NotFound, 
                $"Order {request.OrderId} not found"));
        }
        
        return new OrderResponse
        {
            OrderId = order.Id.ToString(),
            CustomerId = order.CustomerId.ToString(),
            TotalAmount = (double)order.Total,
            Status = order.Status.ToString()
        };
    }
    
    public override async Task StreamOrders(
        StreamOrdersRequest request,
        IServerStreamWriter<OrderResponse> responseStream,
        ServerCallContext context)
    {
        await foreach (var order in _repository.GetOrdersStreamAsync(
            request.CustomerId,
            context.CancellationToken))
        {
            await responseStream.WriteAsync(new OrderResponse
            {
                OrderId = order.Id.ToString(),
                TotalAmount = (double)order.Total
            });
        }
    }
}

// Startup registration
builder.Services.AddGrpc(options =>
{
    options.MaxReceiveMessageSize = 16 * 1024 * 1024; // 16 MB
    options.EnableDetailedErrors = builder.Environment.IsDevelopment();
});

app.MapGrpcService<OrderGrpcService>();

gRPC Client

public class OrderClient
{
    private readonly OrderService.OrderServiceClient _client;
    
    public OrderClient(OrderService.OrderServiceClient client)
    {
        _client = client;
    }
    
    public async Task<OrderResponse> GetOrderAsync(string orderId)
    {
        try
        {
            return await _client.GetOrderAsync(
                new GetOrderRequest { OrderId = orderId });
        }
        catch (RpcException ex) when (ex.StatusCode == StatusCode.NotFound)
        {
            return null;
        }
    }
    
    public async Task ProcessOrderStreamAsync(string customerId)
    {
        using var call = _client.StreamOrders(
            new StreamOrdersRequest { CustomerId = customerId });
        
        await foreach (var order in call.ResponseStream.ReadAllAsync())
        {
            Console.WriteLine($"Received order: {order.OrderId}");
        }
    }
}

// Client registration
builder.Services.AddGrpcClient<OrderService.OrderServiceClient>(options =>
{
    options.Address = new Uri("https://order-service:5001");
})
.ConfigurePrimaryHttpMessageHandler(() => new SocketsHttpHandler
{
    PooledConnectionIdleTimeout = Timeout.InfiniteTimeSpan,
    KeepAlivePingDelay = TimeSpan.FromSeconds(60),
    KeepAlivePingTimeout = TimeSpan.FromSeconds(30),
    EnableMultipleHttp2Connections = true
});

Pattern 3: Asynchronous Messaging

Azure Service Bus Implementation

// Publisher
public class OrderPlacedPublisher
{
    private readonly ServiceBusSender _sender;
    
    public async Task PublishOrderPlacedAsync(Order order)
    {
        var orderPlaced = new OrderPlacedEvent
        {
            OrderId = order.Id,
            CustomerId = order.CustomerId,
            Items = order.Items.Select(i => new OrderItemDto
            {
                ProductId = i.ProductId,
                Quantity = i.Quantity,
                Price = i.Price
            }).ToList(),
            Total = order.Total,
            PlacedAt = DateTime.UtcNow
        };
        
        var message = new ServiceBusMessage
        {
            MessageId = Guid.NewGuid().ToString(),
            Subject = "order.placed",
            ContentType = "application/json",
            Body = BinaryData.FromObjectAsJson(orderPlaced),
            ApplicationProperties =
            {
                ["CustomerId"] = order.CustomerId.ToString(),
                ["OrderTotal"] = order.Total
            }
        };
        
        // Set message properties for routing
        await _sender.SendMessageAsync(message);
    }
}

// Subscriber
public class InventoryService : BackgroundService
{
    private readonly ServiceBusProcessor _processor;
    private readonly IInventoryManager _inventoryManager;
    
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _processor.ProcessMessageAsync += MessageHandler;
        _processor.ProcessErrorAsync += ErrorHandler;
        
        await _processor.StartProcessingAsync(stoppingToken);
    }
    
    private async Task MessageHandler(ProcessMessageEventArgs args)
    {
        var orderPlaced = args.Message.Body.ToObjectFromJson<OrderPlacedEvent>();
        
        try
        {
            // Reserve inventory for order
            await _inventoryManager.ReserveAsync(orderPlaced);
            
            // Complete message
            await args.CompleteMessageAsync(args.Message);
        }
        catch (InsufficientInventoryException ex)
        {
            // Dead letter message
            await args.DeadLetterMessageAsync(
                args.Message,
                "InsufficientInventory",
                ex.Message);
        }
        catch (Exception ex)
        {
            // Abandon message for retry
            await args.AbandonMessageAsync(args.Message);
        }
    }
    
    private Task ErrorHandler(ProcessErrorEventArgs args)
    {
        _logger.LogError(args.Exception, 
            "Error processing message in {EntityPath}", 
            args.EntityPath);
        return Task.CompletedTask;
    }
}

Pattern 4: API Gateway

Using Ocelot

// ocelot.json
{
  "Routes": [
    {
      "DownstreamPathTemplate": "/api/products/{id}",
      "DownstreamScheme": "https",
      "DownstreamHostAndPorts": [
        {
          "Host": "product-service",
          "Port": 443
        }
      ],
      "UpstreamPathTemplate": "/products/{id}",
      "UpstreamHttpMethod": [ "GET" ],
      "AuthenticationOptions": {
        "AuthenticationProviderKey": "Bearer",
        "AllowedScopes": []
      },
      "RateLimitOptions": {
        "EnableRateLimiting": true,
        "Period": "1s",
        "PeriodTimespan": 1,
        "Limit": 10
      }
    }
  ],
  "GlobalConfiguration": {
    "RequestIdKey": "OcRequestId",
    "AdministrationPath": "/administration"
  }
}
// Program.cs
builder.Configuration.AddJsonFile("ocelot.json");
builder.Services.AddOcelot()
    .AddCacheManager(x => x.WithDictionaryHandle());

app.UseOcelot().Wait();

Pattern 5: Saga Pattern for Distributed Transactions

public class OrderSagaOrchestrator
{
    private readonly IServiceBus _serviceBus;
    private readonly ISagaRepository _sagaRepository;
    
    public async Task CreateOrderAsync(CreateOrderCommand command)
    {
        var saga = new OrderSaga
        {
            SagaId = Guid.NewGuid(),
            OrderId = command.OrderId,
            Status = SagaStatus.Started
        };
        
        await _sagaRepository.SaveAsync(saga);
        
        try
        {
            // Step 1: Reserve Inventory
            await _serviceBus.SendAsync(new ReserveInventoryCommand
            {
                SagaId = saga.SagaId,
                OrderId = command.OrderId,
                Items = command.Items
            });
            
            saga.Status = SagaStatus.InventoryReserved;
            await _sagaRepository.UpdateAsync(saga);
            
            // Step 2: Process Payment
            await _serviceBus.SendAsync(new ProcessPaymentCommand
            {
                SagaId = saga.SagaId,
                OrderId = command.OrderId,
                Amount = command.TotalAmount
            });
            
            saga.Status = SagaStatus.PaymentProcessed;
            await _sagaRepository.UpdateAsync(saga);
            
            // Step 3: Confirm Order
            await _serviceBus.SendAsync(new ConfirmOrderCommand
            {
                SagaId = saga.SagaId,
                OrderId = command.OrderId
            });
            
            saga.Status = SagaStatus.Completed;
            await _sagaRepository.UpdateAsync(saga);
        }
        catch (Exception ex)
        {
            saga.Status = SagaStatus.Failed;
            await _sagaRepository.UpdateAsync(saga);
            
            // Trigger compensating transactions
            await CompensateAsync(saga);
            throw;
        }
    }
    
    private async Task CompensateAsync(OrderSaga saga)
    {
        // Rollback in reverse order
        if (saga.Status >= SagaStatus.PaymentProcessed)
        {
            await _serviceBus.SendAsync(new RefundPaymentCommand 
            { 
                SagaId = saga.SagaId 
            });
        }
        
        if (saga.Status >= SagaStatus.InventoryReserved)
        {
            await _serviceBus.SendAsync(new ReleaseInventoryCommand 
            { 
                SagaId = saga.SagaId 
            });
        }
    }
}

Monitoring and Observability

Distributed Tracing with OpenTelemetry

builder.Services.AddOpenTelemetry()
    .WithTracing(tracing => tracing
        .AddAspNetCoreInstrumentation()
        .AddHttpClientInstrumentation()
        .AddGrpcClientInstrumentation()
        .AddSource("OrderService")
        .AddJaegerExporter(options =>
        {
            options.AgentHost = "jaeger";
            options.AgentPort = 6831;
        }));

// Usage in code
using var activity = ActivitySource.StartActivity("ProcessOrder");
activity?.SetTag("order.id", orderId);
activity?.SetTag("customer.id", customerId);

try
{
    await ProcessOrderAsync(orderId);
    activity?.SetStatus(ActivityStatusCode.Ok);
}
catch (Exception ex)
{
    activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
    throw;
}

Best Practices Checklist

  • Use circuit breakers for external calls
  • Implement timeouts on all service calls
  • Version your APIs (URL or header-based)
  • Use correlation IDs for request tracing
  • Implement health checks for each service
  • Monitor message queue depths and processing times
  • Design for idempotency in async handlers
  • Use service discovery for dynamic environments
  • Implement rate limiting to prevent cascade failures
  • Log structured data for better observability

Common Pitfalls

1. Chatty Communication

// ❌ Bad: Multiple sequential calls
var customer = await _customerService.GetAsync(customerId);
var orders = await _orderService.GetByCustomerAsync(customerId);
var products = await _productService.GetByIdsAsync(productIds);

// ✅ Good: Batch or aggregate data
var customerData = await _customerService.GetCustomerWithOrdersAsync(customerId);

2. No Fallback Strategy

// ✅ Always have a fallback
public async Task<Product> GetProductAsync(int id)
{
    try
    {
        return await _productService.GetAsync(id);
    }
    catch (HttpRequestException)
    {
        // Return cached data or default
        return await _cache.GetOrDefaultAsync(id);
    }
}

3. Ignoring Message Ordering

// Use session-enabled queues for ordered processing
await _sender.SendMessageAsync(new ServiceBusMessage
{
    Body = BinaryData.FromObjectAsJson(orderEvent),
    SessionId = orderId.ToString() // Ensures order
});

Key Takeaways

  • Choose sync for real-time needs, async for scalability
  • gRPC excels for internal service-to-service communication
  • Message queues provide resilience and decoupling
  • Always implement retry logic and circuit breakers
  • Monitor everything - distributed systems are complex

Building microservices is about finding the right balance between consistency, availability, and partition tolerance (CAP theorem). There's no one-size-fits-all solution.


Questions about microservices architecture? Let's connect!