diff options
author | Ilion Beyst <ilion.beyst@gmail.com> | 2022-11-27 11:26:38 +0100 |
---|---|---|
committer | Ilion Beyst <ilion.beyst@gmail.com> | 2022-11-27 11:26:38 +0100 |
commit | c7d973406790056451af2af24630f03409a9647e (patch) | |
tree | c74d458524dd4219c5b0233f4c1af01477fb8fbb | |
parent | 2b71ca625ed07697eb8f53d26ec57f92802b46e5 (diff) | |
download | planetwars.dev-c7d973406790056451af2af24630f03409a9647e.tar.xz planetwars.dev-c7d973406790056451af2af24630f03409a9647e.zip |
client: don't freeze when bot does not produce output
-rw-r--r-- | planetwars-client/src/main.rs | 104 | ||||
-rw-r--r-- | planetwars-matchrunner/src/bot_runner.rs | 6 |
2 files changed, 89 insertions, 21 deletions
diff --git a/planetwars-client/src/main.rs b/planetwars-client/src/main.rs index 6ae2a31..5c9ee44 100644 --- a/planetwars-client/src/main.rs +++ b/planetwars-client/src/main.rs @@ -7,12 +7,21 @@ pub mod pb { use clap::Parser; use pb::client_api_service_client::ClientApiServiceClient; -use planetwars_matchrunner::bot_runner::Bot; +use planetwars_matchrunner::bot_runner::{Bot, BotProcess}; use serde::Deserialize; -use std::{path::PathBuf, time::Duration}; -use tokio::sync::mpsc; +use std::{ + collections::VecDeque, + path::PathBuf, + sync::{Arc, Mutex}, + time::Duration, +}; +use tokio::{ + io::{AsyncWriteExt, BufReader, Lines}, + process::{ChildStdin, ChildStdout}, + sync::mpsc::{self, UnboundedSender}, +}; use tokio_stream::wrappers::UnboundedReceiverStream; -use tonic::{metadata::MetadataValue, transport::Channel, Request, Status}; +use tonic::{metadata::MetadataValue, transport::Channel, Request, Status, Streaming}; #[derive(clap::Parser)] struct PlayMatch { @@ -124,7 +133,11 @@ async fn run_player( Ok(req) }); - let mut bot_process = Bot { + let BotProcess { + child: _child, + stdin, + stdout, + } = Bot { working_dir: PathBuf::from( bot_config .working_directory @@ -134,27 +147,57 @@ async fn run_player( } .spawn_process(); + let state = Arc::new(Mutex::new(BotRunnerState { + request_queue: VecDeque::new(), + })); + let (tx, rx) = mpsc::unbounded_channel(); - let mut stream = client + let stream = client .connect_player(UnboundedReceiverStream::new(rx)) .await .unwrap() .into_inner(); + + let output_handle = tokio::spawn(handle_bot_output(state.clone(), tx, stdout)); + let input_handle = tokio::spawn(handle_server_messages(state.clone(), stream, stdin)); + + output_handle + .await + .unwrap() + .map_err(RunPlayerError::RunBotError)?; + input_handle + .await + .unwrap() + .map_err(RunPlayerError::RunBotError)?; + + Ok(()) +} + +struct BotRunnerState { + request_queue: VecDeque<i32>, +} + +async fn handle_server_messages( + runner_state: Arc<Mutex<BotRunnerState>>, + mut stream: Streaming<pb::PlayerApiServerMessage>, + mut stdin: ChildStdin, +) -> io::Result<()> { while let Some(message) = stream.message().await.unwrap() { match message.server_message { Some(pb::PlayerApiServerMessageType::ActionRequest(req)) => { - let moves = bot_process - .communicate(&req.content) - .await - .map_err(RunPlayerError::RunBotError)?; - let action = pb::PlayerAction { - action_request_id: req.action_request_id, - content: moves.as_bytes().to_vec(), - }; - let msg = pb::PlayerApiClientMessage { - client_message: Some(pb::PlayerApiClientMessageType::Action(action)), - }; - tx.send(msg).unwrap(); + { + let mut state = runner_state.lock().unwrap(); + if !state.request_queue.is_empty() { + eprintln!("[WARN] new turn started before bot output was received"); + eprintln!( + "[WARN] this could be due to your bot taking too long, \ + or failing to flush output buffers." + ); + } + state.request_queue.push_back(req.action_request_id); + } + stdin.write_all(&req.content).await?; + stdin.write_u8(b'\n').await?; } _ => {} // pass } @@ -162,3 +205,28 @@ async fn run_player( Ok(()) } + +use std::io; + +async fn handle_bot_output( + runner_state: Arc<Mutex<BotRunnerState>>, + tx: UnboundedSender<pb::PlayerApiClientMessage>, + mut bot_stdout: Lines<BufReader<ChildStdout>>, +) -> io::Result<()> { + while let Some(line) = bot_stdout.next_line().await? { + if let Some(request_id) = runner_state.lock().unwrap().request_queue.pop_front() { + let action = pb::PlayerAction { + action_request_id: request_id, + content: line.as_bytes().to_vec(), + }; + let msg = pb::PlayerApiClientMessage { + client_message: Some(pb::PlayerApiClientMessageType::Action(action)), + }; + tx.send(msg).unwrap(); + } else { + eprintln!("[WARN] bot issued commands before a gamestate was received"); + } + } + + Ok(()) +} diff --git a/planetwars-matchrunner/src/bot_runner.rs b/planetwars-matchrunner/src/bot_runner.rs index 8597e26..f7c0742 100644 --- a/planetwars-matchrunner/src/bot_runner.rs +++ b/planetwars-matchrunner/src/bot_runner.rs @@ -108,9 +108,9 @@ impl Bot { pub struct BotProcess { #[allow(dead_code)] - child: process::Child, - stdin: process::ChildStdin, - stdout: Lines<BufReader<process::ChildStdout>>, + pub child: process::Child, + pub stdin: process::ChildStdin, + pub stdout: Lines<BufReader<process::ChildStdout>>, } impl BotProcess { |