1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
use std::io;
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::Arc;
use std::sync::Mutex;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines};
use tokio::process;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time::timeout;
use super::match_context::EventBus;
use super::match_context::PlayerHandle;
use super::match_context::RequestError;
use super::match_context::RequestMessage;
// TODO: this is exactly the same as the docker bot handle.
// should this abstraction be removed?
pub struct LocalBotHandle {
tx: mpsc::UnboundedSender<RequestMessage>,
join_handle: JoinHandle<()>,
}
impl PlayerHandle for LocalBotHandle {
fn send_request(&mut self, r: RequestMessage) {
self.tx
.send(r)
.expect("failed to send message to local bot");
}
fn into_join_handle(self: Box<Self>) -> JoinHandle<()> {
self.join_handle
}
}
pub fn run_local_bot(player_id: u32, event_bus: Arc<Mutex<EventBus>>, bot: Bot) -> LocalBotHandle {
let (tx, rx) = mpsc::unbounded_channel();
let runner = LocalBotRunner {
event_bus,
rx,
player_id,
bot,
};
let join_handle = tokio::spawn(runner.run());
LocalBotHandle { tx, join_handle }
}
pub struct LocalBotRunner {
event_bus: Arc<Mutex<EventBus>>,
rx: mpsc::UnboundedReceiver<RequestMessage>,
player_id: u32,
bot: Bot,
}
impl LocalBotRunner {
pub async fn run(mut self) {
let mut process = self.bot.spawn_process();
while let Some(request) = self.rx.recv().await {
let resp_fut = process.communicate(&request.content);
let result = timeout(request.timeout, resp_fut)
.await
// TODO: how can this failure be handled cleanly?
.expect("process read failed");
let result = match result {
Ok(line) => Ok(line.into_bytes()),
Err(_elapsed) => Err(RequestError::Timeout),
};
let request_id = (self.player_id, request.request_id);
self.event_bus
.lock()
.unwrap()
.resolve_request(request_id, result);
}
}
}
#[derive(Debug, Clone)]
pub struct Bot {
pub working_dir: PathBuf,
pub argv: Vec<String>,
}
impl Bot {
pub fn spawn_process(&self) -> BotProcess {
let mut child = process::Command::new(&self.argv[0])
.args(&self.argv[1..])
.current_dir(self.working_dir.clone())
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::inherit())
.spawn()
.expect("spawning failed");
let stdout = child.stdout.take().unwrap();
let reader = BufReader::new(stdout).lines();
BotProcess {
stdin: child.stdin.take().unwrap(),
stdout: reader,
child,
}
}
}
pub struct BotProcess {
#[allow(dead_code)]
pub child: process::Child,
pub stdin: process::ChildStdin,
pub stdout: Lines<BufReader<process::ChildStdout>>,
}
impl BotProcess {
// TODO: gracefully handle errors
pub async fn communicate(&mut self, input: &[u8]) -> io::Result<String> {
self.stdin.write_all(input).await?;
self.stdin.write_u8(b'\n').await?;
let line = self.stdout.next_line().await?;
line.ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "no response received"))
}
}
|