Implementing a Distributed State Machine in Rust

In this guide, we’ll walk through the creation of a simplified distributed state machine using Rust. 🦀

By Luis Soares | February 13, 2024 | Original on Medium

Our focus will be on the core concepts needed to set up basic node communication, state proposals, and consensus among nodes.

Keep in mind that this implementation is intended for educational purposes and to provide a foundation for more complex distributed systems.

Let’s go! 🚀

Why Use a Distributed State Machine?

1. Distributed Databases (e.g., Apache Cassandra, CockroachDB):

  • Distributed databases replicate data across multiple nodes to provide high availability and fault tolerance. Each write can be modeled as a state transition. CockroachDB orders these transitions with a consensus protocol (Raft), while Cassandra relies on tunable consistency for ordinary writes and uses Paxos only for lightweight transactions.

2. Blockchain and Cryptocurrencies (e.g., Ethereum, Bitcoin):

  • Blockchain technology is essentially a distributed state machine, where each block represents a state transition based on transactions. Ethereum, for example, not only tracks the state of digital currency but also the state of smart contracts, making it a global, decentralized computing platform.

3. Consensus Protocols (e.g., Raft, Paxos):

  • These protocols are foundational to implementing distributed state machines, ensuring all nodes in a distributed system agree on a single source of truth. They are used in various systems, from databases to distributed filesystems, to maintain consistency.

4. Distributed File Systems (e.g., IPFS, HDFS):

  • Distributed file systems manage data across multiple servers. They use state machines to track the location and status of each file fragment, ensuring data is accessible even if parts of the system fail.

5. Distributed Configuration Management (e.g., etcd, ZooKeeper):

  • These systems provide a reliable way to store and retrieve configuration settings for distributed systems. They rely on distributed state machines to keep configuration data consistent across a cluster of machines.

6. Distributed Ledgers (e.g., Hyperledger Fabric):

  • Used in enterprise blockchain solutions, distributed ledgers use state machines to ensure that all participants have a consistent view of the ledger. This is crucial for applications like supply chain tracking, where multiple parties need a reliable and shared source of truth.

7. Real-time Collaboration Tools (e.g., Google Docs):

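  • These applications allow multiple users to edit a document simultaneously. Behind the scenes, the system must apply every change consistently so that all users converge on the same version of the document. Collaborative editors typically achieve this with techniques such as operational transformation or CRDTs rather than a consensus protocol.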

Hands-on Implementation

Our distributed state machine consists of nodes that can propose state changes, broadcast these proposals to peers, and reach consensus based on received acknowledgments. Each node listens for incoming messages and responds based on predefined rules.
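
Consensus matters here because replicas only stay consistent if they apply the same state changes in the same order. The following is a minimal, standard-library-only sketch of that property, not part of the original implementation; the Replica type is a hypothetical stand-in for a node, and the State enum mirrors the one defined later in this article.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    Init,
    Running,
    Stopped,
}

// Hypothetical replica that keeps only the most recently committed state.
struct Replica {
    state: State,
}

impl Replica {
    fn new() -> Self {
        Replica { state: State::Init }
    }

    // Applies one committed state change.
    fn apply(&mut self, committed: State) {
        self.state = committed;
    }
}

fn main() {
    let log = [State::Running, State::Stopped];

    // Two replicas that apply the same log in the same order agree.
    let mut a = Replica::new();
    let mut b = Replica::new();
    for &change in log.iter() {
        a.apply(change);
        b.apply(change);
    }
    assert_eq!(a.state, b.state);

    // A replica that sees the changes in a different order diverges.
    let mut c = Replica::new();
    for &change in log.iter().rev() {
        c.apply(change);
    }
    assert_ne!(a.state, c.state);
    println!("a = {:?}, b = {:?}, c = {:?}", a.state, b.state, c.state);
}
```

The proposal, acknowledgment, and commit messages in the rest of this guide exist to make every node agree on which change comes next.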

Setting Up the Node Structure

First, we define the Node struct, which represents a node in our distributed system. It includes an ID, the current state, a map of peer IDs to their addresses, the node’s own listening address, a channel for sending messages, and a structure to track proposal acknowledgments.

use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{mpsc, Mutex};
use tokio::time::Duration;
use uuid::Uuid;

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
enum State {
    Init,
    Running,
    Stopped,
}

#[derive(Serialize, Deserialize, Debug)]
enum MessageType {
    Proposal,
    Acknowledgment,
    Commit,
}

#[derive(Serialize, Deserialize, Debug)]
struct Message {
    sender_id: u64,
    message_type: MessageType,
    proposed_state: State,
    proposal_id: String,
}

struct Node {
    id: u64,
    state: Arc<Mutex<State>>,
    peers: HashMap<u64, String>,
    address: String,
    tx: mpsc::Sender<Message>,
    proposal_acknowledgments: Arc<Mutex<HashMap<String, HashSet<u64>>>>,
}

Sending Messages

Nodes communicate by sending serialized Message objects over TCP connections. The send_message function handles connecting to a peer and transmitting a message.

impl Node {
    async fn send_message(&self, message: &Message, receiver_address: &str) -> io::Result<()> {
        let mut stream = TcpStream::connect(receiver_address).await?;
        let serialized_message = serde_json::to_vec(message)?;
        stream.write_all(&serialized_message).await
    }
}

Broadcasting Proposals

When a node wants to propose a state change, it broadcasts a proposal message to all peers. The broadcast_proposal function serializes the proposal and uses send_message to distribute it.

async fn broadcast_proposal(&self, proposed_state: State) {
    let proposal_id = Uuid::new_v4().to_string();
    let message = Message {
        sender_id: self.id,
        message_type: MessageType::Proposal,
        proposed_state,
        proposal_id: proposal_id.clone(),
    };

    // Register the proposal in its own scope so the lock is released before
    // sending and waiting. wait_for_acknowledgments locks the same mutex, and
    // holding the guard until the end of this function would deadlock.
    {
        let mut proposal_acknowledgments = self.proposal_acknowledgments.lock().await;
        proposal_acknowledgments.insert(proposal_id.clone(), HashSet::new());
    }

    // Send the proposal to every peer; a failed send is logged, not fatal.
    for address in self.peers.values() {
        if let Err(e) = self.send_message(&message, address).await {
            eprintln!("Failed to send message to {}: {:?}", address, e);
        }
    }
    self.wait_for_acknowledgments(proposal_id).await;
}

Listening for Incoming Messages

Each node listens on a TCP socket for incoming connections. The listen function accepts connections and spawns tasks to handle them, reading messages and forwarding them to the message handling logic.

async fn listen(&self) -> io::Result<()> {
    let listener = TcpListener::bind(&self.address).await?;
    println!("Node {} listening on {}", self.id, self.address);

    loop {
        let (mut socket, _) = listener.accept().await?;
        let tx = self.tx.clone();
        tokio::spawn(async move {
            let mut buf = [0u8; 1024];
            loop {
                match socket.read(&mut buf).await {
                    Ok(0) => break, // Connection closed
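                    Ok(n) => {
                        // Assumes one read yields exactly one complete JSON message.
                        match serde_json::from_slice::<Message>(&buf[..n]) {
                            Ok(message) => {
                                if tx.send(message).await.is_err() {
                                    break; // Receiver dropped; stop reading
                                }
                            }
                            Err(e) => eprintln!("Failed to parse message: {:?}", e),
                        }
                    }
                    Err(e) => {
                        eprintln!("Failed to read from socket: {:?}", e);
                        break;
                    }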
                }
            }
        });
    }
}
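
The listener assumes that each call to read returns exactly one complete JSON message, which TCP does not guarantee: a message may arrive split across several reads, or two messages may arrive together. One common remedy is length-prefixed framing. The sketch below uses only the standard library and an in-memory buffer in place of a socket; the helper names write_frame and read_frame are hypothetical and not part of the original code, and the payloads are placeholder bytes rather than serialized Message values.

```rust
use std::io::{self, Cursor, Read, Write};

// Writes a 4-byte big-endian length followed by the payload.
fn write_frame<W: Write>(writer: &mut W, payload: &[u8]) -> io::Result<()> {
    if payload.len() > u32::MAX as usize {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "payload too large"));
    }
    writer.write_all(&(payload.len() as u32).to_be_bytes())?;
    writer.write_all(payload)
}

// Reads one frame. Returns Ok(None) if the stream ends before a complete
// length header has been read.
fn read_frame<R: Read>(reader: &mut R) -> io::Result<Option<Vec<u8>>> {
    let mut len_bytes = [0u8; 4];
    match reader.read_exact(&mut len_bytes) {
        Ok(()) => {}
        Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
        Err(e) => return Err(e),
    }
    // A real server would also cap `len` before allocating.
    let len = u32::from_be_bytes(len_bytes) as usize;
    let mut payload = vec![0u8; len];
    reader.read_exact(&mut payload)?;
    Ok(Some(payload))
}

fn main() -> io::Result<()> {
    // Two messages written back to back, as they might sit in a receive buffer.
    let mut wire = Vec::new();
    write_frame(&mut wire, b"first message")?;
    write_frame(&mut wire, b"second message")?;

    // The reader recovers the message boundaries from the length prefixes.
    let mut reader = Cursor::new(wire);
    while let Some(frame) = read_frame(&mut reader)? {
        println!("{}", String::from_utf8_lossy(&frame));
    }
    Ok(())
}
```

With tokio, the same idea maps onto AsyncReadExt::read_exact on the receiving side and AsyncWriteExt::write_all on the sending side.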

Handling Incoming Messages

Nodes react to incoming messages based on their type (proposal, acknowledgment, commit). The handle_incoming_messages function processes messages received through the channel, updating the state machine accordingly.

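async fn handle_incoming_messages(&self, mut rx: mpsc::Receiver<Message>) {
    while let Some(message) = rx.recv().await {
        match message.message_type {
            MessageType::Proposal => {
                // Acknowledge the proposal to its sender, if the sender is a known peer.
                if let Some(address) = self.peers.get(&message.sender_id) {
                    let ack = Message {
                        sender_id: self.id,
                        message_type: MessageType::Acknowledgment,
                        proposed_state: message.proposed_state,
                        proposal_id: message.proposal_id,
                    };
                    if let Err(e) = self.send_message(&ack, address).await {
                        eprintln!("Failed to acknowledge to {}: {:?}", address, e);
                    }
                }
            }
            MessageType::Acknowledgment => {
                // Record the acknowledgment; wait_for_acknowledgments polls this set.
                let mut acks = self.proposal_acknowledgments.lock().await;
                if let Some(senders) = acks.get_mut(&message.proposal_id) {
                    senders.insert(message.sender_id);
                }
            }
            MessageType::Commit => {
                // Apply the committed state change.
                *self.state.lock().await = message.proposed_state;
            }
        }
    }
}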
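
The acknowledgment branch needs to count each peer at most once per proposal, which is what the HashMap<String, HashSet<u64>> in Node provides. The sketch below isolates that bookkeeping in a synchronous, standard-library-only type; AckTracker and its methods are hypothetical names used for illustration, not part of the original code.

```rust
use std::collections::{HashMap, HashSet};

// Tracks which peers have acknowledged each open proposal.
#[derive(Default)]
struct AckTracker {
    acks: HashMap<String, HashSet<u64>>,
}

impl AckTracker {
    // Registers a proposal so that acknowledgments for it are accepted.
    fn start(&mut self, proposal_id: &str) {
        self.acks.entry(proposal_id.to_string()).or_default();
    }

    // Records an acknowledgment and returns the number of distinct peers
    // that have acknowledged so far, or None for an unknown proposal.
    fn record(&mut self, proposal_id: &str, sender_id: u64) -> Option<usize> {
        let senders = self.acks.get_mut(proposal_id)?;
        senders.insert(sender_id);
        Some(senders.len())
    }
}

fn main() {
    let mut tracker = AckTracker::default();
    tracker.start("p1");
    println!("{:?}", tracker.record("p1", 2)); // first acknowledgment from peer 2
    println!("{:?}", tracker.record("p1", 2)); // duplicate from peer 2, count unchanged
    println!("{:?}", tracker.record("p1", 3)); // second distinct peer
    println!("{:?}", tracker.record("unknown", 2)); // proposal was never started
}
```

Because the set stores sender IDs, a peer that retransmits its acknowledgment cannot push a proposal over the threshold on its own.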

Achieving Consensus

After broadcasting a proposal, the node waits for acknowledgments from its peers. If a majority agrees, the node commits the change. The wait_for_acknowledgments function checks for consensus and commits the proposal if achieved.

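async fn wait_for_acknowledgments(&self, proposal_id: String) {
    // Number of peer acknowledgments required before committing.
    let majority = (self.peers.len() / 2) + 1;

    // Poll every 100 ms, giving up after 50 attempts (about 5 seconds, an
    // arbitrary limit) so an unreachable peer cannot block the node forever.
    for _ in 0..50 {
        let ack_count = {
            let acks = self.proposal_acknowledgments.lock().await;
            acks.get(&proposal_id).map(|acks| acks.len()).unwrap_or(0)
        };
        if ack_count >= majority {
            // Commit the proposal
            return;
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    eprintln!("Proposal {} timed out waiting for acknowledgments", proposal_id);
}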
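
The majority threshold is easy to get off by one, because self.peers excludes the proposing node itself. The following small sketch, which is not part of the original implementation, computes the threshold from the full cluster size. The helper names majority and peer_acks_needed are hypothetical, and the sketch assumes the proposer counts as one vote for its own proposal.

```rust
// Smallest number of nodes that forms a strict majority of `cluster_size`.
fn majority(cluster_size: usize) -> usize {
    cluster_size / 2 + 1
}

// Acknowledgments a proposer needs from its peers, assuming it votes for
// its own proposal. `peer_count` excludes the proposer.
fn peer_acks_needed(peer_count: usize) -> usize {
    majority(peer_count + 1) - 1
}

fn main() {
    for peer_count in 1..5 {
        println!(
            "{} peers: cluster of {}, majority {}, peer acks needed {}",
            peer_count,
            peer_count + 1,
            majority(peer_count + 1),
            peer_acks_needed(peer_count)
        );
    }
}
```

For the two-node cluster used in this guide, this gives the same threshold as (self.peers.len() / 2) + 1. When the number of peers is even, that expression asks for one more acknowledgment than a strict majority of the cluster requires.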

Simulating Client Interactions

To test the distributed state machine, you can simulate client interactions by programmatically sending proposals to the nodes. This helps in validating the system’s behavior without setting up an external client.

async fn simulate_client_interaction() {
    let mut stream = TcpStream::connect("127.0.0.1:8080").await.unwrap(); // Connect to Node 1
    let proposal_message = Message {
        sender_id: 999, // Example sender ID
        message_type: MessageType::Proposal,
        proposed_state: State::Running,
        proposal_id: Uuid::new_v4().to_string(), // Generate a unique proposal ID
    };

    let serialized_message = serde_json::to_vec(&proposal_message).unwrap(); // Serialize the message
    stream.write_all(&serialized_message).await.unwrap(); // Send the message
    println!("Simulated client sent proposal to Node 1");
}

This function connects to a node, constructs a proposal message, serializes it, and sends it over the network. It’s a simple way to trigger node behavior and test the response.

Main Function and Node Initialization

The main function orchestrates the initialization of nodes, starting the listening process, and simulating client interactions.

#[tokio::main]
async fn main() {
    let state = Arc::new(Mutex::new(State::Init));
    let proposal_acknowledgments = Arc::new(Mutex::new(HashMap::new()));

    let (tx1, rx1) = mpsc::channel(32);
    let node1 = Arc::new(Node {
        id: 1,
        state: state.clone(),
        peers: HashMap::from([(2, "127.0.0.1:8081".to_string())]),
        address: "0.0.0.0:8080".to_string(),
        tx: tx1,
        proposal_acknowledgments: proposal_acknowledgments.clone(),
    });

    let (tx2, rx2) = mpsc::channel(32);
    let node2 = Arc::new(Node {
        id: 2,
        state: state.clone(),
        peers: HashMap::from([(1, "127.0.0.1:8080".to_string())]),
        address: "0.0.0.0:8081".to_string(),
        tx: tx2,
        proposal_acknowledgments,
    });

    let node1_clone_for_messages = Arc::clone(&node1);
    tokio::spawn(async move {
        node1_clone_for_messages.handle_incoming_messages(rx1).await;
    });

    let node2_clone_for_messages = Arc::clone(&node2);
    tokio::spawn(async move {
        node2_clone_for_messages.handle_incoming_messages(rx2).await;
    });

    // Listen for incoming connections
    let node1_clone_for_listen = Arc::clone(&node1);
    tokio::spawn(async move {
        node1_clone_for_listen.listen().await.expect("Node 1 failed to listen");
    });

    let node2_clone_for_listen = Arc::clone(&node2);
    tokio::spawn(async move {
        node2_clone_for_listen.listen().await.expect("Node 2 failed to listen");
    });

    // Ensure the servers have time to start up
    tokio::time::sleep(Duration::from_secs(1)).await;

    // Use the original `node1` Arc to broadcast a proposal
    node1.broadcast_proposal(State::Running).await;

    // Start the simulation after a short delay to ensure nodes are listening
    tokio::time::sleep(Duration::from_secs(2)).await;
    simulate_client_interaction().await;
}

In this setup, nodes are initialized with unique IDs, shared state, acknowledgment tracking, and predefined peers. The nodes start listening for incoming messages in asynchronous tasks, allowing the system to react to simulated client interactions.

You can check out the full implementation on my GitHub repo.
