1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
use std::thread::Thread;

use futures::executor::LocalSpawner;
use futures::future::RemoteHandle;
use futures::task::LocalSpawnExt;
use futures::{pin_mut, Stream, StreamExt};

use crate::dependencies::Dependency;
use crate::store::channel::WeakSender;

/// Asynchronous work being performed by a `Store`.
///
/// A [`Store`][`crate::Store`] uses a [Local Async Executor] to run its `Task`s.
///
/// [Local Async Executor]: https://maciej.codes/2022-06-09-local-async.html
#[doc(hidden)]
#[derive(Debug)]
#[must_use = "dropping a Task cancels the underlying future"]
pub struct Task {
    pub(crate) handle: Option<RemoteHandle<()>>,
    pub(crate) when: Option<std::time::Instant>,
}

impl Task {
    /// Detaches the task; leaving its [`Future`][`std::future`] running in the background.
    pub fn detach(self) {
        if let Some(handle) = self.handle {
            handle.forget()
        }
    }

    /// Cancels the task; meaning its [`Future`][`std::future`] won’t be polled again.
    pub fn cancel(self) {
        drop(self)
    }

    pub(crate) fn new<Action: 'static, S: Stream<Item = Action> + 'static>(stream: S) -> Self {
        // Only called by “root” `Effects`, so it will be the same `Action` as used by the `Store`
        let handle =
            Dependency::<Executor<Result<Action, Thread>>>::new().and_then(
                |executor| match executor.actions.upgrade() {
                    None => None,
                    Some(sender) => executor
                        .spawner
                        .spawn_local_with_handle(async move {
                            pin_mut!(stream);
                            while let Some(action) = stream.next().await {
                                sender.send(Ok(action));
                            }
                        })
                        .ok(),
                },
            );

        Task {
            handle, // may return a `Task { handle: None }` while the `Store` is shutting down
            when: None,
        }
    }
}

pub(crate) struct Executor<Action> {
    pub(crate) spawner: LocalSpawner,
    pub(crate) actions: WeakSender<Action>,
}

impl<Action> Executor<Action> {
    pub(crate) fn new(spawner: LocalSpawner, actions: WeakSender<Action>) -> Self {
        Self { spawner, actions }
    }
}