chunkedge_server/
event_loop.rs

1use std::time::Instant;
2
3use bevy_app::prelude::*;
4use bevy_app::MainScheduleOrder;
5use bevy_ecs::prelude::*;
6use bevy_ecs::schedule::ScheduleLabel;
7use bevy_ecs::system::SystemState;
8use bytes::Bytes;
9use chunkedge_binary::Decode;
10use chunkedge_protocol::Packet;
11use tracing::{debug, warn};
12
13use crate::client::Client;
14
15pub struct EventLoopPlugin;
16
17impl Plugin for EventLoopPlugin {
18    fn build(&self, app: &mut App) {
19        app.add_event::<PacketEvent>()
20            .add_schedule(Schedule::new(RunEventLoop))
21            .add_schedule(Schedule::new(EventLoopPreUpdate))
22            .add_schedule(Schedule::new(EventLoopUpdate))
23            .add_schedule(Schedule::new(EventLoopPostUpdate))
24            .add_systems(RunEventLoop, run_event_loop);
25
26        app.world_mut()
27            .resource_mut::<MainScheduleOrder>()
28            .insert_after(PreUpdate, RunEventLoop);
29    }
30}
31
32/// The schedule responsible for running [`EventLoopPreUpdate`],
33/// [`EventLoopUpdate`], and [`EventLoopPostUpdate`].
34///
35/// This schedule is situated between [`PreUpdate`] and [`Update`].
36#[derive(ScheduleLabel, Clone, Debug, PartialEq, Eq, Hash)]
37pub struct RunEventLoop;
38
39#[derive(ScheduleLabel, Clone, Debug, PartialEq, Eq, Hash)]
40pub struct EventLoopPreUpdate;
41
42#[derive(ScheduleLabel, Clone, Debug, PartialEq, Eq, Hash)]
43pub struct EventLoopUpdate;
44
45#[derive(ScheduleLabel, Clone, Debug, PartialEq, Eq, Hash)]
46pub struct EventLoopPostUpdate;
47
48#[derive(Event, Clone, Debug)]
49pub struct PacketEvent {
50    /// The client this packet originated from.
51    pub client: Entity,
52    /// The moment in time this packet arrived.
53    pub timestamp: Instant,
54    /// This packet's ID.
55    pub id: i32,
56    /// The content of the packet, excluding the leading varint packet ID.
57    pub data: Bytes,
58}
59
60impl PacketEvent {
61    /// Attempts to decode this packet as the packet `P`.
62    ///
63    /// If the packet ID is mismatched or an error occurs, `None` is returned.
64    /// Otherwise, `Some` is returned containing the decoded packet.
65    #[inline]
66    pub fn decode<'a, P>(&'a self) -> Option<P>
67    where
68        P: Packet + Decode<'a>,
69    {
70        if self.id == P::ID {
71            let mut r = &self.data[..];
72
73            match P::decode(&mut r) {
74                Ok(pkt) => {
75                    if r.is_empty() {
76                        return Some(pkt);
77                    }
78
79                    warn!(
80                        "missed {} bytes while decoding packet {} (ID = {})",
81                        r.len(),
82                        P::NAME,
83                        P::ID
84                    );
85                    debug!("complete packet after partial decode: {pkt:?}");
86                }
87                Err(e) => {
88                    warn!("failed to decode packet with ID of {}: {e:#}", P::ID);
89                }
90            }
91        }
92
93        None
94    }
95}
96
97fn run_event_loop_schedules(world: &mut World) {
98    world.run_schedule(EventLoopPreUpdate);
99    world.run_schedule(EventLoopUpdate);
100    world.run_schedule(EventLoopPostUpdate);
101}
102
103/// An exclusive system for running the event loop schedule.
104#[allow(clippy::type_complexity)]
105fn run_event_loop(
106    world: &mut World,
107    state: &mut SystemState<(
108        Query<(Entity, &mut Client)>,
109        EventWriter<PacketEvent>,
110        Commands,
111    )>,
112    mut check_again: Local<Vec<(Entity, usize)>>,
113) {
114    debug_assert!(check_again.is_empty());
115
116    let (mut clients, mut event_writer, mut commands) = state.get_mut(world);
117
118    for (entity, mut client) in &mut clients {
119        match client.connection_mut().try_recv() {
120            Ok(Some(pkt)) => {
121                event_writer.send(PacketEvent {
122                    client: entity,
123                    timestamp: pkt.timestamp,
124                    id: pkt.id,
125                    data: pkt.body,
126                });
127
128                let remaining = client.connection().len();
129
130                if remaining > 0 {
131                    check_again.push((entity, remaining));
132                }
133            }
134            Ok(None) => {}
135            Err(e) => {
136                // Client is disconnected.
137                debug!("disconnecting client: {e:#}");
138                commands.entity(entity).remove::<Client>();
139            }
140        }
141    }
142
143    state.apply(world);
144    run_event_loop_schedules(world);
145
146    while !check_again.is_empty() {
147        let (mut clients, mut event_writer, mut commands) = state.get_mut(world);
148
149        check_again.retain_mut(|(entity, remaining)| {
150            debug_assert!(*remaining > 0);
151
152            if let Ok((_, mut client)) = clients.get_mut(*entity) {
153                match client.connection_mut().try_recv() {
154                    Ok(Some(pkt)) => {
155                        event_writer.send(PacketEvent {
156                            client: *entity,
157                            timestamp: pkt.timestamp,
158                            id: pkt.id,
159                            data: pkt.body,
160                        });
161                        *remaining -= 1;
162                        // Keep looping as long as there are packets to process this tick.
163                        *remaining > 0
164                    }
165                    Ok(None) => false,
166                    Err(e) => {
167                        // Client is disconnected.
168                        debug!("disconnecting client: {e:#}");
169                        commands.entity(*entity).remove::<Client>();
170                        false
171                    }
172                }
173            } else {
174                // Client must have been deleted in the last run of the schedule.
175                false
176            }
177        });
178
179        state.apply(world);
180        run_event_loop_schedules(world);
181    }
182}