Skip to main content

dfir_pipes/pull/
flat_map.rs

1use core::pin::Pin;
2
3use pin_project_lite::pin_project;
4
5use crate::pull::{FusedPull, Pull, PullStep};
6
7pin_project! {
8    /// Pull combinator that maps each item to an iterator and flattens the results.
9    #[must_use = "`Pull`s do nothing unless polled"]
10    #[derive(Clone, Debug)]
11    pub struct FlatMap<Prev, Func, Iter, Meta> {
12        #[pin]
13        prev: Prev,
14        func: Func,
15        current: Option<(Iter, Meta)>,
16    }
17}
18
19impl<Prev, Func, Iter, Meta> FlatMap<Prev, Func, Iter, Meta>
20where
21    Self: Pull,
22{
23    pub(crate) const fn new(prev: Prev, func: Func) -> Self {
24        Self {
25            prev,
26            func,
27            current: None,
28        }
29    }
30}
31
32impl<Prev, Func, IntoIter> Pull for FlatMap<Prev, Func, IntoIter::IntoIter, Prev::Meta>
33where
34    Prev: Pull,
35    Func: FnMut(Prev::Item) -> IntoIter,
36    IntoIter: IntoIterator,
37{
38    type Ctx<'ctx> = Prev::Ctx<'ctx>;
39
40    type Item = IntoIter::Item;
41    type Meta = Prev::Meta;
42    type CanPend = Prev::CanPend;
43    type CanEnd = Prev::CanEnd;
44
45    fn pull(
46        self: Pin<&mut Self>,
47        ctx: &mut Self::Ctx<'_>,
48    ) -> PullStep<Self::Item, Self::Meta, Self::CanPend, Self::CanEnd> {
49        let mut this = self.project();
50        loop {
51            let (iter, meta) = if let Some(current) = this.current.as_mut() {
52                current
53            } else {
54                match this.prev.as_mut().pull(ctx) {
55                    PullStep::Ready(item, meta) => {
56                        this.current.insert(((this.func)(item).into_iter(), meta))
57                    }
58                    PullStep::Pending(can_pend) => {
59                        return PullStep::Pending(can_pend);
60                    }
61                    PullStep::Ended(can_end) => {
62                        return PullStep::Ended(can_end);
63                    }
64                }
65            };
66            if let Some(item) = iter.next() {
67                return PullStep::Ready(item, *meta);
68            }
69            *this.current = None;
70        }
71    }
72
73    fn size_hint(&self) -> (usize, Option<usize>) {
74        let current_len = self
75            .current
76            .as_ref()
77            .map(|(iter, _)| iter.size_hint().0)
78            .unwrap_or_default();
79        // We can't know the upper bound since each mapped iterator could have any size
80        (current_len, None)
81    }
82}
83
84impl<Prev, Func, IntoIter> FusedPull for FlatMap<Prev, Func, IntoIter::IntoIter, Prev::Meta>
85where
86    Prev: FusedPull,
87    Func: FnMut(Prev::Item) -> IntoIter,
88    IntoIter: IntoIterator,
89{
90}
91
92#[cfg(test)]
93mod tests {
94    use core::pin::pin;
95
96    use crate::pull::Pull;
97    use crate::pull::test_utils::{TestPull, assert_fused_runtime};
98
99    #[test]
100    fn flat_map_fused_shields_upstream() {
101        let p = pin!(TestPull::items(0..5).fuse().flat_map(|x| 0..x));
102        assert_fused_runtime(p);
103    }
104}