Skip to main content

dfir_pipes/push/
persist.rs

1use alloc::vec::Vec;
2use core::borrow::BorrowMut;
3use core::pin::Pin;
4
5use pin_project_lite::pin_project;
6
7use crate::push::{Push, PushStep, ready};
8
pin_project! {
    /// Special push operator for the `persist` operator.
    ///
    /// A clone of every item sent through this push is appended to `buf`, and the item itself is
    /// forwarded to the downstream `push`. Items stored in `buf` at or after `replay_idx` are
    /// re-sent downstream before the downstream push is readied or flushed.
    #[must_use = "`Push`es do nothing unless items are pushed into them"]
    pub struct Persist<Psh, Buf> {
        // Downstream push that receives both replayed and newly persisted items.
        #[pin]
        push: Psh,
        // Storage for persisted items; accessed as a `Vec<Item>` through `BorrowMut`.
        buf: Buf,
        // Index into `buf` of the next item to replay downstream. Equal to the buffer length
        // once every stored item has been sent.
        replay_idx: usize,
    }
}
19
20impl<Psh, Buf> Persist<Psh, Buf> {
21    /// Create with the given replay index and following push.
22    pub(crate) fn new<Item>(buf: Buf, replay: bool, push: Psh) -> Self
23    where
24        Psh: Push<Item, ()>,
25        Item: Clone,
26        Buf: BorrowMut<Vec<Item>>,
27    {
28        let replay_idx = if replay { 0 } else { buf.borrow().len() };
29        Self {
30            push,
31            buf,
32            replay_idx,
33        }
34    }
35
36    /// Drains any pending replay items by pushing them downstream.
37    fn empty_replay<Item>(self: Pin<&mut Self>, ctx: &mut Psh::Ctx<'_>) -> PushStep<Psh::CanPend>
38    where
39        Psh: Push<Item, ()>,
40        Item: Clone,
41        Buf: BorrowMut<Vec<Item>>,
42    {
43        let mut this = self.project();
44        while let Some(item) = this.buf.borrow().get(*this.replay_idx) {
45            ready!(this.push.as_mut().poll_ready(ctx));
46            this.push.as_mut().start_send(item.clone(), ());
47            *this.replay_idx += 1;
48        }
49        debug_assert_eq!(this.buf.borrow().len(), *this.replay_idx);
50        PushStep::Done
51    }
52}
53
54// TODO(mingwei): support arbitrary metadata.
55impl<Psh, Item, Buf> Push<Item, ()> for Persist<Psh, Buf>
56where
57    Psh: Push<Item, ()>,
58    Item: Clone,
59    Buf: BorrowMut<Vec<Item>>,
60{
61    type Ctx<'ctx> = Psh::Ctx<'ctx>;
62
63    type CanPend = Psh::CanPend;
64
65    fn poll_ready(mut self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
66        // Drain any pending replay items first.
67        ready!(self.as_mut().empty_replay(ctx));
68        // Then ready the downstream push.
69        self.project().push.poll_ready(ctx)
70    }
71
72    fn start_send(self: Pin<&mut Self>, item: Item, _meta: ()) {
73        let this = self.project();
74        debug_assert_eq!(this.buf.borrow().len(), *this.replay_idx);
75
76        // Persist the new item.
77        this.buf.borrow_mut().push(item.clone());
78        *this.replay_idx += 1;
79
80        // Push it downstream (downstream was readied via poll_ready).
81        this.push.start_send(item, ());
82    }
83
84    fn poll_flush(mut self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
85        // Ensure all replayed items are sent before flushing the underlying sink.
86        ready!(self.as_mut().empty_replay(ctx));
87        // Then flush the downstream push.
88        self.project().push.poll_flush(ctx)
89    }
90
91    fn size_hint(self: Pin<&mut Self>, hint: (usize, Option<usize>)) {
92        let this = self.project();
93        this.buf.borrow_mut().reserve(hint.0);
94        this.push.size_hint(hint);
95    }
96}
97
#[cfg(test)]
mod tests {
    use core::pin::Pin;

    extern crate alloc;
    use alloc::vec::Vec;

    use crate::push::Push;
    use crate::push::test_utils::TestPush;

    /// Runs two passes over the same buffer: one that persists new items, and one that replays
    /// them before accepting another item. The buffer contents are checked after each pass.
    /// NOTE(review): `TestPush` presumably verifies on its own that it is readied before each
    /// send -- confirm in `test_utils`.
    #[test]
    fn persist_readies_downstream_for_replay_and_new() {
        let mut buf = Vec::new();
        // First pass: persist items 1, 2.
        {
            let mut tp = TestPush::no_pend();
            let mut p = crate::push::persist_state(&mut buf, false, &mut tp);
            let mut p = Pin::new(&mut p);
            p.as_mut().poll_ready(&mut ());
            p.as_mut().start_send(1, ());
            p.as_mut().poll_ready(&mut ());
            p.as_mut().start_send(2, ());
            p.as_mut().poll_flush(&mut ());
        }
        // Both items must have been stored, in send order.
        assert_eq!(buf, [1, 2]);

        // Second pass: replay=true, should replay 1, 2 then accept new item 3.
        {
            let mut tp = TestPush::no_pend();
            let mut p = crate::push::persist_state(&mut buf, true, &mut tp);
            let mut p = Pin::new(&mut p);
            p.as_mut().poll_ready(&mut ());
            p.as_mut().start_send(3, ());
            p.as_mut().poll_flush(&mut ());
        }
        // Replaying must not duplicate stored items; only the new item is appended.
        assert_eq!(buf, [1, 2, 3]);
    }
}