From 67276bd0bbac15fe087edafd59d164c686509b35 Mon Sep 17 00:00:00 2001 From: Ilion Beyst Date: Mon, 25 Jul 2022 22:16:50 +0200 Subject: rename bot_api to client_api --- planetwars-client/build.rs | 2 +- planetwars-client/src/main.rs | 31 +-- planetwars-server/build.rs | 2 +- planetwars-server/src/lib.rs | 4 +- planetwars-server/src/modules/bot_api.rs | 299 --------------------------- planetwars-server/src/modules/client_api.rs | 304 ++++++++++++++++++++++++++++ planetwars-server/src/modules/mod.rs | 2 +- proto/bot_api.proto | 40 ---- proto/client_api.proto | 45 ++++ 9 files changed, 371 insertions(+), 358 deletions(-) delete mode 100644 planetwars-server/src/modules/bot_api.rs create mode 100644 planetwars-server/src/modules/client_api.rs delete mode 100644 proto/bot_api.proto create mode 100644 proto/client_api.proto diff --git a/planetwars-client/build.rs b/planetwars-client/build.rs index acabd08..b58ef08 100644 --- a/planetwars-client/build.rs +++ b/planetwars-client/build.rs @@ -4,6 +4,6 @@ fn main() -> Result<(), Box> { tonic_build::configure() .build_server(false) .build_client(true) - .compile(&["../proto/bot_api.proto"], &["../proto"])?; + .compile(&["../proto/client_api.proto"], &["../proto"])?; Ok(()) } diff --git a/planetwars-client/src/main.rs b/planetwars-client/src/main.rs index 6edcc2e..5053c51 100644 --- a/planetwars-client/src/main.rs +++ b/planetwars-client/src/main.rs @@ -1,9 +1,12 @@ pub mod pb { - tonic::include_proto!("grpc.planetwars.bot_api"); + tonic::include_proto!("grpc.planetwars.client_api"); + + pub use player_api_client_message::ClientMessage as PlayerApiClientMessageType; + pub use player_api_server_message::ServerMessage as PlayerApiServerMessageType; } use clap::Parser; -use pb::bot_api_service_client::BotApiServiceClient; +use pb::client_api_service_client::ClientApiServiceClient; use planetwars_matchrunner::bot_runner::Bot; use serde::Deserialize; use std::{path::PathBuf, time::Duration}; @@ -77,16 +80,19 @@ async fn main() { tokio::time::sleep(Duration::from_secs(1)).await; } -async fn create_match(channel: Channel, opponent_name: String) -> Result { - let mut client = BotApiServiceClient::new(channel); +async fn create_match( + channel: Channel, + opponent_name: String, +) -> Result { + let mut client = ClientApiServiceClient::new(channel); let res = client - .create_match(Request::new(pb::MatchRequest { opponent_name })) + .create_match(Request::new(pb::CreateMatchRequest { opponent_name })) .await; res.map(|response| response.into_inner()) } async fn run_player(bot_config: BotConfig, player_key: String, channel: Channel) { - let mut client = BotApiServiceClient::with_interceptor(channel, |mut req: Request<()>| { + let mut client = ClientApiServiceClient::with_interceptor(channel, |mut req: Request<()>| { let player_key: MetadataValue<_> = player_key.parse().unwrap(); req.metadata_mut().insert("player_key", player_key); Ok(req) @@ -109,18 +115,15 @@ async fn run_player(bot_config: BotConfig, player_key: String, channel: Channel) .unwrap() .into_inner(); while let Some(message) = stream.message().await.unwrap() { - use pb::client_message::ClientMessage; - use pb::server_message::ServerMessage; - match message.server_message { - Some(ServerMessage::PlayerRequest(req)) => { + Some(pb::PlayerApiServerMessageType::ActionRequest(req)) => { let moves = bot_process.communicate(&req.content).await.unwrap(); - let resp = pb::PlayerRequestResponse { - request_id: req.request_id, + let action = pb::PlayerAction { + action_request_id: req.action_request_id, content: moves.as_bytes().to_vec(), }; - let msg = pb::ClientMessage { - client_message: Some(ClientMessage::RequestResponse(resp)), + let msg = pb::PlayerApiClientMessage { + client_message: Some(pb::PlayerApiClientMessageType::Action(action)), }; tx.send(msg).unwrap(); } diff --git a/planetwars-server/build.rs b/planetwars-server/build.rs index 97bf355..87b9671 100644 --- a/planetwars-server/build.rs +++ b/planetwars-server/build.rs @@ -4,6 +4,6 @@ fn main() -> Result<(), Box> { tonic_build::configure() .build_server(true) .build_client(false) - .compile(&["../proto/bot_api.proto"], &["../proto"])?; + .compile(&["../proto/client_api.proto"], &["../proto"])?; Ok(()) } diff --git a/planetwars-server/src/lib.rs b/planetwars-server/src/lib.rs index 3ad0c88..e16b232 100644 --- a/planetwars-server/src/lib.rs +++ b/planetwars-server/src/lib.rs @@ -17,7 +17,7 @@ use bb8::{Pool, PooledConnection}; use bb8_diesel::{self, DieselConnectionManager}; use config::ConfigError; use diesel::{Connection, PgConnection}; -use modules::bot_api::run_bot_api; +use modules::client_api::run_client_api; use modules::ranking::run_ranker; use modules::registry::registry_service; use serde::{Deserialize, Serialize}; @@ -172,7 +172,7 @@ pub async fn run_app() { tokio::spawn(run_ranker(global_config.clone(), db_pool.clone())); } tokio::spawn(run_registry(global_config.clone(), db_pool.clone())); - tokio::spawn(run_bot_api(global_config.clone(), db_pool.clone())); + tokio::spawn(run_client_api(global_config.clone(), db_pool.clone())); let api_service = Router::new() .nest("/api", api()) diff --git a/planetwars-server/src/modules/bot_api.rs b/planetwars-server/src/modules/bot_api.rs deleted file mode 100644 index 3efb1c2..0000000 --- a/planetwars-server/src/modules/bot_api.rs +++ /dev/null @@ -1,299 +0,0 @@ -pub mod pb { - tonic::include_proto!("grpc.planetwars.bot_api"); -} - -use std::collections::HashMap; -use std::net::SocketAddr; -use std::sync::{Arc, Mutex}; -use std::time::Duration; - -use runner::match_context::{EventBus, PlayerHandle, RequestError, RequestMessage}; -use runner::match_log::MatchLogger; -use tokio::sync::{mpsc, oneshot}; -use tokio_stream::wrappers::UnboundedReceiverStream; -use tonic; -use tonic::transport::Server; -use tonic::{Request, Response, Status, Streaming}; - -use planetwars_matchrunner as runner; - -use crate::db; -use crate::util::gen_alphanumeric; -use crate::ConnectionPool; -use crate::GlobalConfig; - -use super::matches::{MatchPlayer, RunMatch}; - -pub struct BotApiServer { - conn_pool: ConnectionPool, - runner_config: Arc, - router: PlayerRouter, -} - -/// Routes players to their handler -#[derive(Clone)] -struct PlayerRouter { - routing_table: Arc>>, -} - -impl PlayerRouter { - pub fn new() -> Self { - PlayerRouter { - routing_table: Arc::new(Mutex::new(HashMap::new())), - } - } -} - -impl Default for PlayerRouter { - fn default() -> Self { - Self::new() - } -} - -// TODO: implement a way to expire entries -impl PlayerRouter { - fn put(&self, player_key: String, entry: SyncThingData) { - let mut routing_table = self.routing_table.lock().unwrap(); - routing_table.insert(player_key, entry); - } - - fn take(&self, player_key: &str) -> Option { - // TODO: this design does not allow for reconnects. Is this desired? - let mut routing_table = self.routing_table.lock().unwrap(); - routing_table.remove(player_key) - } -} - -#[tonic::async_trait] -impl pb::bot_api_service_server::BotApiService for BotApiServer { - type ConnectPlayerStream = UnboundedReceiverStream>; - - async fn connect_player( - &self, - req: Request>, - ) -> Result, Status> { - // TODO: clean up errors - let player_key = req - .metadata() - .get("player_key") - .ok_or_else(|| Status::unauthenticated("no player_key provided"))?; - - let player_key_str = player_key - .to_str() - .map_err(|_| Status::invalid_argument("unreadable string"))?; - - let sync_data = self - .router - .take(player_key_str) - .ok_or_else(|| Status::not_found("player_key not found"))?; - - let stream = req.into_inner(); - - sync_data.tx.send(stream).unwrap(); - Ok(Response::new(UnboundedReceiverStream::new( - sync_data.server_messages, - ))) - } - - async fn create_match( - &self, - req: Request, - ) -> Result, Status> { - // TODO: unify with matchrunner module - let conn = self.conn_pool.get().await.unwrap(); - - let match_request = req.get_ref(); - - let (opponent_bot, opponent_bot_version) = - db::bots::find_bot_with_version_by_name(&match_request.opponent_name, &conn) - .map_err(|_| Status::not_found("opponent not found"))?; - - let player_key = gen_alphanumeric(32); - - let remote_bot_spec = Box::new(RemoteBotSpec { - player_key: player_key.clone(), - router: self.router.clone(), - }); - let run_match = RunMatch::from_players( - self.runner_config.clone(), - vec![ - MatchPlayer::BotSpec { - spec: remote_bot_spec, - }, - MatchPlayer::BotVersion { - bot: Some(opponent_bot), - version: opponent_bot_version, - }, - ], - ); - let (created_match, _) = run_match - .run(self.conn_pool.clone()) - .await - .expect("failed to create match"); - - Ok(Response::new(pb::CreatedMatch { - match_id: created_match.base.id, - player_key, - // TODO: can we avoid hardcoding this? - match_url: format!( - "{}/matches/{}", - self.runner_config.root_url, created_match.base.id - ), - })) - } -} - -// TODO: please rename me -struct SyncThingData { - tx: oneshot::Sender>, - server_messages: mpsc::UnboundedReceiver>, -} - -struct RemoteBotSpec { - player_key: String, - router: PlayerRouter, -} - -#[tonic::async_trait] -impl runner::BotSpec for RemoteBotSpec { - async fn run_bot( - &self, - player_id: u32, - event_bus: Arc>, - _match_logger: MatchLogger, - ) -> Box { - let (tx, rx) = oneshot::channel(); - let (server_msg_snd, server_msg_recv) = mpsc::unbounded_channel(); - self.router.put( - self.player_key.clone(), - SyncThingData { - tx, - server_messages: server_msg_recv, - }, - ); - - let fut = tokio::time::timeout(Duration::from_secs(10), rx); - match fut.await { - Ok(Ok(client_messages)) => { - // let client_messages = rx.await.unwrap(); - tokio::spawn(handle_bot_messages( - player_id, - event_bus.clone(), - client_messages, - )); - } - _ => { - // ensure router cleanup - self.router.take(&self.player_key); - } - }; - - // If the player did not connect, the receiving half of `sender` - // will be dropped here, resulting in a time-out for every turn. - // This is fine for now, but - // TODO: provide a formal mechanism for player startup failure - Box::new(RemoteBotHandle { - sender: server_msg_snd, - player_id, - event_bus, - }) - } -} - -async fn handle_bot_messages( - player_id: u32, - event_bus: Arc>, - mut messages: Streaming, -) { - // TODO: can this be writte nmore nicely? - while let Some(message) = messages.message().await.unwrap() { - match message.client_message { - Some(pb::client_message::ClientMessage::RequestResponse(resp)) => { - let request_id = (player_id, resp.request_id as u32); - event_bus - .lock() - .unwrap() - .resolve_request(request_id, Ok(resp.content)); - } - _ => (), - } - } -} - -struct RemoteBotHandle { - sender: mpsc::UnboundedSender>, - player_id: u32, - event_bus: Arc>, -} - -impl PlayerHandle for RemoteBotHandle { - fn send_request(&mut self, r: RequestMessage) { - let req = pb::PlayerRequest { - request_id: r.request_id as i32, - content: r.content, - }; - - let server_message = pb::ServerMessage { - server_message: Some(pb::server_message::ServerMessage::PlayerRequest(req)), - }; - - let res = self.sender.send(Ok(server_message)); - match res { - Ok(()) => { - // schedule a timeout. See comments at method implementation - tokio::spawn(schedule_timeout( - (self.player_id, r.request_id), - r.timeout, - self.event_bus.clone(), - )); - } - Err(_send_error) => { - // cannot contact the remote bot anymore; - // directly mark all requests as timed out. - // TODO: create a dedicated error type for this. - // should it be logged? - println!("send error: {:?}", _send_error); - self.event_bus - .lock() - .unwrap() - .resolve_request((self.player_id, r.request_id), Err(RequestError::Timeout)); - } - } - } -} - -// TODO: this will spawn a task for every request, which might not be ideal. -// Some alternatives: -// - create a single task that manages all time-outs. -// - intersperse timeouts with incoming client messages -// - push timeouts upwards, into the matchrunner logic (before we hit the playerhandle). -// This was initially not done to allow timer start to be delayed until the message actually arrived -// with the player. Is this still needed, or is there a different way to do this? -// -async fn schedule_timeout( - request_id: (u32, u32), - duration: Duration, - event_bus: Arc>, -) { - tokio::time::sleep(duration).await; - event_bus - .lock() - .unwrap() - .resolve_request(request_id, Err(RequestError::Timeout)); -} - -pub async fn run_bot_api(runner_config: Arc, pool: ConnectionPool) { - let router = PlayerRouter::new(); - let server = BotApiServer { - router, - conn_pool: pool, - runner_config, - }; - - let addr = SocketAddr::from(([127, 0, 0, 1], 50051)); - Server::builder() - .add_service(pb::bot_api_service_server::BotApiServiceServer::new(server)) - .serve(addr) - .await - .unwrap() -} diff --git a/planetwars-server/src/modules/client_api.rs b/planetwars-server/src/modules/client_api.rs new file mode 100644 index 0000000..7026671 --- /dev/null +++ b/planetwars-server/src/modules/client_api.rs @@ -0,0 +1,304 @@ +pub mod pb { + tonic::include_proto!("grpc.planetwars.client_api"); + + pub use player_api_client_message::ClientMessage as PlayerApiClientMessageType; + pub use player_api_server_message::ServerMessage as PlayerApiServerMessageType; +} + +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use runner::match_context::{EventBus, PlayerHandle, RequestError, RequestMessage}; +use runner::match_log::MatchLogger; +use tokio::sync::{mpsc, oneshot}; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tonic; +use tonic::transport::Server; +use tonic::{Request, Response, Status, Streaming}; + +use planetwars_matchrunner as runner; + +use crate::db; +use crate::util::gen_alphanumeric; +use crate::ConnectionPool; +use crate::GlobalConfig; + +use super::matches::{MatchPlayer, RunMatch}; + +pub struct ClientApiServer { + conn_pool: ConnectionPool, + runner_config: Arc, + router: PlayerRouter, +} + +/// Routes players to their handler +#[derive(Clone)] +struct PlayerRouter { + routing_table: Arc>>, +} + +impl PlayerRouter { + pub fn new() -> Self { + PlayerRouter { + routing_table: Arc::new(Mutex::new(HashMap::new())), + } + } +} + +impl Default for PlayerRouter { + fn default() -> Self { + Self::new() + } +} + +// TODO: implement a way to expire entries +impl PlayerRouter { + fn put(&self, player_key: String, entry: SyncThingData) { + let mut routing_table = self.routing_table.lock().unwrap(); + routing_table.insert(player_key, entry); + } + + fn take(&self, player_key: &str) -> Option { + // TODO: this design does not allow for reconnects. Is this desired? + let mut routing_table = self.routing_table.lock().unwrap(); + routing_table.remove(player_key) + } +} + +#[tonic::async_trait] +impl pb::client_api_service_server::ClientApiService for ClientApiServer { + type ConnectPlayerStream = UnboundedReceiverStream>; + + async fn connect_player( + &self, + req: Request>, + ) -> Result, Status> { + // TODO: clean up errors + let player_key = req + .metadata() + .get("player_key") + .ok_or_else(|| Status::unauthenticated("no player_key provided"))?; + + let player_key_str = player_key + .to_str() + .map_err(|_| Status::invalid_argument("unreadable string"))?; + + let sync_data = self + .router + .take(player_key_str) + .ok_or_else(|| Status::not_found("player_key not found"))?; + + let stream = req.into_inner(); + + sync_data.tx.send(stream).unwrap(); + Ok(Response::new(UnboundedReceiverStream::new( + sync_data.server_messages, + ))) + } + + async fn create_match( + &self, + req: Request, + ) -> Result, Status> { + // TODO: unify with matchrunner module + let conn = self.conn_pool.get().await.unwrap(); + + let match_request = req.get_ref(); + + let (opponent_bot, opponent_bot_version) = + db::bots::find_bot_with_version_by_name(&match_request.opponent_name, &conn) + .map_err(|_| Status::not_found("opponent not found"))?; + + let player_key = gen_alphanumeric(32); + + let remote_bot_spec = Box::new(RemoteBotSpec { + player_key: player_key.clone(), + router: self.router.clone(), + }); + let run_match = RunMatch::from_players( + self.runner_config.clone(), + vec![ + MatchPlayer::BotSpec { + spec: remote_bot_spec, + }, + MatchPlayer::BotVersion { + bot: Some(opponent_bot), + version: opponent_bot_version, + }, + ], + ); + let (created_match, _) = run_match + .run(self.conn_pool.clone()) + .await + .expect("failed to create match"); + + Ok(Response::new(pb::CreateMatchResponse { + match_id: created_match.base.id, + player_key, + // TODO: can we avoid hardcoding this? + match_url: format!( + "{}/matches/{}", + self.runner_config.root_url, created_match.base.id + ), + })) + } +} + +// TODO: please rename me +struct SyncThingData { + tx: oneshot::Sender>, + server_messages: mpsc::UnboundedReceiver>, +} + +struct RemoteBotSpec { + player_key: String, + router: PlayerRouter, +} + +#[tonic::async_trait] +impl runner::BotSpec for RemoteBotSpec { + async fn run_bot( + &self, + player_id: u32, + event_bus: Arc>, + _match_logger: MatchLogger, + ) -> Box { + let (tx, rx) = oneshot::channel(); + let (server_msg_snd, server_msg_recv) = mpsc::unbounded_channel(); + self.router.put( + self.player_key.clone(), + SyncThingData { + tx, + server_messages: server_msg_recv, + }, + ); + + let fut = tokio::time::timeout(Duration::from_secs(10), rx); + match fut.await { + Ok(Ok(client_messages)) => { + // let client_messages = rx.await.unwrap(); + tokio::spawn(handle_bot_messages( + player_id, + event_bus.clone(), + client_messages, + )); + } + _ => { + // ensure router cleanup + self.router.take(&self.player_key); + } + }; + + // If the player did not connect, the receiving half of `sender` + // will be dropped here, resulting in a time-out for every turn. + // This is fine for now, but + // TODO: provide a formal mechanism for player startup failure + Box::new(RemoteBotHandle { + sender: server_msg_snd, + player_id, + event_bus, + }) + } +} + +async fn handle_bot_messages( + player_id: u32, + event_bus: Arc>, + mut messages: Streaming, +) { + // TODO: can this be writte nmore nicely? + while let Some(message) = messages.message().await.unwrap() { + match message.client_message { + Some(pb::PlayerApiClientMessageType::Action(resp)) => { + let request_id = (player_id, resp.action_request_id as u32); + event_bus + .lock() + .unwrap() + .resolve_request(request_id, Ok(resp.content)); + } + _ => (), + } + } +} + +struct RemoteBotHandle { + sender: mpsc::UnboundedSender>, + player_id: u32, + event_bus: Arc>, +} + +impl PlayerHandle for RemoteBotHandle { + fn send_request(&mut self, r: RequestMessage) { + let req = pb::PlayerActionRequest { + action_request_id: r.request_id as i32, + content: r.content, + }; + + let server_message = pb::PlayerApiServerMessage { + server_message: Some(pb::PlayerApiServerMessageType::ActionRequest(req)), + }; + + let res = self.sender.send(Ok(server_message)); + match res { + Ok(()) => { + // schedule a timeout. See comments at method implementation + tokio::spawn(schedule_timeout( + (self.player_id, r.request_id), + r.timeout, + self.event_bus.clone(), + )); + } + Err(_send_error) => { + // cannot contact the remote bot anymore; + // directly mark all requests as timed out. + // TODO: create a dedicated error type for this. + // should it be logged? + println!("send error: {:?}", _send_error); + self.event_bus + .lock() + .unwrap() + .resolve_request((self.player_id, r.request_id), Err(RequestError::Timeout)); + } + } + } +} + +// TODO: this will spawn a task for every request, which might not be ideal. +// Some alternatives: +// - create a single task that manages all time-outs. +// - intersperse timeouts with incoming client messages +// - push timeouts upwards, into the matchrunner logic (before we hit the playerhandle). +// This was initially not done to allow timer start to be delayed until the message actually arrived +// with the player. Is this still needed, or is there a different way to do this? +// +async fn schedule_timeout( + request_id: (u32, u32), + duration: Duration, + event_bus: Arc>, +) { + tokio::time::sleep(duration).await; + event_bus + .lock() + .unwrap() + .resolve_request(request_id, Err(RequestError::Timeout)); +} + +pub async fn run_client_api(runner_config: Arc, pool: ConnectionPool) { + let router = PlayerRouter::new(); + let server = ClientApiServer { + router, + conn_pool: pool, + runner_config, + }; + + let addr = SocketAddr::from(([127, 0, 0, 1], 50051)); + Server::builder() + .add_service(pb::client_api_service_server::ClientApiServiceServer::new( + server, + )) + .serve(addr) + .await + .unwrap() +} diff --git a/planetwars-server/src/modules/mod.rs b/planetwars-server/src/modules/mod.rs index 1200f9d..1d3ddbf 100644 --- a/planetwars-server/src/modules/mod.rs +++ b/planetwars-server/src/modules/mod.rs @@ -1,7 +1,7 @@ // This module implements general domain logic, not directly // tied to the database or API layers. -pub mod bot_api; pub mod bots; +pub mod client_api; pub mod matches; pub mod ranking; pub mod registry; diff --git a/proto/bot_api.proto b/proto/bot_api.proto deleted file mode 100644 index 69a319a..0000000 --- a/proto/bot_api.proto +++ /dev/null @@ -1,40 +0,0 @@ -syntax = "proto3"; - -package grpc.planetwars.bot_api; - -message ServerMessage { - oneof server_message { - PlayerRequest player_request = 1; - } -} -message PlayerRequest { - int32 request_id = 1; - bytes content = 2; -} - -message ClientMessage { - oneof client_message { - PlayerRequestResponse request_response = 1; - } -} - -message PlayerRequestResponse { - int32 request_id = 1; - bytes content = 2; -} - -message MatchRequest { - string opponent_name = 1; -} - -message CreatedMatch { - int32 match_id = 1; - string player_key = 2; - string match_url = 3; -} - -service BotApiService { - rpc CreateMatch(MatchRequest) returns (CreatedMatch); - // server sends requests to the player, player responds - rpc ConnectPlayer(stream ClientMessage) returns (stream ServerMessage); -} diff --git a/proto/client_api.proto b/proto/client_api.proto new file mode 100644 index 0000000..3f1b956 --- /dev/null +++ b/proto/client_api.proto @@ -0,0 +1,45 @@ +syntax = "proto3"; + +package grpc.planetwars.client_api; + +// Provides the planetwars client API, allowing for remote play. +service ClientApiService { + rpc CreateMatch(CreateMatchRequest) returns (CreateMatchResponse); + rpc ConnectPlayer(stream PlayerApiClientMessage) returns (stream PlayerApiServerMessage); +} + +message CreateMatchRequest { + string opponent_name = 1; +} + +message CreateMatchResponse { + int32 match_id = 1; + string player_key = 2; + string match_url = 3; +} + + +// Server messages +message PlayerApiServerMessage { + oneof server_message { + PlayerActionRequest action_request = 1; + } +} + +message PlayerActionRequest { + int32 action_request_id = 1; + bytes content = 2; +} + + +// Player messages +message PlayerApiClientMessage { + oneof client_message { + PlayerAction action = 1; + } +} + +message PlayerAction { + int32 action_request_id = 1; + bytes content = 2; +} -- cgit v1.2.3