diff options
Diffstat (limited to 'planetwars-cli/src/match_runner')
-rw-r--r-- | planetwars-cli/src/match_runner/bot_runner.rs | 121 | ||||
-rw-r--r-- | planetwars-cli/src/match_runner/match_context.rs | 161 | ||||
-rw-r--r-- | planetwars-cli/src/match_runner/mod.rs | 91 | ||||
-rw-r--r-- | planetwars-cli/src/match_runner/pw_match.rs | 136 |
4 files changed, 509 insertions, 0 deletions
diff --git a/planetwars-cli/src/match_runner/bot_runner.rs b/planetwars-cli/src/match_runner/bot_runner.rs new file mode 100644 index 0000000..70fc060 --- /dev/null +++ b/planetwars-cli/src/match_runner/bot_runner.rs @@ -0,0 +1,121 @@ +use std::io; +use std::path::PathBuf; +use std::process::Stdio; +use std::sync::Arc; +use std::sync::Mutex; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines}; +use tokio::process; +use tokio::sync::mpsc; +use tokio::time::timeout; + +use super::match_context::EventBus; +use super::match_context::PlayerHandle; +use super::match_context::RequestError; +use super::match_context::RequestMessage; +pub struct LocalBotHandle { + tx: mpsc::UnboundedSender<RequestMessage>, +} + +impl PlayerHandle for LocalBotHandle { + fn send_request(&mut self, r: RequestMessage) { + self.tx + .send(r) + .expect("failed to send message to local bot"); + } + + fn send_info(&mut self, _msg: String) { + // TODO: log this somewhere + // drop info message + } +} + +pub fn run_local_bot(player_id: u32, event_bus: Arc<Mutex<EventBus>>, bot: Bot) -> LocalBotHandle { + let (tx, rx) = mpsc::unbounded_channel(); + + let runner = LocalBotRunner { + event_bus, + rx, + player_id, + bot, + }; + tokio::spawn(runner.run()); + + return LocalBotHandle { tx }; +} + +pub struct LocalBotRunner { + event_bus: Arc<Mutex<EventBus>>, + rx: mpsc::UnboundedReceiver<RequestMessage>, + player_id: u32, + bot: Bot, +} + +impl LocalBotRunner { + pub async fn run(mut self) { + let mut process = self.bot.spawn_process(); + + while let Some(request) = self.rx.recv().await { + let resp_fut = process.communicate(&request.content); + let result = timeout(request.timeout, resp_fut) + .await + // TODO: how can this failure be handled cleanly? + .expect("process read failed"); + let result = match result { + Ok(line) => Ok(line.into_bytes()), + Err(_elapsed) => Err(RequestError::Timeout), + }; + let request_id = (self.player_id, request.request_id); + + self.event_bus + .lock() + .unwrap() + .resolve_request(request_id, result); + } + } +} + +#[derive(Debug, Clone)] +pub struct Bot { + pub working_dir: PathBuf, + pub argv: Vec<String>, +} + +impl Bot { + pub fn spawn_process(&self) -> BotProcess { + let mut child = process::Command::new(&self.argv[0]) + .args(&self.argv[1..]) + .current_dir(self.working_dir.clone()) + .kill_on_drop(true) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()) + .spawn() + .expect("spawning failed"); + + let stdout = child.stdout.take().unwrap(); + let reader = BufReader::new(stdout).lines(); + + return BotProcess { + stdin: child.stdin.take().unwrap(), + stdout: reader, + child, + }; + } +} + +pub struct BotProcess { + #[allow(dead_code)] + child: process::Child, + stdin: process::ChildStdin, + stdout: Lines<BufReader<process::ChildStdout>>, +} + +impl BotProcess { + // TODO: gracefully handle errors + pub async fn communicate(&mut self, input: &[u8]) -> io::Result<String> { + self.stdin.write_all(input).await?; + self.stdin.write_u8(b'\n').await?; + let line = self.stdout.next_line().await?; + line.ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "no response received")) + } +} diff --git a/planetwars-cli/src/match_runner/match_context.rs b/planetwars-cli/src/match_runner/match_context.rs new file mode 100644 index 0000000..466da13 --- /dev/null +++ b/planetwars-cli/src/match_runner/match_context.rs @@ -0,0 +1,161 @@ +use futures::task::{Context, Poll}; +use futures::{future::Future, task::AtomicWaker}; +use serde::{Deserialize, Serialize}; +use std::fs::File; +use std::io::Write; +use std::pin::Pin; +use std::time::Duration; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; + +#[derive(Serialize, Deserialize, Debug)] +pub struct RequestMessage { + pub request_id: u32, + pub timeout: Duration, + pub content: Vec<u8>, +} + +pub struct MatchCtx { + event_bus: Arc<Mutex<EventBus>>, + players: HashMap<u32, PlayerData>, + // output: MsgStreamHandle<String>, + log_sink: File, +} + +impl MatchCtx { + pub fn new( + event_bus: Arc<Mutex<EventBus>>, + players: HashMap<u32, Box<dyn PlayerHandle>>, + log_file: File, + // log: MsgStreamHandle<String>, + ) -> Self { + MatchCtx { + event_bus, + players: players + .into_iter() + .map(|(id, handle)| { + let player_handle = PlayerData { + request_ctr: 0, + handle, + }; + (id, player_handle) + }) + .collect(), + log_sink: log_file, + } + } + + // TODO: implement a clean way to handle the player not existing + pub fn request(&mut self, player_id: u32, content: Vec<u8>, timeout: Duration) -> Request { + let player = self.players.get_mut(&player_id).unwrap(); + let request_id = player.request_ctr; + player.request_ctr += 1; + + player.handle.send_request(RequestMessage { + request_id, + content, + timeout, + }); + + return Request { + player_id, + request_id, + event_bus: self.event_bus.clone(), + }; + } + + pub fn send_info(&mut self, player_id: u32, msg: String) { + let player = self.players.get_mut(&player_id).unwrap(); + player.handle.send_info(msg); + } + + pub fn players(&self) -> Vec<u32> { + self.players.keys().cloned().collect() + } + + // this method should be used to emit log states etc. + pub fn log_string(&mut self, message: String) { + write!(self.log_sink, "{}\n", message).expect("failed to write to log file"); + } +} + +pub trait PlayerHandle: Send { + fn send_request(&mut self, r: RequestMessage); + fn send_info(&mut self, msg: String); +} + +struct PlayerData { + request_ctr: u32, + handle: Box<dyn PlayerHandle>, +} + +type RequestId = (u32, u32); +pub struct EventBus { + request_responses: HashMap<RequestId, RequestResult<Vec<u8>>>, + wakers: HashMap<RequestId, AtomicWaker>, +} + +impl EventBus { + pub fn new() -> Self { + EventBus { + request_responses: HashMap::new(), + wakers: HashMap::new(), + } + } +} + +impl EventBus { + pub fn resolve_request(&mut self, id: RequestId, result: RequestResult<Vec<u8>>) { + if self.request_responses.contains_key(&id) { + // request already resolved + // TODO: maybe report this? + return; + } + self.request_responses.insert(id, result); + if let Some(waker) = self.wakers.remove(&id) { + waker.wake(); + } + } +} + +pub struct Request { + player_id: u32, + request_id: u32, + event_bus: Arc<Mutex<EventBus>>, +} + +impl Request { + #[allow(dead_code)] + pub fn player_id(&self) -> u32 { + self.player_id + } +} + +impl Future for Request { + type Output = RequestResult<Vec<u8>>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut event_bus = self.event_bus.lock().unwrap(); + let request_id = (self.player_id, self.request_id); + + if let Some(result) = event_bus.request_responses.get(&request_id) { + return Poll::Ready(result.clone()); + } + + event_bus + .wakers + .entry(request_id) + .or_insert_with(|| AtomicWaker::new()) + .register(cx.waker()); + return Poll::Pending; + } +} + +#[derive(Debug, Clone)] +pub enum RequestError { + Timeout, +} + +pub type RequestResult<T> = Result<T, RequestError>; diff --git a/planetwars-cli/src/match_runner/mod.rs b/planetwars-cli/src/match_runner/mod.rs new file mode 100644 index 0000000..fdd02d5 --- /dev/null +++ b/planetwars-cli/src/match_runner/mod.rs @@ -0,0 +1,91 @@ +mod bot_runner; +mod match_context; +mod pw_match; + +use std::{ + io::Write, + path::PathBuf, + sync::{Arc, Mutex}, +}; + +use match_context::MatchCtx; +use planetwars_rules::PwConfig; +use serde::{Deserialize, Serialize}; + +use crate::workspace::bot::WorkspaceBot; + +use self::match_context::{EventBus, PlayerHandle}; + +pub struct MatchConfig { + pub map_name: String, + pub map_path: PathBuf, + pub log_path: PathBuf, + pub players: Vec<MatchPlayer>, +} + +#[derive(Serialize, Deserialize)] +pub struct MatchMeta { + pub map_name: String, + pub timestamp: chrono::DateTime<chrono::Local>, + pub players: Vec<PlayerInfo>, +} + +#[derive(Serialize, Deserialize)] +pub struct PlayerInfo { + pub name: String, +} + +pub struct MatchPlayer { + pub name: String, + pub bot: WorkspaceBot, +} + +pub async fn run_match(config: MatchConfig) { + let pw_config = PwConfig { + map_file: config.map_path, + max_turns: 100, + }; + + let event_bus = Arc::new(Mutex::new(EventBus::new())); + + // start bots + let players = config + .players + .iter() + .enumerate() + .map(|(player_id, player)| { + let player_id = (player_id + 1) as u32; + let bot = bot_runner::Bot { + working_dir: player.bot.path.clone(), + argv: player.bot.config.get_run_argv(), + }; + let handle = bot_runner::run_local_bot(player_id, event_bus.clone(), bot); + (player_id, Box::new(handle) as Box<dyn PlayerHandle>) + }) + .collect(); + let mut log_file = std::fs::File::create(config.log_path).expect("could not create log file"); + + // assemble the math meta struct + let match_meta = MatchMeta { + map_name: config.map_name.clone(), + timestamp: chrono::Local::now(), + players: config + .players + .iter() + .map(|bot| PlayerInfo { + name: bot.name.clone(), + }) + .collect(), + }; + write!( + log_file, + "{}\n", + serde_json::to_string(&match_meta).unwrap() + ) + .unwrap(); + + let match_ctx = MatchCtx::new(event_bus, players, log_file); + + let match_state = pw_match::PwMatch::create(match_ctx, pw_config); + match_state.run().await; +} diff --git a/planetwars-cli/src/match_runner/pw_match.rs b/planetwars-cli/src/match_runner/pw_match.rs new file mode 100644 index 0000000..42bc9d2 --- /dev/null +++ b/planetwars-cli/src/match_runner/pw_match.rs @@ -0,0 +1,136 @@ +use super::match_context::{MatchCtx, RequestResult}; +use futures::stream::futures_unordered::FuturesUnordered; +use futures::{FutureExt, StreamExt}; +use serde::{Deserialize, Serialize}; +use tokio::time::Duration; + +use serde_json; + +use std::convert::TryInto; + +pub use planetwars_rules::config::{Config, Map}; + +use planetwars_rules::protocol::{self as proto, PlayerAction}; +use planetwars_rules::serializer as pw_serializer; +use planetwars_rules::{PlanetWars, PwConfig}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct MatchConfig { + pub map_name: String, + pub max_turns: usize, +} + +pub struct PwMatch { + match_ctx: MatchCtx, + match_state: PlanetWars, +} + +impl PwMatch { + pub fn create(match_ctx: MatchCtx, config: PwConfig) -> Self { + // TODO: this is kind of hacked together at the moment + let match_state = PlanetWars::create(config, match_ctx.players().len()); + + PwMatch { + match_state, + match_ctx, + } + } + + pub async fn run(mut self) { + while !self.match_state.is_finished() { + let player_messages = self.prompt_players().await; + + for (player_id, turn) in player_messages { + let res = self.execute_action(player_id, turn); + if let Some(err) = action_errors(res) { + let info_str = serde_json::to_string(&err).unwrap(); + self.match_ctx.send_info(player_id as u32, info_str); + } + } + self.match_state.step(); + + // Log state + let state = self.match_state.serialize_state(); + self.match_ctx + .log_string(serde_json::to_string(&state).unwrap()); + } + } + + async fn prompt_players(&mut self) -> Vec<(usize, RequestResult<Vec<u8>>)> { + // borrow these outside closure to make the borrow checker happy + let state = self.match_state.state(); + let match_ctx = &mut self.match_ctx; + + // TODO: this numbering is really messy. + // Get rid of the distinction between player_num + // and player_id. + + self.match_state + .state() + .players + .iter() + .filter(|p| p.alive) + .map(move |player| { + let state_for_player = pw_serializer::serialize_rotated(&state, player.id - 1); + match_ctx + .request( + player.id.try_into().unwrap(), + serde_json::to_vec(&state_for_player).unwrap(), + Duration::from_millis(1000), + ) + .map(move |resp| (player.id, resp)) + }) + .collect::<FuturesUnordered<_>>() + .collect::<Vec<_>>() + .await + } + + fn execute_action( + &mut self, + player_num: usize, + turn: RequestResult<Vec<u8>>, + ) -> proto::PlayerAction { + let turn = match turn { + Err(_timeout) => return proto::PlayerAction::Timeout, + Ok(data) => data, + }; + + let action: proto::Action = match serde_json::from_slice(&turn) { + Err(err) => return proto::PlayerAction::ParseError(err.to_string()), + Ok(action) => action, + }; + + let commands = action + .commands + .into_iter() + .map(|command| { + let res = self.match_state.execute_command(player_num, &command); + proto::PlayerCommand { + command, + error: res.err(), + } + }) + .collect(); + + return proto::PlayerAction::Commands(commands); + } +} + +fn action_errors(action: PlayerAction) -> Option<PlayerAction> { + match action { + PlayerAction::Commands(commands) => { + let failed = commands + .into_iter() + .filter(|cmd| cmd.error.is_some()) + .collect::<Vec<_>>(); + + if failed.is_empty() { + None + } else { + Some(PlayerAction::Commands(failed)) + } + } + e => Some(e), + } +} |