// thread_pool.rs
1use std::{
2 panic::{self, AssertUnwindSafe},
3 thread,
4};
5
6pub struct ThreadPool {
7 workers: Vec<Worker>,
8 sender: Option<crossbeam_channel::Sender<Job>>,
9}
10
/// A unit of work for the pool: a boxed closure that is called once on a
/// worker thread.
///
/// `Box<dyn Trait>` carries an implicit `'static` bound on the trait object,
/// so this is the same type as `Box<dyn FnOnce() + Send + 'static>`.
type Job = Box<dyn FnOnce() + Send>;
12
13impl ThreadPool {
14 pub fn new(size: usize, max_waiting: usize) -> Self {
15 assert!(size > 0);
16
17 let (sender, receiver) = crossbeam_channel::bounded(max_waiting);
18 let mut workers = Vec::with_capacity(size);
19 for id in 0..size {
20 workers.push(Worker::new(id, receiver.clone()));
21 }
22
23 ThreadPool {
24 workers,
25 sender: Some(sender),
26 }
27 }
28
29 pub fn execute<F>(&self, f: F)
30 where
31 F: FnOnce() + Send + 'static,
32 {
33 let job = Box::new(f);
34 if let Some(sender) = &self.sender {
35 sender.send(job).unwrap();
36 }
37 }
38}
39
40impl Drop for ThreadPool {
41 fn drop(&mut self) {
42 drop(self.sender.take()); // close channel
43 for worker in &mut self.workers {
44 if let Some(thread) = worker.thread.take() {
45 thread.join().unwrap();
46 }
47 }
48 }
49}
50
/// Bookkeeping for a single pool thread.
// The lint allowance keeps the compiler quiet about fields that are stored
// but not necessarily read back.
#[allow(unused)]
struct Worker {
    /// Handle of the spawned thread; an `Option` so it can be taken out and
    /// joined through a `&mut` reference.
    thread: Option<thread::JoinHandle<()>>,
    /// Index assigned to this worker when the pool was built.
    id: usize,
}
56
57impl Worker {
58 fn new(id: usize, receiver: crossbeam_channel::Receiver<Job>) -> Self {
59 let thread = thread::spawn(move || {
60 while let Ok(job) = receiver.recv() {
61 let result = panic::catch_unwind(AssertUnwindSafe(job));
62 if let Err(e) = result {
63 eprintln!("Worker {id} saw a panic: {e:?}");
64 }
65 }
66 });
67
68 Worker {
69 id,
70 thread: Some(thread),
71 }
72 }
73}