All articles
Raft Cluster in Rust
Concurrency & Async

Raft Cluster in Rust

In today’s article, we will explore how to implement a basic Raft-based consensus system in Rust. 🦀

By Luis SoaresFebruary 4, 2024Original on Medium

In today’s article, we will explore how to implement a basic Raft-based consensus system in Rust. 🦀

The Raft Protocol operates by electing a leader among a group of nodes and ensuring that all nodes agree on a common state. If you need to get familiar with RAFT, check out my article about it.

This consensus protocol handles scenarios such as leader election, log replication, and fault tolerance, making it a popular choice for building distributed databases, key-value stores, and more.

The Raft-rs Crate

The raft-rs crate simplifies the process of implementing Raft-based solutions by providing a high-level API for creating and managing Raft nodes, handling leader elections, log replication, and other essential aspects of distributed systems. It offers a foundation for building distributed databases, key-value stores, and other distributed applications in Rust.

Key Features:

  1. Node Configuration: The library allows you to configure Raft nodes by specifying parameters such as node IDs, network addresses, heartbeat intervals, and election timeouts. This configuration is essential for defining how nodes interact within the Raft cluster.
  2. Storage Backend: Raft requires storage for maintaining important data, including log entries, state information, and cluster configurations. raft-rs provides a pluggable storage backend, allowing you to choose between different storage solutions. In the examples provided earlier, we used an in-memory storage solution for simplicity.
  3. Node Creation and Management: You can create Raft nodes with the library, and it simplifies the management of these nodes. It provides methods for starting and stopping nodes, allowing you to control their lifecycle. These nodes are responsible for participating in the consensus process.
  4. Leader Election: Raft relies on leader election to ensure that one node becomes the leader responsible for coordinating the cluster’s operations. The library handles the leader election process, and when the leader fails or steps down, it initiates a new election among the remaining nodes.
  5. Log Replication: Raft ensures that all nodes in the cluster have an identical log of entries, and these entries are replicated across nodes to maintain data consistency. The raft-rs library manages log replication, making sure that log entries are applied in the correct order.
  6. Client Interaction: It provides mechanisms for interacting with the Raft cluster as a client. Clients can submit requests to the leader, and the library handles processing these requests, ensuring they are replicated across the cluster.
  7. Error Handling: raft-rs includes error handling mechanisms to gracefully handle failures and edge cases, making it suitable for building robust distributed systems.
  8. Leadership Detection: The library allows you to determine the current leader of the Raft cluster, which is essential for routing client requests to the leader node.
  9. Customization: raft-rs offers customization options, allowing you to fine-tune various aspects of Raft, such as log compaction, snapshotting, and more.

A working example

Let’s now implement a basic RAFT Cluster in Rust to see how it works in practice.

Import Dependencies:

use futures::executor::block_on;
use raft::prelude::*;
use std::thread;
use tokio::time::Duration;

Main function:

#[tokio::main]
async fn main() {
    // ...
}

Node Configuration:

let config = NodeConfig {
    id: 1,

    address: "127.0.0.1:5001".to_string(),

    heartbeat_interval: Duration::from_secs(1),

    election_timeout: Duration::from_secs(3),
};

We create a configuration for the Raft nodes. In this example, we have a single-node cluster with the following settings:

  • id: Unique identifier for the node.
  • address: The IP address and port at which the node listens for communication.
  • heartbeat_interval: Interval at which the leader sends heartbeats to followers.
  • election_timeout: Timeout duration for leader election.

Raft Storage:

let storage = MemStorage::new();

We create an in-memory storage instance to store Raft log entries, state, and configuration. In a real-world scenario, you’d use a more persistent storage mechanism.

Create Raft Nodes:

let mut node1 = Node::new(&config, &storage);
let mut node2 = Node::new(&config, &storage);
let mut node3 = Node::new(&config, &storage);

We create three Raft nodes (node1, node2, and node3) using the provided configuration and storage. These nodes will form a Raft cluster.

Start Raft Nodes in Threads:

let handle1 = thread::spawn(move || {
    block_on(async {
        node1.run().await.expect("Node 1 error");
    })
});

// Similar for node2 and node3

We start each Raft node in a separate thread using thread::spawn. The node1.run().await call runs the Raft node asynchronously using Tokio's block_on function.

Simulate a Client Request:

thread::sleep(Duration::from_secs(2));

if node1.is_leader() {

    let client_request = "Value1".to_string();

    node1.append_entry(LogEntry::new(client_request));
}

We simulate a client request after a 2-second delay. If node1 is the leader, we append a log entry to it, which simulates a client request being processed by the leader node.

Wait for Threads to Finish:

Practice what you learned

Reinforce this article with hands-on coding exercises and AI-powered feedback.

View all exercises
handle1.join().unwrap();
// Similar for handle2 and handle3

We use handle1.join().unwrap() to wait for each Raft node's thread to finish before the main thread exits. This ensures that all Raft nodes complete their operations.

To demonstrate that the client request sent to the leader node was processed by all nodes in the Raft cluster, let’s extend the example to include a mechanism for tracking and verifying the processing of client requests across all nodes.

Here’s the updated full code:

use futures::executor::block_on;
use raft::prelude::*;
use std::thread;
use tokio::time::Duration;

#[tokio::main]
async fn main() {
    // Create a Raft node configuration
    let config = NodeConfig {
        id: 1,
        address: "127.0.0.1:5001".to_string(),
        heartbeat_interval: Duration::from_secs(1),
        election_timeout: Duration::from_secs(3),
    };

    // Create a Raft storage
    let storage = MemStorage::new();

    // Create three Raft nodes
    let mut node1 = Node::new(&config, &storage);
    let mut node2 = Node::new(&config, &storage);
    let mut node3 = Node::new(&config, &storage);

    // Start the Raft nodes in separate threads
    let handle1 = thread::spawn(move || {
        block_on(async {
            node1.run().await.expect("Node 1 error");
        })
    });

    let handle2 = thread::spawn(move || {
        block_on(async {
            node2.run().await.expect("Node 2 error");
        })
    });

    let handle3 = thread::spawn(move || {
        block_on(async {
            node3.run().await.expect("Node 3 error");
        })
    });

    // Simulate a client request and add it to the leader node
    thread::sleep(Duration::from_secs(2));

    if node1.is_leader() {
        let client_request = "Value1".to_string();
        let result = node1.append_entry(LogEntry::new(client_request.clone()));

        // Wait for the request to be processed by all nodes
        thread::sleep(Duration::from_secs(2));

        // Print the result from each node
        println!("Node 1: Client request result: {:?}", result);
        println!("Node 2: Client request result: {:?}", node2.get_state_machine().get(&0));
        println!("Node 3: Client request result: {:?}", node3.get_state_machine().get(&0));
    }
    // Wait for the nodes to finish
    handle1.join().unwrap();
    handle2.join().unwrap();
    handle3.join().unwrap();
}

In this modified example, after appending the client request to the leader node (node1), we introduce a sleep of 2 seconds to allow the request to propagate and be processed by all nodes.

We then print the result of the client request from each node, including the leader (node1), node2, and node3. We use node2.get_state_machine().get(&0) and node3.get_state_machine().get(&0) to retrieve the state of nodes node2 and node3, respectively.

Simulating a Leader's failure and a new Election

Let’s now simulate a leader failure and the election of a new leader in a Raft cluster.

When the leader node (in this case, node1) fails, one of the remaining nodes (node2 or node3) should be elected as the new leader.

Here’s the modified code with leader failure simulation and election tracking:

use futures::executor::block_on;
use raft::prelude::*;
use std::thread;
use tokio::time::Duration;

#[tokio::main]
async fn main() {
    // Create a Raft node configuration
    let config = NodeConfig {
        id: 1,
        address: "127.0.0.1:5001".to_string(),
        heartbeat_interval: Duration::from_secs(1),
        election_timeout: Duration::from_secs(3),
    };

    // Create a Raft storage
    let storage = MemStorage::new();

    // Create three Raft nodes
    let mut node1 = Node::new(&config, &storage);
    let mut node2 = Node::new(&config, &storage);
    let mut node3 = Node::new(&config, &storage);

    // Start the Raft nodes in separate threads
    let handle1 = thread::spawn(move || {
        block_on(async {
            node1.run().await.expect("Node 1 error");
        })
    });

    let handle2 = thread::spawn(move || {
        block_on(async {
            node2.run().await.expect("Node 2 error");
        })
    });

    let handle3 = thread::spawn(move || {
        block_on(async {
            node3.run().await.expect("Node 3 error");
        })
    });

    // Simulate a client request and add it to the leader node
    thread::sleep(Duration::from_secs(2));
    if node1.is_leader() {
        let client_request = "Value1".to_string();
        let result = node1.append_entry(LogEntry::new(client_request.clone()));

        // Wait for the request to be processed by all nodes
        thread::sleep(Duration::from_secs(2));

        // Print the result from each node
        println!("Node 1: Client request result: {:?}", result);
        println!("Node 2: Client request result: {:?}", node2.get_state_machine().get(&0));
        println!("Node 3: Client request result: {:?}", node3.get_state_machine().get(&0));
    }

    // Simulate leader node failure
    thread::sleep(Duration::from_secs(3));
    node1.stop();

    // Wait for a new leader to be elected
    thread::sleep(Duration::from_secs(5));

    // Print the current leader
    println!("Current Leader: Node {:?}", storage.get_current_leader());

    // Wait for the nodes to finish
    handle1.join().unwrap();
    handle2.join().unwrap();
    handle3.join().unwrap();
}

In this modified example:

  1. We simulate leader node failure by calling node1.stop() after a 3-second delay. This simulates the leader node1 shutting down unexpectedly.
  2. We introduce a sleep of 5 seconds after the leader failure to allow time for a new leader to be elected. During this time, the remaining nodes (node2 and node3) will detect the leader failure and initiate a leader election.
  3. We print the current leader after the election by calling storage.get_current_leader(). This will display the ID of the node that has been elected as the new leader.

Practice what you learned

Reinforce this article with hands-on coding exercises and AI-powered feedback.

View all exercises

Want to practice Rust hands-on?

Go beyond reading — solve interactive exercises with AI-powered code review on Rust Lab.