1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
//! Delivers activities to inboxes asynchronously.

use crate::activitypub::Object;
use crate::database::Database;
use crate::remote::RemoteAccessor;
use actix_web::rt::signal;
use actix_web::rt::time::sleep;
use futures_util::future::join_all;
use std::time::Duration;

/// Runs the following loop until an interrupt signal is received: get currently
/// sendable deliveries ([`Database::get_deliveries`]), if there's none, wait
/// for more ([`wait_for_deliveries`]), if there's some, send them
/// ([`deliver`]), wait for 100 ms, repeat.
pub async fn delivery_loop(db: &Database) {
    let remote = RemoteAccessor::default();
    let ctrl_c = signal::ctrl_c();
    let delivery_future = async {
        loop {
            match db.get_deliveries().await {
                Ok(deliveries) if deliveries.is_empty() => {
                    tracing::debug!("Waiting for activities to send...");
                    wait_for_deliveries(db).await;
                }
                Ok(deliveries) => {
                    tracing::debug!("Sending {} activities to inboxes...", deliveries.len());
                    deliver(deliveries, db, &remote).await;
                }
                Err(err) => {
                    tracing::error!("Failed to poll activities from the db to deliver: {err}");
                    tracing::info!("Retrying inbox deliveries in 10 seconds...");
                    sleep(Duration::from_secs(10)).await;
                }
            };
            sleep(Duration::from_millis(100)).await;
        }
    };
    tokio::select! {
        _ = delivery_future => {}
        _ = ctrl_c => tracing::info!("Ctrl+C received; stopping inbox delivery loop"),
    };
}

/// Attempts to wait for deliveries accurately
/// ([`Database::wait_for_inbox_delivery`]), and if it fails, falls back to
/// printing the error and waiting a flat 10 seconds.
async fn wait_for_deliveries(db: &Database) {
    if let Err(err) = db.wait_for_inbox_delivery().await {
        tracing::error!("Failed to wait for inbox delivery properly: {err}");
        tracing::info!("Waiting for a fallback duration of 10 seconds instead.");
        sleep(Duration::from_secs(10)).await;
    }
}

/// Delivers the given list of objects to the inboxes over HTTP.
async fn deliver(deliveries: Vec<(String, Object<'_>)>, db: &Database, remote: &RemoteAccessor) {
    let mut sends = Vec::with_capacity(deliveries.len());
    for (inbox_url, activity) in &deliveries {
        sends.push(remote.send_ap_object(inbox_url, activity));
    }
    let mut successful_deliveries = Vec::with_capacity(deliveries.len());
    let mut failed_deliveries = Vec::with_capacity(deliveries.len());
    for (result, (inbox_url, activity)) in (join_all(sends).await).into_iter().zip(&deliveries) {
        let activity_id = activity.id.as_ref().unwrap();
        match result {
            Ok(()) => {
                tracing::trace!("Delivered {activity_id} to {inbox_url}");
                successful_deliveries.push((inbox_url.as_ref(), activity_id.as_ref()));
            }
            Err(err) => {
                tracing::debug!("Delivery to inbox {inbox_url} failed: {err}");
                failed_deliveries.push((inbox_url.as_ref(), activity_id.as_ref()));
            }
        }
    }
    if !failed_deliveries.is_empty() {
        if let Err(err) = db.reschedule_failed_deliveries(&failed_deliveries).await {
            tracing::error!("Failed to reschedule successful deliveries: {err}");
        }
    }
    if !successful_deliveries.is_empty() {
        // Retry until it works to avoid resending activities (and since this is
        // failing, deliveries probably can't be made right now anyways...)
        while let Err(err) = db.clear_successful_deliveries(&successful_deliveries).await {
            tracing::error!("Failed to clear successful deliveries: {err}");
            tracing::info!("Trying to clear successful deliveries again in 5 seconds...");
            sleep(Duration::from_secs(5)).await;
        }
    }
}