Skip to main content

dfir_pipes/push/
fanout.rs

1//! [`Fanout`] push combinator.
2use core::pin::Pin;
3
4use pin_project_lite::pin_project;
5
6use super::ready_both;
7use crate::push::{Push, PushStep};
8use crate::{Context, Toggle};
9
10pin_project! {
11    /// Push combinator that clones each item and pushes to both downstream pushes.
12    ///
13    /// This is the push equivalent of `futures::sink::SinkExt::fanout`.
14    #[must_use = "`Push`es do nothing unless items are pushed into them"]
15    #[derive(Clone, Debug)]
16    pub struct Fanout<Push0, Push1> {
17        #[pin]
18        push_0: Push0,
19        #[pin]
20        push_1: Push1,
21    }
22}
23
24impl<Push0, Push1> Fanout<Push0, Push1> {
25    /// Creates with downstream pushes `push_0` and `push_1`.
26    pub(crate) const fn new(push_0: Push0, push_1: Push1) -> Self {
27        Self { push_0, push_1 }
28    }
29}
30
31impl<P0, P1, Item, Meta> Push<Item, Meta> for Fanout<P0, P1>
32where
33    P0: Push<Item, Meta>,
34    P1: Push<Item, Meta>,
35    Item: Clone,
36    Meta: Copy,
37{
38    type Ctx<'ctx> = <P0::Ctx<'ctx> as Context<'ctx>>::Merged<P1::Ctx<'ctx>>;
39
40    type CanPend = <P0::CanPend as Toggle>::Or<P1::CanPend>;
41
42    fn poll_ready(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
43        let this = self.project();
44        ready_both!(
45            this.push_0
46                .poll_ready(<P0::Ctx<'_> as Context<'_>>::unmerge_self(ctx)),
47            this.push_1
48                .poll_ready(<P0::Ctx<'_> as Context<'_>>::unmerge_other(ctx)),
49        );
50        PushStep::Done
51    }
52
53    fn start_send(self: Pin<&mut Self>, item: Item, meta: Meta) {
54        let this = self.project();
55        let item_clone = item.clone();
56        this.push_0.start_send(item, meta);
57        this.push_1.start_send(item_clone, meta);
58    }
59
60    fn poll_flush(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
61        let this = self.project();
62        ready_both!(
63            this.push_0
64                .poll_flush(<P0::Ctx<'_> as Context<'_>>::unmerge_self(ctx)),
65            this.push_1
66                .poll_flush(<P0::Ctx<'_> as Context<'_>>::unmerge_other(ctx)),
67        );
68        PushStep::Done
69    }
70
71    fn size_hint(self: Pin<&mut Self>, hint: (usize, Option<usize>)) {
72        let this = self.project();
73        this.push_0.size_hint(hint);
74        this.push_1.size_hint(hint);
75    }
76}
77
#[cfg(test)]
mod tests {
    use core::pin::Pin;

    use super::Fanout;
    use crate::push::Push;
    use crate::push::test_utils::TestPush;

    /// Drives a `Fanout` over two `TestPush`es through ready/send twice, then a flush.
    // NOTE(review): there are no explicit asserts here; presumably `TestPush` checks the
    // ready-before-send protocol internally -- confirm against `test_utils`.
    #[test]
    fn fanout_readies_both_before_send() {
        let mut sink_a = TestPush::no_pend();
        let mut sink_b = TestPush::no_pend();
        let mut fanout = Fanout::new(&mut sink_a, &mut sink_b);
        let mut pinned = Pin::new(&mut fanout);

        // Each item is preceded by its own readiness poll.
        for item in [1, 2] {
            pinned.as_mut().poll_ready(&mut ());
            pinned.as_mut().start_send(item, ());
        }
        pinned.as_mut().poll_flush(&mut ());
    }
}
98}