aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIlion Beyst <ilion.beyst@gmail.com>2022-11-27 11:26:38 +0100
committerIlion Beyst <ilion.beyst@gmail.com>2022-11-27 11:26:38 +0100
commitc7d973406790056451af2af24630f03409a9647e (patch)
treec74d458524dd4219c5b0233f4c1af01477fb8fbb
parent2b71ca625ed07697eb8f53d26ec57f92802b46e5 (diff)
downloadplanetwars.dev-c7d973406790056451af2af24630f03409a9647e.tar.xz
planetwars.dev-c7d973406790056451af2af24630f03409a9647e.zip
client: don't freeze when bot does not produce output
-rw-r--r--planetwars-client/src/main.rs104
-rw-r--r--planetwars-matchrunner/src/bot_runner.rs6
2 files changed, 89 insertions, 21 deletions
diff --git a/planetwars-client/src/main.rs b/planetwars-client/src/main.rs
index 6ae2a31..5c9ee44 100644
--- a/planetwars-client/src/main.rs
+++ b/planetwars-client/src/main.rs
@@ -7,12 +7,21 @@ pub mod pb {
use clap::Parser;
use pb::client_api_service_client::ClientApiServiceClient;
-use planetwars_matchrunner::bot_runner::Bot;
+use planetwars_matchrunner::bot_runner::{Bot, BotProcess};
use serde::Deserialize;
-use std::{path::PathBuf, time::Duration};
-use tokio::sync::mpsc;
+use std::{
+ collections::VecDeque,
+ path::PathBuf,
+ sync::{Arc, Mutex},
+ time::Duration,
+};
+use tokio::{
+ io::{AsyncWriteExt, BufReader, Lines},
+ process::{ChildStdin, ChildStdout},
+ sync::mpsc::{self, UnboundedSender},
+};
use tokio_stream::wrappers::UnboundedReceiverStream;
-use tonic::{metadata::MetadataValue, transport::Channel, Request, Status};
+use tonic::{metadata::MetadataValue, transport::Channel, Request, Status, Streaming};
#[derive(clap::Parser)]
struct PlayMatch {
@@ -124,7 +133,11 @@ async fn run_player(
Ok(req)
});
- let mut bot_process = Bot {
+ let BotProcess {
+ child: _child,
+ stdin,
+ stdout,
+ } = Bot {
working_dir: PathBuf::from(
bot_config
.working_directory
@@ -134,27 +147,57 @@ async fn run_player(
}
.spawn_process();
+ let state = Arc::new(Mutex::new(BotRunnerState {
+ request_queue: VecDeque::new(),
+ }));
+
let (tx, rx) = mpsc::unbounded_channel();
- let mut stream = client
+ let stream = client
.connect_player(UnboundedReceiverStream::new(rx))
.await
.unwrap()
.into_inner();
+
+ let output_handle = tokio::spawn(handle_bot_output(state.clone(), tx, stdout));
+ let input_handle = tokio::spawn(handle_server_messages(state.clone(), stream, stdin));
+
+ output_handle
+ .await
+ .unwrap()
+ .map_err(RunPlayerError::RunBotError)?;
+ input_handle
+ .await
+ .unwrap()
+ .map_err(RunPlayerError::RunBotError)?;
+
+ Ok(())
+}
+
+struct BotRunnerState {
+ request_queue: VecDeque<i32>,
+}
+
+async fn handle_server_messages(
+ runner_state: Arc<Mutex<BotRunnerState>>,
+ mut stream: Streaming<pb::PlayerApiServerMessage>,
+ mut stdin: ChildStdin,
+) -> io::Result<()> {
while let Some(message) = stream.message().await.unwrap() {
match message.server_message {
Some(pb::PlayerApiServerMessageType::ActionRequest(req)) => {
- let moves = bot_process
- .communicate(&req.content)
- .await
- .map_err(RunPlayerError::RunBotError)?;
- let action = pb::PlayerAction {
- action_request_id: req.action_request_id,
- content: moves.as_bytes().to_vec(),
- };
- let msg = pb::PlayerApiClientMessage {
- client_message: Some(pb::PlayerApiClientMessageType::Action(action)),
- };
- tx.send(msg).unwrap();
+ {
+ let mut state = runner_state.lock().unwrap();
+ if !state.request_queue.is_empty() {
+ eprintln!("[WARN] new turn started before bot output was received");
+ eprintln!(
+ "[WARN] this could be due to your bot taking too long, \
+ or failing to flush output buffers."
+ );
+ }
+ state.request_queue.push_back(req.action_request_id);
+ }
+ stdin.write_all(&req.content).await?;
+ stdin.write_u8(b'\n').await?;
}
_ => {} // pass
}
@@ -162,3 +205,28 @@ async fn run_player(
Ok(())
}
+
+use std::io;
+
+async fn handle_bot_output(
+ runner_state: Arc<Mutex<BotRunnerState>>,
+ tx: UnboundedSender<pb::PlayerApiClientMessage>,
+ mut bot_stdout: Lines<BufReader<ChildStdout>>,
+) -> io::Result<()> {
+ while let Some(line) = bot_stdout.next_line().await? {
+ if let Some(request_id) = runner_state.lock().unwrap().request_queue.pop_front() {
+ let action = pb::PlayerAction {
+ action_request_id: request_id,
+ content: line.as_bytes().to_vec(),
+ };
+ let msg = pb::PlayerApiClientMessage {
+ client_message: Some(pb::PlayerApiClientMessageType::Action(action)),
+ };
+ tx.send(msg).unwrap();
+ } else {
+ eprintln!("[WARN] bot issued commands before a gamestate was received");
+ }
+ }
+
+ Ok(())
+}
diff --git a/planetwars-matchrunner/src/bot_runner.rs b/planetwars-matchrunner/src/bot_runner.rs
index 8597e26..f7c0742 100644
--- a/planetwars-matchrunner/src/bot_runner.rs
+++ b/planetwars-matchrunner/src/bot_runner.rs
@@ -108,9 +108,9 @@ impl Bot {
pub struct BotProcess {
#[allow(dead_code)]
- child: process::Child,
- stdin: process::ChildStdin,
- stdout: Lines<BufReader<process::ChildStdout>>,
+ pub child: process::Child,
+ pub stdin: process::ChildStdin,
+ pub stdout: Lines<BufReader<process::ChildStdout>>,
}
impl BotProcess {