From f170b727068a7e03ef2aee6e7b5930ddbf8d85c7 Mon Sep 17 00:00:00 2001 From: Adrien Prokopowicz Date: Mon, 22 May 2023 22:14:33 +0200 Subject: [PATCH] Fix background thread spawning and joining --- src/event_loop/background_thread.rs | 132 ++++++++++++---------------- 1 file changed, 56 insertions(+), 76 deletions(-) diff --git a/src/event_loop/background_thread.rs b/src/event_loop/background_thread.rs index b41d34ac..3e437f8a 100644 --- a/src/event_loop/background_thread.rs +++ b/src/event_loop/background_thread.rs @@ -3,9 +3,9 @@ //! //! This is essentially a slimmed down version of the `LinuxEventLoop`. +use anymap::Entry; use crossbeam::channel; use parking_lot::Mutex; -use std::sync::atomic::{AtomicIsize, Ordering}; use std::sync::{Arc, Weak}; use std::thread::{self, JoinHandle}; @@ -23,18 +23,15 @@ pub(crate) struct BackgroundThread { /// A thread that act as our worker thread. When [`schedule()`][Self::schedule()] is called, /// this thread will be woken up to execute the task on the executor. When the last worker /// thread handle gets dropped the thread is shut down. - worker_thread: WorkerThreadHandle, + worker_thread: Arc>, } /// A handle for the singleton worker thread. This lets multiple instances of the same plugin share /// a worker thread, and when the last instance gets dropped the worker thread gets terminated. -struct WorkerThreadHandle { - pub(self) tasks_sender: channel::Sender>, - /// The thread's reference count. Shared between all handles to the same thread. This is - /// decrased by one when the struct is dropped. - reference_count: Arc, - /// The thread's join handle. Joined when the reference count reaches 0. - join_handle: Arc>>>, +struct WorkerThread { + tasks_sender: channel::Sender>, + /// The thread's join handle. Joined when the WorkerThread is dropped. + join_handle: Option>, } /// A message for communicating with the worker thread. @@ -77,84 +74,67 @@ where // workaround to have a singleton that also works if for whatever reason there arem ultiple `T` // and `E`s in a single process (won't happen with normal plugin usage, but sho knwos). lazy_static::lazy_static! { - static ref HANDLE_MAP: Mutex> = + static ref HANDLE_MAP: Mutex> = Mutex::new(anymap::Map::new()); } -impl Clone for WorkerThreadHandle { - fn clone(&self) -> Self { - self.reference_count.fetch_add(1, Ordering::SeqCst); - - Self { - tasks_sender: self.tasks_sender.clone(), - reference_count: self.reference_count.clone(), - join_handle: self.join_handle.clone(), - } - } -} - -impl Drop for WorkerThreadHandle { - fn drop(&mut self) { - // If the host for whatever reason instantiates and destroys a plugin at the same time from - // different threads, we need to make sure this doesn't do anything weird. - let _handle_map = HANDLE_MAP.lock(); - - // The thread is shut down and joined when the last handle is dropped - if self.reference_count.fetch_sub(1, Ordering::SeqCst) == 1 { - self.tasks_sender - .send(Message::Shutdown) - .expect("Failed while sending worker thread shutdown request"); - let join_handle = self - .join_handle - .lock() - .take() - .expect("The thread has already been joined"); - join_handle.join().expect("Worker thread panicked"); - } - } -} - -/// Either acquire a handle for an existing worker thread or create one if it does not yet exists. -/// This allows multiple plugin instances to share a worker thread. Reference counting happens -/// automatically as part of this function and `WorkerThreadHandle`'s lifecycle. -fn get_or_create_worker_thread() -> WorkerThreadHandle -where - T: Send + 'static, - E: MainThreadExecutor + 'static, -{ - // The map entry contains both the thread's reference count - // NOTE: This uses `AtomicIsize` for a reason. The `HANDLE_MAP` also holds a reference to this - // thread handle, and its `Drop` implementation will also fire if the - // `Option>` is ever overwritten. This will cause the reference - // count to become -1 which is fine. - let mut handle_map = HANDLE_MAP.lock(); - let (reference_count, worker_thread_handle) = handle_map - .entry::<(Arc, Option>)>() - .or_insert_with(|| (Arc::new(AtomicIsize::new(0)), None)); - - // When this is the first reference to the worker thread, the thread is (re)initialized - if reference_count.fetch_add(1, Ordering::SeqCst) <= 0 { +impl + 'static> WorkerThread { + fn spawn() -> Self { let (tasks_sender, tasks_receiver) = channel::bounded(super::TASK_QUEUE_CAPACITY); let join_handle = thread::Builder::new() .name(String::from("bg-worker")) .spawn(move || worker_thread(tasks_receiver)) .expect("Could not spawn background worker thread"); - // This needs special handling if `worker_thread_handle` was already a `Some` value because - // the `Drop` will decrease the reference count when it gets overwritten. There may be a - // better alternative to this. - if worker_thread_handle.is_some() { - reference_count.fetch_add(1, Ordering::SeqCst); - } - - *worker_thread_handle = Some(WorkerThreadHandle { + Self { + join_handle: Some(join_handle), tasks_sender, - reference_count: reference_count.clone(), - join_handle: Arc::new(Mutex::new(Some(join_handle))), - }); + } } +} - worker_thread_handle.clone().unwrap() +impl Drop for WorkerThread { + fn drop(&mut self) { + // The thread is shut down and joined when the handle is dropped + self.tasks_sender + .send(Message::Shutdown) + .expect("Failed while sending worker thread shutdown request"); + self.join_handle + .take() + // Only possible if the WorkerThread got dropped twice, somehow? + .expect("Missing Worker thread JoinHandle") + .join() + .expect("Worker thread panicked"); + } +} + +/// Either acquire a handle for an existing worker thread or create one if it does not yet exists. +/// This allows multiple plugin instances to share a worker thread. Reference counting happens +/// automatically as part of this function and `WorkerThreadHandle`'s lifecycle. +fn get_or_create_worker_thread() -> Arc> +where + T: Send + 'static, + E: MainThreadExecutor + 'static, +{ + let mut handle_map = HANDLE_MAP.lock(); + + match handle_map.entry::>>() { + Entry::Occupied(mut entry) => { + let weak = entry.get_mut(); + if let Some(arc) = weak.upgrade() { + arc + } else { + let arc = Arc::new(WorkerThread::spawn()); + *weak = Arc::downgrade(&arc); + arc + } + } + Entry::Vacant(entry) => { + let arc = Arc::new(WorkerThread::spawn()); + entry.insert(Arc::downgrade(&arc)); + arc + } + } } /// The worker thread used in [`EventLoop`] that executes incoming tasks on the event loop's @@ -162,7 +142,7 @@ where fn worker_thread(tasks_receiver: channel::Receiver>) where T: Send, - E: MainThreadExecutor, + E: MainThreadExecutor + 'static, { loop { match tasks_receiver.recv() {