diff options
Diffstat (limited to 'planetwars-client/src')
-rw-r--r-- | planetwars-client/src/main.rs | 104 |
1 files changed, 86 insertions, 18 deletions
diff --git a/planetwars-client/src/main.rs b/planetwars-client/src/main.rs index 6ae2a31..5c9ee44 100644 --- a/planetwars-client/src/main.rs +++ b/planetwars-client/src/main.rs @@ -7,12 +7,21 @@ pub mod pb { use clap::Parser; use pb::client_api_service_client::ClientApiServiceClient; -use planetwars_matchrunner::bot_runner::Bot; +use planetwars_matchrunner::bot_runner::{Bot, BotProcess}; use serde::Deserialize; -use std::{path::PathBuf, time::Duration}; -use tokio::sync::mpsc; +use std::{ + collections::VecDeque, + path::PathBuf, + sync::{Arc, Mutex}, + time::Duration, +}; +use tokio::{ + io::{AsyncWriteExt, BufReader, Lines}, + process::{ChildStdin, ChildStdout}, + sync::mpsc::{self, UnboundedSender}, +}; use tokio_stream::wrappers::UnboundedReceiverStream; -use tonic::{metadata::MetadataValue, transport::Channel, Request, Status}; +use tonic::{metadata::MetadataValue, transport::Channel, Request, Status, Streaming}; #[derive(clap::Parser)] struct PlayMatch { @@ -124,7 +133,11 @@ async fn run_player( Ok(req) }); - let mut bot_process = Bot { + let BotProcess { + child: _child, + stdin, + stdout, + } = Bot { working_dir: PathBuf::from( bot_config .working_directory @@ -134,27 +147,57 @@ async fn run_player( } .spawn_process(); + let state = Arc::new(Mutex::new(BotRunnerState { + request_queue: VecDeque::new(), + })); + let (tx, rx) = mpsc::unbounded_channel(); - let mut stream = client + let stream = client .connect_player(UnboundedReceiverStream::new(rx)) .await .unwrap() .into_inner(); + + let output_handle = tokio::spawn(handle_bot_output(state.clone(), tx, stdout)); + let input_handle = tokio::spawn(handle_server_messages(state.clone(), stream, stdin)); + + output_handle + .await + .unwrap() + .map_err(RunPlayerError::RunBotError)?; + input_handle + .await + .unwrap() + .map_err(RunPlayerError::RunBotError)?; + + Ok(()) +} + +struct BotRunnerState { + request_queue: VecDeque<i32>, +} + +async fn handle_server_messages( + runner_state: Arc<Mutex<BotRunnerState>>, + mut stream: Streaming<pb::PlayerApiServerMessage>, + mut stdin: ChildStdin, +) -> io::Result<()> { while let Some(message) = stream.message().await.unwrap() { match message.server_message { Some(pb::PlayerApiServerMessageType::ActionRequest(req)) => { - let moves = bot_process - .communicate(&req.content) - .await - .map_err(RunPlayerError::RunBotError)?; - let action = pb::PlayerAction { - action_request_id: req.action_request_id, - content: moves.as_bytes().to_vec(), - }; - let msg = pb::PlayerApiClientMessage { - client_message: Some(pb::PlayerApiClientMessageType::Action(action)), - }; - tx.send(msg).unwrap(); + { + let mut state = runner_state.lock().unwrap(); + if !state.request_queue.is_empty() { + eprintln!("[WARN] new turn started before bot output was received"); + eprintln!( + "[WARN] this could be due to your bot taking too long, \ + or failing to flush output buffers." + ); + } + state.request_queue.push_back(req.action_request_id); + } + stdin.write_all(&req.content).await?; + stdin.write_u8(b'\n').await?; } _ => {} // pass } @@ -162,3 +205,28 @@ async fn run_player( Ok(()) } + +use std::io; + +async fn handle_bot_output( + runner_state: Arc<Mutex<BotRunnerState>>, + tx: UnboundedSender<pb::PlayerApiClientMessage>, + mut bot_stdout: Lines<BufReader<ChildStdout>>, +) -> io::Result<()> { + while let Some(line) = bot_stdout.next_line().await? { + if let Some(request_id) = runner_state.lock().unwrap().request_queue.pop_front() { + let action = pb::PlayerAction { + action_request_id: request_id, + content: line.as_bytes().to_vec(), + }; + let msg = pb::PlayerApiClientMessage { + client_message: Some(pb::PlayerApiClientMessageType::Action(action)), + }; + tx.send(msg).unwrap(); + } else { + eprintln!("[WARN] bot issued commands before a gamestate was received"); + } + } + + Ok(()) +} |