Question: Allow a future to store a pointer to a pinned value in its container
Prelude
I have been working on this segment of code that attempts to provide a reusable API for implementing an asynchronous stream for a REST paginator.
I have gone through many iterations and settled on storing state in an enum that describes which point the process has reached, both because I feel that it is the best fit for this purpose and because it is something to learn from, being especially explicit about the whole process. I do not want to use stream! or try_stream! from the async-stream crate.
The state begins at Begin, and moves a PaginationDelegate into the next state after using it to make a request. This state is Pending and owns the delegate and a future that is returned from PaginationDelegate::next_page.
The issue appears when the next_page method needs a reference, &self, but the self is not stored on the stack frame of the future that is stored within the Pending state.
I wanted to keep this "flat" because I find the algorithm easier to follow, but I also wanted to learn how to create this self-referential structure the most correct way. I am aware that I can wrap the future and have it own the PaginationDelegate, and indeed this may be the method I end up using. Nevertheless, I want to know how I could move the two values into the same holding structure and keep the pointer alive for my own education.
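For reference, the wrapping alternative mentioned above could look roughly like the sketch below. It uses only the standard library, and Counter, request, NoopWake and poll_once are hypothetical stand-ins rather than parts of the real code. The async block takes ownership of the delegate, so the borrow made by next_page lives inside the future's own state and the delegate is handed back once the request completes.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a delegate; not the trait from this post.
struct Counter {
    offset: usize,
}

impl Counter {
    /// Borrows `self` for as long as the returned future is alive.
    async fn next_page(&self) -> Result<Vec<usize>, String> {
        Ok((self.offset..self.offset + 3).collect())
    }
}

/// Moves the delegate into the future. The `&self` borrow now points into
/// the async block's own state, so the boxed future can be `'static`.
fn request(
    delegate: Counter,
) -> Pin<Box<dyn Future<Output = (Counter, Result<Vec<usize>, String>)>>> {
    Box::pin(async move {
        let page = delegate.next_page().await;
        // Hand the delegate back together with the outcome of the request.
        (delegate, page)
    })
}

/// Waker that does nothing; enough to poll a future by hand in a sketch.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a pinned future exactly once with the no-op waker.
fn poll_once<F: Future + ?Sized>(future: Pin<&mut F>) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut ctx = Context::from_waker(&waker);
    future.poll(&mut ctx)
}
```

Unlike the Output type with (D, Vec<D::Item>) inside the Ok variant, this variation returns the delegate on both success and failure, which is an assumption about what the stream would want to do after an error.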
Delegate Trait
Here a PaginationDelegate is defined. This trait is intended to be implemented and used by any method or function that intends to return a PaginatedStream or dyn Stream. Its purpose is to define how the requests will be made, as well as store a limited subset of the state (the offset for the next page from the REST API, and the total number of items that are expected from the API).
#[async_trait]
pub trait PaginationDelegate {
    type Item;
    type Error;

    /// Performs an asynchronous request for the next page and returns either
    /// a vector of the result items or an error.
    async fn next_page(&self) -> Result<Vec<Self::Item>, Self::Error>;

    /// Gets the current offset, which will be the index at the end of the
    /// current/previous page. The value returned from this will be changed by
    /// [`PaginatedStream`] immediately following a successful call to
    /// [`next_page()`], increasing by the number of items returned.
    fn offset(&self) -> usize;

    /// Sets the offset for the next page. The offset is required to be the
    /// index of the last item from the previous page.
    fn set_offset(&mut self, value: usize);

    /// Gets the total count of items that are currently expected from the API.
    /// This may change if the API returns a different number of results on
    /// subsequent pages, and may be less than what the API claims in its
    /// response data if the API has a maximum limit.
    fn total_items(&self) -> Option<usize>;
}
Stream State
The next segment is the enum itself, which serves as the implementor for Stream and the holder for the current state of the iterator.
Note that currently the Pending variant has the delegate and the future separate. I could have used future: Pin<Box<dyn Future<Output = Result<(D, Vec<D::Item>), D::Error>>>> to keep the delegate inside of the Future but prefer not to because I want to solve the underlying problem and not gloss over it. Also, the delegate field is a Pin<Box<D>> because I was experimenting and I feel that this is the closest I have gotten to a correct solution.
pub enum PaginatedStream<D: PaginationDelegate> {
    Begin {
        delegate: D,
    },
    Pending {
        delegate: Pin<Box<D>>,
        #[allow(clippy::type_complexity)]
        future: Pin<Box<dyn Future<Output = Result<Vec<D::Item>, D::Error>>>>,
    },
    Ready {
        delegate: D,
        items: VecDeque<D::Item>,
    },
    Closed,
    Indeterminate,
}
Stream Implementation
The last part is the implementation of Stream. This is incomplete for two reasons: I have not finished it, and it would be best to keep the example short and minimal.
impl<D: 'static> Stream for PaginatedStream<D>
where
    D: PaginationDelegate + Unpin,
    D::Item: Unpin,
{
    // If the state is `Pending` and the future resolves to an `Err`, that error is
    // forwarded only once and the state set to `Closed`. If there is at least one
    // result to return, the `Ok` variant is, of course, used instead.
    type Item = Result<D::Item, D::Error>;

    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Avoid using the full namespace to match all variants.
        use PaginatedStream::*;

        // Take ownership of the current state (`self`) and replace it with the
        // `Indeterminate` state until the new state is in fact determined.
        let this = std::mem::replace(&mut *self, Indeterminate);

        match this {
            // This state only occurs at the entry of the state machine. It only holds the
            // `PaginationDelegate` that will be used to update the offset and make new requests.
            Begin { delegate } => {
                // Pin the delegate to the heap to ensure that it doesn't move and that pointers
                // remain valid even after moving the value into the new state.
                let delegate = Box::pin(delegate);

                // Set the current state to `Pending`, after making the next request using the
                // pinned delegate.
                self.set(Pending {
                    delegate,
                    future: PaginationDelegate::next_page(delegate.as_ref()),
                });

                // Return the distilled version of the new state to the caller, indicating that a
                // new request has been made and we are waiting for new data.
                Poll::Pending
            }
            // At some point in the past this stream was polled and made a new request. Now it is
            // time to poll the future returned from that request that was made, and if results are
            // available, unpack them to the `Ready` state and move the delegate. If the future
            // still doesn't have results, set the state back to `Pending` and move the fields back
            // into position.
            Pending { delegate, future } => todo!(),
            // The request has resolved with data in the past, and there are items ready for us to
            // provide the caller. In the event that there are no more items in the `VecDeque`, we
            // will make the next request and construct the state for `Pending` again.
            Ready { delegate, items } => todo!(),
            // Either an error has occurred, or the last item has been yielded already. Nobody
            // should be polling anymore, but to be nice, just tell them that there are no more
            // results with `Poll::Ready(None)`.
            Closed => Poll::Ready(None),
            // The `Indeterminate` state should have only been used internally and reset back to a
            // valid state before yielding the `Poll` to the caller. This branch should never be
            // reached; if it is, that is a panic.
            Indeterminate => unreachable!(),
        }
    }
}
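The "take the state, leave a placeholder, write a valid state back" pattern used in poll_next can be seen in isolation in the following synchronous sketch. Pager and step are hypothetical names, and the "request" is faked by generating two numbers per page; the only point is that every branch must store a real state again before returning.

```rust
use std::collections::VecDeque;
use std::mem;

/// Hypothetical, synchronous miniature of the stream's state enum.
enum Pager {
    Begin { pages_left: u32 },
    Ready { pages_left: u32, items: VecDeque<u32> },
    Closed,
    Indeterminate,
}

impl Pager {
    /// Rough analogue of `poll_next`: yields the next item, or `None` when done.
    fn step(&mut self) -> Option<u32> {
        loop {
            // Take the state by value; `self` holds the placeholder meanwhile.
            match mem::replace(self, Pager::Indeterminate) {
                // No pages remain, so the machine closes.
                Pager::Begin { pages_left: 0 } => *self = Pager::Closed,
                Pager::Begin { pages_left } => {
                    // Stand-in for a request: each "page" holds two items.
                    let items = VecDeque::from(vec![pages_left * 10, pages_left * 10 + 1]);
                    *self = Pager::Ready { pages_left: pages_left - 1, items };
                }
                Pager::Ready { pages_left, mut items } => match items.pop_front() {
                    Some(item) => {
                        // Move the fields back into position before yielding.
                        *self = Pager::Ready { pages_left, items };
                        return Some(item);
                    }
                    // The page is exhausted; go back and "request" another.
                    None => *self = Pager::Begin { pages_left },
                },
                Pager::Closed => {
                    *self = Pager::Closed;
                    return None;
                }
                Pager::Indeterminate => unreachable!("placeholder state leaked"),
            }
        }
    }
}
```

In the asynchronous version the same rule would apply to the Pending branch: if the inner future is not ready yet, the delegate and future have to be moved back into a Pending state before Poll::Pending is returned.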
Compiler Messages
At the moment, in the Begin branch, there are two compiler messages where the borrow to the delegate (delegate.as_ref()) is taken and passed to the PaginationDelegate::next_page method.
The first is that the delegate does not live long enough, because the pinned value is moved into the new state variant Pending, and no longer resides at the position it was assigned. I do not understand why the compiler wants this to exist for 'static though, and would appreciate it if this could be explained.
error[E0597]: `delegate` does not live long enough
  --> src/lib.rs:90:59
   |
90 |                     future: PaginationDelegate::next_page(delegate.as_ref()),
   |                             ------------------------------^^^^^^^^^^^^^^^^^-
   |                             |                             |
   |                             |                             borrowed value does not live long enough
   |                             cast requires that `delegate` is borrowed for `'static`
...
96 |             }
   |             - `delegate` dropped here while still borrowed
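A likely source of the 'static in that message is the type of the future field. A boxed trait object written without an explicit lifetime, such as Pin<Box<dyn Future<Output = T>>>, defaults to dyn Future + 'static, so anything coerced into it must not borrow from a shorter-lived value. The sketch below illustrates the default with hypothetical names (length_of, Borrowing, poll_once) and assumes, as far as I understand the async-trait expansion, that next_page returns a boxed future tied to the lifetime of &self.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical hand-written version of a borrowing async method: the
/// returned trait object is only valid for the lifetime `'a` of the borrow.
fn length_of<'a>(text: &'a str) -> Pin<Box<dyn Future<Output = usize> + 'a>> {
    Box::pin(async move { text.len() })
}

/// The explicit `+ 'a` allows the field to hold a future that borrows.
struct Borrowing<'a> {
    future: Pin<Box<dyn Future<Output = usize> + 'a>>,
}

// Without an explicit bound the trait object defaults to `+ 'static`:
//
//     struct Owning { future: Pin<Box<dyn Future<Output = usize>>> }
//
// Storing `length_of(&local)` in such a field makes the compiler require
// that `local` is borrowed for `'static`, as in the E0597 message.

/// Waker that does nothing; enough to poll a future by hand in a sketch.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a pinned future exactly once with the no-op waker.
fn poll_once<F: Future + ?Sized>(future: Pin<&mut F>) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut ctx = Context::from_waker(&waker);
    future.poll(&mut ctx)
}
```

Adding a lifetime parameter to the enum would describe a borrow from outside the enum, not a borrow of a sibling field, so this only explains the message and does not by itself make the self-referential layout compile.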
I would also like to hear any methods you have for creating the values for fields of a struct that rely on data that should be moved into the struct (self-referential, the main issue of this entire post). I know it is wrong (and impossible) to use MaybeUninit here because any placeholder value that would later be dropped will cause undefined behavior. Possibly show me a method for allocating a structure of uninitialized memory and then overwriting those fields with values after they have been constructed, without letting the compiler attempt to free the uninitialized memory.
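To make the request concrete, here is a minimal sketch of the kind of construction being asked about, not a vetted solution. It allocates a Box<MaybeUninit<Holder>>, writes each field through a raw pointer with ptr::write (which never drops the previous, uninitialized contents), and only then treats the allocation as an initialized, pinned Holder. Holder, owner and view are hypothetical names, and the soundness argument rests on the value never being moved out of its box.

```rust
use std::marker::PhantomPinned;
use std::mem::MaybeUninit;
use std::pin::Pin;
use std::ptr::{addr_of_mut, NonNull};

/// Hypothetical self-referential struct: `view` points at `owner`.
struct Holder {
    // Fields drop in declaration order, so a field that borrows from
    // another one should be declared before the field it points into.
    view: NonNull<String>,
    owner: String,
    // Opts out of `Unpin` so the pinned value cannot be moved safely.
    _pin: PhantomPinned,
}

impl Holder {
    fn new(owner: String) -> Pin<Box<Self>> {
        // Heap allocation with no initialized fields and no destructor.
        let mut slot: Box<MaybeUninit<Self>> = Box::new(MaybeUninit::uninit());
        let ptr = slot.as_mut_ptr();
        // SAFETY: every field is written exactly once before the memory is
        // treated as a `Holder`, and the heap address never changes.
        unsafe {
            addr_of_mut!((*ptr).owner).write(owner);
            let owner_ptr = NonNull::new_unchecked(addr_of_mut!((*ptr).owner));
            addr_of_mut!((*ptr).view).write(owner_ptr);
            addr_of_mut!((*ptr)._pin).write(PhantomPinned);
            // `MaybeUninit<Self>` has the same layout as `Self`.
            let raw = Box::into_raw(slot) as *mut Self;
            Pin::new_unchecked(Box::from_raw(raw))
        }
    }

    /// Reads `owner` through the stored pointer.
    fn view(&self) -> &str {
        // SAFETY: `view` points at `self.owner`, which lives as long as `self`.
        unsafe { self.view.as_ref().as_str() }
    }
}
```

Moving the Pin<Box<Holder>> handle around only moves the box pointer, so the stored address stays valid. Storing a real future that borrows the delegate this way would additionally require erasing the future's lifetime, which is an extra unsafe step this sketch does not cover.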
The second compiler message is as follows, which is similar to the first except that the temporary value for delegate is moved into the struct. I am to understand that this is fundamentally the same issue described above, but just explained differently by two separate heuristics. Is my understanding wrong?
error[E0382]: borrow of moved value: `delegate`
  --> src/lib.rs:90:59
   |
84 |                 let delegate = Box::pin(delegate);
   |                     -------- move occurs because `delegate` has type `Pin<Box<D>>`, which does not implement the `Copy` trait
...
89 |                     delegate,
   |                     -------- value moved here
90 |                     future: PaginationDelegate::next_page(delegate.as_ref()),
   |                                                           ^^^^^^^^^^^^^^^^^ value borrowed here after move
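One way to probe whether the two messages are the same issue: field initializers in a struct expression are evaluated in the order they are written, so E0382 on its own only says that delegate was moved into its field before delegate.as_ref() was evaluated. The small sketch below, with the hypothetical names Pair and build, shows that reordering the fields removes that particular error when the derived value does not keep borrowing.

```rust
/// Hypothetical pair of an owned value and a value derived from it.
struct Pair {
    owner: Box<String>,
    len: usize,
}

fn build(text: &str) -> Pair {
    let owner = Box::new(text.to_string());

    // Rejected with E0382: `owner` is moved into the first field before the
    // second initializer gets to borrow it.
    //
    //     Pair { owner, len: owner.len() }

    // Accepted: the borrow for `len` ends before `owner` is moved.
    Pair { len: owner.len(), owner }
}
```

In the stream code the future keeps its borrow of the delegate alive, so swapping the two fields in the Pending initializer would most likely still be rejected, only with a borrow-related error instead of E0382. That suggests the two messages are related but report different rules.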
Environment
This is real code, but I believe it is already an MCVE.
To set up the environment for this, the crate dependencies are as follows.
[dependencies]
futures-core = "0.3"
async-trait = "0.1"
And the imports that are used in the code:
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};

use async_trait::async_trait;
use futures_core::{Future, Stream};