From 4a077c7c65eced447c45389acf05007dd571bf26 Mon Sep 17 00:00:00 2001 From: Ilion Beyst Date: Sat, 1 Jan 2022 12:10:02 +0100 Subject: extract matchrunner crate from planetwars-cli --- Cargo.toml | 1 + planetwars-cli/Cargo.toml | 2 +- planetwars-cli/src/commands/run_match.rs | 8 +- planetwars-cli/src/lib.rs | 1 - planetwars-cli/src/match_runner/bot_runner.rs | 121 ----------------- planetwars-cli/src/match_runner/match_context.rs | 161 ----------------------- planetwars-cli/src/match_runner/mod.rs | 91 ------------- planetwars-cli/src/match_runner/pw_match.rs | 136 ------------------- planetwars-cli/src/web/mod.rs | 3 +- planetwars-matchrunner/Cargo.toml | 16 +++ planetwars-matchrunner/src/bot_runner.rs | 121 +++++++++++++++++ planetwars-matchrunner/src/lib.rs | 90 +++++++++++++ planetwars-matchrunner/src/match_context.rs | 161 +++++++++++++++++++++++ planetwars-matchrunner/src/pw_match.rs | 136 +++++++++++++++++++ 14 files changed, 532 insertions(+), 516 deletions(-) delete mode 100644 planetwars-cli/src/match_runner/bot_runner.rs delete mode 100644 planetwars-cli/src/match_runner/match_context.rs delete mode 100644 planetwars-cli/src/match_runner/mod.rs delete mode 100644 planetwars-cli/src/match_runner/pw_match.rs create mode 100644 planetwars-matchrunner/Cargo.toml create mode 100644 planetwars-matchrunner/src/bot_runner.rs create mode 100644 planetwars-matchrunner/src/lib.rs create mode 100644 planetwars-matchrunner/src/match_context.rs create mode 100644 planetwars-matchrunner/src/pw_match.rs diff --git a/Cargo.toml b/Cargo.toml index 35061a8..cebf247 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ members = [ "planetwars-rules", + "planetwars-matchrunner", "planetwars-cli", "planetwars-server", ] diff --git a/planetwars-cli/Cargo.toml b/planetwars-cli/Cargo.toml index e1f0a8e..972a02b 100644 --- a/planetwars-cli/Cargo.toml +++ b/planetwars-cli/Cargo.toml @@ -15,10 +15,10 @@ rand = "0.6" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" toml = "0.5" -planetwars-rules = { path = "../planetwars-rules" } clap = { version = "3.0.0-rc.8", features = ["derive"] } chrono = { version = "0.4", features = ["serde"] } shlex = "1.1" +planetwars-matchrunner = { path = "../planetwars-matchrunner" } rust-embed = "6.3.0" axum = { version = "0.4", features = ["ws"] } diff --git a/planetwars-cli/src/commands/run_match.rs b/planetwars-cli/src/commands/run_match.rs index 868e87c..03868ae 100644 --- a/planetwars-cli/src/commands/run_match.rs +++ b/planetwars-cli/src/commands/run_match.rs @@ -1,9 +1,8 @@ use std::io; use clap::Parser; +use planetwars_matchrunner::{run_match, MatchConfig, MatchPlayer}; -use crate::match_runner::MatchConfig; -use crate::match_runner::{self, MatchPlayer}; use crate::workspace::Workspace; #[derive(Parser)] pub struct RunMatchCommand { @@ -26,7 +25,8 @@ impl RunMatchCommand { let bot = workspace.get_bot(&bot_name)?; players.push(MatchPlayer { name: bot_name.clone(), - bot, + path: bot.path.clone(), + argv: bot.config.get_run_argv(), }); } @@ -37,7 +37,7 @@ impl RunMatchCommand { players, }; - match_runner::run_match(match_config).await; + run_match(match_config).await; println!("match completed successfully"); // TODO: maybe print the match result as well? diff --git a/planetwars-cli/src/lib.rs b/planetwars-cli/src/lib.rs index e5566b0..f67b67f 100644 --- a/planetwars-cli/src/lib.rs +++ b/planetwars-cli/src/lib.rs @@ -1,5 +1,4 @@ mod commands; -mod match_runner; mod web; mod workspace; diff --git a/planetwars-cli/src/match_runner/bot_runner.rs b/planetwars-cli/src/match_runner/bot_runner.rs deleted file mode 100644 index 70fc060..0000000 --- a/planetwars-cli/src/match_runner/bot_runner.rs +++ /dev/null @@ -1,121 +0,0 @@ -use std::io; -use std::path::PathBuf; -use std::process::Stdio; -use std::sync::Arc; -use std::sync::Mutex; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines}; -use tokio::process; -use tokio::sync::mpsc; -use tokio::time::timeout; - -use super::match_context::EventBus; -use super::match_context::PlayerHandle; -use super::match_context::RequestError; -use super::match_context::RequestMessage; -pub struct LocalBotHandle { - tx: mpsc::UnboundedSender, -} - -impl PlayerHandle for LocalBotHandle { - fn send_request(&mut self, r: RequestMessage) { - self.tx - .send(r) - .expect("failed to send message to local bot"); - } - - fn send_info(&mut self, _msg: String) { - // TODO: log this somewhere - // drop info message - } -} - -pub fn run_local_bot(player_id: u32, event_bus: Arc>, bot: Bot) -> LocalBotHandle { - let (tx, rx) = mpsc::unbounded_channel(); - - let runner = LocalBotRunner { - event_bus, - rx, - player_id, - bot, - }; - tokio::spawn(runner.run()); - - return LocalBotHandle { tx }; -} - -pub struct LocalBotRunner { - event_bus: Arc>, - rx: mpsc::UnboundedReceiver, - player_id: u32, - bot: Bot, -} - -impl LocalBotRunner { - pub async fn run(mut self) { - let mut process = self.bot.spawn_process(); - - while let Some(request) = self.rx.recv().await { - let resp_fut = process.communicate(&request.content); - let result = timeout(request.timeout, resp_fut) - .await - // TODO: how can this failure be handled cleanly? - .expect("process read failed"); - let result = match result { - Ok(line) => Ok(line.into_bytes()), - Err(_elapsed) => Err(RequestError::Timeout), - }; - let request_id = (self.player_id, request.request_id); - - self.event_bus - .lock() - .unwrap() - .resolve_request(request_id, result); - } - } -} - -#[derive(Debug, Clone)] -pub struct Bot { - pub working_dir: PathBuf, - pub argv: Vec, -} - -impl Bot { - pub fn spawn_process(&self) -> BotProcess { - let mut child = process::Command::new(&self.argv[0]) - .args(&self.argv[1..]) - .current_dir(self.working_dir.clone()) - .kill_on_drop(true) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::inherit()) - .spawn() - .expect("spawning failed"); - - let stdout = child.stdout.take().unwrap(); - let reader = BufReader::new(stdout).lines(); - - return BotProcess { - stdin: child.stdin.take().unwrap(), - stdout: reader, - child, - }; - } -} - -pub struct BotProcess { - #[allow(dead_code)] - child: process::Child, - stdin: process::ChildStdin, - stdout: Lines>, -} - -impl BotProcess { - // TODO: gracefully handle errors - pub async fn communicate(&mut self, input: &[u8]) -> io::Result { - self.stdin.write_all(input).await?; - self.stdin.write_u8(b'\n').await?; - let line = self.stdout.next_line().await?; - line.ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "no response received")) - } -} diff --git a/planetwars-cli/src/match_runner/match_context.rs b/planetwars-cli/src/match_runner/match_context.rs deleted file mode 100644 index 466da13..0000000 --- a/planetwars-cli/src/match_runner/match_context.rs +++ /dev/null @@ -1,161 +0,0 @@ -use futures::task::{Context, Poll}; -use futures::{future::Future, task::AtomicWaker}; -use serde::{Deserialize, Serialize}; -use std::fs::File; -use std::io::Write; -use std::pin::Pin; -use std::time::Duration; -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, -}; - -#[derive(Serialize, Deserialize, Debug)] -pub struct RequestMessage { - pub request_id: u32, - pub timeout: Duration, - pub content: Vec, -} - -pub struct MatchCtx { - event_bus: Arc>, - players: HashMap, - // output: MsgStreamHandle, - log_sink: File, -} - -impl MatchCtx { - pub fn new( - event_bus: Arc>, - players: HashMap>, - log_file: File, - // log: MsgStreamHandle, - ) -> Self { - MatchCtx { - event_bus, - players: players - .into_iter() - .map(|(id, handle)| { - let player_handle = PlayerData { - request_ctr: 0, - handle, - }; - (id, player_handle) - }) - .collect(), - log_sink: log_file, - } - } - - // TODO: implement a clean way to handle the player not existing - pub fn request(&mut self, player_id: u32, content: Vec, timeout: Duration) -> Request { - let player = self.players.get_mut(&player_id).unwrap(); - let request_id = player.request_ctr; - player.request_ctr += 1; - - player.handle.send_request(RequestMessage { - request_id, - content, - timeout, - }); - - return Request { - player_id, - request_id, - event_bus: self.event_bus.clone(), - }; - } - - pub fn send_info(&mut self, player_id: u32, msg: String) { - let player = self.players.get_mut(&player_id).unwrap(); - player.handle.send_info(msg); - } - - pub fn players(&self) -> Vec { - self.players.keys().cloned().collect() - } - - // this method should be used to emit log states etc. - pub fn log_string(&mut self, message: String) { - write!(self.log_sink, "{}\n", message).expect("failed to write to log file"); - } -} - -pub trait PlayerHandle: Send { - fn send_request(&mut self, r: RequestMessage); - fn send_info(&mut self, msg: String); -} - -struct PlayerData { - request_ctr: u32, - handle: Box, -} - -type RequestId = (u32, u32); -pub struct EventBus { - request_responses: HashMap>>, - wakers: HashMap, -} - -impl EventBus { - pub fn new() -> Self { - EventBus { - request_responses: HashMap::new(), - wakers: HashMap::new(), - } - } -} - -impl EventBus { - pub fn resolve_request(&mut self, id: RequestId, result: RequestResult>) { - if self.request_responses.contains_key(&id) { - // request already resolved - // TODO: maybe report this? - return; - } - self.request_responses.insert(id, result); - if let Some(waker) = self.wakers.remove(&id) { - waker.wake(); - } - } -} - -pub struct Request { - player_id: u32, - request_id: u32, - event_bus: Arc>, -} - -impl Request { - #[allow(dead_code)] - pub fn player_id(&self) -> u32 { - self.player_id - } -} - -impl Future for Request { - type Output = RequestResult>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut event_bus = self.event_bus.lock().unwrap(); - let request_id = (self.player_id, self.request_id); - - if let Some(result) = event_bus.request_responses.get(&request_id) { - return Poll::Ready(result.clone()); - } - - event_bus - .wakers - .entry(request_id) - .or_insert_with(|| AtomicWaker::new()) - .register(cx.waker()); - return Poll::Pending; - } -} - -#[derive(Debug, Clone)] -pub enum RequestError { - Timeout, -} - -pub type RequestResult = Result; diff --git a/planetwars-cli/src/match_runner/mod.rs b/planetwars-cli/src/match_runner/mod.rs deleted file mode 100644 index fdd02d5..0000000 --- a/planetwars-cli/src/match_runner/mod.rs +++ /dev/null @@ -1,91 +0,0 @@ -mod bot_runner; -mod match_context; -mod pw_match; - -use std::{ - io::Write, - path::PathBuf, - sync::{Arc, Mutex}, -}; - -use match_context::MatchCtx; -use planetwars_rules::PwConfig; -use serde::{Deserialize, Serialize}; - -use crate::workspace::bot::WorkspaceBot; - -use self::match_context::{EventBus, PlayerHandle}; - -pub struct MatchConfig { - pub map_name: String, - pub map_path: PathBuf, - pub log_path: PathBuf, - pub players: Vec, -} - -#[derive(Serialize, Deserialize)] -pub struct MatchMeta { - pub map_name: String, - pub timestamp: chrono::DateTime, - pub players: Vec, -} - -#[derive(Serialize, Deserialize)] -pub struct PlayerInfo { - pub name: String, -} - -pub struct MatchPlayer { - pub name: String, - pub bot: WorkspaceBot, -} - -pub async fn run_match(config: MatchConfig) { - let pw_config = PwConfig { - map_file: config.map_path, - max_turns: 100, - }; - - let event_bus = Arc::new(Mutex::new(EventBus::new())); - - // start bots - let players = config - .players - .iter() - .enumerate() - .map(|(player_id, player)| { - let player_id = (player_id + 1) as u32; - let bot = bot_runner::Bot { - working_dir: player.bot.path.clone(), - argv: player.bot.config.get_run_argv(), - }; - let handle = bot_runner::run_local_bot(player_id, event_bus.clone(), bot); - (player_id, Box::new(handle) as Box) - }) - .collect(); - let mut log_file = std::fs::File::create(config.log_path).expect("could not create log file"); - - // assemble the math meta struct - let match_meta = MatchMeta { - map_name: config.map_name.clone(), - timestamp: chrono::Local::now(), - players: config - .players - .iter() - .map(|bot| PlayerInfo { - name: bot.name.clone(), - }) - .collect(), - }; - write!( - log_file, - "{}\n", - serde_json::to_string(&match_meta).unwrap() - ) - .unwrap(); - - let match_ctx = MatchCtx::new(event_bus, players, log_file); - - let match_state = pw_match::PwMatch::create(match_ctx, pw_config); - match_state.run().await; -} diff --git a/planetwars-cli/src/match_runner/pw_match.rs b/planetwars-cli/src/match_runner/pw_match.rs deleted file mode 100644 index 42bc9d2..0000000 --- a/planetwars-cli/src/match_runner/pw_match.rs +++ /dev/null @@ -1,136 +0,0 @@ -use super::match_context::{MatchCtx, RequestResult}; -use futures::stream::futures_unordered::FuturesUnordered; -use futures::{FutureExt, StreamExt}; -use serde::{Deserialize, Serialize}; -use tokio::time::Duration; - -use serde_json; - -use std::convert::TryInto; - -pub use planetwars_rules::config::{Config, Map}; - -use planetwars_rules::protocol::{self as proto, PlayerAction}; -use planetwars_rules::serializer as pw_serializer; -use planetwars_rules::{PlanetWars, PwConfig}; - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct MatchConfig { - pub map_name: String, - pub max_turns: usize, -} - -pub struct PwMatch { - match_ctx: MatchCtx, - match_state: PlanetWars, -} - -impl PwMatch { - pub fn create(match_ctx: MatchCtx, config: PwConfig) -> Self { - // TODO: this is kind of hacked together at the moment - let match_state = PlanetWars::create(config, match_ctx.players().len()); - - PwMatch { - match_state, - match_ctx, - } - } - - pub async fn run(mut self) { - while !self.match_state.is_finished() { - let player_messages = self.prompt_players().await; - - for (player_id, turn) in player_messages { - let res = self.execute_action(player_id, turn); - if let Some(err) = action_errors(res) { - let info_str = serde_json::to_string(&err).unwrap(); - self.match_ctx.send_info(player_id as u32, info_str); - } - } - self.match_state.step(); - - // Log state - let state = self.match_state.serialize_state(); - self.match_ctx - .log_string(serde_json::to_string(&state).unwrap()); - } - } - - async fn prompt_players(&mut self) -> Vec<(usize, RequestResult>)> { - // borrow these outside closure to make the borrow checker happy - let state = self.match_state.state(); - let match_ctx = &mut self.match_ctx; - - // TODO: this numbering is really messy. - // Get rid of the distinction between player_num - // and player_id. - - self.match_state - .state() - .players - .iter() - .filter(|p| p.alive) - .map(move |player| { - let state_for_player = pw_serializer::serialize_rotated(&state, player.id - 1); - match_ctx - .request( - player.id.try_into().unwrap(), - serde_json::to_vec(&state_for_player).unwrap(), - Duration::from_millis(1000), - ) - .map(move |resp| (player.id, resp)) - }) - .collect::>() - .collect::>() - .await - } - - fn execute_action( - &mut self, - player_num: usize, - turn: RequestResult>, - ) -> proto::PlayerAction { - let turn = match turn { - Err(_timeout) => return proto::PlayerAction::Timeout, - Ok(data) => data, - }; - - let action: proto::Action = match serde_json::from_slice(&turn) { - Err(err) => return proto::PlayerAction::ParseError(err.to_string()), - Ok(action) => action, - }; - - let commands = action - .commands - .into_iter() - .map(|command| { - let res = self.match_state.execute_command(player_num, &command); - proto::PlayerCommand { - command, - error: res.err(), - } - }) - .collect(); - - return proto::PlayerAction::Commands(commands); - } -} - -fn action_errors(action: PlayerAction) -> Option { - match action { - PlayerAction::Commands(commands) => { - let failed = commands - .into_iter() - .filter(|cmd| cmd.error.is_some()) - .collect::>(); - - if failed.is_empty() { - None - } else { - Some(PlayerAction::Commands(failed)) - } - } - e => Some(e), - } -} diff --git a/planetwars-cli/src/web/mod.rs b/planetwars-cli/src/web/mod.rs index a0e452e..f66b0c6 100644 --- a/planetwars-cli/src/web/mod.rs +++ b/planetwars-cli/src/web/mod.rs @@ -8,6 +8,7 @@ use axum::{ AddExtensionLayer, Json, }; use mime_guess; +use planetwars_matchrunner::MatchMeta; use rust_embed::RustEmbed; use serde::{Deserialize, Serialize}; use std::{ @@ -18,7 +19,7 @@ use std::{ sync::Arc, }; -use crate::{match_runner::MatchMeta, workspace::Workspace}; +use crate::workspace::Workspace; struct State { workspace: Workspace, diff --git a/planetwars-matchrunner/Cargo.toml b/planetwars-matchrunner/Cargo.toml new file mode 100644 index 0000000..da05f13 --- /dev/null +++ b/planetwars-matchrunner/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "planetwars-matchrunner" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +futures-core = "0.3" +futures = "0.3" +tokio = { version = "1", features = ["full"] } +rand = "0.6" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +planetwars-rules = { path = "../planetwars-rules" } +chrono = { version = "0.4", features = ["serde"] } \ No newline at end of file diff --git a/planetwars-matchrunner/src/bot_runner.rs b/planetwars-matchrunner/src/bot_runner.rs new file mode 100644 index 0000000..70fc060 --- /dev/null +++ b/planetwars-matchrunner/src/bot_runner.rs @@ -0,0 +1,121 @@ +use std::io; +use std::path::PathBuf; +use std::process::Stdio; +use std::sync::Arc; +use std::sync::Mutex; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines}; +use tokio::process; +use tokio::sync::mpsc; +use tokio::time::timeout; + +use super::match_context::EventBus; +use super::match_context::PlayerHandle; +use super::match_context::RequestError; +use super::match_context::RequestMessage; +pub struct LocalBotHandle { + tx: mpsc::UnboundedSender, +} + +impl PlayerHandle for LocalBotHandle { + fn send_request(&mut self, r: RequestMessage) { + self.tx + .send(r) + .expect("failed to send message to local bot"); + } + + fn send_info(&mut self, _msg: String) { + // TODO: log this somewhere + // drop info message + } +} + +pub fn run_local_bot(player_id: u32, event_bus: Arc>, bot: Bot) -> LocalBotHandle { + let (tx, rx) = mpsc::unbounded_channel(); + + let runner = LocalBotRunner { + event_bus, + rx, + player_id, + bot, + }; + tokio::spawn(runner.run()); + + return LocalBotHandle { tx }; +} + +pub struct LocalBotRunner { + event_bus: Arc>, + rx: mpsc::UnboundedReceiver, + player_id: u32, + bot: Bot, +} + +impl LocalBotRunner { + pub async fn run(mut self) { + let mut process = self.bot.spawn_process(); + + while let Some(request) = self.rx.recv().await { + let resp_fut = process.communicate(&request.content); + let result = timeout(request.timeout, resp_fut) + .await + // TODO: how can this failure be handled cleanly? + .expect("process read failed"); + let result = match result { + Ok(line) => Ok(line.into_bytes()), + Err(_elapsed) => Err(RequestError::Timeout), + }; + let request_id = (self.player_id, request.request_id); + + self.event_bus + .lock() + .unwrap() + .resolve_request(request_id, result); + } + } +} + +#[derive(Debug, Clone)] +pub struct Bot { + pub working_dir: PathBuf, + pub argv: Vec, +} + +impl Bot { + pub fn spawn_process(&self) -> BotProcess { + let mut child = process::Command::new(&self.argv[0]) + .args(&self.argv[1..]) + .current_dir(self.working_dir.clone()) + .kill_on_drop(true) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()) + .spawn() + .expect("spawning failed"); + + let stdout = child.stdout.take().unwrap(); + let reader = BufReader::new(stdout).lines(); + + return BotProcess { + stdin: child.stdin.take().unwrap(), + stdout: reader, + child, + }; + } +} + +pub struct BotProcess { + #[allow(dead_code)] + child: process::Child, + stdin: process::ChildStdin, + stdout: Lines>, +} + +impl BotProcess { + // TODO: gracefully handle errors + pub async fn communicate(&mut self, input: &[u8]) -> io::Result { + self.stdin.write_all(input).await?; + self.stdin.write_u8(b'\n').await?; + let line = self.stdout.next_line().await?; + line.ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "no response received")) + } +} diff --git a/planetwars-matchrunner/src/lib.rs b/planetwars-matchrunner/src/lib.rs new file mode 100644 index 0000000..50c6518 --- /dev/null +++ b/planetwars-matchrunner/src/lib.rs @@ -0,0 +1,90 @@ +mod bot_runner; +mod match_context; +mod pw_match; + +use std::{ + io::Write, + path::PathBuf, + sync::{Arc, Mutex}, +}; + +use match_context::MatchCtx; +use planetwars_rules::PwConfig; +use serde::{Deserialize, Serialize}; + +use self::match_context::{EventBus, PlayerHandle}; + +pub struct MatchConfig { + pub map_name: String, + pub map_path: PathBuf, + pub log_path: PathBuf, + pub players: Vec, +} + +#[derive(Serialize, Deserialize)] +pub struct MatchMeta { + pub map_name: String, + pub timestamp: chrono::DateTime, + pub players: Vec, +} + +#[derive(Serialize, Deserialize)] +pub struct PlayerInfo { + pub name: String, +} + +pub struct MatchPlayer { + pub name: String, + pub path: PathBuf, + pub argv: Vec, +} + +pub async fn run_match(config: MatchConfig) { + let pw_config = PwConfig { + map_file: config.map_path, + max_turns: 100, + }; + + let event_bus = Arc::new(Mutex::new(EventBus::new())); + + // start bots + let players = config + .players + .iter() + .enumerate() + .map(|(player_id, player)| { + let player_id = (player_id + 1) as u32; + let bot = bot_runner::Bot { + working_dir: player.path.clone(), + argv: player.argv.clone(), + }; + let handle = bot_runner::run_local_bot(player_id, event_bus.clone(), bot); + (player_id, Box::new(handle) as Box) + }) + .collect(); + let mut log_file = std::fs::File::create(config.log_path).expect("could not create log file"); + + // assemble the math meta struct + let match_meta = MatchMeta { + map_name: config.map_name.clone(), + timestamp: chrono::Local::now(), + players: config + .players + .iter() + .map(|bot| PlayerInfo { + name: bot.name.clone(), + }) + .collect(), + }; + write!( + log_file, + "{}\n", + serde_json::to_string(&match_meta).unwrap() + ) + .unwrap(); + + let match_ctx = MatchCtx::new(event_bus, players, log_file); + + let match_state = pw_match::PwMatch::create(match_ctx, pw_config); + match_state.run().await; +} diff --git a/planetwars-matchrunner/src/match_context.rs b/planetwars-matchrunner/src/match_context.rs new file mode 100644 index 0000000..466da13 --- /dev/null +++ b/planetwars-matchrunner/src/match_context.rs @@ -0,0 +1,161 @@ +use futures::task::{Context, Poll}; +use futures::{future::Future, task::AtomicWaker}; +use serde::{Deserialize, Serialize}; +use std::fs::File; +use std::io::Write; +use std::pin::Pin; +use std::time::Duration; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; + +#[derive(Serialize, Deserialize, Debug)] +pub struct RequestMessage { + pub request_id: u32, + pub timeout: Duration, + pub content: Vec, +} + +pub struct MatchCtx { + event_bus: Arc>, + players: HashMap, + // output: MsgStreamHandle, + log_sink: File, +} + +impl MatchCtx { + pub fn new( + event_bus: Arc>, + players: HashMap>, + log_file: File, + // log: MsgStreamHandle, + ) -> Self { + MatchCtx { + event_bus, + players: players + .into_iter() + .map(|(id, handle)| { + let player_handle = PlayerData { + request_ctr: 0, + handle, + }; + (id, player_handle) + }) + .collect(), + log_sink: log_file, + } + } + + // TODO: implement a clean way to handle the player not existing + pub fn request(&mut self, player_id: u32, content: Vec, timeout: Duration) -> Request { + let player = self.players.get_mut(&player_id).unwrap(); + let request_id = player.request_ctr; + player.request_ctr += 1; + + player.handle.send_request(RequestMessage { + request_id, + content, + timeout, + }); + + return Request { + player_id, + request_id, + event_bus: self.event_bus.clone(), + }; + } + + pub fn send_info(&mut self, player_id: u32, msg: String) { + let player = self.players.get_mut(&player_id).unwrap(); + player.handle.send_info(msg); + } + + pub fn players(&self) -> Vec { + self.players.keys().cloned().collect() + } + + // this method should be used to emit log states etc. + pub fn log_string(&mut self, message: String) { + write!(self.log_sink, "{}\n", message).expect("failed to write to log file"); + } +} + +pub trait PlayerHandle: Send { + fn send_request(&mut self, r: RequestMessage); + fn send_info(&mut self, msg: String); +} + +struct PlayerData { + request_ctr: u32, + handle: Box, +} + +type RequestId = (u32, u32); +pub struct EventBus { + request_responses: HashMap>>, + wakers: HashMap, +} + +impl EventBus { + pub fn new() -> Self { + EventBus { + request_responses: HashMap::new(), + wakers: HashMap::new(), + } + } +} + +impl EventBus { + pub fn resolve_request(&mut self, id: RequestId, result: RequestResult>) { + if self.request_responses.contains_key(&id) { + // request already resolved + // TODO: maybe report this? + return; + } + self.request_responses.insert(id, result); + if let Some(waker) = self.wakers.remove(&id) { + waker.wake(); + } + } +} + +pub struct Request { + player_id: u32, + request_id: u32, + event_bus: Arc>, +} + +impl Request { + #[allow(dead_code)] + pub fn player_id(&self) -> u32 { + self.player_id + } +} + +impl Future for Request { + type Output = RequestResult>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut event_bus = self.event_bus.lock().unwrap(); + let request_id = (self.player_id, self.request_id); + + if let Some(result) = event_bus.request_responses.get(&request_id) { + return Poll::Ready(result.clone()); + } + + event_bus + .wakers + .entry(request_id) + .or_insert_with(|| AtomicWaker::new()) + .register(cx.waker()); + return Poll::Pending; + } +} + +#[derive(Debug, Clone)] +pub enum RequestError { + Timeout, +} + +pub type RequestResult = Result; diff --git a/planetwars-matchrunner/src/pw_match.rs b/planetwars-matchrunner/src/pw_match.rs new file mode 100644 index 0000000..42bc9d2 --- /dev/null +++ b/planetwars-matchrunner/src/pw_match.rs @@ -0,0 +1,136 @@ +use super::match_context::{MatchCtx, RequestResult}; +use futures::stream::futures_unordered::FuturesUnordered; +use futures::{FutureExt, StreamExt}; +use serde::{Deserialize, Serialize}; +use tokio::time::Duration; + +use serde_json; + +use std::convert::TryInto; + +pub use planetwars_rules::config::{Config, Map}; + +use planetwars_rules::protocol::{self as proto, PlayerAction}; +use planetwars_rules::serializer as pw_serializer; +use planetwars_rules::{PlanetWars, PwConfig}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct MatchConfig { + pub map_name: String, + pub max_turns: usize, +} + +pub struct PwMatch { + match_ctx: MatchCtx, + match_state: PlanetWars, +} + +impl PwMatch { + pub fn create(match_ctx: MatchCtx, config: PwConfig) -> Self { + // TODO: this is kind of hacked together at the moment + let match_state = PlanetWars::create(config, match_ctx.players().len()); + + PwMatch { + match_state, + match_ctx, + } + } + + pub async fn run(mut self) { + while !self.match_state.is_finished() { + let player_messages = self.prompt_players().await; + + for (player_id, turn) in player_messages { + let res = self.execute_action(player_id, turn); + if let Some(err) = action_errors(res) { + let info_str = serde_json::to_string(&err).unwrap(); + self.match_ctx.send_info(player_id as u32, info_str); + } + } + self.match_state.step(); + + // Log state + let state = self.match_state.serialize_state(); + self.match_ctx + .log_string(serde_json::to_string(&state).unwrap()); + } + } + + async fn prompt_players(&mut self) -> Vec<(usize, RequestResult>)> { + // borrow these outside closure to make the borrow checker happy + let state = self.match_state.state(); + let match_ctx = &mut self.match_ctx; + + // TODO: this numbering is really messy. + // Get rid of the distinction between player_num + // and player_id. + + self.match_state + .state() + .players + .iter() + .filter(|p| p.alive) + .map(move |player| { + let state_for_player = pw_serializer::serialize_rotated(&state, player.id - 1); + match_ctx + .request( + player.id.try_into().unwrap(), + serde_json::to_vec(&state_for_player).unwrap(), + Duration::from_millis(1000), + ) + .map(move |resp| (player.id, resp)) + }) + .collect::>() + .collect::>() + .await + } + + fn execute_action( + &mut self, + player_num: usize, + turn: RequestResult>, + ) -> proto::PlayerAction { + let turn = match turn { + Err(_timeout) => return proto::PlayerAction::Timeout, + Ok(data) => data, + }; + + let action: proto::Action = match serde_json::from_slice(&turn) { + Err(err) => return proto::PlayerAction::ParseError(err.to_string()), + Ok(action) => action, + }; + + let commands = action + .commands + .into_iter() + .map(|command| { + let res = self.match_state.execute_command(player_num, &command); + proto::PlayerCommand { + command, + error: res.err(), + } + }) + .collect(); + + return proto::PlayerAction::Commands(commands); + } +} + +fn action_errors(action: PlayerAction) -> Option { + match action { + PlayerAction::Commands(commands) => { + let failed = commands + .into_iter() + .filter(|cmd| cmd.error.is_some()) + .collect::>(); + + if failed.is_empty() { + None + } else { + Some(PlayerAction::Commands(failed)) + } + } + e => Some(e), + } +} -- cgit v1.2.3