Skip to main content

dfir_pipes/pull/
zip.rs

1use core::pin::Pin;
2
3use itertools::Either;
4use pin_project_lite::pin_project;
5
6use crate::pull::{Pull, PullStep};
7use crate::{Context, Toggle};
8
9pin_project! {
10    /// A pull that zips two pulls together, ending when either is exhausted.
11    ///
12    /// Yields `(Item1, Item2)` pairs. Ends as soon as either upstream pull ends.
13    #[must_use = "`Pull`s do nothing unless polled"]
14    #[derive(Clone, Debug)]
15    pub struct Zip<Prev1, Prev2>
16    where
17        Prev1: Pull,
18        Prev2: Pull,
19    {
20        #[pin]
21        prev1: Prev1,
22        #[pin]
23        prev2: Prev2,
24        // Buffer an item from whichever stream was ready first, to prevent starvation.
25        buffer: Option<Either<(Prev1::Item, Prev1::Meta), (Prev2::Item, Prev2::Meta)>>,
26    }
27}
28
29impl<Prev1, Prev2> Zip<Prev1, Prev2>
30where
31    Prev1: Pull,
32    Prev2: Pull,
33    Self: Pull,
34{
35    /// Create a new `Zip` stream from two source streams.
36    pub(crate) const fn new(prev1: Prev1, prev2: Prev2) -> Self {
37        Self {
38            prev1,
39            prev2,
40            buffer: None,
41        }
42    }
43}
44
45impl<Prev1, Prev2> Pull for Zip<Prev1, Prev2>
46where
47    Prev1: Pull,
48    Prev2: Pull<Meta = Prev1::Meta>,
49{
50    type Ctx<'ctx> = <Prev1::Ctx<'ctx> as Context<'ctx>>::Merged<Prev2::Ctx<'ctx>>;
51
52    type Item = (Prev1::Item, Prev2::Item);
53    type Meta = Prev1::Meta;
54    type CanPend = <Prev1::CanPend as Toggle>::Or<Prev2::CanPend>;
55    type CanEnd = <Prev1::CanEnd as Toggle>::Or<Prev2::CanEnd>;
56
57    fn pull(
58        self: Pin<&mut Self>,
59        ctx: &mut Self::Ctx<'_>,
60    ) -> PullStep<Self::Item, Self::Meta, Self::CanPend, Self::CanEnd> {
61        let mut this = self.project();
62
63        let (pull_left, pull_right) = match this.buffer.take() {
64            Some(Either::Left((left_item, left_meta))) => {
65                (Some(PullStep::Ready(left_item, left_meta)), None)
66            }
67            Some(Either::Right((right_item, right_meta))) => {
68                (None, Some(PullStep::Ready(right_item, right_meta)))
69            }
70            None => (None, None),
71        };
72
73        let pull_left = pull_left.unwrap_or_else(|| {
74            this.prev1
75                .as_mut()
76                .pull(<Prev1::Ctx<'_> as Context<'_>>::unmerge_self(ctx))
77        });
78        let pull_right = pull_right.unwrap_or_else(|| {
79            this.prev2
80                .as_mut()
81                .pull(<Prev1::Ctx<'_> as Context<'_>>::unmerge_other(ctx))
82        });
83
84        match (pull_left, pull_right) {
85            (PullStep::Ready(left_item, left_meta), PullStep::Ready(right_item, _right_meta)) => {
86                PullStep::Ready((left_item, right_item), left_meta)
87            } // TODO(mingwei): use right_meta
88            (PullStep::Ready(left_item, left_meta), PullStep::Pending(_)) => {
89                *this.buffer = Some(Either::Left((left_item, left_meta)));
90                PullStep::pending()
91            }
92            (PullStep::Pending(_), PullStep::Ready(right_item, right_meta)) => {
93                *this.buffer = Some(Either::Right((right_item, right_meta)));
94                PullStep::pending()
95            }
96            (PullStep::Pending(_), PullStep::Pending(_)) => PullStep::pending(),
97            // Any Ended → whole zip ends.
98            (PullStep::Ready(..), PullStep::Ended(_))
99            | (PullStep::Ended(_), PullStep::Ready(..))
100            | (PullStep::Pending(_), PullStep::Ended(_))
101            | (PullStep::Ended(_), PullStep::Pending(_))
102            | (PullStep::Ended(_), PullStep::Ended(_)) => PullStep::ended(),
103        }
104    }
105
106    fn size_hint(&self) -> (usize, Option<usize>) {
107        let (mut min1, mut max1) = self.prev1.size_hint();
108        let (mut min2, mut max2) = self.prev2.size_hint();
109
110        // Account for the buffered item: if we have a buffered item from one side,
111        // that side effectively has one more item remaining.
112        match self.buffer {
113            Some(Either::Left(_)) => {
114                min1 = min1.saturating_add(1);
115                max1 = max1.and_then(|m| m.checked_add(1));
116            }
117            Some(Either::Right(_)) => {
118                min2 = min2.saturating_add(1);
119                max2 = max2.and_then(|m| m.checked_add(1));
120            }
121            None => {}
122        }
123
124        // Lower bound is the min of the two (we end when either ends)
125        let lower = min1.min(min2);
126        // Upper bound is the min of the two (if either known)
127        let upper = match (max1, max2) {
128            (Some(a), Some(b)) => Some(a.min(b)),
129            (Some(a), None) => Some(a),
130            (None, Some(b)) => Some(b),
131            (None, None) => None,
132        };
133
134        (lower, upper)
135    }
136}
137
138#[cfg(test)]
139mod tests {
140    use core::pin::pin;
141
142    extern crate alloc;
143    use alloc::vec;
144    use alloc::vec::Vec;
145
146    use super::*;
147    use crate::pull::test_utils::TestPull;
148    use crate::pull::{Pull, PullStep};
149    use crate::{No, Yes};
150
151    #[test]
152    fn zip_functional_same_length() {
153        let mut zip = pin!(Zip::new(TestPull::items(0..2), TestPull::items(0..2)));
154        let mut results = Vec::new();
155
156        loop {
157            match zip.as_mut().pull(&mut ()) {
158                PullStep::Ready(item, _) => results.push(item),
159                PullStep::Ended(_) => break,
160                PullStep::Pending(_) => unreachable!(),
161            }
162        }
163
164        assert_eq!(results, vec![(0, 0), (1, 1)]);
165    }
166
167    #[test]
168    fn zip_functional_first_shorter() {
169        let mut zip = pin!(Zip::new(TestPull::items(0..1), TestPull::items(0..3)));
170        let mut results = Vec::new();
171
172        loop {
173            match zip.as_mut().pull(&mut ()) {
174                PullStep::Ready(item, _) => results.push(item),
175                PullStep::Ended(_) => break,
176                PullStep::Pending(_) => unreachable!(),
177            }
178        }
179
180        assert_eq!(results, vec![(0, 0)]);
181    }
182
183    #[test]
184    fn zip_functional_second_shorter() {
185        let mut zip = pin!(Zip::new(TestPull::items(0..3), TestPull::items(0..1)));
186        let mut results = Vec::new();
187
188        loop {
189            match zip.as_mut().pull(&mut ()) {
190                PullStep::Ready(item, _) => results.push(item),
191                PullStep::Ended(_) => break,
192                PullStep::Pending(_) => unreachable!(),
193            }
194        }
195
196        assert_eq!(results, vec![(0, 0)]);
197    }
198
199    #[test]
200    fn zip_size_hint_includes_buffer() {
201        // After one pair consumed and prev1's item buffered as Left,
202        // prev1's raw size_hint is (0, Some(0)) but the buffer adds 1,
203        // making the effective hint (1, Some(1)) instead of (0, Some(0)).
204        let mut prev1 = TestPull::<_, _, No, Yes, true>::new([
205            PullStep::Ready(0, ()),
206            PullStep::Ready(1, ()),
207            PullStep::ended(),
208        ]);
209        let mut prev2 = TestPull::<_, _, Yes, Yes, true>::new([
210            PullStep::Ready(0, ()),
211            PullStep::pending(),
212            PullStep::Ready(1, ()),
213            PullStep::pending(),
214            PullStep::ended(),
215        ]);
216        let mut zip = pin!(Zip::new(&mut prev1, &mut prev2));
217
218        // Initial: prev1=(2,Some(2)), prev2=(2,Some(2)) → min = (2, Some(2))
219        assert_eq!(zip.size_hint(), (2, Some(2)));
220
221        // Pull 1: prev1 Ready(0), prev2 Ready(0) → Ready((0,0))
222        assert_eq!(zip.as_mut().pull(&mut ()), PullStep::Ready((0, 0), ()));
223
224        // Pull 2: prev1 Ready(1), prev2 Pending → buffer Left(1), Pending
225        assert!(zip.as_mut().pull(&mut ()).is_pending());
226
227        // prev1 raw=(0,Some(0)) + buffer Left → effective (1,Some(1))
228        // prev2 raw=(1,Some(1))
229        // Without buffer accounting this would be (0, Some(0)).
230        assert_eq!(zip.size_hint(), (1, Some(1)));
231    }
232
233    #[test]
234    fn zip_no_starvation() {
235        // When prev1 is Pending, prev2 should still be polled and its item buffered.
236        let mut prev1 = TestPull::<_, _, Yes, Yes, true>::new([
237            PullStep::Ready(0, ()),
238            PullStep::pending(),
239            PullStep::Ready(1, ()),
240            PullStep::pending(),
241            PullStep::ended(),
242        ]);
243        let mut prev2 = TestPull::<_, _, No, Yes, true>::new([
244            PullStep::Ready(0, ()),
245            PullStep::Ready(1, ()),
246            PullStep::ended(),
247        ]);
248        let mut zip = pin!(Zip::new(&mut prev1, &mut prev2));
249
250        // Pull 1: both Ready → Ready((0, 0))
251        assert!(matches!(
252            zip.as_mut().pull(&mut ()),
253            PullStep::Ready((0, 0), _)
254        ));
255
256        // Pull 2: prev1 Pending, prev2 Ready(1) → buffered as Right, Pending
257        assert!(zip.as_mut().pull(&mut ()).is_pending());
258
259        // Pull 3: prev1 Ready(1) pairs with buffered Right(1) → Ready((1, 1))
260        // This proves prev2 was polled (not starved) when prev1 was Pending.
261        assert!(matches!(
262            zip.as_mut().pull(&mut ()),
263            PullStep::Ready((1, 1), _)
264        ));
265    }
266}