A Comprehensive Deep Dive into System Design: Building Scalable, Reliable Distributed Systems

Author: Engineering Deep Dives
Date: November 2, 2025
Subject Area: System Design, Distributed Systems Architecture
Primary Reference Framework: Principles from "Designing Data-Intensive Applications" by Martin Kleppmann


Abstract

Modern distributed systems require careful consideration of fundamental design principles, trade-offs, and architectural patterns. This paper provides a comprehensive examination of core system design concepts including consistency models, reliability patterns, performance characteristics, storage architectures, and data modeling principles. Drawing on the foundational concepts explored in data-intensive application design, we present a framework for making architectural decisions that balance competing requirements. Through detailed analysis with practical examples and decision-making frameworks, this work serves as both an educational resource for engineers at all levels and a reference guide for building systems that handle data at scale.


1. Introduction

System design for data-intensive applications differs fundamentally from compute-intensive applications. While compute-intensive applications are limited primarily by CPU cycles, data-intensive applications face challenges in data volume, complexity, and rate of change. The architecture must handle not just storing data, but ensuring data remains correct and consistent over time, providing good performance despite system failures, and scaling to meet growing demands.

1.1 Core Pillars of Data-Intensive System Design

Modern data-intensive systems are evaluated against four fundamental pillars:

Reliability: The system continues to work correctly (performing the correct function at the desired level of performance) even when things go wrong. Faults can occur in hardware, software, or from human error, and the system should handle these faults gracefully.

Scalability: As the system grows (in data volume, traffic volume, or complexity), there should be reasonable ways of dealing with that growth. Scalability is not a simple label but rather a set of questions about how we add computing resources to handle additional load.

Maintainability: Over time, many different people will work on the system, maintaining current behavior and adapting it to new use cases. The system should be designed to maximize productivity by making it easy to understand, modify, and extend.

Efficiency: The system should make good use of available resources to achieve desired performance characteristics. This includes minimizing latency for user-facing operations and maximizing throughput for batch processing.

These concerns frequently exist in tension. An optimally efficient system may be difficult to maintain. A maximally reliable system may sacrifice some efficiency through redundancy. Understanding these tensions and making conscious trade-offs distinguishes thoughtful architecture from ad-hoc design.


2. Reliability: Building Systems That Work Correctly

2.1 Understanding Reliability

Reliability means the system continues working correctly even when faults occur. It's important to distinguish between faults and failures:

  • Fault: A component of the system deviating from its specification
  • Failure: The system as a whole stops providing the required service to users

The goal is to build fault-tolerant systems that prevent faults from causing failures. While we cannot reduce the probability of faults to zero, we can design systems that tolerate certain types of faults.

2.2 Types of Faults

Hardware Faults

Hard disks fail, RAM becomes faulty, power grids have outages, network cables get unplugged. These were once the dominant cause of system failures.

Traditional response: Add redundancy to individual hardware components (RAID for disks, dual power supplies, backup generators). When one component fails, the redundant component can take over.

Modern approach: As data volumes and computing demands increase, applications use larger numbers of machines, proportionally increasing the rate of hardware faults. There is a shift toward systems that tolerate machine failures through software fault-tolerance techniques rather than relying solely on hardware redundancy. This allows rolling upgrades (patching one node at a time while keeping the system running).

Software Errors

Systematic errors within the system are harder to anticipate and tend to cause many more system failures than uncorrelated hardware faults:

  • A software bug that causes every instance of an application server to crash when given a particular bad input
  • A runaway process that uses up shared resources (CPU, memory, disk space, network bandwidth)
  • A service that the system depends on slowing down, becoming unresponsive, or returning corrupted responses
  • Cascading failures where a small fault in one component triggers faults in others

These bugs often lie dormant for a long time until triggered by unusual circumstances. There is no quick solution, but various approaches help:

  • Careful thinking about assumptions and interactions in the system
  • Thorough testing at multiple levels
  • Process isolation to prevent runaway processes from affecting other parts of the system
  • Allowing processes to crash and restart
  • Measuring, monitoring, and analyzing system behavior in production

Human Errors

Humans are known to be unreliable. Configuration errors by operators are a leading cause of outages. Approaches to make systems reliable despite unreliable humans:

  • Design systems that minimize opportunities for error (well-designed abstractions, APIs, and admin interfaces)
  • Decouple places where mistakes are made from places where they cause failures (sandbox environments where people can explore safely, separate from production)
  • Test thoroughly at all levels (unit tests, integration tests, manual tests)
  • Allow quick and easy recovery from human errors (fast rollback of configuration changes, gradual rollout of new code, tools to recompute data)
  • Set up detailed and clear monitoring (telemetry showing early warning signals, performance metrics)

2.3 Redundancy and Replication Strategies

Single-Leader Replication (Master-Slave)

One replica is designated the leader (also known as master or primary). When clients want to write to the database, they send requests to the leader, which first writes the new data to its local storage. Other replicas are known as followers (read replicas, slaves, secondaries, hot standbys). Whenever the leader writes new data, it sends the data change to all followers as part of a replication log or change stream.

Advantages:

  • Simple to understand and implement
  • No conflict resolution needed (all writes go through one node)
  • Scales read capacity (add more followers)
  • Followers can be geographically distributed for lower read latency

Disadvantages:

  • All writes must go through a single node (write bottleneck)
  • Failover complexity when leader fails
  • Replication lag can cause inconsistencies

Multi-Leader Replication (Master-Master)

A natural extension of single-leader replication is to allow more than one node to accept writes. Each leader simultaneously acts as a follower to the other leaders.

Use cases:

  • Multi-datacenter operation: each datacenter has its own leader, which replicates changes to leaders in other datacenters
  • Clients with offline operation: each device acts as a leader for its local data
  • Collaborative editing: each user's changes are a form of write that must be merged

The biggest problem with multi-leader replication is that write conflicts can occur, requiring conflict resolution strategies:

  • Conflict avoidance: Route all writes for a particular record through the same leader
  • Last write wins: Give each write a timestamp and pick the most recent (loses data)
  • Custom conflict resolution: Application code that handles conflicts on write or read

Leaderless Replication

Some systems abandon the concept of a leader and allow any replica to directly accept writes from clients. Clients send writes to several replicas in parallel, and reads query several nodes to detect and correct inconsistencies.

When a client reads from the database, it sends the read request to several nodes in parallel. The client may get different responses from different nodes (stale data). Version numbers determine which value is newer.

To handle this, two mechanisms are often used:

  • Read repair: When a client detects stale data during a read, it writes the newer value back to stale replicas
  • Anti-entropy process: Background process that constantly looks for differences between replicas and copies missing data

2.4 Consistency Guarantees in Replication

Different replication methods provide different consistency guarantees. These guarantees affect what application developers can assume about the behavior of the database:

Eventual Consistency

If you stop writing to the database and wait for some unspecified length of time, eventually all replicas will converge to the same value. This is a weak guarantee—it doesn't say anything about when convergence will happen.

Read-After-Write Consistency (Read-Your-Writes Consistency)

Users should always see data that they submitted themselves. This guarantee makes the system appear as if there is no replication lag for a user's own writes, though other users may still experience replication lag.

Implementation approaches:

  • When reading something the user may have modified, read from the leader
  • Track the time of last update and read from replicas that have been updated since
  • Remember timestamp of most recent write on the client side and ensure reads reflect updates at least as recent

Monotonic Reads

If one user makes several reads in sequence, they will not see time go backward. Without monotonic reads, a user might see newer data on the first read and older data on subsequent reads (if they hit different replicas with different lag).

Implementation: Each user always reads from the same replica (can be chosen based on hash of user ID).
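
To make this concrete, here is a minimal sketch of routing each user to a stable replica by hashing the user ID; the replica names and the routing function are illustrative assumptions, not any particular driver's API:

import hashlib

def replica_for_user(user_id, replicas):
    # Hash the user ID so the same user is always routed to the same replica.
    digest = hashlib.md5(user_id.encode("utf-8")).hexdigest()
    return replicas[int(digest, 16) % len(replicas)]

# Hypothetical replica names; a real deployment would use actual connection endpoints.
read_replicas = ["replica-1", "replica-2", "replica-3"]
print(replica_for_user("user-42", read_replicas))

Note that simple modulo routing reassigns users whenever the replica set changes; consistent hashing or an explicit assignment table avoids that.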

Consistent Prefix Reads

If a sequence of writes happens in a certain order, anyone reading those writes will see them appear in the same order. This is particularly a problem in partitioned databases where different partitions operate independently.

2.5 Trade-offs in Reliability

Cost of Redundancy: Running multiple replicas requires more machines, more network bandwidth, and more operational complexity. Each additional layer of redundancy increases costs.

Complexity of Coordination: Ensuring replicas remain consistent requires coordination protocols. More sophisticated consistency guarantees require more coordination and thus higher latency.

CAP Theorem Constraints: In the presence of network partitions, you must choose between consistency and availability. Systems designed for high availability must accept that data will sometimes be inconsistent across replicas.

Operational Burden: More replicas mean more complexity in deployment, monitoring, and debugging. Each replica is another potential point of failure that must be monitored.


3. Scalability: Designing for Growth

3.1 Describing Load

Before discussing how to scale a system, we must describe its current load. Load can be described with load parameters, which depend on the architecture:

  • Requests per second to a web server
  • Ratio of reads to writes in a database
  • Number of simultaneously active users
  • Hit rate on a cache
  • Volume of data written per day

The appropriate parameters depend on your system's bottlenecks.

3.2 Describing Performance

Once load parameters are defined, we investigate what happens when load increases:

  • When you increase a load parameter and keep system resources unchanged, how does system performance change?
  • When you increase a load parameter, how much do you need to increase resources to keep performance unchanged?

Performance can be described with different numbers:

Throughput: The number of records processed per second or the total time to run a job on a dataset of a certain size.

Response Time: The time between a client sending a request and receiving a response. This varies even for identical requests due to context switches, network delays, garbage collection pauses, page faults, and other factors.

3.3 The Importance of Percentiles

When measuring response time, we should think of it as a distribution of values, not a single number. The arithmetic mean is not useful for understanding typical response time because it doesn't tell you how many users experienced that delay.

Better metrics use percentiles:

Median (50th percentile or p50): Half of requests are faster, half are slower. This gives you the typical response time.

Higher Percentiles (p95, p99, p999): These are tail latencies. If the 95th percentile is 1.5 seconds, that means 95% of requests take less than 1.5 seconds, but 5% take longer.

High percentiles matter because:

  • Users with slow requests often have the most data (valuable customers)
  • Tail latencies directly affect user experience
  • In services with many backend calls, the slowest call dominates overall response time (tail latency amplification)

Optimizing for percentiles is important for providing good user experience, but it becomes more expensive as you target higher percentiles because the rarest outliers are often caused by factors outside your control.
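
As an illustration, the following computes nearest-rank percentiles over a set of sample response times (the numbers are invented):

import math

def percentile(samples, p):
    # Nearest-rank percentile: the smallest value such that at least p% of samples are <= it.
    ordered = sorted(samples)
    rank = max(1, math.ceil(p / 100 * len(ordered)))
    return ordered[rank - 1]

response_times_ms = [30, 32, 35, 40, 42, 45, 50, 120, 300, 1500]   # hypothetical measurements
for p in (50, 95, 99):
    print(f"p{p}: {percentile(response_times_ms, p)} ms")

In this sample the median is 42 ms while p95 and p99 land on the 1,500 ms outlier, and the arithmetic mean (about 219 ms) describes almost nobody's actual experience.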

3.4 Approaches to Scaling

Vertical Scaling (Scaling Up)

Moving to a more powerful machine with more CPUs, RAM, or faster disks.

Advantages:

  • Simpler to implement (no application changes needed)
  • No data distribution concerns
  • Easier to maintain strong consistency

Disadvantages:

  • Limited by maximum machine size
  • Cost increases non-linearly with capability
  • Single point of failure
  • Requires downtime for upgrades

Horizontal Scaling (Scaling Out)

Distributing load across multiple smaller machines.

Advantages:

  • Can scale to much larger sizes
  • Better fault tolerance (no single point of failure)
  • Can be more cost-effective using commodity hardware
  • Allows gradual scaling as needed

Disadvantages:

  • Application complexity increases significantly
  • Data consistency becomes harder
  • Distributed transactions are complex and slow
  • Operational complexity increases

Elastic Scaling

Automatically adding computing resources when load increases and removing them when load decreases. This works well when load is highly unpredictable, but manually scaled systems are simpler and have fewer operational surprises.

3.5 Shared-Nothing Architectures

A shared-nothing architecture (also called horizontal scaling or scaling out) distributes data across multiple machines. Each machine (node) runs database software independently, coordinates with other nodes at the software level, and requires no special hardware.

This architecture can distribute both data and query load across many machines, though it introduces significant complexity into application development and operations.


4. Storage and Retrieval: How Databases Work

4.1 Data Structures That Power Databases

The simplest database could be implemented with two bash functions:

#!/bin/bash

# db_set: append the key and value as a new line at the end of the file.
db_set() {
    echo "$1,$2" >> database
}

# db_get: scan the whole file for the key and return the most recent value.
db_get() {
    grep "^$1," database | sed -e "s/^$1,//" | tail -n 1
}

This has excellent write performance (appending to a file is very efficient) but terrible read performance (O(n) scan through entire file). To improve read performance, we need an index.

Indexes and Trade-offs

An index is an additional structure derived from primary data. Maintaining indexes incurs overhead on writes (the index must be updated every time data is written). This is an important trade-off: well-chosen indexes speed up reads but slow down writes.

Database administrators must choose indexes manually based on typical query patterns. The database doesn't automatically create indexes for everything because of the write overhead.

4.2 Hash Indexes

The simplest indexing strategy: keep an in-memory hash map where every key is mapped to a byte offset in the data file. When writing new values, append to the file and update the hash map. When reading, use hash map to find offset and read value.

This approach requires all keys fit in memory, but values can be on disk. This is the approach used by Bitcask (the default storage engine in Riak).

Compaction: Since we only append to files, we'll eventually run out of disk space. The solution is to break the log into segments and perform compaction (throwing away duplicate keys, keeping only the most recent update for each key). Compaction can happen in background threads while continuing to serve reads and writes using old segments.
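
A minimal sketch of this idea, assuming a local file and ignoring crash recovery, concurrency, and compaction (keys and values are plain strings without commas or newlines):

class HashIndexedLog:
    """Minimal sketch of a Bitcask-style store: an append-only data file plus
    an in-memory hash map from each key to the byte offset of its latest value."""

    def __init__(self, path):
        self.path = path
        self.index = {}                          # key -> byte offset of latest record
        open(path, "ab").close()                 # make sure the data file exists
        # A real engine would rebuild the index by scanning the file at startup.

    def set(self, key, value):
        with open(self.path, "ab") as f:
            offset = f.tell()                    # appending, so this is the end of the file
            f.write(f"{key},{value}\n".encode("utf-8"))
        self.index[key] = offset                 # point the index at the new record

    def get(self, key):
        offset = self.index.get(key)
        if offset is None:
            return None
        with open(self.path, "rb") as f:
            f.seek(offset)                       # jump straight to the record
            line = f.readline().decode("utf-8").rstrip("\n")
            _, _, value = line.partition(",")
            return value

db = HashIndexedLog("database.log")
db.set("user:1", "alice")
db.set("user:1", "alice-updated")                # the old record stays on disk until compaction
print(db.get("user:1"))                          # reads the most recent value via the index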

4.3 SSTables and LSM-Trees

Sorted String Tables (SSTables)

A variation on log-structured storage where we require the sequence of key-value pairs be sorted by key. Advantages over unsorted log segments:

  1. Merging segments is simple and efficient (like mergesort)
  2. Don't need to keep index of all keys in memory—a sparse index suffices
  3. Can group records into blocks and compress before writing (saves disk space and I/O bandwidth)

Log-Structured Merge-Tree (LSM-Tree)

Storage engines based on this principle of merging and compacting sorted files are called LSM storage engines. The basic idea:

  • When a write comes in, add it to an in-memory balanced tree (memtable)
  • When memtable gets too big, write it to disk as an SSTable file
  • To serve a read, first check memtable, then most recent on-disk segment, then older segments
  • Periodically run merging and compaction in background

LSM-trees can support very high write throughput because writes are sequential. They use disk bandwidth very efficiently through compression and compaction.
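
The following toy, fully in-memory model illustrates that read/write path; a real engine would keep the memtable in a balanced tree backed by a write-ahead log, persist segments to disk, and compact them in the background:

import bisect

class MiniLSM:
    """Toy LSM-tree: writes go to an in-memory memtable; when it grows too big it is
    flushed as an immutable, sorted segment. Reads check the memtable first, then
    segments from newest to oldest."""

    def __init__(self, memtable_limit=4):
        self.memtable = {}              # in-memory writes (a real engine uses a balanced tree)
        self.segments = []              # list of sorted segments, oldest first
        self.memtable_limit = memtable_limit

    def put(self, key, value):
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:
            # Flush: write the memtable out as a sorted, immutable segment.
            self.segments.append(sorted(self.memtable.items()))
            self.memtable = {}

    def get(self, key):
        if key in self.memtable:
            return self.memtable[key]
        for segment in reversed(self.segments):          # newest segment first
            keys = [k for k, _ in segment]
            i = bisect.bisect_left(keys, key)            # binary search within the sorted segment
            if i < len(segment) and segment[i][0] == key:
                return segment[i][1]
        return None

store = MiniLSM()
for i in range(6):
    store.put(f"key-{i}", i)             # the first four keys get flushed into a segment
print(store.get("key-1"), store.get("key-5"))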

4.4 B-Trees

The most widely used indexing structure in relational databases is the B-tree. Like SSTables, B-trees keep key-value pairs sorted by key, allowing efficient lookups and range queries.

B-Tree Structure

B-trees break the database down into fixed-size blocks or pages (traditionally 4KB) and read or write one page at a time. Each page contains several keys and references to child pages. Each child is responsible for a continuous range of keys.

To find a value: start at root page, follow references through intermediate pages until you reach a leaf page containing the actual values.

To update a value: search for the leaf page, change the value, write the page back to disk. To add a key: find the page whose range encompasses the new key and add it. If there isn't enough space, split the page into two half-full pages and update parent to account for the new subdivision.

Write-Ahead Log (WAL)

B-trees modify files in place (overwriting pages on disk), which is dangerous if the database crashes in the middle of splitting a page. To make the database resilient to crashes, B-tree implementations include a write-ahead log (also known as a redo log): an append-only file to which every B-tree modification is written before it is applied to the pages of the tree itself. When the database comes back up after a crash, this log is used to restore the B-tree to a consistent state.

Concurrency Control

Since multiple threads may access the B-tree simultaneously, careful concurrency control is needed (typically using latches—lightweight locks).

4.5 LSM-Trees vs B-Trees Trade-offs

LSM-Tree Advantages:

  • Higher write throughput (sequential writes are faster than random writes on many storage devices)
  • Better compression (SSTables can be compressed more effectively)
  • Lower write amplification (B-trees write data multiple times: WAL, tree page, sometimes adjacent pages)

B-Tree Advantages:

  • Faster reads (don't need to check multiple data structures at different stages of compaction)
  • Each key exists in exactly one place (simpler for transaction isolation)
  • More mature technology with decades of optimization

Write Amplification

Write amplification is when one write to the database results in multiple writes to disk over the database's lifetime. This is particularly concerning with SSDs, which can only overwrite blocks a limited number of times before wearing out.

LSM-trees typically have lower write amplification than B-trees, though compaction can interfere with ongoing reads and writes (using disk bandwidth that could otherwise serve requests).


5. Partitioning (Sharding): Distributing Data Across Nodes

For very large datasets or very high query throughput, replication is not sufficient. We need to break the data up into partitions (also known as sharding).

5.1 Goals of Partitioning

The main reason for partitioning is scalability. Different partitions can be placed on different nodes in a shared-nothing cluster. A large dataset can be distributed across many disks, and query load can be distributed across many processors.

The goal is to spread data and query load evenly across nodes. If partitioning is unfair (some partitions have more data or queries than others), we call it skewed. A partition with disproportionately high load is called a hot spot.

5.2 Partitioning Strategies

Partitioning by Key Range

Assign a continuous range of keys to each partition (like volumes of an encyclopedia). If you know the boundaries between ranges, you can determine which partition contains a given key.

Advantages:

  • Range queries are efficient (can route to relevant partition)
  • Keys can be kept sorted within each partition

Disadvantages:

  • Certain access patterns can lead to hot spots (e.g., timestamps as keys lead to all writes going to same partition)
  • Requires rebalancing boundaries as data distribution changes

Partitioning by Hash of Key

Use a hash function to determine partition for a given key. A good hash function takes skewed data and makes it uniformly distributed.

For partitioning purposes, the hash function need not be cryptographically strong (MongoDB uses MD5, Cassandra uses Murmur3). It must, however, return the same hash for the same key across different processes.

Once you have a suitable hash function, assign each partition a range of hashes. Every key whose hash falls within a partition's range is stored in that partition.
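
For illustration, a sketch of mapping a key to a partition by hash range; the partition count is an arbitrary assumption, and MD5 is used only because cryptographic strength is not required:

import hashlib

NUM_PARTITIONS = 8            # assumed fixed partition count for this sketch
MAX_HASH = 2 ** 64            # we take the first 8 bytes of the digest as the hash

def partition_for_key(key):
    # Each partition owns an equal range of the hash space; the key's hash picks the range.
    h = int.from_bytes(hashlib.md5(key.encode("utf-8")).digest()[:8], "big")
    return h * NUM_PARTITIONS // MAX_HASH

print(partition_for_key("user:12345"))   # the same key always maps to the same partition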

Advantages:

  • Distributes keys fairly among partitions
  • Reduces hot spots

Disadvantages:

  • Lose ability to do efficient range queries (adjacent keys are now scattered across partitions)
  • Must query all partitions for range queries

Hybrid Approach

Cassandra uses a compound primary key: first part for hashing (determines partition), remaining columns for sorting within partition (allows efficient range queries within a partition).

5.3 Partitioning and Secondary Indexes

Secondary indexes complicate partitioning because they don't identify a record uniquely but rather are a way of searching for occurrences of a particular value.

Document-Partitioned Indexes (Local Indexes)

Each partition maintains its own secondary indexes, covering only documents in that partition. When you write to database, you only update the indexes for the partition containing the document ID.

Reading requires querying all partitions and combining results (scatter/gather). This makes reads expensive but keeps writes fast and isolated to a single partition.

Term-Partitioned Indexes (Global Indexes)

Construct a global index that covers data in all partitions, but partition the index itself. The index for a particular term might be on a different partition than the documents containing that term.

Advantage: Can serve reads more efficiently (don't need to query all partitions)

Disadvantage: Writes are slower and more complicated (single write might affect multiple partitions of the index)

In practice, updates to global secondary indexes are often asynchronous (eventually consistent).

5.4 Rebalancing Partitions

Over time, things change in a database:

  • Query throughput increases (need more CPUs)
  • Dataset size increases (need more disks and RAM)
  • Machines fail (other machines need to take over)

The process of moving load from one node to another is called rebalancing. Requirements:

  • After rebalancing, load should be shared fairly between nodes
  • While rebalancing, database should continue accepting reads and writes
  • No more data than necessary should be moved (to minimize network/disk I/O load)

Strategies for Rebalancing:

Fixed Number of Partitions: Create many more partitions than nodes and assign several partitions to each node. When a node is added, steal a few partitions from every existing node until partitions are fairly distributed. Number of partitions doesn't change, only assignment of partitions to nodes changes.

Dynamic Partitioning: When partition grows beyond configured size, split into two. When partition shrinks below threshold, merge with adjacent partition. Number of partitions adapts to total data volume.

Proportional Partitioning: Make number of partitions proportional to number of nodes (fixed number of partitions per node). When new node joins, randomly choose fixed number of existing partitions to split and take one half while leaving the other in place.
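
A rough sketch of the fixed-number-of-partitions strategy described above; the assignment structure and node names are assumptions for illustration:

def rebalance(assignment, new_node):
    # assignment maps node name -> set of partition ids.
    assignment[new_node] = set()
    total = sum(len(parts) for parts in assignment.values())
    target = total // len(assignment)                 # fair share per node after the join
    for node, parts in assignment.items():
        # The new node steals partitions until every node is near the target;
        # only the moved partitions change nodes, everything else stays put.
        while node != new_node and len(parts) > target and len(assignment[new_node]) < target:
            assignment[new_node].add(parts.pop())
    return assignment

# Example: three nodes with eight partitions each; a fourth node joins.
cluster = {f"node-{i}": set(range(i * 8, (i + 1) * 8)) for i in range(3)}
print({n: len(p) for n, p in rebalance(cluster, "node-3").items()})   # roughly 6 partitions each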

5.5 Request Routing (Service Discovery)

When partitions are rebalanced, how does a client know which node to connect to?

Three main approaches:

  1. Allow clients to contact any node. If node owns partition, handle request; otherwise forward to appropriate node and return response.

  2. Send all requests to routing tier that determines which node should handle request and forwards accordingly. Routing tier acts as partition-aware load balancer.

  3. Require clients be aware of partitioning and assignment. Clients connect directly to appropriate node.

The key challenge is how the component making the routing decision learns about changes in partition assignment.

Many distributed systems rely on a separate coordination service like ZooKeeper to keep track of cluster metadata. Each node registers itself in ZooKeeper, which maintains the authoritative mapping of partitions to nodes. Other actors (routing tier or clients) can subscribe to this information in ZooKeeper.


6. Consistency Models and Distributed Transactions

6.1 The Consistency Spectrum

Different applications have different tolerance for inconsistency. The challenge for system designers is understanding the trade-offs and choosing the right consistency model for their application.

Strong Consistency (Linearizability)

Linearizability is a recency guarantee: as soon as one client successfully completes a write, all clients reading from the database must be able to see the value just written. The system gives an illusion that there is only one copy of the data.

Use cases requiring linearizability:

  • Locking and leader election (prevent split brain with two leaders)
  • Constraints and uniqueness guarantees (ensure username is unique)
  • Cross-channel timing dependencies (when writes and reads happen through different channels)

Cost of linearizability:

  • Performance: Linearizable reads/writes typically slower than non-linearizable
  • Availability: Applications that require linearizability may be unavailable during network partitions (CAP theorem)

Causal Consistency

Causality imposes an ordering on events: cause comes before effect. Causal consistency is the strongest consistency model that doesn't slow down due to network delays and remains available during network partitions.

Causal consistency means: if one operation happened before another (causally), everyone must see those operations in that order. But operations that are concurrent (no causal relationship) can be seen in different orders by different clients.

Eventual Consistency

Eventual consistency is a weak guarantee: if no new writes are made, eventually all replicas will converge to the same value. But it says nothing about when that convergence will happen.

6.2 Distributed Transactions and Consensus

Two-Phase Commit (2PC)

Two-phase commit is an algorithm for achieving atomic transaction commit across multiple nodes—ensuring either all nodes commit or all abort.

Process:

  1. Application wants to commit. Coordinator sends prepare request to all participants.
  2. When participant receives prepare, ensures it can definitely commit (write to disk all data, check constraints). It replies yes or no.
  3. If all participants reply yes, coordinator sends commit request in phase 2. If any participant replied no, coordinator sends abort.
  4. When participant receives commit/abort decision, it acts accordingly and acknowledges.

The key insight: when a participant votes "yes," it promises that it will definitely be able to commit. Once the coordinator decides, that decision is irrevocable.

Problem: Coordinator failure between phases leaves participants uncertain (in doubt). They must wait for coordinator to recover. This is why 2PC is called a blocking protocol—it can block making progress.
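
A stripped-down sketch of the coordinator's logic with a hypothetical in-memory participant; note that it omits the durable log of the prepare votes and the commit/abort decision that a real coordinator must write before phase 2:

class Participant:
    """Hypothetical in-memory participant; a real one would be a database node."""
    def __init__(self, can_commit=True):
        self.can_commit = can_commit
        self.state = "pending"
    def prepare(self, txn):
        return self.can_commit           # "yes" means it has durably prepared the transaction
    def commit(self, txn):
        self.state = "committed"
    def abort(self, txn):
        self.state = "aborted"

def two_phase_commit(participants, txn):
    # Phase 1: ask every participant whether it can definitely commit.
    votes = [p.prepare(txn) for p in participants]

    # Phase 2: the decision depends only on the votes and is irrevocable once made.
    if all(votes):
        for p in participants:
            p.commit(txn)                # must be retried until every participant has committed
        return "committed"
    for p in participants:
        p.abort(txn)
    return "aborted"

print(two_phase_commit([Participant(), Participant()], "txn-1"))                     # committed
print(two_phase_commit([Participant(), Participant(can_commit=False)], "txn-2"))     # aborted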

Three-Phase Commit

Three-phase commit (3PC) is an alternative that makes 2PC non-blocking, but assumes bounded network delay and bounded process pauses. In most practical systems with unbounded delays, 3PC cannot guarantee atomicity.

Consensus

Consensus means getting several nodes to agree on something. The consensus problem is typically formalized as follows: one or more nodes may propose values, and the consensus algorithm decides on one of those values.

Properties a consensus algorithm must satisfy:

  • Uniform agreement: No two nodes decide differently
  • Integrity: No node decides twice
  • Validity: If a node decides value v, then v was proposed by some node
  • Termination: Every node that doesn't crash eventually decides some value

Many consensus algorithms exist: Viewstamped Replication, Paxos, Raft, Zab. They all use epochs/terms and ensure only one leader per epoch. Before a leader makes decisions, it must check if there is a more recent leader.

6.3 Consistency and Consensus Trade-offs

Coordinator-Based Systems: Using a consensus algorithm to elect a leader or using distributed transactions (2PC) requires careful coordination. This coordination takes time and reduces availability during network partitions.

Leaderless Systems: Systems without leaders can remain available during partitions but provide weaker consistency guarantees. Eventual consistency is a natural consequence.

Application-Level Compensation: Some systems push the complexity to applications, requiring developers to handle conflicts and write compensation logic. This increases application complexity but may provide better performance and availability.

The right choice depends on the specific requirements of your application. Systems handling money often require strong consistency. Social media feeds can typically tolerate eventual consistency. There's no universal right answer—only trade-offs.


7. Batch Processing: Working with Large Datasets

7.1 Batch Processing Concepts

A batch processing job takes a large amount of input data, processes it, and produces output data without user interaction. The job may run for minutes, hours, or days. The distinguishing feature of batch jobs is that they read input and write output—they don't modify the input.

Unix Philosophy Applied to Data

The Unix philosophy provides good design principles for data systems:

  1. Make each program do one thing well. If you need a new job, build fresh rather than complicating old programs with new features.
  2. Expect output of every program to become input to another. Use standard interface (files and pipes).
  3. Design and build software to be tried early. Build pilot systems and throw them away; you'll build a better one anyway.

These principles translate to data processing:

  • Inputs are immutable (don't modify input files)
  • Outputs become inputs to other programs
  • Complex problems are solved through composability of simple tools

7.2 MapReduce and Distributed File Systems

MapReduce is a programming model for processing large datasets in parallel across many machines. While implementations vary, the concepts translate broadly.

Distributed File Systems

To handle datasets larger than any single machine can store, files are broken into blocks (typically 64MB or 128MB) and replicated across multiple machines. Distributed filesystems like HDFS conceptually create a big filesystem across many machines.

MapReduce Execution

  1. Map phase: Call mapper function once for each input record. Mapper extracts key-value pairs from record.
  2. Shuffle: MapReduce framework sorts the key-value pairs by key.
  3. Reduce phase: Call reducer function once for each unique key. Reducer function processes all values for that key and outputs result.

Key insight: Moving computation to where data is located (rather than moving data to computation) is more efficient when dealing with large datasets.
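
A self-contained, single-machine sketch of the three phases, using word count as the classic example (a real framework would run mappers and reducers on many machines and shuffle data over the network):

from itertools import groupby
from operator import itemgetter

def mapper(record):
    # Map: emit a (word, 1) pair for every word in the input record.
    for word in record.split():
        yield (word.lower(), 1)

def reducer(key, values):
    # Reduce: called once per unique key with all of its values.
    yield (key, sum(values))

def map_reduce(records):
    pairs = [pair for record in records for pair in mapper(record)]
    pairs.sort(key=itemgetter(0))                            # shuffle: sort by key
    results = []
    for key, group in groupby(pairs, key=itemgetter(0)):
        results.extend(reducer(key, (count for _, count in group)))
    return results

print(map_reduce(["the quick brown fox", "the lazy dog jumps over the fox"]))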

Handling Failures

MapReduce can tolerate machine failures: if a map task crashes, it can be rescheduled on another machine and executed again. This is possible because input files are immutable and output goes to separate files (not modifying input). Reprocessing is safe because mapper and reducer are pure functions with no side effects.

7.3 Beyond MapReduce: Dataflow Engines

MapReduce has limitations: writes intermediate state to distributed filesystem after each job, which is inefficient when chaining multiple MapReduce jobs. More recent dataflow engines (Spark, Flink, Tez) handle entire workflows of computations more efficiently:

  • Don't materialize intermediate state to distributed filesystem
  • Keep data in memory or stream between operators when possible
  • Support more flexible execution graphs (not just map-reduce structure)

These systems maintain the fault tolerance benefits while improving performance significantly.

7.4 Joins in Batch Processing

Many datasets need to be joined. In batch processing context, joins mean resolving associations between records at processing time.

Sort-Merge Join: Both inputs sorted by join key, then merged together (like mergesort). The mapper output is sorted by join key, and reducer receives all records with same key from both datasets.

Broadcast Hash Join: When one side of join is small (fits in memory), load it into hash table and stream other dataset past it. Each mapper loads small input into memory and streams large input.

Partitioned Hash Join: When both inputs are partitioned in same way on join key, apply hash join approach independently to each partition. MapReduce framework ensures records with same key go to same reducer.
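
A minimal sketch of the broadcast hash join described above; the record shapes are invented for illustration:

def broadcast_hash_join(small_records, large_records, key):
    # Build phase: load the small input into an in-memory hash table keyed on the join key.
    table = {}
    for record in small_records:
        table.setdefault(record[key], []).append(record)

    # Probe phase: stream the large input past the table, emitting one joined record per match.
    for record in large_records:
        for match in table.get(record[key], []):
            yield {**match, **record}

users = [{"user_id": 1, "name": "alice"}, {"user_id": 2, "name": "bob"}]          # small side
events = [{"user_id": 1, "action": "click"}, {"user_id": 2, "action": "view"}]    # large side
print(list(broadcast_hash_join(users, events, "user_id")))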


8. Stream Processing: Real-Time Data Processing

8.1 Streams vs. Batch Processing

Batch processing treats input as bounded (finite size). Stream processing treats input as unbounded—data arrives incrementally over time. Near real-time processing can't wait for entire day's data before starting.

Event Streams

A stream is like an unbounded version of batch input. Events are generated once by producer, then potentially processed by multiple consumers. Related events are grouped into topics or streams.

Message Brokers

To handle streams, we need message brokers (also called message queues): systems specialized in handling streams of messages between producers and consumers.

Two main patterns:

Message Queue: Multiple consumers can read messages from queue. Each message delivered to one consumer. Once processed, message deleted from queue. Good for task queues where each task handled by one worker.

Publish/Subscribe: Multiple consumers (subscribers) each receive copy of every message published to a topic. Message not deleted after being read. Good for notifications and broadcasting.

8.2 Logs for Message Storage

Message brokers can be implemented on top of logs (append-only sequence of records on disk). This is the approach taken by Apache Kafka, Amazon Kinesis, and others.

Log-Based Message Broker Properties:

  • Within a partition, messages are totally ordered
  • Broker assigns monotonically increasing sequence number (offset) to every message
  • Different consumers can read from log independently (just track their offset)
  • Log can be kept on disk (unlike in-memory queues), enabling replay of old messages
  • Log is partitioned across machines for scalability

Consumer maintains offset in the log. To process messages: read from current offset, process them, then advance offset. If consumer crashes and restarts, it resumes from last recorded offset.
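
In outline, offset tracking looks like the following sketch; the in-memory log and offset store are toy stand-ins for a broker client and a committed-offset store:

class InMemoryLog:
    """Toy stand-in for a partitioned log; real systems would use a broker client."""
    def __init__(self, entries):
        self.entries = entries                     # list of (offset, payload) pairs
    def read(self, offset, max_messages=100):
        return [e for e in self.entries if e[0] >= offset][:max_messages]

offsets = {}                                       # consumer_id -> committed offset

def run_consumer_once(log, consumer_id, handle):
    offset = offsets.get(consumer_id, 0)           # resume from the last recorded offset
    messages = log.read(offset)
    for msg_offset, payload in messages:
        handle(payload)                            # process the message
        offset = msg_offset + 1
    offsets[consumer_id] = offset                  # checkpoint after the batch

log = InMemoryLog([(0, "event-a"), (1, "event-b"), (2, "event-c")])
run_consumer_once(log, "consumer-1", print)        # prints all three events
run_consumer_once(log, "consumer-1", print)        # prints nothing: offset already advanced

Because the offset is committed only after the batch is processed, a crash mid-batch causes some messages to be processed again on restart; this at-least-once behavior is one reason idempotent processing (Section 11) matters.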

8.3 Stream Processing Operations

Complex Event Processing (CEP): Search for patterns in stream (queries stored long-term, events pass through, system notifies when pattern detected).

Stream Analytics: Measuring rate of events, calculating rolling averages, comparing statistics to previous time intervals. Often uses probabilistic algorithms (bloom filters, HyperLogLog).

Maintaining Materialized Views: Derive alternative view of data (like denormalization in batch processing, but continually updated as new events arrive).

Search on Streams: Indexing documents as they arrive and allowing searches on recent documents.

8.4 Reasoning About Time in Streams

Stream processing often needs to reason about time, which is tricky in distributed systems.

Event Time vs. Processing Time

  • Event time: When event actually occurred according to device that generated it
  • Processing time: When event was observed by stream processor

These can differ significantly due to network delays, queueing, processing time. Depending on which time you use, you may get different answers to "how many events per minute?"

Windows

To handle unbounded streams, often need to break stream into finite windows:

  • Tumbling window: Fixed length, every event belongs to exactly one window
  • Hopping window: Fixed length but with overlap (e.g., 5-minute windows every 1 minute); see the sketch after this list
  • Sliding window: Contains all events within some interval of each other
  • Session window: Group events from same user with no more than some timeout between events
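
A small sketch of assigning an event to tumbling and hopping windows based on its event-time timestamp; the window sizes are arbitrary examples:

def tumbling_window(event_time_s, window_size_s=60):
    # Fixed length, non-overlapping: every event falls into exactly one window.
    start = (event_time_s // window_size_s) * window_size_s
    return (start, start + window_size_s)

def hopping_windows(event_time_s, window_size_s=300, hop_s=60):
    # Fixed length but overlapping: 5-minute windows advancing every minute, so one
    # event can belong to several windows (window starts are clamped at time zero here).
    last_start = (event_time_s // hop_s) * hop_s
    first_start = max(0, last_start - window_size_s + hop_s)
    return [(s, s + window_size_s) for s in range(first_start, last_start + 1, hop_s)]

print(tumbling_window(130))      # (120, 180)
print(hopping_windows(130))      # [(0, 300), (60, 360), (120, 420)]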

Handling Stragglers

Events may arrive late (after you've already processed the window they belong to). Options:

  1. Ignore straggler events (simple but loses data)
  2. Publish correction (emit updated value when straggler arrives)
  3. Wait for stragglers before publishing results (delays output)

The right approach depends on whether downstream consumers can handle corrections and how much latency is acceptable.

8.5 Stream Joins

Three types of joins appear in stream processing:

Stream-Stream Join (Window Join)

Both inputs are activity streams. To join events from two streams, you need state: buffer events from one stream in window, and when event arrives on other stream, check buffer for matches.

Example: Search queries joined with clicks. Buffer recent queries; when click arrives, find matching query in buffer.
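
A sketch of that example; the event field names (such as query_id) are assumptions, and a production join would also expire buffered queries proactively and cope with out-of-order arrival:

import time

class ClickJoiner:
    """Stream-stream join sketch: buffer recent query events in a window and, when a
    click arrives, look for the matching query."""

    def __init__(self, window_seconds=3600):
        self.window_seconds = window_seconds
        self.recent_queries = {}                        # query_id -> (arrival time, query event)

    def on_query(self, query_event):
        # Buffer the query so a later click can be matched against it.
        self.recent_queries[query_event["query_id"]] = (time.time(), query_event)

    def on_click(self, click_event):
        entry = self.recent_queries.get(click_event["query_id"])
        if entry is None:
            return None                                 # no matching query buffered
        buffered_at, query_event = entry
        if time.time() - buffered_at > self.window_seconds:
            return None                                 # the matching query fell outside the window
        return {**query_event, **click_event}           # emit the joined event

joiner = ClickJoiner()
joiner.on_query({"query_id": "q1", "terms": "red shoes"})
print(joiner.on_click({"query_id": "q1", "clicked_url": "https://example.com/item"}))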

Stream-Table Join (Enrichment)

One input is stream of activity events, other is database table (changelog stream). Stream processor queries table to enrich stream events.

Example: User activity events joined with user profile database to add user details.

Table-Table Join (Materialized View Maintenance)

Both inputs are changelog streams from databases. Stream processor maintains materialized view of join result, updating it whenever either input changes.

Example: Timeline cache for social network (join user follows with their posts).

8.6 Fault Tolerance in Stream Processing

Unlike batch processing where you can simply restart failed tasks, stream processing poses challenges because state may be lost on failure.

Microbatching and Checkpointing

One approach: break stream into small blocks and treat each block like a miniature batch job (Apache Spark Streaming). State is replicated periodically (checkpoints).

Recovery: restart stream processor from most recent checkpoint and reprocess events since checkpoint. This requires:

  • Events can be replayed (need durable log-based message broker)
  • Processing is deterministic or idempotent
  • State checkpointing mechanism

Atomic Commit Revisited

To ensure exactly-once semantics in presence of failures, need atomic commit: either all effects of processing a message happen (state updates, output messages, message acknowledgment) or none happen.

Approaches:

  • Distributed transactions across message broker and output systems (slow, reduces throughput)
  • Rely on idempotent operations (safe to execute multiple times)
  • Deduplicate outputs (track which events have been processed, skip duplicates)

Idempotence

Best solution when possible: make operations idempotent. Stream processor can safely be restarted and reprocess recent events without causing incorrect results.

Even better: make the operation deterministic, so the result depends only on the event's content, not on the order of arrival or timing. This allows arbitrary replay and reprocessing.


9. Data Modeling and Schema Evolution

9.1 Formats for Data Encoding

When writing data to files or sending over network, need to encode in-memory objects as sequence of bytes. Many formats exist, each with trade-offs.

Language-Specific Formats

Many programming languages have built-in support for encoding (Java Serialization, Python pickle). Problems:

  • Tied to particular programming language
  • Security problems (arbitrary code execution from untrusted data)
  • Versioning and forward/backward compatibility often neglected
  • Efficiency often poor

JSON, XML, and Binary Variants

Textual formats like JSON and XML are human-readable and language-independent. Problems:

  • Ambiguity around number encoding (don't distinguish integers from floats)
  • No support for binary strings
  • Optional schema support makes tooling weaker
  • Verbose (uses lots of bandwidth)

Binary encodings of JSON (MessagePack, BSON, BJSON) reduce size but keep the same data model. They still embed field names in the encoded data and lack schema support.

Thrift and Protocol Buffers

Binary encoding libraries that require schema. Both use schema to define structure, then provide code generation tools.

Key features:

  • Compact binary encoding
  • Field tags in schema (each field has unique tag number)
  • Schema evolution through tag numbers

Schema Evolution Rules:

Forward compatibility (old code can read data written by new code):

  • Can add new fields with new tag numbers
  • Old code ignores fields it doesn't recognize

Backward compatibility (new code can read data written by old code):

  • New fields must be optional or have default values
  • Can never delete required fields

Avro

Another binary encoding format with different philosophy. Avro schema doesn't contain tag numbers. When reading, use exact same schema that was used for writing.

To handle schema changes:

  • Writer's schema and reader's schema don't need to be same, just compatible
  • Avro library resolves differences by looking at both schemas side-by-side

Schema evolution:

  • Forward compatible: new writer schema, old reader schema
  • Backward compatible: new reader schema, old writer schema

9.2 Modes of Dataflow

Different contexts for sending data from one process to another:

Dataflow Through Databases

Process writing to database encodes data; process reading from database decodes it. Different processes accessing database may be running different versions of code.

Implications:

  • Forward compatibility required (old code reads data written by new code)
  • Backward compatibility required (new code reads data written by old code)
  • Unknown fields must be preserved (read old data, update some fields, write back)

Dataflow Through Services (REST and RPC)

Client sends request over network, server processes it and sends response. Service-oriented architecture (SOA) or microservices is common organizational pattern.

REST philosophy: Use features of HTTP (URLs, cache control, authentication, content type negotiation). Resources identified by URLs, operations are HTTP methods (GET, POST, PUT, DELETE).

RPC tries to make remote service look like calling function in same process. Problem: network requests are fundamentally different from local function calls (unpredictable duration, may timeout, may need retries, results may vary).

Schema evolution:

  • Requests: server usually must maintain backward compatibility (handle requests in old format)
  • Responses: clients must handle responses from old and new servers

Dataflow Through Message Brokers

Asynchronous message-passing systems: message sent to message broker, which stores temporarily and delivers to consumers. Advantages over direct RPC:

  • Acts as buffer if recipient unavailable or overloaded
  • Automatically redeliver messages if process crashes
  • Sender doesn't need to know recipient's address
  • One message can be sent to multiple recipients
  • Decouples sender from recipient

Message brokers don't enforce schema. Application controls encoding. Evolution works similar to databases: need forward and backward compatibility.


10. Designing Data Systems: Architectural Patterns

10.1 Derived Data vs. Systems of Record

Important distinction in data systems:

Systems of Record (Source of Truth)

Holds authoritative version of data. If discrepancy between systems, system of record is correct by definition. Writes first go to system of record.

Derived Data Systems

Data that results from taking existing data and transforming or processing it. If lost, can be recreated from system of record. Caches, denormalized values, indexes, materialized views are all derived data.

10.2 Maintaining Derived State

The challenge: how to keep derived data systems in sync with system of record?

Batch Processing Approach

Periodically run batch job that reads from system of record and updates derived system. Simple but:

  • Derived system lags behind by batch interval
  • Not suitable for applications requiring up-to-date views
  • Batch processing all data repeatedly can be expensive

Stream Processing Approach

Treat changes to system of record as stream of events. Derived systems subscribe to stream and update themselves as changes happen.

Change data capture (CDC): Process of observing all data changes written to database and extracting them in form usable by other systems. CDC makes one database the leader and turns others into followers.

Event Sourcing

Similar to CDC but applied at application level rather than database level. Application stores log of changes (events) rather than current state. Current state derived by replaying events.

Difference from CDC: Event sourcing uses higher-level events (user actions) rather than low-level state changes. Events are immutable and append-only.

10.3 Lambda Architecture

One approach to combining batch and stream processing: Lambda architecture.

Structure:

  • Batch layer: Preprocesses all available historical data in batch
  • Speed layer: Processes recent data in stream processor
  • Merge: Queries merge results from both layers

Advantage: Can reprocess entire historical dataset using different logic (if you discover bug or want to compute new type of aggregation).

Disadvantage: Complex—need to maintain two different systems doing similar things.

10.4 Unbundling Databases

Traditionally, databases bundled together several components:

  • Storage engine
  • Indexes
  • Replication
  • Query language

With distributed data systems, these components are increasingly unbundled:

  • Store immutable events in distributed log (Kafka)
  • Derive read-optimized views in separate systems (search indexes, caches, data warehouses)
  • Use stream processors to maintain derived views
  • Query specific system optimized for query type

This architecture provides:

  • Flexibility: Can add new derived views without affecting system of record
  • Fault tolerance: Can rebuild derived view from immutable log
  • Scalability: Each component scales independently

10.5 Designing for Evolvability

Systems change over time due to:

  • New features and requirements
  • Changing understanding of problem domain
  • Changes in business priorities
  • Growth in scale
  • Changes in technology landscape

Design principles for evolvable systems:

Keep Components Loosely Coupled

Use well-defined interfaces between components. Changes within component don't affect others. Can replace or upgrade components independently.

Make Data Flow Explicit

Make it clear where data comes from and where it goes. Explicitly define derived data and how it's maintained. Document data lineage.

Make State Management Explicit

Clearly separate mutable state from immutable data. Keep mutable state in one place when possible. Use immutable logs and append-only operations.

Support Incremental Migration

Large systems can't be rewritten in one go. Design for gradual migration: run old and new side-by-side, gradually shift traffic. Use feature flags, compatibility layers, parallel processing.

Build in Observability

Monitor system behavior: which queries are slow, which components are bottlenecks, error rates, data quality. Good observability enables informed decisions about where to focus optimization efforts.


11. Idempotency and Data Integrity

11.1 The Importance of Idempotence

In distributed systems, operations may be executed multiple times due to:

  • Network issues causing retries
  • Process crashes during operation
  • Uncertainty about whether operation completed
  • Message broker delivering message multiple times

An idempotent operation produces same result whether executed once or multiple times.

11.2 Achieving Idempotence

Natural Idempotence

Some operations are naturally idempotent:

  • Setting a value: SET x = 5 (executing multiple times gives same result)
  • Deletion: deleting same record multiple times succeeds (second deletion is no-op)
  • Unique constraints: INSERT with unique constraint fails second time rather than creating duplicate

Idempotence Through Operation Design

Design operations to be idempotent when possible:

Non-idempotent: INCREMENT counter (executing twice increments by 2)
Idempotent: SET counter = 5 (executing multiple times sets to 5)

Non-idempotent: ADD item to list (executing twice adds duplicate)
Idempotent: ENSURE item in set (set naturally prevents duplicates)

Idempotence Keys

For operations that aren't naturally idempotent, use idempotence keys (also called request IDs):

  1. Client generates unique key for each operation
  2. Client sends key along with operation
  3. Server records which keys have been processed
  4. If request arrives with previously seen key, return cached result instead of re-executing

Implementation considerations (a sketch follows this list):

  • Store processed keys in database with TTL (can't store forever)
  • Return cached result for duplicate requests within retention window
  • After TTL expires, accept risk of re-execution vs. indefinite storage
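
A minimal in-process sketch of this flow; a real service would keep the processed keys in a database and make the check-and-record step atomic, and the retention period here is just an example value:

import time

RETENTION_SECONDS = 24 * 3600
processed = {}                                   # idempotency key -> (result, stored_at)

def handle_request(idempotency_key, operation):
    now = time.time()
    # Evict keys that have aged out of the retention window.
    for key, (_, stored_at) in list(processed.items()):
        if now - stored_at > RETENTION_SECONDS:
            del processed[key]

    if idempotency_key in processed:
        return processed[idempotency_key][0]     # duplicate request: return the cached result

    result = operation()                         # first time we see this key: execute once
    processed[idempotency_key] = (result, now)
    return result

print(handle_request("key-1", lambda: "charged $10"))   # executes the operation
print(handle_request("key-1", lambda: "charged $10"))   # duplicate: cached result, no re-execution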

11.3 Exactly-Once Semantics

Distributed systems typically provide:

  • At-most-once: Send request, don't retry. Operation may not happen if request lost.
  • At-least-once: Retry until acknowledgment. Operation may happen multiple times.
  • Exactly-once: Operation executed once despite failures and retries.

True exactly-once is impossible in general (can't distinguish between slow execution and failure). But can achieve effectively-once through:

Idempotent Operations: At-least-once + idempotence = effectively exactly-once

Atomic Commit: Use distributed transactions (2PC) to make operation atomic across all affected systems. Expensive and reduces availability.

Deduplication: Track which operations have been processed and skip duplicates. Moves complexity from coordination to bookkeeping.


12. API Design for Data-Intensive Applications

12.1 RESTful API Principles

REST (Representational State Transfer) provides patterns for building web services that work well with HTTP infrastructure.

Resource-Oriented Design

URLs identify resources (nouns, not verbs):

  • Good: GET /products/123
  • Bad: GET /getProduct?id=123

Resources should correspond to domain entities (products, users, orders) or collections of entities.

HTTP Methods as Operations

Standard HTTP methods map to CRUD operations:

  • GET: Retrieve resource (safe, idempotent, cacheable)
  • POST: Create new resource (not idempotent)
  • PUT: Replace resource (idempotent)
  • PATCH: Partially update resource
  • DELETE: Remove resource (idempotent)

Idempotence in HTTP

GET, PUT, DELETE are specified as idempotent in HTTP. This means:

  • Clients can safely retry these operations
  • Caches and proxies know they can replay these requests
  • Infrastructure can optimize based on idempotence guarantees

POST is not idempotent by default. For idempotence with POST:

  • Use idempotence keys in headers
  • Design application logic to handle duplicates
  • Or use PUT with client-generated IDs instead

12.2 Resource Hierarchies and Relationships

Express relationships through URL structure:

/api/{domain}/users/{userId}
/api/{domain}/users/{userId}/orders
/api/{domain}/users/{userId}/orders/{orderId}

This makes relationships explicit and enables authorization checks at each level.

Shallow vs. Deep Nesting

Shallow (preferred for independent resources):

/api/domain/orders/789
/api/domain/products/456

Nested (when resource is dependent):

/api/domain/users/123/addresses/5

Avoid deep nesting (more than 2-3 levels):

# Too deep - hard to use
/api/companies/1/departments/2/teams/3/members/4/assignments/5

12.3 Filtering, Pagination, and Querying

Use query parameters for operations that don't fit REST resource model:

Filtering:

GET /api/domain/products?category=electronics&price_min=100&price_max=500
GET /api/domain/orders?status=pending&created_after=2025-01-01

Sorting:

GET /api/domain/products?sort=price&order=desc

Pagination:

Offset-based (simple but can skip/duplicate items if data changes):

GET /api/domain/products?offset=100&limit=50

Cursor-based (consistent but opaque):

GET /api/domain/products?cursor=eyJpZCI6MTIzfQ&limit=50

Return pagination metadata in response:

{
  "data": [...],
  "pagination": {
    "total": 1500,
    "limit": 50,
    "offset": 100,
    "next_cursor": "eyJpZCI6MTUwfQ"
  }
}
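
One common way to build such an opaque cursor is to base64-encode a small JSON payload recording where the previous page ended; the payload shape below is an example, not a standard:

import base64
import json

def encode_cursor(last_id):
    # Clients treat the returned string as opaque; only the server interprets it.
    payload = json.dumps({"id": last_id}, separators=(",", ":"))
    return base64.urlsafe_b64encode(payload.encode("utf-8")).decode("ascii")

def decode_cursor(cursor):
    return json.loads(base64.urlsafe_b64decode(cursor.encode("ascii")))["id"]

cursor = encode_cursor(123)
print(cursor, decode_cursor(cursor))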

Field Selection (reduce payload size):

GET /api/domain/products?fields=id,name,price

12.4 Versioning and Evolution

APIs must evolve without breaking existing clients.

URL Versioning (most common):

/api/v1/products
/api/v2/products

Clear and explicit. Breaking changes require new version.

Header Versioning:

GET /api/products
Accept: application/vnd.myapi.v2+json

Cleaner URLs but less discoverable.

Evolution Without Breaking Changes:

Compatible changes (don't require version bump):

  • Adding new optional fields to requests
  • Adding new fields to responses (clients should ignore unknown fields)
  • Adding new endpoints
  • Adding new optional query parameters

Incompatible changes (require new version):

  • Removing or renaming fields
  • Changing field types
  • Changing URL structures
  • Changing required fields
  • Changing semantics of existing operations

12.5 Error Handling and Status Codes

Use appropriate HTTP status codes:

2xx Success:

  • 200 OK: Successful GET, PUT, PATCH
  • 201 Created: Successful POST creating resource
  • 204 No Content: Successful DELETE or operation with no body

4xx Client Error:

  • 400 Bad Request: Invalid syntax or validation failure
  • 401 Unauthorized: Authentication required
  • 403 Forbidden: Authenticated but not authorized
  • 404 Not Found: Resource doesn't exist
  • 409 Conflict: Request conflicts with current state
  • 429 Too Many Requests: Rate limit exceeded

5xx Server Error:

  • 500 Internal Server Error: Unexpected server error
  • 503 Service Unavailable: Temporarily unavailable

Provide structured error responses:

{
  "error": {
    "code": "VALIDATION_ERROR",
    "message": "Request validation failed",
    "details": [
      {
        "field": "email",
        "issue": "Invalid email format"
      }
    ],
    "request_id": "req_abc123",
    "documentation_url": "https://docs.example.com/errors/validation"
  }
}

13. Caching Strategies for Performance

13.1 Why Cache?

Caching exploits the principle of locality:

  • Temporal locality: Data accessed recently is likely to be accessed again soon
  • Spatial locality: Data near recently accessed data is likely to be accessed soon

Caching trades consistency for performance—cached data may be stale.

13.2 Cache Hierarchies

Multiple layers of caching, each with different characteristics:

Client-Side Cache:

  • Closest to user (lowest latency)
  • Limited size
  • Private to user
  • HTTP cache headers control behavior (example below)
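
For example, an origin response might carry headers like these (values are illustrative); max-age tells the client how long it may reuse the cached copy, and the ETag lets it revalidate cheaply afterwards:

HTTP/1.1 200 OK
Cache-Control: public, max-age=3600
ETag: "v1-a1b2c3"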

CDN/Edge Cache:

  • Geographically distributed
  • Shared across users
  • Good for static content
  • Reduces origin load

Application Cache (Redis, Memcached):

  • Shared across application servers
  • Flexible (can cache any data structure)
  • Requires explicit cache management

Database Query Cache:

  • Caches query results
  • Transparent to application
  • Invalidated when underlying tables change

13.3 Cache Writing Strategies

Cache-Aside (Lazy Loading):

Application code explicitly manages cache:

def get_user(user_id):
    user = cache.get(user_id)
    if user is None:
        user = database.get(user_id)
        cache.set(user_id, user)
    return user

Advantages:

  • Only requested data is cached
  • Cache failures don't prevent database access
  • Simple to understand

Disadvantages:

  • Cache miss penalty (extra latency on first access)
  • Stale data if database updated directly
  • Complexity in application code

Write-Through:

Write to cache and database synchronously:

def update_user(user_id, data):
    database.update(user_id, data)
    cache.set(user_id, data)

Advantages:

  • Cache always consistent with database
  • No stale reads

Disadvantages:

  • Write latency (must wait for both operations)
  • Wasted writes (data might never be read)
  • Writes still required to both systems

Write-Behind (Write-Back):

Write to cache immediately, asynchronously write to database:

def update_user(user_id, data):
    cache.set(user_id, data)
    queue.enqueue({"op": "update", "id": user_id, "data": data})
    # Background worker drains the queue and updates the database

Advantages:

  • Low write latency
  • Can batch multiple writes
  • Reduces database load

Disadvantages:

  • Risk of data loss if cache fails before database write
  • Eventual consistency
  • Complex failure handling
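
To make the mechanism concrete, here is a self-contained write-behind sketch (plain dicts and a queue.Queue stand in for the cache, the database, and the write queue; a real deployment would batch writes and handle worker crashes, which is exactly where the data-loss risk above comes from):

import queue
import threading
import time

cache, database = {}, {}
pending_writes = queue.Queue()

def update_user(user_id, data):
    cache[user_id] = data                  # fast path: update the cache only
    pending_writes.put((user_id, data))    # defer the durable write

def writer():
    while True:
        user_id, data = pending_writes.get()   # blocks until a write is queued
        time.sleep(0.01)                       # simulate a slower durable store
        database[user_id] = data
        pending_writes.task_done()

threading.Thread(target=writer, daemon=True).start()

update_user(123, {"name": "Alice"})
print(cache[123])          # visible immediately
pending_writes.join()      # wait for the background write to complete
print(database[123])       # now durable as well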

13.4 Cache Invalidation

"There are only two hard things in Computer Science: cache invalidation and naming things." — Phil Karlton

Time-Based Expiration (TTL):

Set expiration time when caching:

cache.set(key, value, ttl=3600)  # Expire after 1 hour

Advantages:

  • Simple to implement
  • Guarantees eventual consistency
  • No coordination needed

Disadvantages:

  • May serve stale data until expiration
  • Hard to choose optimal TTL
  • Stale entries linger in memory until they expire, even after the underlying data has changed

Event-Based Invalidation:

Invalidate cache entries when data changes:

def update_user(user_id, data):
    database.update(user_id, data)
    cache.delete(user_id)  # Invalidate cache

Advantages:

  • Minimizes stale data
  • More efficient (don't cache what changes frequently)

Disadvantages:

  • Complex in distributed systems
  • Must ensure all write paths invalidate cache
  • Thundering herd problem (many requests hit the database simultaneously right after invalidation)

Versioned Keys:

Include version in cache key:

cache.set(f"user:{id}:v{version}", user)

Advantages:

  • Old version remains cached during transition
  • No cache invalidation needed (old versions expire naturally)

Disadvantages:

  • Increased memory usage
  • Must track versions
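
A self-contained sketch of the versioned-key read and write paths (plain dicts stand in for the cache and database, and the key layout is an assumption; in practice the version counter itself must live somewhere all application servers can see, such as the cache):

cache, database, versions = {}, {}, {}

def key_for(user_id):
    return f"user:{user_id}:v{versions.get(user_id, 1)}"

def write_user(user_id, data):
    database[user_id] = data
    versions[user_id] = versions.get(user_id, 1) + 1   # bump: readers switch to the new key
    cache[key_for(user_id)] = data                      # old versioned entries simply age out

def read_user(user_id):
    user = cache.get(key_for(user_id))
    if user is None:                                    # miss on the current version
        user = database.get(user_id)
        cache[key_for(user_id)] = user
    return user

write_user(7, {"name": "Alice"})
print(read_user(7))                                     # served from the current versioned key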

13.5 Cache Consistency Models

Strong Consistency (write-through):

  • Cache always matches database
  • Higher latency
  • Use when stale data causes problems

Eventual Consistency (TTL-based):

  • Cache may temporarily differ from database
  • Lower latency
  • Use when brief staleness is acceptable

Weak Consistency (cache-aside with long TTL):

  • Cache may be significantly out of date
  • Lowest latency
  • Use for data that rarely changes or approximate values

13.6 Trade-offs in Caching

Memory vs. Latency: Larger caches improve hit rates but consume more memory. Need to balance cache size against available RAM and cost.

Consistency vs. Performance: Stronger consistency requires coordination, reducing performance. Weaker consistency provides better performance but may serve stale data.

Simplicity vs. Optimization: Simple caching strategies (cache-aside with TTL) are easier to understand and debug. Complex strategies (write-behind, multi-level invalidation) can provide better performance but increase operational complexity.

Hit Rate vs. Size: Hit rate typically improves sublinearly with cache size; doubling the cache size does not double the hit rate. Find the point of diminishing returns.


14. Database Scaling: Practical Considerations

14.1 When to Scale

Scale when current system can't meet requirements:

  • Response times exceed acceptable limits
  • System can't handle load (requests queuing, timeouts)
  • Storage capacity insufficient
  • System reliability degraded (failures more frequent)

Don't scale prematurely. A single database instance is simpler and can handle surprisingly high load with appropriate indexing and query optimization.

14.2 Vertical Scaling Limits

Modern database servers can be very large (hundreds of gigabytes of RAM, dozens of CPU cores). Vertical scaling works until:

  • Machine cost becomes prohibitive
  • Largest available machine still insufficient
  • Downtime for upgrades unacceptable
  • Single machine creates unacceptable failure risk

14.3 Read Scaling with Replication

When reads are the bottleneck:

  1. Add read replicas (followers)
  2. Route read traffic to replicas
  3. Leader handles only writes

This works when:

  • Read/write ratio heavily favors reads
  • Application tolerates replication lag
  • Read queries don't require latest data

Considerations:

  • Monitor replication lag
  • Consider whether each query can tolerate lag
  • Route queries requiring fresh data to leader
  • Handle replica failures gracefully
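
A minimal routing sketch for the fresh-data consideration above (the endpoint names and the requires_fresh_data flag are illustrative assumptions): queries that cannot tolerate replication lag go to the leader, everything else is spread across replicas.

import random

def choose_endpoint(leader, replicas, requires_fresh_data):
    # Fall back to the leader when freshness matters or no replica is available.
    if requires_fresh_data or not replicas:
        return leader
    return random.choice(replicas)   # naive load spreading across read replicas

print(choose_endpoint("db-leader:5432", ["db-replica-1:5432", "db-replica-2:5432"],
                      requires_fresh_data=False))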

14.4 Write Scaling with Sharding

When writes are the bottleneck, you need to shard (partition) data across multiple databases.

Choosing Shard Key:

Choosing the shard key is the most important decision in a sharding strategy. A good shard key:

  • Distributes writes evenly across shards
  • Allows most queries to target single shard
  • Doesn't create hot spots

Bad example: Timestamp as shard key (all current writes land on the shard holding the latest time range, creating a hot spot)
Good example: User ID as shard key (writes are spread across shards, and a user's queries hit a single shard)
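
A minimal sketch of routing by a user-ID shard key (the hash choice and shard count are illustrative): hashing spreads users evenly, and every query for one user lands on a single shard.

import hashlib

NUM_SHARDS = 8   # illustrative

def shard_for(user_id):
    # Stable hash of the shard key, so a given user always maps to the same shard.
    digest = hashlib.sha1(str(user_id).encode("utf-8")).hexdigest()
    return int(digest, 16) % NUM_SHARDS

print(shard_for("user-12345"))   # an index in [0, NUM_SHARDS)

Note that hashing modulo the number of shards makes adding shards expensive, since most keys would move; this is why the fixed-partitioning scheme in Section 14.6 hashes into many fixed partitions and moves whole partitions between nodes instead.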

Shard Key Examples:

E-commerce platform:

  • Shard by User ID: User-specific queries are efficient, but queries across all users' orders require scatter-gather
  • Shard by Geographic Region: Good if users primarily interact within their own region
  • Hybrid: User data sharded by User ID, product catalog replicated to all shards

Social network:

  • Shard by User ID: Each user's posts are stored on their own shard
  • Problem: Building a user's feed requires reading from many shards (fan-out)
  • Solution: Maintain a denormalized feed cache on each user's own shard

14.5 Dealing with Distributed Queries

Some queries require data from multiple shards:

Scatter-Gather: Send query to all shards, aggregate results

  • Simple but slow (latency = slowest shard)
  • Doesn't scale well (hits all shards)

Denormalization: Duplicate data across shards

  • Faster queries (data local to shard)
  • More complex writes (maintain consistency)
  • Increased storage

Application-Side Joins: Fetch from multiple shards in application

  • Flexible
  • Can parallelize requests
  • Moves complexity to application
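
A sketch of a parallel scatter-gather (the FakeShard class and fetch_orders method are stand-ins for real per-shard clients): requests fan out concurrently, but overall latency is still set by the slowest shard.

from concurrent.futures import ThreadPoolExecutor

def orders_with_status(shard_clients, status):
    with ThreadPoolExecutor(max_workers=len(shard_clients)) as pool:
        futures = [pool.submit(client.fetch_orders, status) for client in shard_clients]
        return [order for future in futures for order in future.result()]

class FakeShard:
    def __init__(self, orders):
        self.orders = orders

    def fetch_orders(self, status):
        return [o for o in self.orders if o["status"] == status]

shards = [FakeShard([{"id": 1, "status": "pending"}]),
          FakeShard([{"id": 2, "status": "shipped"}])]
print(orders_with_status(shards, "pending"))   # [{'id': 1, 'status': 'pending'}]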

14.6 Rebalancing Shards

As data grows or nodes are added or removed, shards must be rebalanced.

Dynamic Partitioning:

  • Split shards when they grow too large
  • Merge shards when they shrink
  • Number of partitions adapts to data

Fixed Partitioning:

  • Create many partitions upfront
  • Assign multiple partitions to each node
  • Move partitions between nodes to rebalance
  • Number of partitions fixed
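
A sketch of the fixed-partitioning indirection (partition and node counts are illustrative): keys hash into a fixed set of partitions, and a separate assignment map records which node owns each partition, so rebalancing only edits the map and moves whole partitions.

import hashlib

NUM_PARTITIONS = 256   # created up front and never changes

# Which node currently owns each partition (4 nodes here, purely illustrative).
partition_assignment = {p: f"node-{p % 4}" for p in range(NUM_PARTITIONS)}

def partition_for(key):
    return int(hashlib.sha1(str(key).encode()).hexdigest(), 16) % NUM_PARTITIONS

def node_for(key):
    return partition_assignment[partition_for(key)]

print(node_for("user-12345"))   # some node name; only the assignment map changes during rebalancing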

Rebalancing considerations:

  • Move minimal data (expensive operation)
  • Continue serving requests during rebalancing
  • Avoid cascading failures
  • Monitor progress and allow rollback

15. Principal Engineer's Decision Framework

15.1 Understanding Requirements

Before designing a system, deeply understand the requirements:

Functional Requirements: What should the system do?

  • What operations must be supported?
  • What are the use cases?
  • What queries will be made?

Non-Functional Requirements: How should the system behave?

  • Performance: What latency is acceptable? What throughput needed?
  • Availability: Can the system have downtime? How much?
  • Consistency: What consistency guarantees required?
  • Durability: Can data loss be tolerated? How much?

Constraints:

  • Team capabilities and expertise
  • Available budget
  • Time constraints
  • Existing systems and dependencies

15.2 Start Simple, Evolve as Needed

Complexity has real costs:

  • Development time
  • Operational burden
  • Debugging difficulty
  • Onboarding friction

Start with simplest approach that could meet requirements:

  • Single database with replication before sharding
  • Synchronous processing before event-driven
  • Monolith before microservices
  • Manual scaling before auto-scaling

Evolve the architecture when metrics show the current approach is insufficient.

15.3 Making Trade-off Decisions

For each architectural decision, evaluate:

Option A: [Simple approach]

  • Advantages: What does this provide?
  • Disadvantages: What are the limitations?
  • Cost: Development, operational, infrastructure
  • Risks: What could go wrong?

Option B: [Complex approach]

  • Advantages: What does this provide?
  • Disadvantages: What are the limitations?
  • Cost: Development, operational, infrastructure
  • Risks: What could go wrong?

Choose Option B only if its advantages outweigh the costs and risks.

15.4 Designing for Observability

Build observability into the system from the start:

Metrics: Quantitative measurements

  • Request rate, error rate, latency percentiles
  • Resource utilization (CPU, memory, disk, network)
  • Business metrics (orders/second, revenue/hour)

Logs: Record of events

  • Structured logging (JSON, not plain text)
  • Include correlation IDs for tracing requests
  • Log levels appropriate to event importance
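
A minimal structured-logging sketch (field names are illustrative, not a standard schema): each event is emitted as one JSON object and carries the correlation ID handed in by the caller.

import json
import logging
import sys
import time

logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
logger = logging.getLogger("app")

def log_event(message, **fields):
    # One JSON object per line keeps logs machine-parseable and greppable.
    logger.info(json.dumps({"ts": time.time(), "message": message, **fields}))

log_event("order created", request_id="req_abc123", order_id=789, latency_ms=42)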

Tracing: Track requests through distributed system

  • Distributed tracing (e.g., Jaeger or Zipkin)
  • Shows which services involved in request
  • Identifies slow components

Good observability enables:

  • Understanding system behavior
  • Identifying bottlenecks
  • Debugging production issues
  • Making informed scaling decisions

15.5 Planning for Failure

Assume components will fail:

  • Hardware fails
  • Networks partition
  • Dependencies become unavailable
  • Code has bugs

Design assuming failure:

  • What happens if this database becomes unavailable?
  • What happens if this service becomes slow?
  • What happens if network partitions data centers?
  • How do we detect failures?
  • How do we recover?

Test failure scenarios:

  • Kill random processes
  • Inject network delays
  • Fill disks
  • Exhaust memory

15.6 Documenting Decisions

Document architectural decisions (Architecture Decision Records):

For each significant decision:

  • Context: What is the issue we're addressing?
  • Decision: What did we decide?
  • Rationale: Why did we choose this?
  • Alternatives: What other options were considered?
  • Consequences: What are the trade-offs?
  • Date: When was this decided?

This helps:

  • Future engineers understand why the system was designed this way
  • Re-evaluate decisions as circumstances change
  • Avoid repeating past analysis

16. Conclusion

Building data-intensive applications requires understanding fundamental trade-offs and making conscious architectural decisions. The principles explored in this paper—reliability through redundancy, scalability through partitioning and replication, maintainability through simplicity, and efficiency through appropriate caching and indexing—provide a foundation for designing robust distributed systems.

Key insights for engineers at all experience levels:

Core Principles:

  • There is no perfect architecture, only appropriate trade-offs
  • Start simple, add complexity only when necessary
  • Measure before optimizing
  • Design for failure from the beginning
  • Prioritize maintainability for long-term success

For Junior Engineers:

  • Focus on understanding fundamentals deeply
  • Study how existing systems work before building new ones
  • Ask "why" when reviewing architectural decisions
  • Recognize that simple solutions are often best
  • Learn from failures and post-mortems

For Mid-Level Engineers:

  • Practice analyzing trade-offs systematically
  • Consider operational implications of design choices
  • Think about how systems evolve over time
  • Develop intuition through hands-on experience
  • Balance theoretical knowledge with practical constraints

For Senior Engineers:

  • Make decisions based on data and requirements, not trends
  • Consider team capabilities alongside technical factors
  • Document decisions for future reference
  • Mentor others in trade-off analysis
  • Know when to apply complexity and when to avoid it

For Principal Engineers:

  • Balance technical excellence with business pragmatism
  • Design systems that can evolve as requirements change
  • Build for observability and debuggability from the start
  • Foster culture of thoughtful architecture
  • Recognize that people and processes matter as much as technology

The field of distributed systems continues evolving, with new patterns and technologies emerging regularly. However, the fundamental principles—understanding consistency models, designing for failures, making conscious trade-offs, and building evolvable systems—remain constant. By internalizing these principles and applying them thoughtfully, engineers can build data-intensive applications that are reliable, scalable, maintainable, and efficient.


References

This deep dive draws exclusively from concepts and principles presented in:

Kleppmann, Martin (2017). "Designing Data-Intensive Applications: The Big Ideas Behind Reliable, Scalable, and Maintainable Systems." O'Reilly Media.

The book provides comprehensive treatment of:

  • Data models and query languages
  • Storage and retrieval engines
  • Encoding and evolution
  • Replication and partitioning
  • Consistency and consensus
  • Batch and stream processing
  • Future of data systems

For engineers seeking deeper understanding of these topics, "Designing Data-Intensive Applications" remains the definitive reference, providing both theoretical foundations and practical guidance for building robust distributed systems.


Last Updated: November 2, 2025