use std::thread::Thread;
use futures::executor::LocalSpawner;
use futures::future::RemoteHandle;
use futures::task::LocalSpawnExt;
use futures::{pin_mut, Stream, StreamExt};
use crate::dependencies::Dependency;
use crate::store::channel::WeakSender;
#[doc(hidden)]
#[derive(Debug)]
#[must_use = "dropping a Task cancels the underlying future"]
pub struct Task {
pub(crate) handle: Option<RemoteHandle<()>>,
pub(crate) when: Option<std::time::Instant>,
}
impl Task {
pub fn detach(self) {
if let Some(handle) = self.handle {
handle.forget()
}
}
pub fn cancel(self) {
drop(self)
}
pub(crate) fn new<Action: 'static, S: Stream<Item = Action> + 'static>(stream: S) -> Self {
let handle =
Dependency::<Executor<Result<Action, Thread>>>::new().and_then(
|executor| match executor.actions.upgrade() {
None => None,
Some(sender) => executor
.spawner
.spawn_local_with_handle(async move {
pin_mut!(stream);
while let Some(action) = stream.next().await {
sender.send(Ok(action));
}
})
.ok(),
},
);
Task {
handle, when: None,
}
}
}
pub(crate) struct Executor<Action> {
pub(crate) spawner: LocalSpawner,
pub(crate) actions: WeakSender<Action>,
}
impl<Action> Executor<Action> {
pub(crate) fn new(spawner: LocalSpawner, actions: WeakSender<Action>) -> Self {
Self { spawner, actions }
}
}