diff options
Diffstat (limited to 'planetwars-matchrunner/src/bin')
-rw-r--r-- | planetwars-matchrunner/src/bin/testmatch.rs | 69 |
1 files changed, 51 insertions, 18 deletions
diff --git a/planetwars-matchrunner/src/bin/testmatch.rs b/planetwars-matchrunner/src/bin/testmatch.rs index 97c00ed..ebd0199 100644 --- a/planetwars-matchrunner/src/bin/testmatch.rs +++ b/planetwars-matchrunner/src/bin/testmatch.rs @@ -10,15 +10,17 @@ use std::sync::{Arc, Mutex}; use bollard::container::{self, LogOutput}; use bollard::exec::StartExecResults; use bollard::Docker; +use bytes::Bytes; use futures::{Stream, StreamExt}; use planetwars_matchrunner::{ - match_context::{EventBus, MatchCtx, PlayerHandle}, + match_context::{EventBus, MatchCtx, PlayerHandle, RequestMessage}, pw_match, MatchConfig, MatchMeta, PlayerInfo, }; use planetwars_rules::protocol as proto; use planetwars_rules::PwConfig; use std::env; use tokio::io::{AsyncWrite, AsyncWriteExt}; +use tokio::sync::mpsc; const IMAGE: &'static str = "simplebot:latest"; @@ -64,9 +66,12 @@ async fn create_player_process(docker: &Docker) -> Result<(), bollard::errors::E .id; let start_exec_results = docker.start_exec(&exec_id, None).await?; - let (mut input, mut output) = match start_exec_results { + let mut process = match start_exec_results { StartExecResults::Detached => panic!("failed to get io channels"), - StartExecResults::Attached { input, output } => (input, output), + StartExecResults::Attached { input, output } => ContainerProcess { + stdin: input, + output, + }, }; let state = proto::State { @@ -90,22 +95,50 @@ async fn create_player_process(docker: &Docker) -> Result<(), bollard::errors::E }; let serialized = serde_json::to_vec(&state).unwrap(); - input.write_all(&serialized).await?; - input.write(b"\n").await?; - input.flush().await?; - - while let Some(item) = output.next().await { - let log_output = item.expect("failed to get log output"); - match log_output { - LogOutput::StdOut { message } => { - println!("stdout: {}", String::from_utf8_lossy(&message)); - } - LogOutput::StdErr { message } => { - println!("stderr: {}", String::from_utf8_lossy(&message)); + let out = process.communicate(&serialized).await?; + + print!("{}", String::from_utf8(out.to_vec()).unwrap()); + + Ok(()) +} + +pub struct ContainerProcess { + stdin: Pin<Box<dyn AsyncWrite + Send>>, + output: Pin<Box<dyn Stream<Item = Result<LogOutput, bollard::errors::Error>>>>, +} + +impl ContainerProcess { + pub async fn communicate(&mut self, input: &[u8]) -> io::Result<Bytes> { + self.write_line(input).await?; + self.read_line().await + } + + async fn write_line(&mut self, bytes: &[u8]) -> io::Result<()> { + self.stdin.write_all(bytes).await?; + self.stdin.write_u8(b'\n').await?; + self.stdin.flush().await?; + Ok(()) + } + + async fn read_line(&mut self) -> io::Result<Bytes> { + while let Some(item) = self.output.next().await { + let log_output = item.expect("failed to get log output"); + match log_output { + LogOutput::StdOut { message } => { + // TODO: this is not correct (buffering and such) + return Ok(message); + } + LogOutput::StdErr { message } => { + // TODO + println!("stderr: {}", String::from_utf8_lossy(&message)); + } + _ => (), } - _ => (), } - } - Ok(()) + Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "no response received", + )) + } } |