diff options
Diffstat (limited to 'planetwars-matchrunner/src/docker_runner.rs')
-rw-r--r-- | planetwars-matchrunner/src/docker_runner.rs | 118 |
1 files changed, 69 insertions, 49 deletions
diff --git a/planetwars-matchrunner/src/docker_runner.rs b/planetwars-matchrunner/src/docker_runner.rs index 5900d92..d563d60 100644 --- a/planetwars-matchrunner/src/docker_runner.rs +++ b/planetwars-matchrunner/src/docker_runner.rs @@ -13,6 +13,7 @@ use tokio::sync::mpsc; use tokio::time::timeout; use crate::match_context::{EventBus, PlayerHandle, RequestError, RequestMessage}; +use crate::match_log::{MatchLogMessage, MatchLogger, StdErrMessage}; use crate::BotSpec; #[derive(Clone, Debug)] @@ -28,10 +29,11 @@ impl BotSpec for DockerBotSpec { &self, player_id: u32, event_bus: Arc<Mutex<EventBus>>, + match_logger: MatchLogger, ) -> Box<dyn PlayerHandle> { - let (handle, runner) = create_docker_bot(player_id, event_bus); let process = spawn_docker_process(self).await.unwrap(); - tokio::spawn(runner.run(process)); + let (handle, runner) = create_docker_bot(process, player_id, event_bus, match_logger); + tokio::spawn(runner.run()); return Box::new(handle); } } @@ -75,7 +77,8 @@ async fn spawn_docker_process( stderr: Some(true), stdin: Some(true), stream: Some(true), - logs: Some(true), + // setting this to true causes duplicate error output. Why? + logs: Some(false), ..Default::default() }), ) @@ -87,56 +90,24 @@ async fn spawn_docker_process( }) } -pub struct ContainerProcess { +struct ContainerProcess { stdin: Pin<Box<dyn AsyncWrite + Send>>, output: Pin<Box<dyn Stream<Item = Result<LogOutput, bollard::errors::Error>> + Send>>, } -impl ContainerProcess { - pub async fn communicate(&mut self, input: &[u8]) -> io::Result<Bytes> { - self.write_line(input).await?; - self.read_line().await - } - - async fn write_line(&mut self, bytes: &[u8]) -> io::Result<()> { - self.stdin.write_all(bytes).await?; - self.stdin.write_u8(b'\n').await?; - self.stdin.flush().await?; - Ok(()) - } - - async fn read_line(&mut self) -> io::Result<Bytes> { - while let Some(item) = self.output.next().await { - let log_output = item.expect("failed to get log output"); - match log_output { - LogOutput::StdOut { message } => { - // TODO: this is not correct (buffering and such) - return Ok(message); - } - LogOutput::StdErr { message } => { - // TODO - println!("{}", String::from_utf8_lossy(&message)); - } - _ => (), - } - } - - Err(io::Error::new( - io::ErrorKind::UnexpectedEof, - "no response received", - )) - } -} - fn create_docker_bot( + process: ContainerProcess, player_id: u32, event_bus: Arc<Mutex<EventBus>>, + match_logger: MatchLogger, ) -> (DockerBotHandle, DockerBotRunner) { let (tx, rx) = mpsc::unbounded_channel(); let bot_handle = DockerBotHandle { tx }; let bot_runner = DockerBotRunner { + process, player_id, event_bus, + match_logger, rx, }; (bot_handle, bot_runner) @@ -155,21 +126,22 @@ impl PlayerHandle for DockerBotHandle { } pub struct DockerBotRunner { + process: ContainerProcess, event_bus: Arc<Mutex<EventBus>>, rx: mpsc::UnboundedReceiver<RequestMessage>, + match_logger: MatchLogger, player_id: u32, } impl DockerBotRunner { - pub async fn run(mut self, mut process: ContainerProcess) { + pub async fn run(mut self) { while let Some(request) = self.rx.recv().await { - let resp_fut = process.communicate(&request.content); - let result = timeout(request.timeout, resp_fut) - .await - // TODO: how can this failure be handled cleanly? - .expect("process read failed"); - let result = match result { - Ok(line) => Ok(line.to_vec()), + let resp_fut = self.communicate(&request.content); + let result = timeout(request.timeout, resp_fut).await; + let request_response = match result { + Ok(Ok(response)) => Ok(response.to_vec()), + // this one happens when a bot output stream ends, map this to Timeout for now + Ok(Err(_read_error)) => Err(RequestError::Timeout), Err(_elapsed) => Err(RequestError::Timeout), }; let request_id = (self.player_id, request.request_id); @@ -177,7 +149,55 @@ impl DockerBotRunner { self.event_bus .lock() .unwrap() - .resolve_request(request_id, result); + .resolve_request(request_id, request_response); } } + + pub async fn communicate(&mut self, input: &[u8]) -> io::Result<Bytes> { + self.write_line(input).await?; + self.read_line().await + } + + async fn write_line(&mut self, bytes: &[u8]) -> io::Result<()> { + self.process.stdin.write_all(bytes).await?; + self.process.stdin.write_u8(b'\n').await?; + self.process.stdin.flush().await?; + Ok(()) + } + + async fn read_line(&mut self) -> io::Result<Bytes> { + while let Some(item) = self.process.output.next().await { + let log_output = item.expect("failed to get log output"); + match log_output { + LogOutput::StdOut { message } => { + // TODO: this is not correct (buffering and such) + return Ok(message); + } + LogOutput::StdErr { mut message } => { + // TODO + if message.ends_with(b"\n") { + message.truncate(message.len() - 1); + } + for line in message.split(|c| *c == b'\n') { + let message = StdErrMessage { + player_id: self.player_id, + message: String::from_utf8_lossy(line).to_string(), + }; + self.match_logger + .send(MatchLogMessage::StdErr(message)) + .unwrap(); + } + } + _ => (), + } + } + + // at this point the stream has ended + // does this mean the container has exited? + + Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "no response received", + )) + } } |