diff options
Diffstat (limited to 'planetwars-cli/src/match_runner')
-rw-r--r-- | planetwars-cli/src/match_runner/bot_runner.rs | 121 | ||||
-rw-r--r-- | planetwars-cli/src/match_runner/match_context.rs | 161 | ||||
-rw-r--r-- | planetwars-cli/src/match_runner/mod.rs | 91 | ||||
-rw-r--r-- | planetwars-cli/src/match_runner/pw_match.rs | 136 |
4 files changed, 0 insertions, 509 deletions
diff --git a/planetwars-cli/src/match_runner/bot_runner.rs b/planetwars-cli/src/match_runner/bot_runner.rs deleted file mode 100644 index 70fc060..0000000 --- a/planetwars-cli/src/match_runner/bot_runner.rs +++ /dev/null @@ -1,121 +0,0 @@ -use std::io; -use std::path::PathBuf; -use std::process::Stdio; -use std::sync::Arc; -use std::sync::Mutex; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines}; -use tokio::process; -use tokio::sync::mpsc; -use tokio::time::timeout; - -use super::match_context::EventBus; -use super::match_context::PlayerHandle; -use super::match_context::RequestError; -use super::match_context::RequestMessage; -pub struct LocalBotHandle { - tx: mpsc::UnboundedSender<RequestMessage>, -} - -impl PlayerHandle for LocalBotHandle { - fn send_request(&mut self, r: RequestMessage) { - self.tx - .send(r) - .expect("failed to send message to local bot"); - } - - fn send_info(&mut self, _msg: String) { - // TODO: log this somewhere - // drop info message - } -} - -pub fn run_local_bot(player_id: u32, event_bus: Arc<Mutex<EventBus>>, bot: Bot) -> LocalBotHandle { - let (tx, rx) = mpsc::unbounded_channel(); - - let runner = LocalBotRunner { - event_bus, - rx, - player_id, - bot, - }; - tokio::spawn(runner.run()); - - return LocalBotHandle { tx }; -} - -pub struct LocalBotRunner { - event_bus: Arc<Mutex<EventBus>>, - rx: mpsc::UnboundedReceiver<RequestMessage>, - player_id: u32, - bot: Bot, -} - -impl LocalBotRunner { - pub async fn run(mut self) { - let mut process = self.bot.spawn_process(); - - while let Some(request) = self.rx.recv().await { - let resp_fut = process.communicate(&request.content); - let result = timeout(request.timeout, resp_fut) - .await - // TODO: how can this failure be handled cleanly? - .expect("process read failed"); - let result = match result { - Ok(line) => Ok(line.into_bytes()), - Err(_elapsed) => Err(RequestError::Timeout), - }; - let request_id = (self.player_id, request.request_id); - - self.event_bus - .lock() - .unwrap() - .resolve_request(request_id, result); - } - } -} - -#[derive(Debug, Clone)] -pub struct Bot { - pub working_dir: PathBuf, - pub argv: Vec<String>, -} - -impl Bot { - pub fn spawn_process(&self) -> BotProcess { - let mut child = process::Command::new(&self.argv[0]) - .args(&self.argv[1..]) - .current_dir(self.working_dir.clone()) - .kill_on_drop(true) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::inherit()) - .spawn() - .expect("spawning failed"); - - let stdout = child.stdout.take().unwrap(); - let reader = BufReader::new(stdout).lines(); - - return BotProcess { - stdin: child.stdin.take().unwrap(), - stdout: reader, - child, - }; - } -} - -pub struct BotProcess { - #[allow(dead_code)] - child: process::Child, - stdin: process::ChildStdin, - stdout: Lines<BufReader<process::ChildStdout>>, -} - -impl BotProcess { - // TODO: gracefully handle errors - pub async fn communicate(&mut self, input: &[u8]) -> io::Result<String> { - self.stdin.write_all(input).await?; - self.stdin.write_u8(b'\n').await?; - let line = self.stdout.next_line().await?; - line.ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "no response received")) - } -} diff --git a/planetwars-cli/src/match_runner/match_context.rs b/planetwars-cli/src/match_runner/match_context.rs deleted file mode 100644 index 466da13..0000000 --- a/planetwars-cli/src/match_runner/match_context.rs +++ /dev/null @@ -1,161 +0,0 @@ -use futures::task::{Context, Poll}; -use futures::{future::Future, task::AtomicWaker}; -use serde::{Deserialize, Serialize}; -use std::fs::File; -use std::io::Write; -use std::pin::Pin; -use std::time::Duration; -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, -}; - -#[derive(Serialize, Deserialize, Debug)] -pub struct RequestMessage { - pub request_id: u32, - pub timeout: Duration, - pub content: Vec<u8>, -} - -pub struct MatchCtx { - event_bus: Arc<Mutex<EventBus>>, - players: HashMap<u32, PlayerData>, - // output: MsgStreamHandle<String>, - log_sink: File, -} - -impl MatchCtx { - pub fn new( - event_bus: Arc<Mutex<EventBus>>, - players: HashMap<u32, Box<dyn PlayerHandle>>, - log_file: File, - // log: MsgStreamHandle<String>, - ) -> Self { - MatchCtx { - event_bus, - players: players - .into_iter() - .map(|(id, handle)| { - let player_handle = PlayerData { - request_ctr: 0, - handle, - }; - (id, player_handle) - }) - .collect(), - log_sink: log_file, - } - } - - // TODO: implement a clean way to handle the player not existing - pub fn request(&mut self, player_id: u32, content: Vec<u8>, timeout: Duration) -> Request { - let player = self.players.get_mut(&player_id).unwrap(); - let request_id = player.request_ctr; - player.request_ctr += 1; - - player.handle.send_request(RequestMessage { - request_id, - content, - timeout, - }); - - return Request { - player_id, - request_id, - event_bus: self.event_bus.clone(), - }; - } - - pub fn send_info(&mut self, player_id: u32, msg: String) { - let player = self.players.get_mut(&player_id).unwrap(); - player.handle.send_info(msg); - } - - pub fn players(&self) -> Vec<u32> { - self.players.keys().cloned().collect() - } - - // this method should be used to emit log states etc. - pub fn log_string(&mut self, message: String) { - write!(self.log_sink, "{}\n", message).expect("failed to write to log file"); - } -} - -pub trait PlayerHandle: Send { - fn send_request(&mut self, r: RequestMessage); - fn send_info(&mut self, msg: String); -} - -struct PlayerData { - request_ctr: u32, - handle: Box<dyn PlayerHandle>, -} - -type RequestId = (u32, u32); -pub struct EventBus { - request_responses: HashMap<RequestId, RequestResult<Vec<u8>>>, - wakers: HashMap<RequestId, AtomicWaker>, -} - -impl EventBus { - pub fn new() -> Self { - EventBus { - request_responses: HashMap::new(), - wakers: HashMap::new(), - } - } -} - -impl EventBus { - pub fn resolve_request(&mut self, id: RequestId, result: RequestResult<Vec<u8>>) { - if self.request_responses.contains_key(&id) { - // request already resolved - // TODO: maybe report this? - return; - } - self.request_responses.insert(id, result); - if let Some(waker) = self.wakers.remove(&id) { - waker.wake(); - } - } -} - -pub struct Request { - player_id: u32, - request_id: u32, - event_bus: Arc<Mutex<EventBus>>, -} - -impl Request { - #[allow(dead_code)] - pub fn player_id(&self) -> u32 { - self.player_id - } -} - -impl Future for Request { - type Output = RequestResult<Vec<u8>>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let mut event_bus = self.event_bus.lock().unwrap(); - let request_id = (self.player_id, self.request_id); - - if let Some(result) = event_bus.request_responses.get(&request_id) { - return Poll::Ready(result.clone()); - } - - event_bus - .wakers - .entry(request_id) - .or_insert_with(|| AtomicWaker::new()) - .register(cx.waker()); - return Poll::Pending; - } -} - -#[derive(Debug, Clone)] -pub enum RequestError { - Timeout, -} - -pub type RequestResult<T> = Result<T, RequestError>; diff --git a/planetwars-cli/src/match_runner/mod.rs b/planetwars-cli/src/match_runner/mod.rs deleted file mode 100644 index fdd02d5..0000000 --- a/planetwars-cli/src/match_runner/mod.rs +++ /dev/null @@ -1,91 +0,0 @@ -mod bot_runner; -mod match_context; -mod pw_match; - -use std::{ - io::Write, - path::PathBuf, - sync::{Arc, Mutex}, -}; - -use match_context::MatchCtx; -use planetwars_rules::PwConfig; -use serde::{Deserialize, Serialize}; - -use crate::workspace::bot::WorkspaceBot; - -use self::match_context::{EventBus, PlayerHandle}; - -pub struct MatchConfig { - pub map_name: String, - pub map_path: PathBuf, - pub log_path: PathBuf, - pub players: Vec<MatchPlayer>, -} - -#[derive(Serialize, Deserialize)] -pub struct MatchMeta { - pub map_name: String, - pub timestamp: chrono::DateTime<chrono::Local>, - pub players: Vec<PlayerInfo>, -} - -#[derive(Serialize, Deserialize)] -pub struct PlayerInfo { - pub name: String, -} - -pub struct MatchPlayer { - pub name: String, - pub bot: WorkspaceBot, -} - -pub async fn run_match(config: MatchConfig) { - let pw_config = PwConfig { - map_file: config.map_path, - max_turns: 100, - }; - - let event_bus = Arc::new(Mutex::new(EventBus::new())); - - // start bots - let players = config - .players - .iter() - .enumerate() - .map(|(player_id, player)| { - let player_id = (player_id + 1) as u32; - let bot = bot_runner::Bot { - working_dir: player.bot.path.clone(), - argv: player.bot.config.get_run_argv(), - }; - let handle = bot_runner::run_local_bot(player_id, event_bus.clone(), bot); - (player_id, Box::new(handle) as Box<dyn PlayerHandle>) - }) - .collect(); - let mut log_file = std::fs::File::create(config.log_path).expect("could not create log file"); - - // assemble the math meta struct - let match_meta = MatchMeta { - map_name: config.map_name.clone(), - timestamp: chrono::Local::now(), - players: config - .players - .iter() - .map(|bot| PlayerInfo { - name: bot.name.clone(), - }) - .collect(), - }; - write!( - log_file, - "{}\n", - serde_json::to_string(&match_meta).unwrap() - ) - .unwrap(); - - let match_ctx = MatchCtx::new(event_bus, players, log_file); - - let match_state = pw_match::PwMatch::create(match_ctx, pw_config); - match_state.run().await; -} diff --git a/planetwars-cli/src/match_runner/pw_match.rs b/planetwars-cli/src/match_runner/pw_match.rs deleted file mode 100644 index 42bc9d2..0000000 --- a/planetwars-cli/src/match_runner/pw_match.rs +++ /dev/null @@ -1,136 +0,0 @@ -use super::match_context::{MatchCtx, RequestResult}; -use futures::stream::futures_unordered::FuturesUnordered; -use futures::{FutureExt, StreamExt}; -use serde::{Deserialize, Serialize}; -use tokio::time::Duration; - -use serde_json; - -use std::convert::TryInto; - -pub use planetwars_rules::config::{Config, Map}; - -use planetwars_rules::protocol::{self as proto, PlayerAction}; -use planetwars_rules::serializer as pw_serializer; -use planetwars_rules::{PlanetWars, PwConfig}; - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct MatchConfig { - pub map_name: String, - pub max_turns: usize, -} - -pub struct PwMatch { - match_ctx: MatchCtx, - match_state: PlanetWars, -} - -impl PwMatch { - pub fn create(match_ctx: MatchCtx, config: PwConfig) -> Self { - // TODO: this is kind of hacked together at the moment - let match_state = PlanetWars::create(config, match_ctx.players().len()); - - PwMatch { - match_state, - match_ctx, - } - } - - pub async fn run(mut self) { - while !self.match_state.is_finished() { - let player_messages = self.prompt_players().await; - - for (player_id, turn) in player_messages { - let res = self.execute_action(player_id, turn); - if let Some(err) = action_errors(res) { - let info_str = serde_json::to_string(&err).unwrap(); - self.match_ctx.send_info(player_id as u32, info_str); - } - } - self.match_state.step(); - - // Log state - let state = self.match_state.serialize_state(); - self.match_ctx - .log_string(serde_json::to_string(&state).unwrap()); - } - } - - async fn prompt_players(&mut self) -> Vec<(usize, RequestResult<Vec<u8>>)> { - // borrow these outside closure to make the borrow checker happy - let state = self.match_state.state(); - let match_ctx = &mut self.match_ctx; - - // TODO: this numbering is really messy. - // Get rid of the distinction between player_num - // and player_id. - - self.match_state - .state() - .players - .iter() - .filter(|p| p.alive) - .map(move |player| { - let state_for_player = pw_serializer::serialize_rotated(&state, player.id - 1); - match_ctx - .request( - player.id.try_into().unwrap(), - serde_json::to_vec(&state_for_player).unwrap(), - Duration::from_millis(1000), - ) - .map(move |resp| (player.id, resp)) - }) - .collect::<FuturesUnordered<_>>() - .collect::<Vec<_>>() - .await - } - - fn execute_action( - &mut self, - player_num: usize, - turn: RequestResult<Vec<u8>>, - ) -> proto::PlayerAction { - let turn = match turn { - Err(_timeout) => return proto::PlayerAction::Timeout, - Ok(data) => data, - }; - - let action: proto::Action = match serde_json::from_slice(&turn) { - Err(err) => return proto::PlayerAction::ParseError(err.to_string()), - Ok(action) => action, - }; - - let commands = action - .commands - .into_iter() - .map(|command| { - let res = self.match_state.execute_command(player_num, &command); - proto::PlayerCommand { - command, - error: res.err(), - } - }) - .collect(); - - return proto::PlayerAction::Commands(commands); - } -} - -fn action_errors(action: PlayerAction) -> Option<PlayerAction> { - match action { - PlayerAction::Commands(commands) => { - let failed = commands - .into_iter() - .filter(|cmd| cmd.error.is_some()) - .collect::<Vec<_>>(); - - if failed.is_empty() { - None - } else { - Some(PlayerAction::Commands(failed)) - } - } - e => Some(e), - } -} |