Skip to main content

dfir_pipes/push/
flat_map.rs

1//! [`FlatMap`] push combinator.
2use core::pin::Pin;
3
4use pin_project_lite::pin_project;
5
6use crate::push::{Push, PushStep, ready};
7
pin_project! {
    /// Push combinator that maps each item to an iterator and pushes each element.
    ///
    /// Every item pushed in is handed to `func`; the returned value is turned into an
    /// iterator via [`IntoIterator::into_iter`] and its elements are forwarded, one at a
    /// time, to the downstream `next` push together with a copy of the input's metadata.
    ///
    /// Elements are not forwarded inside `start_send`. They are parked in `buffer` and
    /// drained by `poll_ready` / `poll_flush`, which poll `next` for readiness before
    /// each individual element is sent.
    #[must_use = "`Push`es do nothing unless items are pushed into them"]
    pub struct FlatMap<Next, Func, IntoIter, Meta>
    where
        IntoIter: IntoIterator,
    {
        // Downstream push that receives the flattened elements (structurally pinned).
        #[pin]
        next: Next,
        // Maps one input item to an `IntoIter` whose elements are pushed downstream.
        func: Func,
        // In-progress expansion: (remaining iterator, element to send next, metadata of
        // the originating input). `None` when there is nothing left to forward.
        buffer: Option<(IntoIter::IntoIter, IntoIter::Item, Meta)>,
    }
}
21
22impl<Next, Func, IntoIter, Meta> FlatMap<Next, Func, IntoIter, Meta>
23where
24    IntoIter: IntoIterator,
25{
26    /// Creates with flat-mapping `func` and next `push`.
27    pub(crate) const fn new<In>(func: Func, next: Next) -> Self
28    where
29        Meta: Copy,
30        Next: Push<IntoIter::Item, Meta>,
31        Func: FnMut(In) -> IntoIter,
32    {
33        Self {
34            next,
35            func,
36            buffer: None,
37        }
38    }
39}
40
41impl<Next, Func, IntoIter, In, Meta> Push<In, Meta> for FlatMap<Next, Func, IntoIter, Meta>
42where
43    Next: Push<IntoIter::Item, Meta>,
44    Func: FnMut(In) -> IntoIter,
45    IntoIter: IntoIterator,
46    Meta: Copy,
47{
48    type Ctx<'ctx> = Next::Ctx<'ctx>;
49
50    type CanPend = Next::CanPend;
51
52    fn poll_ready(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
53        let mut this = self.project();
54
55        while let Some((iter, item, meta)) = this.buffer.as_mut() {
56            // Ensure following sink is ready.
57            ready!(this.next.as_mut().poll_ready(ctx));
58            let meta = *meta;
59
60            // Swap in the next item.
61            let item = if let Some(next_item) = iter.next() {
62                core::mem::replace(item, next_item)
63            } else {
64                let (_, item, _) = this.buffer.take().unwrap();
65                item
66            };
67
68            // Send the prev item.
69            this.next.as_mut().start_send(item, meta);
70        }
71        this.next.poll_ready(ctx)
72    }
73
74    fn start_send(self: Pin<&mut Self>, item: In, meta: Meta) {
75        let this = self.project();
76        assert!(
77            this.buffer.is_none(),
78            "FlatMap: poll_ready must be called before start_send"
79        );
80        let mut iter = (this.func)(item).into_iter();
81        *this.buffer = iter.next().map(|next_item| (iter, next_item, meta));
82    }
83
84    fn poll_flush(mut self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
85        ready!(self.as_mut().poll_ready(ctx));
86        self.project().next.poll_flush(ctx)
87    }
88
89    fn size_hint(self: Pin<&mut Self>, _hint: (usize, Option<usize>)) {
90        self.project().next.size_hint((0, None));
91    }
92}
93
#[cfg(test)]
mod tests {
    use core::pin::Pin;

    use crate::push::Push;
    use crate::push::test_utils::TestPush;

    /// Pushes two inputs, each expanding to two elements, through a `flat_map` stage
    /// and then flushes it.
    ///
    /// There are no explicit assertions here; presumably `TestPush` itself checks that
    /// `poll_ready` precedes every `start_send` (confirm in `test_utils`).
    #[test]
    fn flat_map_readies_downstream_before_each_send() {
        let mut sink = TestPush::no_pend();
        let mut stage = crate::push::flat_map(|x: i32| [x, x + 10], &mut sink);
        let mut pinned = Pin::new(&mut stage);

        for input in [1, 2] {
            pinned.as_mut().poll_ready(&mut ());
            pinned.as_mut().start_send(input, ());
        }

        pinned.as_mut().poll_flush(&mut ());
    }
}
112}