dfir_pipes/pull/
flatten.rs1use core::pin::Pin;
2
3use pin_project_lite::pin_project;
4
5use crate::pull::{FusedPull, Pull, PullStep};
6
7pin_project! {
8 #[must_use = "`Pull`s do nothing unless polled"]
10 #[derive(Clone, Debug, Default)]
11 pub struct Flatten<Prev, Iter, Meta> {
12 #[pin]
13 prev: Prev,
14 current: Option<(Iter, Meta)>,
15 }
16}
17
18impl<Prev, Iter, Meta> Flatten<Prev, Iter, Meta>
19where
20 Self: Pull,
21{
22 pub(crate) const fn new(prev: Prev) -> Self {
23 Self {
24 prev,
25 current: None,
26 }
27 }
28}
29
30impl<Prev> Pull for Flatten<Prev, <Prev::Item as IntoIterator>::IntoIter, Prev::Meta>
31where
32 Prev: Pull,
33 Prev::Item: IntoIterator,
34{
35 type Ctx<'ctx> = Prev::Ctx<'ctx>;
36
37 type Item = <Prev::Item as IntoIterator>::Item;
38 type Meta = Prev::Meta;
39 type CanPend = Prev::CanPend;
40 type CanEnd = Prev::CanEnd;
41
42 fn pull(
43 self: Pin<&mut Self>,
44 ctx: &mut Self::Ctx<'_>,
45 ) -> PullStep<Self::Item, Self::Meta, Self::CanPend, Self::CanEnd> {
46 let mut this = self.project();
47 loop {
48 let (iter, meta) = if let Some(current) = this.current.as_mut() {
49 current
50 } else {
51 match this.prev.as_mut().pull(ctx) {
52 PullStep::Ready(iterable, meta) => {
53 this.current.insert((iterable.into_iter(), meta))
54 }
55 PullStep::Pending(can_pend) => {
56 return PullStep::Pending(can_pend);
57 }
58 PullStep::Ended(can_end) => {
59 return PullStep::Ended(can_end);
60 }
61 }
62 };
63 if let Some(item) = iter.next() {
64 return PullStep::Ready(item, *meta);
65 }
66 *this.current = None;
67 }
68 }
69
70 fn size_hint(&self) -> (usize, Option<usize>) {
71 let current_len = self
72 .current
73 .as_ref()
74 .map(|(iter, _)| iter.size_hint().0)
75 .unwrap_or_default();
76 (current_len, None)
78 }
79}
80
81impl<Prev> FusedPull for Flatten<Prev, <Prev::Item as IntoIterator>::IntoIter, Prev::Meta>
82where
83 Prev: FusedPull,
84 Prev::Item: IntoIterator,
85{
86}
87
88#[cfg(test)]
89mod tests {
90 use core::pin::pin;
91
92 use crate::pull::Pull;
93 use crate::pull::test_utils::{TestPull, assert_fused_runtime};
94
95 #[test]
96 fn flatten_fused_shields_upstream() {
97 let p = pin!(TestPull::items(0..5).fuse().map(|x| 0..x).flatten());
98 assert_fused_runtime(p);
99 }
100}