Skip to main content

dfir_pipes/push/
unzip.rs

1//! [`Unzip`] push combinator.
2use core::pin::Pin;
3
4use pin_project_lite::pin_project;
5
6use super::ready_both;
7use crate::push::{Push, PushStep};
8use crate::{Context, Toggle};
9
10pin_project! {
11    /// Push combinator that splits `(A, B)` items into two separate downstream pushes.
12    #[must_use = "`Push`es do nothing unless items are pushed into them"]
13    #[derive(Clone, Debug)]
14    pub struct Unzip<Push0, Push1> {
15        #[pin]
16        push_0: Push0,
17        #[pin]
18        push_1: Push1,
19    }
20}
21
22impl<Push0, Push1> Unzip<Push0, Push1> {
23    /// Creates with downstream pushes `push_0` and `push_1`.
24    pub(crate) const fn new(push_0: Push0, push_1: Push1) -> Self {
25        Self { push_0, push_1 }
26    }
27}
28
29impl<P0, P1, Item0, Item1, Meta> Push<(Item0, Item1), Meta> for Unzip<P0, P1>
30where
31    P0: Push<Item0, Meta>,
32    P1: Push<Item1, Meta>,
33    Meta: Copy,
34{
35    type Ctx<'ctx> = <P0::Ctx<'ctx> as Context<'ctx>>::Merged<P1::Ctx<'ctx>>;
36
37    type CanPend = <P0::CanPend as Toggle>::Or<P1::CanPend>;
38
39    fn poll_ready(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
40        let this = self.project();
41        ready_both!(
42            this.push_0
43                .poll_ready(<P0::Ctx<'_> as Context<'_>>::unmerge_self(ctx)),
44            this.push_1
45                .poll_ready(<P0::Ctx<'_> as Context<'_>>::unmerge_other(ctx)),
46        );
47        PushStep::Done
48    }
49
50    fn start_send(self: Pin<&mut Self>, (item0, item1): (Item0, Item1), meta: Meta) {
51        let this = self.project();
52        this.push_0.start_send(item0, meta);
53        this.push_1.start_send(item1, meta);
54    }
55
56    fn poll_flush(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
57        let this = self.project();
58        ready_both!(
59            this.push_0
60                .poll_flush(<P0::Ctx<'_> as Context<'_>>::unmerge_self(ctx)),
61            this.push_1
62                .poll_flush(<P0::Ctx<'_> as Context<'_>>::unmerge_other(ctx)),
63        );
64        PushStep::Done
65    }
66
67    fn size_hint(self: Pin<&mut Self>, hint: (usize, Option<usize>)) {
68        let this = self.project();
69        this.push_0.size_hint(hint);
70        this.push_1.size_hint(hint);
71    }
72}
73
#[cfg(test)]
mod tests {
    use core::pin::Pin;

    use super::Unzip;
    use crate::push::Push;
    use crate::push::test_utils::TestPush;

    /// Drives an `Unzip` over two `TestPush::no_pend()` pushes through two
    /// ready-then-send rounds followed by a flush.
    ///
    /// NOTE(review): there are no explicit assertions here; presumably `TestPush`
    /// checks the call protocol itself — confirm in `test_utils`.
    #[test]
    fn unzip_readies_both_before_send() {
        let mut left = TestPush::no_pend();
        let mut right = TestPush::no_pend();
        let mut unzip = Unzip::new(&mut left, &mut right);
        let mut pinned = Pin::new(&mut unzip);

        for pair in [(1, 2), (3, 4)] {
            pinned.as_mut().poll_ready(&mut ());
            pinned.as_mut().start_send(pair, ());
        }
        pinned.as_mut().poll_flush(&mut ());
    }
}