pub mod pb { tonic::include_proto!("grpc.planetwars.client_api"); pub use player_api_client_message::ClientMessage as PlayerApiClientMessageType; pub use player_api_server_message::ServerMessage as PlayerApiServerMessageType; } use std::collections::HashMap; use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use std::time::Duration; use runner::match_context::{EventBus, PlayerHandle, RequestError, RequestMessage}; use runner::match_log::MatchLogger; use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::UnboundedReceiverStream; use tonic; use tonic::transport::Server; use tonic::{Request, Response, Status, Streaming}; use planetwars_matchrunner as runner; use crate::db; use crate::util::gen_alphanumeric; use crate::ConnectionPool; use crate::GlobalConfig; use super::matches::{MatchPlayer, RunMatch}; pub struct ClientApiServer { conn_pool: ConnectionPool, runner_config: Arc, router: PlayerRouter, } type ClientMessages = Streaming; type ServerMessages = mpsc::UnboundedReceiver>; enum PlayerConnectionState { Reserved, ClientConnected { tx: oneshot::Sender, client_messages: ClientMessages, }, ServerConnected { tx: oneshot::Sender, server_messages: ServerMessages, }, // In connected state, the connection is removed from the PlayerRouter } /// Routes players to their handler #[derive(Clone)] struct PlayerRouter { routing_table: Arc>>, } impl PlayerRouter { pub fn new() -> Self { PlayerRouter { routing_table: Arc::new(Mutex::new(HashMap::new())), } } } impl Default for PlayerRouter { fn default() -> Self { Self::new() } } // TODO: implement a way to expire entries impl PlayerRouter { fn put(&self, player_key: String, entry: PlayerConnectionState) { let mut routing_table = self.routing_table.lock().unwrap(); routing_table.insert(player_key, entry); } fn take(&self, player_key: &str) -> Option { // TODO: this design does not allow for reconnects. Is this desired? let mut routing_table = self.routing_table.lock().unwrap(); routing_table.remove(player_key) } } #[tonic::async_trait] impl pb::client_api_service_server::ClientApiService for ClientApiServer { type ConnectPlayerStream = UnboundedReceiverStream>; async fn connect_player( &self, req: Request>, ) -> Result, Status> { // TODO: clean up errors let player_key = req .metadata() .get("player_key") .ok_or_else(|| Status::unauthenticated("no player_key provided"))?; let player_key_string = player_key .to_str() .map_err(|_| Status::invalid_argument("unreadable string"))? .to_string(); let client_messages = req.into_inner(); let server_messages_promise = { // during this block, a lack is held on the routing table let mut routing_table = self.router.routing_table.lock().unwrap(); let connection_state = routing_table .remove(&player_key_string) .ok_or_else(|| Status::not_found("player_key not found"))?; match connection_state { PlayerConnectionState::Reserved => { let (tx, rx) = oneshot::channel(); routing_table.insert( player_key_string, PlayerConnectionState::ClientConnected { tx, client_messages, }, ); Promise::Awaiting(rx) } PlayerConnectionState::ServerConnected { tx, server_messages, } => { tx.send(client_messages).unwrap(); Promise::Resolved(server_messages) } PlayerConnectionState::ClientConnected { .. } => panic!("player already connected"), } }; let server_messages = server_messages_promise .get_value() .await .map_err(|_| Status::internal("failed to connect player to game"))?; Ok(Response::new(UnboundedReceiverStream::new(server_messages))) } async fn create_match( &self, req: Request, ) -> Result, Status> { // TODO: unify with matchrunner module let conn = self.conn_pool.get().await.unwrap(); let match_request = req.get_ref(); let (opponent_bot, opponent_bot_version) = db::bots::find_bot_with_version_by_name(&match_request.opponent_name, &conn) .map_err(|_| Status::not_found("opponent not found"))?; let map_name = match match_request.map_name.as_str() { "" => "hex", name => name, }; let map = db::maps::find_map_by_name(map_name, &conn) .map_err(|_| Status::not_found("map not found"))?; let player_key = gen_alphanumeric(32); // ensure that the player key is registered in the router when we send a response self.router .put(player_key.clone(), PlayerConnectionState::Reserved); let remote_bot_spec = Box::new(RemoteBotSpec { player_key: player_key.clone(), router: self.router.clone(), }); let run_match = RunMatch::new( self.runner_config.clone(), false, map, vec![ MatchPlayer::BotSpec { spec: remote_bot_spec, }, MatchPlayer::BotVersion { bot: Some(opponent_bot), version: opponent_bot_version, }, ], ); let (created_match, _) = run_match .run(self.conn_pool.clone()) .await .expect("failed to create match"); Ok(Response::new(pb::CreateMatchResponse { match_id: created_match.base.id, player_key, // TODO: can we avoid hardcoding this? match_url: format!( "{}/matches/{}", self.runner_config.root_url, created_match.base.id ), })) } } struct RemoteBotSpec { player_key: String, router: PlayerRouter, } #[tonic::async_trait] impl runner::BotSpec for RemoteBotSpec { async fn run_bot( &self, player_id: u32, event_bus: Arc>, _match_logger: MatchLogger, ) -> Box { let (server_msg_snd, server_msg_recv) = mpsc::unbounded_channel(); let client_messages_promise = { // during this block, we hold a lock on the routing table. let mut routing_table = self.router.routing_table.lock().unwrap(); let connection_state = routing_table .remove(&self.player_key) .expect("player key not found in routing table"); match connection_state { PlayerConnectionState::Reserved => { let (tx, rx) = oneshot::channel(); routing_table.insert( self.player_key.clone(), PlayerConnectionState::ServerConnected { tx, server_messages: server_msg_recv, }, ); Promise::Awaiting(rx) } PlayerConnectionState::ClientConnected { tx, client_messages, } => { tx.send(server_msg_recv).unwrap(); Promise::Resolved(client_messages) } PlayerConnectionState::ServerConnected { .. } => panic!("server already connected"), } }; let client_messages_future = tokio::time::timeout(Duration::from_secs(10), client_messages_promise.get_value()); if let Ok(Ok(client_messages)) = client_messages_future.await { tokio::spawn(handle_bot_messages( player_id, event_bus.clone(), client_messages, )); } // ensure router cleanup self.router.take(&self.player_key); // If the player did not connect, the receiving half of `sender` // will be dropped here, resulting in a time-out for every turn. // This is fine for now, but // TODO: provide a formal mechanism for player startup failure Box::new(RemoteBotHandle { sender: server_msg_snd, player_id, event_bus, }) } } async fn handle_bot_messages( player_id: u32, event_bus: Arc>, mut messages: Streaming, ) { // TODO: can this be written more nicely? while let Some(message) = messages.message().await.unwrap() { match message.client_message { Some(pb::PlayerApiClientMessageType::Action(resp)) => { let request_id = (player_id, resp.action_request_id as u32); event_bus .lock() .unwrap() .resolve_request(request_id, Ok(resp.content)); } _ => (), } } } struct RemoteBotHandle { sender: mpsc::UnboundedSender>, player_id: u32, event_bus: Arc>, } impl PlayerHandle for RemoteBotHandle { fn send_request(&mut self, r: RequestMessage) { let req = pb::PlayerActionRequest { action_request_id: r.request_id as i32, content: r.content, }; let server_message = pb::PlayerApiServerMessage { server_message: Some(pb::PlayerApiServerMessageType::ActionRequest(req)), }; let res = self.sender.send(Ok(server_message)); match res { Ok(()) => { // schedule a timeout. See comments at method implementation tokio::spawn(schedule_timeout( (self.player_id, r.request_id), r.timeout, self.event_bus.clone(), )); } Err(_send_error) => { // cannot contact the remote bot anymore; // directly mark all requests as timed out. // TODO: create a dedicated error type for this. // should it be logged? println!("send error: {:?}", _send_error); self.event_bus .lock() .unwrap() .resolve_request((self.player_id, r.request_id), Err(RequestError::Timeout)); } } } } // TODO: this will spawn a task for every request, which might not be ideal. // Some alternatives: // - create a single task that manages all time-outs. // - intersperse timeouts with incoming client messages // - push timeouts upwards, into the matchrunner logic (before we hit the playerhandle). // This was initially not done to allow timer start to be delayed until the message actually arrived // with the player. Is this still needed, or is there a different way to do this? // async fn schedule_timeout( request_id: (u32, u32), duration: Duration, event_bus: Arc>, ) { tokio::time::sleep(duration).await; event_bus .lock() .unwrap() .resolve_request(request_id, Err(RequestError::Timeout)); } pub async fn run_client_api(runner_config: Arc, pool: ConnectionPool) { let router = PlayerRouter::new(); let server = ClientApiServer { router, conn_pool: pool, runner_config, }; let addr = SocketAddr::from(([127, 0, 0, 1], 50051)); Server::builder() .add_service(pb::client_api_service_server::ClientApiServiceServer::new( server, )) .serve(addr) .await .unwrap() } enum Promise { Resolved(T), Awaiting(oneshot::Receiver), } impl Promise { async fn get_value(self) -> Result { match self { Promise::Resolved(val) => Ok(val), Promise::Awaiting(rx) => rx.await, } } }