1use std::{
2 io::{
3 BufReader,
4 Read as _,
5 Write as _,
6 },
7 path::{
8 Path,
9 PathBuf,
10 },
11 process::{
12 ChildStdin,
13 ChildStdout,
14 },
15 rc::Rc,
16 sync::{
17 Arc,
18 atomic::{
19 AtomicBool,
20 AtomicU32,
21 Ordering,
22 },
23 },
24 time::{
25 Duration,
26 Instant,
27 },
28};
29
30use async_io::Timer;
31use ffmpeg_sidecar::{
32 child::FfmpegChild,
33 command::FfmpegCommand,
34 event::{
35 FfmpegEvent,
36 OutputVideoFrame,
37 },
38};
39use freya_core::{
40 elements::image::ImageHandle,
41 notify::ArcNotify,
42 prelude::{
43 Bytes,
44 OwnedTaskHandle,
45 provide_root_context,
46 spawn,
47 try_consume_root_context,
48 },
49};
50use freya_engine::prelude::AlphaType;
51use rodio::cpal::traits::{
52 DeviceTrait,
53 HostTrait,
54};
55
56#[derive(PartialEq, Eq, Clone, Debug, Hash)]
58pub struct VideoSource(pub PathBuf);
59
60impl From<PathBuf> for VideoSource {
61 fn from(path: PathBuf) -> Self {
62 Self(path)
63 }
64}
65
66impl From<&Path> for VideoSource {
67 fn from(path: &Path) -> Self {
68 Self(path.to_path_buf())
69 }
70}
71
72impl From<&str> for VideoSource {
73 fn from(path: &str) -> Self {
74 Self(PathBuf::from(path))
75 }
76}
77
78impl From<String> for VideoSource {
79 fn from(path: String) -> Self {
80 Self(PathBuf::from(path))
81 }
82}
83
84impl VideoSource {
85 fn ffmpeg_command(&self, start_offset: Duration) -> FfmpegCommand {
87 let mut cmd = FfmpegCommand::new();
88 let start_secs = start_offset.as_secs_f32();
89 if start_secs > 0.0 {
90 cmd.args(["-ss", &start_secs.to_string()]);
91 }
92 cmd.input(self.0.to_string_lossy().as_ref());
93 cmd
94 }
95}
96
97const FRAME_BUFFER: usize = 2;
99
100const EVENTS_BUFFER: usize = 2;
102
103const FALLBACK_AUDIO_CONFIG: (u32, u16) = (48_000, 2);
105
106#[derive(Clone)]
108pub enum VideoEvent {
109 Duration(Duration),
110 Frame {
111 frame: ImageHandle,
112 position: Duration,
113 },
114 Ended,
115 Errored,
116}
117
118pub struct VideoClient {
120 events: async_channel::Receiver<VideoEvent>,
121 paused: Arc<AtomicBool>,
122 resumed: ArcNotify,
123 volume: Arc<AtomicU32>,
124 _task: OwnedTaskHandle,
125}
126
127impl VideoClient {
128 pub fn new(
130 source: VideoSource,
131 start_offset: Duration,
132 start_paused: bool,
133 volume: f32,
134 ) -> Self {
135 let (sender, receiver) = async_channel::bounded(EVENTS_BUFFER);
136 let paused = Arc::new(AtomicBool::new(start_paused));
137 let resumed = ArcNotify::new();
138 let volume = Arc::new(AtomicU32::new(volume.to_bits()));
139 let task = spawn(Self::run(
140 source,
141 start_offset,
142 paused.clone(),
143 resumed.clone(),
144 volume.clone(),
145 sender,
146 ))
147 .owned();
148 Self {
149 events: receiver,
150 paused,
151 resumed,
152 volume,
153 _task: task,
154 }
155 }
156
157 pub fn events(&self) -> &async_channel::Receiver<VideoEvent> {
159 &self.events
160 }
161
162 pub fn pause(&self) {
164 self.paused.store(true, Ordering::Relaxed);
165 }
166
167 pub fn play(&self) {
169 self.paused.store(false, Ordering::Relaxed);
170 self.resumed.notify();
171 }
172
173 pub fn set_volume(&self, volume: f32) {
175 self.volume.store(volume.to_bits(), Ordering::Relaxed);
176 }
177
178 async fn run(
180 source: VideoSource,
181 start_offset: Duration,
182 paused: Arc<AtomicBool>,
183 resumed: ArcNotify,
184 volume: Arc<AtomicU32>,
185 events: async_channel::Sender<VideoEvent>,
186 ) {
187 let mut cmd = source.ffmpeg_command(start_offset);
188 cmd.format("rawvideo").pix_fmt("rgba").pipe_stdout();
189 let mut child = match cmd.spawn() {
190 Ok(child) => child,
191 Err(err) => {
192 tracing::error!("Failed to spawn ffmpeg: {err}");
193 let _ = events.send(VideoEvent::Errored).await;
194 return;
195 }
196 };
197 let _quitter = child.take_stdin().map(FfmpegQuitter);
198
199 let audio = AudioPlayback::start(
200 &source,
201 start_offset,
202 f32::from_bits(volume.load(Ordering::Relaxed)),
203 );
204 if paused.load(Ordering::Relaxed)
205 && let Some(audio) = audio.as_ref()
206 {
207 audio.sink.pause();
208 }
209
210 let (sender, receiver) = async_channel::bounded::<DecoderEvent>(FRAME_BUFFER);
211 let decoder = blocking::unblock(move || Self::run_decoder(child, sender));
212
213 let mut wall_start: Option<Instant> = None;
214 let mut paused_for = Duration::ZERO;
215
216 while let Ok(event) = receiver.recv().await {
217 let frame = match event {
218 DecoderEvent::Duration(duration) => {
219 let _ = events.send(VideoEvent::Duration(duration)).await;
220 continue;
221 }
222 DecoderEvent::Frame(frame) => frame,
223 };
224
225 if wall_start.is_some() {
227 paused_for += Self::wait_for_resume(&paused, &resumed, audio.as_ref()).await;
228 }
229
230 if let Some(audio) = audio.as_ref() {
231 audio
232 .sink
233 .set_volume(f32::from_bits(volume.load(Ordering::Relaxed)));
234 }
235
236 let wall_start = *wall_start.get_or_insert_with(Instant::now);
237 let frame_offset = Duration::from_secs_f32(frame.timestamp.max(0.0));
238 let elapsed = wall_start.elapsed().saturating_sub(paused_for);
239 if elapsed < frame_offset {
240 Timer::after(frame_offset - elapsed).await;
241 }
242
243 let Some(frame) = Self::decode_frame(frame) else {
244 tracing::warn!("Dropping frame: failed to decode raw RGBA into a Skia image");
245 continue;
246 };
247 if events
248 .send(VideoEvent::Frame {
249 frame,
250 position: start_offset + frame_offset,
251 })
252 .await
253 .is_err()
254 {
255 tracing::warn!("Video event consumer dropped, stopping pacing loop");
256 break;
257 }
258 }
259
260 match decoder.await {
261 Ok(()) => {
262 let _ = events.send(VideoEvent::Ended).await;
263 }
264 Err(err) => {
265 tracing::error!("Video decoder failed: {err}");
266 let _ = events.send(VideoEvent::Errored).await;
267 }
268 }
269 }
270
271 fn decode_frame(frame: OutputVideoFrame) -> Option<ImageHandle> {
273 ImageHandle::from_rgba(
274 frame.width,
275 frame.height,
276 Bytes::from(frame.data),
277 AlphaType::Unpremul,
278 )
279 }
280
281 async fn wait_for_resume(
283 paused: &AtomicBool,
284 resumed: &ArcNotify,
285 audio: Option<&AudioPlayback>,
286 ) -> Duration {
287 if !paused.load(Ordering::Relaxed) {
288 return Duration::ZERO;
289 }
290 if let Some(audio) = audio {
291 audio.sink.pause();
292 }
293 let pause_start = Instant::now();
294 while paused.load(Ordering::Relaxed) {
295 resumed.notified().await;
296 }
297 if let Some(audio) = audio {
298 audio.sink.play();
299 }
300 pause_start.elapsed()
301 }
302
303 fn run_decoder(
304 mut child: FfmpegChild,
305 sender: async_channel::Sender<DecoderEvent>,
306 ) -> anyhow::Result<()> {
307 for event in child.iter()? {
308 let item = match event {
309 FfmpegEvent::ParsedDuration(d) if d.duration.is_finite() && d.duration >= 0.0 => {
310 DecoderEvent::Duration(Duration::from_secs_f64(d.duration))
311 }
312 FfmpegEvent::OutputFrame(frame) => DecoderEvent::Frame(frame),
313 _ => continue,
314 };
315 if sender.send_blocking(item).is_err() {
316 tracing::warn!("Decoder consumer dropped, stopping ffmpeg ingest");
317 break;
318 }
319 }
320
321 let _ = child.kill();
323 let _ = child.wait();
324
325 Ok(())
326 }
327}
328
329enum DecoderEvent {
330 Duration(Duration),
331 Frame(OutputVideoFrame),
332}
333
334struct FfmpegQuitter(ChildStdin);
336
337impl Drop for FfmpegQuitter {
338 fn drop(&mut self) {
339 let _ = self.0.write_all(b"q\n");
340 let _ = self.0.flush();
341 }
342}
343
344struct PcmSource {
346 reader: BufReader<ChildStdout>,
347 sample_rate: u32,
348 channels: u16,
349}
350
351impl Iterator for PcmSource {
352 type Item = i16;
353
354 fn next(&mut self) -> Option<i16> {
355 let mut buf = [0u8; 2];
356 self.reader.read_exact(&mut buf).ok()?;
357 Some(i16::from_le_bytes(buf))
358 }
359}
360
361impl rodio::Source for PcmSource {
362 fn current_frame_len(&self) -> Option<usize> {
363 None
364 }
365 fn channels(&self) -> u16 {
366 self.channels
367 }
368 fn sample_rate(&self) -> u32 {
369 self.sample_rate
370 }
371 fn total_duration(&self) -> Option<Duration> {
372 None
373 }
374}
375
376struct AudioPlayback {
378 _quitter: Option<FfmpegQuitter>,
379 sink: rodio::Sink,
380 _child: FfmpegChild,
381}
382
383impl AudioPlayback {
384 fn start(source: &VideoSource, start_offset: Duration, volume: f32) -> Option<Self> {
386 let handle = Self::handle()?;
387 let (sample_rate, channels) = Self::output_config();
388 let mut cmd = source.ffmpeg_command(start_offset);
389 cmd.args([
390 "-vn",
391 "-f",
392 "s16le",
393 "-ar",
394 &sample_rate.to_string(),
395 "-ac",
396 &channels.to_string(),
397 ])
398 .pipe_stdout();
399 let mut child = cmd
400 .spawn()
401 .map_err(|err| tracing::warn!("Failed to spawn audio ffmpeg: {err}"))
402 .ok()?;
403 let stdout = child.take_stdout()?;
404 let quitter = child.take_stdin().map(FfmpegQuitter);
405 let sink = rodio::Sink::try_new(&handle)
406 .map_err(|err| tracing::warn!("Failed to create audio sink: {err}"))
407 .ok()?;
408 sink.set_volume(volume);
409 sink.append(PcmSource {
410 reader: BufReader::new(stdout),
411 sample_rate,
412 channels,
413 });
414 Some(Self {
415 _quitter: quitter,
416 sink,
417 _child: child,
418 })
419 }
420
421 fn handle() -> Option<Rc<rodio::OutputStreamHandle>> {
423 if let Some(handle) = try_consume_root_context::<Rc<rodio::OutputStreamHandle>>() {
424 return Some(handle);
425 }
426
427 let (stream, handle) = rodio::OutputStream::try_default()
428 .map_err(|err| tracing::info!("No audio output device: {err}"))
429 .ok()?;
430 let stream = Rc::new(stream);
431 let handle = Rc::new(handle);
432
433 provide_root_context(stream);
434 provide_root_context(handle.clone());
435
436 Some(handle)
437 }
438
439 fn output_config() -> (u32, u16) {
441 rodio::cpal::default_host()
442 .default_output_device()
443 .and_then(|device| device.default_output_config().ok())
444 .map(|config| (config.sample_rate().0, config.channels()))
445 .unwrap_or(FALLBACK_AUDIO_CONFIG)
446 }
447}