dfir_pipes/push/
fanout.rs1use core::pin::Pin;
3
4use pin_project_lite::pin_project;
5
6use super::ready_both;
7use crate::push::{Push, PushStep};
8use crate::{Context, Toggle};
9
10pin_project! {
11 #[must_use = "`Push`es do nothing unless items are pushed into them"]
15 #[derive(Clone, Debug)]
16 pub struct Fanout<Push0, Push1> {
17 #[pin]
18 push_0: Push0,
19 #[pin]
20 push_1: Push1,
21 }
22}
23
24impl<Push0, Push1> Fanout<Push0, Push1> {
25 pub(crate) const fn new(push_0: Push0, push_1: Push1) -> Self {
27 Self { push_0, push_1 }
28 }
29}
30
31impl<P0, P1, Item, Meta> Push<Item, Meta> for Fanout<P0, P1>
32where
33 P0: Push<Item, Meta>,
34 P1: Push<Item, Meta>,
35 Item: Clone,
36 Meta: Copy,
37{
38 type Ctx<'ctx> = <P0::Ctx<'ctx> as Context<'ctx>>::Merged<P1::Ctx<'ctx>>;
39
40 type CanPend = <P0::CanPend as Toggle>::Or<P1::CanPend>;
41
42 fn poll_ready(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
43 let this = self.project();
44 ready_both!(
45 this.push_0
46 .poll_ready(<P0::Ctx<'_> as Context<'_>>::unmerge_self(ctx)),
47 this.push_1
48 .poll_ready(<P0::Ctx<'_> as Context<'_>>::unmerge_other(ctx)),
49 );
50 PushStep::Done
51 }
52
53 fn start_send(self: Pin<&mut Self>, item: Item, meta: Meta) {
54 let this = self.project();
55 let item_clone = item.clone();
56 this.push_0.start_send(item, meta);
57 this.push_1.start_send(item_clone, meta);
58 }
59
60 fn poll_flush(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
61 let this = self.project();
62 ready_both!(
63 this.push_0
64 .poll_flush(<P0::Ctx<'_> as Context<'_>>::unmerge_self(ctx)),
65 this.push_1
66 .poll_flush(<P0::Ctx<'_> as Context<'_>>::unmerge_other(ctx)),
67 );
68 PushStep::Done
69 }
70
71 fn size_hint(self: Pin<&mut Self>, hint: (usize, Option<usize>)) {
72 let this = self.project();
73 this.push_0.size_hint(hint);
74 this.push_1.size_hint(hint);
75 }
76}
77
#[cfg(test)]
mod tests {
    use core::pin::Pin;

    use super::Fanout;
    use crate::push::Push;
    use crate::push::test_utils::TestPush;

    /// Drives a `Fanout` over two `TestPush`es through the sequence
    /// ready, send, ready, send, flush.
    ///
    /// The test has no assertions of its own.
    // NOTE(review): presumably `TestPush` panics on protocol violations — confirm
    // in `test_utils`.
    #[test]
    fn fanout_readies_both_before_send() {
        let mut left = TestPush::no_pend();
        let mut right = TestPush::no_pend();
        let mut fanout = Fanout::new(&mut left, &mut right);
        let mut pinned = Pin::new(&mut fanout);

        // Two rounds of "poll ready, then send", with items 1 and 2.
        for item in 1..=2 {
            pinned.as_mut().poll_ready(&mut ());
            pinned.as_mut().start_send(item, ());
        }
        pinned.as_mut().poll_flush(&mut ());
    }
}