Skip to main content

dfir_pipes/push/
flatten.rs

1//! [`Flatten`] push combinator.
2use core::pin::Pin;
3
4use pin_project_lite::pin_project;
5
6use crate::push::{Push, PushStep, ready};
7
8pin_project! {
9    /// Push combinator that flattens iterable items by pushing each element downstream.
10    #[must_use = "`Push`es do nothing unless items are pushed into them"]
11    pub struct Flatten<Next, IntoIter, Meta>
12    where
13        IntoIter: IntoIterator,
14    {
15        #[pin]
16        next: Next,
17        // Current iterator and the next item.
18        buffer: Option<(IntoIter::IntoIter, IntoIter::Item, Meta)>,
19    }
20}
21
22impl<Next, IntoIter, Meta> Flatten<Next, IntoIter, Meta>
23where
24    IntoIter: IntoIterator,
25{
26    /// Creates with next `push`.
27    pub(crate) const fn new(next: Next) -> Self
28    where
29        Meta: Copy,
30        Next: Push<IntoIter::Item, Meta>,
31    {
32        Self { next, buffer: None }
33    }
34}
35
36impl<Next, IntoIter, Meta> Push<IntoIter, Meta> for Flatten<Next, IntoIter, Meta>
37where
38    Next: Push<IntoIter::Item, Meta>,
39    IntoIter: IntoIterator,
40    Meta: Copy,
41{
42    type Ctx<'ctx> = Next::Ctx<'ctx>;
43
44    type CanPend = Next::CanPend;
45
46    fn poll_ready(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
47        let mut this = self.project();
48
49        while let Some((iter, item, meta)) = this.buffer.as_mut() {
50            // Ensure following sink is ready.
51            ready!(this.next.as_mut().poll_ready(ctx));
52            let meta = *meta;
53
54            // Swap in the next item.
55            let item = if let Some(next_item) = iter.next() {
56                core::mem::replace(item, next_item)
57            } else {
58                let (_, item, _) = this.buffer.take().unwrap();
59                item
60            };
61
62            // Send the prev item.
63            this.next.as_mut().start_send(item, meta);
64        }
65        this.next.poll_ready(ctx)
66    }
67
68    fn start_send(self: Pin<&mut Self>, item: IntoIter, meta: Meta) {
69        let this = self.project();
70        assert!(
71            this.buffer.is_none(),
72            "Flatten: poll_ready must be called before start_send"
73        );
74        let mut iter = item.into_iter();
75        *this.buffer = iter.next().map(|next_item| (iter, next_item, meta));
76    }
77
78    fn poll_flush(mut self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
79        ready!(self.as_mut().poll_ready(ctx));
80        self.project().next.poll_flush(ctx)
81    }
82
83    fn size_hint(self: Pin<&mut Self>, _hint: (usize, Option<usize>)) {
84        self.project().next.size_hint((0, None));
85    }
86}
87
88#[cfg(test)]
89mod tests {
90    use core::pin::Pin;
91
92    extern crate alloc;
93    use alloc::vec;
94    use alloc::vec::Vec;
95
96    use crate::push::Push;
97    use crate::push::test_utils::TestPush;
98
99    #[test]
100    fn flatten_readies_downstream_before_each_send() {
101        let mut tp = TestPush::no_pend();
102        let mut fl = crate::push::flatten::<Vec<i32>, (), _>(&mut tp);
103        let mut fl = Pin::new(&mut fl);
104        fl.as_mut().poll_ready(&mut ());
105        fl.as_mut().start_send(vec![1, 2], ());
106        fl.as_mut().poll_ready(&mut ());
107        fl.as_mut().start_send(vec![3], ());
108        fl.as_mut().poll_flush(&mut ());
109    }
110}