1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
//! The server-side [GameplaySocket].

use anyhow::Context;
use common::protocol::MessageOrderer;
use common::{ClientAction, ServerAction};
use std::collections::HashMap;
use std::io::ErrorKind;
use std::net::{SocketAddr, UdpSocket};
use std::time::Instant;

// Used in docs.
#[allow(unused_imports)]
use crate::login_server::LoginServer;

/// A randomly generated identifier that matches on specific peer. This is
/// included in plaintext at the start of each UDP packet from the peer.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct PeerIdentifier(pub u128);

/// Sends effects and receives actions over UDP.
pub struct GameplaySocket {
    socket: UdpSocket,
    peer_refresh_times: HashMap<PeerIdentifier, Instant>,
    peer_addrs: HashMap<PeerIdentifier, SocketAddr>,
    peer_queues: HashMap<PeerIdentifier, MessageOrderer<ServerAction, ClientAction>>,
}

impl GameplaySocket {
    /// Binds a [UdpSocket] to `0.0.0.0:7812` to start listening for messages
    /// and sets up initial state.
    pub fn new() -> anyhow::Result<GameplaySocket> {
        let addr = "0.0.0.0:7812";
        let socket = UdpSocket::bind(addr)
            .context("Could not bind the UDP socket! Is the server already running?")?;
        socket
            .set_nonblocking(true)
            .context("Could not set UDP socket to non-blocking.")?;
        log::info!("Gameplay socket init done, listening for UDP datagrams on {addr}.");
        Ok(GameplaySocket {
            socket,
            peer_refresh_times: HashMap::new(),
            peer_addrs: HashMap::new(),
            peer_queues: HashMap::new(),
        })
    }

    /// Used by the [LoginServer], adds a new peer identifier to listen for, and
    /// their associated encryption key.
    pub fn add_peer(&mut self, peer_id: PeerIdentifier, encryption_key: [u8; 32]) {
        self.peer_queues
            .insert(peer_id, MessageOrderer::new(encryption_key));
    }

    /// Sends and receives all pending messages on the UDP socket.
    pub fn transport(&mut self) {
        let mut buf = [0; 65536];
        loop {
            match self.socket.recv_from(&mut buf) {
                Ok((n, peer_addr)) if n >= 16 => {
                    let peer_id =
                        PeerIdentifier(u128::from_be_bytes(buf[0..16].try_into().unwrap()));
                    let message = &mut buf[16..n];
                    if let Some(queue) = self.peer_queues.get_mut(&peer_id) {
                        queue.transport_recv(peer_addr, message);
                        self.peer_addrs.insert(peer_id, peer_addr);
                        self.peer_refresh_times.insert(peer_id, Instant::now());
                    } else {
                        log::trace!(
                            "<- [{peer_addr}]: no queue for id {:032x}, dropping message",
                            peer_id.0,
                        );
                    }
                }
                Ok((n, peer_addr)) => {
                    log::trace!("<- [{peer_addr}]: got malformed message, only {n} bytes (<16)");
                }
                Err(err) => {
                    if err.kind() != ErrorKind::TimedOut && err.kind() != ErrorKind::WouldBlock {
                        log::debug!("Error receiving UDP datagram: {err}");
                    }
                    break;
                }
            }
        }

        // TODO: Parallellize sending messages to peers
        for (peer_id, peer_addr) in &self.peer_addrs {
            if let Some(datagram) = self
                .peer_queues
                .get_mut(peer_id)
                .and_then(|queue| queue.transport_send(*peer_addr))
            {
                if let Err(err) = self.socket.send_to(&datagram, *peer_addr) {
                    log::warn!("Error sending UDP datagram: {err}");
                }
            }
        }
    }

    /// Returns queued messages as long as there are any. These are queued up by
    /// [GameplaySocket::transport].
    pub fn recv_actions(&mut self) -> Option<(PeerIdentifier, ClientAction)> {
        self.peer_queues
            .iter_mut()
            .find_map(|(peer_identifier, queue)| Some((*peer_identifier, queue.recv()?)))
    }

    /// Disconnects clients that haven't been refreshed in a while, returns all
    /// of the current peers.
    pub fn refresh(&mut self) -> Vec<PeerIdentifier> {
        self.peer_refresh_times.retain(|peer_id, time| {
            if Instant::now() - *time >= common::protocol::DISCONNECT_THRESHOLD {
                let peer = self.peer_addrs.remove(peer_id);
                self.peer_queues.remove(peer_id);
                if let Some(peer) = peer {
                    log::info!("Disconnected {peer} (peer id {:x})", peer_id.0);
                } else {
                    log::info!("Disconnected peer id {:x} (without an address?)", peer_id.0);
                }
                false
            } else {
                true
            }
        });
        self.peer_queues.keys().copied().collect::<Vec<_>>()
    }

    /// Queues up the action to the peer, for sending during the next
    /// [GameplaySocket::transport].
    pub fn send_action(&mut self, peer: PeerIdentifier, action: ServerAction) {
        if let Some(queue) = self.peer_queues.get_mut(&peer) {
            queue.send(action);
        } else {
            log::warn!("Can't send server action to {:x}, no queue found.", peer.0);
        }
    }
}