baubot_core/
lib.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
//! # BauBot
//! Inspired by the [baudot code](https://en.wikipedia.org/wiki/Baudot_code).
//! [BauBot] is meant to simplify the plumbing around [teloxide](https://github.com/teloxide/teloxide) for use with server applications. Possible applications include:
//! - E-mail server to receive e-mails and rebroadcast them on telegram
//! - OTP login approval
//!
//! # Goals and non-goals
//! ## Goals
//! - (Asynchronously) receive push messages from a **client** application.
//! - (Asynchronously) receive responses from a **user** and send it back to the **client**
//! application.
//! - (Asynchronously) register users through a database interface.
//!
//! ## Non-goals
//! - Database implementation: users have to implement their own database and their own interface
//! in the form of [BauData].
//! - Standalone application interface (e.g. TCP server listening for updates): other parts of this
//! project may address taht.

pub(crate) use prelude::*;

use std::marker::PhantomData;

pub mod prelude;

pub mod broadcaster;

/// # [BauBot]
/// Call [BauBot::new] with a [BauData] database to start the server(s). A new instance of [BauBot]
/// is created that implements [Deref] to a [broadcaster::types::ClientSocket] (for sending
/// a [broadcaster::types::BauMessage]) to [BauBot].
///
/// Under the hood, calling [BauBot::new] orchestrates and wraps a number of tasks. See
/// [BauBot::new] for more information.
pub struct BauBot<
    Db: BauData + Send + Sync + 'static,
    DbRef: Deref<Target = Db> + Clone + Send + Sync + 'static,
> {
    db: PhantomData<DbRef>,
    bot_server_handle: task::JoinHandle<()>,
    request_server_handle: task::JoinHandle<()>,
    client_socket: broadcaster::types::ClientSocket,
}

impl<
        Db: BauData + Send + Sync + 'static,
        DbRef: Deref<Target = Db> + Clone + Send + Sync + 'static,
    > Deref for BauBot<Db, DbRef>
{
    type Target = broadcaster::types::ClientSocket;

    fn deref(&self) -> &Self::Target {
        &self.client_socket
    }
}

impl<
        Db: BauData + Send + Sync + 'static,
        DbRef: Deref<Target = Db> + Clone + Send + Sync + 'static,
    > AsRef<broadcaster::types::ClientSocket> for BauBot<Db, DbRef>
{
    fn as_ref(&self) -> &broadcaster::types::ClientSocket {
        &self.client_socket
    }
}

/// Ensures that all threads are stopped on drop
impl<Db: BauData + Send + Sync, DbRef: Deref<Target = Db> + Clone + Send + Sync + 'static> Drop
    for BauBot<Db, DbRef>
{
    fn drop(&mut self) {
        self.bot_server_handle.abort();
        self.request_server_handle.abort();
    }
}

impl<
        Db: BauData + Send + Sync + 'static,
        DbRef: Deref<Target = Db> + Clone + Send + Sync + 'static,
    > BauBot<Db, DbRef>
{
    /// Creates a new [BauBot]. This runs a number of concurrent tasks
    /// - (test mode) Initialise environment variables and logger
    /// - Initialises a request server to listen for requests sent through a
    /// [broadcaster::types::ClientSocket]
    /// - Initialises a [Bot] to interact with telegram
    pub fn new<S: Into<String>>(db: DbRef, token: S) -> Self {
        #[cfg(test)]
        baubot_utils::init();

        // Create sockets
        let (client_socket, server_socket) = tokio::sync::mpsc::unbounded_channel();

        // Create bot: If in test mode use utils
        let bot = Bot::new(token);

        // Create server
        let request_server = Arc::new(broadcaster::Server::new());

        // Start server
        let request_server_clone = request_server.clone();
        let db_clone = db.clone();
        let bot_clone = bot.clone();
        let request_server_handle = task::spawn(async move {
            broadcaster::Server::listen(request_server_clone, bot_clone, db_clone, server_socket)
                .await;
        });

        // Create dependancy map
        let mut dependencies = DependencyMap::new();
        dependencies.insert(db);
        dependencies.insert(request_server);

        // Wrap bot server handle
        let bot_server_handle = task::spawn(async move {
            Dispatcher::builder(bot, Self::handler_builder())
                .dependencies(dependencies)
                .build()
                .dispatch()
                .await
        });

        Self {
            db: PhantomData,
            bot_server_handle,
            request_server_handle,
            client_socket,
        }
    }

    /// Build the handler schema
    fn handler_builder() -> UpdateHandler<Box<dyn std::error::Error + Send + Sync + 'static>> {
        /// Only used here.
        use teloxide::dispatching::dialogue::GetChatId;

        // Command handler
        let command = teloxide::filter_command::<Command, _>().endpoint(Self::command_handler);

        // Callback handler
        let callback = broadcaster::Server::callback_update();

        // Message handler
        let message = Update::filter_message()
            // Inject user
            .filter_map(|update: Update| update.from().cloned())
            // Inject chatId
            .filter_map(|message: Message| message.chat_id())
            // Inject messageId
            .filter_map(|message: Message| Some(message.id))
            .branch(command)
            .endpoint(Self::catch_all);

        // Overall handler?
        let master = dptree::entry().branch(callback).branch(message);

        master
    }

    /// Parse [Command] received by the Bot
    async fn command_handler(
        bot: Bot,
        ChatId(chat_id): ChatId,
        user: User,
        command: Command,
        MessageId(message_id): MessageId,
        db: DbRef,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        // Run command
        let outcome = match command {
            Command::Start => Self::register_user(db, chat_id, user).await,
            Command::Unregister => Self::delete_user(db, user).await,
            Command::Help => Ok(Command::descriptions().to_string()),
        }
        .unwrap_or_else(|err| format!("ERROR: {err}"));

        // Send result
        reply_message(&bot, chat_id, message_id, outcome).await?;

        Ok(())
    }

    /// Handler to register a user in the DB
    async fn register_user(db: DbRef, chat_id: i64, user: User) -> Result<String, String> {
        // Attempt to get username, reject if fail
        let username = user.username.ok_or(format!("No username supplied."))?;

        // Attempt to insert
        trace!("Attempting to register {username}");
        match db.insert_chat_id(&username, chat_id).await {
            Ok(id) => Ok(format!(
                fmt!(pass "Registered!{}\n\n{}"),
                match id {
                    Some(id) =>
                        format!(" Your old registration of <code>{id}</code> has been updated."),
                    None => "".to_string(),
                },
                "🤗 Welcome to baubot's notification system."
            )),
            Err(err) => Err(err.to_string()),
        }
    }

    /// Handler to delete a user from the DB
    async fn delete_user(db: DbRef, user: User) -> Result<String, String> {
        // Attempt to get username, reject if fail
        let username = user.username.ok_or(format!("No username supplied."))?;

        // Attempt to insert
        match db.delete_chat_id(&username).await {
            Ok(id) => Ok(format!(
                fmt!(fail "Your chat_id <code>{}</code> has been deleted"),
                id
            )),
            Err(err) => Err(err.to_string()),
        }
    }

    /// Catch-all
    async fn catch_all(
        bot: Bot,
        MessageId(message_id): MessageId,
        ChatId(chat_id): ChatId,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        reply_message(
            &bot,
            chat_id,
            message_id,
            fmt!(fail "Baubot does not know how to respond to your input. <b>Baubot is a bad elf!</b>" )
                .into(),
        )
        .await?;

        Ok(())
    }
}