diff options
Diffstat (limited to 'planetwars-server')
-rw-r--r-- | planetwars-server/Cargo.toml | 6 | ||||
-rw-r--r-- | planetwars-server/build.rs | 9 | ||||
-rw-r--r-- | planetwars-server/migrations/2022-06-10-180418_nullable_match_player_code_bundle/down.sql | 1 | ||||
-rw-r--r-- | planetwars-server/migrations/2022-06-10-180418_nullable_match_player_code_bundle/up.sql | 1 | ||||
-rw-r--r-- | planetwars-server/src/db/matches.rs | 18 | ||||
-rw-r--r-- | planetwars-server/src/modules/bot_api.rs | 272 | ||||
-rw-r--r-- | planetwars-server/src/modules/matches.rs | 50 | ||||
-rw-r--r-- | planetwars-server/src/modules/mod.rs | 1 | ||||
-rw-r--r-- | planetwars-server/src/modules/ranking.rs | 9 | ||||
-rw-r--r-- | planetwars-server/src/routes/demo.rs | 11 | ||||
-rw-r--r-- | planetwars-server/src/routes/matches.rs | 6 | ||||
-rw-r--r-- | planetwars-server/src/schema.rs | 2 |
12 files changed, 355 insertions, 31 deletions
diff --git a/planetwars-server/Cargo.toml b/planetwars-server/Cargo.toml index 99161bb..f2444c1 100644 --- a/planetwars-server/Cargo.toml +++ b/planetwars-server/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] futures = "0.3" tokio = { version = "1.15", features = ["full"] } +tokio-stream = "0.1.9" hyper = "0.14" axum = { version = "0.5", features = ["json", "headers", "multipart"] } diesel = { version = "1.4.4", features = ["postgres", "chrono"] } @@ -29,9 +30,14 @@ config = { version = "0.12", features = ["toml"] } thiserror = "1.0.31" sha2 = "0.10" tokio-util = { version="0.7.3", features=["io"] } +prost = "0.10" +tonic = "0.7.2" # TODO: remove me shlex = "1.1" +[build-dependencies] +tonic-build = "0.7.2" + [dev-dependencies] parking_lot = "0.11" diff --git a/planetwars-server/build.rs b/planetwars-server/build.rs new file mode 100644 index 0000000..97bf355 --- /dev/null +++ b/planetwars-server/build.rs @@ -0,0 +1,9 @@ +extern crate tonic_build; + +fn main() -> Result<(), Box<dyn std::error::Error>> { + tonic_build::configure() + .build_server(true) + .build_client(false) + .compile(&["../proto/bot_api.proto"], &["../proto"])?; + Ok(()) +} diff --git a/planetwars-server/migrations/2022-06-10-180418_nullable_match_player_code_bundle/down.sql b/planetwars-server/migrations/2022-06-10-180418_nullable_match_player_code_bundle/down.sql new file mode 100644 index 0000000..bb0b613 --- /dev/null +++ b/planetwars-server/migrations/2022-06-10-180418_nullable_match_player_code_bundle/down.sql @@ -0,0 +1 @@ +ALTER TABLE match_players ALTER COLUMN code_bundle_id SET NOT NULL; diff --git a/planetwars-server/migrations/2022-06-10-180418_nullable_match_player_code_bundle/up.sql b/planetwars-server/migrations/2022-06-10-180418_nullable_match_player_code_bundle/up.sql new file mode 100644 index 0000000..86ab65d --- /dev/null +++ b/planetwars-server/migrations/2022-06-10-180418_nullable_match_player_code_bundle/up.sql @@ -0,0 +1 @@ +ALTER TABLE match_players ALTER COLUMN code_bundle_id DROP NOT NULL; diff --git a/planetwars-server/src/db/matches.rs b/planetwars-server/src/db/matches.rs index dfff3cf..54fd113 100644 --- a/planetwars-server/src/db/matches.rs +++ b/planetwars-server/src/db/matches.rs @@ -25,7 +25,7 @@ pub struct NewMatchPlayer { /// player id within the match pub player_id: i32, /// id of the bot behind this player - pub code_bundle_id: i32, + pub code_bundle_id: Option<i32>, } #[derive(Queryable, Identifiable)] @@ -44,11 +44,11 @@ pub struct MatchBase { pub struct MatchPlayer { pub match_id: i32, pub player_id: i32, - pub code_bundle_id: i32, + pub code_bundle_id: Option<i32>, } pub struct MatchPlayerData { - pub code_bundle_id: i32, + pub code_bundle_id: Option<i32>, } pub fn create_match( @@ -92,7 +92,10 @@ pub fn list_matches(conn: &PgConnection) -> QueryResult<Vec<FullMatchData>> { let matches = matches::table.get_results::<MatchBase>(conn)?; let match_players = MatchPlayer::belonging_to(&matches) - .inner_join(code_bundles::table) + .left_join( + code_bundles::table + .on(match_players::code_bundle_id.eq(code_bundles::id.nullable())), + ) .left_join(bots::table.on(code_bundles::bot_id.eq(bots::id.nullable()))) .load::<FullMatchPlayerData>(conn)? .grouped_by(&matches); @@ -120,7 +123,7 @@ pub struct FullMatchData { // #[primary_key(base.match_id, base::player_id)] pub struct FullMatchPlayerData { pub base: MatchPlayer, - pub code_bundle: CodeBundle, + pub code_bundle: Option<CodeBundle>, pub bot: Option<Bot>, } @@ -142,7 +145,10 @@ pub fn find_match(id: i32, conn: &PgConnection) -> QueryResult<FullMatchData> { let match_base = matches::table.find(id).get_result::<MatchBase>(conn)?; let match_players = MatchPlayer::belonging_to(&match_base) - .inner_join(code_bundles::table) + .left_join( + code_bundles::table + .on(match_players::code_bundle_id.eq(code_bundles::id.nullable())), + ) .left_join(bots::table.on(code_bundles::bot_id.eq(bots::id.nullable()))) .load::<FullMatchPlayerData>(conn)?; diff --git a/planetwars-server/src/modules/bot_api.rs b/planetwars-server/src/modules/bot_api.rs new file mode 100644 index 0000000..0ecbf71 --- /dev/null +++ b/planetwars-server/src/modules/bot_api.rs @@ -0,0 +1,272 @@ +pub mod pb { + tonic::include_proto!("grpc.planetwars.bot_api"); +} + +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use runner::match_context::{EventBus, PlayerHandle, RequestError, RequestMessage}; +use runner::match_log::MatchLogger; +use tokio::sync::{mpsc, oneshot}; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tonic; +use tonic::transport::Server; +use tonic::{Request, Response, Status, Streaming}; + +use planetwars_matchrunner as runner; + +use crate::db; +use crate::util::gen_alphanumeric; +use crate::ConnectionPool; + +use super::matches::{MatchPlayer, RunMatch}; + +pub struct BotApiServer { + conn_pool: ConnectionPool, + router: PlayerRouter, +} + +/// Routes players to their handler +#[derive(Clone)] +struct PlayerRouter { + routing_table: Arc<Mutex<HashMap<String, SyncThingData>>>, +} + +impl PlayerRouter { + pub fn new() -> Self { + PlayerRouter { + routing_table: Arc::new(Mutex::new(HashMap::new())), + } + } +} + +impl Default for PlayerRouter { + fn default() -> Self { + Self::new() + } +} + +// TODO: implement a way to expire entries +impl PlayerRouter { + fn put(&self, player_key: String, entry: SyncThingData) { + let mut routing_table = self.routing_table.lock().unwrap(); + routing_table.insert(player_key, entry); + } + + fn take(&self, player_key: &str) -> Option<SyncThingData> { + // TODO: this design does not allow for reconnects. Is this desired? + let mut routing_table = self.routing_table.lock().unwrap(); + routing_table.remove(player_key) + } +} + +#[tonic::async_trait] +impl pb::bot_api_service_server::BotApiService for BotApiServer { + type ConnectBotStream = UnboundedReceiverStream<Result<pb::PlayerRequest, Status>>; + + async fn connect_bot( + &self, + req: Request<Streaming<pb::PlayerRequestResponse>>, + ) -> Result<Response<Self::ConnectBotStream>, Status> { + // TODO: clean up errors + let player_key = req + .metadata() + .get("player_key") + .ok_or_else(|| Status::unauthenticated("no player_key provided"))?; + + let player_key_str = player_key + .to_str() + .map_err(|_| Status::invalid_argument("unreadable string"))?; + + let sync_data = self + .router + .take(player_key_str) + .ok_or_else(|| Status::not_found("player_key not found"))?; + + let stream = req.into_inner(); + + sync_data.tx.send(stream).unwrap(); + Ok(Response::new(UnboundedReceiverStream::new( + sync_data.server_messages, + ))) + } + + async fn create_match( + &self, + req: Request<pb::MatchRequest>, + ) -> Result<Response<pb::CreatedMatch>, Status> { + // TODO: unify with matchrunner module + let conn = self.conn_pool.get().await.unwrap(); + + let match_request = req.get_ref(); + + let opponent = db::bots::find_bot_by_name(&match_request.opponent_name, &conn) + .map_err(|_| Status::not_found("opponent not found"))?; + let opponent_code_bundle = db::bots::active_code_bundle(opponent.id, &conn) + .map_err(|_| Status::not_found("opponent has no code"))?; + + let player_key = gen_alphanumeric(32); + + let remote_bot_spec = Box::new(RemoteBotSpec { + player_key: player_key.clone(), + router: self.router.clone(), + }); + let mut run_match = RunMatch::from_players(vec![ + MatchPlayer::from_bot_spec(remote_bot_spec), + MatchPlayer::from_code_bundle(&opponent_code_bundle), + ]); + let created_match = run_match + .store_in_database(&conn) + .expect("failed to save match"); + run_match.spawn(self.conn_pool.clone()); + + Ok(Response::new(pb::CreatedMatch { + match_id: created_match.base.id, + player_key, + })) + } +} + +// TODO: please rename me +struct SyncThingData { + tx: oneshot::Sender<Streaming<pb::PlayerRequestResponse>>, + server_messages: mpsc::UnboundedReceiver<Result<pb::PlayerRequest, Status>>, +} + +struct RemoteBotSpec { + player_key: String, + router: PlayerRouter, +} + +#[tonic::async_trait] +impl runner::BotSpec for RemoteBotSpec { + async fn run_bot( + &self, + player_id: u32, + event_bus: Arc<Mutex<EventBus>>, + _match_logger: MatchLogger, + ) -> Box<dyn PlayerHandle> { + let (tx, rx) = oneshot::channel(); + let (server_msg_snd, server_msg_recv) = mpsc::unbounded_channel(); + self.router.put( + self.player_key.clone(), + SyncThingData { + tx, + server_messages: server_msg_recv, + }, + ); + + let fut = tokio::time::timeout(Duration::from_secs(10), rx); + match fut.await { + Ok(Ok(client_messages)) => { + // let client_messages = rx.await.unwrap(); + tokio::spawn(handle_bot_messages( + player_id, + event_bus.clone(), + client_messages, + )); + } + _ => { + // ensure router cleanup + self.router.take(&self.player_key); + } + }; + + // If the player did not connect, the receiving half of `sender` + // will be dropped here, resulting in a time-out for every turn. + // This is fine for now, but + // TODO: provide a formal mechanism for player startup failure + Box::new(RemoteBotHandle { + sender: server_msg_snd, + player_id, + event_bus, + }) + } +} + +async fn handle_bot_messages( + player_id: u32, + event_bus: Arc<Mutex<EventBus>>, + mut messages: Streaming<pb::PlayerRequestResponse>, +) { + while let Some(message) = messages.message().await.unwrap() { + let request_id = (player_id, message.request_id as u32); + event_bus + .lock() + .unwrap() + .resolve_request(request_id, Ok(message.content)); + } +} + +struct RemoteBotHandle { + sender: mpsc::UnboundedSender<Result<pb::PlayerRequest, Status>>, + player_id: u32, + event_bus: Arc<Mutex<EventBus>>, +} + +impl PlayerHandle for RemoteBotHandle { + fn send_request(&mut self, r: RequestMessage) { + let res = self.sender.send(Ok(pb::PlayerRequest { + request_id: r.request_id as i32, + content: r.content, + })); + match res { + Ok(()) => { + // schedule a timeout. See comments at method implementation + tokio::spawn(schedule_timeout( + (self.player_id, r.request_id), + r.timeout, + self.event_bus.clone(), + )); + } + Err(_send_error) => { + // cannot contact the remote bot anymore; + // directly mark all requests as timed out. + // TODO: create a dedicated error type for this. + // should it be logged? + println!("send error: {:?}", _send_error); + self.event_bus + .lock() + .unwrap() + .resolve_request((self.player_id, r.request_id), Err(RequestError::Timeout)); + } + } + } +} + +// TODO: this will spawn a task for every request, which might not be ideal. +// Some alternatives: +// - create a single task that manages all time-outs. +// - intersperse timeouts with incoming client messages +// - push timeouts upwards, into the matchrunner logic (before we hit the playerhandle). +// This was initially not done to allow timer start to be delayed until the message actually arrived +// with the player. Is this still needed, or is there a different way to do this? +// +async fn schedule_timeout( + request_id: (u32, u32), + duration: Duration, + event_bus: Arc<Mutex<EventBus>>, +) { + tokio::time::sleep(duration).await; + event_bus + .lock() + .unwrap() + .resolve_request(request_id, Err(RequestError::Timeout)); +} + +pub async fn run_bot_api(pool: ConnectionPool) { + let router = PlayerRouter::new(); + let server = BotApiServer { + router, + conn_pool: pool.clone(), + }; + + let addr = SocketAddr::from(([127, 0, 0, 1], 50051)); + Server::builder() + .add_service(pb::bot_api_service_server::BotApiServiceServer::new(server)) + .serve(addr) + .await + .unwrap() +} diff --git a/planetwars-server/src/modules/matches.rs b/planetwars-server/src/modules/matches.rs index a254bac..6d9261d 100644 --- a/planetwars-server/src/modules/matches.rs +++ b/planetwars-server/src/modules/matches.rs @@ -16,32 +16,54 @@ use crate::{ const PYTHON_IMAGE: &str = "python:3.10-slim-buster"; -pub struct RunMatch<'a> { +pub struct RunMatch { log_file_name: String, - player_code_bundles: Vec<&'a db::bots::CodeBundle>, + players: Vec<MatchPlayer>, match_id: Option<i32>, } -impl<'a> RunMatch<'a> { - pub fn from_players(player_code_bundles: Vec<&'a db::bots::CodeBundle>) -> Self { +pub struct MatchPlayer { + bot_spec: Box<dyn BotSpec>, + // meta that will be passed on to database + code_bundle_id: Option<i32>, +} + +impl MatchPlayer { + pub fn from_code_bundle(code_bundle: &db::bots::CodeBundle) -> Self { + MatchPlayer { + bot_spec: code_bundle_to_botspec(code_bundle), + code_bundle_id: Some(code_bundle.id), + } + } + + pub fn from_bot_spec(bot_spec: Box<dyn BotSpec>) -> Self { + MatchPlayer { + bot_spec, + code_bundle_id: None, + } + } +} + +impl RunMatch { + pub fn from_players(players: Vec<MatchPlayer>) -> Self { let log_file_name = format!("{}.log", gen_alphanumeric(16)); RunMatch { log_file_name, - player_code_bundles, + players, match_id: None, } } - pub fn runner_config(&self) -> runner::MatchConfig { + pub fn into_runner_config(self) -> runner::MatchConfig { runner::MatchConfig { map_path: PathBuf::from(MAPS_DIR).join("hex.json"), map_name: "hex".to_string(), log_path: PathBuf::from(MATCHES_DIR).join(&self.log_file_name), players: self - .player_code_bundles - .iter() - .map(|b| runner::MatchPlayer { - bot_spec: code_bundle_to_botspec(b), + .players + .into_iter() + .map(|player| runner::MatchPlayer { + bot_spec: player.bot_spec, }) .collect(), } @@ -56,10 +78,10 @@ impl<'a> RunMatch<'a> { log_path: &self.log_file_name, }; let new_match_players = self - .player_code_bundles + .players .iter() - .map(|b| db::matches::MatchPlayerData { - code_bundle_id: b.id, + .map(|p| db::matches::MatchPlayerData { + code_bundle_id: p.code_bundle_id, }) .collect::<Vec<_>>(); @@ -70,7 +92,7 @@ impl<'a> RunMatch<'a> { pub fn spawn(self, pool: ConnectionPool) -> JoinHandle<MatchOutcome> { let match_id = self.match_id.expect("match must be saved before running"); - let runner_config = self.runner_config(); + let runner_config = self.into_runner_config(); tokio::spawn(run_match_task(pool, runner_config, match_id)) } } diff --git a/planetwars-server/src/modules/mod.rs b/planetwars-server/src/modules/mod.rs index d66f568..1200f9d 100644 --- a/planetwars-server/src/modules/mod.rs +++ b/planetwars-server/src/modules/mod.rs @@ -1,5 +1,6 @@ // This module implements general domain logic, not directly // tied to the database or API layers. +pub mod bot_api; pub mod bots; pub mod matches; pub mod ranking; diff --git a/planetwars-server/src/modules/ranking.rs b/planetwars-server/src/modules/ranking.rs index 5d496d7..72156ee 100644 --- a/planetwars-server/src/modules/ranking.rs +++ b/planetwars-server/src/modules/ranking.rs @@ -1,8 +1,8 @@ use crate::{db::bots::Bot, DbPool}; use crate::db; -use crate::modules::matches::RunMatch; use diesel::{PgConnection, QueryResult}; +use crate::modules::matches::{MatchPlayer, RunMatch}; use rand::seq::SliceRandom; use std::collections::HashMap; use std::mem; @@ -44,9 +44,12 @@ async fn play_ranking_match(selected_bots: Vec<Bot>, db_pool: DbPool) { code_bundles.push(code_bundle); } - let code_bundle_refs = code_bundles.iter().collect::<Vec<_>>(); + let players = code_bundles + .iter() + .map(MatchPlayer::from_code_bundle) + .collect::<Vec<_>>(); - let mut run_match = RunMatch::from_players(code_bundle_refs); + let mut run_match = RunMatch::from_players(players); run_match .store_in_database(&db_conn) .expect("could not store match in db"); diff --git a/planetwars-server/src/routes/demo.rs b/planetwars-server/src/routes/demo.rs index 7f7ba71..33dc02d 100644 --- a/planetwars-server/src/routes/demo.rs +++ b/planetwars-server/src/routes/demo.rs @@ -1,7 +1,7 @@ use crate::db; use crate::db::matches::{FullMatchData, FullMatchPlayerData}; use crate::modules::bots::save_code_bundle; -use crate::modules::matches::RunMatch; +use crate::modules::matches::{MatchPlayer, RunMatch}; use crate::ConnectionPool; use axum::extract::Extension; use axum::Json; @@ -46,7 +46,10 @@ pub async fn submit_bot( // TODO: can we recover from this? .expect("could not save bot code"); - let mut run_match = RunMatch::from_players(vec![&player_code_bundle, &opponent_code_bundle]); + let mut run_match = RunMatch::from_players(vec![ + MatchPlayer::from_code_bundle(&player_code_bundle), + MatchPlayer::from_code_bundle(&opponent_code_bundle), + ]); let match_data = run_match .store_in_database(&conn) .expect("failed to save match"); @@ -58,12 +61,12 @@ pub async fn submit_bot( match_players: vec![ FullMatchPlayerData { base: match_data.match_players[0].clone(), - code_bundle: player_code_bundle, + code_bundle: Some(player_code_bundle), bot: None, }, FullMatchPlayerData { base: match_data.match_players[1].clone(), - code_bundle: opponent_code_bundle, + code_bundle: Some(opponent_code_bundle), bot: Some(opponent), }, ], diff --git a/planetwars-server/src/routes/matches.rs b/planetwars-server/src/routes/matches.rs index b61008d..874c775 100644 --- a/planetwars-server/src/routes/matches.rs +++ b/planetwars-server/src/routes/matches.rs @@ -61,7 +61,7 @@ pub async fn play_match( }); bot_ids.push(matches::MatchPlayerData { - code_bundle_id: code_bundle.id, + code_bundle_id: Some(code_bundle.id), }); } @@ -107,7 +107,7 @@ pub struct ApiMatch { #[derive(Serialize, Deserialize)] pub struct ApiMatchPlayer { - code_bundle_id: i32, + code_bundle_id: Option<i32>, bot_id: Option<i32>, bot_name: Option<String>, } @@ -127,7 +127,7 @@ pub fn match_data_to_api(data: matches::FullMatchData) -> ApiMatch { .match_players .iter() .map(|_p| ApiMatchPlayer { - code_bundle_id: _p.code_bundle.id, + code_bundle_id: _p.code_bundle.as_ref().map(|cb| cb.id), bot_id: _p.bot.as_ref().map(|b| b.id), bot_name: _p.bot.as_ref().map(|b| b.name.clone()), }) diff --git a/planetwars-server/src/schema.rs b/planetwars-server/src/schema.rs index be3e858..92acc8e 100644 --- a/planetwars-server/src/schema.rs +++ b/planetwars-server/src/schema.rs @@ -31,7 +31,7 @@ table! { match_players (match_id, player_id) { match_id -> Int4, player_id -> Int4, - code_bundle_id -> Int4, + code_bundle_id -> Nullable<Int4>, } } |