1use core::pin::Pin;
2
3use itertools::Either;
4use pin_project_lite::pin_project;
5
6use crate::pull::{Pull, PullStep};
7use crate::{Context, Toggle};
8
9pin_project! {
10 #[must_use = "`Pull`s do nothing unless polled"]
14 #[derive(Clone, Debug)]
15 pub struct Zip<Prev1, Prev2>
16 where
17 Prev1: Pull,
18 Prev2: Pull,
19 {
20 #[pin]
21 prev1: Prev1,
22 #[pin]
23 prev2: Prev2,
24 buffer: Option<Either<(Prev1::Item, Prev1::Meta), (Prev2::Item, Prev2::Meta)>>,
26 }
27}
28
29impl<Prev1, Prev2> Zip<Prev1, Prev2>
30where
31 Prev1: Pull,
32 Prev2: Pull,
33 Self: Pull,
34{
35 pub(crate) const fn new(prev1: Prev1, prev2: Prev2) -> Self {
37 Self {
38 prev1,
39 prev2,
40 buffer: None,
41 }
42 }
43}
44
45impl<Prev1, Prev2> Pull for Zip<Prev1, Prev2>
46where
47 Prev1: Pull,
48 Prev2: Pull<Meta = Prev1::Meta>,
49{
50 type Ctx<'ctx> = <Prev1::Ctx<'ctx> as Context<'ctx>>::Merged<Prev2::Ctx<'ctx>>;
51
52 type Item = (Prev1::Item, Prev2::Item);
53 type Meta = Prev1::Meta;
54 type CanPend = <Prev1::CanPend as Toggle>::Or<Prev2::CanPend>;
55 type CanEnd = <Prev1::CanEnd as Toggle>::Or<Prev2::CanEnd>;
56
57 fn pull(
58 self: Pin<&mut Self>,
59 ctx: &mut Self::Ctx<'_>,
60 ) -> PullStep<Self::Item, Self::Meta, Self::CanPend, Self::CanEnd> {
61 let mut this = self.project();
62
63 let (pull_left, pull_right) = match this.buffer.take() {
64 Some(Either::Left((left_item, left_meta))) => {
65 (Some(PullStep::Ready(left_item, left_meta)), None)
66 }
67 Some(Either::Right((right_item, right_meta))) => {
68 (None, Some(PullStep::Ready(right_item, right_meta)))
69 }
70 None => (None, None),
71 };
72
73 let pull_left = pull_left.unwrap_or_else(|| {
74 this.prev1
75 .as_mut()
76 .pull(<Prev1::Ctx<'_> as Context<'_>>::unmerge_self(ctx))
77 });
78 let pull_right = pull_right.unwrap_or_else(|| {
79 this.prev2
80 .as_mut()
81 .pull(<Prev1::Ctx<'_> as Context<'_>>::unmerge_other(ctx))
82 });
83
84 match (pull_left, pull_right) {
85 (PullStep::Ready(left_item, left_meta), PullStep::Ready(right_item, _right_meta)) => {
86 PullStep::Ready((left_item, right_item), left_meta)
87 } (PullStep::Ready(left_item, left_meta), PullStep::Pending(_)) => {
89 *this.buffer = Some(Either::Left((left_item, left_meta)));
90 PullStep::pending()
91 }
92 (PullStep::Pending(_), PullStep::Ready(right_item, right_meta)) => {
93 *this.buffer = Some(Either::Right((right_item, right_meta)));
94 PullStep::pending()
95 }
96 (PullStep::Pending(_), PullStep::Pending(_)) => PullStep::pending(),
97 (PullStep::Ready(..), PullStep::Ended(_))
99 | (PullStep::Ended(_), PullStep::Ready(..))
100 | (PullStep::Pending(_), PullStep::Ended(_))
101 | (PullStep::Ended(_), PullStep::Pending(_))
102 | (PullStep::Ended(_), PullStep::Ended(_)) => PullStep::ended(),
103 }
104 }
105
106 fn size_hint(&self) -> (usize, Option<usize>) {
107 let (mut min1, mut max1) = self.prev1.size_hint();
108 let (mut min2, mut max2) = self.prev2.size_hint();
109
110 match self.buffer {
113 Some(Either::Left(_)) => {
114 min1 = min1.saturating_add(1);
115 max1 = max1.and_then(|m| m.checked_add(1));
116 }
117 Some(Either::Right(_)) => {
118 min2 = min2.saturating_add(1);
119 max2 = max2.and_then(|m| m.checked_add(1));
120 }
121 None => {}
122 }
123
124 let lower = min1.min(min2);
126 let upper = match (max1, max2) {
128 (Some(a), Some(b)) => Some(a.min(b)),
129 (Some(a), None) => Some(a),
130 (None, Some(b)) => Some(b),
131 (None, None) => None,
132 };
133
134 (lower, upper)
135 }
136}
137
138#[cfg(test)]
139mod tests {
140 use core::pin::pin;
141
142 extern crate alloc;
143 use alloc::vec;
144 use alloc::vec::Vec;
145
146 use super::*;
147 use crate::pull::test_utils::TestPull;
148 use crate::pull::{Pull, PullStep};
149 use crate::{No, Yes};
150
151 #[test]
152 fn zip_functional_same_length() {
153 let mut zip = pin!(Zip::new(TestPull::items(0..2), TestPull::items(0..2)));
154 let mut results = Vec::new();
155
156 loop {
157 match zip.as_mut().pull(&mut ()) {
158 PullStep::Ready(item, _) => results.push(item),
159 PullStep::Ended(_) => break,
160 PullStep::Pending(_) => unreachable!(),
161 }
162 }
163
164 assert_eq!(results, vec![(0, 0), (1, 1)]);
165 }
166
167 #[test]
168 fn zip_functional_first_shorter() {
169 let mut zip = pin!(Zip::new(TestPull::items(0..1), TestPull::items(0..3)));
170 let mut results = Vec::new();
171
172 loop {
173 match zip.as_mut().pull(&mut ()) {
174 PullStep::Ready(item, _) => results.push(item),
175 PullStep::Ended(_) => break,
176 PullStep::Pending(_) => unreachable!(),
177 }
178 }
179
180 assert_eq!(results, vec![(0, 0)]);
181 }
182
183 #[test]
184 fn zip_functional_second_shorter() {
185 let mut zip = pin!(Zip::new(TestPull::items(0..3), TestPull::items(0..1)));
186 let mut results = Vec::new();
187
188 loop {
189 match zip.as_mut().pull(&mut ()) {
190 PullStep::Ready(item, _) => results.push(item),
191 PullStep::Ended(_) => break,
192 PullStep::Pending(_) => unreachable!(),
193 }
194 }
195
196 assert_eq!(results, vec![(0, 0)]);
197 }
198
199 #[test]
200 fn zip_size_hint_includes_buffer() {
201 let mut prev1 = TestPull::<_, _, No, Yes, true>::new([
205 PullStep::Ready(0, ()),
206 PullStep::Ready(1, ()),
207 PullStep::ended(),
208 ]);
209 let mut prev2 = TestPull::<_, _, Yes, Yes, true>::new([
210 PullStep::Ready(0, ()),
211 PullStep::pending(),
212 PullStep::Ready(1, ()),
213 PullStep::pending(),
214 PullStep::ended(),
215 ]);
216 let mut zip = pin!(Zip::new(&mut prev1, &mut prev2));
217
218 assert_eq!(zip.size_hint(), (2, Some(2)));
220
221 assert_eq!(zip.as_mut().pull(&mut ()), PullStep::Ready((0, 0), ()));
223
224 assert!(zip.as_mut().pull(&mut ()).is_pending());
226
227 assert_eq!(zip.size_hint(), (1, Some(1)));
231 }
232
233 #[test]
234 fn zip_no_starvation() {
235 let mut prev1 = TestPull::<_, _, Yes, Yes, true>::new([
237 PullStep::Ready(0, ()),
238 PullStep::pending(),
239 PullStep::Ready(1, ()),
240 PullStep::pending(),
241 PullStep::ended(),
242 ]);
243 let mut prev2 = TestPull::<_, _, No, Yes, true>::new([
244 PullStep::Ready(0, ()),
245 PullStep::Ready(1, ()),
246 PullStep::ended(),
247 ]);
248 let mut zip = pin!(Zip::new(&mut prev1, &mut prev2));
249
250 assert!(matches!(
252 zip.as_mut().pull(&mut ()),
253 PullStep::Ready((0, 0), _)
254 ));
255
256 assert!(zip.as_mut().pull(&mut ()).is_pending());
258
259 assert!(matches!(
262 zip.as_mut().pull(&mut ()),
263 PullStep::Ready((1, 1), _)
264 ));
265 }
266}