aboutsummaryrefslogtreecommitdiff
path: root/planetwars-matchrunner/src/bot_runner.rs
blob: 8597e26662616729831aa1a1df9dd69e3d27b646 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use std::io;
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::Arc;
use std::sync::Mutex;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines};
use tokio::process;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time::timeout;

use super::match_context::EventBus;
use super::match_context::PlayerHandle;
use super::match_context::RequestError;
use super::match_context::RequestMessage;
// TODO: this is exactly the same as the docker bot handle.
// should this abstraction be removed?
pub struct LocalBotHandle {
    tx: mpsc::UnboundedSender<RequestMessage>,
    join_handle: JoinHandle<()>,
}

impl PlayerHandle for LocalBotHandle {
    fn send_request(&mut self, r: RequestMessage) {
        self.tx
            .send(r)
            .expect("failed to send message to local bot");
    }

    fn into_join_handle(self: Box<Self>) -> JoinHandle<()> {
        self.join_handle
    }
}

pub fn run_local_bot(player_id: u32, event_bus: Arc<Mutex<EventBus>>, bot: Bot) -> LocalBotHandle {
    let (tx, rx) = mpsc::unbounded_channel();

    let runner = LocalBotRunner {
        event_bus,
        rx,
        player_id,
        bot,
    };
    let join_handle = tokio::spawn(runner.run());

    LocalBotHandle { tx, join_handle }
}

pub struct LocalBotRunner {
    event_bus: Arc<Mutex<EventBus>>,
    rx: mpsc::UnboundedReceiver<RequestMessage>,
    player_id: u32,
    bot: Bot,
}

impl LocalBotRunner {
    pub async fn run(mut self) {
        let mut process = self.bot.spawn_process();

        while let Some(request) = self.rx.recv().await {
            let resp_fut = process.communicate(&request.content);
            let result = timeout(request.timeout, resp_fut)
                .await
                // TODO: how can this failure be handled cleanly?
                .expect("process read failed");
            let result = match result {
                Ok(line) => Ok(line.into_bytes()),
                Err(_elapsed) => Err(RequestError::Timeout),
            };
            let request_id = (self.player_id, request.request_id);

            self.event_bus
                .lock()
                .unwrap()
                .resolve_request(request_id, result);
        }
    }
}

#[derive(Debug, Clone)]
pub struct Bot {
    pub working_dir: PathBuf,
    pub argv: Vec<String>,
}

impl Bot {
    pub fn spawn_process(&self) -> BotProcess {
        let mut child = process::Command::new(&self.argv[0])
            .args(&self.argv[1..])
            .current_dir(self.working_dir.clone())
            .kill_on_drop(true)
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::inherit())
            .spawn()
            .expect("spawning failed");

        let stdout = child.stdout.take().unwrap();
        let reader = BufReader::new(stdout).lines();

        BotProcess {
            stdin: child.stdin.take().unwrap(),
            stdout: reader,
            child,
        }
    }
}

pub struct BotProcess {
    #[allow(dead_code)]
    child: process::Child,
    stdin: process::ChildStdin,
    stdout: Lines<BufReader<process::ChildStdout>>,
}

impl BotProcess {
    // TODO: gracefully handle errors
    pub async fn communicate(&mut self, input: &[u8]) -> io::Result<String> {
        self.stdin.write_all(input).await?;
        self.stdin.write_u8(b'\n').await?;
        let line = self.stdout.next_line().await?;
        line.ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "no response received"))
    }
}