dfir_pipes/push/
persist.rs1use alloc::vec::Vec;
2use core::borrow::BorrowMut;
3use core::pin::Pin;
4
5use pin_project_lite::pin_project;
6
7use crate::push::{Push, PushStep, ready};
8
9pin_project! {
10 #[must_use = "`Push`es do nothing unless items are pushed into them"]
12 pub struct Persist<Psh, Buf> {
13 #[pin]
14 push: Psh,
15 buf: Buf,
16 replay_idx: usize,
17 }
18}
19
20impl<Psh, Buf> Persist<Psh, Buf> {
21 pub(crate) fn new<Item>(buf: Buf, replay: bool, push: Psh) -> Self
23 where
24 Psh: Push<Item, ()>,
25 Item: Clone,
26 Buf: BorrowMut<Vec<Item>>,
27 {
28 let replay_idx = if replay { 0 } else { buf.borrow().len() };
29 Self {
30 push,
31 buf,
32 replay_idx,
33 }
34 }
35
36 fn empty_replay<Item>(self: Pin<&mut Self>, ctx: &mut Psh::Ctx<'_>) -> PushStep<Psh::CanPend>
38 where
39 Psh: Push<Item, ()>,
40 Item: Clone,
41 Buf: BorrowMut<Vec<Item>>,
42 {
43 let mut this = self.project();
44 while let Some(item) = this.buf.borrow().get(*this.replay_idx) {
45 ready!(this.push.as_mut().poll_ready(ctx));
46 this.push.as_mut().start_send(item.clone(), ());
47 *this.replay_idx += 1;
48 }
49 debug_assert_eq!(this.buf.borrow().len(), *this.replay_idx);
50 PushStep::Done
51 }
52}
53
54impl<Psh, Item, Buf> Push<Item, ()> for Persist<Psh, Buf>
56where
57 Psh: Push<Item, ()>,
58 Item: Clone,
59 Buf: BorrowMut<Vec<Item>>,
60{
61 type Ctx<'ctx> = Psh::Ctx<'ctx>;
62
63 type CanPend = Psh::CanPend;
64
65 fn poll_ready(mut self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
66 ready!(self.as_mut().empty_replay(ctx));
68 self.project().push.poll_ready(ctx)
70 }
71
72 fn start_send(self: Pin<&mut Self>, item: Item, _meta: ()) {
73 let this = self.project();
74 debug_assert_eq!(this.buf.borrow().len(), *this.replay_idx);
75
76 this.buf.borrow_mut().push(item.clone());
78 *this.replay_idx += 1;
79
80 this.push.start_send(item, ());
82 }
83
84 fn poll_flush(mut self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
85 ready!(self.as_mut().empty_replay(ctx));
87 self.project().push.poll_flush(ctx)
89 }
90
91 fn size_hint(self: Pin<&mut Self>, hint: (usize, Option<usize>)) {
92 let this = self.project();
93 this.buf.borrow_mut().reserve(hint.0);
94 this.push.size_hint(hint);
95 }
96}
97
#[cfg(test)]
mod tests {
    use core::pin::Pin;

    extern crate alloc;
    use alloc::vec::Vec;

    use crate::push::Push;
    use crate::push::test_utils::TestPush;

    /// Drives `persist_state` over one shared buffer in two passes: first without replay
    /// (sending two new items), then with replay (sending one more item).
    ///
    /// The return values of the poll calls are ignored and the test makes no assertions of
    /// its own.
    /// NOTE(review): presumably `TestPush` checks the call protocol internally (for example
    /// that `poll_ready` precedes each `start_send`) - confirm in `test_utils`.
    #[test]
    fn persist_readies_downstream_for_replay_and_new() {
        let mut buf = Vec::new();

        // Pass 1: empty buffer, replay disabled; push 1 and 2 through.
        {
            let mut downstream = TestPush::no_pend();
            let mut persist = crate::push::persist_state(&mut buf, false, &mut downstream);
            let mut persist = Pin::new(&mut persist);

            persist.as_mut().poll_ready(&mut ());
            persist.as_mut().start_send(1, ());
            persist.as_mut().poll_ready(&mut ());
            persist.as_mut().start_send(2, ());
            persist.as_mut().poll_flush(&mut ());
        }

        // Pass 2: same buffer with replay enabled and a fresh downstream; push 3 through.
        {
            let mut downstream = TestPush::no_pend();
            let mut persist = crate::push::persist_state(&mut buf, true, &mut downstream);
            let mut persist = Pin::new(&mut persist);

            persist.as_mut().poll_ready(&mut ());
            persist.as_mut().start_send(3, ());
            persist.as_mut().poll_flush(&mut ());
        }
    }
}