1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
use crate::activitypub::Object;
use crate::database::Database;
use crate::remote::RemoteAccessor;
use actix_web::rt::signal;
use actix_web::rt::time::sleep;
use futures_util::future::join_all;
use std::time::Duration;
/// Polls the database for queued inbox deliveries and sends them, racing that
/// work against the Ctrl+C signal future.
///
/// On every iteration the loop asks `db` for pending deliveries:
///
/// - a polling error is logged and followed by a 10 second pause,
/// - an empty batch hands control to `wait_for_deliveries`,
/// - a non-empty batch is passed to `deliver`.
///
/// Each iteration then pauses for a further 100 ms before polling again.
/// The function returns once either side of the `select!` completes.
pub async fn delivery_loop(db: &Database) {
    let remote = RemoteAccessor::default();
    let shutdown = signal::ctrl_c();

    // The loop below has no `break`; it only stops when `select!` drops it.
    let worker = async {
        loop {
            match db.get_deliveries().await {
                Err(err) => {
                    tracing::error!("Failed to poll activities from the db to deliver: {err}");
                    tracing::info!("Retrying inbox deliveries in 10 seconds...");
                    sleep(Duration::from_secs(10)).await;
                }
                Ok(batch) => {
                    if batch.is_empty() {
                        tracing::debug!("Waiting for activities to send...");
                        wait_for_deliveries(db).await;
                    } else {
                        tracing::debug!("Sending {} activities to inboxes...", batch.len());
                        deliver(batch, db, &remote).await;
                    }
                }
            };
            sleep(Duration::from_millis(100)).await;
        }
    };

    tokio::select! {
        _ = worker => {}
        _ = shutdown => tracing::info!("Ctrl+C received; stopping inbox delivery loop"),
    };
}
/// Awaits `db.wait_for_inbox_delivery()`.
///
/// If that call reports an error, the error is logged and the function sleeps
/// for a fixed 10 seconds instead, so the caller is still delayed before its
/// next poll.
async fn wait_for_deliveries(db: &Database) {
    match db.wait_for_inbox_delivery().await {
        Ok(_) => {}
        Err(err) => {
            tracing::error!("Failed to wait for inbox delivery properly: {err}");
            tracing::info!("Waiting for a fallback duration of 10 seconds instead.");
            sleep(Duration::from_secs(10)).await;
        }
    }
}
async fn deliver(deliveries: Vec<(String, Object<'_>)>, db: &Database, remote: &RemoteAccessor) {
let mut sends = Vec::with_capacity(deliveries.len());
for (inbox_url, activity) in &deliveries {
sends.push(remote.send_ap_object(inbox_url, activity));
}
let mut successful_deliveries = Vec::with_capacity(deliveries.len());
let mut failed_deliveries = Vec::with_capacity(deliveries.len());
for (result, (inbox_url, activity)) in (join_all(sends).await).into_iter().zip(&deliveries) {
let activity_id = activity.id.as_ref().unwrap();
match result {
Ok(()) => {
tracing::trace!("Delivered {activity_id} to {inbox_url}");
successful_deliveries.push((inbox_url.as_ref(), activity_id.as_ref()));
}
Err(err) => {
tracing::debug!("Delivery to inbox {inbox_url} failed: {err}");
failed_deliveries.push((inbox_url.as_ref(), activity_id.as_ref()));
}
}
}
if !failed_deliveries.is_empty() {
if let Err(err) = db.reschedule_failed_deliveries(&failed_deliveries).await {
tracing::error!("Failed to reschedule successful deliveries: {err}");
}
}
if !successful_deliveries.is_empty() {
while let Err(err) = db.clear_successful_deliveries(&successful_deliveries).await {
tracing::error!("Failed to clear successful deliveries: {err}");
tracing::info!("Trying to clear successful deliveries again in 5 seconds...");
sleep(Duration::from_secs(5)).await;
}
}
}