1use core::pin::Pin;
2
3use itertools::{Either, EitherOrBoth};
4use pin_project_lite::pin_project;
5
6use crate::pull::{FusedPull, Pull, PullStep, fuse_self};
7use crate::{Context, Toggle};
8
9pin_project! {
10 #[must_use = "`Pull`s do nothing unless polled"]
18 #[derive(Clone, Debug)]
19 pub struct ZipLongest<Prev1, Prev2>
20 where
21 Prev1: Pull,
22 Prev2: Pull,
23 {
24 #[pin]
25 prev1: Prev1,
26 #[pin]
27 prev2: Prev2,
28 buffer: Option<Either<(Prev1::Item, Prev1::Meta), (Prev2::Item, Prev2::Meta)>>,
31 }
32}
33
34impl<Prev1, Prev2> ZipLongest<Prev1, Prev2>
35where
36 Prev1: Pull,
37 Prev2: Pull,
38 Self: Pull,
39{
40 pub(crate) const fn new(prev1: Prev1, prev2: Prev2) -> Self {
42 Self {
43 prev1,
44 prev2,
45 buffer: None,
46 }
47 }
48}
49
50impl<Prev1, Prev2> Pull for ZipLongest<Prev1, Prev2>
51where
52 Prev1: FusedPull,
53 Prev2: FusedPull<Meta = Prev1::Meta>,
54{
55 type Ctx<'ctx> = <Prev1::Ctx<'ctx> as Context<'ctx>>::Merged<Prev2::Ctx<'ctx>>;
56
57 type Item = EitherOrBoth<Prev1::Item, Prev2::Item>;
58 type Meta = Prev1::Meta;
59 type CanPend = <Prev1::CanPend as Toggle>::Or<Prev2::CanPend>;
60 type CanEnd = <Prev1::CanEnd as Toggle>::And<Prev2::CanEnd>;
61
62 fn pull(
63 self: Pin<&mut Self>,
64 ctx: &mut Self::Ctx<'_>,
65 ) -> PullStep<Self::Item, Self::Meta, Self::CanPend, Self::CanEnd> {
66 let mut this = self.project();
67
68 let (pull_left, pull_right) = match this.buffer.take() {
69 Some(Either::Left((left_item, left_meta))) => {
70 (Some(PullStep::Ready(left_item, left_meta)), None)
71 }
72 Some(Either::Right((right_item, right_meta))) => {
73 (None, Some(PullStep::Ready(right_item, right_meta)))
74 }
75 None => (None, None),
76 };
77
78 let pull_left = pull_left.unwrap_or_else(|| {
79 this.prev1
80 .as_mut()
81 .pull(<Prev1::Ctx<'_> as Context<'_>>::unmerge_self(ctx))
82 });
83 let pull_right = pull_right.unwrap_or_else(|| {
84 this.prev2
85 .as_mut()
86 .pull(<Prev1::Ctx<'_> as Context<'_>>::unmerge_other(ctx))
87 });
88
89 match (pull_left, pull_right) {
90 (PullStep::Ready(left_item, left_meta), PullStep::Ready(right_item, _right_meta)) => {
91 PullStep::Ready(EitherOrBoth::Both(left_item, right_item), left_meta)
92 } (PullStep::Ready(left_item, left_meta), PullStep::Ended(_)) => {
94 PullStep::Ready(EitherOrBoth::Left(left_item), left_meta)
95 }
96 (PullStep::Ended(_), PullStep::Ready(right_item, right_meta)) => {
97 PullStep::Ready(EitherOrBoth::Right(right_item), right_meta)
98 }
99 (PullStep::Ready(left_item, left_meta), PullStep::Pending(_)) => {
100 *this.buffer = Some(Either::Left((left_item, left_meta)));
101 PullStep::pending()
102 }
103 (PullStep::Pending(_), PullStep::Ready(right_item, right_meta)) => {
104 *this.buffer = Some(Either::Right((right_item, right_meta)));
105 PullStep::pending()
106 }
107 (PullStep::Pending(_), PullStep::Pending(_)) => PullStep::pending(),
108 (PullStep::Pending(_), PullStep::Ended(_)) => PullStep::pending(),
109 (PullStep::Ended(_), PullStep::Pending(_)) => PullStep::pending(),
110 (PullStep::Ended(_), PullStep::Ended(_)) => PullStep::ended(),
111 }
112 }
113
114 fn size_hint(&self) -> (usize, Option<usize>) {
115 let (mut min1, mut max1) = self.prev1.size_hint();
116 let (mut min2, mut max2) = self.prev2.size_hint();
117
118 match self.buffer {
120 Some(Either::Left(_)) => {
121 min1 = min1.saturating_add(1);
122 max1 = max1.and_then(|m| m.checked_add(1));
123 }
124 Some(Either::Right(_)) => {
125 min2 = min2.saturating_add(1);
126 max2 = max2.and_then(|m| m.checked_add(1));
127 }
128 None => {}
129 }
130
131 let lower = min1.max(min2);
133 let upper = max1.zip(max2).map(|(a, b)| a.max(b));
135
136 (lower, upper)
137 }
138
139 fuse_self!();
140}
141
142impl<A, B> FusedPull for ZipLongest<A, B>
143where
144 A: FusedPull,
145 B: FusedPull<Meta = A::Meta>,
146{
147}
148
149#[cfg(test)]
150mod tests {
151 use core::pin::pin;
152
153 extern crate alloc;
154 use alloc::vec;
155 use alloc::vec::Vec;
156
157 use itertools::EitherOrBoth;
158
159 use super::*;
160 use crate::pull::test_utils::{TestPull, assert_is_fused};
161 use crate::pull::{Pull, PullStep};
162 use crate::{No, Yes};
163
164 #[test]
165 fn zip_longest_functional_same_length() {
166 let mut zip = pin!(ZipLongest::new(
167 TestPull::items_fused(0..2),
168 TestPull::items_fused(0..2)
169 ));
170 assert_is_fused(&*zip);
171 let mut results = Vec::new();
172
173 loop {
174 match zip.as_mut().pull(&mut ()) {
175 PullStep::Ready(item, _) => results.push(item),
176 PullStep::Ended(_) => break,
177 PullStep::Pending(_) => unreachable!(),
178 }
179 }
180
181 assert_eq!(
182 results,
183 vec![EitherOrBoth::Both(0, 0), EitherOrBoth::Both(1, 1)]
184 );
185 }
186
187 #[test]
188 fn zip_longest_functional_first_shorter() {
189 let mut zip = pin!(ZipLongest::new(
190 TestPull::items_fused(0..1),
191 TestPull::items_fused(0..3)
192 ));
193 let mut results = Vec::new();
194
195 loop {
196 match zip.as_mut().pull(&mut ()) {
197 PullStep::Ready(item, _) => results.push(item),
198 PullStep::Ended(_) => break,
199 PullStep::Pending(_) => unreachable!(),
200 }
201 }
202
203 assert_eq!(
204 results,
205 vec![
206 EitherOrBoth::Both(0, 0),
207 EitherOrBoth::Right(1),
208 EitherOrBoth::Right(2)
209 ]
210 );
211 }
212
213 #[test]
214 fn zip_longest_functional_second_shorter() {
215 let mut zip = pin!(ZipLongest::new(
216 TestPull::items_fused(0..3),
217 TestPull::items_fused(0..1)
218 ));
219 let mut results = Vec::new();
220
221 loop {
222 match zip.as_mut().pull(&mut ()) {
223 PullStep::Ready(item, _) => results.push(item),
224 PullStep::Ended(_) => break,
225 PullStep::Pending(_) => unreachable!(),
226 }
227 }
228
229 assert_eq!(
230 results,
231 vec![
232 EitherOrBoth::Both(0, 0),
233 EitherOrBoth::Left(1),
234 EitherOrBoth::Left(2)
235 ]
236 );
237 }
238
239 #[test]
240 fn zip_longest_fused_shields_upstream() {
241 use crate::pull::test_utils::assert_fused_runtime;
242
243 let p = pin!(ZipLongest::new(
244 TestPull::items(0..1).fuse(),
245 TestPull::items(0..2).fuse()
246 ));
247 assert_fused_runtime(p);
248 }
249
250 #[test]
251 fn zip_longest_size_hint_basic() {
252 let mut prev1 = TestPull::<_, _, Yes, Yes, true>::new([
253 PullStep::Ready(0, ()),
254 PullStep::pending(),
255 PullStep::Ready(1, ()),
256 PullStep::pending(),
257 PullStep::ended(),
258 ]);
259 let mut prev2 = TestPull::<_, _, Yes, Yes, true>::new([
260 PullStep::Ready(0, ()),
261 PullStep::pending(),
262 PullStep::ended(),
263 ]);
264 let mut zip = pin!(ZipLongest::new(&mut prev1, &mut prev2));
265
266 assert_eq!(zip.size_hint(), (2, Some(2)));
267
268 assert!(matches!(zip.as_mut().pull(&mut ()), PullStep::Ready(..)));
270 assert_eq!(zip.size_hint(), (1, Some(1)));
271
272 assert!(zip.as_mut().pull(&mut ()).is_pending());
274
275 assert!(matches!(zip.as_mut().pull(&mut ()), PullStep::Ready(..)));
277
278 assert!(zip.as_mut().pull(&mut ()).is_pending());
280
281 assert!(zip.as_mut().pull(&mut ()).is_ended());
283 }
284
285 #[test]
286 fn zip_longest_size_hint_with_buffered_item() {
287 let mut prev1 = TestPull::<_, _, No, Yes, true>::new([
288 PullStep::Ready(0, ()),
289 PullStep::Ready(1, ()),
290 PullStep::Ready(2, ()),
291 PullStep::ended(),
292 ]);
293 let mut prev2 = TestPull::<_, _, Yes, Yes, true>::new([
294 PullStep::Ready(0, ()),
295 PullStep::pending(),
296 PullStep::Ready(1, ()),
297 PullStep::pending(),
298 PullStep::Ready(2, ()),
299 PullStep::pending(),
300 PullStep::ended(),
301 ]);
302 let mut zip = pin!(ZipLongest::new(&mut prev1, &mut prev2));
303
304 assert!(matches!(
306 zip.as_mut().pull(&mut ()),
307 PullStep::Ready(EitherOrBoth::Both(0, 0), ())
308 ));
309
310 assert!(zip.as_mut().pull(&mut ()).is_pending());
312
313 assert_eq!(zip.size_hint(), (2, Some(2)));
316 }
317
318 #[test]
319 fn zip_longest_no_starvation() {
320 let mut prev1 = TestPull::<_, _, Yes, Yes, true>::new([
322 PullStep::Ready(0, ()),
323 PullStep::pending(),
324 PullStep::Ready(1, ()),
325 PullStep::pending(),
326 PullStep::ended(),
327 ]);
328 let mut prev2 = TestPull::<_, _, No, Yes, true>::new([
329 PullStep::Ready(0, ()),
330 PullStep::Ready(1, ()),
331 PullStep::ended(),
332 ]);
333 let mut zip = pin!(ZipLongest::new(&mut prev1, &mut prev2));
334
335 assert!(matches!(
337 zip.as_mut().pull(&mut ()),
338 PullStep::Ready(EitherOrBoth::Both(0, 0), _)
339 ));
340
341 assert!(zip.as_mut().pull(&mut ()).is_pending());
344
345 assert!(matches!(
347 zip.as_mut().pull(&mut ()),
348 PullStep::Ready(EitherOrBoth::Both(1, 1), _)
349 ));
350
351 assert!(zip.as_mut().pull(&mut ()).is_pending());
353
354 assert!(zip.as_mut().pull(&mut ()).is_ended());
356 }
357}