pub mod pb { tonic::include_proto!("grpc.planetwars.bot_api"); } use std::collections::HashMap; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::{Arc, Mutex}; use std::time::Duration; use runner::match_context::{EventBus, PlayerHandle, RequestError, RequestMessage}; use runner::match_log::MatchLogger; use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::UnboundedReceiverStream; use tonic; use tonic::transport::Server; use tonic::{Request, Response, Status, Streaming}; use planetwars_matchrunner as runner; use crate::db; use crate::{ConnectionPool, MAPS_DIR, MATCHES_DIR}; use super::matches::code_bundle_to_botspec; pub struct BotApiServer { router: PlayerRouter, } /// Routes players to their handler #[derive(Clone)] struct PlayerRouter { routing_table: Arc>>, } impl PlayerRouter { pub fn new() -> Self { PlayerRouter { routing_table: Arc::new(Mutex::new(HashMap::new())), } } } // TODO: implement a way to expire entries impl PlayerRouter { fn put(&self, player_id: String, entry: SyncThingData) { let mut routing_table = self.routing_table.lock().unwrap(); routing_table.insert(player_id, entry); } fn take(&self, player_id: &str) -> Option { // TODO: this design does not allow for reconnects. Is this desired? let mut routing_table = self.routing_table.lock().unwrap(); routing_table.remove(player_id) } } #[tonic::async_trait] impl pb::bot_api_service_server::BotApiService for BotApiServer { type ConnectBotStream = UnboundedReceiverStream>; async fn connect_bot( &self, req: Request>, ) -> Result, Status> { // TODO: clean up errors let player_id = req .metadata() .get("player_id") .ok_or_else(|| Status::unauthenticated("no player_id provided"))?; let player_id_str = player_id .to_str() .map_err(|_| Status::invalid_argument("unreadable string"))?; let sync_data = self .router .take(player_id_str) .ok_or_else(|| Status::not_found("player_id not found"))?; let stream = req.into_inner(); sync_data.tx.send(stream).unwrap(); Ok(Response::new(UnboundedReceiverStream::new( sync_data.server_messages, ))) } } struct SyncThingData { tx: oneshot::Sender>, server_messages: mpsc::UnboundedReceiver>, } struct RemoteBotSpec { router: PlayerRouter, } #[tonic::async_trait] impl runner::BotSpec for RemoteBotSpec { async fn run_bot( &self, player_id: u32, event_bus: Arc>, _match_logger: MatchLogger, ) -> Box { let (tx, rx) = oneshot::channel(); let (server_msg_snd, server_msg_recv) = mpsc::unbounded_channel(); let player_key = "test_player".to_string(); self.router.put( player_key.clone(), SyncThingData { tx, server_messages: server_msg_recv, }, ); let fut = tokio::time::timeout(Duration::from_secs(10), rx); match fut.await { Ok(Ok(client_messages)) => { // let client_messages = rx.await.unwrap(); tokio::spawn(handle_bot_messages( player_id, event_bus.clone(), client_messages, )); } _ => { // ensure router cleanup self.router.take(&player_key); } }; // If the player did not connect, the receiving half of `sender` // will be dropped here, resulting in a time-out for every turn. // This is fine for now, but // TODO: provide a formal mechanism for player startup failure Box::new(RemoteBotHandle { sender: server_msg_snd, player_id, event_bus, }) } } async fn handle_bot_messages( player_id: u32, event_bus: Arc>, mut messages: Streaming, ) { while let Some(message) = messages.message().await.unwrap() { let request_id = (player_id, message.request_id as u32); event_bus .lock() .unwrap() .resolve_request(request_id, Ok(message.content)); } } struct RemoteBotHandle { sender: mpsc::UnboundedSender>, player_id: u32, event_bus: Arc>, } impl PlayerHandle for RemoteBotHandle { fn send_request(&mut self, r: RequestMessage) { let res = self.sender.send(Ok(pb::PlayerRequest { request_id: r.request_id as i32, content: r.content, })); match res { Ok(()) => { // schedule a timeout. See comments at method implementation tokio::spawn(schedule_timeout( (self.player_id, r.request_id), r.timeout, self.event_bus.clone(), )); } Err(_send_error) => { // cannot contact the remote bot anymore; // directly mark all requests as timed out. // TODO: create a dedicated error type for this. // should it be logged? self.event_bus .lock() .unwrap() .resolve_request((self.player_id, r.request_id), Err(RequestError::Timeout)); } } } } // TODO: this will spawn a task for every request, which might not be ideal. // Some alternatives: // - create a single task that manages all time-outs. // - intersperse timeouts with incoming client messages // - push timeouts upwards, into the matchrunner logic (before we hit the playerhandle). // This was initially not done to allow timer start to be delayed until the message actually arrived // with the player. Is this still needed, or is there a different way to do this? // async fn schedule_timeout( request_id: (u32, u32), duration: Duration, event_bus: Arc>, ) { tokio::time::sleep(duration).await; event_bus .lock() .unwrap() .resolve_request(request_id, Err(RequestError::Timeout)); } async fn run_match(router: PlayerRouter, pool: ConnectionPool) { let conn = pool.get().await.unwrap(); let opponent = db::bots::find_bot_by_name("simplebot", &conn).unwrap(); let opponent_code_bundle = db::bots::active_code_bundle(opponent.id, &conn).unwrap(); let log_file_name = "remote_match.log"; let remote_bot_spec = RemoteBotSpec { router }; let match_config = runner::MatchConfig { map_path: PathBuf::from(MAPS_DIR).join("hex.json"), map_name: "hex".to_string(), log_path: PathBuf::from(MATCHES_DIR).join(&log_file_name), players: vec![ runner::MatchPlayer { bot_spec: Box::new(remote_bot_spec), }, runner::MatchPlayer { bot_spec: code_bundle_to_botspec(&opponent_code_bundle), }, ], }; runner::run_match(match_config).await; } pub async fn run_bot_api(pool: ConnectionPool) { let router = PlayerRouter::new(); tokio::spawn(run_match(router.clone(), pool)); let server = BotApiServer { router }; let addr = SocketAddr::from(([127, 0, 0, 1], 50051)); Server::builder() .add_service(pb::bot_api_service_server::BotApiServiceServer::new(server)) .serve(addr) .await .unwrap() }