1use core::pin::Pin;
17
18use pin_project_lite::pin_project;
19use sealed::sealed;
20
21use crate::push::{Push, PushStep, ready_both};
22use crate::{Context, No, Toggle};
23
24#[sealed]
36pub trait PushVariadic<Item, Meta>: variadics::Variadic
37where
38 Meta: Copy,
39{
40 type Ctx<'ctx>: Context<'ctx>;
46
47 type CanPend: Toggle;
52
53 fn poll_ready(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend>;
59
60 fn start_send(self: Pin<&mut Self>, idx: usize, item: Item, meta: Meta);
66
67 fn poll_flush(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend>;
73
74 fn size_hint(self: Pin<&mut Self>, hint: (usize, Option<usize>));
76}
77
78#[sealed]
80impl<P, Item, Meta: Copy, Rest> PushVariadic<Item, Meta> for (P, Rest)
81where
82 P: Push<Item, Meta>,
83 Rest: PushVariadic<Item, Meta>,
84{
85 type Ctx<'ctx> = <P::Ctx<'ctx> as Context<'ctx>>::Merged<Rest::Ctx<'ctx>>;
86 type CanPend = <P::CanPend as Toggle>::Or<Rest::CanPend>;
87
88 fn poll_ready(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
89 let (push, rest) = pin_project_pair(self);
90 ready_both!(
91 push.poll_ready(<P::Ctx<'_> as Context<'_>>::unmerge_self(ctx)),
92 rest.poll_ready(<P::Ctx<'_> as Context<'_>>::unmerge_other(ctx)),
93 );
94 PushStep::Done
95 }
96
97 fn start_send(self: Pin<&mut Self>, idx: usize, item: Item, meta: Meta) {
98 let (push, rest) = pin_project_pair(self);
99 if idx == 0 {
100 push.start_send(item, meta);
101 } else {
102 rest.start_send(idx - 1, item, meta);
103 }
104 }
105
106 fn poll_flush(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
107 let (push, rest) = pin_project_pair(self);
108 ready_both!(
109 push.poll_flush(<P::Ctx<'_> as Context<'_>>::unmerge_self(ctx)),
110 rest.poll_flush(<P::Ctx<'_> as Context<'_>>::unmerge_other(ctx)),
111 );
112 PushStep::Done
113 }
114
115 fn size_hint(self: Pin<&mut Self>, hint: (usize, Option<usize>)) {
116 let (push, rest) = pin_project_pair(self);
117 push.size_hint(hint);
118 rest.size_hint(hint);
119 }
120}
121
122#[sealed]
124impl<Item, Meta> PushVariadic<Item, Meta> for ()
125where
126 Meta: Copy,
127{
128 type Ctx<'ctx> = ();
129 type CanPend = No;
130
131 fn poll_ready(self: Pin<&mut Self>, _ctx: &mut Self::Ctx<'_>) -> PushStep<No> {
132 PushStep::Done
133 }
134
135 fn start_send(self: Pin<&mut Self>, idx: usize, _item: Item, _meta: Meta) {
136 panic!("PushVariadic index out of bounds (len + {idx})");
137 }
138
139 fn poll_flush(self: Pin<&mut Self>, _ctx: &mut Self::Ctx<'_>) -> PushStep<No> {
140 PushStep::Done
141 }
142
143 fn size_hint(self: Pin<&mut Self>, _hint: (usize, Option<usize>)) {}
144}
145
146const fn pin_project_pair<A, B>(pair: Pin<&mut (A, B)>) -> (Pin<&mut A>, Pin<&mut B>) {
152 unsafe {
154 let (a, b) = pair.get_unchecked_mut();
155 (Pin::new_unchecked(a), Pin::new_unchecked(b))
156 }
157}
158
159pin_project! {
160 #[must_use = "`Push`es do nothing unless items are pushed into them"]
177 pub struct DemuxVar<Pushes> {
178 #[pin]
179 pushes: Pushes,
180 }
181}
182
183impl<Pushes> DemuxVar<Pushes> {
184 pub(crate) const fn new<Item, Meta>(pushes: Pushes) -> Self
186 where
187 Meta: Copy,
188 Pushes: PushVariadic<Item, Meta>,
189 {
190 Self { pushes }
191 }
192}
193
194impl<Pushes, Item, Meta> Push<(usize, Item), Meta> for DemuxVar<Pushes>
195where
196 Pushes: PushVariadic<Item, Meta>,
197 Meta: Copy,
198{
199 type Ctx<'ctx> = Pushes::Ctx<'ctx>;
200 type CanPend = Pushes::CanPend;
201
202 fn poll_ready(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
203 self.project().pushes.poll_ready(ctx)
204 }
205
206 fn start_send(self: Pin<&mut Self>, (idx, item): (usize, Item), meta: Meta) {
207 self.project().pushes.start_send(idx, item, meta);
208 }
209
210 fn poll_flush(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
211 self.project().pushes.poll_flush(ctx)
212 }
213
214 fn size_hint(self: Pin<&mut Self>, hint: (usize, Option<usize>)) {
215 self.project().pushes.size_hint(hint);
216 }
217}
218
219pub const fn demux_var<Pushes, Item, Meta>(pushes: Pushes) -> DemuxVar<Pushes>
222where
223 Pushes: PushVariadic<Item, Meta>,
224 Meta: Copy,
225{
226 DemuxVar::new(pushes)
227}
228
229#[cfg(test)]
230mod tests {
231 use core::pin::Pin;
232
233 extern crate alloc;
234 use alloc::vec;
235
236 use super::*;
237 use crate::push::test_utils::{TestPush, assert_can_pend_no};
238
239 #[test]
240 fn test_demux_var_basic_dispatch() {
241 let mut tp_a = TestPush::no_pend();
242 let mut tp_b = TestPush::no_pend();
243 let mut tp_c = TestPush::no_pend();
244
245 {
246 let pushes = variadics::var_expr!(&mut tp_a, &mut tp_b, &mut tp_c);
247 let mut demux = demux_var(pushes);
248 let mut demux = Pin::new(&mut demux);
249
250 demux.as_mut().poll_ready(&mut ());
251 demux.as_mut().start_send((0, 10), ());
252 demux.as_mut().poll_ready(&mut ());
253 demux.as_mut().start_send((1, 20), ());
254 demux.as_mut().poll_ready(&mut ());
255 demux.as_mut().start_send((2, 30), ());
256 demux.as_mut().poll_ready(&mut ());
257 demux.as_mut().start_send((0, 40), ());
258 demux.as_mut().poll_ready(&mut ());
259 demux.as_mut().start_send((2, 50), ());
260 demux.as_mut().poll_flush(&mut ());
261 }
262
263 assert_eq!(tp_a.items(), vec![10, 40]);
264 assert_eq!(tp_b.items(), vec![20]);
265 assert_eq!(tp_c.items(), vec![30, 50]);
266 }
267
268 #[test]
269 fn test_demux_var_canpend_is_no_for_sync_pushes() {
270 let mut tp_a: TestPush<i32, _, _> = TestPush::no_pend();
271 let mut tp_b: TestPush<i32, _, _> = TestPush::no_pend();
272 let pushes = variadics::var_expr!(&mut tp_a, &mut tp_b);
273 let demux = demux_var(pushes);
274 assert_can_pend_no(&demux);
275 }
276
277 #[test]
278 fn test_demux_var_readies_all_before_send() {
279 let mut tp_a = TestPush::no_pend();
280 let mut tp_b = TestPush::no_pend();
281 let pushes = variadics::var_expr!(&mut tp_a, &mut tp_b);
282 let mut demux = demux_var(pushes);
283 let mut demux = Pin::new(&mut demux);
284 demux.as_mut().poll_ready(&mut ());
285 demux.as_mut().start_send((0, 10), ());
286 demux.as_mut().poll_ready(&mut ());
287 demux.as_mut().start_send((1, 20), ());
288 demux.as_mut().poll_flush(&mut ());
289 }
290}