Skip to main content

freya_video/
client.rs

1use std::{
2    io::{
3        BufReader,
4        Read as _,
5        Write as _,
6    },
7    path::{
8        Path,
9        PathBuf,
10    },
11    process::{
12        ChildStdin,
13        ChildStdout,
14    },
15    rc::Rc,
16    sync::{
17        Arc,
18        atomic::{
19            AtomicBool,
20            AtomicU32,
21            Ordering,
22        },
23    },
24    time::{
25        Duration,
26        Instant,
27    },
28};
29
30use async_io::Timer;
31use ffmpeg_sidecar::{
32    child::FfmpegChild,
33    command::FfmpegCommand,
34    event::{
35        FfmpegEvent,
36        OutputVideoFrame,
37    },
38};
39use freya_core::{
40    elements::image::ImageHandle,
41    notify::ArcNotify,
42    prelude::{
43        Bytes,
44        OwnedTaskHandle,
45        provide_root_context,
46        spawn,
47        try_consume_root_context,
48    },
49};
50use freya_engine::prelude::AlphaType;
51use rodio::cpal::traits::{
52    DeviceTrait,
53    HostTrait,
54};
55
/// Location of a video file that the pipeline should decode.
///
/// A thin newtype over [`PathBuf`]; build it from any of the path/string `From`
/// conversions below.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct VideoSource(pub PathBuf);
59
60impl From<PathBuf> for VideoSource {
61    fn from(path: PathBuf) -> Self {
62        Self(path)
63    }
64}
65
66impl From<&Path> for VideoSource {
67    fn from(path: &Path) -> Self {
68        Self(path.to_path_buf())
69    }
70}
71
72impl From<&str> for VideoSource {
73    fn from(path: &str) -> Self {
74        Self(PathBuf::from(path))
75    }
76}
77
78impl From<String> for VideoSource {
79    fn from(path: String) -> Self {
80        Self(PathBuf::from(path))
81    }
82}
83
84impl VideoSource {
85    /// Base ffmpeg command for this source with an optional `-ss` seek.
86    fn ffmpeg_command(&self, start_offset: Duration) -> FfmpegCommand {
87        let mut cmd = FfmpegCommand::new();
88        let start_secs = start_offset.as_secs_f32();
89        if start_secs > 0.0 {
90            cmd.args(["-ss", &start_secs.to_string()]);
91        }
92        cmd.input(self.0.to_string_lossy().as_ref());
93        cmd
94    }
95}
96
/// Capacity of the channel carrying decoded frames from the decoder thread to the
/// pacing loop; the decoder blocks once this many frames are waiting.
const FRAME_BUFFER: usize = 2;

/// Capacity of the outgoing [`VideoEvent`] channel; the pacing loop waits when the
/// consumer has this many events unread.
const EVENTS_BUFFER: usize = 2;

/// `(sample_rate, channels)` used when the default output device's configuration
/// cannot be queried.
const FALLBACK_AUDIO_CONFIG: (u32, u16) = (48000, 2);
105
106/// Event emitted by a [`VideoClient`].
107#[derive(Clone)]
108pub enum VideoEvent {
109    Duration(Duration),
110    Frame {
111        frame: ImageHandle,
112        position: Duration,
113    },
114    Ended,
115    Errored,
116}
117
/// Decoding pipeline for one video. Drop to stop.
// NOTE: field order is also drop order — the events receiver is dropped before the
// owned task handle. Keep the order unless that is intentionally changed.
pub struct VideoClient {
    // Receiving end of the event channel that the pipeline task sends into.
    events: async_channel::Receiver<VideoEvent>,
    // Pause flag shared with the pipeline task.
    paused: Arc<AtomicBool>,
    // Notified by `play` to wake the pacing loop while it waits for a resume.
    resumed: ArcNotify,
    // Volume stored as the bit pattern of an `f32` (see `f32::to_bits`).
    volume: Arc<AtomicU32>,
    // Owns the spawned pipeline task; never read, only held.
    _task: OwnedTaskHandle,
}
126
127impl VideoClient {
128    /// Start decoding `source` at `start_offset`, optionally paused, at `volume`.
129    pub fn new(
130        source: VideoSource,
131        start_offset: Duration,
132        start_paused: bool,
133        volume: f32,
134    ) -> Self {
135        let (sender, receiver) = async_channel::bounded(EVENTS_BUFFER);
136        let paused = Arc::new(AtomicBool::new(start_paused));
137        let resumed = ArcNotify::new();
138        let volume = Arc::new(AtomicU32::new(volume.to_bits()));
139        let task = spawn(Self::run(
140            source,
141            start_offset,
142            paused.clone(),
143            resumed.clone(),
144            volume.clone(),
145            sender,
146        ))
147        .owned();
148        Self {
149            events: receiver,
150            paused,
151            resumed,
152            volume,
153            _task: task,
154        }
155    }
156
157    /// Stream of decoded frames and lifecycle events.
158    pub fn events(&self) -> &async_channel::Receiver<VideoEvent> {
159        &self.events
160    }
161
162    /// Pause playback.
163    pub fn pause(&self) {
164        self.paused.store(true, Ordering::Relaxed);
165    }
166
167    /// Resume playback, waking the pacing loop if it is waiting.
168    pub fn play(&self) {
169        self.paused.store(false, Ordering::Relaxed);
170        self.resumed.notify();
171    }
172
173    /// Set the audio volume, where `1.0` is the original level.
174    pub fn set_volume(&self, volume: f32) {
175        self.volume.store(volume.to_bits(), Ordering::Relaxed);
176    }
177
178    /// Decode `source` and emit pacing-corrected frames into `events`.
179    async fn run(
180        source: VideoSource,
181        start_offset: Duration,
182        paused: Arc<AtomicBool>,
183        resumed: ArcNotify,
184        volume: Arc<AtomicU32>,
185        events: async_channel::Sender<VideoEvent>,
186    ) {
187        let mut cmd = source.ffmpeg_command(start_offset);
188        cmd.format("rawvideo").pix_fmt("rgba").pipe_stdout();
189        let mut child = match cmd.spawn() {
190            Ok(child) => child,
191            Err(err) => {
192                tracing::error!("Failed to spawn ffmpeg: {err}");
193                let _ = events.send(VideoEvent::Errored).await;
194                return;
195            }
196        };
197        let _quitter = child.take_stdin().map(FfmpegQuitter);
198
199        let audio = AudioPlayback::start(
200            &source,
201            start_offset,
202            f32::from_bits(volume.load(Ordering::Relaxed)),
203        );
204        if paused.load(Ordering::Relaxed)
205            && let Some(audio) = audio.as_ref()
206        {
207            audio.sink.pause();
208        }
209
210        let (sender, receiver) = async_channel::bounded::<DecoderEvent>(FRAME_BUFFER);
211        let decoder = blocking::unblock(move || Self::run_decoder(child, sender));
212
213        let mut wall_start: Option<Instant> = None;
214        let mut paused_for = Duration::ZERO;
215
216        while let Ok(event) = receiver.recv().await {
217            let frame = match event {
218                DecoderEvent::Duration(duration) => {
219                    let _ = events.send(VideoEvent::Duration(duration)).await;
220                    continue;
221                }
222                DecoderEvent::Frame(frame) => frame,
223            };
224
225            // Show the first frame even when paused, so a seek reveals a preview.
226            if wall_start.is_some() {
227                paused_for += Self::wait_for_resume(&paused, &resumed, audio.as_ref()).await;
228            }
229
230            if let Some(audio) = audio.as_ref() {
231                audio
232                    .sink
233                    .set_volume(f32::from_bits(volume.load(Ordering::Relaxed)));
234            }
235
236            let wall_start = *wall_start.get_or_insert_with(Instant::now);
237            let frame_offset = Duration::from_secs_f32(frame.timestamp.max(0.0));
238            let elapsed = wall_start.elapsed().saturating_sub(paused_for);
239            if elapsed < frame_offset {
240                Timer::after(frame_offset - elapsed).await;
241            }
242
243            let Some(frame) = Self::decode_frame(frame) else {
244                tracing::warn!("Dropping frame: failed to decode raw RGBA into a Skia image");
245                continue;
246            };
247            if events
248                .send(VideoEvent::Frame {
249                    frame,
250                    position: start_offset + frame_offset,
251                })
252                .await
253                .is_err()
254            {
255                tracing::warn!("Video event consumer dropped, stopping pacing loop");
256                break;
257            }
258        }
259
260        match decoder.await {
261            Ok(()) => {
262                let _ = events.send(VideoEvent::Ended).await;
263            }
264            Err(err) => {
265                tracing::error!("Video decoder failed: {err}");
266                let _ = events.send(VideoEvent::Errored).await;
267            }
268        }
269    }
270
271    /// Wrap a raw RGBA frame as a Skia raster image.
272    fn decode_frame(frame: OutputVideoFrame) -> Option<ImageHandle> {
273        ImageHandle::from_rgba(
274            frame.width,
275            frame.height,
276            Bytes::from(frame.data),
277            AlphaType::Unpremul,
278        )
279    }
280
281    /// If paused, suspend audio and await a resume notification. Returns the paused-for delta.
282    async fn wait_for_resume(
283        paused: &AtomicBool,
284        resumed: &ArcNotify,
285        audio: Option<&AudioPlayback>,
286    ) -> Duration {
287        if !paused.load(Ordering::Relaxed) {
288            return Duration::ZERO;
289        }
290        if let Some(audio) = audio {
291            audio.sink.pause();
292        }
293        let pause_start = Instant::now();
294        while paused.load(Ordering::Relaxed) {
295            resumed.notified().await;
296        }
297        if let Some(audio) = audio {
298            audio.sink.play();
299        }
300        pause_start.elapsed()
301    }
302
303    fn run_decoder(
304        mut child: FfmpegChild,
305        sender: async_channel::Sender<DecoderEvent>,
306    ) -> anyhow::Result<()> {
307        for event in child.iter()? {
308            let item = match event {
309                FfmpegEvent::ParsedDuration(d) if d.duration.is_finite() && d.duration >= 0.0 => {
310                    DecoderEvent::Duration(Duration::from_secs_f64(d.duration))
311                }
312                FfmpegEvent::OutputFrame(frame) => DecoderEvent::Frame(frame),
313                _ => continue,
314            };
315            if sender.send_blocking(item).is_err() {
316                tracing::warn!("Decoder consumer dropped, stopping ffmpeg ingest");
317                break;
318            }
319        }
320
321        // Always reap the child to avoid a zombie process (ffmpeg-sidecar#72).
322        let _ = child.kill();
323        let _ = child.wait();
324
325        Ok(())
326    }
327}
328
329enum DecoderEvent {
330    Duration(Duration),
331    Frame(OutputVideoFrame),
332}
333
/// Owns an ffmpeg child's stdin; on drop it writes `q` followed by a newline to
/// request a graceful exit.
struct FfmpegQuitter(ChildStdin);

impl Drop for FfmpegQuitter {
    fn drop(&mut self) {
        // Best effort: the process may already be gone, so both results are ignored
        // and the flush is attempted even if the write failed.
        let stdin = &mut self.0;
        let _ = stdin.write_all(b"q\n");
        let _ = stdin.flush();
    }
}
343
344/// PCM audio samples streamed from an ffmpeg process.
345struct PcmSource {
346    reader: BufReader<ChildStdout>,
347    sample_rate: u32,
348    channels: u16,
349}
350
351impl Iterator for PcmSource {
352    type Item = i16;
353
354    fn next(&mut self) -> Option<i16> {
355        let mut buf = [0u8; 2];
356        self.reader.read_exact(&mut buf).ok()?;
357        Some(i16::from_le_bytes(buf))
358    }
359}
360
361impl rodio::Source for PcmSource {
362    fn current_frame_len(&self) -> Option<usize> {
363        None
364    }
365    fn channels(&self) -> u16 {
366        self.channels
367    }
368    fn sample_rate(&self) -> u32 {
369        self.sample_rate
370    }
371    fn total_duration(&self) -> Option<Duration> {
372        None
373    }
374}
375
376/// Audio side of a running playback.
377struct AudioPlayback {
378    _quitter: Option<FfmpegQuitter>,
379    sink: rodio::Sink,
380    _child: FfmpegChild,
381}
382
383impl AudioPlayback {
384    /// Start an audio-only ffmpeg pipeline feeding into rodio at `volume`.
385    fn start(source: &VideoSource, start_offset: Duration, volume: f32) -> Option<Self> {
386        let handle = Self::handle()?;
387        let (sample_rate, channels) = Self::output_config();
388        let mut cmd = source.ffmpeg_command(start_offset);
389        cmd.args([
390            "-vn",
391            "-f",
392            "s16le",
393            "-ar",
394            &sample_rate.to_string(),
395            "-ac",
396            &channels.to_string(),
397        ])
398        .pipe_stdout();
399        let mut child = cmd
400            .spawn()
401            .map_err(|err| tracing::warn!("Failed to spawn audio ffmpeg: {err}"))
402            .ok()?;
403        let stdout = child.take_stdout()?;
404        let quitter = child.take_stdin().map(FfmpegQuitter);
405        let sink = rodio::Sink::try_new(&handle)
406            .map_err(|err| tracing::warn!("Failed to create audio sink: {err}"))
407            .ok()?;
408        sink.set_volume(volume);
409        sink.append(PcmSource {
410            reader: BufReader::new(stdout),
411            sample_rate,
412            channels,
413        });
414        Some(Self {
415            _quitter: quitter,
416            sink,
417            _child: child,
418        })
419    }
420
421    /// Shared audio output handle, created once and cached in the root context.
422    fn handle() -> Option<Rc<rodio::OutputStreamHandle>> {
423        if let Some(handle) = try_consume_root_context::<Rc<rodio::OutputStreamHandle>>() {
424            return Some(handle);
425        }
426
427        let (stream, handle) = rodio::OutputStream::try_default()
428            .map_err(|err| tracing::info!("No audio output device: {err}"))
429            .ok()?;
430        let stream = Rc::new(stream);
431        let handle = Rc::new(handle);
432
433        provide_root_context(stream);
434        provide_root_context(handle.clone());
435
436        Some(handle)
437    }
438
439    /// Default output device's sample rate and channels, to avoid a second resample.
440    fn output_config() -> (u32, u16) {
441        rodio::cpal::default_host()
442            .default_output_device()
443            .and_then(|device| device.default_output_config().ok())
444            .map(|config| (config.sample_rate().0, config.channels()))
445            .unwrap_or(FALLBACK_AUDIO_CONFIG)
446    }
447}