In this guide, we’ll walk through the creation of a simplified distributed state machine using Rust. 🦀
Our focus will be on the core concepts needed to set up basic node communication, state proposals, and consensus among nodes.
Keep in mind that this implementation is intended for educational purposes and to provide a foundation for more complex distributed systems.
Let’s go! 🚀
Why Use a Distributed State Machine?
1. Distributed Databases (e.g., Apache Cassandra, CockroachDB):
- Distributed databases use state machines to replicate data across multiple nodes, ensuring high availability and fault tolerance. Each write operation is a state transition, and consistency is maintained through consensus protocols.
2. Blockchain and Cryptocurrencies (e.g., Ethereum, Bitcoin):
- Blockchain technology is essentially a distributed state machine, where each block represents a state transition based on transactions. Ethereum, for example, not only tracks the state of digital currency but also the state of smart contracts, making it a global, decentralized computing platform.
3. Consensus Protocols (e.g., Raft, Paxos):
- These protocols are foundational to implementing distributed state machines, ensuring all nodes in a distributed system agree on a single source of truth. They are used in various systems, from databases to distributed filesystems, to maintain consistency.
4. Distributed File Systems (e.g., IPFS, HDFS):
- Distributed file systems manage data across multiple servers. They use state machines to track the location and status of each file fragment, ensuring data is accessible even if parts of the system fail.
5. Distributed Configuration Management (e.g., etcd, ZooKeeper):
- These systems provide a reliable way to store and retrieve configuration settings for distributed systems. They rely on distributed state machines to keep configuration data consistent across a cluster of machines.
6. Distributed Ledgers (e.g., Hyperledger Fabric):
- Used in enterprise blockchain solutions, distributed ledgers use state machines to ensure that all participants have a consistent view of the ledger. This is crucial for applications like supply chain tracking, where multiple parties need a reliable and shared source of truth.
7. Real-time Collaboration Tools (e.g., Google Docs):
- These applications allow multiple users to edit a document simultaneously. Behind the scenes, a distributed state machine ensures that all changes are consistently applied, so every user sees the same version of the document.
Hands-on Implementation
Our distributed state machine consists of nodes that can propose state changes, broadcast these proposals to peers, and reach consensus based on received acknowledgments. Each node listens for incoming messages and responds based on predefined rules.
Setting Up the Node Structure
First, we define the Node struct, which represents a node in our distributed system. It includes an ID, the current state, a list of peer nodes with their addresses, a channel for sending messages, and a structure to track proposal acknowledgments.
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{mpsc, Mutex};
use tokio::time::Duration;
use uuid::Uuid;

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
enum State {
    Init,
    Running,
    Stopped,
}

#[derive(Serialize, Deserialize, Debug)]
enum MessageType {
    Proposal,
    Acknowledgment,
    Commit,
}

#[derive(Serialize, Deserialize, Debug)]
struct Message {
    sender_id: u64,
    message_type: MessageType,
    proposed_state: State,
    proposal_id: String,
}

struct Node {
    id: u64,
    state: Arc<Mutex<State>>,
    peers: HashMap<u64, String>,
    address: String,
    tx: mpsc::Sender<Message>,
    proposal_acknowledgments: Arc<Mutex<HashMap<String, HashSet<u64>>>>,
}
Sending Messages
Nodes communicate by sending serialized Message objects over TCP connections. The send_message function handles connecting to a peer and transmitting a message.
impl Node {
    async fn send_message(&self, message: &Message, receiver_address: &str) -> io::Result<()> {
        let mut stream = TcpStream::connect(receiver_address).await?;
        let serialized_message = serde_json::to_vec(message)?;
        stream.write_all(&serialized_message).await
    }
}
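One caveat worth flagging: TCP is a byte stream, so two JSON messages written back-to-back may arrive in a single read, or one message may be split across reads. A common remedy is length-prefixed framing. Here is a minimal std-only sketch; these helpers are not part of the article's code, just an illustration of the technique:

```rust
/// Prefix a payload with its length as a 4-byte big-endian integer.
fn encode_frame(payload: &[u8]) -> Vec<u8> {
    let mut frame = (payload.len() as u32).to_be_bytes().to_vec();
    frame.extend_from_slice(payload);
    frame
}

/// Try to split one complete frame off the front of `buf`.
/// Returns the payload and the number of bytes consumed, or None if
/// the buffer does not yet hold a complete frame.
fn decode_frame(buf: &[u8]) -> Option<(Vec<u8>, usize)> {
    if buf.len() < 4 {
        return None;
    }
    let len = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
    if buf.len() < 4 + len {
        return None;
    }
    Some((buf[4..4 + len].to_vec(), 4 + len))
}

fn main() {
    // Two messages written back-to-back still decode cleanly.
    let mut buf = encode_frame(b"hello");
    buf.extend(encode_frame(b"world"));
    let (first, used) = decode_frame(&buf).unwrap();
    assert_eq!(first, b"hello");
    let (second, _) = decode_frame(&buf[used..]).unwrap();
    assert_eq!(second, b"world");
    println!("framing round trip ok");
}
```

The article's code sidesteps framing by keeping messages small and connections short-lived, which is fine for a demo but worth fixing in anything longer-lived.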
Broadcasting Proposals
When a node wants to propose a state change, it broadcasts a proposal message to all peers. The broadcast_proposal function serializes the proposal and uses send_message to distribute it.
    async fn broadcast_proposal(&self, proposed_state: State) {
        let proposal_id = Uuid::new_v4().to_string();
        let message = Message {
            sender_id: self.id,
            message_type: MessageType::Proposal,
            proposed_state: proposed_state.clone(),
            proposal_id: proposal_id.clone(),
        };
        // Register the proposal, then drop the guard before waiting;
        // holding the lock here would deadlock wait_for_acknowledgments,
        // which needs to lock the same mutex.
        {
            let mut proposal_acknowledgments = self.proposal_acknowledgments.lock().await;
            proposal_acknowledgments.insert(proposal_id.clone(), HashSet::new());
        }
        for address in self.peers.values() {
            if let Err(e) = self.send_message(&message, address).await {
                eprintln!("Failed to send message to {}: {:?}", address, e);
            }
        }
        self.wait_for_acknowledgments(proposal_id.clone()).await;
        // Consensus reached: commit locally and instruct peers to commit.
        let commit = Message {
            sender_id: self.id,
            message_type: MessageType::Commit,
            proposed_state: proposed_state.clone(),
            proposal_id,
        };
        for address in self.peers.values() {
            if let Err(e) = self.send_message(&commit, address).await {
                eprintln!("Failed to send commit to {}: {:?}", address, e);
            }
        }
        *self.state.lock().await = proposed_state;
    }
Listening for Incoming Messages
Each node listens on a TCP socket for incoming connections. The listen function accepts connections and spawns tasks to handle them, reading messages and forwarding them to the message handling logic.
    async fn listen(&self) -> io::Result<()> {
        let listener = TcpListener::bind(&self.address).await?;
        println!("Node {} listening on {}", self.id, self.address);
        loop {
            let (mut socket, _) = listener.accept().await?;
            let tx = self.tx.clone();
            tokio::spawn(async move {
                let mut buf = [0u8; 1024];
                loop {
                    match socket.read(&mut buf).await {
                        // Connection closed by the peer.
                        Ok(0) => break,
                        Ok(n) => {
                            // Assumes one complete JSON message per read.
                            if let Ok(message) = serde_json::from_slice::<Message>(&buf[..n]) {
                                tx.send(message).await.expect("Failed to send message to channel");
                            }
                        }
                        Err(e) => {
                            eprintln!("Read error: {:?}", e);
                            break;
                        }
                    }
                }
            });
        }
    }
Handling Incoming Messages
Nodes react to incoming messages based on their type (proposal, acknowledgment, commit). The handle_incoming_messages function processes messages received through the channel, updating the state machine accordingly.
    async fn handle_incoming_messages(&self, mut rx: mpsc::Receiver<Message>) {
        while let Some(message) = rx.recv().await {
            match message.message_type {
                // Acknowledge proposals back to their sender.
                MessageType::Proposal => {
                    if let Some(addr) = self.peers.get(&message.sender_id) {
                        let ack = Message {
                            sender_id: self.id,
                            message_type: MessageType::Acknowledgment,
                            proposed_state: message.proposed_state.clone(),
                            proposal_id: message.proposal_id.clone(),
                        };
                        if let Err(e) = self.send_message(&ack, addr).await {
                            eprintln!("Failed to send acknowledgment: {:?}", e);
                        }
                    }
                }
                // Record acknowledgments against their proposal.
                MessageType::Acknowledgment => {
                    let mut acks = self.proposal_acknowledgments.lock().await;
                    if let Some(set) = acks.get_mut(&message.proposal_id) {
                        set.insert(message.sender_id);
                    }
                }
                // Apply the agreed state transition locally.
                MessageType::Commit => {
                    *self.state.lock().await = message.proposed_state;
                }
            }
        }
    }
Achieving Consensus
After broadcasting a proposal, the node waits for acknowledgments from its peers; once a majority agrees, the change can be committed. The wait_for_acknowledgments function polls the acknowledgment set until that majority is reached.
    async fn wait_for_acknowledgments(&self, proposal_id: String) {
        // A majority here means more than half of the peer set.
        let majority = (self.peers.len() / 2) + 1;
        loop {
            let ack_count = {
                let acks = self.proposal_acknowledgments.lock().await;
                acks.get(&proposal_id).map(|acks| acks.len()).unwrap_or(0)
            };
            if ack_count >= majority {
                break;
            }
            // Poll again shortly. A production system would use a timeout
            // or a notification primitive rather than busy-polling forever.
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }
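A subtle point: `(self.peers.len() / 2) + 1` requires a majority of the *peers* alone. If the proposer's own vote counts, a strict majority of the whole cluster (peers plus self) is a slightly weaker threshold. The hypothetical helper below, which is not part of the article's code, makes the distinction explicit:

```rust
/// Hypothetical helper: do `ack_count` peer acknowledgments plus the
/// proposer's own implicit vote form a strict majority of the full
/// cluster (all peers plus this node)?
fn has_majority(ack_count: usize, peer_count: usize) -> bool {
    let cluster_size = peer_count + 1; // peers plus the proposing node
    let votes = ack_count + 1;         // peer acks plus the proposer's own vote
    votes > cluster_size / 2
}

fn main() {
    // Two-node cluster (one peer): a single acknowledgment suffices.
    assert!(has_majority(1, 1));
    // Five-node cluster (four peers): two acks + self = 3 of 5 votes.
    assert!(has_majority(2, 4));
    assert!(!has_majority(1, 4));
    println!("majority checks passed");
}
```

For the two-node demo in this article both formulas happen to agree (one acknowledgment is enough), so the difference only matters as the cluster grows.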
Simulating Client Interactions
To test the distributed state machine, you can simulate client interactions by programmatically sending proposals to the nodes. This helps in validating the system’s behavior without setting up an external client.
async fn simulate_client_interaction() -> io::Result<()> {
    let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
    let proposal_message = Message {
        sender_id: 999,
        message_type: MessageType::Proposal,
        proposed_state: State::Running,
        proposal_id: Uuid::new_v4().to_string(),
    };
    let serialized_message = serde_json::to_vec(&proposal_message)?;
    stream.write_all(&serialized_message).await?;
    println!("Simulated client sent proposal to Node 1");
    Ok(())
}
This function connects to a node, constructs a proposal message, serializes it, and sends it over the network. It’s a simple way to trigger node behavior and test the response.
Main Function and Node Initialization
The main function orchestrates the initialization of nodes, starting the listening process, and simulating client interactions.
#[tokio::main]
async fn main() {
    // For demo simplicity both nodes share the same state and acknowledgment
    // map; real nodes would each own their copies and converge via messages.
    let state = Arc::new(Mutex::new(State::Init));
    let proposal_acknowledgments = Arc::new(Mutex::new(HashMap::new()));

    let (tx1, rx1) = mpsc::channel(32);
    let node1 = Arc::new(Node {
        id: 1,
        state: state.clone(),
        peers: HashMap::from([(2, "0.0.0.0:8081".to_string())]),
        address: "0.0.0.0:8080".to_string(),
        tx: tx1,
        proposal_acknowledgments: proposal_acknowledgments.clone(),
    });

    let (tx2, rx2) = mpsc::channel(32);
    let node2 = Arc::new(Node {
        id: 2,
        state: state.clone(),
        peers: HashMap::from([(1, "0.0.0.0:8080".to_string())]),
        address: "0.0.0.0:8081".to_string(),
        tx: tx2,
        proposal_acknowledgments,
    });

    // Spawn the message-handling loops.
    let node1_clone_for_messages = Arc::clone(&node1);
    tokio::spawn(async move {
        node1_clone_for_messages.handle_incoming_messages(rx1).await;
    });
    let node2_clone_for_messages = Arc::clone(&node2);
    tokio::spawn(async move {
        node2_clone_for_messages.handle_incoming_messages(rx2).await;
    });

    // Spawn the TCP listeners.
    let node1_clone_for_listen = Arc::clone(&node1);
    tokio::spawn(async move {
        node1_clone_for_listen.listen().await.expect("Node 1 failed to listen");
    });
    let node2_clone_for_listen = Arc::clone(&node2);
    tokio::spawn(async move {
        node2_clone_for_listen.listen().await.expect("Node 2 failed to listen");
    });

    // Give the listeners a moment to bind before proposing.
    tokio::time::sleep(Duration::from_secs(1)).await;
    node1.broadcast_proposal(State::Running).await;
    tokio::time::sleep(Duration::from_secs(2)).await;

    if let Err(e) = simulate_client_interaction().await {
        eprintln!("Failed to simulate client: {:?}", e);
    }
}
In this setup, nodes are initialized with unique IDs, shared state, acknowledgment tracking, and predefined peers. The nodes start listening for incoming messages in asynchronous tasks, allowing the system to react to simulated client interactions.
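The article does not list its dependencies; a Cargo.toml along these lines should work (version numbers are illustrative, adjust to current releases):

```toml
[dependencies]
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
uuid = { version = "1", features = ["v4"] }
```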
You can check out the full implementation on my GitHub repo.