dfir_pipes/pull/
send_sink.rs1use core::pin::Pin;
2use core::task::{Context, Poll, ready};
3
4use futures_sink::Sink;
5use pin_project_lite::pin_project;
6
7use crate::pull::{Pull, PullStep};
8
9pin_project! {
10 #[must_use = "futures do nothing unless polled"]
12 #[derive(Clone, Debug)]
13 pub struct SendSink<Pul, Psh> {
14 #[pin]
15 pull: Pul,
16 #[pin]
17 push: Psh,
18 pull_ended: bool,
19 }
20}
21
22impl<Pul, Psh> SendSink<Pul, Psh>
23where
24 Self: Future,
25{
26 pub(crate) const fn new(pull: Pul, push: Psh) -> Self {
28 Self {
29 pull,
30 push,
31 pull_ended: false,
32 }
33 }
34}
35
36impl<Pul, Psh, Item> Future for SendSink<Pul, Psh>
37where
38 Pul: Pull<Item = Item>,
39 Psh: Sink<Item>,
40 for<'ctx> Pul::Ctx<'ctx>: crate::Context<'ctx>,
41{
42 type Output = Result<(), Psh::Error>;
43
44 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
45 let mut this = self.project();
46 if !*this.pull_ended {
47 loop {
48 ready!(this.push.as_mut().poll_ready(cx)?);
49 match this
50 .pull
51 .as_mut()
52 .pull(<Pul::Ctx<'_> as crate::Context<'_>>::from_task(cx))
53 {
54 PullStep::Ready(item, meta) => {
55 let _ = meta; let () = this.push.as_mut().start_send(item)?;
57 }
58 PullStep::Pending(_) => return Poll::Pending,
59 PullStep::Ended(_) => {
60 *this.pull_ended = true;
61 break;
62 }
63 }
64 }
65 }
66 this.push.as_mut().poll_flush(cx)
67 }
68}
69
70#[cfg(test)]
71mod tests {
72 use alloc::vec;
73 use core::task::{Poll, Waker};
74
75 use super::SendSink;
76 use crate::Yes;
77 use crate::pull::test_utils::TestPull;
78 use crate::push::test_utils::TestPush;
79 use crate::push::{PushStep, sink_compat};
80
81 #[test]
84 fn send_sink_no_repoll_after_ended_on_flush_pending() {
85 let pull = TestPull::items_fused(0..2);
86 let mut push: TestPush<_, _, true> = TestPush::new_fused([], [PushStep::Pending(Yes)]);
87 let sink = sink_compat(&mut push); let mut send = core::pin::pin!(SendSink::new(pull, sink));
90
91 let waker = Waker::noop();
92 let mut cx = core::task::Context::from_waker(waker);
93
94 let result = send.as_mut().poll(&mut cx);
95 assert!(result.is_pending());
96
97 let result = send.as_mut().poll(&mut cx);
98 assert!(result == Poll::Ready(Ok(())));
99
100 assert_eq!(push.items(), vec![0, 1]);
101 }
102}