1use core::pin::Pin;
7use core::task::Waker;
8
9use crate::{Context, Toggle};
10
11mod fanout;
12mod filter;
13mod filter_map;
14mod flat_map;
15mod flat_map_stream;
16mod flatten;
17mod flatten_stream;
18mod for_each;
19mod inspect;
20mod map;
21#[cfg(feature = "alloc")]
22#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
23mod persist;
24mod resolve_futures;
25mod sink;
26mod sink_compat;
27mod unzip;
28#[cfg(feature = "alloc")]
29#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
30mod vec_push;
31
32#[cfg(test)]
33pub(crate) mod test_utils;
34
35#[cfg(feature = "variadics")]
36#[cfg_attr(docsrs, doc(cfg(feature = "variadics")))]
37pub mod demux_var;
38
39#[cfg(feature = "variadics")]
40#[cfg_attr(docsrs, doc(cfg(feature = "variadics")))]
41pub use demux_var::{DemuxVar, PushVariadic, demux_var};
42pub use fanout::Fanout;
43pub use filter::Filter;
44pub use filter_map::FilterMap;
45pub use flat_map::FlatMap;
46pub use flat_map_stream::FlatMapStream;
47pub use flatten::Flatten;
48pub use flatten_stream::FlattenStream;
49pub use for_each::ForEach;
50use futures_core::FusedStream;
51pub use inspect::Inspect;
52pub use map::Map;
53#[cfg(feature = "alloc")]
54#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
55pub use persist::Persist;
56pub use resolve_futures::ResolveFutures;
57pub use sink::Sink;
58pub use sink_compat::SinkCompat;
59pub use unzip::Unzip;
60#[cfg(feature = "alloc")]
61#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
62pub use vec_push::VecPush;
63
64#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
74pub enum PushStep<CanPend>
75where
76 CanPend: Toggle,
77{
78 Done,
80 Pending(CanPend),
82}
83
84impl<CanPend> PushStep<CanPend>
85where
86 CanPend: Toggle,
87{
88 pub fn pending() -> Self {
90 PushStep::Pending(Toggle::create())
91 }
92
93 pub const fn is_done(&self) -> bool {
95 matches!(self, PushStep::Done)
96 }
97
98 pub const fn is_pending(&self) -> bool {
100 matches!(self, PushStep::Pending(_))
101 }
102
103 pub fn try_convert_into<NewPend>(self) -> Option<PushStep<NewPend>>
105 where
106 NewPend: Toggle,
107 {
108 Some(match self {
109 PushStep::Done => PushStep::Done,
110 PushStep::Pending(_) => PushStep::Pending(Toggle::try_create()?),
111 })
112 }
113
114 pub fn convert_into<NewPend>(self) -> PushStep<NewPend>
116 where
117 NewPend: Toggle,
118 {
119 match self {
120 PushStep::Done => PushStep::Done,
121 PushStep::Pending(_) => PushStep::pending(),
122 }
123 }
124}
125
126pub trait Push<Item, Meta>
137where
138 Meta: Copy,
139{
140 type Ctx<'ctx>: Context<'ctx>;
142
143 type CanPend: Toggle;
145
146 fn poll_ready(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend>;
148
149 fn start_send(self: Pin<&mut Self>, item: Item, meta: Meta);
153
154 fn poll_flush(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend>;
156
157 fn size_hint(self: Pin<&mut Self>, hint: (usize, Option<usize>));
170}
171
172impl<P, Item, Meta> Push<Item, Meta> for &mut P
173where
174 P: Push<Item, Meta> + Unpin + ?Sized,
175 Meta: Copy,
176{
177 type Ctx<'ctx> = P::Ctx<'ctx>;
178
179 type CanPend = P::CanPend;
180
181 fn poll_ready(mut self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
182 Pin::new(&mut **self).poll_ready(ctx)
183 }
184
185 fn start_send(mut self: Pin<&mut Self>, item: Item, meta: Meta) {
186 Pin::new(&mut **self).start_send(item, meta)
187 }
188
189 fn poll_flush(mut self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
190 Pin::new(&mut **self).poll_flush(ctx)
191 }
192
193 fn size_hint(mut self: Pin<&mut Self>, hint: (usize, Option<usize>)) {
194 Pin::new(&mut **self).size_hint(hint)
195 }
196}
197
/// Early-returns [`PushStep::pending()`](PushStep::pending) from the enclosing
/// function when the given step is `Pending`; evaluates to `()` when it is `Done`.
///
/// The payload of the incoming `Pending(_)` is discarded and a fresh pending step is
/// built for the enclosing function's return type, so the two toggle types may differ.
///
/// `PushStep` is named through `$crate::push::` so the expansion does not depend on
/// the call site having imported `PushStep` (`macro_rules!` paths are resolved where
/// the macro is expanded, not where it is defined).
macro_rules! ready {
    ($e:expr $(,)?) => {
        match $e {
            $crate::push::PushStep::Done => (),
            $crate::push::PushStep::Pending(_) => return $crate::push::PushStep::pending(),
        }
    };
}
208use ready;
209
/// Like `ready!`, but for two steps at once.
///
/// Both expressions are evaluated (first, then second) *before* either result is
/// checked, so the second expression still runs when the first one is pending.
/// After that, the enclosing function early-returns a pending step if either
/// result is `Pending`.
macro_rules! ready_both {
    ($first:expr, $second:expr $(,)?) => {{
        let first_step = $first;
        let second_step = $second;
        $crate::push::ready!(first_step);
        $crate::push::ready!(second_step);
    }};
}
220use ready_both;
221
222pub const fn fanout<P0, P1>(push0: P0, push1: P1) -> Fanout<P0, P1> {
224 Fanout::new(push0, push1)
225}
226
227pub const fn filter<Func, Item, Next>(func: Func, next: Next) -> Filter<Next, Func>
229where
230 Func: FnMut(&Item) -> bool,
231{
232 Filter::new(func, next)
233}
234
235pub const fn filter_map<Func, In, Out, Next>(func: Func, next: Next) -> FilterMap<Next, Func>
237where
238 Func: FnMut(In) -> Option<Out>,
239{
240 FilterMap::new(func, next)
241}
242
243pub const fn flat_map<Func, In, IntoIter, Meta, Next>(
245 func: Func,
246 next: Next,
247) -> FlatMap<Next, Func, IntoIter, Meta>
248where
249 Func: FnMut(In) -> IntoIter,
250 IntoIter: IntoIterator,
251 Meta: Copy,
252 Next: Push<IntoIter::Item, Meta>,
253{
254 FlatMap::new(func, next)
255}
256
257pub const fn flat_map_stream<Func, In, St, Meta, Next>(
259 func: Func,
260 next: Next,
261) -> FlatMapStream<Next, Func, St, Meta>
262where
263 Func: FnMut(In) -> St,
264 St: futures_core::Stream,
265 Meta: Copy,
266 Next: Push<St::Item, Meta>,
267{
268 FlatMapStream::new(func, next)
269}
270
271pub const fn flatten<IntoIter, Meta, Next>(next: Next) -> Flatten<Next, IntoIter, Meta>
273where
274 IntoIter: IntoIterator,
275 Meta: Copy,
276 Next: Push<IntoIter::Item, Meta>,
277{
278 Flatten::new(next)
279}
280
281pub const fn flatten_stream<St, Meta, Next>(next: Next) -> FlattenStream<Next, St, Meta>
283where
284 St: futures_core::Stream,
285 Meta: Copy,
286 Next: Push<St::Item, Meta>,
287{
288 FlattenStream::new(next)
289}
290
291pub const fn for_each<Func, Item>(func: Func) -> ForEach<Func>
293where
294 Func: FnMut(Item),
295{
296 ForEach::new(func)
297}
298
299pub const fn inspect<Func, Item, Next>(func: Func, next: Next) -> Inspect<Next, Func>
301where
302 Func: FnMut(&Item),
303{
304 Inspect::new(func, next)
305}
306
307pub const fn map<Func, In, Out, Next>(func: Func, next: Next) -> Map<Next, Func>
309where
310 Func: FnMut(In) -> Out,
311{
312 Map::new(func, next)
313}
314
/// Builds a [`Persist`] around `next`, backed by the caller-owned vector `buf`.
///
/// Only available with the `alloc` feature. `buf`, `replay` and `next` are passed
/// unchanged to [`Persist::new`]. Items must be `Clone`, and `next` receives them
/// with `()` metadata.
/// NOTE(review): presumably items are recorded in `buf` and `replay` controls
/// whether the stored items are re-sent to `next` — confirm in the `persist` module.
#[cfg(feature = "alloc")]
#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
pub fn persist_state<Item, Next>(
    buf: &mut alloc::vec::Vec<Item>,
    replay: bool,
    next: Next,
) -> Persist<Next, &mut alloc::vec::Vec<Item>>
where
    Next: Push<Item, ()>,
    Item: Clone,
{
    Persist::<Next, &mut alloc::vec::Vec<Item>>::new(buf, replay, next)
}
329
330pub const fn resolve_futures_state<Queue, Fut, Next>(
337 queue: &mut Queue,
338 subgraph_waker: Option<Waker>,
339 next: Next,
340) -> ResolveFutures<Next, &mut Queue, Queue>
341where
342 Queue: Default + Extend<Fut> + FusedStream<Item = Fut::Output> + Unpin,
343 Fut: Future,
344 Next: Push<Fut::Output, ()>,
345{
346 ResolveFutures::new(queue, subgraph_waker, next)
347}
348
349pub const fn sink<Si, Item>(si: Si) -> Sink<Si>
351where
352 Si: futures_sink::Sink<Item>,
353{
354 Sink::new(si)
355}
356
357pub const fn sink_compat<Psh, Item>(push: Psh) -> SinkCompat<Psh>
359where
360 Psh: Push<Item, ()>,
361{
362 SinkCompat::new(push)
363}
364
365pub const fn unzip<P0, P1>(push0: P0, push1: P1) -> Unzip<P0, P1> {
367 Unzip::new(push0, push1)
368}
369
/// Builds a [`VecPush`] backed by the caller-owned vector `buf`.
///
/// Only available with the `alloc` feature. Delegates to [`VecPush::new`].
/// NOTE(review): presumably every pushed item is appended to `buf` — confirm in the
/// `vec_push` module.
#[cfg(feature = "alloc")]
#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
pub const fn vec_push<Item>(
    buf: &mut alloc::vec::Vec<Item>,
) -> VecPush<&mut alloc::vec::Vec<Item>> {
    VecPush::<&mut alloc::vec::Vec<Item>>::new(buf)
}