dfir_pipes/pull/
cross_singleton.rs1use core::borrow::BorrowMut;
2use core::pin::Pin;
3
4use pin_project_lite::pin_project;
5
6use crate::pull::{FusedPull, Pull, PullStep};
7use crate::{Context, Toggle};
8
9pin_project! {
10 #[must_use = "`Pull`s do nothing unless polled"]
17 #[derive(Clone, Debug, Default)]
18 pub struct CrossSingleton<ItemPull, SinglePull, SingleState> {
19 #[pin]
20 item_pull: ItemPull,
21 #[pin]
22 singleton_pull: SinglePull,
23
24 singleton_state: SingleState,
25 }
26}
27
28impl<ItemPull, SinglePull, SingleState> CrossSingleton<ItemPull, SinglePull, SingleState>
29where
30 Self: Pull,
31{
32 pub(crate) const fn new(
33 item_pull: ItemPull,
34 singleton_pull: SinglePull,
35 singleton_state: SingleState,
36 ) -> Self {
37 Self {
38 item_pull,
39 singleton_pull,
40 singleton_state,
41 }
42 }
43}
44
45impl<ItemPull, SinglePull, SingleState> Pull for CrossSingleton<ItemPull, SinglePull, SingleState>
46where
47 ItemPull: Pull,
48 SinglePull: Pull,
49 SinglePull::Item: Clone,
50 SingleState: BorrowMut<Option<SinglePull::Item>>,
51{
52 type Ctx<'ctx> = <ItemPull::Ctx<'ctx> as Context<'ctx>>::Merged<SinglePull::Ctx<'ctx>>;
53
54 type Item = (ItemPull::Item, SinglePull::Item);
55 type Meta = ItemPull::Meta;
56 type CanPend = <ItemPull::CanPend as Toggle>::Or<SinglePull::CanPend>;
57 type CanEnd = <ItemPull::CanEnd as Toggle>::Or<SinglePull::CanEnd>;
58
59 fn pull(
60 self: Pin<&mut Self>,
61 ctx: &mut Self::Ctx<'_>,
62 ) -> PullStep<Self::Item, Self::Meta, Self::CanPend, Self::CanEnd> {
63 let this = self.project();
64
65 let singleton_state = this.singleton_state.borrow_mut();
68 let singleton = match singleton_state {
69 Some(singleton) => singleton,
70 None => {
71 match this
72 .singleton_pull
73 .pull(<ItemPull::Ctx<'_> as Context<'_>>::unmerge_other(ctx))
74 {
75 PullStep::Ready(item, _meta) => singleton_state.insert(item),
76 PullStep::Pending(_) => {
77 return PullStep::pending();
78 }
79 PullStep::Ended(_) => {
80 return PullStep::ended();
83 }
84 }
85 }
86 };
87
88 match this
90 .item_pull
91 .pull(<ItemPull::Ctx<'_> as Context<'_>>::unmerge_self(ctx))
92 {
93 PullStep::Ready(item, meta) => {
94 PullStep::Ready((item, singleton.clone()), meta)
96 }
97 PullStep::Pending(_) => PullStep::pending(),
98 PullStep::Ended(_) => PullStep::ended(),
100 }
101 }
102
103 fn size_hint(&self) -> (usize, Option<usize>) {
104 let (mut lower, upper) = self.item_pull.size_hint();
105 if self.singleton_state.borrow().is_none() {
106 lower = 0;
107 }
108 (lower, upper)
109 }
110}
111
112impl<ItemPull, SinglePull, SingleState> FusedPull
113 for CrossSingleton<ItemPull, SinglePull, SingleState>
114where
115 ItemPull: FusedPull,
116 SinglePull: FusedPull,
117 SinglePull::Item: Clone,
118 SingleState: BorrowMut<Option<SinglePull::Item>>,
119{
120}
121
122#[cfg(test)]
123mod tests {
124 use core::pin::pin;
125
126 use super::*;
127 use crate::pull::test_utils::TestPull;
128 use crate::pull::{Pull, Repeat};
129
130 #[test]
133 fn cross_singleton_ends_when_singleton_ends_empty() {
134 let mut cs = pin!(CrossSingleton::new(
135 Repeat::new(1i32),
136 TestPull::items(0i32..0),
137 None
138 ));
139 let _ = cs.as_mut().pull(&mut ());
140 }
141
142 #[test]
145 fn cross_singleton_ends_when_item_pull_ends() {
146 let mut cs = pin!(CrossSingleton::new(
147 TestPull::items(0i32..0),
148 Repeat::new(42i32),
149 None
150 ));
151 let _ = cs.as_mut().pull(&mut ());
152 }
153
154 #[test]
155 fn cross_singleton_fused_shields_upstream() {
156 use crate::pull;
157 use crate::pull::test_utils::assert_fused_runtime;
158
159 let p = pin!(TestPull::items(0..2).fuse().cross_singleton(pull::once(42)));
160 assert_fused_runtime(p);
161 }
162}