aboutsummaryrefslogtreecommitdiff
path: root/planetwars-matchrunner/src/docker_runner.rs
diff options
context:
space:
mode:
Diffstat (limited to 'planetwars-matchrunner/src/docker_runner.rs')
-rw-r--r--planetwars-matchrunner/src/docker_runner.rs118
1 files changed, 69 insertions, 49 deletions
diff --git a/planetwars-matchrunner/src/docker_runner.rs b/planetwars-matchrunner/src/docker_runner.rs
index 5900d92..d563d60 100644
--- a/planetwars-matchrunner/src/docker_runner.rs
+++ b/planetwars-matchrunner/src/docker_runner.rs
@@ -13,6 +13,7 @@ use tokio::sync::mpsc;
use tokio::time::timeout;
use crate::match_context::{EventBus, PlayerHandle, RequestError, RequestMessage};
+use crate::match_log::{MatchLogMessage, MatchLogger, StdErrMessage};
use crate::BotSpec;
#[derive(Clone, Debug)]
@@ -28,10 +29,11 @@ impl BotSpec for DockerBotSpec {
&self,
player_id: u32,
event_bus: Arc<Mutex<EventBus>>,
+ match_logger: MatchLogger,
) -> Box<dyn PlayerHandle> {
- let (handle, runner) = create_docker_bot(player_id, event_bus);
let process = spawn_docker_process(self).await.unwrap();
- tokio::spawn(runner.run(process));
+ let (handle, runner) = create_docker_bot(process, player_id, event_bus, match_logger);
+ tokio::spawn(runner.run());
return Box::new(handle);
}
}
@@ -75,7 +77,8 @@ async fn spawn_docker_process(
stderr: Some(true),
stdin: Some(true),
stream: Some(true),
- logs: Some(true),
+ // setting this to true causes duplicate error output. Why?
+ logs: Some(false),
..Default::default()
}),
)
@@ -87,56 +90,24 @@ async fn spawn_docker_process(
})
}
-pub struct ContainerProcess {
+struct ContainerProcess {
stdin: Pin<Box<dyn AsyncWrite + Send>>,
output: Pin<Box<dyn Stream<Item = Result<LogOutput, bollard::errors::Error>> + Send>>,
}
-impl ContainerProcess {
- pub async fn communicate(&mut self, input: &[u8]) -> io::Result<Bytes> {
- self.write_line(input).await?;
- self.read_line().await
- }
-
- async fn write_line(&mut self, bytes: &[u8]) -> io::Result<()> {
- self.stdin.write_all(bytes).await?;
- self.stdin.write_u8(b'\n').await?;
- self.stdin.flush().await?;
- Ok(())
- }
-
- async fn read_line(&mut self) -> io::Result<Bytes> {
- while let Some(item) = self.output.next().await {
- let log_output = item.expect("failed to get log output");
- match log_output {
- LogOutput::StdOut { message } => {
- // TODO: this is not correct (buffering and such)
- return Ok(message);
- }
- LogOutput::StdErr { message } => {
- // TODO
- println!("{}", String::from_utf8_lossy(&message));
- }
- _ => (),
- }
- }
-
- Err(io::Error::new(
- io::ErrorKind::UnexpectedEof,
- "no response received",
- ))
- }
-}
-
fn create_docker_bot(
+ process: ContainerProcess,
player_id: u32,
event_bus: Arc<Mutex<EventBus>>,
+ match_logger: MatchLogger,
) -> (DockerBotHandle, DockerBotRunner) {
let (tx, rx) = mpsc::unbounded_channel();
let bot_handle = DockerBotHandle { tx };
let bot_runner = DockerBotRunner {
+ process,
player_id,
event_bus,
+ match_logger,
rx,
};
(bot_handle, bot_runner)
@@ -155,21 +126,22 @@ impl PlayerHandle for DockerBotHandle {
}
pub struct DockerBotRunner {
+ process: ContainerProcess,
event_bus: Arc<Mutex<EventBus>>,
rx: mpsc::UnboundedReceiver<RequestMessage>,
+ match_logger: MatchLogger,
player_id: u32,
}
impl DockerBotRunner {
- pub async fn run(mut self, mut process: ContainerProcess) {
+ pub async fn run(mut self) {
while let Some(request) = self.rx.recv().await {
- let resp_fut = process.communicate(&request.content);
- let result = timeout(request.timeout, resp_fut)
- .await
- // TODO: how can this failure be handled cleanly?
- .expect("process read failed");
- let result = match result {
- Ok(line) => Ok(line.to_vec()),
+ let resp_fut = self.communicate(&request.content);
+ let result = timeout(request.timeout, resp_fut).await;
+ let request_response = match result {
+ Ok(Ok(response)) => Ok(response.to_vec()),
+ // this one happens when a bot output stream ends, map this to Timeout for now
+ Ok(Err(_read_error)) => Err(RequestError::Timeout),
Err(_elapsed) => Err(RequestError::Timeout),
};
let request_id = (self.player_id, request.request_id);
@@ -177,7 +149,55 @@ impl DockerBotRunner {
self.event_bus
.lock()
.unwrap()
- .resolve_request(request_id, result);
+ .resolve_request(request_id, request_response);
}
}
+
+ pub async fn communicate(&mut self, input: &[u8]) -> io::Result<Bytes> {
+ self.write_line(input).await?;
+ self.read_line().await
+ }
+
+ async fn write_line(&mut self, bytes: &[u8]) -> io::Result<()> {
+ self.process.stdin.write_all(bytes).await?;
+ self.process.stdin.write_u8(b'\n').await?;
+ self.process.stdin.flush().await?;
+ Ok(())
+ }
+
+ async fn read_line(&mut self) -> io::Result<Bytes> {
+ while let Some(item) = self.process.output.next().await {
+ let log_output = item.expect("failed to get log output");
+ match log_output {
+ LogOutput::StdOut { message } => {
+ // TODO: this is not correct (buffering and such)
+ return Ok(message);
+ }
+ LogOutput::StdErr { mut message } => {
+ // TODO
+ if message.ends_with(b"\n") {
+ message.truncate(message.len() - 1);
+ }
+ for line in message.split(|c| *c == b'\n') {
+ let message = StdErrMessage {
+ player_id: self.player_id,
+ message: String::from_utf8_lossy(line).to_string(),
+ };
+ self.match_logger
+ .send(MatchLogMessage::StdErr(message))
+ .unwrap();
+ }
+ }
+ _ => (),
+ }
+ }
+
+ // at this point the stream has ended
+ // does this mean the container has exited?
+
+ Err(io::Error::new(
+ io::ErrorKind::UnexpectedEof,
+ "no response received",
+ ))
+ }
}