Allow a future to store a pointer to a pinned value in its container



Prelude

I have been working on this segment of code that attempts to provide a recyclable API for implementing an asynchronous stream for a REST paginator.

I have gone through many iterations and settled on storing state in an enum that describes which point the process has reached, both because I feel that it is the best fit for this purpose and also because it is something to learn from, being especially explicit about the whole process. I do not want to use stream! or try_stream! from the async-stream crate.

The state begins at Begin, and moves a PaginationDelegate into the next state after using it to make a request. This state is Pending and owns the delegate and a future that is returned from PaginationDelegate::next_page.

The issue appears when the next_page method needs a reference, &self, but that self is not stored inside the future held by the Pending state. The future ends up borrowing a value that lives next to it in the same variant.

I wanted to keep this "flat" because I find the algorithm easier to follow, but I also wanted to learn how to create this self-referential structure the most correct way. I am aware that I can wrap the future and have it own the PaginationDelegate, and indeed this may be the method I end up using. Nevertheless, I want to know how I could move the two values into the same holding structure and keep the pointer alive for my own education.


Delegate Trait

Here a PaginationDelegate is defined. This trait is intended to be implemented and used by any method or function that intends to return a PaginatedStream or dyn Stream. Its purpose is to define how the requests will be made, as well as store a limited subset of the state (the offset for the next page from the REST API, and the total number of items that are expected from the API).

#[async_trait]
pub trait PaginationDelegate {
    type Item;
    type Error;

    /// Performs an asynchronous request for the next page and returns either
    /// a vector of the result items or an error.
    async fn next_page(&self) -> Result<Vec<Self::Item>, Self::Error>;

    /// Gets the current offset, which will be the index at the end of the
    /// current/previous page. The value returned from this will be changed by
    /// [`PaginatedStream`] immediately following a successful call to
    /// [`next_page()`], increasing by the number of items returned.
    fn offset(&self) -> usize;

    /// Sets the offset for the next page. The offset is required to be the
    /// index of the last item from the previous page.
    fn set_offset(&mut self, value: usize);

    /// Gets the total count of items that are currently expected from the API.
    /// This may change if the API returns a different number of results on
    /// subsequent pages, and may be less than what the API claims in its
    /// response data if the API has a maximum limit.
    fn total_items(&self) -> Option<usize>;
}

Stream State

The next segment is the enum itself, which serves as the implementor of Stream and the holder for the current state of the iterator.

Note that currently the Pending variant has the delegate and the future separate. I could have used future: Pin<Box<dyn Future<Output = Result<(D, Vec<D::Item>), D::Error>>>> to keep the delegate inside of the Future but prefer not to because I want to solve the underlying problem and not gloss over it. Also, the delegate field is a Pin<Box<D>> because I was experimenting and I feel that this is the closest I have gotten to a correct solution.

pub enum PaginatedStream<D: PaginationDelegate> {
    Begin {
        delegate: D,
    },
    Pending {
        delegate: Pin<Box<D>>,
        #[allow(clippy::type_complexity)]
        future: Pin<Box<dyn Future<Output = Result<Vec<D::Item>, D::Error>>>>,
    },
    Ready {
        delegate: D,
        items: VecDeque<D::Item>,
    },
    Closed,
    Indeterminate,
}

Stream Implementation

The last part is the implementation of Stream. This is incomplete for two reasons: I have not finished it, and it would be best to keep the example short and minimal.

impl<D: 'static> Stream for PaginatedStream<D>
where
    D: PaginationDelegate + Unpin,
    D::Item: Unpin,
{
    // If the state is `Pending` and the future resolves to an `Err`, that error is
    // forwarded only once and the state set to `Closed`. If there is at least one
    // result to return, the `Ok` variant is, of course, used instead.
    type Item = Result<D::Item, D::Error>;

    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Avoid using the full namespace to match all variants.
        use PaginatedStream::*;

        // Take ownership of the current state (`self`) and replace it with the
        // `Indeterminate` state until the new state is in fact determined.
        let this = std::mem::replace(&mut *self, Indeterminate);

        match this {
            // This state only occurs at the entry of the state machine. It only holds the
            // `PaginationDelegate` that will be used to update the offset and make new requests.
            Begin { delegate } => {
                // Pin the delegate to the heap to ensure that it doesn't move and that pointers
                // remain valid even after moving the value into the new state.
                let delegate = Box::pin(delegate);

                // Set the current state to `Pending`, after making the next request using the
                // pinned delegate. (This is the expression that fails to compile.)
                self.set(Pending {
                    delegate,
                    future: PaginationDelegate::next_page(delegate.as_ref()),
                });

                // Return the distilled version of the new state to the caller, indicating that a
                // new request has been made and we are waiting for new data.
                Poll::Pending
            }
            // At some point in the past this stream was polled and made a new request. Now it is
            // time to poll the future returned from that request that was made, and if results are
            // available, unpack them to the `Ready` state and move the delegate. If the future
            // still doesn't have results, set the state back to `Pending` and move the fields back
            // into position.
            Pending { delegate, future } => todo!(),
            // The request has resolved with data in the past, and there are items ready for us to
            // provide the caller. In the event that there are no more items in the `VecDeque`, we
            // will make the next request and construct the state for `Pending` again.
            Ready { delegate, items } => todo!(),
            // Either an error has occurred, or the last item has been yielded already. Nobody
            // should be polling anymore, but to be nice, just tell them that there are no more
            // results with `Poll::Ready(None)`.
            Closed => Poll::Ready(None),
            // The `Indeterminate` state should have only been used internally and reset back to a
            // valid state before yielding the `Poll` to the caller. This branch should never be
            // reached; if it is, that is a panic.
            Indeterminate => unreachable!(),
        }
    }
}
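The take-and-replace step at the top of poll_next can be looked at in isolation. The following is only a minimal sketch using the standard library; the Job enum and its step method are hypothetical names invented for illustration and are not part of the code above.

```rust
use std::mem;

// Hypothetical state machine whose variants own data that must be moved
// from one state to the next.
#[derive(Debug, PartialEq)]
pub enum Job {
    Queued { name: String },
    Running { name: String, ticks: u32 },
    Done,
    // Placeholder that is only present while `step` is running.
    Indeterminate,
}

impl Job {
    pub fn step(&mut self) {
        // Move the current state out of `self`, leaving the placeholder behind,
        // so the owned fields can be moved into the next variant.
        let next = match mem::replace(self, Job::Indeterminate) {
            Job::Queued { name } => Job::Running { name, ticks: 0 },
            Job::Running { name, ticks } if ticks < 2 => Job::Running { name, ticks: ticks + 1 },
            Job::Running { .. } | Job::Done => Job::Done,
            Job::Indeterminate => unreachable!("placeholder must never be observed"),
        };
        // Every path must store a real state again before returning.
        *self = next;
    }
}
```

The placeholder variant is what allows the fields to be moved out of a value that is only reachable through a mutable reference.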

Compiler Messages

At the moment, in the Begin branch, there are two compiler messages at the point where the borrow of the delegate (delegate.as_ref()) is taken and passed to the PaginationDelegate::next_page method.


The first is that the delegate does not live long enough, because the pinned value is moved into the new state variant Pending, and no longer resides at the position it was assigned. I do not understand why the compiler wants this to exist for 'static though, and would appreciate if this could be explained.

error[E0597]: `delegate` does not live long enough
  --> src/lib.rs:90:59
   |
90 |                     future: PaginationDelegate::next_page(delegate.as_ref()),
   |                             ------------------------------^^^^^^^^^^^^^^^^^-
   |                             |                             |
   |                             |                             borrowed value does not live long enough
   |                             cast requires that `delegate` is borrowed for `'static`
...
96 |             }
   |             - `delegate` dropped here while still borrowed
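On the 'static part of that message: a Box<dyn Trait> written without a lifetime defaults to Box<dyn Trait + 'static>, so the future field in Pending only accepts futures that borrow nothing shorter-lived than 'static. The sketch below shows the difference with standard-library types only. Pages, StaticFuture, BorrowedFuture, borrowed, from_static and poll_once are hypothetical names, and the next_page signature is only an approximation of what #[async_trait] generates (the real expansion also adds a Send bound).

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for a delegate; not part of the original code.
pub struct Pages {
    pub offset: usize,
}

impl Pages {
    // Roughly the shape that `#[async_trait]` gives `async fn next_page(&self)`:
    // a boxed future that is only valid for as long as the `&self` borrow.
    pub fn next_page<'a>(&'a self) -> Pin<Box<dyn Future<Output = Vec<usize>> + 'a>> {
        Box::pin(async move { vec![self.offset, self.offset + 1] })
    }
}

// No lifetime written: inside a `Box`, `dyn Future` means `dyn Future + 'static`.
pub type StaticFuture = Pin<Box<dyn Future<Output = Vec<usize>>>>;

// The same type with the borrow spelled out.
pub type BorrowedFuture<'a> = Pin<Box<dyn Future<Output = Vec<usize>> + 'a>>;

// Compiles for any borrow, because the result type carries the lifetime `'a`.
pub fn borrowed<'a>(pages: &'a Pages) -> BorrowedFuture<'a> {
    pages.next_page()
}

// Compiles only because the borrow itself is `'static`. With a plain `&'a Pages`
// the compiler rejects this for the same reason as in the question.
pub fn from_static(pages: &'static Pages) -> StaticFuture {
    pages.next_page()
}

// Polls a future once with a waker that does nothing.
pub fn poll_once<F: Future + ?Sized>(future: Pin<&mut F>) -> Poll<F::Output> {
    struct Noop;
    impl Wake for Noop {
        fn wake(self: Arc<Self>) {}
    }
    let waker = Waker::from(Arc::new(Noop));
    future.poll(&mut Context::from_waker(&waker))
}
```

Giving the enum a lifetime parameter would not help here, because the borrowed value has to live inside the same enum value as the future.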

I would also like to hear any methods you have for creating the values for fields of a struct that rely on data that should be moved into the struct (self-referential, the main issue of this entire post). I know it is wrong (and impossible) to use MaybeUninit here because any placeholder value that would later be dropped will cause undefined behavior. Possibly show me a method for allocating a structure of uninitialized memory and then overwriting those fields with values after they have been constructed, without letting the compiler attempt to free the uninitialized memory.
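One way to get such a structure without uninitialized memory is to give the delegate its own heap allocation, keep only a raw pointer to it, and create the future from a reference to that allocation. The following is a rough sketch under several assumptions, not a vetted implementation: Counter, Holder and noop_waker are hypothetical names, the 'static lifetime on the reference is a deliberate lie that is only acceptable because the future never escapes Holder and its output contains no references, and crates such as ouroboros or self_cell exist to wrap this kind of unsafe code more carefully.

```rust
use std::future::Future;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for the delegate.
pub struct Counter {
    pub start: usize,
}

impl Counter {
    // Stand-in for `next_page(&self)`: the returned future borrows `self`.
    pub fn next_page(&self) -> Pin<Box<dyn Future<Output = Vec<usize>> + '_>> {
        Box::pin(async move { vec![self.start, self.start + 1] })
    }
}

// Owns the delegate's heap allocation and a future that points into it.
pub struct Holder {
    future: Option<Pin<Box<dyn Future<Output = Vec<usize>>>>>,
    delegate: NonNull<Counter>,
}

impl Holder {
    pub fn new(delegate: Counter) -> Self {
        // Leak the box so the allocation has a stable address that no `Box` owns.
        let ptr = NonNull::from(Box::leak(Box::new(delegate)));
        // SAFETY: the allocation is freed only in `Drop`, after the future is gone,
        // and nothing mutates or moves it while the future exists.
        let borrowed: &'static Counter = unsafe { &*ptr.as_ptr() };
        Holder { future: Some(borrowed.next_page()), delegate: ptr }
    }

    // Polls the stored future. Must not be called again after it has completed.
    pub fn poll(&mut self, ctx: &mut Context<'_>) -> Poll<Vec<usize>> {
        self.future.as_mut().expect("future is present until drop").as_mut().poll(ctx)
    }
}

impl Drop for Holder {
    fn drop(&mut self) {
        // Drop the borrower first, then free the value it borrowed from.
        self.future = None;
        // SAFETY: the pointer came from `Box::leak` and no reference to it remains.
        unsafe { drop(Box::from_raw(self.delegate.as_ptr())) };
    }
}

// Waker that does nothing, only for driving the example by hand.
pub fn noop_waker() -> Waker {
    struct Noop;
    impl Wake for Noop {
        fn wake(self: Arc<Self>) {}
    }
    Waker::from(Arc::new(Noop))
}
```

Moving a Holder moves only the two pointers, so the address the future borrowed stays valid. Getting the drop order and aliasing rules right is the hard part, which is why letting the future own the delegate is usually the safer choice.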


The second compiler message is as follows. It is similar to the first, except that here the local delegate has already been moved into the struct when the borrow is taken. My understanding is that this is fundamentally the same issue described above, just reported by two separate checks. Is my understanding wrong?

error[E0382]: borrow of moved value: `delegate`
  --> src/lib.rs:90:59
   |
84 |                 let delegate = Box::pin(delegate);
   |                     -------- move occurs because `delegate` has type `Pin<Box<D>>`, which does not implement the `Copy` trait
...
89 |                     delegate,
   |                     -------- value moved here
90 |                     future: PaginationDelegate::next_page(delegate.as_ref()),
   |                                                           ^^^^^^^^^^^^^^^^^ value borrowed here after move
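The two messages are related but not identical. This one is about evaluation order: the fields of a struct expression are evaluated in the order they are written, so delegate has already been moved by the time delegate.as_ref() is evaluated. A small sketch of that ordering rule, with a hypothetical Pair type:

```rust
// Hypothetical type used only to show field evaluation order.
pub struct Pair {
    pub len: usize,
    pub text: String,
}

pub fn build(text: String) -> Pair {
    // `text.len()` is evaluated first because `len` is written first, so the
    // borrow has ended before `text` is moved into the struct.
    // Writing `Pair { text, len: text.len() }` instead fails with E0382.
    Pair { len: text.len(), text }
}
```

In the question, reordering the fields alone would most likely not be enough, because the future keeps borrowing the delegate after the move, whereas text.len() above returns a plain number.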

Environment

This is real code, but I believe it is already an MCVE.

To set up the environment for this, the crate dependencies are as follows.

[dependencies]
futures-core = "0.3"
async-trait = "0.1"

The imports used in the code are as follows.

use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};

use async_trait::async_trait;
use futures_core::{Future, Stream};

Total Answers: 1


Answer 1

The potential solution that I did not want to use, because it hides the underlying issue (or rather avoids the intent of this question entirely), follows.

Where the PaginatedStream enum is defined, change the Pending variant to the following.

Pending {
    #[allow(clippy::type_complexity)]
    future: Pin<Box<dyn Future<Output = Result<(D, Vec<D::Item>), D::Error>>>>,
},

Now, inside the implementation of Stream change the matching arm for Begin to the following.

// This state only occurs at the entry of the state machine. It only holds the
// `PaginationDelegate` that will be used to update the offset and make new requests.
Begin { delegate } => {
    self.set(Pending {
        // Construct a new future that awaits the result and has a new type for `Output`
        // that contains both the result and the moved delegate.
        // Here the delegate is moved into the future via the `async` block.
        future: Box::pin(async {
            let result = delegate.next_page().await;
            result.map(|items| (delegate, items))
        }),
    });

    // The new future has not been polled yet, so nothing has registered the waker.
    // Ask to be polled again; returning `Poll::Pending` without arranging a wake-up
    // could leave the stream stalled.
    ctx.waker().wake_by_ref();

    // Return the distilled version of the new state to the caller, indicating that a
    // new request has been made and we are waiting for new data.
    Poll::Pending
}

The async block captures the delegate by value, because the block moves it into the returned tuple, so it behaves like async move here; you could write async move to be more explicit. This moves the delegate into the state of the future, which is boxed and pinned, so the delegate and the future that borrows it always stay together and the pointer cannot be invalidated.
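The same idea can be tried outside of the stream with the standard library alone. This is only a sketch: Numbers, page_and_delegate and poll_once are hypothetical names, and next_page is written by hand instead of through #[async_trait].

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical delegate whose method returns a future that borrows `self`.
pub struct Numbers {
    pub offset: u32,
}

impl Numbers {
    pub fn next_page(&self) -> Pin<Box<dyn Future<Output = Vec<u32>> + '_>> {
        Box::pin(async move { vec![self.offset, self.offset + 1] })
    }
}

// `async move` takes ownership of `numbers`, so the borrow made by `next_page`
// points into the future's own state, which `Box::pin` keeps at a fixed address.
pub fn page_and_delegate(numbers: Numbers) -> Pin<Box<dyn Future<Output = (Numbers, Vec<u32>)>>> {
    Box::pin(async move {
        let items = numbers.next_page().await;
        // The borrow has ended here, so the delegate can be handed back.
        (numbers, items)
    })
}

// Polls a future once with a waker that does nothing.
pub fn poll_once<F: Future + ?Sized>(future: Pin<&mut F>) -> Poll<F::Output> {
    struct Noop;
    impl Wake for Noop {
        fn wake(self: Arc<Self>) {}
    }
    let waker = Waker::from(Arc::new(Noop));
    future.poll(&mut Context::from_waker(&waker))
}
```

Moving the returned Pin<Box<...>> between enum variants moves only the pointer, so the delegate and the borrow inside the future are never separated.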

The other matching arm for Pending needs to be updated to reflect the change in signature. Here is a complete implementation of the logic.

// At some point in the past this stream was polled and asked the delegate to make a new
// request. Now it is time to poll the future returned from that request that was made,
// and if results are available, unpack them to the `Ready` state and move
// the delegate. If the future still doesn't have results, set the state
// back to `Pending` and move the fields back into position.
Pending { mut future } => match future.as_mut().poll(ctx) {
    // The future from the last request returned successfully with new items,
    // and gave the delegate back.
    Poll::Ready(Ok((mut delegate, items))) => {
        // Tell the delegate the offset for the next page, which is the sum of the
        // old offset and the number of items that the API sent back.
        delegate.set_offset(delegate.offset() + items.len());
        // Construct a new `VecDeque` so that the items can be popped from the front.
        // This should be more efficient than reversing the `Vec`, and less confusing.
        let mut items = VecDeque::from(items);
        // Get the first item out so that it can be yielded. The `Ready` branch is
        // expected to stop requesting pages once the items run out, so this assumes
        // a non-empty page; an empty page would make `unwrap` panic.
        let popped = items.pop_front().unwrap();

        // Set the new state to `Ready` with the delegate and the items.
        self.set(Ready { delegate, items });

        Poll::Ready(Some(Ok(popped)))
    }
    // The future from the last request returned with an error.
    Poll::Ready(Err(error)) => {
        // Set the state to `Closed` so that any future polls will return
        // `Poll::Ready(None)`. The caller can even match against this if needed.
        self.set(Closed);

        // Forward the error to whoever polled. This will only happen once because the
        // error is moved, and the state set to `Closed`.
        Poll::Ready(Some(Err(error)))
    }
    // The future from the last request is still pending.
    Poll::Pending => {
        // Because the state is currently `Indeterminate` it must be set back to what it
        // was. This will move the future back into the state.
        self.set(Pending { future });

        // Tell the caller that we are still waiting for a response.
        Poll::Pending
    }
},
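The Ready arm is not shown above. For completeness, here is one possible shape of the whole state machine, written as a sketch against the standard library only so that it can be run without extra crates. Everything in it is an assumption made for illustration: Delegate is a simplified stand-in for PaginationDelegate with fixed item and error types, poll_next is an inherent method rather than the Stream trait method, FakeApi and collect_all are hypothetical helpers, and treating an empty page as the end of the stream is my own choice. It loops instead of returning Poll::Pending after a state change, so Pending is returned only when the inner future returned it.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

pub type Page = Result<Vec<u32>, String>;
pub type OwnedFuture<D> = Pin<Box<dyn Future<Output = Result<(D, Vec<u32>), String>>>>;

// Simplified stand-in for `PaginationDelegate` with fixed item and error types.
pub trait Delegate: 'static {
    fn next_page(&self) -> Pin<Box<dyn Future<Output = Page> + '_>>;
    fn offset(&self) -> usize;
    fn set_offset(&mut self, value: usize);
    fn total_items(&self) -> Option<usize>;
}

pub enum Paginated<D: Delegate> {
    Begin { delegate: D },
    Pending { future: OwnedFuture<D> },
    Ready { delegate: D, items: VecDeque<u32> },
    Closed,
    Indeterminate,
}

// The future owns the delegate and hands it back together with the page.
fn request<D: Delegate>(delegate: D) -> OwnedFuture<D> {
    Box::pin(async move {
        let result = delegate.next_page().await;
        result.map(|items| (delegate, items))
    })
}

impl<D: Delegate> Paginated<D> {
    pub fn poll_next(&mut self, ctx: &mut Context<'_>) -> Poll<Option<Result<u32, String>>> {
        use Paginated::*;
        loop {
            // Each arm stores the next state, then either loops or returns.
            match std::mem::replace(self, Indeterminate) {
                Begin { delegate } => *self = Pending { future: request(delegate) },
                Pending { mut future } => match future.as_mut().poll(ctx) {
                    Poll::Ready(Ok((mut delegate, items))) => {
                        delegate.set_offset(delegate.offset() + items.len());
                        // Assumption: an empty page means there is nothing left.
                        *self = if items.is_empty() { Closed } else { Ready { delegate, items: items.into() } };
                    }
                    Poll::Ready(Err(error)) => {
                        *self = Closed;
                        return Poll::Ready(Some(Err(error)));
                    }
                    Poll::Pending => {
                        *self = Pending { future };
                        return Poll::Pending;
                    }
                },
                Ready { delegate, mut items } => match items.pop_front() {
                    Some(item) => {
                        *self = Ready { delegate, items };
                        return Poll::Ready(Some(Ok(item)));
                    }
                    // Page drained: close if the known total is reached, else ask for more.
                    None if delegate.total_items().map_or(false, |t| delegate.offset() >= t) => *self = Closed,
                    None => *self = Pending { future: request(delegate) },
                },
                Closed => { *self = Closed; return Poll::Ready(None); }
                Indeterminate => unreachable!("placeholder state observed"),
            }
        }
    }
}

// Hypothetical delegate: serves the numbers 0..total in pages of two.
pub struct FakeApi { pub offset: usize, pub total: usize }
impl Delegate for FakeApi {
    fn next_page(&self) -> Pin<Box<dyn Future<Output = Page> + '_>> {
        let start = self.offset as u32;
        let end = (self.offset + 2).min(self.total) as u32;
        let page: Page = Ok((start..end).collect());
        Box::pin(async move { page })
    }
    fn offset(&self) -> usize { self.offset }
    fn set_offset(&mut self, value: usize) { self.offset = value }
    fn total_items(&self) -> Option<usize> { Some(self.total) }
}

// Drives the state machine with a no-op waker; enough here because `FakeApi`
// never returns `Poll::Pending`.
pub fn collect_all<D: Delegate>(mut stream: Paginated<D>) -> Vec<Result<u32, String>> {
    struct Noop;
    impl Wake for Noop { fn wake(self: Arc<Self>) {} }
    let waker = Waker::from(Arc::new(Noop));
    let mut ctx = Context::from_waker(&waker);
    let mut out = Vec::new();
    while let Poll::Ready(Some(item)) = stream.poll_next(&mut ctx) { out.push(item); }
    out
}
```

With a real HTTP delegate the stream would be driven by an executor instead of collect_all, and the item and error types would come from the delegate's associated types as in the original trait.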