Skip to main content

dfir_pipes/push/
demux_var.rs

1//! [`DemuxVar`] push combinator for variadic demultiplexing.
2//!
3//! Receives `(usize, Item)` pairs and dispatches each item to one of several
4//! downstream [`Push`]es based on the index. The downstream pushes are stored
5//! as a variadic tuple (from the [`variadics`] crate) and accessed via the
6//! [`PushVariadic`] trait.
7//!
8//! # Context handling
9//!
10//! Each downstream push may have a different [`Push::Ctx`] type (especially
11//! when type-erased behind `impl Push`). The [`PushVariadic`] trait uses
12//! recursive [`Context::Merged`] to combine all downstream contexts into a
13//! single merged context type, then [`Context::unmerge_self`] /
14//! [`Context::unmerge_other`] to extract each push's individual context.
15
16use core::pin::Pin;
17
18use pin_project_lite::pin_project;
19use sealed::sealed;
20
21use crate::push::{Push, PushStep, ready_both};
22use crate::{Context, No, Toggle};
23
24/// A variadic of [`Push`]es for use with [`DemuxVar`].
25///
26/// This sealed trait is implemented recursively for variadic tuples `(P, Rest)`
27/// where `P: Push` and `Rest: PushVariadic`, with the base case `()`.
28///
29/// The [`Ctx`](PushVariadic::Ctx) GAT is built by recursively merging each
30/// push's context type: for `(P0, (P1, (P2, ())))`, the merged context is
31/// `<P0::Ctx as Context>::Merged<<P1::Ctx as Context>::Merged<P2::Ctx>>`.
32///
33/// The [`CanPend`](PushVariadic::CanPend) type is built by recursively OR-ing
34/// each push's `CanPend` via [`Toggle::Or`].
35#[sealed]
36pub trait PushVariadic<Item, Meta>: variadics::Variadic
37where
38    Meta: Copy,
39{
40    /// The merged context type for all pushes in this variadic.
41    ///
42    /// Built recursively via [`Context::Merged`] so that each downstream push's
43    /// context can be extracted with [`Context::unmerge_self`] and
44    /// [`Context::unmerge_other`].
45    type Ctx<'ctx>: Context<'ctx>;
46
47    /// Whether any downstream push can return [`PushStep::Pending`].
48    ///
49    /// Built recursively via [`Toggle::Or`]: if any downstream push has
50    /// `CanPend = Yes`, then the variadic also has `CanPend = Yes`.
51    type CanPend: Toggle;
52
53    /// Poll readiness of all downstream pushes.
54    ///
55    /// Calls [`Push::poll_ready`] on each push, passing the appropriate
56    /// unmerged context slice. All pushes are polled even if one returns
57    /// pending, so that all wakers are registered.
58    fn poll_ready(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend>;
59
60    /// Send an item to the push at index `idx`.
61    ///
62    /// # Panics
63    ///
64    /// Panics if `idx` is out of bounds (greater than or equal to the number of pushes).
65    fn start_send(self: Pin<&mut Self>, idx: usize, item: Item, meta: Meta);
66
67    /// Flush all downstream pushes.
68    ///
69    /// Calls [`Push::poll_flush`] on each push, passing the appropriate
70    /// unmerged context slice. All pushes are flushed even if one returns
71    /// pending, so that all wakers are registered.
72    fn poll_flush(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend>;
73
74    /// Inform all downstream pushes that approximately `hint` items are about to be sent.
75    fn size_hint(self: Pin<&mut Self>, hint: (usize, Option<usize>));
76}
77
78/// Recursive case: a push `P` followed by the rest of the variadic `Rest`.
79#[sealed]
80impl<P, Item, Meta: Copy, Rest> PushVariadic<Item, Meta> for (P, Rest)
81where
82    P: Push<Item, Meta>,
83    Rest: PushVariadic<Item, Meta>,
84{
85    type Ctx<'ctx> = <P::Ctx<'ctx> as Context<'ctx>>::Merged<Rest::Ctx<'ctx>>;
86    type CanPend = <P::CanPend as Toggle>::Or<Rest::CanPend>;
87
88    fn poll_ready(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
89        let (push, rest) = pin_project_pair(self);
90        ready_both!(
91            push.poll_ready(<P::Ctx<'_> as Context<'_>>::unmerge_self(ctx)),
92            rest.poll_ready(<P::Ctx<'_> as Context<'_>>::unmerge_other(ctx)),
93        );
94        PushStep::Done
95    }
96
97    fn start_send(self: Pin<&mut Self>, idx: usize, item: Item, meta: Meta) {
98        let (push, rest) = pin_project_pair(self);
99        if idx == 0 {
100            push.start_send(item, meta);
101        } else {
102            rest.start_send(idx - 1, item, meta);
103        }
104    }
105
106    fn poll_flush(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
107        let (push, rest) = pin_project_pair(self);
108        ready_both!(
109            push.poll_flush(<P::Ctx<'_> as Context<'_>>::unmerge_self(ctx)),
110            rest.poll_flush(<P::Ctx<'_> as Context<'_>>::unmerge_other(ctx)),
111        );
112        PushStep::Done
113    }
114
115    fn size_hint(self: Pin<&mut Self>, hint: (usize, Option<usize>)) {
116        let (push, rest) = pin_project_pair(self);
117        push.size_hint(hint);
118        rest.size_hint(hint);
119    }
120}
121
122/// Base case: the empty variadic. Always ready, panics on send.
123#[sealed]
124impl<Item, Meta> PushVariadic<Item, Meta> for ()
125where
126    Meta: Copy,
127{
128    type Ctx<'ctx> = ();
129    type CanPend = No;
130
131    fn poll_ready(self: Pin<&mut Self>, _ctx: &mut Self::Ctx<'_>) -> PushStep<No> {
132        PushStep::Done
133    }
134
135    fn start_send(self: Pin<&mut Self>, idx: usize, _item: Item, _meta: Meta) {
136        panic!("PushVariadic index out of bounds (len + {idx})");
137    }
138
139    fn poll_flush(self: Pin<&mut Self>, _ctx: &mut Self::Ctx<'_>) -> PushStep<No> {
140        PushStep::Done
141    }
142
143    fn size_hint(self: Pin<&mut Self>, _hint: (usize, Option<usize>)) {}
144}
145
/// Splits a pinned pair `(A, B)` into separately pinned references to its two
/// fields.
///
/// This is a safe function: callers have no extra obligations. The `unsafe`
/// operations inside are justified by the comments in the body.
///
/// # Returns
///
/// `(Pin<&mut A>, Pin<&mut B>)`, both borrowing from `pair` for its full
/// lifetime.
const fn pin_project_pair<A, B>(pair: Pin<&mut (A, B)>) -> (Pin<&mut A>, Pin<&mut B>) {
    // SAFETY: the `&mut (A, B)` is only used to borrow the two fields below;
    // nothing is moved out of, swapped in, or otherwise relocated through it.
    let (a, b) = unsafe { pair.get_unchecked_mut() };
    // SAFETY: both fields live inside the pinned tuple, so they stay put for as
    // long as the tuple does. A tuple has no `Drop` impl of its own that could
    // move a field, and it is `Unpin` only when both fields are, so treating
    // both fields as structurally pinned is sound.
    unsafe { (Pin::new_unchecked(a), Pin::new_unchecked(b)) }
}
158
159pin_project! {
160    /// Push combinator that dispatches `(usize, Item)` pairs to one of several
161    /// downstream pushes based on the index.
162    ///
163    /// The downstream pushes are stored as a variadic tuple implementing
164    /// [`PushVariadic`]. Each item `(idx, value)` is routed to the push at
165    /// position `idx` in the variadic.
166    ///
167    /// # Context
168    ///
169    /// The [`Push::Ctx`] for `DemuxVar` is the recursively merged context of all
170    /// downstream pushes (via [`PushVariadic::Ctx`]), allowing each downstream
171    /// push to have an independent context type.
172    ///
173    /// # Panics
174    ///
175    /// [`Push::start_send`] panics if the index is out of bounds.
176    #[must_use = "`Push`es do nothing unless items are pushed into them"]
177    pub struct DemuxVar<Pushes> {
178        #[pin]
179        pushes: Pushes,
180    }
181}
182
183impl<Pushes> DemuxVar<Pushes> {
184    /// Creates a new [`DemuxVar`] with the given downstream pushes.
185    pub(crate) const fn new<Item, Meta>(pushes: Pushes) -> Self
186    where
187        Meta: Copy,
188        Pushes: PushVariadic<Item, Meta>,
189    {
190        Self { pushes }
191    }
192}
193
194impl<Pushes, Item, Meta> Push<(usize, Item), Meta> for DemuxVar<Pushes>
195where
196    Pushes: PushVariadic<Item, Meta>,
197    Meta: Copy,
198{
199    type Ctx<'ctx> = Pushes::Ctx<'ctx>;
200    type CanPend = Pushes::CanPend;
201
202    fn poll_ready(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
203        self.project().pushes.poll_ready(ctx)
204    }
205
206    fn start_send(self: Pin<&mut Self>, (idx, item): (usize, Item), meta: Meta) {
207        self.project().pushes.start_send(idx, item, meta);
208    }
209
210    fn poll_flush(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
211        self.project().pushes.poll_flush(ctx)
212    }
213
214    fn size_hint(self: Pin<&mut Self>, hint: (usize, Option<usize>)) {
215        self.project().pushes.size_hint(hint);
216    }
217}
218
219/// Creates a [`DemuxVar`] push that dispatches each `(usize, Item)` pair to
220/// one of the downstream pushes in the given variadic, based on the index.
221pub const fn demux_var<Pushes, Item, Meta>(pushes: Pushes) -> DemuxVar<Pushes>
222where
223    Pushes: PushVariadic<Item, Meta>,
224    Meta: Copy,
225{
226    DemuxVar::new(pushes)
227}
228
#[cfg(test)]
mod tests {
    use core::pin::Pin;

    extern crate alloc;
    use alloc::vec;

    use super::*;
    use crate::push::test_utils::{TestPush, assert_can_pend_no};

    /// Items must arrive at the push selected by their index, in send order.
    #[test]
    fn test_demux_var_basic_dispatch() {
        let mut tp_a = TestPush::no_pend();
        let mut tp_b = TestPush::no_pend();
        let mut tp_c = TestPush::no_pend();

        // Scoped so the mutable borrows of the three pushes end before the
        // assertions read them.
        {
            let mut demux = demux_var(variadics::var_expr!(&mut tp_a, &mut tp_b, &mut tp_c));
            let mut demux = Pin::new(&mut demux);

            // One readiness poll before every send, then a single flush.
            for routed in [(0, 10), (1, 20), (2, 30), (0, 40), (2, 50)] {
                demux.as_mut().poll_ready(&mut ());
                demux.as_mut().start_send(routed, ());
            }
            demux.as_mut().poll_flush(&mut ());
        }

        assert_eq!(tp_a.items(), vec![10, 40]);
        assert_eq!(tp_b.items(), vec![20]);
        assert_eq!(tp_c.items(), vec![30, 50]);
    }

    /// With only non-pending outputs, the demux itself must have
    /// `CanPend = No`.
    #[test]
    fn test_demux_var_canpend_is_no_for_sync_pushes() {
        let mut first: TestPush<i32, _, _> = TestPush::no_pend();
        let mut second: TestPush<i32, _, _> = TestPush::no_pend();
        let demux = demux_var(variadics::var_expr!(&mut first, &mut second));
        assert_can_pend_no(&demux);
    }

    /// Drives a ready/send cycle against each of two outputs, then flushes.
    /// There are no explicit assertions; any checking is presumably done
    /// inside `TestPush` (defined elsewhere).
    #[test]
    fn test_demux_var_readies_all_before_send() {
        let mut tp_a = TestPush::no_pend();
        let mut tp_b = TestPush::no_pend();
        let mut demux = demux_var(variadics::var_expr!(&mut tp_a, &mut tp_b));
        let mut demux = Pin::new(&mut demux);

        for routed in [(0, 10), (1, 20)] {
            demux.as_mut().poll_ready(&mut ());
            demux.as_mut().start_send(routed, ());
        }
        demux.as_mut().poll_flush(&mut ());
    }
}