Skip to main content

dfir_pipes/pull/
send_push.rs

1use core::pin::Pin;
2use core::task::Poll;
3
4use pin_project_lite::pin_project;
5
6use crate::Context;
7use crate::pull::{Pull, PullStep};
8use crate::push::{Push, PushStep};
9
10pin_project! {
11    /// [`Future`] for pulling from a [`Pull`] and pushing to a [`Push`].
12    #[must_use = "futures do nothing unless polled"]
13    #[derive(Clone, Debug)]
14    pub struct SendPush<Pul, Psh> {
15        #[pin]
16        pull: Pul,
17        #[pin]
18        push: Psh,
19        pull_ended: bool,
20    }
21}
22
23impl<Pul, Psh> SendPush<Pul, Psh>
24where
25    Self: Future,
26{
27    /// Create a new [`SendPush`] from the given `pull` and `push` sides.
28    pub(crate) const fn new(pull: Pul, push: Psh) -> Self {
29        Self {
30            pull,
31            push,
32            pull_ended: false,
33        }
34    }
35}
36
37impl<Pul, Psh, Item, Meta> Future for SendPush<Pul, Psh>
38where
39    Pul: Pull<Item = Item, Meta = Meta>,
40    Meta: Copy,
41    Psh: Push<Item, Meta>,
42    for<'ctx> Pul::Ctx<'ctx>: Context<'ctx>,
43    for<'ctx> Psh::Ctx<'ctx>: Context<'ctx>,
44{
45    type Output = ();
46
47    fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
48        let mut this = self.project();
49        if !*this.pull_ended {
50            loop {
51                // Ensure push is ready before pulling.
52                match this
53                    .push
54                    .as_mut()
55                    .poll_ready(<Psh::Ctx<'_> as Context<'_>>::from_task(cx))
56                {
57                    PushStep::Done => {}
58                    PushStep::Pending(_) => return Poll::Pending,
59                }
60
61                match this
62                    .pull
63                    .as_mut()
64                    .pull(<Pul::Ctx<'_> as Context<'_>>::from_task(cx))
65                {
66                    PullStep::Ready(item, meta) => {
67                        this.push.as_mut().start_send(item, meta);
68                    }
69                    PullStep::Pending(_) => return Poll::Pending,
70                    PullStep::Ended(_) => {
71                        *this.pull_ended = true;
72                        break;
73                    }
74                }
75            }
76        }
77        match this
78            .push
79            .as_mut()
80            .poll_flush(<Psh::Ctx<'_> as Context<'_>>::from_task(cx))
81        {
82            PushStep::Done => Poll::Ready(()),
83            PushStep::Pending(_) => Poll::Pending,
84        }
85    }
86}
87
88#[cfg(test)]
89mod tests {
90    use core::task::Waker;
91
92    extern crate alloc;
93    use alloc::vec;
94    use alloc::vec::Vec;
95
96    use super::SendPush;
97    use crate::Yes;
98    use crate::pull::test_utils::TestPull;
99    use crate::push::PushStep;
100    use crate::push::test_utils::TestPush;
101
102    /// SendPush must not re-poll the pull after it returned Ended,
103    /// even if poll_flush returns Pending.
104    #[test]
105    fn send_push_no_repoll_after_ended_on_flush_pending() {
106        let pull = TestPull::items(0..2);
107        let push = TestPush::<i32, _, _>::new_fused([], [PushStep::Pending(Yes), PushStep::Done]);
108        let mut send = core::pin::pin!(SendPush::new(pull, push));
109
110        let waker = Waker::noop();
111        let mut cx = core::task::Context::from_waker(waker);
112
113        let result = send.as_mut().poll(&mut cx);
114        assert!(result.is_pending(), "expected Pending from first poll");
115
116        let result = send.as_mut().poll(&mut cx);
117        assert!(result.is_ready(), "expected Ready from second poll");
118
119        let items: Vec<i32> = send.into_ref().get_ref().push.items();
120        assert_eq!(items, vec![0, 1]);
121    }
122}