diff options
author | Ilion Beyst <ilion.beyst@gmail.com> | 2022-07-25 22:16:50 +0200 |
---|---|---|
committer | Ilion Beyst <ilion.beyst@gmail.com> | 2022-07-25 22:16:50 +0200 |
commit | 67276bd0bbac15fe087edafd59d164c686509b35 (patch) | |
tree | b844126c3d0987c4beb12cb60a968a9f53fc6ce9 /planetwars-server/src/modules/bot_api.rs | |
parent | 93c4306b1015594bb6d7e08d03138c12229ac598 (diff) | |
download | planetwars.dev-67276bd0bbac15fe087edafd59d164c686509b35.tar.xz planetwars.dev-67276bd0bbac15fe087edafd59d164c686509b35.zip |
rename bot_api to client_api
Diffstat (limited to 'planetwars-server/src/modules/bot_api.rs')
-rw-r--r-- | planetwars-server/src/modules/bot_api.rs | 299 |
1 files changed, 0 insertions, 299 deletions
diff --git a/planetwars-server/src/modules/bot_api.rs b/planetwars-server/src/modules/bot_api.rs deleted file mode 100644 index 3efb1c2..0000000 --- a/planetwars-server/src/modules/bot_api.rs +++ /dev/null @@ -1,299 +0,0 @@ -pub mod pb { - tonic::include_proto!("grpc.planetwars.bot_api"); -} - -use std::collections::HashMap; -use std::net::SocketAddr; -use std::sync::{Arc, Mutex}; -use std::time::Duration; - -use runner::match_context::{EventBus, PlayerHandle, RequestError, RequestMessage}; -use runner::match_log::MatchLogger; -use tokio::sync::{mpsc, oneshot}; -use tokio_stream::wrappers::UnboundedReceiverStream; -use tonic; -use tonic::transport::Server; -use tonic::{Request, Response, Status, Streaming}; - -use planetwars_matchrunner as runner; - -use crate::db; -use crate::util::gen_alphanumeric; -use crate::ConnectionPool; -use crate::GlobalConfig; - -use super::matches::{MatchPlayer, RunMatch}; - -pub struct BotApiServer { - conn_pool: ConnectionPool, - runner_config: Arc<GlobalConfig>, - router: PlayerRouter, -} - -/// Routes players to their handler -#[derive(Clone)] -struct PlayerRouter { - routing_table: Arc<Mutex<HashMap<String, SyncThingData>>>, -} - -impl PlayerRouter { - pub fn new() -> Self { - PlayerRouter { - routing_table: Arc::new(Mutex::new(HashMap::new())), - } - } -} - -impl Default for PlayerRouter { - fn default() -> Self { - Self::new() - } -} - -// TODO: implement a way to expire entries -impl PlayerRouter { - fn put(&self, player_key: String, entry: SyncThingData) { - let mut routing_table = self.routing_table.lock().unwrap(); - routing_table.insert(player_key, entry); - } - - fn take(&self, player_key: &str) -> Option<SyncThingData> { - // TODO: this design does not allow for reconnects. Is this desired? - let mut routing_table = self.routing_table.lock().unwrap(); - routing_table.remove(player_key) - } -} - -#[tonic::async_trait] -impl pb::bot_api_service_server::BotApiService for BotApiServer { - type ConnectPlayerStream = UnboundedReceiverStream<Result<pb::ServerMessage, Status>>; - - async fn connect_player( - &self, - req: Request<Streaming<pb::ClientMessage>>, - ) -> Result<Response<Self::ConnectPlayerStream>, Status> { - // TODO: clean up errors - let player_key = req - .metadata() - .get("player_key") - .ok_or_else(|| Status::unauthenticated("no player_key provided"))?; - - let player_key_str = player_key - .to_str() - .map_err(|_| Status::invalid_argument("unreadable string"))?; - - let sync_data = self - .router - .take(player_key_str) - .ok_or_else(|| Status::not_found("player_key not found"))?; - - let stream = req.into_inner(); - - sync_data.tx.send(stream).unwrap(); - Ok(Response::new(UnboundedReceiverStream::new( - sync_data.server_messages, - ))) - } - - async fn create_match( - &self, - req: Request<pb::MatchRequest>, - ) -> Result<Response<pb::CreatedMatch>, Status> { - // TODO: unify with matchrunner module - let conn = self.conn_pool.get().await.unwrap(); - - let match_request = req.get_ref(); - - let (opponent_bot, opponent_bot_version) = - db::bots::find_bot_with_version_by_name(&match_request.opponent_name, &conn) - .map_err(|_| Status::not_found("opponent not found"))?; - - let player_key = gen_alphanumeric(32); - - let remote_bot_spec = Box::new(RemoteBotSpec { - player_key: player_key.clone(), - router: self.router.clone(), - }); - let run_match = RunMatch::from_players( - self.runner_config.clone(), - vec![ - MatchPlayer::BotSpec { - spec: remote_bot_spec, - }, - MatchPlayer::BotVersion { - bot: Some(opponent_bot), - version: opponent_bot_version, - }, - ], - ); - let (created_match, _) = run_match - .run(self.conn_pool.clone()) - .await - .expect("failed to create match"); - - Ok(Response::new(pb::CreatedMatch { - match_id: created_match.base.id, - player_key, - // TODO: can we avoid hardcoding this? - match_url: format!( - "{}/matches/{}", - self.runner_config.root_url, created_match.base.id - ), - })) - } -} - -// TODO: please rename me -struct SyncThingData { - tx: oneshot::Sender<Streaming<pb::ClientMessage>>, - server_messages: mpsc::UnboundedReceiver<Result<pb::ServerMessage, Status>>, -} - -struct RemoteBotSpec { - player_key: String, - router: PlayerRouter, -} - -#[tonic::async_trait] -impl runner::BotSpec for RemoteBotSpec { - async fn run_bot( - &self, - player_id: u32, - event_bus: Arc<Mutex<EventBus>>, - _match_logger: MatchLogger, - ) -> Box<dyn PlayerHandle> { - let (tx, rx) = oneshot::channel(); - let (server_msg_snd, server_msg_recv) = mpsc::unbounded_channel(); - self.router.put( - self.player_key.clone(), - SyncThingData { - tx, - server_messages: server_msg_recv, - }, - ); - - let fut = tokio::time::timeout(Duration::from_secs(10), rx); - match fut.await { - Ok(Ok(client_messages)) => { - // let client_messages = rx.await.unwrap(); - tokio::spawn(handle_bot_messages( - player_id, - event_bus.clone(), - client_messages, - )); - } - _ => { - // ensure router cleanup - self.router.take(&self.player_key); - } - }; - - // If the player did not connect, the receiving half of `sender` - // will be dropped here, resulting in a time-out for every turn. - // This is fine for now, but - // TODO: provide a formal mechanism for player startup failure - Box::new(RemoteBotHandle { - sender: server_msg_snd, - player_id, - event_bus, - }) - } -} - -async fn handle_bot_messages( - player_id: u32, - event_bus: Arc<Mutex<EventBus>>, - mut messages: Streaming<pb::ClientMessage>, -) { - // TODO: can this be writte nmore nicely? - while let Some(message) = messages.message().await.unwrap() { - match message.client_message { - Some(pb::client_message::ClientMessage::RequestResponse(resp)) => { - let request_id = (player_id, resp.request_id as u32); - event_bus - .lock() - .unwrap() - .resolve_request(request_id, Ok(resp.content)); - } - _ => (), - } - } -} - -struct RemoteBotHandle { - sender: mpsc::UnboundedSender<Result<pb::ServerMessage, Status>>, - player_id: u32, - event_bus: Arc<Mutex<EventBus>>, -} - -impl PlayerHandle for RemoteBotHandle { - fn send_request(&mut self, r: RequestMessage) { - let req = pb::PlayerRequest { - request_id: r.request_id as i32, - content: r.content, - }; - - let server_message = pb::ServerMessage { - server_message: Some(pb::server_message::ServerMessage::PlayerRequest(req)), - }; - - let res = self.sender.send(Ok(server_message)); - match res { - Ok(()) => { - // schedule a timeout. See comments at method implementation - tokio::spawn(schedule_timeout( - (self.player_id, r.request_id), - r.timeout, - self.event_bus.clone(), - )); - } - Err(_send_error) => { - // cannot contact the remote bot anymore; - // directly mark all requests as timed out. - // TODO: create a dedicated error type for this. - // should it be logged? - println!("send error: {:?}", _send_error); - self.event_bus - .lock() - .unwrap() - .resolve_request((self.player_id, r.request_id), Err(RequestError::Timeout)); - } - } - } -} - -// TODO: this will spawn a task for every request, which might not be ideal. -// Some alternatives: -// - create a single task that manages all time-outs. -// - intersperse timeouts with incoming client messages -// - push timeouts upwards, into the matchrunner logic (before we hit the playerhandle). -// This was initially not done to allow timer start to be delayed until the message actually arrived -// with the player. Is this still needed, or is there a different way to do this? -// -async fn schedule_timeout( - request_id: (u32, u32), - duration: Duration, - event_bus: Arc<Mutex<EventBus>>, -) { - tokio::time::sleep(duration).await; - event_bus - .lock() - .unwrap() - .resolve_request(request_id, Err(RequestError::Timeout)); -} - -pub async fn run_bot_api(runner_config: Arc<GlobalConfig>, pool: ConnectionPool) { - let router = PlayerRouter::new(); - let server = BotApiServer { - router, - conn_pool: pool, - runner_config, - }; - - let addr = SocketAddr::from(([127, 0, 0, 1], 50051)); - Server::builder() - .add_service(pb::bot_api_service_server::BotApiServiceServer::new(server)) - .serve(addr) - .await - .unwrap() -} |