dfir_pipes/pull/
collect.rs1use core::pin::Pin;
2use core::task::Poll;
3
4use pin_project_lite::pin_project;
5
6use crate::Context;
7use crate::pull::{Pull, PullStep};
8
9pin_project! {
10 #[must_use = "futures do nothing unless polled"]
12 #[derive(Clone, Debug, Default)]
13 pub struct Collect<Prev, C> {
14 #[pin]
15 prev: Prev,
16 collect: C,
17 }
18}
19
20impl<Prev, C> Collect<Prev, C>
21where
22 Self: Future,
23 C: Default,
24{
25 pub(crate) fn new(prev: Prev) -> Self {
26 Self {
27 prev,
28 collect: C::default(),
29 }
30 }
31}
32
33impl<Prev, C> Future for Collect<Prev, C>
34where
35 Prev: Pull,
36 for<'ctx> Prev::Ctx<'ctx>: Context<'ctx>,
37 C: Default + Extend<Prev::Item>,
38{
39 type Output = C;
40
41 fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
42 let mut this = self.project();
43 let ctx = <Prev::Ctx<'_> as Context<'_>>::from_task(cx);
44
45 #[cfg(nightly)]
46 this.collect.extend_reserve(this.prev.size_hint().0);
47
48 loop {
49 return match this.prev.as_mut().pull(ctx) {
50 PullStep::Ready(item, _meta) => {
51 #[cfg(nightly)]
52 this.collect.extend_one(item);
53 #[cfg(not(nightly))]
54 this.collect.extend(core::iter::once(item));
55
56 continue;
57 }
58 PullStep::Pending(_) => Poll::Pending,
59 PullStep::Ended(_) => Poll::Ready(core::mem::take(this.collect)),
60 };
61 }
62 }
63}