aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIlion Beyst <ilion.beyst@gmail.com>2022-01-22 14:32:43 +0100
committerIlion Beyst <ilion.beyst@gmail.com>2022-01-22 14:32:43 +0100
commitf62196d983c04a94b892086a4ea6926bd7b6e4fb (patch)
tree7e0399883be05dcc78a931df3d7939bb81840e85
parent3dd940321cd7f6e4356afa4e505bac8016c83707 (diff)
downloadplanetwars.dev-f62196d983c04a94b892086a4ea6926bd7b6e4fb.tar.xz
planetwars.dev-f62196d983c04a94b892086a4ea6926bd7b6e4fb.zip
implement docker runner
-rw-r--r--planetwars-matchrunner/Cargo.toml3
-rw-r--r--planetwars-matchrunner/src/bin/testmatch.rs162
-rw-r--r--planetwars-matchrunner/src/docker_runner.rs188
-rw-r--r--planetwars-matchrunner/src/lib.rs31
4 files changed, 230 insertions, 154 deletions
diff --git a/planetwars-matchrunner/Cargo.toml b/planetwars-matchrunner/Cargo.toml
index c4cb00d..b041d61 100644
--- a/planetwars-matchrunner/Cargo.toml
+++ b/planetwars-matchrunner/Cargo.toml
@@ -19,4 +19,5 @@ serde_json = "1.0"
planetwars-rules = { path = "../planetwars-rules" }
chrono = { version = "0.4", features = ["serde"] }
bollard = { git = "https://github.com/antoinert/bollard" }
-bytes = "1.1" \ No newline at end of file
+bytes = "1.1"
+async-trait = "0.1" \ No newline at end of file
diff --git a/planetwars-matchrunner/src/bin/testmatch.rs b/planetwars-matchrunner/src/bin/testmatch.rs
index a072b9f..db160cf 100644
--- a/planetwars-matchrunner/src/bin/testmatch.rs
+++ b/planetwars-matchrunner/src/bin/testmatch.rs
@@ -1,28 +1,6 @@
-extern crate planetwars_matchrunner;
-extern crate tokio;
+use std::{env, path::PathBuf};
-use std::collections::HashMap;
-use std::io::{self, Write};
-use std::path::{Path, PathBuf};
-use std::pin::Pin;
-use std::sync::{Arc, Mutex};
-
-use bollard::container::{self, AttachContainerOptions, AttachContainerResults, LogOutput};
-use bollard::exec::StartExecResults;
-use bollard::Docker;
-use bytes::Bytes;
-use futures::{Stream, StreamExt};
-use planetwars_matchrunner::{
- match_context::{EventBus, MatchCtx, PlayerHandle, RequestMessage},
- pw_match, MatchConfig, MatchMeta, PlayerInfo,
-};
-use planetwars_rules::protocol as proto;
-use std::env;
-use tokio::io::{AsyncWrite, AsyncWriteExt};
-use tokio::sync::mpsc;
-
-const IMAGE: &'static str = "python:3.10.1-slim-buster";
-// const IMAGE: &'static str = "simplebot:latest";
+use planetwars_matchrunner::{docker_runner::DockerBotSpec, run_match, MatchConfig, MatchPlayer};
#[tokio::main]
async fn main() {
@@ -32,134 +10,30 @@ async fn main() {
_run_match(map_path).await;
}
+const IMAGE: &'static str = "python:3.10-slim-buster";
+
async fn _run_match(map_path: String) {
- let docker = Docker::connect_with_socket_defaults().unwrap();
let code_dir_path = PathBuf::from("../simplebot");
- let params = BotParams {
- image: IMAGE,
- code_path: &code_dir_path,
- argv: vec!["python", "simplebot.py"],
+ let bot_spec = DockerBotSpec {
+ image: IMAGE.to_string(),
+ code_path: code_dir_path,
+ argv: vec!["python".to_string(), "simplebot.py".to_string()],
};
- let mut process = spawn_docker_process(&docker, params).await.unwrap();
- let state = proto::State {
- planets: vec![
- proto::Planet {
+ run_match(MatchConfig {
+ map_path: PathBuf::from(map_path),
+ map_name: "hex".to_string(),
+ log_path: PathBuf::from("match.log"),
+ players: vec![
+ MatchPlayer {
name: "a".to_string(),
- owner: Some(1),
- ship_count: 100,
- x: -1.0,
- y: 0.0,
+ bot_spec: Box::new(bot_spec.clone()),
},
- proto::Planet {
+ MatchPlayer {
name: "b".to_string(),
- owner: Some(2),
- ship_count: 100,
- x: 1.0,
- y: 0.0,
+ bot_spec: Box::new(bot_spec.clone()),
},
],
- expeditions: vec![],
- };
-
- let serialized = serde_json::to_vec(&state).unwrap();
- let out = process.communicate(&serialized).await.unwrap();
-
- print!("got output: {}", String::from_utf8(out.to_vec()).unwrap());
-}
-
-pub struct BotParams<'a> {
- pub image: &'a str,
- pub code_path: &'a Path,
- pub argv: Vec<&'a str>,
-}
-
-async fn spawn_docker_process(
- docker: &Docker,
- params: BotParams<'_>,
-) -> Result<ContainerProcess, bollard::errors::Error> {
- let bot_code_dir = std::fs::canonicalize(params.code_path).unwrap();
- let code_dir_str = bot_code_dir.as_os_str().to_str().unwrap();
-
- let config = container::Config {
- image: Some(params.image),
- host_config: Some(bollard::models::HostConfig {
- binds: Some(vec![format!("{}:{}", code_dir_str, "/workdir")]),
- ..Default::default()
- }),
- working_dir: Some("/workdir"),
- cmd: Some(params.argv),
- attach_stdin: Some(true),
- attach_stdout: Some(true),
- attach_stderr: Some(true),
- open_stdin: Some(true),
- ..Default::default()
- };
-
- let response = docker.create_container::<&str, &str>(None, config).await?;
- let container_id = response.id;
-
- docker
- .start_container::<String>(&container_id, None)
- .await?;
-
- let AttachContainerResults { output, input } = docker
- .attach_container(
- &container_id,
- Some(AttachContainerOptions::<String> {
- stdout: Some(true),
- stderr: Some(true),
- stdin: Some(true),
- stream: Some(true),
- logs: Some(true),
- ..Default::default()
- }),
- )
- .await?;
-
- Ok(ContainerProcess {
- stdin: input,
- output,
})
-}
-
-pub struct ContainerProcess {
- stdin: Pin<Box<dyn AsyncWrite + Send>>,
- output: Pin<Box<dyn Stream<Item = Result<LogOutput, bollard::errors::Error>>>>,
-}
-
-impl ContainerProcess {
- pub async fn communicate(&mut self, input: &[u8]) -> io::Result<Bytes> {
- self.write_line(input).await?;
- self.read_line().await
- }
-
- async fn write_line(&mut self, bytes: &[u8]) -> io::Result<()> {
- self.stdin.write_all(bytes).await?;
- self.stdin.write_u8(b'\n').await?;
- self.stdin.flush().await?;
- Ok(())
- }
-
- async fn read_line(&mut self) -> io::Result<Bytes> {
- while let Some(item) = self.output.next().await {
- let log_output = item.expect("failed to get log output");
- match log_output {
- LogOutput::StdOut { message } => {
- // TODO: this is not correct (buffering and such)
- return Ok(message);
- }
- LogOutput::StdErr { message } => {
- // TODO
- println!("{}", String::from_utf8_lossy(&message));
- }
- _ => (),
- }
- }
-
- Err(io::Error::new(
- io::ErrorKind::UnexpectedEof,
- "no response received",
- ))
- }
+ .await;
}
diff --git a/planetwars-matchrunner/src/docker_runner.rs b/planetwars-matchrunner/src/docker_runner.rs
new file mode 100644
index 0000000..84101ef
--- /dev/null
+++ b/planetwars-matchrunner/src/docker_runner.rs
@@ -0,0 +1,188 @@
+use std::io;
+use std::path::PathBuf;
+use std::pin::Pin;
+use std::sync::{Arc, Mutex};
+
+use async_trait::async_trait;
+use bollard::container::{self, AttachContainerOptions, AttachContainerResults, LogOutput};
+use bollard::Docker;
+use bytes::Bytes;
+use futures::{Stream, StreamExt};
+use tokio::io::{AsyncWrite, AsyncWriteExt};
+use tokio::sync::mpsc;
+use tokio::time::timeout;
+
+use crate::match_context::{EventBus, PlayerHandle, RequestError, RequestMessage};
+use crate::BotSpec;
+
+#[derive(Clone, Debug)]
+pub struct DockerBotSpec {
+ pub image: String,
+ pub code_path: PathBuf,
+ pub argv: Vec<String>,
+}
+
+#[async_trait]
+impl BotSpec for DockerBotSpec {
+ async fn run_bot(
+ &self,
+ player_id: u32,
+ event_bus: Arc<Mutex<EventBus>>,
+ ) -> Box<dyn PlayerHandle> {
+ let (handle, runner) = create_docker_bot(player_id, event_bus);
+ let process = spawn_docker_process(self).await.unwrap();
+ tokio::spawn(runner.run(process));
+ return Box::new(handle);
+ }
+}
+
+async fn spawn_docker_process(
+ params: &DockerBotSpec,
+) -> Result<ContainerProcess, bollard::errors::Error> {
+ let docker = Docker::connect_with_socket_defaults()?;
+ let bot_code_dir = std::fs::canonicalize(&params.code_path).unwrap();
+ let code_dir_str = bot_code_dir.as_os_str().to_str().unwrap();
+
+ let config = container::Config {
+ image: Some(params.image.clone()),
+ host_config: Some(bollard::models::HostConfig {
+ binds: Some(vec![format!("{}:{}", code_dir_str, "/workdir")]),
+ ..Default::default()
+ }),
+ working_dir: Some("/workdir".to_string()),
+ cmd: Some(params.argv.clone()),
+ attach_stdin: Some(true),
+ attach_stdout: Some(true),
+ attach_stderr: Some(true),
+ open_stdin: Some(true),
+ ..Default::default()
+ };
+
+ let response = docker
+ .create_container::<&str, String>(None, config)
+ .await?;
+ let container_id = response.id;
+
+ docker
+ .start_container::<String>(&container_id, None)
+ .await?;
+
+ let AttachContainerResults { output, input } = docker
+ .attach_container(
+ &container_id,
+ Some(AttachContainerOptions::<String> {
+ stdout: Some(true),
+ stderr: Some(true),
+ stdin: Some(true),
+ stream: Some(true),
+ logs: Some(true),
+ ..Default::default()
+ }),
+ )
+ .await?;
+
+ Ok(ContainerProcess {
+ stdin: input,
+ output,
+ })
+}
+
+pub struct ContainerProcess {
+ stdin: Pin<Box<dyn AsyncWrite + Send>>,
+ output: Pin<Box<dyn Stream<Item = Result<LogOutput, bollard::errors::Error>> + Send>>,
+}
+
+impl ContainerProcess {
+ pub async fn communicate(&mut self, input: &[u8]) -> io::Result<Bytes> {
+ self.write_line(input).await?;
+ self.read_line().await
+ }
+
+ async fn write_line(&mut self, bytes: &[u8]) -> io::Result<()> {
+ self.stdin.write_all(bytes).await?;
+ self.stdin.write_u8(b'\n').await?;
+ self.stdin.flush().await?;
+ Ok(())
+ }
+
+ async fn read_line(&mut self) -> io::Result<Bytes> {
+ while let Some(item) = self.output.next().await {
+ let log_output = item.expect("failed to get log output");
+ match log_output {
+ LogOutput::StdOut { message } => {
+ // TODO: this is not correct (buffering and such)
+ return Ok(message);
+ }
+ LogOutput::StdErr { message } => {
+ // TODO
+ println!("{}", String::from_utf8_lossy(&message));
+ }
+ _ => (),
+ }
+ }
+
+ Err(io::Error::new(
+ io::ErrorKind::UnexpectedEof,
+ "no response received",
+ ))
+ }
+}
+
+fn create_docker_bot(
+ player_id: u32,
+ event_bus: Arc<Mutex<EventBus>>,
+) -> (DockerBotHandle, DockerBotRunner) {
+ let (tx, rx) = mpsc::unbounded_channel();
+ let bot_handle = DockerBotHandle { tx };
+ let bot_runner = DockerBotRunner {
+ player_id,
+ event_bus,
+ rx,
+ };
+ (bot_handle, bot_runner)
+}
+
+pub struct DockerBotHandle {
+ tx: mpsc::UnboundedSender<RequestMessage>,
+}
+
+impl PlayerHandle for DockerBotHandle {
+ fn send_request(&mut self, r: RequestMessage) {
+ self.tx
+ .send(r)
+ .expect("failed to send message to local bot");
+ }
+
+ fn send_info(&mut self, _msg: String) {
+ // TODO: log this somewhere
+ // drop info message
+ }
+}
+
+pub struct DockerBotRunner {
+ event_bus: Arc<Mutex<EventBus>>,
+ rx: mpsc::UnboundedReceiver<RequestMessage>,
+ player_id: u32,
+}
+
+impl DockerBotRunner {
+ pub async fn run(mut self, mut process: ContainerProcess) {
+ while let Some(request) = self.rx.recv().await {
+ let resp_fut = process.communicate(&request.content);
+ let result = timeout(request.timeout, resp_fut)
+ .await
+ // TODO: how can this failure be handled cleanly?
+ .expect("process read failed");
+ let result = match result {
+ Ok(line) => Ok(line.to_vec()),
+ Err(_elapsed) => Err(RequestError::Timeout),
+ };
+ let request_id = (self.player_id, request.request_id);
+
+ self.event_bus
+ .lock()
+ .unwrap()
+ .resolve_request(request_id, result);
+ }
+ }
+}
diff --git a/planetwars-matchrunner/src/lib.rs b/planetwars-matchrunner/src/lib.rs
index 1f23551..2e4200c 100644
--- a/planetwars-matchrunner/src/lib.rs
+++ b/planetwars-matchrunner/src/lib.rs
@@ -1,4 +1,5 @@
pub mod bot_runner;
+pub mod docker_runner;
pub mod match_context;
pub mod pw_match;
@@ -8,6 +9,8 @@ use std::{
sync::{Arc, Mutex},
};
+use async_trait::async_trait;
+use futures::{stream::FuturesOrdered, FutureExt, StreamExt};
use match_context::MatchCtx;
use planetwars_rules::PwConfig;
use serde::{Deserialize, Serialize};
@@ -35,8 +38,16 @@ pub struct PlayerInfo {
pub struct MatchPlayer {
pub name: String,
- pub path: PathBuf,
- pub argv: Vec<String>,
+ pub bot_spec: Box<dyn BotSpec>,
+}
+
+#[async_trait]
+pub trait BotSpec {
+ async fn run_bot(
+ &self,
+ player_id: u32,
+ event_bus: Arc<Mutex<EventBus>>,
+ ) -> Box<dyn PlayerHandle>;
}
pub async fn run_match(config: MatchConfig) {
@@ -48,20 +59,22 @@ pub async fn run_match(config: MatchConfig) {
let event_bus = Arc::new(Mutex::new(EventBus::new()));
// start bots
+ // TODO: what happens when a bot fails?
let players = config
.players
.iter()
.enumerate()
.map(|(player_id, player)| {
let player_id = (player_id + 1) as u32;
- let bot = bot_runner::Bot {
- working_dir: player.path.clone(),
- argv: player.argv.clone(),
- };
- let handle = bot_runner::run_local_bot(player_id, event_bus.clone(), bot);
- (player_id, Box::new(handle) as Box<dyn PlayerHandle>)
+ player
+ .bot_spec
+ .run_bot(player_id, event_bus.clone())
+ .map(move |handle| (player_id, handle))
})
- .collect();
+ .collect::<FuturesOrdered<_>>()
+ // await all results
+ .collect()
+ .await;
let mut log_file = std::fs::File::create(config.log_path).expect("could not create log file");
// assemble the math meta struct