Rust Concurrency in Practice

Concurrency in Rust is based on ownership, types, and borrowing principles, which help manage memory safety without a garbage collector.

By Luis Soares · January 30, 2024 · Originally published on Medium

Rust achieves thread safety by ensuring that shared data is either immutable or only accessible from one thread at a time.
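
One way to see this guarantee in action is std::thread::scope (stable since Rust 1.63): scoped threads can borrow data immutably from the parent stack, and the compiler verifies they all finish before the borrow ends. A minimal sketch:

```rust
use std::thread;

// Each scoped thread borrows a chunk of `data` immutably; the compiler
// guarantees every thread finishes before the borrow of `data` ends.
fn partial_sums(data: &[i32]) -> Vec<i32> {
    thread::scope(|s| {
        let handles: Vec<_> = data
            .chunks(2)
            .map(|chunk| s.spawn(move || chunk.iter().sum::<i32>()))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}

fn main() {
    let data = vec![1, 2, 3, 4];
    let sums = partial_sums(&data);
    println!("partial sums: {:?}", sums); // [3, 7]
    // `data` is still usable here; no Mutex is needed for read-only sharing.
    println!("total: {}", data.iter().sum::<i32>());
}
```

Mutating `data` from two scoped threads at once without a Mutex would be rejected at compile time, which is exactly the "one thread at a time" half of the guarantee.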

Key Concepts:

  • Threads: Rust uses threads to run multiple parts of your code simultaneously. The std::thread module allows you to create new threads.
  • Message Passing: Rust’s concurrency model follows the “Do not communicate by sharing memory; instead, share memory by communicating” philosophy. Channels in Rust, provided by the std::sync::mpsc module, facilitate this.
  • Shared State Concurrency: Rust allows for shared-state concurrency using Mutex and Arc from the std::sync module to safely share and mutate data across threads.
  • Sync and Send Traits: These marker traits ensure that only safe types are shared between threads. Send allows a type to be transferred across thread boundaries, while Sync allows a type to be safely shared through references.
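
To see Send in practice: Rc<T> does not implement Send (its reference count is not atomic), while Arc<T> does. A minimal sketch contrasting the two:

```rust
use std::rc::Rc;
use std::sync::Arc;
use std::thread;

// Arc<i32> is Send, so it can move into a spawned thread.
fn add_one_on_another_thread(value: Arc<i32>) -> i32 {
    thread::spawn(move || *value + 1).join().unwrap()
}

fn main() {
    // Rc<T> is NOT Send: its reference count is non-atomic, so the
    // compiler rejects moving it into another thread. Uncommenting
    // the next two lines fails to build:
    // let rc = Rc::new(5);
    // thread::spawn(move || println!("{}", rc));
    let _single_threaded = Rc::new(5); // fine within one thread

    let shared = Arc::new(5);
    println!("{}", add_one_on_another_thread(shared)); // prints 6
}
```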

Let’s now put it into practice with exercises to help you grasp or refresh these key concepts! 🦀

Exercises

Exercise 1: Spawning Threads

Create a simple program that spawns multiple threads and prints “Hello from thread!” from each thread.

use std::thread;

fn main() {
    // Collect the handles so we can wait for every thread deterministically,
    // instead of sleeping and hoping they all finish in time.
    let handles: Vec<_> = (0..5)
        .map(|i| {
            thread::spawn(move || {
                println!("Hello from thread {}", i);
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
}

Exercise 2: Message Passing

Implement a program where you spawn two threads: one for sending a message using a channel, and another for receiving and printing that message.

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let msg = String::from("Hello from the sender!");
        tx.send(msg).unwrap();
    });
    let received = rx.recv().unwrap();
    println!("Received: {}", received);
}
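
The channel above has a single producer, but mpsc stands for multiple producer, single consumer: cloning the sender lets several threads feed one receiver. A small extension of the exercise:

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    // Clone the sender once per producing thread.
    for i in 0..3 {
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(format!("message from producer {}", i)).unwrap();
        });
    }
    // Drop the original sender so the receiver's iterator ends
    // once every clone has been dropped.
    drop(tx);

    for received in rx {
        println!("Received: {}", received);
    }
}
```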

Exercise 3: Shared State with Mutex

Create a program that uses a Mutex to safely increment a counter from multiple threads.

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
    println!("Result: {}", *counter.lock().unwrap());
}
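
As a side note, for a plain counter an atomic integer from std::sync::atomic is a lighter-weight alternative to a Mutex; the same exercise could be sketched as:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// fetch_add is a single atomic hardware operation: no lock is needed.
fn count_in_parallel(threads: usize) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                counter.fetch_add(1, Ordering::SeqCst);
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    counter.load(Ordering::SeqCst)
}

fn main() {
    println!("Result: {}", count_in_parallel(10)); // Result: 10
}
```

Atomics only cover simple operations on single values; once you need to update a larger structure, the Mutex approach above is the right tool.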

Exercise 4: Building a Concurrent Web Server

This is a more advanced exercise. Implement a basic concurrent web server using TcpListener and threads. Accept connections, read the request, and respond with a fixed message.

use std::io::prelude::*;
use std::net::{TcpListener, TcpStream};
use std::thread;

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    // Read the request (its contents are ignored; we always send
    // the same fixed response).
    stream.read(&mut buffer).unwrap();
    let response = "HTTP/1.1 200 OK\r\n\r\nHello from Rust server!";
    // write_all guarantees the whole response is written, unlike write.
    stream.write_all(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    for stream in listener.incoming() {
        let stream = stream.unwrap();
        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

Advanced Exercise: Implementing a Thread Pool

An effective way to manage resources in a concurrent program is by using a thread pool. A thread pool creates a fixed number of threads at program start-up and reuses these threads to execute tasks. This exercise involves implementing a simple thread pool to handle tasks in a concurrent web server.

Step 1: Define the ThreadPool Structure

Start by defining the structure of your thread pool and the initial function to create a new thread pool with a specified number of threads.

struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        let mut workers = Vec::with_capacity(size);
        for _ in 0..size {
            // Create and store workers
        }
        ThreadPool { workers }
    }
}

Step 2: Implement the Worker

Each worker will be responsible for executing tasks. A worker holds a thread and listens for tasks sent through a channel.

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {
            // Placeholder for the task execution logic
        });
        Worker {
            id,
            thread: Some(thread),
        }
    }
}

Step 3: Sending Tasks to Threads

Modify the ThreadPool to hold a sending end of a channel. Workers will listen on the receiving end for tasks to execute. Tasks can be represented as closures.

use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;

type Job = Box<dyn FnOnce() + Send + 'static>;

struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

// Modify the ThreadPool::new function to include a channel


Each worker’s thread should retrieve tasks from the channel and execute them.
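
Following the shared-receiver pattern from the Rust Book's thread-pool chapter (the assumption here is that the receiver is wrapped in Arc<Mutex<...>> so all workers can share it), Worker::new might look like this sketch:

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    // Arc gives every worker shared ownership of the receiver; the Mutex
    // ensures only one worker at a time takes a job off the channel.
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            // The lock is held only while receiving; the temporary guard
            // is dropped at the end of this statement, before the job runs.
            let message = receiver.lock().unwrap().recv();
            match message {
                Ok(job) => {
                    println!("Worker {} got a job; executing.", id);
                    job();
                }
                // recv() errors once all senders are dropped: shut down.
                Err(_) => break,
            }
        });
        Worker { id, thread: Some(thread) }
    }
}

fn main() {
    let (sender, receiver) = mpsc::channel::<Job>();
    let receiver = Arc::new(Mutex::new(receiver));
    let worker = Worker::new(0, receiver);

    sender.send(Box::new(|| println!("task ran"))).unwrap();

    // Dropping the sender closes the channel, so the worker's loop exits.
    drop(sender);
    println!("joining worker {}", worker.id);
    worker.thread.unwrap().join().unwrap();
}
```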

Step 4: Execute Tasks in the ThreadPool

Implement a method on ThreadPool to send tasks to the worker threads for execution.

impl ThreadPool {
    // ...

    fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.send(job).unwrap();
    }
}

Step 5: Graceful Shutdown

Implement logic for a graceful shutdown of the thread pool. Ensure that all workers finish their current tasks before the program exits.

impl Drop for ThreadPool {
    fn drop(&mut self) {
        // Note: for join() to return, the channel must be closed first
        // (for example, store the sender as Option<mpsc::Sender<Job>> and
        // drop it here with self.sender.take()); otherwise each worker's
        // recv() blocks forever and so does join().
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}
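
Putting the steps together, here is one self-contained sketch of the whole pool, assuming the sender is stored as an Option so that Drop can close the channel before joining:

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

struct ThreadPool {
    workers: Vec<Worker>,
    // Option so Drop can take() the sender and close the channel.
    sender: Option<mpsc::Sender<Job>>,
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl ThreadPool {
    fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (0..size)
            .map(|id| Worker::new(id, Arc::clone(&receiver)))
            .collect();
        ThreadPool { workers, sender: Some(sender) }
    }

    fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        self.sender.as_ref().unwrap().send(Box::new(f)).unwrap();
    }
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            match receiver.lock().unwrap().recv() {
                Ok(job) => job(),
                Err(_) => break, // channel closed: shut down
            }
        });
        Worker { id, thread: Some(thread) }
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        // Close the channel first so every worker's recv() returns Err.
        drop(self.sender.take());
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

fn main() {
    let pool = ThreadPool::new(4);
    for i in 0..8 {
        pool.execute(move || println!("task {} running", i));
    }
    // `pool` is dropped here: the sender closes, workers drain and join.
}
```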

Integrating the ThreadPool into the Web Server

Replace the simple thread spawn logic in your web server with the ThreadPool. This will limit the number of concurrent threads and reuse them for incoming requests, which is more efficient than spawning a new thread per request.

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4); // Adjust the number of threads in the pool as needed

    for stream in listener.incoming() {
        let stream = stream.unwrap();
        pool.execute(|| {
            handle_connection(stream);
        });
    }
    println!("Shutting down."); // only reached if the incoming loop ends (e.g. via listener.incoming().take(n))
}

Going Further: Async/Await in Rust

As you become more comfortable with threads, message passing, and shared state concurrency, it’s important to explore Rust’s asynchronous programming capabilities. The async/await syntax in Rust provides a powerful model for writing concurrent code that is both efficient and easy to read.

Exercise: Asynchronous File Reading

In this exercise, you’ll write a program that reads the contents of a file asynchronously. This will demonstrate how to perform I/O-bound tasks without blocking the execution of your program.

First, include the necessary crates in your Cargo.toml:

[dependencies]
tokio = { version = "1", features = ["full"] }

Next, write the async function to read a file:

use tokio::fs::File;
use tokio::io::AsyncReadExt;

async fn read_file_async(path: &str) -> Result<String, std::io::Error> {
    let mut file = File::open(path).await?;
    let mut contents = String::new();
    file.read_to_string(&mut contents).await?;
    Ok(contents)
}

To execute the async function, use the Tokio runtime:

#[tokio::main]
async fn main() {
    let path = "data.txt";
    match read_file_async(path).await {
        Ok(contents) => println!("File contents: {}", contents),
        Err(e) => println!("Error reading the file: {:?}", e),
    }
}

Exercise: Concurrent Web Requests

In this exercise, you’ll make multiple web requests concurrently and wait for all of them to complete. This is a common scenario in web servers and clients that need to aggregate data from multiple sources.

First, add the reqwest and tokio crates to your Cargo.toml:

[dependencies]
reqwest = { version = "0.11", features = ["json"] }
tokio = { version = "1", features = ["full"] }
futures = "0.3"

Then, write a function to perform a web request asynchronously:

use reqwest::Error;

async fn fetch_url(url: &str) -> Result<String, Error> {
    let response = reqwest::get(url).await?;
    let body = response.text().await?;
    Ok(body)
}

Execute multiple requests concurrently and wait for all to complete:

#[tokio::main]
async fn main() {
    let urls = ["https://example.com", "https://api.example.com", "https://blog.example.com"];

    // join_all runs all the futures concurrently and yields a Vec with
    // one Result per request (not a single Result wrapping the Vec).
    let results = futures::future::join_all(urls.iter().map(|&url| fetch_url(url))).await;

    for (url, result) in urls.iter().zip(results) {
        match result {
            Ok(content) => println!("Fetched {} ({} bytes)", url, content.len()),
            Err(e) => println!("Failed to fetch {}: {:?}", url, e),
        }
    }
}

Practice what you learned

Reinforce this article with hands-on coding exercises and AI-powered feedback.

View all exercises

Want to practice Rust hands-on?

Go beyond reading — solve interactive exercises with AI-powered code review on Rust Lab.