aboutsummaryrefslogtreecommitdiff
path: root/planetwars-localdev/src/match_runner/match_context.rs
diff options
context:
space:
mode:
authorIlion Beyst <ilion.beyst@gmail.com>2021-12-25 14:45:05 +0100
committerIlion Beyst <ilion.beyst@gmail.com>2021-12-25 14:45:05 +0100
commitd0a0fcfdeda2c315a13ddd96b4fca958da0d9858 (patch)
tree6fb53e3b6e2999ac0c9e35f390d6be26a438a634 /planetwars-localdev/src/match_runner/match_context.rs
parentd0af8d3bbff506d4f4caf03750c3b85c69c4f168 (diff)
downloadplanetwars.dev-d0a0fcfdeda2c315a13ddd96b4fca958da0d9858.tar.xz
planetwars.dev-d0a0fcfdeda2c315a13ddd96b4fca958da0d9858.zip
cli for running matches
Diffstat (limited to 'planetwars-localdev/src/match_runner/match_context.rs')
-rw-r--r--planetwars-localdev/src/match_runner/match_context.rs161
1 files changed, 161 insertions, 0 deletions
diff --git a/planetwars-localdev/src/match_runner/match_context.rs b/planetwars-localdev/src/match_runner/match_context.rs
new file mode 100644
index 0000000..466da13
--- /dev/null
+++ b/planetwars-localdev/src/match_runner/match_context.rs
@@ -0,0 +1,161 @@
+use futures::task::{Context, Poll};
+use futures::{future::Future, task::AtomicWaker};
+use serde::{Deserialize, Serialize};
+use std::fs::File;
+use std::io::Write;
+use std::pin::Pin;
+use std::time::Duration;
+use std::{
+ collections::HashMap,
+ sync::{Arc, Mutex},
+};
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct RequestMessage {
+ pub request_id: u32,
+ pub timeout: Duration,
+ pub content: Vec<u8>,
+}
+
+pub struct MatchCtx {
+ event_bus: Arc<Mutex<EventBus>>,
+ players: HashMap<u32, PlayerData>,
+ // output: MsgStreamHandle<String>,
+ log_sink: File,
+}
+
+impl MatchCtx {
+ pub fn new(
+ event_bus: Arc<Mutex<EventBus>>,
+ players: HashMap<u32, Box<dyn PlayerHandle>>,
+ log_file: File,
+ // log: MsgStreamHandle<String>,
+ ) -> Self {
+ MatchCtx {
+ event_bus,
+ players: players
+ .into_iter()
+ .map(|(id, handle)| {
+ let player_handle = PlayerData {
+ request_ctr: 0,
+ handle,
+ };
+ (id, player_handle)
+ })
+ .collect(),
+ log_sink: log_file,
+ }
+ }
+
+ // TODO: implement a clean way to handle the player not existing
+ pub fn request(&mut self, player_id: u32, content: Vec<u8>, timeout: Duration) -> Request {
+ let player = self.players.get_mut(&player_id).unwrap();
+ let request_id = player.request_ctr;
+ player.request_ctr += 1;
+
+ player.handle.send_request(RequestMessage {
+ request_id,
+ content,
+ timeout,
+ });
+
+ return Request {
+ player_id,
+ request_id,
+ event_bus: self.event_bus.clone(),
+ };
+ }
+
+ pub fn send_info(&mut self, player_id: u32, msg: String) {
+ let player = self.players.get_mut(&player_id).unwrap();
+ player.handle.send_info(msg);
+ }
+
+ pub fn players(&self) -> Vec<u32> {
+ self.players.keys().cloned().collect()
+ }
+
+ // this method should be used to emit log states etc.
+ pub fn log_string(&mut self, message: String) {
+ write!(self.log_sink, "{}\n", message).expect("failed to write to log file");
+ }
+}
+
+pub trait PlayerHandle: Send {
+ fn send_request(&mut self, r: RequestMessage);
+ fn send_info(&mut self, msg: String);
+}
+
+struct PlayerData {
+ request_ctr: u32,
+ handle: Box<dyn PlayerHandle>,
+}
+
+type RequestId = (u32, u32);
+pub struct EventBus {
+ request_responses: HashMap<RequestId, RequestResult<Vec<u8>>>,
+ wakers: HashMap<RequestId, AtomicWaker>,
+}
+
+impl EventBus {
+ pub fn new() -> Self {
+ EventBus {
+ request_responses: HashMap::new(),
+ wakers: HashMap::new(),
+ }
+ }
+}
+
+impl EventBus {
+ pub fn resolve_request(&mut self, id: RequestId, result: RequestResult<Vec<u8>>) {
+ if self.request_responses.contains_key(&id) {
+ // request already resolved
+ // TODO: maybe report this?
+ return;
+ }
+ self.request_responses.insert(id, result);
+ if let Some(waker) = self.wakers.remove(&id) {
+ waker.wake();
+ }
+ }
+}
+
+pub struct Request {
+ player_id: u32,
+ request_id: u32,
+ event_bus: Arc<Mutex<EventBus>>,
+}
+
+impl Request {
+ #[allow(dead_code)]
+ pub fn player_id(&self) -> u32 {
+ self.player_id
+ }
+}
+
+impl Future for Request {
+ type Output = RequestResult<Vec<u8>>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut event_bus = self.event_bus.lock().unwrap();
+ let request_id = (self.player_id, self.request_id);
+
+ if let Some(result) = event_bus.request_responses.get(&request_id) {
+ return Poll::Ready(result.clone());
+ }
+
+ event_bus
+ .wakers
+ .entry(request_id)
+ .or_insert_with(|| AtomicWaker::new())
+ .register(cx.waker());
+ return Poll::Pending;
+ }
+}
+
+#[derive(Debug, Clone)]
+pub enum RequestError {
+ Timeout,
+}
+
+pub type RequestResult<T> = Result<T, RequestError>;