chunkedge_server/
event_loop.rs1use std::time::Instant;
2
3use bevy_app::prelude::*;
4use bevy_app::MainScheduleOrder;
5use bevy_ecs::prelude::*;
6use bevy_ecs::schedule::ScheduleLabel;
7use bevy_ecs::system::SystemState;
8use bytes::Bytes;
9use chunkedge_binary::Decode;
10use chunkedge_protocol::Packet;
11use tracing::{debug, warn};
12
13use crate::client::Client;
14
15pub struct EventLoopPlugin;
16
17impl Plugin for EventLoopPlugin {
18 fn build(&self, app: &mut App) {
19 app.add_event::<PacketEvent>()
20 .add_schedule(Schedule::new(RunEventLoop))
21 .add_schedule(Schedule::new(EventLoopPreUpdate))
22 .add_schedule(Schedule::new(EventLoopUpdate))
23 .add_schedule(Schedule::new(EventLoopPostUpdate))
24 .add_systems(RunEventLoop, run_event_loop);
25
26 app.world_mut()
27 .resource_mut::<MainScheduleOrder>()
28 .insert_after(PreUpdate, RunEventLoop);
29 }
30}
31
32#[derive(ScheduleLabel, Clone, Debug, PartialEq, Eq, Hash)]
37pub struct RunEventLoop;
38
39#[derive(ScheduleLabel, Clone, Debug, PartialEq, Eq, Hash)]
40pub struct EventLoopPreUpdate;
41
42#[derive(ScheduleLabel, Clone, Debug, PartialEq, Eq, Hash)]
43pub struct EventLoopUpdate;
44
45#[derive(ScheduleLabel, Clone, Debug, PartialEq, Eq, Hash)]
46pub struct EventLoopPostUpdate;
47
48#[derive(Event, Clone, Debug)]
49pub struct PacketEvent {
50 pub client: Entity,
52 pub timestamp: Instant,
54 pub id: i32,
56 pub data: Bytes,
58}
59
60impl PacketEvent {
61 #[inline]
66 pub fn decode<'a, P>(&'a self) -> Option<P>
67 where
68 P: Packet + Decode<'a>,
69 {
70 if self.id == P::ID {
71 let mut r = &self.data[..];
72
73 match P::decode(&mut r) {
74 Ok(pkt) => {
75 if r.is_empty() {
76 return Some(pkt);
77 }
78
79 warn!(
80 "missed {} bytes while decoding packet {} (ID = {})",
81 r.len(),
82 P::NAME,
83 P::ID
84 );
85 debug!("complete packet after partial decode: {pkt:?}");
86 }
87 Err(e) => {
88 warn!("failed to decode packet with ID of {}: {e:#}", P::ID);
89 }
90 }
91 }
92
93 None
94 }
95}
96
97fn run_event_loop_schedules(world: &mut World) {
98 world.run_schedule(EventLoopPreUpdate);
99 world.run_schedule(EventLoopUpdate);
100 world.run_schedule(EventLoopPostUpdate);
101}
102
103#[allow(clippy::type_complexity)]
105fn run_event_loop(
106 world: &mut World,
107 state: &mut SystemState<(
108 Query<(Entity, &mut Client)>,
109 EventWriter<PacketEvent>,
110 Commands,
111 )>,
112 mut check_again: Local<Vec<(Entity, usize)>>,
113) {
114 debug_assert!(check_again.is_empty());
115
116 let (mut clients, mut event_writer, mut commands) = state.get_mut(world);
117
118 for (entity, mut client) in &mut clients {
119 match client.connection_mut().try_recv() {
120 Ok(Some(pkt)) => {
121 event_writer.send(PacketEvent {
122 client: entity,
123 timestamp: pkt.timestamp,
124 id: pkt.id,
125 data: pkt.body,
126 });
127
128 let remaining = client.connection().len();
129
130 if remaining > 0 {
131 check_again.push((entity, remaining));
132 }
133 }
134 Ok(None) => {}
135 Err(e) => {
136 debug!("disconnecting client: {e:#}");
138 commands.entity(entity).remove::<Client>();
139 }
140 }
141 }
142
143 state.apply(world);
144 run_event_loop_schedules(world);
145
146 while !check_again.is_empty() {
147 let (mut clients, mut event_writer, mut commands) = state.get_mut(world);
148
149 check_again.retain_mut(|(entity, remaining)| {
150 debug_assert!(*remaining > 0);
151
152 if let Ok((_, mut client)) = clients.get_mut(*entity) {
153 match client.connection_mut().try_recv() {
154 Ok(Some(pkt)) => {
155 event_writer.send(PacketEvent {
156 client: *entity,
157 timestamp: pkt.timestamp,
158 id: pkt.id,
159 data: pkt.body,
160 });
161 *remaining -= 1;
162 *remaining > 0
164 }
165 Ok(None) => false,
166 Err(e) => {
167 debug!("disconnecting client: {e:#}");
169 commands.entity(*entity).remove::<Client>();
170 false
171 }
172 }
173 } else {
174 false
176 }
177 });
178
179 state.apply(world);
180 run_event_loop_schedules(world);
181 }
182}