diff options
Diffstat (limited to 'planetwars-server/src/modules/client_api.rs')
-rw-r--r-- | planetwars-server/src/modules/client_api.rs | 304 |
1 files changed, 304 insertions, 0 deletions
diff --git a/planetwars-server/src/modules/client_api.rs b/planetwars-server/src/modules/client_api.rs new file mode 100644 index 0000000..7026671 --- /dev/null +++ b/planetwars-server/src/modules/client_api.rs @@ -0,0 +1,304 @@ +pub mod pb { + tonic::include_proto!("grpc.planetwars.client_api"); + + pub use player_api_client_message::ClientMessage as PlayerApiClientMessageType; + pub use player_api_server_message::ServerMessage as PlayerApiServerMessageType; +} + +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use runner::match_context::{EventBus, PlayerHandle, RequestError, RequestMessage}; +use runner::match_log::MatchLogger; +use tokio::sync::{mpsc, oneshot}; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tonic; +use tonic::transport::Server; +use tonic::{Request, Response, Status, Streaming}; + +use planetwars_matchrunner as runner; + +use crate::db; +use crate::util::gen_alphanumeric; +use crate::ConnectionPool; +use crate::GlobalConfig; + +use super::matches::{MatchPlayer, RunMatch}; + +pub struct ClientApiServer { + conn_pool: ConnectionPool, + runner_config: Arc<GlobalConfig>, + router: PlayerRouter, +} + +/// Routes players to their handler +#[derive(Clone)] +struct PlayerRouter { + routing_table: Arc<Mutex<HashMap<String, SyncThingData>>>, +} + +impl PlayerRouter { + pub fn new() -> Self { + PlayerRouter { + routing_table: Arc::new(Mutex::new(HashMap::new())), + } + } +} + +impl Default for PlayerRouter { + fn default() -> Self { + Self::new() + } +} + +// TODO: implement a way to expire entries +impl PlayerRouter { + fn put(&self, player_key: String, entry: SyncThingData) { + let mut routing_table = self.routing_table.lock().unwrap(); + routing_table.insert(player_key, entry); + } + + fn take(&self, player_key: &str) -> Option<SyncThingData> { + // TODO: this design does not allow for reconnects. Is this desired? + let mut routing_table = self.routing_table.lock().unwrap(); + routing_table.remove(player_key) + } +} + +#[tonic::async_trait] +impl pb::client_api_service_server::ClientApiService for ClientApiServer { + type ConnectPlayerStream = UnboundedReceiverStream<Result<pb::PlayerApiServerMessage, Status>>; + + async fn connect_player( + &self, + req: Request<Streaming<pb::PlayerApiClientMessage>>, + ) -> Result<Response<Self::ConnectPlayerStream>, Status> { + // TODO: clean up errors + let player_key = req + .metadata() + .get("player_key") + .ok_or_else(|| Status::unauthenticated("no player_key provided"))?; + + let player_key_str = player_key + .to_str() + .map_err(|_| Status::invalid_argument("unreadable string"))?; + + let sync_data = self + .router + .take(player_key_str) + .ok_or_else(|| Status::not_found("player_key not found"))?; + + let stream = req.into_inner(); + + sync_data.tx.send(stream).unwrap(); + Ok(Response::new(UnboundedReceiverStream::new( + sync_data.server_messages, + ))) + } + + async fn create_match( + &self, + req: Request<pb::CreateMatchRequest>, + ) -> Result<Response<pb::CreateMatchResponse>, Status> { + // TODO: unify with matchrunner module + let conn = self.conn_pool.get().await.unwrap(); + + let match_request = req.get_ref(); + + let (opponent_bot, opponent_bot_version) = + db::bots::find_bot_with_version_by_name(&match_request.opponent_name, &conn) + .map_err(|_| Status::not_found("opponent not found"))?; + + let player_key = gen_alphanumeric(32); + + let remote_bot_spec = Box::new(RemoteBotSpec { + player_key: player_key.clone(), + router: self.router.clone(), + }); + let run_match = RunMatch::from_players( + self.runner_config.clone(), + vec![ + MatchPlayer::BotSpec { + spec: remote_bot_spec, + }, + MatchPlayer::BotVersion { + bot: Some(opponent_bot), + version: opponent_bot_version, + }, + ], + ); + let (created_match, _) = run_match + .run(self.conn_pool.clone()) + .await + .expect("failed to create match"); + + Ok(Response::new(pb::CreateMatchResponse { + match_id: created_match.base.id, + player_key, + // TODO: can we avoid hardcoding this? + match_url: format!( + "{}/matches/{}", + self.runner_config.root_url, created_match.base.id + ), + })) + } +} + +// TODO: please rename me +struct SyncThingData { + tx: oneshot::Sender<Streaming<pb::PlayerApiClientMessage>>, + server_messages: mpsc::UnboundedReceiver<Result<pb::PlayerApiServerMessage, Status>>, +} + +struct RemoteBotSpec { + player_key: String, + router: PlayerRouter, +} + +#[tonic::async_trait] +impl runner::BotSpec for RemoteBotSpec { + async fn run_bot( + &self, + player_id: u32, + event_bus: Arc<Mutex<EventBus>>, + _match_logger: MatchLogger, + ) -> Box<dyn PlayerHandle> { + let (tx, rx) = oneshot::channel(); + let (server_msg_snd, server_msg_recv) = mpsc::unbounded_channel(); + self.router.put( + self.player_key.clone(), + SyncThingData { + tx, + server_messages: server_msg_recv, + }, + ); + + let fut = tokio::time::timeout(Duration::from_secs(10), rx); + match fut.await { + Ok(Ok(client_messages)) => { + // let client_messages = rx.await.unwrap(); + tokio::spawn(handle_bot_messages( + player_id, + event_bus.clone(), + client_messages, + )); + } + _ => { + // ensure router cleanup + self.router.take(&self.player_key); + } + }; + + // If the player did not connect, the receiving half of `sender` + // will be dropped here, resulting in a time-out for every turn. + // This is fine for now, but + // TODO: provide a formal mechanism for player startup failure + Box::new(RemoteBotHandle { + sender: server_msg_snd, + player_id, + event_bus, + }) + } +} + +async fn handle_bot_messages( + player_id: u32, + event_bus: Arc<Mutex<EventBus>>, + mut messages: Streaming<pb::PlayerApiClientMessage>, +) { + // TODO: can this be writte nmore nicely? + while let Some(message) = messages.message().await.unwrap() { + match message.client_message { + Some(pb::PlayerApiClientMessageType::Action(resp)) => { + let request_id = (player_id, resp.action_request_id as u32); + event_bus + .lock() + .unwrap() + .resolve_request(request_id, Ok(resp.content)); + } + _ => (), + } + } +} + +struct RemoteBotHandle { + sender: mpsc::UnboundedSender<Result<pb::PlayerApiServerMessage, Status>>, + player_id: u32, + event_bus: Arc<Mutex<EventBus>>, +} + +impl PlayerHandle for RemoteBotHandle { + fn send_request(&mut self, r: RequestMessage) { + let req = pb::PlayerActionRequest { + action_request_id: r.request_id as i32, + content: r.content, + }; + + let server_message = pb::PlayerApiServerMessage { + server_message: Some(pb::PlayerApiServerMessageType::ActionRequest(req)), + }; + + let res = self.sender.send(Ok(server_message)); + match res { + Ok(()) => { + // schedule a timeout. See comments at method implementation + tokio::spawn(schedule_timeout( + (self.player_id, r.request_id), + r.timeout, + self.event_bus.clone(), + )); + } + Err(_send_error) => { + // cannot contact the remote bot anymore; + // directly mark all requests as timed out. + // TODO: create a dedicated error type for this. + // should it be logged? + println!("send error: {:?}", _send_error); + self.event_bus + .lock() + .unwrap() + .resolve_request((self.player_id, r.request_id), Err(RequestError::Timeout)); + } + } + } +} + +// TODO: this will spawn a task for every request, which might not be ideal. +// Some alternatives: +// - create a single task that manages all time-outs. +// - intersperse timeouts with incoming client messages +// - push timeouts upwards, into the matchrunner logic (before we hit the playerhandle). +// This was initially not done to allow timer start to be delayed until the message actually arrived +// with the player. Is this still needed, or is there a different way to do this? +// +async fn schedule_timeout( + request_id: (u32, u32), + duration: Duration, + event_bus: Arc<Mutex<EventBus>>, +) { + tokio::time::sleep(duration).await; + event_bus + .lock() + .unwrap() + .resolve_request(request_id, Err(RequestError::Timeout)); +} + +pub async fn run_client_api(runner_config: Arc<GlobalConfig>, pool: ConnectionPool) { + let router = PlayerRouter::new(); + let server = ClientApiServer { + router, + conn_pool: pool, + runner_config, + }; + + let addr = SocketAddr::from(([127, 0, 0, 1], 50051)); + Server::builder() + .add_service(pb::client_api_service_server::ClientApiServiceServer::new( + server, + )) + .serve(addr) + .await + .unwrap() +} |