aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIlion Beyst <ilion.beyst@gmail.com>2022-02-23 21:08:56 +0100
committerIlion Beyst <ilion.beyst@gmail.com>2022-02-23 21:08:56 +0100
commit54b9694f0d0d7e853592317d60ad262ae8c13568 (patch)
treeb8842c884ee51cac97d6e39c68e3fd54f19f0bf8
parente15944622d3741137f443e7fa0b5d193b4ce28d9 (diff)
downloadplanetwars.dev-54b9694f0d0d7e853592317d60ad262ae8c13568.tar.xz
planetwars.dev-54b9694f0d0d7e853592317d60ad262ae8c13568.zip
implement matchlogger
-rw-r--r--planetwars-matchrunner/Cargo.toml4
-rw-r--r--planetwars-matchrunner/src/docker_runner.rs118
-rw-r--r--planetwars-matchrunner/src/lib.rs57
-rw-r--r--planetwars-matchrunner/src/match_context.rs17
-rw-r--r--planetwars-matchrunner/src/match_log.rs45
-rw-r--r--planetwars-matchrunner/src/pw_match.rs10
6 files changed, 162 insertions, 89 deletions
diff --git a/planetwars-matchrunner/Cargo.toml b/planetwars-matchrunner/Cargo.toml
index b041d61..f69e7ff 100644
--- a/planetwars-matchrunner/Cargo.toml
+++ b/planetwars-matchrunner/Cargo.toml
@@ -18,6 +18,6 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
planetwars-rules = { path = "../planetwars-rules" }
chrono = { version = "0.4", features = ["serde"] }
-bollard = { git = "https://github.com/antoinert/bollard" }
+bollard = { git = "https://github.com/fussybeaver/bollard", rev = "c5d87a4934c70a04f9c649fedb241dbd4943c927" }
bytes = "1.1"
-async-trait = "0.1" \ No newline at end of file
+async-trait = "0.1"
diff --git a/planetwars-matchrunner/src/docker_runner.rs b/planetwars-matchrunner/src/docker_runner.rs
index 5900d92..d563d60 100644
--- a/planetwars-matchrunner/src/docker_runner.rs
+++ b/planetwars-matchrunner/src/docker_runner.rs
@@ -13,6 +13,7 @@ use tokio::sync::mpsc;
use tokio::time::timeout;
use crate::match_context::{EventBus, PlayerHandle, RequestError, RequestMessage};
+use crate::match_log::{MatchLogMessage, MatchLogger, StdErrMessage};
use crate::BotSpec;
#[derive(Clone, Debug)]
@@ -28,10 +29,11 @@ impl BotSpec for DockerBotSpec {
&self,
player_id: u32,
event_bus: Arc<Mutex<EventBus>>,
+ match_logger: MatchLogger,
) -> Box<dyn PlayerHandle> {
- let (handle, runner) = create_docker_bot(player_id, event_bus);
let process = spawn_docker_process(self).await.unwrap();
- tokio::spawn(runner.run(process));
+ let (handle, runner) = create_docker_bot(process, player_id, event_bus, match_logger);
+ tokio::spawn(runner.run());
return Box::new(handle);
}
}
@@ -75,7 +77,8 @@ async fn spawn_docker_process(
stderr: Some(true),
stdin: Some(true),
stream: Some(true),
- logs: Some(true),
+ // setting this to true causes duplicate error output. Why?
+ logs: Some(false),
..Default::default()
}),
)
@@ -87,56 +90,24 @@ async fn spawn_docker_process(
})
}
-pub struct ContainerProcess {
+struct ContainerProcess {
stdin: Pin<Box<dyn AsyncWrite + Send>>,
output: Pin<Box<dyn Stream<Item = Result<LogOutput, bollard::errors::Error>> + Send>>,
}
-impl ContainerProcess {
- pub async fn communicate(&mut self, input: &[u8]) -> io::Result<Bytes> {
- self.write_line(input).await?;
- self.read_line().await
- }
-
- async fn write_line(&mut self, bytes: &[u8]) -> io::Result<()> {
- self.stdin.write_all(bytes).await?;
- self.stdin.write_u8(b'\n').await?;
- self.stdin.flush().await?;
- Ok(())
- }
-
- async fn read_line(&mut self) -> io::Result<Bytes> {
- while let Some(item) = self.output.next().await {
- let log_output = item.expect("failed to get log output");
- match log_output {
- LogOutput::StdOut { message } => {
- // TODO: this is not correct (buffering and such)
- return Ok(message);
- }
- LogOutput::StdErr { message } => {
- // TODO
- println!("{}", String::from_utf8_lossy(&message));
- }
- _ => (),
- }
- }
-
- Err(io::Error::new(
- io::ErrorKind::UnexpectedEof,
- "no response received",
- ))
- }
-}
-
fn create_docker_bot(
+ process: ContainerProcess,
player_id: u32,
event_bus: Arc<Mutex<EventBus>>,
+ match_logger: MatchLogger,
) -> (DockerBotHandle, DockerBotRunner) {
let (tx, rx) = mpsc::unbounded_channel();
let bot_handle = DockerBotHandle { tx };
let bot_runner = DockerBotRunner {
+ process,
player_id,
event_bus,
+ match_logger,
rx,
};
(bot_handle, bot_runner)
@@ -155,21 +126,22 @@ impl PlayerHandle for DockerBotHandle {
}
pub struct DockerBotRunner {
+ process: ContainerProcess,
event_bus: Arc<Mutex<EventBus>>,
rx: mpsc::UnboundedReceiver<RequestMessage>,
+ match_logger: MatchLogger,
player_id: u32,
}
impl DockerBotRunner {
- pub async fn run(mut self, mut process: ContainerProcess) {
+ pub async fn run(mut self) {
while let Some(request) = self.rx.recv().await {
- let resp_fut = process.communicate(&request.content);
- let result = timeout(request.timeout, resp_fut)
- .await
- // TODO: how can this failure be handled cleanly?
- .expect("process read failed");
- let result = match result {
- Ok(line) => Ok(line.to_vec()),
+ let resp_fut = self.communicate(&request.content);
+ let result = timeout(request.timeout, resp_fut).await;
+ let request_response = match result {
+ Ok(Ok(response)) => Ok(response.to_vec()),
+ // this one happens when a bot output stream ends, map this to Timeout for now
+ Ok(Err(_read_error)) => Err(RequestError::Timeout),
Err(_elapsed) => Err(RequestError::Timeout),
};
let request_id = (self.player_id, request.request_id);
@@ -177,7 +149,55 @@ impl DockerBotRunner {
self.event_bus
.lock()
.unwrap()
- .resolve_request(request_id, result);
+ .resolve_request(request_id, request_response);
}
}
+
+ pub async fn communicate(&mut self, input: &[u8]) -> io::Result<Bytes> {
+ self.write_line(input).await?;
+ self.read_line().await
+ }
+
+ async fn write_line(&mut self, bytes: &[u8]) -> io::Result<()> {
+ self.process.stdin.write_all(bytes).await?;
+ self.process.stdin.write_u8(b'\n').await?;
+ self.process.stdin.flush().await?;
+ Ok(())
+ }
+
+ async fn read_line(&mut self) -> io::Result<Bytes> {
+ while let Some(item) = self.process.output.next().await {
+ let log_output = item.expect("failed to get log output");
+ match log_output {
+ LogOutput::StdOut { message } => {
+ // TODO: this is not correct (buffering and such)
+ return Ok(message);
+ }
+ LogOutput::StdErr { mut message } => {
+ // TODO
+ if message.ends_with(b"\n") {
+ message.truncate(message.len() - 1);
+ }
+ for line in message.split(|c| *c == b'\n') {
+ let message = StdErrMessage {
+ player_id: self.player_id,
+ message: String::from_utf8_lossy(line).to_string(),
+ };
+ self.match_logger
+ .send(MatchLogMessage::StdErr(message))
+ .unwrap();
+ }
+ }
+ _ => (),
+ }
+ }
+
+ // at this point the stream has ended
+ // does this mean the container has exited?
+
+ Err(io::Error::new(
+ io::ErrorKind::UnexpectedEof,
+ "no response received",
+ ))
+ }
}
diff --git a/planetwars-matchrunner/src/lib.rs b/planetwars-matchrunner/src/lib.rs
index 170ac1e..0be0b3d 100644
--- a/planetwars-matchrunner/src/lib.rs
+++ b/planetwars-matchrunner/src/lib.rs
@@ -1,17 +1,18 @@
pub mod bot_runner;
pub mod docker_runner;
pub mod match_context;
+pub mod match_log;
pub mod pw_match;
use std::{
- io::Write,
path::PathBuf,
sync::{Arc, Mutex},
};
use async_trait::async_trait;
-use futures::{stream::FuturesOrdered, FutureExt, StreamExt};
+use futures::{stream::FuturesOrdered, StreamExt};
use match_context::MatchCtx;
+use match_log::{create_log_sink, MatchLogger};
use planetwars_rules::PwConfig;
use serde::{Deserialize, Serialize};
@@ -47,6 +48,7 @@ pub trait BotSpec: Send + Sync {
&self,
player_id: u32,
event_bus: Arc<Mutex<EventBus>>,
+ match_logger: MatchLogger,
) -> Box<dyn PlayerHandle>;
}
@@ -57,6 +59,7 @@ pub async fn run_match(config: MatchConfig) {
};
let event_bus = Arc::new(Mutex::new(EventBus::new()));
+ let match_logger = create_log_sink(&config.log_path).await;
// start bots
// TODO: what happens when a bot fails?
@@ -66,34 +69,39 @@ pub async fn run_match(config: MatchConfig) {
.enumerate()
.map(|(player_id, player)| {
let player_id = (player_id + 1) as u32;
- start_bot(player_id, event_bus.clone(), &player.bot_spec)
+ start_bot(
+ player_id,
+ event_bus.clone(),
+ &player.bot_spec,
+ match_logger.clone(),
+ )
})
.collect::<FuturesOrdered<_>>()
// await all results
.collect()
.await;
- let mut log_file = std::fs::File::create(config.log_path).expect("could not create log file");
- // assemble the math meta struct
- let match_meta = MatchMeta {
- map_name: config.map_name.clone(),
- timestamp: chrono::Local::now(),
- players: config
- .players
- .iter()
- .map(|bot| PlayerInfo {
- name: bot.name.clone(),
- })
- .collect(),
- };
- write!(
- log_file,
- "{}\n",
- serde_json::to_string(&match_meta).unwrap()
- )
- .unwrap();
+ let match_ctx = MatchCtx::new(event_bus, players, match_logger);
- let match_ctx = MatchCtx::new(event_bus, players, log_file);
+ // TODO: is this still needed?
+ // assemble the math meta struct
+ // let match_meta = MatchMeta {
+ // map_name: config.map_name.clone(),
+ // timestamp: chrono::Local::now(),
+ // players: config
+ // .players
+ // .iter()
+ // .map(|bot| PlayerInfo {
+ // name: bot.name.clone(),
+ // })
+ // .collect(),
+ // };
+ // write!(
+ // log_file,
+ // "{}\n",
+ // serde_json::to_string(&match_meta).unwrap()
+ // )
+ // .unwrap();
let match_state = pw_match::PwMatch::create(match_ctx, pw_config);
match_state.run().await;
@@ -104,7 +112,8 @@ async fn start_bot(
player_id: u32,
event_bus: Arc<Mutex<EventBus>>,
bot_spec: &Box<dyn BotSpec>,
+ match_logger: MatchLogger,
) -> (u32, Box<dyn PlayerHandle>) {
- let player_handle = bot_spec.run_bot(player_id, event_bus).await;
+ let player_handle = bot_spec.run_bot(player_id, event_bus, match_logger).await;
(player_id, player_handle)
}
diff --git a/planetwars-matchrunner/src/match_context.rs b/planetwars-matchrunner/src/match_context.rs
index 8161ed9..6ea60c3 100644
--- a/planetwars-matchrunner/src/match_context.rs
+++ b/planetwars-matchrunner/src/match_context.rs
@@ -1,8 +1,6 @@
use futures::task::{Context, Poll};
use futures::{future::Future, task::AtomicWaker};
use serde::{Deserialize, Serialize};
-use std::fs::File;
-use std::io::Write;
use std::pin::Pin;
use std::time::Duration;
use std::{
@@ -10,6 +8,8 @@ use std::{
sync::{Arc, Mutex},
};
+use crate::match_log::{MatchLogMessage, MatchLogger};
+
#[derive(Serialize, Deserialize, Debug)]
pub struct RequestMessage {
pub request_id: u32,
@@ -20,16 +20,14 @@ pub struct RequestMessage {
pub struct MatchCtx {
event_bus: Arc<Mutex<EventBus>>,
players: HashMap<u32, PlayerData>,
- // output: MsgStreamHandle<String>,
- log_sink: File,
+ match_logger: MatchLogger,
}
impl MatchCtx {
pub fn new(
event_bus: Arc<Mutex<EventBus>>,
players: HashMap<u32, Box<dyn PlayerHandle>>,
- log_file: File,
- // log: MsgStreamHandle<String>,
+ match_logger: MatchLogger,
) -> Self {
MatchCtx {
event_bus,
@@ -43,7 +41,7 @@ impl MatchCtx {
(id, player_handle)
})
.collect(),
- log_sink: log_file,
+ match_logger,
}
}
@@ -70,9 +68,8 @@ impl MatchCtx {
self.players.keys().cloned().collect()
}
- // this method should be used to emit log states etc.
- pub fn log_string(&mut self, message: String) {
- write!(self.log_sink, "{}\n", message).expect("failed to write to log file");
+ pub fn log(&mut self, message: MatchLogMessage) {
+ self.match_logger.send(message).expect("write failed");
}
}
diff --git a/planetwars-matchrunner/src/match_log.rs b/planetwars-matchrunner/src/match_log.rs
new file mode 100644
index 0000000..9991f99
--- /dev/null
+++ b/planetwars-matchrunner/src/match_log.rs
@@ -0,0 +1,45 @@
+use std::path::Path;
+
+use serde::{Deserialize, Serialize};
+use tokio::{fs::File, io::AsyncWriteExt};
+
+use planetwars_rules::protocol::State;
+use tokio::sync::mpsc;
+
+#[derive(Serialize, Deserialize, Debug)]
+#[serde(tag = "type")]
+pub enum MatchLogMessage {
+ #[serde(rename = "gamestate")]
+ GameState(State),
+ #[serde(rename = "stderr")]
+ StdErr(StdErrMessage),
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct StdErrMessage {
+ pub player_id: u32,
+ pub message: String,
+}
+
+pub type MatchLogger = mpsc::UnboundedSender<MatchLogMessage>;
+
+pub async fn create_log_sink(log_file_path: &Path) -> MatchLogger {
+ let (tx, rx) = mpsc::unbounded_channel();
+ let log_file = File::create(log_file_path)
+ .await
+ .expect("Could not create log file");
+ tokio::spawn(run_log_sink(rx, log_file));
+ return tx;
+}
+
+async fn run_log_sink(mut rx: mpsc::UnboundedReceiver<MatchLogMessage>, mut file: File) {
+ while let Some(message) = rx.recv().await {
+ let json = serde_json::to_string(&message).expect("failed to serialize message");
+ file.write_all(json.as_bytes())
+ .await
+ .expect("failed to write log message to file");
+ file.write_all(b"\n")
+ .await
+ .expect("failed to write newline log message to file");
+ }
+}
diff --git a/planetwars-matchrunner/src/pw_match.rs b/planetwars-matchrunner/src/pw_match.rs
index 25f849e..c114d78 100644
--- a/planetwars-matchrunner/src/pw_match.rs
+++ b/planetwars-matchrunner/src/pw_match.rs
@@ -1,3 +1,5 @@
+use crate::match_log::MatchLogMessage;
+
use super::match_context::{MatchCtx, RequestResult};
use futures::stream::futures_unordered::FuturesUnordered;
use futures::{FutureExt, StreamExt};
@@ -44,16 +46,16 @@ impl PwMatch {
for (player_id, turn) in player_messages {
let res = self.execute_action(player_id, turn);
if let Some(err) = action_errors(res) {
- let info_str = serde_json::to_string(&err).unwrap();
- println!("player {}: {}", player_id, info_str);
+ let _info_str = serde_json::to_string(&err).unwrap();
+ // TODO
+ // println!("player {}: {}", player_id, info_str);
}
}
self.match_state.step();
// Log state
let state = self.match_state.serialize_state();
- self.match_ctx
- .log_string(serde_json::to_string(&state).unwrap());
+ self.match_ctx.log(MatchLogMessage::GameState(state));
}
}