Skip to main content

dfir_pipes/push/
sink_compat.rs

1//! [`SinkCompat`] adapter wrapping a [`Push`] into a [`futures_sink::Sink`].
2use core::pin::Pin;
3use core::task::Poll;
4
5use futures_sink::Sink;
6use pin_project_lite::pin_project;
7
8use super::PushStep;
9use crate::Context;
10use crate::push::Push;
11
12pin_project! {
13    /// Adapter that wraps a [`Push`] to implement the [`Sink`] trait.
14    #[must_use = "`Sink`s do nothing unless polled"]
15    pub struct SinkCompat<Psh> {
16        #[pin]
17        push: Psh,
18    }
19}
20
21impl<Psh> SinkCompat<Psh> {
22    /// Creates a new [`SinkCompat`] wrapping the given [`Push`].
23    pub(crate) const fn new(push: Psh) -> Self {
24        Self { push }
25    }
26
27    /// Returns the wrapped [`Push`].
28    pub fn into_inner(self) -> Psh {
29        self.push
30    }
31
32    /// Returns a pinned mutable reference to the wrapped [`Push`].
33    pub fn as_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Psh> {
34        self.project().push
35    }
36
37    /// Returns a pinned reference to the wrapped [`Push`].
38    pub fn as_pin_ref(self: Pin<&Self>) -> Pin<&Psh> {
39        self.project_ref().push
40    }
41}
42
43impl<Psh> AsMut<Psh> for SinkCompat<Psh> {
44    fn as_mut(&mut self) -> &mut Psh {
45        &mut self.push
46    }
47}
48
49impl<Psh> AsRef<Psh> for SinkCompat<Psh> {
50    fn as_ref(&self) -> &Psh {
51        &self.push
52    }
53}
54
55impl<Psh, Item> Sink<Item> for SinkCompat<Psh>
56where
57    Psh: Push<Item, ()>,
58{
59    type Error = core::convert::Infallible;
60
61    fn poll_ready(
62        self: Pin<&mut Self>,
63        cx: &mut core::task::Context<'_>,
64    ) -> Poll<Result<(), Self::Error>> {
65        match self.as_pin_mut().poll_ready(Context::from_task(cx)) {
66            PushStep::Pending(_) => Poll::Pending,
67            PushStep::Done => Poll::Ready(Ok(())),
68        }
69    }
70
71    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
72        self.as_pin_mut().start_send(item, ());
73        Ok(())
74    }
75
76    fn poll_flush(
77        self: Pin<&mut Self>,
78        cx: &mut core::task::Context<'_>,
79    ) -> Poll<Result<(), Self::Error>> {
80        match self.as_pin_mut().poll_flush(Context::from_task(cx)) {
81            PushStep::Pending(_) => Poll::Pending,
82            PushStep::Done => Poll::Ready(Ok(())),
83        }
84    }
85
86    fn poll_close(
87        self: Pin<&mut Self>,
88        cx: &mut core::task::Context<'_>,
89    ) -> Poll<Result<(), Self::Error>> {
90        self.poll_flush(cx)
91    }
92}