aboutsummaryrefslogtreecommitdiff
path: root/planetwars-matchrunner/src/bin
diff options
context:
space:
mode:
Diffstat (limited to 'planetwars-matchrunner/src/bin')
-rw-r--r--planetwars-matchrunner/src/bin/testmatch.rs69
1 files changed, 51 insertions, 18 deletions
diff --git a/planetwars-matchrunner/src/bin/testmatch.rs b/planetwars-matchrunner/src/bin/testmatch.rs
index 97c00ed..ebd0199 100644
--- a/planetwars-matchrunner/src/bin/testmatch.rs
+++ b/planetwars-matchrunner/src/bin/testmatch.rs
@@ -10,15 +10,17 @@ use std::sync::{Arc, Mutex};
use bollard::container::{self, LogOutput};
use bollard::exec::StartExecResults;
use bollard::Docker;
+use bytes::Bytes;
use futures::{Stream, StreamExt};
use planetwars_matchrunner::{
- match_context::{EventBus, MatchCtx, PlayerHandle},
+ match_context::{EventBus, MatchCtx, PlayerHandle, RequestMessage},
pw_match, MatchConfig, MatchMeta, PlayerInfo,
};
use planetwars_rules::protocol as proto;
use planetwars_rules::PwConfig;
use std::env;
use tokio::io::{AsyncWrite, AsyncWriteExt};
+use tokio::sync::mpsc;
const IMAGE: &'static str = "simplebot:latest";
@@ -64,9 +66,12 @@ async fn create_player_process(docker: &Docker) -> Result<(), bollard::errors::E
.id;
let start_exec_results = docker.start_exec(&exec_id, None).await?;
- let (mut input, mut output) = match start_exec_results {
+ let mut process = match start_exec_results {
StartExecResults::Detached => panic!("failed to get io channels"),
- StartExecResults::Attached { input, output } => (input, output),
+ StartExecResults::Attached { input, output } => ContainerProcess {
+ stdin: input,
+ output,
+ },
};
let state = proto::State {
@@ -90,22 +95,50 @@ async fn create_player_process(docker: &Docker) -> Result<(), bollard::errors::E
};
let serialized = serde_json::to_vec(&state).unwrap();
- input.write_all(&serialized).await?;
- input.write(b"\n").await?;
- input.flush().await?;
-
- while let Some(item) = output.next().await {
- let log_output = item.expect("failed to get log output");
- match log_output {
- LogOutput::StdOut { message } => {
- println!("stdout: {}", String::from_utf8_lossy(&message));
- }
- LogOutput::StdErr { message } => {
- println!("stderr: {}", String::from_utf8_lossy(&message));
+ let out = process.communicate(&serialized).await?;
+
+ print!("{}", String::from_utf8(out.to_vec()).unwrap());
+
+ Ok(())
+}
+
+pub struct ContainerProcess {
+ stdin: Pin<Box<dyn AsyncWrite + Send>>,
+ output: Pin<Box<dyn Stream<Item = Result<LogOutput, bollard::errors::Error>>>>,
+}
+
+impl ContainerProcess {
+ pub async fn communicate(&mut self, input: &[u8]) -> io::Result<Bytes> {
+ self.write_line(input).await?;
+ self.read_line().await
+ }
+
+ async fn write_line(&mut self, bytes: &[u8]) -> io::Result<()> {
+ self.stdin.write_all(bytes).await?;
+ self.stdin.write_u8(b'\n').await?;
+ self.stdin.flush().await?;
+ Ok(())
+ }
+
+ async fn read_line(&mut self) -> io::Result<Bytes> {
+ while let Some(item) = self.output.next().await {
+ let log_output = item.expect("failed to get log output");
+ match log_output {
+ LogOutput::StdOut { message } => {
+ // TODO: this is not correct (buffering and such)
+ return Ok(message);
+ }
+ LogOutput::StdErr { message } => {
+ // TODO
+ println!("stderr: {}", String::from_utf8_lossy(&message));
+ }
+ _ => (),
}
- _ => (),
}
- }
- Ok(())
+ Err(io::Error::new(
+ io::ErrorKind::UnexpectedEof,
+ "no response received",
+ ))
+ }
}