1use core::pin::Pin;
3use core::task::Poll;
4
5use pin_project_lite::pin_project;
6
7use crate::Yes;
8use crate::push::{Push, PushStep};
9
10pin_project! {
11 #[must_use = "`Push`es do nothing unless items are pushed into them"]
16 pub struct Sink<Si> {
17 #[pin]
18 sink: Si,
19 }
20}
21
22impl<Si> Sink<Si> {
23 pub(crate) const fn new(sink: Si) -> Self {
25 Self { sink }
26 }
27}
28
29impl<Si, Item, Meta> Push<Item, Meta> for Sink<Si>
30where
31 Si: futures_sink::Sink<Item>,
32 Si::Error: core::fmt::Debug,
33 Meta: Copy,
34{
35 type Ctx<'ctx> = core::task::Context<'ctx>;
36
37 type CanPend = Yes;
38
39 fn poll_ready(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
40 match self.project().sink.poll_ready(ctx) {
41 Poll::Ready(Ok(())) => PushStep::Done,
42 Poll::Ready(Err(err)) => panic!("Sink error during poll_ready: {err:?}"),
43 Poll::Pending => PushStep::Pending(Yes),
44 }
45 }
46
47 fn start_send(self: Pin<&mut Self>, item: Item, _meta: Meta) {
48 match self.project().sink.start_send(item) {
50 Ok(()) => {}
51 Err(err) => panic!("Sink error during start_send: {err:?}"),
52 }
53 }
54
55 fn poll_flush(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
56 match self.project().sink.poll_flush(ctx) {
57 Poll::Ready(Ok(())) => PushStep::Done,
58 Poll::Ready(Err(err)) => panic!("Sink error during poll_flush: {err:?}"),
59 Poll::Pending => PushStep::Pending(Yes),
60 }
61 }
62
63 fn size_hint(self: Pin<&mut Self>, _hint: (usize, Option<usize>)) {
64 }
66}