Hey there! If you’ve landed here, you’re probably interested in understanding how distributed systems work, particularly in Rust. Today, we’re going to roll up our sleeves and get our hands dirty building a Peer-to-Peer (P2P) key-value database.
Here’s a rundown of what we’ll be covering:
- UDP Handshake: We’ll start by setting up a method for nodes to discover each other on the network using UDP.
- TCP Communication: UDP is fast, but it doesn’t guarantee message delivery. So, for more reliable communication, we’ll implement TCP connections between nodes.
- Concurrency with Tokio: We’ll use the Tokio runtime to allow our nodes to handle multiple tasks at once. This is especially important for network operations.
- Key-Value Store: At its core, our database will store data in key-value pairs. It’s a straightforward approach that we can always expand later.
- Command Line Interface (CLI): We’ll also build a simple CLI tool. This way, users can interact with our database directly from the terminal to set or get values.
Ready to dive in? Let’s start building!
Step 1: Setup and Dependencies
Start by creating a new Rust project with cargo new and adding the dependencies we need to Cargo.toml:
[dependencies]
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1", features = ["full"] }
mac_address = "1"
Step 2: Define Communication Messages
Use Rust enums to define various messages that nodes can send to each other.
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;

#[derive(Debug, Serialize, Deserialize)]
enum Message {
    Handshake { node_name: String, tcp_addr: SocketAddr },
    Greeting,
    Heartbeat,
    HeartbeatResponse,
    SetValue { key: String, value: String },
    GetValue { key: String },
    ValueResponse { value: Option<String> },
    Sync { key: String, value: String },
}
These messages cover:
- Handshaking for new nodes.
- Exchanging greetings.
- Sending and responding to heartbeats.
- Setting, getting, and synchronizing key-value pairs.
Step 3: Design the Key-Value Store
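Our database is a simple in-memory key-value store. Several tasks will read and write it at the same time, so we wrap the HashMap in Tokio's asynchronous RwLock: any number of readers can proceed together, while a writer gets exclusive access.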
use std::collections::HashMap;
use tokio::sync::RwLock;

struct KeyValueStore {
    store: RwLock<HashMap<String, String>>,
}

impl KeyValueStore {
    fn new() -> Self {
        KeyValueStore {
            store: RwLock::new(HashMap::new()),
        }
    }

    // Insert or overwrite a pair while holding the write lock.
    async fn set(&self, key: String, value: String) {
        let mut store = self.store.write().await;
        store.insert(key, value);
    }

    // Look up a key under a read lock and return a copy of the value.
    async fn get(&self, key: &str) -> Option<String> {
        let store = self.store.read().await;
        store.get(key).cloned()
    }
}
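The async set and get methods are the only way to touch the data: set takes the write lock to insert a pair, and get takes a read lock and returns a clone of the value, so the lock is released as soon as the call returns.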
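To see the locking pattern in isolation, here is a minimal sketch that runs without any dependencies. It is an assumption-laden stand-in, not the project code: it swaps Tokio's async RwLock for the blocking std::sync::RwLock and Tokio tasks for OS threads, and the names SharedStore and fill_from_threads are made up for the illustration.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

/// Std-only stand-in for KeyValueStore (assumption: blocking
/// std::sync::RwLock instead of tokio::sync::RwLock).
struct SharedStore {
    store: RwLock<HashMap<String, String>>,
}

impl SharedStore {
    fn new() -> Self {
        SharedStore { store: RwLock::new(HashMap::new()) }
    }

    /// Takes the write lock, so only one writer runs at a time.
    fn set(&self, key: String, value: String) {
        let mut store = self.store.write().unwrap();
        store.insert(key, value);
    }

    /// Takes a read lock; many readers may hold it at once.
    fn get(&self, key: &str) -> Option<String> {
        let store = self.store.read().unwrap();
        store.get(key).cloned()
    }
}

/// Hypothetical helper: spawn `writers` threads that each store one pair
/// ("key-0" -> "value-0", "key-1" -> "value-1", ...) and wait for all of them.
fn fill_from_threads(store: Arc<SharedStore>, writers: usize) {
    let handles: Vec<_> = (0..writers)
        .map(|i| {
            // Each thread gets its own Arc handle to the same store.
            let store = Arc::clone(&store);
            thread::spawn(move || store.set(format!("key-{}", i), format!("value-{}", i)))
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}
```

Wrapping the store in an Arc is what lets every thread (or, in our node, every Tokio task) hold a handle to the same map; the lock inside then serializes the writes.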
Step 4: Tracking Nodes with NodeInfo
To keep track of other nodes in the network, use the NodeInfo struct:
struct NodeInfo {
    last_seen: std::time::Instant,
    tcp_addr: SocketAddr,
}
It stores the last time we interacted with a node and its TCP address for direct communication.
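The last_seen field becomes useful once something acts on it. As a rough sketch of one possible use, the two helpers below refresh a node's timestamp and evict nodes that have been silent for too long. Both touch and prune_stale are hypothetical names that do not appear in the project, and the timeout is whatever the caller chooses.

```rust
use std::collections::HashMap;
use std::net::SocketAddr;
use std::time::{Duration, Instant};

struct NodeInfo {
    last_seen: Instant,
    tcp_addr: SocketAddr,
}

/// Hypothetical helper: record that `name` was just heard from.
/// Returns true if the node was not known before.
fn touch(
    nodes: &mut HashMap<String, NodeInfo>,
    name: &str,
    tcp_addr: SocketAddr,
    now: Instant,
) -> bool {
    nodes
        .insert(name.to_string(), NodeInfo { last_seen: now, tcp_addr })
        .is_none()
}

/// Hypothetical helper: remove every node that has been silent for longer
/// than `timeout` and return the removed names in sorted order.
fn prune_stale(
    nodes: &mut HashMap<String, NodeInfo>,
    now: Instant,
    timeout: Duration,
) -> Vec<String> {
    let mut removed: Vec<String> = nodes
        .iter()
        .filter(|(_, info)| now.saturating_duration_since(info.last_seen) > timeout)
        .map(|(name, _)| name.clone())
        .collect();
    for name in &removed {
        nodes.remove(name);
    }
    removed.sort();
    removed
}
```

Passing now in as a parameter, rather than calling Instant::now() inside the helpers, keeps them easy to test with made-up timestamps.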
Step 5: Network Initialization
For the initial handshake and node discovery, we use UDP. Each node announces its presence with its MAC address as a unique identifier:
fn get_mac_address() -> Result<String, MacAddressError> {
    let mac = mac_address::get_mac_address()?;
    match mac {
        Some(address) => Ok(address.to_string()),
        None => Err(MacAddressError::InternalError),
    }
}
In the main function, initialize the UDP socket and set it to broadcast mode:
let socket = UdpSocket::bind(&local_addr).await?;
socket.set_broadcast(true)?;
Step 6: Announcing Node’s Presence
After obtaining its MAC address, each node broadcasts a Handshake message:
tokio::spawn(async move {
    match get_mac_address() {
        Ok(node_name) => {
            // 0.0.0.0 is the unspecified address, so only the port is meaningful
            // to receivers; they need the packet's source IP to reach us.
            let tcp_addr = SocketAddr::from(([0, 0, 0, 0], TCP_PORT));
            let msg = Message::Handshake { node_name, tcp_addr };
            let serialized_msg = serde_json::to_string(&msg).unwrap();
            loop {
                println!("Sending UDP broadcast...");
                // Log a failed send and keep announcing instead of panicking.
                if let Err(e) = socket_for_broadcast.send_to(serialized_msg.as_bytes(), BROADCAST_ADDR).await {
                    eprintln!("Broadcast failed: {}", e);
                }
                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            }
        }
        Err(e) => {
            eprintln!("Error fetching MAC address: {:?}", e);
        }
    }
});
Other nodes store this information upon receipt, ensuring that they are aware of each other.
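One detail deserves care here: the handshake advertises 0.0.0.0, which tells a listener to accept on every interface but is not an address another machine can dial. A receiver can recover a usable address by combining the source IP of the UDP packet with the advertised port. The sketch below shows that idea; peer_tcp_addr is a hypothetical helper, not something the project defines.

```rust
use std::net::SocketAddr;

/// Hypothetical helper: work out which address to dial for a peer.
/// If the advertised IP is unspecified (0.0.0.0 or ::), substitute the
/// source IP of the UDP packet; the advertised port is always kept.
fn peer_tcp_addr(udp_source: SocketAddr, advertised: SocketAddr) -> SocketAddr {
    if advertised.ip().is_unspecified() {
        SocketAddr::new(udp_source.ip(), advertised.port())
    } else {
        advertised
    }
}
```

For example, a handshake that arrives from 192.168.1.42:8888 and advertises 0.0.0.0:9000 resolves to 192.168.1.42:9000, while a peer that advertises a concrete address is left untouched.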

Step 7: Listening for TCP Connections
Nodes communicate directly using TCP. Initialize a TCP listener to handle incoming connections:
let listener = TcpListener::bind(("0.0.0.0", TCP_PORT)).await.unwrap();
For each connection returned by listener.accept(), we spawn a new task to process the incoming message:
tokio::spawn(handle_tcp_stream(stream, nodes_clone.clone(), kv_store.clone()));
Step 8: Handling TCP Messages
Depending on the received message type:
- Heartbeat: Acknowledge with a HeartbeatResponse.
- SetValue: Store the key-value pair and synchronize it with the other nodes.
- GetValue: Retrieve the value from the store and respond.
- Sync: Update the store with the provided key-value pair.
This is managed within:
async fn handle_tcp_stream(
    mut stream: TcpStream,
    nodes: Arc<RwLock<HashMap<String, NodeInfo>>>,
    kv_store: Arc<KeyValueStore>,
) {
    // Read one message. A single read() assumes the whole JSON message
    // arrives in one chunk of at most 1024 bytes.
    let mut buf = vec![0u8; 1024];
    let len = match stream.read(&mut buf).await {
        Ok(0) | Err(_) => return, // peer closed the connection or the read failed
        Ok(len) => len,
    };
    let received_msg: Message = match serde_json::from_slice(&buf[..len]) {
        Ok(msg) => msg,
        Err(e) => {
            eprintln!("Ignoring malformed message: {}", e);
            return;
        }
    };
    match received_msg {
        Message::Heartbeat => {
            println!("Received Heartbeat");
            let response = Message::HeartbeatResponse;
            let serialized_response = serde_json::to_string(&response).unwrap();
            let _ = stream.write_all(serialized_response.as_bytes()).await;
        }
        Message::SetValue { key, value } => {
            println!("Received SetValue");
            kv_store.set(key.clone(), value.clone()).await;
            // Copy the peer addresses first so the lock is not held during network I/O.
            let peers: Vec<SocketAddr> =
                nodes.read().await.values().map(|node| node.tcp_addr).collect();
            let sync_msg = Message::Sync { key, value };
            let serialized_msg = serde_json::to_string(&sync_msg).unwrap();
            // Broadcast sync to all nodes, skipping any that cannot be reached.
            for peer in peers {
                if let Ok(mut peer_stream) = TcpStream::connect(peer).await {
                    let _ = peer_stream.write_all(serialized_msg.as_bytes()).await;
                }
            }
            let response = Message::ValueResponse { value: Some("Value set successfully.".to_string()) };
            let serialized_response = serde_json::to_string(&response).unwrap();
            let _ = stream.write_all(serialized_response.as_bytes()).await;
        }
        Message::GetValue { key } => {
            println!("Received GetValue");
            let value = kv_store.get(&key).await;
            let response = Message::ValueResponse { value };
            let serialized_response = serde_json::to_string(&response).unwrap();
            let _ = stream.write_all(serialized_response.as_bytes()).await;
        }
        Message::Sync { key, value } => {
            println!("Received Sync");
            kv_store.set(key, value).await;
        }
        // Handshake, Greeting, HeartbeatResponse and ValueResponse need no action here.
        _ => {}
    }
}
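The handler reads each message with a single read call into a 1024-byte buffer. That works for small messages on a local network, but TCP is a byte stream: a message may arrive in several pieces, or two messages may arrive together. A common remedy is to prefix every message with its length. The sketch below shows one way to do that with the blocking std::io traits; write_frame, read_frame, and MAX_FRAME_LEN are hypothetical names, and the 4-byte big-endian prefix is an assumption rather than anything the project defines.

```rust
use std::io::{self, Read, Write};

/// Assumed upper bound on a frame, to avoid allocating on a corrupt length.
const MAX_FRAME_LEN: usize = 1 << 20;

/// Write one frame: a 4-byte big-endian length followed by the payload.
fn write_frame<W: Write>(writer: &mut W, payload: &[u8]) -> io::Result<()> {
    if payload.len() > MAX_FRAME_LEN {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "payload too large"));
    }
    let len = payload.len() as u32;
    writer.write_all(&len.to_be_bytes())?;
    writer.write_all(payload)
}

/// Read one frame written by `write_frame`, no matter how the bytes
/// were split up in transit.
fn read_frame<R: Read>(reader: &mut R) -> io::Result<Vec<u8>> {
    let mut len_bytes = [0u8; 4];
    reader.read_exact(&mut len_bytes)?;
    let len = u32::from_be_bytes(len_bytes) as usize;
    if len > MAX_FRAME_LEN {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "frame too large"));
    }
    let mut payload = vec![0u8; len];
    reader.read_exact(&mut payload)?;
    Ok(payload)
}
```

The payload would be the serialized JSON message. Tokio's AsyncReadExt and AsyncWriteExt offer read_exact and write_all as well, so the same structure should carry over to the async handler with .await added to each call.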
Step 9: Continuous Broadcasting
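The broadcast task from Step 6 never exits. It re-sends the handshake every five seconds, so nodes that join later still discover the peers that are already running:
loop {
    if let Err(e) = socket_for_broadcast.send_to(serialized_msg.as_bytes(), BROADCAST_ADDR).await {
        eprintln!("Broadcast failed: {}", e);
    }
    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}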
Step 10: Listening for UDP Messages
Finally, each node needs to continuously listen for UDP messages:
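// Look up our own identifier once, outside the receive loop.
let local_node_name = get_mac_address()?;
let mut buf = vec![0u8; 1024];
loop {
    let (len, addr) = socket.recv_from(&mut buf).await?;
    println!("Received data on UDP from {}", addr);
    let received_msg: Message = match serde_json::from_slice(&buf[..len]) {
        Ok(msg) => msg,
        Err(_) => continue, // ignore packets that are not valid messages
    };
    if let Message::Handshake { node_name, tcp_addr } = received_msg {
        // Ignore packets from ourselves
        if node_name == local_node_name {
            continue;
        }
        println!("Received handshake from: {}", node_name);
        // The sender advertises 0.0.0.0, so pair the packet's source IP
        // with the advertised port to get an address we can dial.
        let tcp_addr = SocketAddr::new(addr.ip(), tcp_addr.port());
        let is_new = {
            let mut nodes_guard = nodes.write().await;
            nodes_guard
                .insert(node_name.clone(), NodeInfo { last_seen: std::time::Instant::now(), tcp_addr })
                .is_none()
        };
        let greeting = Message::Greeting;
        let serialized_greeting = serde_json::to_string(&greeting).unwrap();
        socket.send_to(serialized_greeting.as_bytes(), &addr).await?;
        // Handshakes repeat every 5 seconds; start a heartbeat only for a node
        // we have not seen before, otherwise tasks would pile up.
        if !is_new {
            continue;
        }
        tokio::spawn(async move {
            loop {
                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
                println!("Sending heartbeat to {}", tcp_addr);
                // Skip this round if the peer is unreachable instead of panicking.
                let mut stream = match TcpStream::connect(tcp_addr).await {
                    Ok(stream) => stream,
                    Err(_) => continue,
                };
                let heartbeat_msg = Message::Heartbeat;
                let serialized_msg = serde_json::to_string(&heartbeat_msg).unwrap();
                let _ = stream.write_all(serialized_msg.as_bytes()).await;
            }
        });
    }
}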
Upon receiving a handshake, nodes send back a greeting and initiate a heartbeat mechanism.
Here is the final implementation:
use serde::{Deserialize, Serialize};
use tokio::{net::{TcpListener, TcpStream, UdpSocket}, sync::RwLock};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use mac_address::MacAddressError;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

const BROADCAST_ADDR: &str = "255.255.255.255:8888";
const TCP_PORT: u16 = 9000;

#[derive(Debug, Serialize, Deserialize)]
enum Message {
    Handshake { node_name: String, tcp_addr: SocketAddr },
    Greeting,
    Heartbeat,
    HeartbeatResponse,
    SetValue { key: String, value: String },
    GetValue { key: String },
    ValueResponse { value: Option<String> },
    Sync { key: String, value: String },
}


