Skip to main content

dfir_pipes/pull/
send_sink.rs

1use core::pin::Pin;
2use core::task::{Context, Poll, ready};
3
4use futures_sink::Sink;
5use pin_project_lite::pin_project;
6
7use crate::pull::{Pull, PullStep};
8
9pin_project! {
10    /// [`Future`] for pulling from a [`Pull`] and pushing to a [`Sink`].
11    #[must_use = "futures do nothing unless polled"]
12    #[derive(Clone, Debug)]
13    pub struct SendSink<Pul, Psh> {
14        #[pin]
15        pull: Pul,
16        #[pin]
17        push: Psh,
18        pull_ended: bool,
19    }
20}
21
22impl<Pul, Psh> SendSink<Pul, Psh>
23where
24    Self: Future,
25{
26    /// Create a new [`SendSink`] from the given `pull` and `push` sides.
27    pub(crate) const fn new(pull: Pul, push: Psh) -> Self {
28        Self {
29            pull,
30            push,
31            pull_ended: false,
32        }
33    }
34}
35
36impl<Pul, Psh, Item> Future for SendSink<Pul, Psh>
37where
38    Pul: Pull<Item = Item>,
39    Psh: Sink<Item>,
40    for<'ctx> Pul::Ctx<'ctx>: crate::Context<'ctx>,
41{
42    type Output = Result<(), Psh::Error>;
43
44    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
45        let mut this = self.project();
46        if !*this.pull_ended {
47            loop {
48                ready!(this.push.as_mut().poll_ready(cx)?);
49                match this
50                    .pull
51                    .as_mut()
52                    .pull(<Pul::Ctx<'_> as crate::Context<'_>>::from_task(cx))
53                {
54                    PullStep::Ready(item, meta) => {
55                        let _ = meta; // TODO(mingwei):
56                        let () = this.push.as_mut().start_send(item)?;
57                    }
58                    PullStep::Pending(_) => return Poll::Pending,
59                    PullStep::Ended(_) => {
60                        *this.pull_ended = true;
61                        break;
62                    }
63                }
64            }
65        }
66        this.push.as_mut().poll_flush(cx)
67    }
68}
69
70#[cfg(test)]
71mod tests {
72    use alloc::vec;
73    use core::task::{Poll, Waker};
74
75    use super::SendSink;
76    use crate::Yes;
77    use crate::pull::test_utils::TestPull;
78    use crate::push::test_utils::TestPush;
79    use crate::push::{PushStep, sink_compat};
80
81    /// SendSink must not re-poll the pull after it returned Ended,
82    /// even if poll_flush returns Pending.
83    #[test]
84    fn send_sink_no_repoll_after_ended_on_flush_pending() {
85        let pull = TestPull::items_fused(0..2);
86        let mut push: TestPush<_, _, true> = TestPush::new_fused([], [PushStep::Pending(Yes)]);
87        let sink = sink_compat(&mut push); // Re-use `TestPush` by converting into a sink.
88
89        let mut send = core::pin::pin!(SendSink::new(pull, sink));
90
91        let waker = Waker::noop();
92        let mut cx = core::task::Context::from_waker(waker);
93
94        let result = send.as_mut().poll(&mut cx);
95        assert!(result.is_pending());
96
97        let result = send.as_mut().poll(&mut cx);
98        assert!(result == Poll::Ready(Ok(())));
99
100        assert_eq!(push.items(), vec![0, 1]);
101    }
102}