Skip to main content

dfir_pipes/pull/
flatten.rs

1use core::pin::Pin;
2
3use pin_project_lite::pin_project;
4
5use crate::pull::{FusedPull, Pull, PullStep};
6
7pin_project! {
8    /// Pull combinator that flattens nested iterables.
9    #[must_use = "`Pull`s do nothing unless polled"]
10    #[derive(Clone, Debug, Default)]
11    pub struct Flatten<Prev, Iter, Meta> {
12        #[pin]
13        prev: Prev,
14        current: Option<(Iter, Meta)>,
15    }
16}
17
18impl<Prev, Iter, Meta> Flatten<Prev, Iter, Meta>
19where
20    Self: Pull,
21{
22    pub(crate) const fn new(prev: Prev) -> Self {
23        Self {
24            prev,
25            current: None,
26        }
27    }
28}
29
30impl<Prev> Pull for Flatten<Prev, <Prev::Item as IntoIterator>::IntoIter, Prev::Meta>
31where
32    Prev: Pull,
33    Prev::Item: IntoIterator,
34{
35    type Ctx<'ctx> = Prev::Ctx<'ctx>;
36
37    type Item = <Prev::Item as IntoIterator>::Item;
38    type Meta = Prev::Meta;
39    type CanPend = Prev::CanPend;
40    type CanEnd = Prev::CanEnd;
41
42    fn pull(
43        self: Pin<&mut Self>,
44        ctx: &mut Self::Ctx<'_>,
45    ) -> PullStep<Self::Item, Self::Meta, Self::CanPend, Self::CanEnd> {
46        let mut this = self.project();
47        loop {
48            let (iter, meta) = if let Some(current) = this.current.as_mut() {
49                current
50            } else {
51                match this.prev.as_mut().pull(ctx) {
52                    PullStep::Ready(iterable, meta) => {
53                        this.current.insert((iterable.into_iter(), meta))
54                    }
55                    PullStep::Pending(can_pend) => {
56                        return PullStep::Pending(can_pend);
57                    }
58                    PullStep::Ended(can_end) => {
59                        return PullStep::Ended(can_end);
60                    }
61                }
62            };
63            if let Some(item) = iter.next() {
64                return PullStep::Ready(item, *meta);
65            }
66            *this.current = None;
67        }
68    }
69
70    fn size_hint(&self) -> (usize, Option<usize>) {
71        let current_len = self
72            .current
73            .as_ref()
74            .map(|(iter, _)| iter.size_hint().0)
75            .unwrap_or_default();
76        // We can't know the upper bound since each inner iterator could have any size
77        (current_len, None)
78    }
79}
80
81impl<Prev> FusedPull for Flatten<Prev, <Prev::Item as IntoIterator>::IntoIter, Prev::Meta>
82where
83    Prev: FusedPull,
84    Prev::Item: IntoIterator,
85{
86}
87
88#[cfg(test)]
89mod tests {
90    use core::pin::pin;
91
92    use crate::pull::Pull;
93    use crate::pull::test_utils::{TestPull, assert_fused_runtime};
94
95    #[test]
96    fn flatten_fused_shields_upstream() {
97        let p = pin!(TestPull::items(0..5).fuse().map(|x| 0..x).flatten());
98        assert_fused_runtime(p);
99    }
100}