1
//! Launching pluggable transport binaries and communicating with them.
2
//!
3
//! This module contains utilities to launch pluggable transports supporting pt-spec.txt
4
//! version 1, and communicate with them in order to specify configuration parameters and
5
//! receive updates as to the current state of the PT.
6

            
7
use crate::PtClientMethod;
8
use crate::err;
9
use crate::err::PtError;
10
use futures::StreamExt;
11
use futures::channel::mpsc::Receiver;
12
use itertools::Itertools;
13
use std::borrow::Cow;
14
use std::collections::HashMap;
15
use std::ffi::OsString;
16
use std::io::{BufRead, BufReader};
17
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
18
use std::path::PathBuf;
19
use std::process::{Child, Command, Stdio};
20
use std::str::FromStr;
21
use std::sync::Arc;
22
use std::{io, thread};
23
use tor_basic_utils::PathExt as _;
24
use tor_error::{internal, warn_report};
25
use tor_linkspec::PtTransportName;
26
use tor_rtcompat::{Runtime, SleepProviderExt};
27
use tor_socksproto::SocksVersion;
28
use tracing::{debug, error, info, trace, warn};
29
use web_time_compat::{Duration, Instant, InstantExt};
30

            
31
/// Amount of time we give a pluggable transport child process to exit gracefully.
32
const GRACEFUL_EXIT_TIME: Duration = Duration::from_secs(5);
33
/// Default timeout for PT binary startup.
34
const PT_START_TIMEOUT: Duration = Duration::from_secs(30);
35
/// Size for the buffer storing pluggable transport stdout lines.
36
const PT_STDIO_BUFFER: usize = 64;
37

            
38
/// An arbitrary key/value status update from a pluggable transport.
39
#[derive(PartialEq, Eq, Debug, Clone)]
40
pub struct PtStatus {
41
    /// Arbitrary key-value data about the state of this transport, from the binary running
42
    /// said transport.
43
    // NOTE(eta): This is assumed to not have duplicate keys.
44
    data: HashMap<String, String>,
45
}
46

            
47
/// A message sent from a pluggable transport child process.
48
///
49
/// For more in-depth information about these messages, consult pt-spec.txt.
50
#[derive(PartialEq, Eq, Debug, Clone)]
51
#[non_exhaustive]
52
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
53
pub enum PtMessage {
54
    /// `VERSION-ERROR`: No compatible pluggable transport specification version was provided.
55
    VersionError(String),
56
    /// `VERSION`: Specifies the version the binary is using for the IPC protocol.
57
    Version(String),
58
    /// `ENV-ERROR`: Reports an error with the provided environment variables.
59
    EnvError(String),
60
    /// `PROXY DONE`: The configured proxy was correctly initialised.
61
    ProxyDone,
62
    /// `PROXY-ERROR`: An error was encountered setting up the configured proxy.
63
    ProxyError(String),
64
    /// `CMETHOD`: A client transport has been launched.
65
    ClientTransportLaunched {
66
        /// The name of the launched transport.
67
        transport: PtTransportName,
68
        /// The protocol used ('socks4' or 'socks5').
69
        protocol: String,
70
        /// An address to connect via this transport.
71
        /// (This should be localhost.)
72
        endpoint: SocketAddr,
73
    },
74
    /// `CMETHOD-ERROR`: An error was encountered setting up a client transport.
75
    ClientTransportFailed {
76
        /// The name of the transport.
77
        transport: PtTransportName,
78
        /// The error message.
79
        message: String,
80
    },
81
    /// `CMETHODS DONE`: All client transports that are supported have been launched.
82
    ClientTransportsDone,
83
    /// `SMETHOD`: A server transport has been launched.
84
    ServerTransportLaunched {
85
        /// The name of the launched transport.
86
        transport: PtTransportName,
87
        /// The endpoint clients should use the reach the transport.
88
        endpoint: SocketAddr,
89
        /// Additional per-transport information.
90
        // NOTE(eta): This assumes it actually is k/v and repeated keys aren't allowed...
91
        options: HashMap<String, String>,
92
    },
93
    /// `SMETHOD-ERROR`: An error was encountered setting up a server transport.
94
    ServerTransportFailed {
95
        /// The name of the transport.
96
        transport: PtTransportName,
97
        /// The error message.
98
        message: String,
99
    },
100
    /// `SMETHODS DONE`: All server transports that are supported have been launched.
101
    ServerTransportsDone,
102
    /// `LOG`: A log message.
103
    Log {
104
        /// The severity (one of 'error', 'warning', 'notice', 'info', 'debug').
105
        severity: String,
106
        /// The log message.
107
        message: String,
108
    },
109
    /// `STATUS`: Arbitrary key/value status messages.
110
    Status(PtStatus),
111
    /// A line containing an unknown command.
112
    Unknown(String),
113
}
114

            
115
/// Parse a value (something on the RHS of an =), which could be a CString as defined by
116
/// control-spec.txt §2. Returns (value, unparsed rest of string).
117
#[allow(clippy::string_slice)] // TODO
118
44
fn parse_one_value(from: &str) -> Result<(String, &str), &'static str> {
119
44
    let first_char = from.chars().next();
120
44
    Ok(if first_char.is_none() {
121
2
        (String::new(), "")
122
42
    } else if let Some('"') = first_char {
123
        // This is a CString, so we're going to need to parse it char-by-char.
124
        // FIXME(eta): This currently doesn't parse octal escape codes, even though the spec says
125
        //             we should. That's finicky, though, and probably not used.
126
26
        let mut ret = String::new();
127
26
        let mut chars = from.chars();
128
26
        assert_eq!(chars.next(), Some('"')); // discard "
129
        loop {
130
112
            let ch = chars.next().ok_or("ran out of input parsing CString")?;
131
112
            match ch {
132
26
                '\\' => match chars
133
26
                    .next()
134
26
                    .ok_or("encountered trailing backslash in CString")?
135
                {
136
2
                    'n' => ret.push('\n'),
137
2
                    'r' => ret.push('\r'),
138
2
                    't' => ret.push('\t'),
139
20
                    '0'..='8' => return Err("attempted unsupported octal escape code"),
140
2
                    ch2 => ret.push(ch2),
141
                },
142
8
                '"' => break,
143
78
                _ => ret.push(ch),
144
            }
145
        }
146
8
        (ret, chars.as_str())
147
    } else {
148
        // Simple: just find the space
149
16
        let space = from.find(' ').unwrap_or(from.len());
150
16
        (from[0..space].into(), &from[space..])
151
    })
152
44
}
153

            
154
/// Chomp one key/value pair off a list of smethod args.
155
/// Returns (k, v, unparsed rest of string).
156
/// Will also chomp the comma at the end, if there is one.
157
12
fn parse_one_smethod_arg(args: &str) -> Result<(String, String, &str), &'static str> {
158
    // NOTE(eta): Apologies for this looking a bit gnarly. Ideally, this is what you'd use
159
    //            something like `nom` for, but I didn't want to bring in a dep just for this.
160

            
161
12
    let mut key = String::new();
162
12
    let mut val = String::new();
163
    // If true, we're reading the value, not the key.
164
12
    let mut reading_val = false;
165
12
    let mut chars = args.chars();
166
248
    while let Some(c) = chars.next() {
167
242
        let target = if reading_val { &mut val } else { &mut key };
168
242
        match c {
169
            '\\' => {
170
2
                let c = chars
171
2
                    .next()
172
2
                    .ok_or("smethod arg terminates with backslash")?;
173
                target.push(c);
174
            }
175
            '=' => {
176
12
                if reading_val {
177
2
                    return Err("encountered = while parsing value");
178
10
                }
179
10
                reading_val = true;
180
            }
181
2
            ',' => break,
182
226
            c => target.push(c),
183
        }
184
    }
185
8
    if !reading_val {
186
2
        return Err("ran out of chars parsing smethod arg");
187
6
    }
188
6
    Ok((key, val, chars.as_str()))
189
12
}
190

            
191
impl FromStr for PtMessage {
192
    type Err = Cow<'static, str>;
193

            
194
    // NOTE(eta): This, of course, implies that the PT IPC communications are valid UTF-8.
195
    //            This assumption might turn out to be false.
196
    #[allow(clippy::cognitive_complexity)]
197
    #[allow(clippy::string_slice)] // TODO
198
62
    fn from_str(s: &str) -> Result<Self, Self::Err> {
199
        // TODO(eta): Maybe tolerate additional whitespace (using `split_whitespace`)?.
200
        //            This requires modified words.join() logic, though.
201
62
        let mut words = s.split(' ');
202
62
        let first_word = words.next().ok_or_else(|| Cow::from("empty line"))?;
203
62
        Ok(match first_word {
204
62
            "VERSION-ERROR" => {
205
2
                let rest = words.join(" ");
206
2
                Self::VersionError(rest)
207
            }
208
60
            "VERSION" => {
209
2
                let vers = words.next().ok_or_else(|| Cow::from("no version"))?;
210
2
                Self::Version(vers.into())
211
            }
212
58
            "ENV-ERROR" => {
213
2
                let rest = words.join(" ");
214
2
                Self::EnvError(rest)
215
            }
216
56
            "PROXY" => match words.next() {
217
2
                Some("DONE") => Self::ProxyDone,
218
                _ => Self::Unknown(s.into()),
219
            },
220
54
            "PROXY-ERROR" => {
221
2
                let rest = words.join(" ");
222
2
                Self::ProxyError(rest)
223
            }
224
52
            "CMETHOD" => {
225
2
                let transport = words.next().ok_or_else(|| Cow::from("no transport"))?;
226
2
                let protocol = words.next().ok_or_else(|| Cow::from("no protocol"))?;
227
2
                let endpoint = words
228
2
                    .next()
229
2
                    .ok_or_else(|| Cow::from("no endpoint"))?
230
2
                    .parse::<SocketAddr>()
231
2
                    .map_err(|e| Cow::from(format!("failed to parse endpoint: {}", e)))?;
232
2
                if !endpoint.ip().is_loopback() {
233
                    return Err(Cow::from(format!(
234
                        "CMETHOD endpoint {endpoint} was not localhost"
235
                    )));
236
2
                }
237
                Self::ClientTransportLaunched {
238
2
                    transport: transport
239
2
                        .parse()
240
2
                        .map_err(|_| Cow::from("bad transport ID"))?,
241
2
                    protocol: protocol.to_string(),
242
2
                    endpoint,
243
                }
244
            }
245
50
            "CMETHOD-ERROR" => {
246
2
                let transport = words.next().ok_or_else(|| Cow::from("no transport"))?;
247
2
                let rest = words.join(" ");
248
                Self::ClientTransportFailed {
249
2
                    transport: transport
250
2
                        .parse()
251
2
                        .map_err(|_| Cow::from("bad transport ID"))?,
252
2
                    message: rest,
253
                }
254
            }
255
48
            "CMETHODS" => match words.next() {
256
2
                Some("DONE") => Self::ClientTransportsDone,
257
                _ => Self::Unknown(s.into()),
258
            },
259
46
            "SMETHOD" => {
260
12
                let transport = words.next().ok_or_else(|| Cow::from("no transport"))?;
261
12
                let endpoint = words
262
12
                    .next()
263
12
                    .ok_or_else(|| Cow::from("no endpoint"))?
264
12
                    .parse::<SocketAddr>()
265
12
                    .map_err(|e| Cow::from(format!("failed to parse endpoint: {}", e)))?;
266
                // The SMETHOD endpoint is the place where _clients_ connect, and it shouldn't be localhost.
267
12
                let mut parsed_args = HashMap::new();
268

            
269
                // NOTE(eta): pt-spec.txt seems to imply these options can't contain spaces, so
270
                //            we work under that assumption.
271
                //            It also doesn't actually parse them out -- but seeing as the API to
272
                //            feed these back in will want them as separated k/v pairs, I think
273
                //            it makes sense to here.
274
12
                for option in words {
275
10
                    if let Some(mut args) = option.strip_prefix("ARGS:") {
276
16
                        while !args.is_empty() {
277
15
                            let (k, v, rest) = parse_one_smethod_arg(args).map_err(|e| {
278
6
                                Cow::from(format!("failed to parse SMETHOD ARGS: {}", e))
279
9
                            })?;
280
6
                            if parsed_args.contains_key(&k) {
281
                                // At least check our assumption that this is actually k/v
282
                                // and not Vec<(String, String)>.
283
                                warn!("PT SMETHOD arguments contain repeated key {}!", k);
284
6
                            }
285
6
                            parsed_args.insert(k, v);
286
6
                            args = rest;
287
                        }
288
                    }
289
                }
290
                Self::ServerTransportLaunched {
291
6
                    transport: transport
292
6
                        .parse()
293
6
                        .map_err(|_| Cow::from("bad transport ID"))?,
294
6
                    endpoint,
295
6
                    options: parsed_args,
296
                }
297
            }
298
34
            "SMETHOD-ERROR" => {
299
2
                let transport = words.next().ok_or_else(|| Cow::from("no transport"))?;
300
2
                let rest = words.join(" ");
301
                Self::ServerTransportFailed {
302
2
                    transport: transport
303
2
                        .parse()
304
2
                        .map_err(|_| Cow::from("bad transport ID"))?,
305
2
                    message: rest,
306
                }
307
            }
308
32
            "SMETHODS" => match words.next() {
309
                Some("DONE") => Self::ServerTransportsDone,
310
                _ => Self::Unknown(s.into()),
311
            },
312
32
            "LOG" => {
313
26
                let severity = words
314
26
                    .next()
315
26
                    .ok_or_else(|| Cow::from("no severity"))?
316
26
                    .strip_prefix("SEVERITY=")
317
26
                    .ok_or_else(|| Cow::from("badly formatted severity"))?;
318
26
                let message = words.join(" ");
319
26
                let message = parse_one_value(
320
26
                    message
321
26
                        .strip_prefix("MESSAGE=")
322
26
                        .ok_or_else(|| Cow::from("no or badly formatted message"))?,
323
                )
324
26
                .map_err(Cow::from)?
325
                .0;
326
8
                Self::Log {
327
8
                    severity: severity.into(),
328
8
                    message,
329
8
                }
330
            }
331
6
            "STATUS" => {
332
6
                let mut ret = HashMap::new();
333
6
                let message = words.join(" ");
334
6
                let mut message = &message as &str;
335
24
                while !message.is_empty() {
336
18
                    let equals = message
337
18
                        .find('=')
338
18
                        .ok_or_else(|| Cow::from(format!("failed to find = in '{}'", message)))?;
339
18
                    let k = &message[..equals];
340
18
                    if equals + 1 == message.len() {
341
                        return Err(Cow::from("key with no value"));
342
18
                    }
343
18
                    let (v, rest) = parse_one_value(&message[(equals + 1)..]).map_err(Cow::from)?;
344
18
                    if ret.contains_key(k) {
345
                        // At least check our assumption that this is actually k/v
346
                        // and not Vec<(String, String)>.
347
                        warn!("STATUS contains repeated key {}!", k);
348
18
                    }
349
18
                    ret.insert(k.to_owned(), v);
350
18
                    message = rest;
351
18
                    if message.starts_with(' ') {
352
12
                        message = &message[1..];
353
12
                    }
354
                }
355
6
                Self::Status(PtStatus { data: ret })
356
            }
357
            _ => Self::Unknown(s.into()),
358
        })
359
62
    }
360
}
361

            
362
use sealed::*;
363
/// Sealed trait to protect private types and default trait implementations
364
pub(crate) mod sealed {
365
    use super::*;
366

            
367
    /// A handle to receive lines from a pluggable transport process' stdout asynchronously.
368
    //
369
    // FIXME(eta): This currently spawns an OS thread, since there's no other way to do this without
370
    //             being async-runtime dependent (or adding process spawning to tor-rtcompat).
371
    #[derive(Debug)]
372
    pub struct AsyncPtChild {
373
        /// Channel to receive lines from the child process stdout.
374
        stdout: Receiver<io::Result<String>>,
375
        /// Identifier to put in logging messages.
376
        pub identifier: String,
377
    }
378

            
379
    impl AsyncPtChild {
380
        /// Wrap an OS child process by spawning a worker thread to forward output from the child
381
        /// to the asynchronous runtime via use of a channel.
382
        pub fn new(mut child: Child, identifier: String) -> Result<Self, PtError> {
383
            let (stdin, stdout) = (
384
                child.stdin.take().ok_or_else(|| {
385
                    PtError::Internal(internal!("Created child process without stdin pipe"))
386
                })?,
387
                child.stdout.take().ok_or_else(|| {
388
                    PtError::Internal(internal!("Created child process without stdout pipe"))
389
                })?,
390
            );
391
            // TODO RELAY #1649 We don't use a tor_memquota::mq_queue here yet
392
            let (mut tx, rx) = tor_async_utils::mpsc_channel_no_memquota(PT_STDIO_BUFFER);
393
            let ident = identifier.clone();
394
            #[allow(clippy::cognitive_complexity)]
395
            thread::spawn(move || {
396
                let reader = BufReader::new(stdout);
397
                let _stdin = stdin;
398
                let mut noted_full = false;
399
                // Forward lines from the blocking reader to the async channel.
400
                for line in reader.lines() {
401
                    let err = line.is_err();
402
                    match &line {
403
                        Ok(l) => trace!("<-- PT {}: {:?}", ident, l),
404
                        Err(e) => trace!("<-- PT {}: Error: {:?}", ident, e),
405
                    }
406
                    if let Err(e) = tx.try_send(line) {
407
                        if e.is_disconnected() {
408
                            debug!("PT {} is disconnected; shutting it down.", ident);
409
                            // Channel dropped, so shut down the pluggable transport process.
410
                            break;
411
                        }
412
                        // The other kind of error is "full", which we can't do anything about.
413
                        // Just throw the line away.
414
                        if !noted_full {
415
                            noted_full = true; // warn only once per PT.
416
                            warn!(
417
                                "Bug: Message queue for PT {} became full; dropping message",
418
                                ident
419
                            );
420
                        }
421
                    }
422
                    if err {
423
                        // Encountered an error reading, so ensure the process is shut down (it's
424
                        // probably "broken pipe" anyway, so this is slightly redundant, but the
425
                        // rest of the code assumes errors are nonrecoverable).
426
                        break;
427
                    }
428
                }
429
                // Has it already quit? If so, just exit now.
430
                if let Ok(Some(_)) = child.try_wait() {
431
                    // FIXME(eta): We currently throw away the exit code, which might be useful
432
                    //             for debugging purposes!
433
                    debug!("PT {} has exited.", ident);
434
                    return;
435
                }
436
                // Otherwise, tell it to exit.
437
                // Dropping stdin should tell the PT to exit, since we set the correct environment
438
                // variable for that to happen.
439
                trace!("Asking PT {} to exit, nicely.", ident);
440
                drop(_stdin);
441
                // Give it some time to exit.
442
                thread::sleep(GRACEFUL_EXIT_TIME);
443
                match child.try_wait() {
444
                    Ok(None) => {
445
                        // Kill it.
446
                        debug!("Sending kill signal to PT {}", ident);
447
                        if let Err(e) = child.kill() {
448
                            warn_report!(e, "Failed to kill() spawned PT {}", ident);
449
                        }
450
                    }
451
                    Ok(Some(_)) => {
452
                        debug!("PT {} shut down successfully.", ident);
453
                    } // It exited.
454
                    Err(e) => {
455
                        warn_report!(e, "Failed to call try_wait() on spawned PT {}", ident);
456
                    }
457
                }
458
            });
459
            Ok(AsyncPtChild {
460
                stdout: rx,
461
                identifier,
462
            })
463
        }
464

            
465
        /// Receive a message from the pluggable transport binary asynchronously.
466
        ///
467
        /// Note: This will convert `PtMessage::Log` into a tracing log call automatically.
468
        #[allow(clippy::cognitive_complexity)] // due to tracing
469
        pub async fn recv(&mut self) -> err::Result<PtMessage> {
470
            loop {
471
                match self.stdout.next().await {
472
                    None => return Err(PtError::ChildGone),
473
                    Some(Ok(line)) => {
474
                        let line =
475
                            line.parse::<PtMessage>()
476
                                .map_err(|e| PtError::IpcParseFailed {
477
                                    line,
478
                                    error: e.into(),
479
                                })?;
480
                        if let PtMessage::Log { severity, message } = line {
481
                            // FIXME(eta): I wanted to make this integrate with `tracing` more nicely,
482
                            //             but gave up after 15 minutes of clicking through spaghetti.
483
                            match &severity as &str {
484
                                "error" => error!("[pt {}] {}", self.identifier, message),
485
                                "warning" => warn!("[pt {}] {}", self.identifier, message),
486
                                "notice" => info!("[pt {}] {}", self.identifier, message),
487
                                "info" => debug!("[pt {}] {}", self.identifier, message),
488
                                "debug" => trace!("[pt {}] {}", self.identifier, message),
489
                                x => warn!("[pt] {} {} {}", self.identifier, x, message),
490
                            }
491
                        } else {
492
                            return Ok(line);
493
                        }
494
                    }
495
                    Some(Err(e)) => {
496
                        return Err(PtError::ChildReadFailed(Arc::new(e)));
497
                    }
498
                }
499
            }
500
        }
501
    }
502

            
503
    /// Defines some helper methods that are required later on
504
    #[async_trait::async_trait]
505
    pub trait PluggableTransportPrivate {
506
        /// Return the [`AsyncPtChild`] if it exists
507
        fn inner(&mut self) -> Result<&mut AsyncPtChild, PtError>;
508

            
509
        /// Set the [`AsyncPtChild`]
510
        fn set_inner(&mut self, newval: Option<AsyncPtChild>);
511

            
512
        /// Return a loggable identifier for this transport.
513
        fn identifier(&self) -> &str;
514

            
515
        /// Checks whether a transport is specified in our specific parameters
516
        fn specific_params_contains(&self, transport: &PtTransportName) -> bool;
517

            
518
        /// Common handler for `ClientTransportLaunched` and `ServerTransportLaunched`
519
        fn common_transport_launched_handler(
520
            &self,
521
            protocol: Option<String>,
522
            transport: PtTransportName,
523
            endpoint: SocketAddr,
524
            methods: &mut HashMap<PtTransportName, PtClientMethod>,
525
        ) -> Result<(), PtError> {
526
            if !self.specific_params_contains(&transport) {
527
                return Err(PtError::ProtocolViolation(format!(
528
                    "binary launched unwanted transport '{}'",
529
                    transport
530
                )));
531
            }
532
            let protocol = match protocol {
533
                Some(protocol_str) => match &protocol_str as &str {
534
                    "socks4" => SocksVersion::V4,
535
                    "socks5" => SocksVersion::V5,
536
                    x => {
537
                        return Err(PtError::ProtocolViolation(format!(
538
                            "unknown CMETHOD protocol '{}'",
539
                            x
540
                        )));
541
                    }
542
                },
543
                None => SocksVersion::V5,
544
            };
545
            let method = PtClientMethod {
546
                kind: protocol,
547
                endpoint,
548
            };
549
            info!("Transport '{}' uses method {:?}", transport, method);
550
            methods.insert(transport, method);
551
            Ok(())
552
        }
553

            
554
        /// Attempt to launch the PT and return the corresponding `[AsyncPtChild]`
555
        fn get_child_from_pt_launch(
556
            inner: &Option<AsyncPtChild>,
557
            transports: &Vec<PtTransportName>,
558
            binary_path: &PathBuf,
559
            arguments: &[String],
560
            all_env_vars: HashMap<OsString, OsString>,
561
        ) -> Result<AsyncPtChild, PtError> {
562
            if inner.is_some() {
563
                let warning_msg =
564
                    format!("Attempted to launch PT binary for {:?} twice.", transports);
565
                warn!("{warning_msg}");
566
                // WARN: this may not be the correct error to throw here
567
                return Err(PtError::ChildProtocolViolation(warning_msg));
568
            }
569
            info!(
570
                "Launching pluggable transport at {} for {:?}",
571
                binary_path.display_lossy(),
572
                transports
573
            );
574
            let child = Command::new(binary_path)
575
                .args(arguments.iter())
576
                .envs(all_env_vars)
577
                .stdout(Stdio::piped())
578
                .stdin(Stdio::piped())
579
                .spawn()
580
                .map_err(|e| PtError::ChildSpawnFailed {
581
                    path: binary_path.clone(),
582
                    error: Arc::new(e),
583
                })?;
584

            
585
            let identifier = crate::managed::pt_identifier(binary_path)?;
586
            AsyncPtChild::new(child, identifier)
587
        }
588

            
589
        /// Consolidates some of the [`PtMessage`] potential matches to
590
        /// deduplicate code
591
        ///
592
        /// Note that getting a [`PtMessage`] from this method implies that
593
        /// the method was unable to match it and thus you should continue handling
594
        /// the message. Getting [`None`] after error handling means that a match
595
        /// was found and the appropriate action was successfully taken, and you don't
596
        /// need to worry about it.
597
        async fn try_match_common_messages<R: Runtime>(
598
            &self,
599
            rt: &R,
600
            deadline: Instant,
601
            async_child: &mut AsyncPtChild,
602
        ) -> Result<Option<PtMessage>, PtError> {
603
            match rt
604
                .timeout(
605
                    // FIXME(eta): It'd be nice if SleepProviderExt took an `Instant` natively.
606
                    deadline.saturating_duration_since(Instant::get()),
607
                    async_child.recv(),
608
                )
609
                .await
610
                .map_err(|_| PtError::Timeout)??
611
            {
612
                PtMessage::ClientTransportFailed { transport, message }
613
                | PtMessage::ServerTransportFailed { transport, message } => {
614
                    warn!(
615
                        "PT {} unable to launch {}. It said: {:?}",
616
                        async_child.identifier, transport, message
617
                    );
618
                    return Err(PtError::TransportGaveError {
619
                        transport: transport.to_string(),
620
                        message,
621
                    });
622
                }
623
                PtMessage::VersionError(e) => {
624
                    if e != "no-version" {
625
                        warn!("weird VERSION-ERROR: {}", e);
626
                    }
627
                    return Err(PtError::UnsupportedVersion);
628
                }
629
                PtMessage::Version(vers) => {
630
                    if vers != "1" {
631
                        return Err(PtError::ProtocolViolation(format!(
632
                            "stated version is {}, asked for 1",
633
                            vers
634
                        )));
635
                    }
636
                    Ok(None)
637
                }
638
                PtMessage::EnvError(e) => return Err(PtError::ChildProtocolViolation(e)),
639
                PtMessage::ProxyError(e) => return Err(PtError::ProxyError(e)),
640
                // TODO(eta): We don't do anything with these right now!
641
                PtMessage::Status(_) => Ok(None),
642
                PtMessage::Unknown(x) => {
643
                    warn!("unknown PT line: {}", x);
644
                    Ok(None)
645
                }
646
                // Return the PtMessage as it is for further processing
647
                // TODO: handle [`PtError::ProtocolViolation`] here somehow
648
                x => {
649
                    return Ok(Some(x));
650
                }
651
            }
652
        }
653
    }
654
}
655

            
656
/// Common parameters passed to a pluggable transport.
657
#[derive(PartialEq, Eq, Clone, Debug, derive_builder::Builder)]
658
pub struct PtCommonParameters {
659
    /// A path where the launched PT can store state.
660
    state_location: PathBuf,
661
    /// An IPv4 address to bind outgoing connections to (if specified).
662
    ///
663
    /// Leaving this out will mean the PT uses a sane default.
664
    #[builder(default)]
665
    outbound_bind_v4: Option<Ipv4Addr>,
666
    /// An IPv6 address to bind outgoing connections to (if specified).
667
    ///
668
    /// Leaving this out will mean the PT uses a sane default.
669
    #[builder(default)]
670
    outbound_bind_v6: Option<Ipv6Addr>,
671
    /// The maximum time we should wait for a pluggable transport binary to report successful
672
    /// initialization. If `None`, a default value is used.
673
    #[builder(default)]
674
    timeout: Option<Duration>,
675
}
676

            
677
impl PtCommonParameters {
678
    /// Return a new `PtCommonParametersBuilder` for constructing a set of parameters.
679
    pub fn builder() -> PtCommonParametersBuilder {
680
        PtCommonParametersBuilder::default()
681
    }
682

            
683
    /// Convert these parameters into a set of environment variables to be passed to the PT binary
684
    /// in accordance with the specification.
685
    fn common_environment_variables(&self) -> HashMap<OsString, OsString> {
686
        let mut ret = HashMap::new();
687
        ret.insert("TOR_PT_MANAGED_TRANSPORT_VER".into(), "1".into());
688
        ret.insert(
689
            "TOR_PT_STATE_LOCATION".into(),
690
            self.state_location.clone().into_os_string(),
691
        );
692
        ret.insert("TOR_PT_EXIT_ON_STDIN_CLOSE".into(), "1".into());
693
        if let Some(v4) = self.outbound_bind_v4 {
694
            ret.insert(
695
                "TOR_PT_OUTBOUND_BIND_ADDRESS_V4".into(),
696
                v4.to_string().into(),
697
            );
698
        }
699
        if let Some(v6) = self.outbound_bind_v6 {
700
            // pt-spec.txt: "IPv6 addresses MUST always be wrapped in square brackets."
701
            ret.insert(
702
                "TOR_PT_OUTBOUND_BIND_ADDRESS_V6".into(),
703
                format!("[{}]", v6).into(),
704
            );
705
        }
706
        ret
707
    }
708
}
709

            
710
/// Parameters passed only to a pluggable transport client.
711
#[derive(PartialEq, Eq, Clone, Debug, derive_builder::Builder)]
712
pub struct PtClientParameters {
713
    /// A SOCKS URI specifying a proxy to use.
714
    #[builder(default)]
715
    proxy_uri: Option<String>,
716
    /// A list of transports to initialise.
717
    ///
718
    /// The PT launch will fail if all transports are not successfully initialised.
719
    transports: Vec<PtTransportName>,
720
}
721

            
722
impl PtClientParameters {
723
    /// Return a new `PtClientParametersBuilder` for constructing a set of parameters.
724
    pub fn builder() -> PtClientParametersBuilder {
725
        PtClientParametersBuilder::default()
726
    }
727

            
728
    /// Convert these parameters into a set of environment variables to be passed to the PT binary
729
    /// in accordance with the specification.
730
    fn environment_variables(
731
        &self,
732
        common_params: &PtCommonParameters,
733
    ) -> HashMap<OsString, OsString> {
734
        let mut ret = common_params.common_environment_variables();
735
        if let Some(ref proxy_uri) = self.proxy_uri {
736
            ret.insert("TOR_PT_PROXY".into(), proxy_uri.clone().into());
737
        }
738
        ret.insert(
739
            "TOR_PT_CLIENT_TRANSPORTS".into(),
740
            self.transports.iter().join(",").into(),
741
        );
742
        ret
743
    }
744
}
745

            
746
/// Parameters passed only to a pluggable transport server.
747
#[derive(PartialEq, Eq, Clone, Debug, derive_builder::Builder)]
748
pub struct PtServerParameters {
749
    /// A list of transports to initialise.
750
    ///
751
    /// The PT launch will fail if all transports are not successfully initialised.
752
    transports: Vec<PtTransportName>,
753
    /// Transport options for each server transport
754
    #[builder(default)]
755
    server_transport_options: String,
756
    /// Set host:port on which the server transport should listen for connections
757
    #[builder(default)]
758
    server_bindaddr: String,
759
    /// Set host:port on which the server transport should forward requests
760
    #[builder(default)]
761
    server_orport: Option<String>,
762
    /// Set host:port on which the server transport should forward requests (extended ORPORT)
763
    #[builder(default)]
764
    server_extended_orport: Option<String>,
765
}
766

            
767
impl PtServerParameters {
768
    /// Return a new `PtServerParametersBuilder` for constructing a set of parameters.
769
    pub fn builder() -> PtServerParametersBuilder {
770
        PtServerParametersBuilder::default()
771
    }
772

            
773
    /// Convert these parameters into a set of environment variables to be passed to the PT binary
774
    /// in accordance with the specification.
775
    fn environment_variables(
776
        &self,
777
        common_params: &PtCommonParameters,
778
    ) -> HashMap<OsString, OsString> {
779
        let mut ret = common_params.common_environment_variables();
780
        ret.insert(
781
            "TOR_PT_SERVER_TRANSPORTS".into(),
782
            self.transports.iter().join(",").into(),
783
        );
784
        ret.insert(
785
            "TOR_PT_SERVER_TRANSPORT_OPTIONS".into(),
786
            self.server_transport_options.clone().into(),
787
        );
788
        ret.insert(
789
            "TOR_PT_SERVER_BINDADDR".into(),
790
            self.server_bindaddr.clone().into(),
791
        );
792
        if let Some(ref server_orport) = self.server_orport {
793
            ret.insert("TOR_PT_ORPORT".into(), server_orport.into());
794
        }
795
        if let Some(ref server_extended_orport) = self.server_extended_orport {
796
            ret.insert(
797
                "TOR_PT_EXTENDED_SERVER_PORT".into(),
798
                server_extended_orport.into(),
799
            );
800
        }
801
        ret
802
    }
803
}
804

            
805
/// Common functionality implemented to allow code reuse
806
#[async_trait::async_trait]
807
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
808
pub trait PluggableTransport: PluggableTransportPrivate {
809
    /// Get all client methods returned by the binary, if it has been launched.
810
    ///
811
    /// If it hasn't been launched, the returned map will be empty.
812
    // TODO(eta): Actually figure out a way to expose this more stably.
813
    fn transport_methods(&self) -> &HashMap<PtTransportName, PtClientMethod>;
814

            
815
    /// Get the next [`PtMessage`] from the running transport. It is recommended to call this
816
    /// in a loop once a PT has been launched, in order to forward log messages and find out about
817
    /// status updates.
818
    //
819
    // FIXME(eta): This API will probably go away and get replaced with something better.
820
    //             In particular, we'd want to cache `Status` messages from before this method
821
    //             was called.
822
    async fn next_message(&mut self) -> err::Result<PtMessage> {
823
        let inner = self.inner()?;
824
        let ret = inner.recv().await;
825
        if let Err(PtError::ChildGone) | Err(PtError::ChildReadFailed { .. }) = &ret {
826
            // FIXME(eta): Currently this lets the caller still think the methods work by calling
827
            //             transport_methods.
828
            debug!(
829
                "PT {}: Received {:?}; shutting down.",
830
                self.identifier(),
831
                ret
832
            );
833
            self.set_inner(None);
834
        }
835
        ret
836
    }
837
}
838
/// A pluggable transport binary in a child process.
839
///
840
/// These start out inert, and must be launched with [`PluggableClientTransport::launch`] in order
841
/// to be useful.
842
#[derive(Debug)]
843
pub struct PluggableClientTransport {
844
    /// The currently running child, if there is one.
845
    inner: Option<AsyncPtChild>,
846
    /// The path to the binary to run.
847
    pub(crate) binary_path: PathBuf,
848
    /// Arguments to pass to the binary.
849
    arguments: Vec<String>,
850
    /// Configured parameters.
851
    common_params: PtCommonParameters,
852
    /// Configured client-only parameters.
853
    client_params: PtClientParameters,
854
    /// Information about client methods obtained from the PT.
855
    cmethods: HashMap<PtTransportName, PtClientMethod>,
856
}
857

            
858
impl PluggableTransport for PluggableClientTransport {
859
    fn transport_methods(&self) -> &HashMap<PtTransportName, PtClientMethod> {
860
        &self.cmethods
861
    }
862
}
863

            
864
impl PluggableTransportPrivate for PluggableClientTransport {
865
    fn inner(&mut self) -> Result<&mut AsyncPtChild, PtError> {
866
        self.inner.as_mut().ok_or(PtError::ChildGone)
867
    }
868
    fn set_inner(&mut self, newval: Option<AsyncPtChild>) {
869
        self.inner = newval;
870
    }
871
    fn identifier(&self) -> &str {
872
        match &self.inner {
873
            Some(child) => &child.identifier,
874
            None => "<not yet launched>",
875
        }
876
    }
877
    fn specific_params_contains(&self, transport: &PtTransportName) -> bool {
878
        self.client_params.transports.contains(transport)
879
    }
880
}
881

            
882
impl PluggableClientTransport {
883
    /// Create a new pluggable transport wrapper, wrapping the binary at `binary_path` and passing
884
    /// the `params` to it.
885
    ///
886
    /// You must call [`PluggableClientTransport::launch`] to actually run the PT.
887
    pub fn new(
888
        binary_path: PathBuf,
889
        arguments: Vec<String>,
890
        common_params: PtCommonParameters,
891
        client_params: PtClientParameters,
892
    ) -> Self {
893
        Self {
894
            common_params,
895
            client_params,
896
            arguments,
897
            binary_path,
898
            inner: None,
899
            cmethods: Default::default(),
900
        }
901
    }
902

            
903
    /// Launch the pluggable transport, executing the binary.
904
    ///
905
    /// Will return an error if the launch fails, one of the transports fail, not all transports
906
    /// were launched, or the launch times out.
907
    pub async fn launch<R: Runtime>(&mut self, rt: R) -> err::Result<()> {
908
        let all_env_vars = self
909
            .client_params
910
            .environment_variables(&self.common_params);
911

            
912
        let mut async_child =
913
            <PluggableClientTransport as PluggableTransportPrivate>::get_child_from_pt_launch(
914
                &self.inner,
915
                &self.client_params.transports,
916
                &self.binary_path,
917
                &self.arguments,
918
                all_env_vars,
919
            )?;
920

            
921
        let deadline = Instant::get() + self.common_params.timeout.unwrap_or(PT_START_TIMEOUT);
922
        let mut cmethods = HashMap::new();
923
        let mut proxy_done = self.client_params.proxy_uri.is_none();
924

            
925
        loop {
926
            match self
927
                .try_match_common_messages(&rt, deadline, &mut async_child)
928
                .await
929
            {
930
                Ok(maybe_message) => {
931
                    if let Some(message) = maybe_message {
932
                        match message {
933
                            PtMessage::ClientTransportLaunched {
934
                                transport,
935
                                protocol,
936
                                endpoint,
937
                            } => {
938
                                self.common_transport_launched_handler(
939
                                    Some(protocol),
940
                                    transport,
941
                                    endpoint,
942
                                    &mut cmethods,
943
                                )?;
944
                            }
945
                            PtMessage::ProxyDone => {
946
                                if proxy_done {
947
                                    return Err(PtError::ProtocolViolation(
948
                                        "binary initiated proxy when not asked (or twice)".into(),
949
                                    ));
950
                                }
951
                                info!("PT binary now proxying connections via supplied URI");
952
                                proxy_done = true;
953
                            }
954
                            // TODO: unify most of the handling of ClientTransportsDone with ServerTransportsDone
955
                            PtMessage::ClientTransportsDone => {
956
                                let unsupported = self
957
                                    .client_params
958
                                    .transports
959
                                    .iter()
960
                                    .filter(|&x| !cmethods.contains_key(x))
961
                                    .map(|x| x.to_string())
962
                                    .collect::<Vec<_>>();
963
                                if !unsupported.is_empty() {
964
                                    warn!(
965
                                        "PT binary failed to initialise transports: {:?}",
966
                                        unsupported
967
                                    );
968
                                    return Err(PtError::ClientTransportsUnsupported(unsupported));
969
                                }
970
                                info!("PT binary initialisation done");
971
                                break;
972
                            }
973
                            x => {
974
                                return Err(PtError::ProtocolViolation(format!(
975
                                    "received unexpected {:?}",
976
                                    x
977
                                )));
978
                            }
979
                        }
980
                    }
981
                }
982
                Err(e) => return Err(e),
983
            }
984
        }
985
        self.cmethods = cmethods;
986
        self.inner = Some(async_child);
987
        // TODO(eta): We need to expose the log and status messages after this function exits!
988
        Ok(())
989
    }
990
}
991

            
992
/// A pluggable transport server binary in a child process.
993
///
994
/// These start out inert, and must be launched with [`PluggableServerTransport::launch`] in order
995
/// to be useful.
996
#[derive(Debug)]
997
pub struct PluggableServerTransport {
998
    /// The currently running child, if there is one.
999
    inner: Option<AsyncPtChild>,
    /// The path to the binary to run.
    pub(crate) binary_path: PathBuf,
    /// Arguments to pass to the binary.
    arguments: Vec<String>,
    /// Configured parameters.
    common_params: PtCommonParameters,
    /// Configured server-only parameters.
    server_params: PtServerParameters,
    /// Information about server methods obtained from the PT.
    smethods: HashMap<PtTransportName, PtClientMethod>,
}
impl PluggableTransportPrivate for PluggableServerTransport {
    fn inner(&mut self) -> Result<&mut AsyncPtChild, PtError> {
        self.inner.as_mut().ok_or(PtError::ChildGone)
    }
    fn set_inner(&mut self, newval: Option<AsyncPtChild>) {
        self.inner = newval;
    }
    fn identifier(&self) -> &str {
        match &self.inner {
            Some(child) => &child.identifier,
            None => "<not yet launched>",
        }
    }
    fn specific_params_contains(&self, transport: &PtTransportName) -> bool {
        self.server_params.transports.contains(transport)
    }
}
impl PluggableTransport for PluggableServerTransport {
    fn transport_methods(&self) -> &HashMap<PtTransportName, PtClientMethod> {
        &self.smethods
    }
}
impl PluggableServerTransport {
    /// Create a new pluggable transport wrapper, wrapping the binary at `binary_path` and passing
    /// the `params` to it.
    ///
    /// You must call [`PluggableServerTransport::launch`] to actually run the PT.
    pub fn new(
        binary_path: PathBuf,
        arguments: Vec<String>,
        common_params: PtCommonParameters,
        server_params: PtServerParameters,
    ) -> Self {
        Self {
            common_params,
            server_params,
            arguments,
            binary_path,
            inner: None,
            smethods: Default::default(),
        }
    }
    /// Launch the pluggable transport, executing the binary.
    ///
    /// Will return an error if the launch fails, one of the transports fail, not all transports
    /// were launched, or the launch times out.
    pub async fn launch<R: Runtime>(&mut self, rt: R) -> err::Result<()> {
        let all_env_vars = self
            .server_params
            .environment_variables(&self.common_params);
        let mut async_child =
            <PluggableServerTransport as PluggableTransportPrivate>::get_child_from_pt_launch(
                &self.inner,
                &self.server_params.transports,
                &self.binary_path,
                &self.arguments,
                all_env_vars,
            )?;
        let deadline = Instant::get() + self.common_params.timeout.unwrap_or(PT_START_TIMEOUT);
        let mut smethods = HashMap::new();
        loop {
            match self
                .try_match_common_messages(&rt, deadline, &mut async_child)
                .await
            {
                Ok(maybe_message) => {
                    if let Some(message) = maybe_message {
                        match message {
                            PtMessage::ServerTransportLaunched {
                                transport,
                                endpoint,
                                options: _,
                            } => {
                                self.common_transport_launched_handler(
                                    None,
                                    transport,
                                    endpoint,
                                    &mut smethods,
                                )?;
                            }
                            PtMessage::ServerTransportsDone => {
                                let unsupported = self
                                    .server_params
                                    .transports
                                    .iter()
                                    .filter(|&x| !smethods.contains_key(x))
                                    .map(|x| x.to_string())
                                    .collect::<Vec<_>>();
                                if !unsupported.is_empty() {
                                    warn!(
                                        "PT binary failed to initialise transports: {:?}",
                                        unsupported
                                    );
                                    return Err(PtError::ClientTransportsUnsupported(unsupported));
                                }
                                info!("PT binary initialisation done");
                                break;
                            }
                            x => {
                                return Err(PtError::ProtocolViolation(format!(
                                    "received unexpected {:?}",
                                    x
                                )));
                            }
                        }
                    }
                }
                Err(e) => return Err(e),
            }
        }
        self.smethods = smethods;
        self.inner = Some(async_child);
        // TODO(eta): We need to expose the log and status messages after this function exits!
        Ok(())
    }
}
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    #![allow(clippy::string_slice)] // See arti#2571
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use crate::ipc::{PtMessage, PtStatus};
    use std::borrow::Cow;
    use std::collections::HashMap;
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
    #[test]
    fn it_parses_spec_examples() {
        assert_eq!(
            "VERSION-ERROR no-version".parse(),
            Ok(PtMessage::VersionError("no-version".into()))
        );
        assert_eq!("VERSION 1".parse(), Ok(PtMessage::Version("1".into())));
        assert_eq!(
            "ENV-ERROR No TOR_PT_AUTH_COOKIE_FILE when TOR_PT_EXTENDED_SERVER_PORT set".parse(),
            Ok(PtMessage::EnvError(
                "No TOR_PT_AUTH_COOKIE_FILE when TOR_PT_EXTENDED_SERVER_PORT set".into()
            ))
        );
        assert_eq!("PROXY DONE".parse(), Ok(PtMessage::ProxyDone));
        assert_eq!(
            "PROXY-ERROR SOCKS 4 upstream proxies unsupported".parse(),
            Ok(PtMessage::ProxyError(
                "SOCKS 4 upstream proxies unsupported".into()
            ))
        );
        assert_eq!(
            "CMETHOD trebuchet socks5 127.0.0.1:19999".parse(),
            Ok(PtMessage::ClientTransportLaunched {
                transport: "trebuchet".parse().unwrap(),
                protocol: "socks5".to_string(),
                endpoint: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 19999)
            })
        );
        assert_eq!(
            "CMETHOD-ERROR trebuchet no rocks available".parse(),
            Ok(PtMessage::ClientTransportFailed {
                transport: "trebuchet".parse().unwrap(),
                message: "no rocks available".to_string()
            })
        );
        assert_eq!("CMETHODS DONE".parse(), Ok(PtMessage::ClientTransportsDone));
        assert_eq!(
            "SMETHOD trebuchet 198.51.100.1:19999".parse(),
            Ok(PtMessage::ServerTransportLaunched {
                transport: "trebuchet".parse().unwrap(),
                endpoint: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(198, 51, 100, 1)), 19999),
                options: Default::default()
            })
        );
        let mut map = HashMap::new();
        map.insert("N".to_string(), "13".to_string());
        assert_eq!(
            "SMETHOD rot_by_N 198.51.100.1:2323 ARGS:N=13".parse(),
            Ok(PtMessage::ServerTransportLaunched {
                transport: "rot_by_N".parse().unwrap(),
                endpoint: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(198, 51, 100, 1)), 2323),
                options: map
            })
        );
        let mut map = HashMap::new();
        map.insert(
            "cert".to_string(),
            "HszPy3vWfjsESCEOo9ZBkRv6zQ/1mGHzc8arF0y2SpwFr3WhsMu8rK0zyaoyERfbz3ddFw".to_string(),
        );
        map.insert("iat-mode".to_string(), "0".to_string());
        assert_eq!(
            "SMETHOD obfs4 198.51.100.1:43734 ARGS:cert=HszPy3vWfjsESCEOo9ZBkRv6zQ/1mGHzc8arF0y2SpwFr3WhsMu8rK0zyaoyERfbz3ddFw,iat-mode=0".parse(),
            Ok(PtMessage::ServerTransportLaunched {
                transport: "obfs4".parse().unwrap(),
                endpoint: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(198, 51, 100, 1)), 43734),
                options: map
            })
        );
        assert_eq!(
            "SMETHOD-ERROR trebuchet no cows available".parse(),
            Ok(PtMessage::ServerTransportFailed {
                transport: "trebuchet".parse().unwrap(),
                message: "no cows available".to_string()
            })
        );
        assert_eq!(
            "LOG SEVERITY=debug MESSAGE=\"Connected to bridge A\"".parse(),
            Ok(PtMessage::Log {
                severity: "debug".to_string(),
                message: "Connected to bridge A".to_string()
            })
        );
        assert_eq!(
            "LOG SEVERITY=debug MESSAGE=\"\\r\\n\\t\"".parse(),
            Ok(PtMessage::Log {
                severity: "debug".to_string(),
                message: "\r\n\t".to_string()
            })
        );
        assert_eq!(
            "LOG SEVERITY=debug MESSAGE=".parse(),
            Ok(PtMessage::Log {
                severity: "debug".to_string(),
                message: "".to_string()
            })
        );
        assert_eq!(
            "LOG SEVERITY=debug MESSAGE=\"\\a\"".parse::<PtMessage>(),
            Ok(PtMessage::Log {
                severity: "debug".to_string(),
                message: "a".to_string()
            })
        );
        for i in 0..9 {
            let msg = format!("LOG SEVERITY=debug MESSAGE=\"\\{i}\"");
            assert_eq!(
                msg.parse::<PtMessage>(),
                Err(Cow::from("attempted unsupported octal escape code"))
            );
        }
        assert_eq!(
            "SMETHOD obfs4 198.51.100.1:43734 ARGS:iat-mode=0\\".parse::<PtMessage>(),
            Err(Cow::from(
                "failed to parse SMETHOD ARGS: smethod arg terminates with backslash"
            ))
        );
        assert_eq!(
            "SMETHOD obfs4 198.51.100.1:43734 ARGS:iat-mode=fo=o".parse::<PtMessage>(),
            Err(Cow::from(
                "failed to parse SMETHOD ARGS: encountered = while parsing value"
            ))
        );
        assert_eq!(
            "SMETHOD obfs4 198.51.100.1:43734 ARGS:iat-mode".parse::<PtMessage>(),
            Err(Cow::from(
                "failed to parse SMETHOD ARGS: ran out of chars parsing smethod arg"
            ))
        );
        let mut map = HashMap::new();
        map.insert("ADDRESS".to_string(), "198.51.100.123:1234".to_string());
        map.insert("CONNECT".to_string(), "Success".to_string());
        assert_eq!(
            "STATUS ADDRESS=198.51.100.123:1234 CONNECT=Success".parse(),
            Ok(PtMessage::Status(PtStatus { data: map }))
        );
        let mut map = HashMap::new();
        map.insert("ADDRESS".to_string(), "198.51.100.123:1234".to_string());
        map.insert("CONNECT".to_string(), "Success".to_string());
        map.insert("TRANSPORT".to_string(), "obfs4".to_string());
        assert_eq!(
            "STATUS TRANSPORT=obfs4 ADDRESS=198.51.100.123:1234 CONNECT=Success".parse(),
            Ok(PtMessage::Status(PtStatus { data: map }))
        );
        let mut map = HashMap::new();
        map.insert("ADDRESS".to_string(), "198.51.100.222:2222".to_string());
        map.insert("CONNECT".to_string(), "Failed".to_string());
        map.insert("FINGERPRINT".to_string(), "<Fingerprint>".to_string());
        map.insert("ERRSTR".to_string(), "Connection refused".to_string());
        assert_eq!(
            "STATUS ADDRESS=198.51.100.222:2222 CONNECT=Failed FINGERPRINT=<Fingerprint> ERRSTR=\"Connection refused\"".parse(),
            Ok(PtMessage::Status(PtStatus {
                data: map
            }))
        );
    }
}