dfir_pipes/pull/
send_push.rs1use core::pin::Pin;
2use core::task::Poll;
3
4use pin_project_lite::pin_project;
5
6use crate::Context;
7use crate::pull::{Pull, PullStep};
8use crate::push::{Push, PushStep};
9
10pin_project! {
11 #[must_use = "futures do nothing unless polled"]
13 #[derive(Clone, Debug)]
14 pub struct SendPush<Pul, Psh> {
15 #[pin]
16 pull: Pul,
17 #[pin]
18 push: Psh,
19 pull_ended: bool,
20 }
21}
22
23impl<Pul, Psh> SendPush<Pul, Psh>
24where
25 Self: Future,
26{
27 pub(crate) const fn new(pull: Pul, push: Psh) -> Self {
29 Self {
30 pull,
31 push,
32 pull_ended: false,
33 }
34 }
35}
36
37impl<Pul, Psh, Item, Meta> Future for SendPush<Pul, Psh>
38where
39 Pul: Pull<Item = Item, Meta = Meta>,
40 Meta: Copy,
41 Psh: Push<Item, Meta>,
42 for<'ctx> Pul::Ctx<'ctx>: Context<'ctx>,
43 for<'ctx> Psh::Ctx<'ctx>: Context<'ctx>,
44{
45 type Output = ();
46
47 fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
48 let mut this = self.project();
49 if !*this.pull_ended {
50 loop {
51 match this
53 .push
54 .as_mut()
55 .poll_ready(<Psh::Ctx<'_> as Context<'_>>::from_task(cx))
56 {
57 PushStep::Done => {}
58 PushStep::Pending(_) => return Poll::Pending,
59 }
60
61 match this
62 .pull
63 .as_mut()
64 .pull(<Pul::Ctx<'_> as Context<'_>>::from_task(cx))
65 {
66 PullStep::Ready(item, meta) => {
67 this.push.as_mut().start_send(item, meta);
68 }
69 PullStep::Pending(_) => return Poll::Pending,
70 PullStep::Ended(_) => {
71 *this.pull_ended = true;
72 break;
73 }
74 }
75 }
76 }
77 match this
78 .push
79 .as_mut()
80 .poll_flush(<Psh::Ctx<'_> as Context<'_>>::from_task(cx))
81 {
82 PushStep::Done => Poll::Ready(()),
83 PushStep::Pending(_) => Poll::Pending,
84 }
85 }
86}
87
88#[cfg(test)]
89mod tests {
90 use core::task::Waker;
91
92 extern crate alloc;
93 use alloc::vec;
94 use alloc::vec::Vec;
95
96 use super::SendPush;
97 use crate::Yes;
98 use crate::pull::test_utils::TestPull;
99 use crate::push::PushStep;
100 use crate::push::test_utils::TestPush;
101
102 #[test]
105 fn send_push_no_repoll_after_ended_on_flush_pending() {
106 let pull = TestPull::items(0..2);
107 let push = TestPush::<i32, _, _>::new_fused([], [PushStep::Pending(Yes), PushStep::Done]);
108 let mut send = core::pin::pin!(SendPush::new(pull, push));
109
110 let waker = Waker::noop();
111 let mut cx = core::task::Context::from_waker(waker);
112
113 let result = send.as_mut().poll(&mut cx);
114 assert!(result.is_pending(), "expected Pending from first poll");
115
116 let result = send.as_mut().poll(&mut cx);
117 assert!(result.is_ready(), "expected Ready from second poll");
118
119 let items: Vec<i32> = send.into_ref().get_ref().push.items();
120 assert_eq!(items, vec![0, 1]);
121 }
122}