Skip to main content

dfir_pipes/pull/
cross_singleton.rs

1use core::borrow::BorrowMut;
2use core::pin::Pin;
3
4use pin_project_lite::pin_project;
5
6use crate::pull::{FusedPull, Pull, PullStep};
7use crate::{Context, Toggle};
8
9pin_project! {
10    /// Pull combinator that crosses each item from `item_pull` with a singleton value from `singleton_pull`.
11    ///
12    /// The singleton value is obtained from the first item of `singleton_pull` and cached.
13    /// All subsequent items from `item_pull` are paired with this cached singleton value.
14    ///
15    /// If `singleton_pull` ends before yielding any items, the entire combinator ends immediately.
16    #[must_use = "`Pull`s do nothing unless polled"]
17    #[derive(Clone, Debug, Default)]
18    pub struct CrossSingleton<ItemPull, SinglePull, SingleState> {
19        #[pin]
20        item_pull: ItemPull,
21        #[pin]
22        singleton_pull: SinglePull,
23
24        singleton_state: SingleState,
25    }
26}
27
28impl<ItemPull, SinglePull, SingleState> CrossSingleton<ItemPull, SinglePull, SingleState>
29where
30    Self: Pull,
31{
32    pub(crate) const fn new(
33        item_pull: ItemPull,
34        singleton_pull: SinglePull,
35        singleton_state: SingleState,
36    ) -> Self {
37        Self {
38            item_pull,
39            singleton_pull,
40            singleton_state,
41        }
42    }
43}
44
45impl<ItemPull, SinglePull, SingleState> Pull for CrossSingleton<ItemPull, SinglePull, SingleState>
46where
47    ItemPull: Pull,
48    SinglePull: Pull,
49    SinglePull::Item: Clone,
50    SingleState: BorrowMut<Option<SinglePull::Item>>,
51{
52    type Ctx<'ctx> = <ItemPull::Ctx<'ctx> as Context<'ctx>>::Merged<SinglePull::Ctx<'ctx>>;
53
54    type Item = (ItemPull::Item, SinglePull::Item);
55    type Meta = ItemPull::Meta;
56    type CanPend = <ItemPull::CanPend as Toggle>::Or<SinglePull::CanPend>;
57    type CanEnd = <ItemPull::CanEnd as Toggle>::Or<SinglePull::CanEnd>;
58
59    fn pull(
60        self: Pin<&mut Self>,
61        ctx: &mut Self::Ctx<'_>,
62    ) -> PullStep<Self::Item, Self::Meta, Self::CanPend, Self::CanEnd> {
63        let this = self.project();
64
65        // Set the singleton state only if it is not already set.
66        // This short-circuits the `SinglePull` side to the first item only.
67        let singleton_state = this.singleton_state.borrow_mut();
68        let singleton = match singleton_state {
69            Some(singleton) => singleton,
70            None => {
71                match this
72                    .singleton_pull
73                    .pull(<ItemPull::Ctx<'_> as Context<'_>>::unmerge_other(ctx))
74                {
75                    PullStep::Ready(item, _meta) => singleton_state.insert(item),
76                    PullStep::Pending(_) => {
77                        return PullStep::pending();
78                    }
79                    PullStep::Ended(_) => {
80                        // If `singleton_pull` returns EOS, we return EOS, no fused requirement.
81                        // This short-circuits the `ItemPull` side, dropping them.
82                        return PullStep::ended();
83                    }
84                }
85            }
86        };
87
88        // Stream any items.
89        match this
90            .item_pull
91            .pull(<ItemPull::Ctx<'_> as Context<'_>>::unmerge_self(ctx))
92        {
93            PullStep::Ready(item, meta) => {
94                // TODO(mingwei): use meta of singleton too
95                PullStep::Ready((item, singleton.clone()), meta)
96            }
97            PullStep::Pending(_) => PullStep::pending(),
98            // If `item_pull` returns EOS, we return EOS, no fused requirement.
99            PullStep::Ended(_) => PullStep::ended(),
100        }
101    }
102
103    fn size_hint(&self) -> (usize, Option<usize>) {
104        let (mut lower, upper) = self.item_pull.size_hint();
105        if self.singleton_state.borrow().is_none() {
106            lower = 0;
107        }
108        (lower, upper)
109    }
110}
111
112impl<ItemPull, SinglePull, SingleState> FusedPull
113    for CrossSingleton<ItemPull, SinglePull, SingleState>
114where
115    ItemPull: FusedPull,
116    SinglePull: FusedPull,
117    SinglePull::Item: Clone,
118    SingleState: BorrowMut<Option<SinglePull::Item>>,
119{
120}
121
122#[cfg(test)]
123mod tests {
124    use core::pin::pin;
125
126    use super::*;
127    use crate::pull::test_utils::TestPull;
128    use crate::pull::{Pull, Repeat};
129
130    /// When item_pull CanEnd=No and singleton_pull CanEnd=Yes,
131    /// CanEnd should allow ending when singleton_pull ends empty.
132    #[test]
133    fn cross_singleton_ends_when_singleton_ends_empty() {
134        let mut cs = pin!(CrossSingleton::new(
135            Repeat::new(1i32),
136            TestPull::items(0i32..0),
137            None
138        ));
139        let _ = cs.as_mut().pull(&mut ());
140    }
141
142    /// When item_pull CanEnd=Yes and singleton_pull CanEnd=No,
143    /// CanEnd should allow ending when item_pull ends.
144    #[test]
145    fn cross_singleton_ends_when_item_pull_ends() {
146        let mut cs = pin!(CrossSingleton::new(
147            TestPull::items(0i32..0),
148            Repeat::new(42i32),
149            None
150        ));
151        let _ = cs.as_mut().pull(&mut ());
152    }
153
154    #[test]
155    fn cross_singleton_fused_shields_upstream() {
156        use crate::pull;
157        use crate::pull::test_utils::assert_fused_runtime;
158
159        let p = pin!(TestPull::items(0..2).fuse().cross_singleton(pull::once(42)));
160        assert_fused_runtime(p);
161    }
162}