Skip to main content

dfir_pipes/pull/
chain.rs

1use core::pin::Pin;
2
3use pin_project_lite::pin_project;
4
5use crate::pull::{FusedPull, Pull, PullStep};
6use crate::{Context, Toggle, Yes};
7
8pin_project! {
9    /// Pull combinator that chains two pulls in sequence.
10    #[must_use = "`Pull`s do nothing unless polled"]
11    #[derive(Clone, Debug, Default)]
12    pub struct Chain<A, B> {
13        #[pin]
14        first: A,
15        #[pin]
16        second: B,
17    }
18}
19
20impl<A, B> Chain<A, B>
21where
22    Self: Pull,
23{
24    pub(crate) const fn new(first: A, second: B) -> Self {
25        Self { first, second }
26    }
27}
28
29impl<A, B> Pull for Chain<A, B>
30where
31    A: FusedPull<CanEnd = Yes>,
32    B: Pull<Item = A::Item, Meta = A::Meta>,
33{
34    type Ctx<'ctx> = <A::Ctx<'ctx> as Context<'ctx>>::Merged<B::Ctx<'ctx>>;
35
36    type Item = A::Item;
37    type Meta = A::Meta;
38    type CanPend = <A::CanPend as Toggle>::Or<B::CanPend>;
39    type CanEnd = B::CanEnd;
40
41    fn pull(
42        self: Pin<&mut Self>,
43        ctx: &mut Self::Ctx<'_>,
44    ) -> PullStep<Self::Item, Self::Meta, Self::CanPend, Self::CanEnd> {
45        let mut this = self.project();
46
47        match this.first.as_mut().pull(Context::unmerge_self(ctx)) {
48            PullStep::Ready(item, meta) => {
49                return PullStep::Ready(item, meta);
50            }
51            PullStep::Pending(_) => {
52                return PullStep::pending();
53            }
54            PullStep::Ended(_) => {
55                // First is fused, so it will keep returning Ended.
56                // Fall through to pull from second.
57            }
58        }
59
60        this.second
61            .as_mut()
62            .pull(<A::Ctx<'_> as Context<'_>>::unmerge_other(ctx))
63            .convert_into()
64    }
65
66    fn size_hint(&self) -> (usize, Option<usize>) {
67        let (a_lower, a_upper) = self.first.size_hint();
68        let (b_lower, b_upper) = self.second.size_hint();
69
70        let lower = a_lower.saturating_add(b_lower);
71        let upper = a_upper.zip(b_upper).and_then(|(a, b)| a.checked_add(b));
72
73        (lower, upper)
74    }
75}
76
77impl<A, B> FusedPull for Chain<A, B>
78where
79    A: FusedPull<CanEnd = Yes>,
80    B: FusedPull<Item = A::Item, Meta = A::Meta>,
81{
82}
83
84#[cfg(test)]
85mod tests {
86    use super::*;
87    use crate::pull::test_utils::{TestPull, assert_is_fused, assert_types};
88    use crate::pull::{No, Repeat, Yes, pending};
89
90    /// Fused pull with CanPend=No, CanEnd=Yes.
91    type SyncTestPull = TestPull<i32, (), No, Yes, true>;
92    /// Fused pull with CanPend=Yes, CanEnd=Yes.
93    type AsyncTestPull = TestPull<i32, (), Yes, Yes, true>;
94
95    #[test]
96    fn chain_sync_with_various_second() {
97        // Sync + Sync: CanPend=No, CanEnd=Yes
98        let chain = Chain::new(SyncTestPull::new([]), SyncTestPull::new([]));
99        assert_types::<No, Yes>(&chain);
100        assert_is_fused(&chain);
101
102        // Sync + Infinite: CanPend=No, CanEnd=No
103        let chain = Chain::new(SyncTestPull::new([]), Repeat::new(42));
104        assert_types::<No, No>(&chain);
105
106        // Sync + Pending: CanPend=Yes (No.or(Yes)), CanEnd=No
107        let chain = Chain::new(SyncTestPull::new([]), pending());
108        assert_types::<Yes, No>(&chain);
109    }
110
111    #[test]
112    fn chain_async_with_various_second() {
113        // Async + Async: CanPend=Yes, CanEnd=Yes
114        let chain = Chain::new(AsyncTestPull::new([]), AsyncTestPull::new([]));
115        assert_types::<Yes, Yes>(&chain);
116        assert_is_fused(&chain);
117
118        // Async + Infinite: CanPend=Yes, CanEnd=No
119        let chain = Chain::new(AsyncTestPull::new([]), Repeat::new(42));
120        assert_types::<Yes, No>(&chain);
121    }
122
123    #[test]
124    fn chain_nested_types() {
125        // Chain<Chain<Sync, Async>, Infinite>: CanPend=Yes, CanEnd=No
126        let chain_ab = Chain::new(SyncTestPull::new([]), AsyncTestPull::new([]));
127        assert_types::<Yes, Yes>(&chain_ab);
128
129        let chain_abc = Chain::new(chain_ab, Repeat::new(3));
130        assert_types::<Yes, No>(&chain_abc);
131    }
132
133    #[test]
134    fn chain_fused_shields_upstream() {
135        use core::pin::pin;
136
137        use crate::pull::once;
138        use crate::pull::test_utils::assert_fused_runtime;
139
140        // TODO(mingwei): use upstream `Pull`s that pend sometimes.
141        let p = pin!(once(5).chain(once(6)));
142        assert_fused_runtime(p);
143    }
144}