aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--planetwars-server/src/modules/bot_api.rs63
1 files changed, 55 insertions, 8 deletions
diff --git a/planetwars-server/src/modules/bot_api.rs b/planetwars-server/src/modules/bot_api.rs
index f6e4d5c..f5aae20 100644
--- a/planetwars-server/src/modules/bot_api.rs
+++ b/planetwars-server/src/modules/bot_api.rs
@@ -6,8 +6,9 @@ use std::collections::HashMap;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
+use std::time::Duration;
-use runner::match_context::{EventBus, PlayerHandle, RequestMessage};
+use runner::match_context::{EventBus, PlayerHandle, RequestError, RequestMessage};
use runner::match_log::MatchLogger;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
@@ -114,10 +115,16 @@ impl runner::BotSpec for RemoteBotSpec {
);
let client_messages = rx.await.unwrap();
- tokio::spawn(handle_bot_messages(player_id, event_bus, client_messages));
+ tokio::spawn(handle_bot_messages(
+ player_id,
+ event_bus.clone(),
+ client_messages,
+ ));
Box::new(RemoteBotHandle {
sender: server_msg_snd,
+ player_id,
+ event_bus,
})
}
}
@@ -138,19 +145,59 @@ async fn handle_bot_messages(
struct RemoteBotHandle {
sender: mpsc::UnboundedSender<Result<pb::PlayerRequest, Status>>,
+ player_id: u32,
+ event_bus: Arc<Mutex<EventBus>>,
}
impl PlayerHandle for RemoteBotHandle {
fn send_request(&mut self, r: RequestMessage) {
- self.sender
- .send(Ok(pb::PlayerRequest {
- request_id: r.request_id as i32,
- content: r.content,
- }))
- .unwrap();
+ let res = self.sender.send(Ok(pb::PlayerRequest {
+ request_id: r.request_id as i32,
+ content: r.content,
+ }));
+ match res {
+ Ok(()) => {
+ // schedule a timeout. See comments at method implementation
+ tokio::spawn(schedule_timeout(
+ (self.player_id, r.request_id),
+ r.timeout,
+ self.event_bus.clone(),
+ ));
+ }
+ Err(_send_error) => {
+ // cannot contact the remote bot anymore;
+ // directly mark all requests as timed out.
+ // TODO: create a dedicated error type for this.
+ // should it be logged?
+ self.event_bus
+ .lock()
+ .unwrap()
+ .resolve_request((self.player_id, r.request_id), Err(RequestError::Timeout));
+ }
+ }
}
}
+// TODO: this will spawn a task for every request, which might not be ideal.
+// Some alternatives:
+// - create a single task that manages all time-outs.
+// - intersperse timeouts with incoming client messages
+// - push timeouts upwards, into the matchrunner logic (before we hit the playerhandle).
+// This was initially not done to allow timer start to be delayed until the message actually arrived
+// with the player. Is this still needed, or is there a different way to do this?
+//
+async fn schedule_timeout(
+ request_id: (u32, u32),
+ duration: Duration,
+ event_bus: Arc<Mutex<EventBus>>,
+) {
+ tokio::time::sleep(duration).await;
+ event_bus
+ .lock()
+ .unwrap()
+ .resolve_request(request_id, Err(RequestError::Timeout));
+}
+
async fn run_match(router: PlayerRouter, pool: ConnectionPool) {
let conn = pool.get().await.unwrap();