1use core::borrow::BorrowMut;
4use core::marker::PhantomData;
5use core::pin::Pin;
6use core::task::{Context, Poll, Waker};
7
8use futures_core::stream::{FusedStream, Stream};
9use pin_project_lite::pin_project;
10
11use crate::Yes;
12use crate::push::{Push, PushStep, ready};
13
14pin_project! {
15 #[must_use = "`Push`es do nothing unless items are pushed into them"]
20pub struct ResolveFutures<Psh, Queue, QueueInner> {
21 #[pin]
22 push: Psh,
23 queue: Queue,
24 subgraph_waker: Option<Waker>,
27
28 _phantom: PhantomData<QueueInner>,
29 }
30}
31
32impl<Psh, Queue, QueueInner> ResolveFutures<Psh, Queue, QueueInner> {
33 pub(crate) const fn new<Fut>(queue: Queue, subgraph_waker: Option<Waker>, push: Psh) -> Self
37 where
38 Psh: Push<Fut::Output, ()>,
39 Queue: BorrowMut<QueueInner>,
40 QueueInner: Extend<Fut> + FusedStream<Item = Fut::Output> + Unpin,
41 Fut: Future,
42 {
44 Self {
45 push,
46 queue,
47 subgraph_waker,
48 _phantom: PhantomData,
49 }
50 }
51
52 fn empty_ready<Fut>(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> PushStep<Yes>
54 where
55 Psh: Push<Fut::Output, ()>,
56 Queue: BorrowMut<QueueInner>,
57 QueueInner: Extend<Fut> + FusedStream<Item = Fut::Output> + Unpin,
58 Fut: Future,
59 {
60 let mut this = self.project();
61
62 loop {
63 ready!(
65 this.push
66 .as_mut()
67 .poll_ready(crate::Context::from_task(ctx))
68 );
69
70 let poll_result = if let Some(w) = this.subgraph_waker.as_ref() {
71 Stream::poll_next(
72 Pin::new(this.queue.borrow_mut()),
73 &mut Context::<'_>::from_waker(w),
74 )
75 } else {
76 Stream::poll_next(Pin::new(this.queue.borrow_mut()), ctx)
77 };
78
79 match poll_result {
80 Poll::Ready(Some(out)) => {
81 this.push.as_mut().start_send(out, ());
82 }
83 Poll::Ready(None) => {
84 return PushStep::Done;
85 }
86 Poll::Pending => {
87 if this.subgraph_waker.is_some() {
88 return PushStep::Done; } else {
90 return PushStep::Pending(Yes);
93 }
94 }
95 }
96 }
97 }
98}
99
100impl<Psh, Queue, QueueInner, Fut> Push<Fut, ()> for ResolveFutures<Psh, Queue, QueueInner>
102where
103 Psh: Push<Fut::Output, ()>,
104 Queue: BorrowMut<QueueInner>,
105 QueueInner: Extend<Fut> + FusedStream<Item = Fut::Output> + Unpin,
106 Fut: Future,
107 for<'ctx> Psh::Ctx<'ctx>: crate::Context<'ctx>,
108{
109 type Ctx<'ctx> = Context<'ctx>;
110 type CanPend = Yes;
111
112 fn poll_ready(mut self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
113 self.as_mut().empty_ready(ctx) }
115
116 fn start_send(self: Pin<&mut Self>, item: Fut, _meta: ()) {
117 let this = self.project();
118 this.queue.borrow_mut().extend(core::iter::once(item));
119
120 if let Some(waker) = this.subgraph_waker.as_ref() {
121 if let Poll::Ready(Some(out)) = Stream::poll_next(
131 Pin::new(this.queue.borrow_mut()),
132 &mut Context::from_waker(waker),
133 ) {
134 this.push.start_send(out, ());
135 }
136 }
137 }
138
139 fn poll_flush(mut self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
140 ready!(self.as_mut().empty_ready(ctx));
142 let this = self.project();
144 this.push
145 .poll_flush(crate::Context::from_task(ctx))
146 .convert_into()
147 }
148
149 fn size_hint(self: Pin<&mut Self>, hint: (usize, Option<usize>)) {
150 self.project().push.size_hint(hint);
152 }
153}
154
155#[cfg(test)]
156mod tests {
157 use core::pin::Pin;
158 use core::task::{Context, Waker};
159
160 use futures_util::stream::FuturesUnordered;
161
162 use super::*;
163 use crate::push::Push;
164 use crate::push::test_utils::{PushCall, TestPush};
165
166 type Queue = FuturesUnordered<core::future::Ready<i32>>;
167
168 #[test]
169 fn test_poll_ready_readies_downstream() {
170 let waker = Waker::noop();
171 let mut cx = Context::from_waker(waker);
172
173 let mut mock = TestPush::no_pend();
174 let mut queue: Queue = [1, 2, 3].into_iter().map(core::future::ready).collect();
175 let mut rf = ResolveFutures::<_, _, Queue>::new(&mut queue, None, &mut mock);
176
177 let result = Push::<core::future::Ready<i32>, ()>::poll_ready(Pin::new(&mut rf), &mut cx);
178 assert!(result.is_done());
179
180 drop(rf);
181 assert!(
182 mock.history
183 .iter()
184 .any(|c| matches!(c, PushCall::PollReady)),
185 "downstream poll_ready was not called"
186 );
187 assert!(
188 mock.history
189 .iter()
190 .any(|c| matches!(c, PushCall::SendItem(_))),
191 "downstream should have received items"
192 );
193 }
194
195 #[test]
196 fn test_poll_flush_calls_downstream_flush() {
197 let waker = Waker::noop();
198 let mut cx = Context::from_waker(waker);
199
200 let mut mock = TestPush::no_pend();
201 let mut queue: Queue = FuturesUnordered::new();
202 let mut rf = ResolveFutures::<_, _, Queue>::new(&mut queue, None, &mut mock);
203
204 let result = Push::<core::future::Ready<i32>, ()>::poll_flush(Pin::new(&mut rf), &mut cx);
205 assert!(result.is_done());
206
207 drop(rf);
208 assert!(
209 mock.history
210 .iter()
211 .any(|c| matches!(c, PushCall::PollFlush)),
212 "downstream poll_flush was not called"
213 );
214 }
215
216 #[test]
217 fn resolve_futures_readies_downstream_before_each_send() {
218 let waker = Waker::noop();
219 let mut cx = Context::from_waker(waker);
220
221 let mut guard = TestPush::no_pend();
222 let mut queue: Queue = [1, 2, 3].into_iter().map(core::future::ready).collect();
223 let mut rf = ResolveFutures::<_, _, Queue>::new(&mut queue, None, &mut guard);
224
225 let result = Push::<core::future::Ready<i32>, ()>::poll_ready(Pin::new(&mut rf), &mut cx);
227 assert!(result.is_done());
228
229 Push::<core::future::Ready<i32>, ()>::start_send(
231 Pin::new(&mut rf),
232 core::future::ready(4),
233 (),
234 );
235
236 let result = Push::<core::future::Ready<i32>, ()>::poll_flush(Pin::new(&mut rf), &mut cx);
238 assert!(result.is_done());
239 }
240}