diff --git a/src/wrapper/standalone/backend/cpal.rs b/src/wrapper/standalone/backend/cpal.rs index 97135ec6..888ed471 100644 --- a/src/wrapper/standalone/backend/cpal.rs +++ b/src/wrapper/standalone/backend/cpal.rs @@ -1,22 +1,28 @@ -use std::num::NonZeroU32; - use anyhow::{Context, Result}; use cpal::{ traits::*, Device, InputCallbackInfo, OutputCallbackInfo, Sample, SampleFormat, Stream, StreamConfig, }; use crossbeam::sync::{Parker, Unparker}; -use midir::{MidiInput, MidiInputPort, MidiOutput, MidiOutputPort}; +use midir::{ + MidiInput, MidiInputConnection, MidiInputPort, MidiOutput, MidiOutputConnection, MidiOutputPort, +}; use parking_lot::Mutex; use rtrb::RingBuffer; +use std::borrow::Borrow; +use std::num::NonZeroU32; +use std::thread::ScopedJoinHandle; use super::super::config::WrapperConfig; use super::Backend; use crate::audio_setup::{AudioIOLayout, AuxiliaryBuffers}; use crate::buffer::Buffer; use crate::context::process::Transport; -use crate::midi::{MidiConfig, PluginNoteEvent}; +use crate::midi::{MidiConfig, MidiResult, PluginNoteEvent}; use crate::plugin::Plugin; +use crate::prelude::NoteEvent; + +const MIDI_EVENT_QUEUE_CAPACITY: usize = 2048; /// Uses CPAL for audio and midir for MIDI. pub struct CpalMidir { @@ -26,8 +32,8 @@ pub struct CpalMidir { input: Option, output: CpalDevice, - midi_input: Option>, - midi_output: Option>, + midi_input: Mutex>, + midi_output: Mutex>, } /// All data needed for a CPAL input or output stream. @@ -41,8 +47,12 @@ struct CpalDevice { struct MidirInputDevice { pub backend: MidiInput, pub port: MidiInputPort, - // The name can be retrieved from the port, not sure why the connect function needs the name - // again +} + +/// An active `MidirInputDevice`. Transformed back and from this during the `.run()` function. +struct ActiveMidirInputDevice { + pub connection: MidiInputConnection<()>, + pub port: MidiInputPort, } /// All data needed to create a Midir output stream. @@ -51,6 +61,20 @@ struct MidirOutputDevice { pub port: MidiOutputPort, } +/// An active `MidirOutputDevice`. Transformed back and from this during the `.run()` function. +struct ActiveMidirOutputDevice { + pub connection: MidiOutputConnection, + pub port: MidiOutputPort, +} + +/// A task for the MIDI output thread. +enum MidiOutputTask { + /// Send an event as MIDI data. + Send(PluginNoteEvent

), + /// Terminate the thread, stopping it from blocking and allowing it to be joined. + Terminate, +} + impl Backend

for CpalMidir { fn run( &mut self, @@ -64,96 +88,253 @@ impl Backend

for CpalMidir { + 'static + Send, ) { - // The CPAL audio devices may not accept floating point samples, so all of the actual audio + // So this is a lot of fun. There are up to four separate streams here, all using their own + // callbacks. The audio output stream acts as the primary stream, and everything else either + // sends data to it or (in the case of the MIDI output stream) receives data from it using + // channels. + // + // Audio input is read from the input device (if configured), and is send at a period at a + // time to the output stream in an interleaved format. Because of that the audio output + // stream is delayed for one period using a parker to you don't immediately get xruns. CPAL + // audio devices may also not accept floating point samples, so all of the actual audio // handling and buffer management handles in the `build_*_data_callback()` functions defined // below. + // + // MIDI input is parsed in the Midir callback and the events are sent over a callback to the + // output audio thread where the process callback happens. If that process callback outputs + // events then those are sent over another ringbuffer to a thread that handles MIDI output. + // Both MIDI input and MIDI output are disabled by default. + // + // The thread scope is needed to accomodate the midir MIDI output API. Outputting MIDI is + // realtime unsafe, and to be able to output MIDI with midir you need to transform between + // `MidiOutputPort` and `MidiOutputPortConnection` types by taking values out of an + // `Option`. + std::thread::scope(|s| { + let mut _input_stream: Option = None; + let mut input_rb_consumer: Option> = None; + if let Some(input) = &self.input { + // Data is sent to the output data callback using a wait-free ring buffer + let (rb_producer, rb_consumer) = RingBuffer::new( + self.output.config.channels as usize * self.config.period_size as usize, + ); + input_rb_consumer = Some(rb_consumer); - // CPAL does not support duplex streams, so audio input (when enabled, inputs aren't - // connected by default) waits a read a period of data before starting the output stream - let mut _input_stream: Option = None; - let mut input_rb_consumer: Option> = None; - if let Some(input) = &self.input { - // Data is sent to the output data callback using a wait-free ring buffer - let (rb_producer, rb_consumer) = RingBuffer::new( - self.output.config.channels as usize * self.config.period_size as usize, - ); - input_rb_consumer = Some(rb_consumer); + let input_parker = Parker::new(); + let input_unparker = input_parker.unparker().clone(); + let error_cb = { + let input_unparker = input_unparker.clone(); + move |err| { + nih_error!("Error during capture: {err:#}"); + input_unparker.clone().unpark(); + } + }; - let input_parker = Parker::new(); - let input_unparker = input_parker.unparker().clone(); + let stream = match input.sample_format { + SampleFormat::I16 => input.device.build_input_stream( + &input.config, + self.build_input_data_callback::(input_unparker, rb_producer), + error_cb, + ), + SampleFormat::U16 => input.device.build_input_stream( + &input.config, + self.build_input_data_callback::(input_unparker, rb_producer), + error_cb, + ), + SampleFormat::F32 => input.device.build_input_stream( + &input.config, + self.build_input_data_callback::(input_unparker, rb_producer), + error_cb, + ), + } + .expect("Fatal error creating the capture stream"); + stream + .play() + .expect("Fatal error trying to start the capture stream"); + _input_stream = Some(stream); + + // Playback is delayed one period if we're capturing audio so it has something to + // process + input_parker.park() + } + + // The output callback can read input events from this ringbuffer + let mut midi_input_rb_consumer: Option>> = None; + let midi_input_connection: Option = + self.midi_input.lock().take().and_then(|midi_input| { + // Data is sent to the output data callback using a wait-free ring buffer + let (rb_producer, rb_consumer) = RingBuffer::new(MIDI_EVENT_QUEUE_CAPACITY); + midi_input_rb_consumer = Some(rb_consumer); + + let result = midi_input.backend.connect( + &midi_input.port, + "MIDI input", + self.build_midi_input_thread::

(rb_producer), + (), + ); + + match result { + Ok(connection) => Some(ActiveMidirInputDevice { + connection, + port: midi_input.port, + }), + Err(err) => { + // We won't retry once this fails + nih_error!("Could not create the MIDI input connection: {err:#}"); + midi_input_rb_consumer = None; + + None + } + } + }); + + // The output callback can also emit MIDI events. To handle these we'll need to spawn + // our own thread. This can be simplified a lot by using the `MidiOutputConnection` + // directly inside the audio output callback, but looking at the implementation sending + // MIDI events is not realtime safe in most midir backends. + // NOTE: This uses crossbeam channels instead of rtrb specifically for the optional + // blocking API. This lets the MIDI sending thread sleep when there's no work to + // do. + let mut midi_output_rb_producer: Option>> = + None; + let midi_output_connection: Option> = + self.midi_output.lock().take().and_then(|midi_output| { + // This uses crossbeam channels for the reason mentioned above, but to keep + // things cohesive we'll use the same naming scheme as we use for rtrb + let (sender, receiver) = crossbeam::channel::bounded(MIDI_EVENT_QUEUE_CAPACITY); + midi_output_rb_producer = Some(sender); + + let result = midi_output + .backend + .connect(&midi_output.port, "MIDI output"); + + match result { + Ok(mut connection) => Some(s.spawn(move || { + while let Ok(task) = receiver.recv() { + match task { + MidiOutputTask::Send(event) => match event.as_midi() { + Some(MidiResult::Basic(midi_data)) => { + if let Err(err) = connection.send(&midi_data) { + nih_error!("Could not send MIDI event: {err}"); + } + } + Some(MidiResult::SysEx(padded_sysex_buffer, length)) => { + // The SysEx buffer may contain padding + let padded_sysex_buffer = padded_sysex_buffer.borrow(); + nih_debug_assert!(length <= padded_sysex_buffer.len()); + + if let Err(err) = + connection.send(&padded_sysex_buffer[..length]) + { + nih_error!("Could not send MIDI event: {err}"); + } + } + None => (), + }, + MidiOutputTask::Terminate => break, + } + } + + // We'll return the same value from the join handle as what ends up + // being stored in `midi_input_connection` to keep this symmetrical with + // the input handling + ActiveMidirOutputDevice { + connection, + port: midi_output.port, + } + })), + Err(err) => { + nih_error!("Could not create the MIDI output connection: {err:#}"); + midi_output_rb_producer = None; + + None + } + } + }); + + // This thread needs to be blocked until audio processing ends as CPAL processes the + // streams on another thread instead of blocking + let parker = Parker::new(); + let unparker = parker.unparker().clone(); let error_cb = { - let input_unparker = input_unparker.clone(); + let unparker = unparker.clone(); move |err| { - nih_error!("Error during capture: {err:#}"); - input_unparker.clone().unpark(); + nih_error!("Error during playback: {err:#}"); + unparker.clone().unpark(); } }; - let stream = match input.sample_format { - SampleFormat::I16 => input.device.build_input_stream( - &input.config, - self.build_input_data_callback::(input_unparker, rb_producer), + let output_stream = match self.output.sample_format { + SampleFormat::I16 => self.output.device.build_output_stream( + &self.output.config, + self.build_output_data_callback::( + unparker, + input_rb_consumer, + midi_input_rb_consumer, + // This is a MPMC crossbeam channel instead of an rtrb ringbuffer, and we + // also need it to terminate the thread + midi_output_rb_producer.clone(), + cb, + ), error_cb, ), - SampleFormat::U16 => input.device.build_input_stream( - &input.config, - self.build_input_data_callback::(input_unparker, rb_producer), + SampleFormat::U16 => self.output.device.build_output_stream( + &self.output.config, + self.build_output_data_callback::( + unparker, + input_rb_consumer, + midi_input_rb_consumer, + midi_output_rb_producer.clone(), + cb, + ), error_cb, ), - SampleFormat::F32 => input.device.build_input_stream( - &input.config, - self.build_input_data_callback::(input_unparker, rb_producer), + SampleFormat::F32 => self.output.device.build_output_stream( + &self.output.config, + self.build_output_data_callback::( + unparker, + input_rb_consumer, + midi_input_rb_consumer, + midi_output_rb_producer.clone(), + cb, + ), error_cb, ), } - .expect("Fatal error creating the capture stream"); - stream + .expect("Fatal error creating the output stream"); + + // TODO: Wait a period before doing this when also reading the input + output_stream .play() - .expect("Fatal error trying to start the capture stream"); - _input_stream = Some(stream); + .expect("Fatal error trying to start the output stream"); - // Playback is delayed one period if we're capturing audio so it has something to process - input_parker.park() - } + // Wait for the audio thread to exit + parker.park(); - // This thread needs to be blocked until audio processing ends as CPAL processes the streams - // on another thread instead of blocking - let parker = Parker::new(); - let unparker = parker.unparker().clone(); - let error_cb = { - let unparker = unparker.clone(); - move |err| { - nih_error!("Error during playback: {err:#}"); - unparker.clone().unpark(); - } - }; + // The Midir API requires us to take things out of Options and transform between these + // structs + *self.midi_input.lock() = + midi_input_connection.map(|midi_input_connection| MidirInputDevice { + backend: midi_input_connection.connection.close().0, + port: midi_input_connection.port, + }); + *self.midi_output.lock() = + midi_output_connection.map(move |midi_output_connection_handle| { + // The thread needs to be terminated first + midi_output_rb_producer + .expect("Inconsistent internal MIDI output state") + .send(MidiOutputTask::Terminate) + .expect("Could not terminate the MIDI output thread"); - let output_stream = match self.output.sample_format { - SampleFormat::I16 => self.output.device.build_output_stream( - &self.output.config, - self.build_output_data_callback::(unparker, input_rb_consumer, cb), - error_cb, - ), - SampleFormat::U16 => self.output.device.build_output_stream( - &self.output.config, - self.build_output_data_callback::(unparker, input_rb_consumer, cb), - error_cb, - ), - SampleFormat::F32 => self.output.device.build_output_stream( - &self.output.config, - self.build_output_data_callback::(unparker, input_rb_consumer, cb), - error_cb, - ), - } - .expect("Fatal error creating the output stream"); + let midi_output_connection = midi_output_connection_handle + .join() + .expect("MIDI output thread panicked"); - // TODO: Wait a period before doing this when also reading the input - output_stream - .play() - .expect("Fatal error trying to start the output stream"); - - // Wait for the audio thread to exit - parker.park(); + MidirOutputDevice { + backend: midi_output_connection.connection.close(), + port: midi_output_connection.port, + } + }); + }); } } @@ -351,10 +532,10 @@ impl CpalMidir { }; match found_port { - Some(port) => Some(Mutex::new(MidirInputDevice { + Some(port) => Some(MidirInputDevice { backend: midi_backend, port: port.clone(), - })), + }), None => { let mut message = format!( "Unknown input MIDI device '{midi_input_name}'. Available devices are:" @@ -362,7 +543,7 @@ impl CpalMidir { for port in available_ports { match midi_backend.port_name(&port) { Ok(device_name) => message.push_str(&format!("\n{device_name}")), - Err(err) => message.push_str(&format!("\nERROR: {err:?}")), + Err(err) => message.push_str(&format!("\nERROR: {err:#}")), } } @@ -388,10 +569,10 @@ impl CpalMidir { }; match found_port { - Some(port) => Some(Mutex::new(MidirOutputDevice { + Some(port) => Some(MidirOutputDevice { backend: midi_backend, port: port.clone(), - })), + }), None => { let mut message = format!( "Unknown output MIDI device '{midi_output_name}'. Available devices \ @@ -400,7 +581,7 @@ impl CpalMidir { for port in available_ports { match midi_backend.port_name(&port) { Ok(device_name) => message.push_str(&format!("\n{device_name}")), - Err(err) => message.push_str(&format!("\nERROR: {err:?}")), + Err(err) => message.push_str(&format!("\nERROR: {err:#}")), } } @@ -418,8 +599,8 @@ impl CpalMidir { input, output, - midi_input, - midi_output, + midi_input: Mutex::new(midi_input), + midi_output: Mutex::new(midi_output), }) } @@ -443,10 +624,28 @@ impl CpalMidir { } } + fn build_midi_input_thread( + &self, + mut midi_input_rb_producer: rtrb::Producer>, + ) -> impl FnMut(u64, &[u8], &mut ()) + Send + 'static { + // This callback parses the received MIDI bytes and sends them to a ring buffer + move |_timing, midi_data, _data| { + // Since this is system MIDI there's no real useful timing information and we'll set all + // the timings to the first sample in the buffer + if let Ok(event) = NoteEvent::from_midi(0, midi_data) { + if midi_input_rb_producer.push(event).is_err() { + nih_error!("The MIDI input event queue was full, dropping event"); + } + } + } + } + fn build_output_data_callback( &self, unparker: Unparker, mut input_rb_consumer: Option>, + mut input_event_rb_consumer: Option>>, + mut output_event_rb_producer: Option>>, mut cb: impl FnMut( &mut Buffer, &mut AuxiliaryBuffers, @@ -516,9 +715,8 @@ impl CpalMidir { aux_output_buffers.push(aux_buffer); } - // TODO: MIDI input and output - let midi_input_events = Vec::with_capacity(1024); - let mut midi_output_events = Vec::with_capacity(1024); + let mut midi_input_events = Vec::with_capacity(MIDI_EVENT_QUEUE_CAPACITY); + let mut midi_output_events = Vec::with_capacity(MIDI_EVENT_QUEUE_CAPACITY); // Can't borrow from `self` in the callback let config = self.config.clone(); @@ -615,6 +813,13 @@ impl CpalMidir { } } + midi_input_events.clear(); + if let Some(input_event_rb_consumer) = &mut input_event_rb_consumer { + if let Ok(event) = input_event_rb_consumer.pop() { + midi_input_events.push(event); + } + } + // SAFETY: Shortening these borrows is safe as even if the plugin overwrites the // slices (which it cannot do without using unsafe code), then they // would still be reset on the next iteration @@ -647,7 +852,17 @@ impl CpalMidir { *output_sample = T::from(buffer_sample); } - // TODO: Handle MIDI output events + if let Some(output_event_rb_producer) = &mut output_event_rb_producer { + for event in midi_output_events.drain(..) { + if output_event_rb_producer + .try_send(MidiOutputTask::Send(event)) + .is_err() + { + nih_error!("The MIDI output event queue was full, dropping event"); + break; + } + } + } num_processed_samples += buffer.samples() as i64; }