Skip to main content

dfir_pipes/pull/
zip_longest.rs

1use core::pin::Pin;
2
3use itertools::{Either, EitherOrBoth};
4use pin_project_lite::pin_project;
5
6use crate::pull::{FusedPull, Pull, PullStep, fuse_self};
7use crate::{Context, Toggle};
8
9pin_project! {
10    /// A pull that zips two pulls together, continuing until both are exhausted.
11    ///
12    /// Unlike a regular zip which ends when either pull ends, `ZipLongest`
13    /// continues until both pulls have ended, yielding [`EitherOrBoth`] values.
14    ///
15    /// Both upstream pulls must be fused ([`FusedPull`]) to ensure correct
16    /// behavior after one pull ends.
17    #[must_use = "`Pull`s do nothing unless polled"]
18    #[derive(Clone, Debug)]
19    pub struct ZipLongest<Prev1, Prev2>
20    where
21        Prev1: Pull,
22        Prev2: Pull,
23    {
24        #[pin]
25        prev1: Prev1,
26        #[pin]
27        prev2: Prev2,
28        // Store an item from whichever stream was ready first, while waiting for the other.
29        // `Left` = item from prev1, `Right` = item from prev2.
30        buffer: Option<Either<(Prev1::Item, Prev1::Meta), (Prev2::Item, Prev2::Meta)>>,
31    }
32}
33
34impl<Prev1, Prev2> ZipLongest<Prev1, Prev2>
35where
36    Prev1: Pull,
37    Prev2: Pull,
38    Self: Pull,
39{
40    /// Create a new `ZipLongest` stream from two source streams.
41    pub(crate) const fn new(prev1: Prev1, prev2: Prev2) -> Self {
42        Self {
43            prev1,
44            prev2,
45            buffer: None,
46        }
47    }
48}
49
50impl<Prev1, Prev2> Pull for ZipLongest<Prev1, Prev2>
51where
52    Prev1: FusedPull,
53    Prev2: FusedPull<Meta = Prev1::Meta>,
54{
55    type Ctx<'ctx> = <Prev1::Ctx<'ctx> as Context<'ctx>>::Merged<Prev2::Ctx<'ctx>>;
56
57    type Item = EitherOrBoth<Prev1::Item, Prev2::Item>;
58    type Meta = Prev1::Meta;
59    type CanPend = <Prev1::CanPend as Toggle>::Or<Prev2::CanPend>;
60    type CanEnd = <Prev1::CanEnd as Toggle>::And<Prev2::CanEnd>;
61
62    fn pull(
63        self: Pin<&mut Self>,
64        ctx: &mut Self::Ctx<'_>,
65    ) -> PullStep<Self::Item, Self::Meta, Self::CanPend, Self::CanEnd> {
66        let mut this = self.project();
67
68        let (pull_left, pull_right) = match this.buffer.take() {
69            Some(Either::Left((left_item, left_meta))) => {
70                (Some(PullStep::Ready(left_item, left_meta)), None)
71            }
72            Some(Either::Right((right_item, right_meta))) => {
73                (None, Some(PullStep::Ready(right_item, right_meta)))
74            }
75            None => (None, None),
76        };
77
78        let pull_left = pull_left.unwrap_or_else(|| {
79            this.prev1
80                .as_mut()
81                .pull(<Prev1::Ctx<'_> as Context<'_>>::unmerge_self(ctx))
82        });
83        let pull_right = pull_right.unwrap_or_else(|| {
84            this.prev2
85                .as_mut()
86                .pull(<Prev1::Ctx<'_> as Context<'_>>::unmerge_other(ctx))
87        });
88
89        match (pull_left, pull_right) {
90            (PullStep::Ready(left_item, left_meta), PullStep::Ready(right_item, _right_meta)) => {
91                PullStep::Ready(EitherOrBoth::Both(left_item, right_item), left_meta)
92            } // TODO(mingwei): use right_meta
93            (PullStep::Ready(left_item, left_meta), PullStep::Ended(_)) => {
94                PullStep::Ready(EitherOrBoth::Left(left_item), left_meta)
95            }
96            (PullStep::Ended(_), PullStep::Ready(right_item, right_meta)) => {
97                PullStep::Ready(EitherOrBoth::Right(right_item), right_meta)
98            }
99            (PullStep::Ready(left_item, left_meta), PullStep::Pending(_)) => {
100                *this.buffer = Some(Either::Left((left_item, left_meta)));
101                PullStep::pending()
102            }
103            (PullStep::Pending(_), PullStep::Ready(right_item, right_meta)) => {
104                *this.buffer = Some(Either::Right((right_item, right_meta)));
105                PullStep::pending()
106            }
107            (PullStep::Pending(_), PullStep::Pending(_)) => PullStep::pending(),
108            (PullStep::Pending(_), PullStep::Ended(_)) => PullStep::pending(),
109            (PullStep::Ended(_), PullStep::Pending(_)) => PullStep::pending(),
110            (PullStep::Ended(_), PullStep::Ended(_)) => PullStep::ended(),
111        }
112    }
113
114    fn size_hint(&self) -> (usize, Option<usize>) {
115        let (mut min1, mut max1) = self.prev1.size_hint();
116        let (mut min2, mut max2) = self.prev2.size_hint();
117
118        // Account for a buffered item: it adds 1 to the respective stream's remaining count.
119        match self.buffer {
120            Some(Either::Left(_)) => {
121                min1 = min1.saturating_add(1);
122                max1 = max1.and_then(|m| m.checked_add(1));
123            }
124            Some(Either::Right(_)) => {
125                min2 = min2.saturating_add(1);
126                max2 = max2.and_then(|m| m.checked_add(1));
127            }
128            None => {}
129        }
130
131        // Lower bound is the max of the two (we continue until both end)
132        let lower = min1.max(min2);
133        // Upper bound is the max of the two (if both known)
134        let upper = max1.zip(max2).map(|(a, b)| a.max(b));
135
136        (lower, upper)
137    }
138
139    fuse_self!();
140}
141
142impl<A, B> FusedPull for ZipLongest<A, B>
143where
144    A: FusedPull,
145    B: FusedPull<Meta = A::Meta>,
146{
147}
148
149#[cfg(test)]
150mod tests {
151    use core::pin::pin;
152
153    extern crate alloc;
154    use alloc::vec;
155    use alloc::vec::Vec;
156
157    use itertools::EitherOrBoth;
158
159    use super::*;
160    use crate::pull::test_utils::{TestPull, assert_is_fused};
161    use crate::pull::{Pull, PullStep};
162    use crate::{No, Yes};
163
164    #[test]
165    fn zip_longest_functional_same_length() {
166        let mut zip = pin!(ZipLongest::new(
167            TestPull::items_fused(0..2),
168            TestPull::items_fused(0..2)
169        ));
170        assert_is_fused(&*zip);
171        let mut results = Vec::new();
172
173        loop {
174            match zip.as_mut().pull(&mut ()) {
175                PullStep::Ready(item, _) => results.push(item),
176                PullStep::Ended(_) => break,
177                PullStep::Pending(_) => unreachable!(),
178            }
179        }
180
181        assert_eq!(
182            results,
183            vec![EitherOrBoth::Both(0, 0), EitherOrBoth::Both(1, 1)]
184        );
185    }
186
187    #[test]
188    fn zip_longest_functional_first_shorter() {
189        let mut zip = pin!(ZipLongest::new(
190            TestPull::items_fused(0..1),
191            TestPull::items_fused(0..3)
192        ));
193        let mut results = Vec::new();
194
195        loop {
196            match zip.as_mut().pull(&mut ()) {
197                PullStep::Ready(item, _) => results.push(item),
198                PullStep::Ended(_) => break,
199                PullStep::Pending(_) => unreachable!(),
200            }
201        }
202
203        assert_eq!(
204            results,
205            vec![
206                EitherOrBoth::Both(0, 0),
207                EitherOrBoth::Right(1),
208                EitherOrBoth::Right(2)
209            ]
210        );
211    }
212
213    #[test]
214    fn zip_longest_functional_second_shorter() {
215        let mut zip = pin!(ZipLongest::new(
216            TestPull::items_fused(0..3),
217            TestPull::items_fused(0..1)
218        ));
219        let mut results = Vec::new();
220
221        loop {
222            match zip.as_mut().pull(&mut ()) {
223                PullStep::Ready(item, _) => results.push(item),
224                PullStep::Ended(_) => break,
225                PullStep::Pending(_) => unreachable!(),
226            }
227        }
228
229        assert_eq!(
230            results,
231            vec![
232                EitherOrBoth::Both(0, 0),
233                EitherOrBoth::Left(1),
234                EitherOrBoth::Left(2)
235            ]
236        );
237    }
238
239    #[test]
240    fn zip_longest_fused_shields_upstream() {
241        use crate::pull::test_utils::assert_fused_runtime;
242
243        let p = pin!(ZipLongest::new(
244            TestPull::items(0..1).fuse(),
245            TestPull::items(0..2).fuse()
246        ));
247        assert_fused_runtime(p);
248    }
249
250    #[test]
251    fn zip_longest_size_hint_basic() {
252        let mut prev1 = TestPull::<_, _, Yes, Yes, true>::new([
253            PullStep::Ready(0, ()),
254            PullStep::pending(),
255            PullStep::Ready(1, ()),
256            PullStep::pending(),
257            PullStep::ended(),
258        ]);
259        let mut prev2 = TestPull::<_, _, Yes, Yes, true>::new([
260            PullStep::Ready(0, ()),
261            PullStep::pending(),
262            PullStep::ended(),
263        ]);
264        let mut zip = pin!(ZipLongest::new(&mut prev1, &mut prev2));
265
266        assert_eq!(zip.size_hint(), (2, Some(2)));
267
268        // Both Ready → Both(0, 0)
269        assert!(matches!(zip.as_mut().pull(&mut ()), PullStep::Ready(..)));
270        assert_eq!(zip.size_hint(), (1, Some(1)));
271
272        // Both Pending
273        assert!(zip.as_mut().pull(&mut ()).is_pending());
274
275        // prev1 Ready(1), prev2 Ended → Left(1)
276        assert!(matches!(zip.as_mut().pull(&mut ()), PullStep::Ready(..)));
277
278        // prev1 Pending, prev2 Ended → Pending
279        assert!(zip.as_mut().pull(&mut ()).is_pending());
280
281        // Both Ended
282        assert!(zip.as_mut().pull(&mut ()).is_ended());
283    }
284
285    #[test]
286    fn zip_longest_size_hint_with_buffered_item() {
287        let mut prev1 = TestPull::<_, _, No, Yes, true>::new([
288            PullStep::Ready(0, ()),
289            PullStep::Ready(1, ()),
290            PullStep::Ready(2, ()),
291            PullStep::ended(),
292        ]);
293        let mut prev2 = TestPull::<_, _, Yes, Yes, true>::new([
294            PullStep::Ready(0, ()),
295            PullStep::pending(),
296            PullStep::Ready(1, ()),
297            PullStep::pending(),
298            PullStep::Ready(2, ()),
299            PullStep::pending(),
300            PullStep::ended(),
301        ]);
302        let mut zip = pin!(ZipLongest::new(&mut prev1, &mut prev2));
303
304        // Pull 1: prev1 Ready(0), prev2 Ready(0) => Both(0, 0)
305        assert!(matches!(
306            zip.as_mut().pull(&mut ()),
307            PullStep::Ready(EitherOrBoth::Both(0, 0), ())
308        ));
309
310        // Pull 2: prev1 Ready(1), prev2 Pending => buffer Left(1), Pending
311        assert!(zip.as_mut().pull(&mut ()).is_pending());
312
313        // Now buffer has Left(item1). prev1 reports (1, Some(1)), prev2 reports (2, Some(2)).
314        // With buffer: prev1 effective = (2, Some(2)), prev2 = (2, Some(2)) => (2, Some(2))
315        assert_eq!(zip.size_hint(), (2, Some(2)));
316    }
317
318    #[test]
319    fn zip_longest_no_starvation() {
320        // When prev1 is Pending, prev2 should still be polled and its item buffered.
321        let mut prev1 = TestPull::<_, _, Yes, Yes, true>::new([
322            PullStep::Ready(0, ()),
323            PullStep::pending(),
324            PullStep::Ready(1, ()),
325            PullStep::pending(),
326            PullStep::ended(),
327        ]);
328        let mut prev2 = TestPull::<_, _, No, Yes, true>::new([
329            PullStep::Ready(0, ()),
330            PullStep::Ready(1, ()),
331            PullStep::ended(),
332        ]);
333        let mut zip = pin!(ZipLongest::new(&mut prev1, &mut prev2));
334
335        // prev1 Ready(0), prev2 Ready(0) → Both(0, 0)
336        assert!(matches!(
337            zip.as_mut().pull(&mut ()),
338            PullStep::Ready(EitherOrBoth::Both(0, 0), _)
339        ));
340
341        // prev1 Pending, prev2 Ready(1) → buffer Right(1), Pending
342        // This proves prev2 was polled even though prev1 was Pending.
343        assert!(zip.as_mut().pull(&mut ()).is_pending());
344
345        // Buffered Right(1) pairs with prev1 Ready(1) → Both(1, 1)
346        assert!(matches!(
347            zip.as_mut().pull(&mut ()),
348            PullStep::Ready(EitherOrBoth::Both(1, 1), _)
349        ));
350
351        // prev1 Pending, prev2 Ended → Pending
352        assert!(zip.as_mut().pull(&mut ()).is_pending());
353
354        // prev1 Ended, prev2 Ended → Ended
355        assert!(zip.as_mut().pull(&mut ()).is_ended());
356    }
357}