dfir_pipes/push/
flat_map.rs1use core::pin::Pin;
3
4use pin_project_lite::pin_project;
5
6use crate::push::{Push, PushStep, ready};
7
8pin_project! {
9 #[must_use = "`Push`es do nothing unless items are pushed into them"]
11 pub struct FlatMap<Next, Func, IntoIter, Meta>
12 where
13 IntoIter: IntoIterator,
14 {
15 #[pin]
16 next: Next,
17 func: Func,
18 buffer: Option<(IntoIter::IntoIter, IntoIter::Item, Meta)>,
19 }
20}
21
22impl<Next, Func, IntoIter, Meta> FlatMap<Next, Func, IntoIter, Meta>
23where
24 IntoIter: IntoIterator,
25{
26 pub(crate) const fn new<In>(func: Func, next: Next) -> Self
28 where
29 Meta: Copy,
30 Next: Push<IntoIter::Item, Meta>,
31 Func: FnMut(In) -> IntoIter,
32 {
33 Self {
34 next,
35 func,
36 buffer: None,
37 }
38 }
39}
40
41impl<Next, Func, IntoIter, In, Meta> Push<In, Meta> for FlatMap<Next, Func, IntoIter, Meta>
42where
43 Next: Push<IntoIter::Item, Meta>,
44 Func: FnMut(In) -> IntoIter,
45 IntoIter: IntoIterator,
46 Meta: Copy,
47{
48 type Ctx<'ctx> = Next::Ctx<'ctx>;
49
50 type CanPend = Next::CanPend;
51
52 fn poll_ready(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
53 let mut this = self.project();
54
55 while let Some((iter, item, meta)) = this.buffer.as_mut() {
56 ready!(this.next.as_mut().poll_ready(ctx));
58 let meta = *meta;
59
60 let item = if let Some(next_item) = iter.next() {
62 core::mem::replace(item, next_item)
63 } else {
64 let (_, item, _) = this.buffer.take().unwrap();
65 item
66 };
67
68 this.next.as_mut().start_send(item, meta);
70 }
71 this.next.poll_ready(ctx)
72 }
73
74 fn start_send(self: Pin<&mut Self>, item: In, meta: Meta) {
75 let this = self.project();
76 assert!(
77 this.buffer.is_none(),
78 "FlatMap: poll_ready must be called before start_send"
79 );
80 let mut iter = (this.func)(item).into_iter();
81 *this.buffer = iter.next().map(|next_item| (iter, next_item, meta));
82 }
83
84 fn poll_flush(mut self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
85 ready!(self.as_mut().poll_ready(ctx));
86 self.project().next.poll_flush(ctx)
87 }
88
89 fn size_hint(self: Pin<&mut Self>, _hint: (usize, Option<usize>)) {
90 self.project().next.size_hint((0, None));
91 }
92}
93
94#[cfg(test)]
95mod tests {
96 use core::pin::Pin;
97
98 use crate::push::Push;
99 use crate::push::test_utils::TestPush;
100
101 #[test]
102 fn flat_map_readies_downstream_before_each_send() {
103 let mut tp = TestPush::no_pend();
104 let mut fm = crate::push::flat_map(|x: i32| [x, x + 10], &mut tp);
105 let mut fm = Pin::new(&mut fm);
106 fm.as_mut().poll_ready(&mut ());
107 fm.as_mut().start_send(1, ());
108 fm.as_mut().poll_ready(&mut ());
109 fm.as_mut().start_send(2, ());
110 fm.as_mut().poll_flush(&mut ());
111 }
112}