Skip to main content

dfir_pipes/push/
sink.rs

1//! [`Sink`] adapter wrapping a [`futures_sink::Sink`] into a [`Push`].
2use core::pin::Pin;
3use core::task::Poll;
4
5use pin_project_lite::pin_project;
6
7use crate::Yes;
8use crate::push::{Push, PushStep};
9
10pin_project! {
11    /// Adapter that wraps a [`futures_sink::Sink`] to implement the [`Push`] trait.
12    ///
13    /// Since `Sink` is asynchronous, this push requires `core::task::Context`
14    /// and can return `Pending`.
15    #[must_use = "`Push`es do nothing unless items are pushed into them"]
16    pub struct Sink<Si> {
17        #[pin]
18        sink: Si,
19    }
20}
21
22impl<Si> Sink<Si> {
23    /// Creates a new [`Sink`] wrapping the given [`futures_sink::Sink`].
24    pub(crate) const fn new(sink: Si) -> Self {
25        Self { sink }
26    }
27}
28
29impl<Si, Item, Meta> Push<Item, Meta> for Sink<Si>
30where
31    Si: futures_sink::Sink<Item>,
32    Si::Error: core::fmt::Debug,
33    Meta: Copy,
34{
35    type Ctx<'ctx> = core::task::Context<'ctx>;
36
37    type CanPend = Yes;
38
39    fn poll_ready(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
40        match self.project().sink.poll_ready(ctx) {
41            Poll::Ready(Ok(())) => PushStep::Done,
42            Poll::Ready(Err(err)) => panic!("Sink error during poll_ready: {err:?}"),
43            Poll::Pending => PushStep::Pending(Yes),
44        }
45    }
46
47    fn start_send(self: Pin<&mut Self>, item: Item, _meta: Meta) {
48        // Discard `_meta`.
49        match self.project().sink.start_send(item) {
50            Ok(()) => {}
51            Err(err) => panic!("Sink error during start_send: {err:?}"),
52        }
53    }
54
55    fn poll_flush(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
56        match self.project().sink.poll_flush(ctx) {
57            Poll::Ready(Ok(())) => PushStep::Done,
58            Poll::Ready(Err(err)) => panic!("Sink error during poll_flush: {err:?}"),
59            Poll::Pending => PushStep::Pending(Yes),
60        }
61    }
62
63    fn size_hint(self: Pin<&mut Self>, _hint: (usize, Option<usize>)) {
64        // unused
65    }
66}