dfir_pipes/push/
flatten.rs1use core::pin::Pin;
3
4use pin_project_lite::pin_project;
5
6use crate::push::{Push, PushStep, ready};
7
8pin_project! {
9 #[must_use = "`Push`es do nothing unless items are pushed into them"]
11 pub struct Flatten<Next, IntoIter, Meta>
12 where
13 IntoIter: IntoIterator,
14 {
15 #[pin]
16 next: Next,
17 buffer: Option<(IntoIter::IntoIter, IntoIter::Item, Meta)>,
19 }
20}
21
22impl<Next, IntoIter, Meta> Flatten<Next, IntoIter, Meta>
23where
24 IntoIter: IntoIterator,
25{
26 pub(crate) const fn new(next: Next) -> Self
28 where
29 Meta: Copy,
30 Next: Push<IntoIter::Item, Meta>,
31 {
32 Self { next, buffer: None }
33 }
34}
35
36impl<Next, IntoIter, Meta> Push<IntoIter, Meta> for Flatten<Next, IntoIter, Meta>
37where
38 Next: Push<IntoIter::Item, Meta>,
39 IntoIter: IntoIterator,
40 Meta: Copy,
41{
42 type Ctx<'ctx> = Next::Ctx<'ctx>;
43
44 type CanPend = Next::CanPend;
45
46 fn poll_ready(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
47 let mut this = self.project();
48
49 while let Some((iter, item, meta)) = this.buffer.as_mut() {
50 ready!(this.next.as_mut().poll_ready(ctx));
52 let meta = *meta;
53
54 let item = if let Some(next_item) = iter.next() {
56 core::mem::replace(item, next_item)
57 } else {
58 let (_, item, _) = this.buffer.take().unwrap();
59 item
60 };
61
62 this.next.as_mut().start_send(item, meta);
64 }
65 this.next.poll_ready(ctx)
66 }
67
68 fn start_send(self: Pin<&mut Self>, item: IntoIter, meta: Meta) {
69 let this = self.project();
70 assert!(
71 this.buffer.is_none(),
72 "Flatten: poll_ready must be called before start_send"
73 );
74 let mut iter = item.into_iter();
75 *this.buffer = iter.next().map(|next_item| (iter, next_item, meta));
76 }
77
78 fn poll_flush(mut self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
79 ready!(self.as_mut().poll_ready(ctx));
80 self.project().next.poll_flush(ctx)
81 }
82
83 fn size_hint(self: Pin<&mut Self>, _hint: (usize, Option<usize>)) {
84 self.project().next.size_hint((0, None));
85 }
86}
87
#[cfg(test)]
mod tests {
    extern crate alloc;

    use alloc::vec;
    use alloc::vec::Vec;
    use core::pin::Pin;

    use crate::push::Push;
    use crate::push::test_utils::TestPush;

    /// Drives two inputs through `Flatten` using the ready -> send protocol and
    /// finishes with a flush.
    ///
    /// There are no explicit assertions here; presumably `TestPush` itself
    /// checks that it is readied before every send (TODO confirm against
    /// `test_utils`).
    #[test]
    fn flatten_readies_downstream_before_each_send() {
        let mut sink = TestPush::no_pend();
        let mut flatten = crate::push::flatten::<Vec<i32>, (), _>(&mut sink);
        let mut flatten = Pin::new(&mut flatten);

        // Each round: the `poll_ready` drains whatever the previous round
        // buffered, then the next batch is handed over.
        for batch in [vec![1, 2], vec![3]] {
            flatten.as_mut().poll_ready(&mut ());
            flatten.as_mut().start_send(batch, ());
        }

        // The final flush pushes out the elements of the last batch.
        flatten.as_mut().poll_flush(&mut ());
    }
}