From d0a0fcfdeda2c315a13ddd96b4fca958da0d9858 Mon Sep 17 00:00:00 2001 From: Ilion Beyst Date: Sat, 25 Dec 2021 14:45:05 +0100 Subject: cli for running matches --- planetwars-localdev/src/match_runner/bot_runner.rs | 120 +++++++++++++++ .../src/match_runner/match_context.rs | 161 +++++++++++++++++++++ planetwars-localdev/src/match_runner/mod.rs | 56 +++++++ planetwars-localdev/src/match_runner/pw_match.rs | 136 +++++++++++++++++ 4 files changed, 473 insertions(+) create mode 100644 planetwars-localdev/src/match_runner/bot_runner.rs create mode 100644 planetwars-localdev/src/match_runner/match_context.rs create mode 100644 planetwars-localdev/src/match_runner/mod.rs create mode 100644 planetwars-localdev/src/match_runner/pw_match.rs (limited to 'planetwars-localdev/src/match_runner') diff --git a/planetwars-localdev/src/match_runner/bot_runner.rs b/planetwars-localdev/src/match_runner/bot_runner.rs new file mode 100644 index 0000000..290df07 --- /dev/null +++ b/planetwars-localdev/src/match_runner/bot_runner.rs @@ -0,0 +1,120 @@ +use std::io; +use std::process::Stdio; +use std::sync::Arc; +use std::sync::Mutex; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines}; +use tokio::process; +use tokio::sync::mpsc; +use tokio::time::timeout; + +use super::match_context::EventBus; +use super::match_context::PlayerHandle; +use super::match_context::RequestError; +use super::match_context::RequestMessage; +pub struct LocalBotHandle { + tx: mpsc::UnboundedSender, +} + +impl PlayerHandle for LocalBotHandle { + fn send_request(&mut self, r: RequestMessage) { + self.tx + .send(r) + .expect("failed to send message to local bot"); + } + + fn send_info(&mut self, _msg: String) { + // TODO: log this somewhere + // drop info message + } +} + +pub fn run_local_bot(player_id: u32, event_bus: Arc>, bot: Bot) -> LocalBotHandle { + let (tx, rx) = mpsc::unbounded_channel(); + + let runner = LocalBotRunner { + event_bus, + rx, + player_id, + bot, + }; + tokio::spawn(runner.run()); + + return LocalBotHandle { tx }; +} + +pub struct LocalBotRunner { + event_bus: Arc>, + rx: mpsc::UnboundedReceiver, + player_id: u32, + bot: Bot, +} + +impl LocalBotRunner { + pub async fn run(mut self) { + let mut process = self.bot.spawn_process(); + + while let Some(request) = self.rx.recv().await { + let resp_fut = process.communicate(&request.content); + let result = timeout(request.timeout, resp_fut) + .await + // TODO: how can this failure be handled cleanly? + .expect("process read failed"); + let result = match result { + Ok(line) => Ok(line.into_bytes()), + Err(_elapsed) => Err(RequestError::Timeout), + }; + let request_id = (self.player_id, request.request_id); + + self.event_bus + .lock() + .unwrap() + .resolve_request(request_id, result); + } + } +} + +#[derive(Debug, Clone)] +pub struct Bot { + pub working_dir: String, + pub argv: Vec, +} + +impl Bot { + pub fn spawn_process(&self) -> BotProcess { + let mut child = process::Command::new(&self.argv[0]) + .args(&self.argv[1..]) + .current_dir(self.working_dir.clone()) + .kill_on_drop(true) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()) + .spawn() + .expect("spawning failed"); + + let stdout = child.stdout.take().unwrap(); + let reader = BufReader::new(stdout).lines(); + + return BotProcess { + stdin: child.stdin.take().unwrap(), + stdout: reader, + child, + }; + } +} + +pub struct BotProcess { + #[allow(dead_code)] + child: process::Child, + stdin: process::ChildStdin, + stdout: Lines>, +} + +impl BotProcess { + // TODO: gracefully handle errors + pub async fn communicate(&mut self, input: &[u8]) -> io::Result { + self.stdin.write_all(input).await?; + self.stdin.write_u8(b'\n').await?; + let line = self.stdout.next_line().await?; + line.ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "no response received")) + } +} diff --git a/planetwars-localdev/src/match_runner/match_context.rs b/planetwars-localdev/src/match_runner/match_context.rs new file mode 100644 index 0000000..466da13 --- /dev/null +++ b/planetwars-localdev/src/match_runner/match_context.rs @@ -0,0 +1,161 @@ +use futures::task::{Context, Poll}; +use futures::{future::Future, task::AtomicWaker}; +use serde::{Deserialize, Serialize}; +use std::fs::File; +use std::io::Write; +use std::pin::Pin; +use std::time::Duration; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; + +#[derive(Serialize, Deserialize, Debug)] +pub struct RequestMessage { + pub request_id: u32, + pub timeout: Duration, + pub content: Vec, +} + +pub struct MatchCtx { + event_bus: Arc>, + players: HashMap, + // output: MsgStreamHandle, + log_sink: File, +} + +impl MatchCtx { + pub fn new( + event_bus: Arc>, + players: HashMap>, + log_file: File, + // log: MsgStreamHandle, + ) -> Self { + MatchCtx { + event_bus, + players: players + .into_iter() + .map(|(id, handle)| { + let player_handle = PlayerData { + request_ctr: 0, + handle, + }; + (id, player_handle) + }) + .collect(), + log_sink: log_file, + } + } + + // TODO: implement a clean way to handle the player not existing + pub fn request(&mut self, player_id: u32, content: Vec, timeout: Duration) -> Request { + let player = self.players.get_mut(&player_id).unwrap(); + let request_id = player.request_ctr; + player.request_ctr += 1; + + player.handle.send_request(RequestMessage { + request_id, + content, + timeout, + }); + + return Request { + player_id, + request_id, + event_bus: self.event_bus.clone(), + }; + } + + pub fn send_info(&mut self, player_id: u32, msg: String) { + let player = self.players.get_mut(&player_id).unwrap(); + player.handle.send_info(msg); + } + + pub fn players(&self) -> Vec { + self.players.keys().cloned().collect() + } + + // this method should be used to emit log states etc. + pub fn log_string(&mut self, message: String) { + write!(self.log_sink, "{}\n", message).expect("failed to write to log file"); + } +} + +pub trait PlayerHandle: Send { + fn send_request(&mut self, r: RequestMessage); + fn send_info(&mut self, msg: String); +} + +struct PlayerData { + request_ctr: u32, + handle: Box, +} + +type RequestId = (u32, u32); +pub struct EventBus { + request_responses: HashMap>>, + wakers: HashMap, +} + +impl EventBus { + pub fn new() -> Self { + EventBus { + request_responses: HashMap::new(), + wakers: HashMap::new(), + } + } +} + +impl EventBus { + pub fn resolve_request(&mut self, id: RequestId, result: RequestResult>) { + if self.request_responses.contains_key(&id) { + // request already resolved + // TODO: maybe report this? + return; + } + self.request_responses.insert(id, result); + if let Some(waker) = self.wakers.remove(&id) { + waker.wake(); + } + } +} + +pub struct Request { + player_id: u32, + request_id: u32, + event_bus: Arc>, +} + +impl Request { + #[allow(dead_code)] + pub fn player_id(&self) -> u32 { + self.player_id + } +} + +impl Future for Request { + type Output = RequestResult>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut event_bus = self.event_bus.lock().unwrap(); + let request_id = (self.player_id, self.request_id); + + if let Some(result) = event_bus.request_responses.get(&request_id) { + return Poll::Ready(result.clone()); + } + + event_bus + .wakers + .entry(request_id) + .or_insert_with(|| AtomicWaker::new()) + .register(cx.waker()); + return Poll::Pending; + } +} + +#[derive(Debug, Clone)] +pub enum RequestError { + Timeout, +} + +pub type RequestResult = Result; diff --git a/planetwars-localdev/src/match_runner/mod.rs b/planetwars-localdev/src/match_runner/mod.rs new file mode 100644 index 0000000..2715b96 --- /dev/null +++ b/planetwars-localdev/src/match_runner/mod.rs @@ -0,0 +1,56 @@ +mod bot_runner; +mod match_context; +mod pw_match; + +use std::{ + path::PathBuf, + sync::{Arc, Mutex}, +}; + +use match_context::MatchCtx; +use planetwars_rules::PwConfig; + +use crate::BotConfig; + +use self::match_context::{EventBus, PlayerHandle}; + +pub struct MatchConfig { + pub map_path: PathBuf, + pub log_path: PathBuf, + pub players: Vec, +} + +pub struct MatchBot { + pub name: String, + pub bot_config: BotConfig, +} + +pub async fn run_match(config: MatchConfig) { + let pw_config = PwConfig { + map_file: config.map_path, + max_turns: 100, + }; + + let event_bus = Arc::new(Mutex::new(EventBus::new())); + + // start bots + let players = config + .players + .iter() + .enumerate() + .map(|(player_id, bot)| { + let player_id = (player_id + 1) as u32; + let bot = bot_runner::Bot { + working_dir: bot.bot_config.path.clone(), + argv: bot.bot_config.argv.clone(), + }; + let handle = bot_runner::run_local_bot(player_id, event_bus.clone(), bot); + (player_id, Box::new(handle) as Box) + }) + .collect(); + let log_file = std::fs::File::create(config.log_path).expect("could not create log file"); + let match_ctx = MatchCtx::new(event_bus, players, log_file); + + let match_state = pw_match::PwMatch::create(match_ctx, pw_config); + match_state.run().await; +} diff --git a/planetwars-localdev/src/match_runner/pw_match.rs b/planetwars-localdev/src/match_runner/pw_match.rs new file mode 100644 index 0000000..42bc9d2 --- /dev/null +++ b/planetwars-localdev/src/match_runner/pw_match.rs @@ -0,0 +1,136 @@ +use super::match_context::{MatchCtx, RequestResult}; +use futures::stream::futures_unordered::FuturesUnordered; +use futures::{FutureExt, StreamExt}; +use serde::{Deserialize, Serialize}; +use tokio::time::Duration; + +use serde_json; + +use std::convert::TryInto; + +pub use planetwars_rules::config::{Config, Map}; + +use planetwars_rules::protocol::{self as proto, PlayerAction}; +use planetwars_rules::serializer as pw_serializer; +use planetwars_rules::{PlanetWars, PwConfig}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct MatchConfig { + pub map_name: String, + pub max_turns: usize, +} + +pub struct PwMatch { + match_ctx: MatchCtx, + match_state: PlanetWars, +} + +impl PwMatch { + pub fn create(match_ctx: MatchCtx, config: PwConfig) -> Self { + // TODO: this is kind of hacked together at the moment + let match_state = PlanetWars::create(config, match_ctx.players().len()); + + PwMatch { + match_state, + match_ctx, + } + } + + pub async fn run(mut self) { + while !self.match_state.is_finished() { + let player_messages = self.prompt_players().await; + + for (player_id, turn) in player_messages { + let res = self.execute_action(player_id, turn); + if let Some(err) = action_errors(res) { + let info_str = serde_json::to_string(&err).unwrap(); + self.match_ctx.send_info(player_id as u32, info_str); + } + } + self.match_state.step(); + + // Log state + let state = self.match_state.serialize_state(); + self.match_ctx + .log_string(serde_json::to_string(&state).unwrap()); + } + } + + async fn prompt_players(&mut self) -> Vec<(usize, RequestResult>)> { + // borrow these outside closure to make the borrow checker happy + let state = self.match_state.state(); + let match_ctx = &mut self.match_ctx; + + // TODO: this numbering is really messy. + // Get rid of the distinction between player_num + // and player_id. + + self.match_state + .state() + .players + .iter() + .filter(|p| p.alive) + .map(move |player| { + let state_for_player = pw_serializer::serialize_rotated(&state, player.id - 1); + match_ctx + .request( + player.id.try_into().unwrap(), + serde_json::to_vec(&state_for_player).unwrap(), + Duration::from_millis(1000), + ) + .map(move |resp| (player.id, resp)) + }) + .collect::>() + .collect::>() + .await + } + + fn execute_action( + &mut self, + player_num: usize, + turn: RequestResult>, + ) -> proto::PlayerAction { + let turn = match turn { + Err(_timeout) => return proto::PlayerAction::Timeout, + Ok(data) => data, + }; + + let action: proto::Action = match serde_json::from_slice(&turn) { + Err(err) => return proto::PlayerAction::ParseError(err.to_string()), + Ok(action) => action, + }; + + let commands = action + .commands + .into_iter() + .map(|command| { + let res = self.match_state.execute_command(player_num, &command); + proto::PlayerCommand { + command, + error: res.err(), + } + }) + .collect(); + + return proto::PlayerAction::Commands(commands); + } +} + +fn action_errors(action: PlayerAction) -> Option { + match action { + PlayerAction::Commands(commands) => { + let failed = commands + .into_iter() + .filter(|cmd| cmd.error.is_some()) + .collect::>(); + + if failed.is_empty() { + None + } else { + Some(PlayerAction::Commands(failed)) + } + } + e => Some(e), + } +} -- cgit v1.2.3