aboutsummaryrefslogtreecommitdiff
path: root/planetwars-matchrunner/src
diff options
context:
space:
mode:
authorIlion Beyst <ilion.beyst@gmail.com>2022-01-01 12:10:02 +0100
committerIlion Beyst <ilion.beyst@gmail.com>2022-01-01 12:10:02 +0100
commit4a077c7c65eced447c45389acf05007dd571bf26 (patch)
treef291c61c6164a58fb1d3ab66952d07d0b7e4609c /planetwars-matchrunner/src
parente145947d052450618af3ba094e66a27c3c7f86e4 (diff)
downloadplanetwars.dev-4a077c7c65eced447c45389acf05007dd571bf26.tar.xz
planetwars.dev-4a077c7c65eced447c45389acf05007dd571bf26.zip
extract matchrunner crate from planetwars-cli
Diffstat (limited to 'planetwars-matchrunner/src')
-rw-r--r--planetwars-matchrunner/src/bot_runner.rs121
-rw-r--r--planetwars-matchrunner/src/lib.rs90
-rw-r--r--planetwars-matchrunner/src/match_context.rs161
-rw-r--r--planetwars-matchrunner/src/pw_match.rs136
4 files changed, 508 insertions, 0 deletions
diff --git a/planetwars-matchrunner/src/bot_runner.rs b/planetwars-matchrunner/src/bot_runner.rs
new file mode 100644
index 0000000..70fc060
--- /dev/null
+++ b/planetwars-matchrunner/src/bot_runner.rs
@@ -0,0 +1,121 @@
+use std::io;
+use std::path::PathBuf;
+use std::process::Stdio;
+use std::sync::Arc;
+use std::sync::Mutex;
+use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines};
+use tokio::process;
+use tokio::sync::mpsc;
+use tokio::time::timeout;
+
+use super::match_context::EventBus;
+use super::match_context::PlayerHandle;
+use super::match_context::RequestError;
+use super::match_context::RequestMessage;
+pub struct LocalBotHandle {
+ tx: mpsc::UnboundedSender<RequestMessage>,
+}
+
+impl PlayerHandle for LocalBotHandle {
+ fn send_request(&mut self, r: RequestMessage) {
+ self.tx
+ .send(r)
+ .expect("failed to send message to local bot");
+ }
+
+ fn send_info(&mut self, _msg: String) {
+ // TODO: log this somewhere
+ // drop info message
+ }
+}
+
+pub fn run_local_bot(player_id: u32, event_bus: Arc<Mutex<EventBus>>, bot: Bot) -> LocalBotHandle {
+ let (tx, rx) = mpsc::unbounded_channel();
+
+ let runner = LocalBotRunner {
+ event_bus,
+ rx,
+ player_id,
+ bot,
+ };
+ tokio::spawn(runner.run());
+
+ return LocalBotHandle { tx };
+}
+
+pub struct LocalBotRunner {
+ event_bus: Arc<Mutex<EventBus>>,
+ rx: mpsc::UnboundedReceiver<RequestMessage>,
+ player_id: u32,
+ bot: Bot,
+}
+
+impl LocalBotRunner {
+ pub async fn run(mut self) {
+ let mut process = self.bot.spawn_process();
+
+ while let Some(request) = self.rx.recv().await {
+ let resp_fut = process.communicate(&request.content);
+ let result = timeout(request.timeout, resp_fut)
+ .await
+ // TODO: how can this failure be handled cleanly?
+ .expect("process read failed");
+ let result = match result {
+ Ok(line) => Ok(line.into_bytes()),
+ Err(_elapsed) => Err(RequestError::Timeout),
+ };
+ let request_id = (self.player_id, request.request_id);
+
+ self.event_bus
+ .lock()
+ .unwrap()
+ .resolve_request(request_id, result);
+ }
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct Bot {
+ pub working_dir: PathBuf,
+ pub argv: Vec<String>,
+}
+
+impl Bot {
+ pub fn spawn_process(&self) -> BotProcess {
+ let mut child = process::Command::new(&self.argv[0])
+ .args(&self.argv[1..])
+ .current_dir(self.working_dir.clone())
+ .kill_on_drop(true)
+ .stdin(Stdio::piped())
+ .stdout(Stdio::piped())
+ .stderr(Stdio::inherit())
+ .spawn()
+ .expect("spawning failed");
+
+ let stdout = child.stdout.take().unwrap();
+ let reader = BufReader::new(stdout).lines();
+
+ return BotProcess {
+ stdin: child.stdin.take().unwrap(),
+ stdout: reader,
+ child,
+ };
+ }
+}
+
+pub struct BotProcess {
+ #[allow(dead_code)]
+ child: process::Child,
+ stdin: process::ChildStdin,
+ stdout: Lines<BufReader<process::ChildStdout>>,
+}
+
+impl BotProcess {
+ // TODO: gracefully handle errors
+ pub async fn communicate(&mut self, input: &[u8]) -> io::Result<String> {
+ self.stdin.write_all(input).await?;
+ self.stdin.write_u8(b'\n').await?;
+ let line = self.stdout.next_line().await?;
+ line.ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "no response received"))
+ }
+}
diff --git a/planetwars-matchrunner/src/lib.rs b/planetwars-matchrunner/src/lib.rs
new file mode 100644
index 0000000..50c6518
--- /dev/null
+++ b/planetwars-matchrunner/src/lib.rs
@@ -0,0 +1,90 @@
+mod bot_runner;
+mod match_context;
+mod pw_match;
+
+use std::{
+ io::Write,
+ path::PathBuf,
+ sync::{Arc, Mutex},
+};
+
+use match_context::MatchCtx;
+use planetwars_rules::PwConfig;
+use serde::{Deserialize, Serialize};
+
+use self::match_context::{EventBus, PlayerHandle};
+
+pub struct MatchConfig {
+ pub map_name: String,
+ pub map_path: PathBuf,
+ pub log_path: PathBuf,
+ pub players: Vec<MatchPlayer>,
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct MatchMeta {
+ pub map_name: String,
+ pub timestamp: chrono::DateTime<chrono::Local>,
+ pub players: Vec<PlayerInfo>,
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct PlayerInfo {
+ pub name: String,
+}
+
+pub struct MatchPlayer {
+ pub name: String,
+ pub path: PathBuf,
+ pub argv: Vec<String>,
+}
+
+pub async fn run_match(config: MatchConfig) {
+ let pw_config = PwConfig {
+ map_file: config.map_path,
+ max_turns: 100,
+ };
+
+ let event_bus = Arc::new(Mutex::new(EventBus::new()));
+
+ // start bots
+ let players = config
+ .players
+ .iter()
+ .enumerate()
+ .map(|(player_id, player)| {
+ let player_id = (player_id + 1) as u32;
+ let bot = bot_runner::Bot {
+ working_dir: player.path.clone(),
+ argv: player.argv.clone(),
+ };
+ let handle = bot_runner::run_local_bot(player_id, event_bus.clone(), bot);
+ (player_id, Box::new(handle) as Box<dyn PlayerHandle>)
+ })
+ .collect();
+ let mut log_file = std::fs::File::create(config.log_path).expect("could not create log file");
+
+ // assemble the math meta struct
+ let match_meta = MatchMeta {
+ map_name: config.map_name.clone(),
+ timestamp: chrono::Local::now(),
+ players: config
+ .players
+ .iter()
+ .map(|bot| PlayerInfo {
+ name: bot.name.clone(),
+ })
+ .collect(),
+ };
+ write!(
+ log_file,
+ "{}\n",
+ serde_json::to_string(&match_meta).unwrap()
+ )
+ .unwrap();
+
+ let match_ctx = MatchCtx::new(event_bus, players, log_file);
+
+ let match_state = pw_match::PwMatch::create(match_ctx, pw_config);
+ match_state.run().await;
+}
diff --git a/planetwars-matchrunner/src/match_context.rs b/planetwars-matchrunner/src/match_context.rs
new file mode 100644
index 0000000..466da13
--- /dev/null
+++ b/planetwars-matchrunner/src/match_context.rs
@@ -0,0 +1,161 @@
+use futures::task::{Context, Poll};
+use futures::{future::Future, task::AtomicWaker};
+use serde::{Deserialize, Serialize};
+use std::fs::File;
+use std::io::Write;
+use std::pin::Pin;
+use std::time::Duration;
+use std::{
+ collections::HashMap,
+ sync::{Arc, Mutex},
+};
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct RequestMessage {
+ pub request_id: u32,
+ pub timeout: Duration,
+ pub content: Vec<u8>,
+}
+
+pub struct MatchCtx {
+ event_bus: Arc<Mutex<EventBus>>,
+ players: HashMap<u32, PlayerData>,
+ // output: MsgStreamHandle<String>,
+ log_sink: File,
+}
+
+impl MatchCtx {
+ pub fn new(
+ event_bus: Arc<Mutex<EventBus>>,
+ players: HashMap<u32, Box<dyn PlayerHandle>>,
+ log_file: File,
+ // log: MsgStreamHandle<String>,
+ ) -> Self {
+ MatchCtx {
+ event_bus,
+ players: players
+ .into_iter()
+ .map(|(id, handle)| {
+ let player_handle = PlayerData {
+ request_ctr: 0,
+ handle,
+ };
+ (id, player_handle)
+ })
+ .collect(),
+ log_sink: log_file,
+ }
+ }
+
+ // TODO: implement a clean way to handle the player not existing
+ pub fn request(&mut self, player_id: u32, content: Vec<u8>, timeout: Duration) -> Request {
+ let player = self.players.get_mut(&player_id).unwrap();
+ let request_id = player.request_ctr;
+ player.request_ctr += 1;
+
+ player.handle.send_request(RequestMessage {
+ request_id,
+ content,
+ timeout,
+ });
+
+ return Request {
+ player_id,
+ request_id,
+ event_bus: self.event_bus.clone(),
+ };
+ }
+
+ pub fn send_info(&mut self, player_id: u32, msg: String) {
+ let player = self.players.get_mut(&player_id).unwrap();
+ player.handle.send_info(msg);
+ }
+
+ pub fn players(&self) -> Vec<u32> {
+ self.players.keys().cloned().collect()
+ }
+
+ // this method should be used to emit log states etc.
+ pub fn log_string(&mut self, message: String) {
+ write!(self.log_sink, "{}\n", message).expect("failed to write to log file");
+ }
+}
+
+pub trait PlayerHandle: Send {
+ fn send_request(&mut self, r: RequestMessage);
+ fn send_info(&mut self, msg: String);
+}
+
+struct PlayerData {
+ request_ctr: u32,
+ handle: Box<dyn PlayerHandle>,
+}
+
+type RequestId = (u32, u32);
+pub struct EventBus {
+ request_responses: HashMap<RequestId, RequestResult<Vec<u8>>>,
+ wakers: HashMap<RequestId, AtomicWaker>,
+}
+
+impl EventBus {
+ pub fn new() -> Self {
+ EventBus {
+ request_responses: HashMap::new(),
+ wakers: HashMap::new(),
+ }
+ }
+}
+
+impl EventBus {
+ pub fn resolve_request(&mut self, id: RequestId, result: RequestResult<Vec<u8>>) {
+ if self.request_responses.contains_key(&id) {
+ // request already resolved
+ // TODO: maybe report this?
+ return;
+ }
+ self.request_responses.insert(id, result);
+ if let Some(waker) = self.wakers.remove(&id) {
+ waker.wake();
+ }
+ }
+}
+
+pub struct Request {
+ player_id: u32,
+ request_id: u32,
+ event_bus: Arc<Mutex<EventBus>>,
+}
+
+impl Request {
+ #[allow(dead_code)]
+ pub fn player_id(&self) -> u32 {
+ self.player_id
+ }
+}
+
+impl Future for Request {
+ type Output = RequestResult<Vec<u8>>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut event_bus = self.event_bus.lock().unwrap();
+ let request_id = (self.player_id, self.request_id);
+
+ if let Some(result) = event_bus.request_responses.get(&request_id) {
+ return Poll::Ready(result.clone());
+ }
+
+ event_bus
+ .wakers
+ .entry(request_id)
+ .or_insert_with(|| AtomicWaker::new())
+ .register(cx.waker());
+ return Poll::Pending;
+ }
+}
+
+#[derive(Debug, Clone)]
+pub enum RequestError {
+ Timeout,
+}
+
+pub type RequestResult<T> = Result<T, RequestError>;
diff --git a/planetwars-matchrunner/src/pw_match.rs b/planetwars-matchrunner/src/pw_match.rs
new file mode 100644
index 0000000..42bc9d2
--- /dev/null
+++ b/planetwars-matchrunner/src/pw_match.rs
@@ -0,0 +1,136 @@
+use super::match_context::{MatchCtx, RequestResult};
+use futures::stream::futures_unordered::FuturesUnordered;
+use futures::{FutureExt, StreamExt};
+use serde::{Deserialize, Serialize};
+use tokio::time::Duration;
+
+use serde_json;
+
+use std::convert::TryInto;
+
+pub use planetwars_rules::config::{Config, Map};
+
+use planetwars_rules::protocol::{self as proto, PlayerAction};
+use planetwars_rules::serializer as pw_serializer;
+use planetwars_rules::{PlanetWars, PwConfig};
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct MatchConfig {
+ pub map_name: String,
+ pub max_turns: usize,
+}
+
+pub struct PwMatch {
+ match_ctx: MatchCtx,
+ match_state: PlanetWars,
+}
+
+impl PwMatch {
+ pub fn create(match_ctx: MatchCtx, config: PwConfig) -> Self {
+ // TODO: this is kind of hacked together at the moment
+ let match_state = PlanetWars::create(config, match_ctx.players().len());
+
+ PwMatch {
+ match_state,
+ match_ctx,
+ }
+ }
+
+ pub async fn run(mut self) {
+ while !self.match_state.is_finished() {
+ let player_messages = self.prompt_players().await;
+
+ for (player_id, turn) in player_messages {
+ let res = self.execute_action(player_id, turn);
+ if let Some(err) = action_errors(res) {
+ let info_str = serde_json::to_string(&err).unwrap();
+ self.match_ctx.send_info(player_id as u32, info_str);
+ }
+ }
+ self.match_state.step();
+
+ // Log state
+ let state = self.match_state.serialize_state();
+ self.match_ctx
+ .log_string(serde_json::to_string(&state).unwrap());
+ }
+ }
+
+ async fn prompt_players(&mut self) -> Vec<(usize, RequestResult<Vec<u8>>)> {
+ // borrow these outside closure to make the borrow checker happy
+ let state = self.match_state.state();
+ let match_ctx = &mut self.match_ctx;
+
+ // TODO: this numbering is really messy.
+ // Get rid of the distinction between player_num
+ // and player_id.
+
+ self.match_state
+ .state()
+ .players
+ .iter()
+ .filter(|p| p.alive)
+ .map(move |player| {
+ let state_for_player = pw_serializer::serialize_rotated(&state, player.id - 1);
+ match_ctx
+ .request(
+ player.id.try_into().unwrap(),
+ serde_json::to_vec(&state_for_player).unwrap(),
+ Duration::from_millis(1000),
+ )
+ .map(move |resp| (player.id, resp))
+ })
+ .collect::<FuturesUnordered<_>>()
+ .collect::<Vec<_>>()
+ .await
+ }
+
+ fn execute_action(
+ &mut self,
+ player_num: usize,
+ turn: RequestResult<Vec<u8>>,
+ ) -> proto::PlayerAction {
+ let turn = match turn {
+ Err(_timeout) => return proto::PlayerAction::Timeout,
+ Ok(data) => data,
+ };
+
+ let action: proto::Action = match serde_json::from_slice(&turn) {
+ Err(err) => return proto::PlayerAction::ParseError(err.to_string()),
+ Ok(action) => action,
+ };
+
+ let commands = action
+ .commands
+ .into_iter()
+ .map(|command| {
+ let res = self.match_state.execute_command(player_num, &command);
+ proto::PlayerCommand {
+ command,
+ error: res.err(),
+ }
+ })
+ .collect();
+
+ return proto::PlayerAction::Commands(commands);
+ }
+}
+
+fn action_errors(action: PlayerAction) -> Option<PlayerAction> {
+ match action {
+ PlayerAction::Commands(commands) => {
+ let failed = commands
+ .into_iter()
+ .filter(|cmd| cmd.error.is_some())
+ .collect::<Vec<_>>();
+
+ if failed.is_empty() {
+ None
+ } else {
+ Some(PlayerAction::Commands(failed))
+ }
+ }
+ e => Some(e),
+ }
+}