1
//! Simple implementation for the internal map state of a ChanMgr.
2

            
3
use std::time::Duration;
4

            
5
use super::AbstractChannelFactory;
6
use super::{AbstractChannel, Pending, Sending, select};
7
use crate::{ChannelConfig, Dormancy, Error, Result};
8

            
9
use futures::FutureExt;
10
use std::result::Result as StdResult;
11
use std::sync::Arc;
12
use std::sync::atomic::{AtomicU64, Ordering};
13
use tor_async_utils::oneshot;
14
use tor_basic_utils::RngExt as _;
15
use tor_cell::chancell::msg::PaddingNegotiate;
16
use tor_config::PaddingLevel;
17
use tor_error::{error_report, internal, into_internal};
18
use tor_linkspec::{HasRelayIds, ListByRelayIds, RelayIds};
19
use tor_netdir::{params::CHANNEL_PADDING_TIMEOUT_UPPER_BOUND, params::NetParameters};
20
use tor_proto::ChannelPaddingInstructions;
21
use tor_proto::channel::ChannelPaddingInstructionsUpdates;
22
use tor_proto::channel::kist::{KistMode, KistParams};
23
use tor_proto::channel::padding::Parameters as PaddingParameters;
24
use tor_proto::channel::padding::ParametersBuilder as PaddingParametersBuilder;
25
use tor_units::{BoundedInt32, IntegerMilliseconds};
26
use tracing::{info, instrument};
27
use void::{ResultVoidExt as _, Void};
28

            
29
#[cfg(test)]
30
mod padding_test;
31

            
32
/// All mutable state held by an `AbstractChannelMgr`.
33
///
34
/// One reason that this is an isolated type is that we want to
35
/// to limit the amount of code that can see and
36
/// lock the Mutex here.  (We're using a blocking mutex close to async
37
/// code, so we need to be careful.)
38
pub(crate) struct MgrState<C: AbstractChannelFactory> {
39
    /// The data, within a lock
40
    ///
41
    /// (Danger: this uses a blocking mutex close to async code.  This mutex
42
    /// must never be held while an await is happening.)
43
    inner: std::sync::Mutex<Inner<C>>,
44
}
45

            
46
/// Parameters for channels that we create, and that all existing channels are using
47
struct ChannelParams {
48
    /// Channel padding instructions
49
    padding: ChannelPaddingInstructions,
50

            
51
    /// KIST parameters
52
    kist: KistParams,
53
}
54

            
55
/// A map from channel id to channel state, plus necessary auxiliary state - inside lock
56
struct Inner<C: AbstractChannelFactory> {
57
    /// The channel factory type that we store.
58
    ///
59
    /// In this module we never use this _as_ an AbstractChannelFactory: we just
60
    /// hand out clones of it when asked.
61
    builder: C,
62

            
63
    /// A map from identity to channels, or to pending channel statuses.
64
    channels: ListByRelayIds<ChannelState<C::Channel>>,
65

            
66
    /// Parameters for channels that we create, and that all existing channels are using
67
    ///
68
    /// Will be updated by a background task, which also notifies all existing
69
    /// `Open` channels via `channels`.
70
    ///
71
    /// (Must be protected by the same lock as `channels`, or a channel might be
72
    /// created using being-replaced parameters, but not get an update.)
73
    channels_params: ChannelParams,
74

            
75
    /// The configuration (from the config file or API caller)
76
    config: ChannelConfig,
77

            
78
    /// Dormancy
79
    ///
80
    /// The last dormancy information we have been told about and passed on to our channels.
81
    /// Updated via `MgrState::set_dormancy` and hence `MgrState::reconfigure_general`,
82
    /// which then uses it to calculate how to reconfigure the channels.
83
    dormancy: Dormancy,
84
}
85

            
86
/// The state of a channel (or channel build attempt) within a map.
87
///
88
/// A ChannelState can be Open (representing a fully negotiated channel) or
89
/// Building (representing a pending attempt to build a channel). Both states
90
/// have a set of RelayIds, but these RelayIds represent slightly different
91
/// things:
92
///  * On a Building channel, the set of RelayIds is all the identities that we
93
///    require the peer to have. (The peer may turn out to have _more_
94
///    identities than this.)
95
///  * On an Open channel, the set of RelayIds is all the identities that
96
///    we were able to successfully authenticate for the peer.
97
pub(crate) enum ChannelState<C> {
98
    /// An open channel.
99
    ///
100
    /// This channel might not be usable: it might be closing or
101
    /// broken.  We need to check its is_usable() method before
102
    /// yielding it to the user.
103
    Open(OpenEntry<C>),
104
    /// A channel that's getting built.
105
    Building(PendingEntry),
106
}
107

            
108
/// The map entry for a channel that has been opened.
#[derive(Clone)]
pub(crate) struct OpenEntry<C> {
    /// The open channel itself.
    pub(crate) channel: Arc<C>,
    /// How long this channel may remain unused before it is expired.
    pub(crate) max_unused_duration: Duration,
}
116

            
117
/// An identifier that is unique to one pending ([`PendingEntry`]) channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct UniqPendingChanId(u64);
120

            
121
impl UniqPendingChanId {
122
    /// Construct a new `UniqPendingChanId`.
123
98
    pub(crate) fn new() -> Self {
124
        /// The next unique ID.
125
        static NEXT_ID: AtomicU64 = AtomicU64::new(0);
126
        // Relaxed ordering is fine; we don't care about how this
127
        // is instantiated with respect to other channels.
128
98
        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
129
98
        assert!(id != u64::MAX, "Exhausted the pending channel ID namespace");
130
98
        Self(id)
131
98
    }
132
}
133

            
134
impl std::fmt::Display for UniqPendingChanId {
135
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136
        write!(f, "PendingChan {}", self.0)
137
    }
138
}
139

            
140
/// An entry for a not-yet-build channel
141
#[derive(Clone)]
142
pub(crate) struct PendingEntry {
143
    /// The keys of the relay to which we're trying to open a channel.
144
    pub(crate) ids: RelayIds,
145

            
146
    /// A future we can clone and listen on to learn when this channel attempt
147
    /// is successful or failed.
148
    ///
149
    /// This entry will be removed from the map (and possibly replaced with an
150
    /// `OpenEntry`) _before_ this future becomes ready.
151
    pub(crate) pending: Pending,
152

            
153
    /// A unique ID that allows us to find this exact pending entry later.
154
    pub(crate) unique_id: UniqPendingChanId,
155
}
156

            
157
impl<C> HasRelayIds for ChannelState<C>
158
where
159
    C: HasRelayIds,
160
{
161
920
    fn identity(
162
920
        &self,
163
920
        key_type: tor_linkspec::RelayIdType,
164
920
    ) -> Option<tor_linkspec::RelayIdRef<'_>> {
165
920
        match self {
166
436
            ChannelState::Open(OpenEntry { channel, .. }) => channel.identity(key_type),
167
484
            ChannelState::Building(PendingEntry { ids, .. }) => ids.identity(key_type),
168
        }
169
920
    }
170
}
171

            
172
impl<C: Clone> ChannelState<C> {
173
    /// For testing: either give the Open channel inside this state,
174
    /// or panic if there is none.
175
    #[cfg(test)]
176
4
    fn unwrap_open(&self) -> &C {
177
4
        match self {
178
4
            ChannelState::Open(ent) => &ent.channel,
179
            _ => panic!("Not an open channel"),
180
        }
181
4
    }
182
}
183

            
184
/// Type of the `nf_ito_*` netdir parameters, convenience alias
185
type NfIto = IntegerMilliseconds<BoundedInt32<0, CHANNEL_PADDING_TIMEOUT_UPPER_BOUND>>;
186

            
187
/// Extract from a `NetParameters` which we need, conveniently organized for our processing
188
///
189
/// This type serves two functions at once:
190
///
191
///  1. Being a subset of the parameters, we can copy it out of
192
///     the netdir, before we do more complex processing - and, in particular,
193
///     before we obtain the lock on `inner` (which we need to actually handle the update,
194
///     because we need to combine information from the config with that from the netdir).
195
///
196
///  2. Rather than four separate named fields for the padding options,
197
///     it has arrays, so that it is easy to
198
///     select the values without error-prone recapitulation of field names.
199
#[derive(Debug, Clone)]
200
struct NetParamsExtract {
201
    /// `nf_ito_*`, the padding timeout parameters from the netdir consensus
202
    ///
203
    /// `nf_ito[ 0=normal, 1=reduced ][ 0=low, 1=high ]`
204
    /// are `nf_ito_{low,high}{,_reduced` from `NetParameters`.
205
    // TODO we could use some enum or IndexVec or something to make this less `0` and `1`
206
    nf_ito: [[NfIto; 2]; 2],
207

            
208
    /// The KIST parameters.
209
    kist: KistParams,
210
}
211

            
212
impl From<&NetParameters> for NetParamsExtract {
213
1908
    fn from(p: &NetParameters) -> Self {
214
1908
        let kist_enabled = kist_mode_from_net_parameter(p.kist_enabled);
215
        // NOTE: in theory, this cast shouldn't be needed
216
        // (kist_tcp_notsent_lowat is supposed to be a u32, not an i32).
217
        // In practice, however, the type conversion is needed
218
        // because consensus params are i32s.
219
        //
220
        // See the `NetParameters::kist_tcp_notsent_lowat` docs for more details.
221
1908
        let tcp_notsent_lowat = u32::from(p.kist_tcp_notsent_lowat);
222
1908
        let kist = KistParams::new(kist_enabled, tcp_notsent_lowat);
223

            
224
1908
        NetParamsExtract {
225
1908
            nf_ito: [
226
1908
                [p.nf_ito_low, p.nf_ito_high],
227
1908
                [p.nf_ito_low_reduced, p.nf_ito_high_reduced],
228
1908
            ],
229
1908
            kist,
230
1908
        }
231
1908
    }
232
}
233

            
234
/// Build a `KistMode` from [`NetParameters`].
235
///
236
/// Used for converting [`kist_enabled`](NetParameters::kist_enabled)
237
/// to a corresponding `KistMode`.
238
1908
fn kist_mode_from_net_parameter(val: BoundedInt32<0, 1>) -> KistMode {
239
    caret::caret_int! {
240
        /// KIST flavor, defined by a numerical value read from the consensus.
241
        struct KistType(i32) {
242
            /// KIST disabled
243
            DISABLED = 0,
244
            /// KIST using TCP_NOTSENT_LOWAT.
245
            TCP_NOTSENT_LOWAT = 1,
246
        }
247
    }
248

            
249
1908
    match val.get().into() {
250
1908
        KistType::DISABLED => KistMode::Disabled,
251
        KistType::TCP_NOTSENT_LOWAT => KistMode::TcpNotSentLowat,
252
        _ => unreachable!("BoundedInt32 was not bounded?!"),
253
    }
254
1908
}
255

            
256
impl NetParamsExtract {
257
    /// Return the padding timer parameter low end, for reduced-ness `reduced`, as a `u32`
258
3810
    fn pad_low(&self, reduced: bool) -> IntegerMilliseconds<u32> {
259
3810
        self.pad_get(reduced, 0)
260
3810
    }
261
    /// Return the padding timer parameter high end, for reduced-ness `reduced`, as a `u32`
262
3810
    fn pad_high(&self, reduced: bool) -> IntegerMilliseconds<u32> {
263
3810
        self.pad_get(reduced, 1)
264
3810
    }
265

            
266
    /// Return and converts one padding parameter timer
267
    ///
268
    /// Internal function.
269
7620
    fn pad_get(&self, reduced: bool, low_or_high: usize) -> IntegerMilliseconds<u32> {
270
7620
        self.nf_ito[usize::from(reduced)][low_or_high]
271
8070
            .try_map(|v| Ok::<_, Void>(v.into()))
272
7620
            .void_unwrap()
273
7620
    }
274
}
275

            
276
impl<C: AbstractChannel> ChannelState<C> {
277
    /// Return true if a channel is ready to expire.
278
    /// Update `expire_after` if a smaller duration than
279
    /// the given value is required to expire this channel.
280
10
    fn ready_to_expire(&self, expire_after: &mut Duration) -> bool {
281
10
        let ChannelState::Open(ent) = self else {
282
            return false;
283
        };
284
10
        let Some(unused_duration) = ent.channel.duration_unused() else {
285
            // still in use
286
2
            return false;
287
        };
288
8
        let max_unused_duration = ent.max_unused_duration;
289
8
        let Some(remaining) = max_unused_duration.checked_sub(unused_duration) else {
290
            // no time remaining; drop now.
291
4
            return true;
292
        };
293
4
        if remaining.is_zero() {
294
            // Ignoring this edge case would result in a fairly benign race
295
            // condition outside of Shadow, but deadlock in Shadow.
296
            return true;
297
4
        }
298
4
        *expire_after = std::cmp::min(*expire_after, remaining);
299
4
        false
300
10
    }
301
}
302

            
303
impl<C: AbstractChannelFactory> MgrState<C> {
304
    /// Create a new empty `MgrState`.
305
124
    pub(crate) fn new(
306
124
        builder: C,
307
124
        config: ChannelConfig,
308
124
        dormancy: Dormancy,
309
124
        netparams: &NetParameters,
310
124
    ) -> Self {
311
124
        let mut padding_params = ChannelPaddingInstructions::default();
312
124
        let netparams = NetParamsExtract::from(netparams);
313
124
        let kist_params = netparams.kist;
314
124
        let update = parameterize(&mut padding_params, &config, dormancy, &netparams)
315
124
            .unwrap_or_else(|e: tor_error::Bug| panic!("bug detected on startup: {:?}", e));
316
124
        let _: Option<_> = update; // there are no channels yet, that would need to be told
317

            
318
124
        let channels_params = ChannelParams {
319
124
            padding: padding_params,
320
124
            kist: kist_params,
321
124
        };
322

            
323
124
        MgrState {
324
124
            inner: std::sync::Mutex::new(Inner {
325
124
                builder,
326
124
                channels: ListByRelayIds::new(),
327
124
                config,
328
124
                channels_params,
329
124
                dormancy,
330
124
            }),
331
124
        }
332
124
    }
333

            
334
    /// Call `func` on the [`ListByRelayIds`] that backs the map in this `MgrState`.
    ///
    /// The mutex is held for the duration of `func`, so `func` must not be slow.
    ///
    /// The channel set is deliberately not exposed directly: going through a
    /// closure ensures that the caller cannot await while holding the lock.
    ///
    /// This is only `cfg(test)` since it can deadlock.
    ///
    /// # Deadlock
    ///
    /// Calling a method on [`MgrState`] from within `func` may cause a deadlock.
    #[cfg(test)]
    pub(crate) fn with_channels<F, T>(&self, func: F) -> Result<T>
    where
        F: FnOnce(&mut ListByRelayIds<ChannelState<C::Channel>>) -> T,
    {
        let mut guard = self.inner.lock()?;
        let outcome = func(&mut guard.channels);
        Ok(outcome)
    }
354

            
355
    /// Return a copy of the builder stored in this state.
356
74
    pub(crate) fn builder(&self) -> C
357
74
    where
358
74
        C: Clone,
359
    {
360
74
        let inner = self.inner.lock().expect("lock poisoned");
361
74
        inner.builder.clone()
362
74
    }
363

            
364
    /// Run a function to modify the builder stored in this state.
365
    ///
366
    /// # Deadlock
367
    ///
368
    /// Calling a method on [`MgrState`] from within `func` may cause a deadlock.
369
    #[allow(unused)]
370
22
    pub(crate) fn with_mut_builder<F>(&self, func: F)
371
22
    where
372
22
        F: FnOnce(&mut C),
373
    {
374
22
        let mut inner = self.inner.lock().expect("lock poisoned");
375
22
        func(&mut inner.builder);
376
22
    }
377

            
378
    /// Insert an already-open `channel` into our list.
    #[cfg(feature = "relay")]
    pub(crate) fn add_open(&self, channel: Arc<C::Channel>) -> Result<()> {
        let mut guard = self.inner.lock()?;
        let entry = OpenEntry {
            channel,
            // TODO(relay): Relay need a different unused duration (if any). We can't use the
            // client timeout value. Need to be figured out before production.
            max_unused_duration: Self::random_max_unused_duration(),
        };
        guard.channels.insert(ChannelState::Open(entry));
        Ok(())
    }
390

            
391
    /// Drop every open channel that reports itself unusable.
    ///
    /// Pending (`Building`) entries are always kept.
    #[cfg(test)]
    pub(crate) fn remove_unusable(&self) -> Result<()> {
        let mut guard = self.inner.lock()?;
        guard.channels.retain(|state| {
            if let ChannelState::Open(ent) = state {
                ent.channel.is_usable()
            } else {
                true
            }
        });
        Ok(())
    }
401

            
402
    /// Request an open or pending channel to `target`. If `add_new_entry_if_not_found` is true and
403
    /// an open or pending channel isn't found, a new pending entry will be added and
404
    /// [`ChannelForTarget::NewEntry`] will be returned. This is all done as part of the same method
405
    /// so that all operations are performed under the same lock acquisition.
406
94
    pub(crate) fn request_channel(
407
94
        &self,
408
94
        target: &C::BuildSpec,
409
94
        add_new_entry_if_not_found: bool,
410
94
    ) -> Result<Option<ChannelForTarget<C>>> {
411
        use ChannelState::*;
412

            
413
94
        let mut inner = self.inner.lock()?;
414

            
415
        // The idea here is to choose the channel in two steps:
416
        //
417
        // - Eligibility: Get channels from the channel map and filter them down to only channels
418
        //   which are eligible to be returned.
419
        // - Ranking: From the eligible channels, choose the best channel.
420
        //
421
        // Another way to choose the channel could be something like: first try all canonical open
422
        // channels, then all non-canonical open channels, then all pending channels with all
423
        // matching relay ids, then remaining pending channels, etc. But this ends up being hard to
424
        // follow and inflexible (what if you want to prioritize pending channels over non-canonical
425
        // open channels?).
426

            
427
        // Open channels which are allowed for requests to `target`.
428
94
        let open_channels = inner
429
94
            .channels
430
            // channels with all target relay identifiers
431
94
            .by_all_ids(target)
432
94
            .filter(|entry| match entry {
433
12
                Open(x) => select::open_channel_is_allowed(x, target),
434
10
                Building(_) => false,
435
22
            });
436

            
437
        // Pending channels which will *probably* be allowed for requests to `target` once they
438
        // complete.
439
94
        let pending_channels = inner
440
94
            .channels
441
            // channels that have a subset of the relay ids of `target`
442
94
            .all_subset(target)
443
94
            .into_iter()
444
94
            .filter(|entry| match entry {
445
12
                Open(_) => false,
446
10
                Building(x) => select::pending_channel_maybe_allowed(x, target),
447
22
            });
448

            
449
94
        match select::choose_best_channel(open_channels.chain(pending_channels), target) {
450
10
            Some(Open(OpenEntry { channel, .. })) => {
451
                // This entry is a perfect match for the target keys: we'll return the open
452
                // entry.
453
10
                return Ok(Some(ChannelForTarget::Open(Arc::clone(channel))));
454
            }
455
10
            Some(Building(PendingEntry { pending, .. })) => {
456
                // This entry is potentially a match for the target identities: we'll return the
457
                // pending entry. (We don't know for sure if it will match once it completes,
458
                // since we might discover additional keys beyond those listed for this pending
459
                // entry.)
460
10
                return Ok(Some(ChannelForTarget::Pending(pending.clone())));
461
            }
462
74
            None => {}
463
        }
464

            
465
        // It's possible we know ahead of time that building a channel would be unsuccessful.
466
74
        if inner
467
74
            .channels
468
            // channels with at least one id in common with `target`
469
74
            .all_overlapping(target)
470
74
            .into_iter()
471
            // but not channels which completely satisfy the id requirements of `target`
472
74
            .filter(|entry| !entry.has_all_relay_ids_from(target))
473
74
            .any(|entry| matches!(entry, Open(OpenEntry{ channel, ..}) if channel.is_usable()))
474
        {
475
            // At least one *open, usable* channel has been negotiated that overlaps only
476
            // partially with our target: it has proven itself to have _one_ of our target
477
            // identities, but not all.
478
            //
479
            // Because this channel exists, we know that our target cannot succeed, since relays
480
            // are not allowed to share _any_ identities.
481
            //return Ok(Some(Action::Return(Err(Error::IdentityConflict))));
482
            return Err(Error::IdentityConflict);
483
74
        }
484

            
485
74
        if !add_new_entry_if_not_found {
486
            return Ok(None);
487
74
        }
488

            
489
        // Great, nothing interfered at all.
490
74
        let any_relay_id = target
491
74
            .identities()
492
74
            .next()
493
74
            .ok_or(internal!("relay target had no id"))?
494
74
            .to_owned();
495
74
        let (new_state, send, unique_id) = setup_launch(RelayIds::from_relay_ids(target));
496
74
        inner
497
74
            .channels
498
74
            .try_insert(ChannelState::Building(new_state))?;
499
74
        let handle = PendingChannelHandle::new(any_relay_id, unique_id);
500
74
        Ok(Some(ChannelForTarget::NewEntry((handle, send))))
501
94
    }
502

            
503
    /// Remove the pending channel identified by its `handle`.
504
8
    pub(crate) fn remove_pending_channel(&self, handle: PendingChannelHandle) -> Result<()> {
505
8
        let mut inner = self.inner.lock()?;
506
8
        remove_pending(&mut inner.channels, handle);
507
8
        Ok(())
508
8
    }
509

            
510
    /// Upgrade the pending channel identified by its `handle` by replacing it with a new open
511
    /// `channel`.
512
    #[instrument(skip_all, level = "trace")]
513
66
    pub(crate) fn upgrade_pending_channel_to_open(
514
66
        &self,
515
66
        handle: PendingChannelHandle,
516
66
        channel: Arc<C::Channel>,
517
66
    ) -> Result<()> {
518
        // Do all operations under the same lock acquisition.
519
66
        let mut inner = self.inner.lock()?;
520

            
521
66
        remove_pending(&mut inner.channels, handle);
522

            
523
        // This isn't great.  We context switch to the newly-created
524
        // channel just to tell it how and whether to do padding.  Ideally
525
        // we would pass the params at some suitable point during
526
        // building.  However, that would involve the channel taking a
527
        // copy of the params, and that must happen in the same channel
528
        // manager lock acquisition span as the one where we insert the
529
        // channel into the table so it will receive updates.  I.e.,
530
        // here.
531
66
        let update = inner.channels_params.padding.initial_update();
532
66
        if let Some(update) = update {
533
66
            channel
534
66
                .reparameterize(update.into())
535
66
                .map_err(|_| internal!("failure on new channel"))?;
536
        }
537
66
        let new_entry = ChannelState::Open(OpenEntry {
538
66
            channel,
539
66
            max_unused_duration: Self::random_max_unused_duration(),
540
66
        });
541
66
        inner.channels.insert(new_entry);
542

            
543
66
        Ok(())
544
66
    }
545

            
546
    /// Reconfigure all channels as necessary
547
    ///
548
    /// (By reparameterizing channels as needed)
549
    /// This function will handle
550
    ///   - netdir update
551
    ///   - a reconfiguration
552
    ///   - dormancy
553
    ///
554
    /// For `new_config` and `new_dormancy`, `None` means "no change to previous info".
555
98
    pub(super) fn reconfigure_general(
556
98
        &self,
557
98
        new_config: Option<&ChannelConfig>,
558
98
        new_dormancy: Option<Dormancy>,
559
98
        netparams: Arc<dyn AsRef<NetParameters>>,
560
98
    ) -> StdResult<(), tor_error::Bug> {
561
        use ChannelState as CS;
562

            
563
        // TODO when we support operation as a relay, inter-relay channels ought
564
        // not to get padding.
565
98
        let netdir = {
566
98
            let extract = NetParamsExtract::from((*netparams).as_ref());
567
98
            drop(netparams);
568
98
            extract
569
        };
570

            
571
98
        let mut inner = self
572
98
            .inner
573
98
            .lock()
574
98
            .map_err(|_| internal!("poisoned channel manager"))?;
575
98
        let inner = &mut *inner;
576

            
577
98
        if let Some(new_config) = new_config {
578
28
            inner.config = new_config.clone();
579
70
        }
580
98
        if let Some(new_dormancy) = new_dormancy {
581
42
            inner.dormancy = new_dormancy;
582
70
        }
583

            
584
98
        let update = parameterize(
585
98
            &mut inner.channels_params.padding,
586
98
            &inner.config,
587
98
            inner.dormancy,
588
98
            &netdir,
589
        )?;
590

            
591
98
        let update = update.map(Arc::new);
592

            
593
98
        let new_kist_params = netdir.kist;
594
98
        let kist_params = if new_kist_params != inner.channels_params.kist {
595
            // The KIST params have changed: remember their value,
596
            // and reparameterize_kist()
597
            inner.channels_params.kist = new_kist_params;
598
            Some(new_kist_params)
599
        } else {
600
            // If the new KIST params are identical to the previous ones,
601
            // we don't need to call reparameterize_kist()
602
98
            None
603
        };
604

            
605
98
        if update.is_none() && kist_params.is_none() {
606
            // Return early, nothing to reconfigure
607
36
            return Ok(());
608
62
        }
609

            
610
62
        for channel in inner.channels.values() {
611
62
            let channel = match channel {
612
62
                CS::Open(OpenEntry { channel, .. }) => channel,
613
                CS::Building(_) => continue,
614
            };
615

            
616
62
            if let Some(ref update) = update {
617
62
                // Ignore error (which simply means the channel is closed or gone)
618
62
                let _ = channel.reparameterize(Arc::clone(update));
619
62
            }
620

            
621
62
            if let Some(kist) = kist_params {
622
                // Ignore error (which simply means the channel is closed or gone)
623
                let _ = channel.reparameterize_kist(kist);
624
62
            }
625
        }
626
62
        Ok(())
627
98
    }
628

            
629
    /// Expire all channels that have been unused for too long.
630
    ///
631
    /// Return a Duration until the next time at which
632
    /// a channel _could_ expire.
633
40
    pub(crate) fn expire_channels(&self) -> Duration {
634
40
        let mut ret = Duration::from_secs(180);
635
40
        self.inner
636
40
            .lock()
637
40
            .expect("Poisoned lock")
638
40
            .channels
639
46
            .retain(|chan| !chan.ready_to_expire(&mut ret));
640
40
        ret
641
40
    }
642

            
643
    /// Helper: Return the default max unused duration for a channel.
644
66
    fn random_max_unused_duration() -> Duration {
645
66
        Duration::from_secs(
646
66
            rand::rng()
647
66
                .gen_range_checked(180..270)
648
66
                .expect("not 180 < 270 !"),
649
        )
650
66
    }
651
}
652

            
653
/// A channel for a given target relay.
654
pub(crate) enum ChannelForTarget<CF: AbstractChannelFactory> {
655
    /// A channel that is open.
656
    Open(Arc<CF::Channel>),
657
    /// A channel that is building.
658
    Pending(Pending),
659
    /// Information about a new pending channel entry.
660
    NewEntry((PendingChannelHandle, Sending)),
661
}
662

            
663
/// A handle for a pending channel.
664
///
665
/// WARNING: This handle should never be dropped, and should always be passed back into
666
/// [`MgrState::remove_pending_channel`] or [`MgrState::upgrade_pending_channel_to_open`], otherwise
667
/// the pending channel may be left in the channel map forever.
668
///
669
/// This handle must only be used with the `MgrState` from which it was given.
670
pub(crate) struct PendingChannelHandle {
671
    /// Any relay ID for this pending channel.
672
    relay_id: tor_linkspec::RelayId,
673
    /// The unique ID for this pending channel.
674
    unique_id: UniqPendingChanId,
675
    /// The pending channel has been removed from the channel map.
676
    chan_has_been_removed: bool,
677
}
678

            
679
impl PendingChannelHandle {
680
    /// Create a new [`PendingChannelHandle`].
681
74
    fn new(relay_id: tor_linkspec::RelayId, unique_id: UniqPendingChanId) -> Self {
682
74
        Self {
683
74
            relay_id,
684
74
            unique_id,
685
74
            chan_has_been_removed: false,
686
74
        }
687
74
    }
688

            
689
    /// This should be called when the pending channel has been removed from the pending channel
690
    /// map. Not calling this will result in an error log message (and panic in debug builds) when
691
    /// this handle is dropped.
692
74
    fn chan_has_been_removed(mut self) {
693
74
        self.chan_has_been_removed = true;
694
74
    }
695
}
696

            
697
impl std::ops::Drop for PendingChannelHandle {
698
74
    fn drop(&mut self) {
699
74
        if !self.chan_has_been_removed {
700
            #[allow(clippy::missing_docs_in_private_items)]
701
            const MSG: &str = "Dropped the 'PendingChannelHandle' without removing the channel";
702
            error_report!(
703
                internal!("{MSG}"),
704
                "'PendingChannelHandle' dropped unexpectedly",
705
            );
706
74
        }
707
74
    }
708
}
709

            
710
/// Helper: return the objects used to inform pending tasks about a newly open or failed channel.
711
74
fn setup_launch(ids: RelayIds) -> (PendingEntry, Sending, UniqPendingChanId) {
712
74
    let (snd, rcv) = oneshot::channel();
713
74
    let pending = rcv.shared();
714
74
    let unique_id = UniqPendingChanId::new();
715
74
    let entry = PendingEntry {
716
74
        ids,
717
74
        pending,
718
74
        unique_id,
719
74
    };
720

            
721
74
    (entry, snd, unique_id)
722
74
}
723

            
724
/// Helper: remove the pending channel identified by `handle` from `channel_map`.
725
74
fn remove_pending<C: AbstractChannel>(
726
74
    channel_map: &mut tor_linkspec::ListByRelayIds<ChannelState<C>>,
727
74
    handle: PendingChannelHandle,
728
74
) {
729
    // we need only one relay id to locate it, even if it has multiple relay ids
730
76
    let removed = channel_map.remove_by_id(&handle.relay_id, |c| {
731
76
        let ChannelState::Building(c) = c else {
732
2
            return false;
733
        };
734
74
        c.unique_id == handle.unique_id
735
76
    });
736
74
    debug_assert_eq!(removed.len(), 1, "expected to remove exactly one channel");
737

            
738
74
    handle.chan_has_been_removed();
739
74
}
740

            
741
/// Converts config, dormancy, and netdir, into parameter updates
742
///
743
/// Calculates new parameters, updating `channels_params` as appropriate.
744
/// If anything changed, the corresponding update instruction is returned.
745
///
746
/// `channels_params` is updated with the new parameters,
747
/// and the update message, if one is needed, is returned.
748
///
749
/// This is called in two places:
750
///
751
///  1. During chanmgr creation, it is called once to analyze the initial state
752
///     and construct a corresponding ChannelPaddingInstructions.
753
///
754
///  2. During reconfiguration.
755
1902
fn parameterize(
756
1902
    channels_params: &mut ChannelPaddingInstructions,
757
1902
    config: &ChannelConfig,
758
1902
    dormancy: Dormancy,
759
1902
    netdir: &NetParamsExtract,
760
1902
) -> StdResult<Option<ChannelPaddingInstructionsUpdates>, tor_error::Bug> {
761
    // Everything in this calculation applies to *all* channels, disregarding
762
    // channel usage.  Usage is handled downstream, in the channel frontend.
763
    // See the module doc in `crates/tor-proto/src/channel/padding.rs`.
764

            
765
3915
    let padding_of_level = |level| padding_parameters(level, netdir);
766
1902
    let send_padding = padding_of_level(config.padding)?;
767
1902
    let padding_default = padding_of_level(PaddingLevel::default())?;
768

            
769
1902
    let send_padding = match dormancy {
770
1426
        Dormancy::Active => send_padding,
771
476
        Dormancy::Dormant => None,
772
    };
773

            
774
1902
    let recv_padding = match config.padding {
775
48
        PaddingLevel::Reduced => None,
776
1854
        PaddingLevel::Normal => send_padding,
777
        PaddingLevel::None => None,
778
    };
779

            
780
    // Whether the inbound padding approach we are to use, is the same as the default
781
    // derived from the netdir (disregarding our config and dormancy).
782
    //
783
    // Ie, whether the parameters we want are precisely those that a peer would
784
    // use by default (assuming they have the same view of the netdir as us).
785
1902
    let recv_equals_default = recv_padding == padding_default;
786

            
787
1902
    let padding_negotiate = if recv_equals_default {
788
        // Our padding approach is the same as peers' defaults.  So the PADDING_NEGOTIATE
789
        // message we need to send is the START(0,0).  (The channel frontend elides an
790
        // initial message of this form, - see crates/tor-proto/src/channel.rs::note_usage.)
791
        //
792
        // If the netdir default is no padding, and we previously negotiated
793
        // padding being enabled, and now want to disable it, we would send
794
        // START(0,0) rather than STOP.  That is OK (even, arguably, right).
795
1390
        PaddingNegotiate::start_default()
796
    } else {
797
512
        match recv_padding {
798
512
            None => PaddingNegotiate::stop(),
799
            Some(params) => params.padding_negotiate_cell()?,
800
        }
801
    };
802

            
803
1902
    let mut update = channels_params
804
1902
        .start_update()
805
1902
        .padding_enable(send_padding.is_some())
806
1902
        .padding_negotiate(padding_negotiate);
807
1902
    if let Some(params) = send_padding {
808
1414
        update = update.padding_parameters(params);
809
1414
    }
810
1902
    let update = update.finish();
811

            
812
1902
    Ok(update)
813
1902
}
814

            
815
/// Given a `NetDirExtract` and whether we're reducing padding, return a `PaddingParameters`
816
///
817
/// With `PaddingLevel::None`, or the consensus specifies no padding, will return `None`;
818
/// but does not account for other reasons why padding might be enabled/disabled.
819
3810
fn padding_parameters(
820
3810
    config: PaddingLevel,
821
3810
    netdir: &NetParamsExtract,
822
3810
) -> StdResult<Option<PaddingParameters>, tor_error::Bug> {
823
3810
    let reduced = match config {
824
50
        PaddingLevel::Reduced => true,
825
3760
        PaddingLevel::Normal => false,
826
        PaddingLevel::None => return Ok(None),
827
    };
828

            
829
3810
    padding_parameters_builder(reduced, netdir)
830
3811
        .unwrap_or_else(|e: &str| {
831
2
            info!(
832
                "consensus channel padding parameters wrong, using defaults: {}",
833
                &e,
834
            );
835
2
            Some(PaddingParametersBuilder::default())
836
2
        })
837
4023
        .map(|p| {
838
3786
            p.build()
839
3786
                .map_err(into_internal!("failed to build padding parameters"))
840
3786
        })
841
3810
        .transpose()
842
3810
}
843

            
844
/// Given a `NetDirExtract` and whether we're reducing padding,
845
/// return a `PaddingParametersBuilder`
846
///
847
/// If the consensus specifies no padding, will return `None`;
848
/// but does not account for other reasons why padding might be enabled/disabled.
849
///
850
/// If `Err`, the string is a description of what is wrong with the parameters;
851
/// the caller should use `PaddingParameters::Default`.
852
3810
fn padding_parameters_builder(
853
3810
    reduced: bool,
854
3810
    netdir: &NetParamsExtract,
855
3810
) -> StdResult<Option<PaddingParametersBuilder>, &'static str> {
856
3810
    let mut p = PaddingParametersBuilder::default();
857

            
858
3810
    let low = netdir.pad_low(reduced);
859
3810
    let high = netdir.pad_high(reduced);
860
3810
    if low > high {
861
2
        return Err("low > high");
862
3808
    }
863
3808
    if low.as_millis() == 0 && high.as_millis() == 0 {
864
        // Zeroes for both channel padding consensus parameters means "don't send padding".
865
        // padding-spec.txt s2.6, see description of `nf_ito_high`.
866
24
        return Ok(None);
867
3784
    }
868
3784
    p.low(low);
869
3784
    p.high(high);
870
3784
    Ok::<_, &'static str>(Some(p))
871
3810
}
872

            
873
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use super::*;
    use crate::factory::BootstrapReporter;
    use async_trait::async_trait;
    #[cfg(feature = "relay")]
    use safelog::Sensitive;
    use std::sync::{Arc, Mutex};
    use tor_llcrypto::pk::ed25519::Ed25519Identity;
    use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
    use tor_proto::memquota::ChannelAccount;

    /// Make a manager state around a `FakeChannelFactory`, with all-default settings.
    fn new_test_state() -> MgrState<FakeChannelFactory> {
        MgrState::new(
            FakeChannelFactory::default(),
            ChannelConfig::default(),
            Default::default(),
            &Default::default(),
        )
    }

    /// A channel factory whose build methods are never expected to be called.
    #[derive(Clone, Debug, Default)]
    struct FakeChannelFactory {}

    #[allow(clippy::diverging_sub_expression)] // for unimplemented!() + async_trait
    #[async_trait]
    impl AbstractChannelFactory for FakeChannelFactory {
        type Channel = FakeChannel;

        type BuildSpec = tor_linkspec::OwnedChanTarget;

        type Stream = ();

        async fn build_channel(
            &self,
            _target: &Self::BuildSpec,
            _reporter: BootstrapReporter,
            _memquota: ChannelAccount,
        ) -> Result<Arc<FakeChannel>> {
            unimplemented!()
        }

        #[cfg(feature = "relay")]
        async fn build_channel_using_incoming(
            &self,
            _peer: Sensitive<std::net::SocketAddr>,
            _stream: Self::Stream,
            _memquota: ChannelAccount,
        ) -> Result<Arc<Self::Channel>> {
            unimplemented!()
        }
    }

    /// A stand-in channel that records the last padding update it was handed.
    #[derive(Clone, Debug)]
    struct FakeChannel {
        /// The only identity this channel reports.
        ed_ident: Ed25519Identity,
        /// What `is_usable` returns.
        usable: bool,
        /// What `duration_unused` returns, in seconds.
        unused_duration: Option<u64>,
        /// The most recent update passed to `reparameterize`, if any.
        params_update: Arc<Mutex<Option<Arc<ChannelPaddingInstructionsUpdates>>>>,
    }
    impl AbstractChannel for FakeChannel {
        fn is_canonical(&self) -> bool {
            unimplemented!()
        }
        fn is_canonical_to_peer(&self) -> bool {
            unimplemented!()
        }
        fn is_usable(&self) -> bool {
            self.usable
        }
        fn duration_unused(&self) -> Option<Duration> {
            self.unused_duration.map(Duration::from_secs)
        }
        fn reparameterize(
            &self,
            update: Arc<ChannelPaddingInstructionsUpdates>,
        ) -> tor_proto::Result<()> {
            let mut latest = self.params_update.lock().unwrap();
            *latest = Some(update);
            Ok(())
        }
        fn reparameterize_kist(&self, _kist_params: KistParams) -> tor_proto::Result<()> {
            Ok(())
        }
        fn engage_padding_activities(&self) {}
    }
    impl tor_linkspec::HasRelayIds for FakeChannel {
        fn identity(
            &self,
            key_type: tor_linkspec::RelayIdType,
        ) -> Option<tor_linkspec::RelayIdRef<'_>> {
            match key_type {
                tor_linkspec::RelayIdType::Ed25519 => Some((&self.ed_ident).into()),
                _ => None,
            }
        }
    }
    /// Get a fake ed25519 identity from the first byte of a string.
    fn str_to_ed(s: &str) -> Ed25519Identity {
        [s.as_bytes()[0]; 32].into()
    }
    /// Shared constructor for the helpers below: an `Open` entry around a `FakeChannel`.
    fn fake_open(
        ident: &'static str,
        usable: bool,
        unused_duration: Option<u64>,
        max_unused_duration: Duration,
    ) -> ChannelState<FakeChannel> {
        let channel = Arc::new(FakeChannel {
            ed_ident: str_to_ed(ident),
            usable,
            unused_duration,
            params_update: Arc::new(Mutex::new(None)),
        });
        ChannelState::Open(OpenEntry {
            channel,
            max_unused_duration,
        })
    }
    /// A usable channel with no recorded unused duration and a 180s maximum.
    fn ch(ident: &'static str) -> ChannelState<FakeChannel> {
        fake_open(ident, true, None, Duration::from_secs(180))
    }
    /// A usable channel with the given maximum and actual unused durations.
    fn ch_with_details(
        ident: &'static str,
        max_unused_duration: Duration,
        unused_duration: Option<u64>,
    ) -> ChannelState<FakeChannel> {
        fake_open(ident, true, unused_duration, max_unused_duration)
    }
    /// An unusable channel with no recorded unused duration and a 180s maximum.
    fn closed(ident: &'static str) -> ChannelState<FakeChannel> {
        fake_open(ident, false, None, Duration::from_secs(180))
    }
    #[test]
    fn rmv_unusable() -> Result<()> {
        let map = new_test_state();

        map.with_channels(|map| {
            let entries = [
                closed("machen"),
                closed("wir"),
                ch("wir"),
                ch("feinen"),
                ch("Fug"),
                ch("Fug"),
            ];
            for entry in entries {
                map.insert(entry);
            }
        })?;

        map.remove_unusable().unwrap();

        map.with_channels(|map| {
            assert_eq!(map.by_id(&str_to_ed("m")).len(), 0);
            assert_eq!(map.by_id(&str_to_ed("w")).len(), 1);
            assert_eq!(map.by_id(&str_to_ed("f")).len(), 1);
            assert_eq!(map.by_id(&str_to_ed("F")).len(), 2);
        })?;

        Ok(())
    }
    #[test]
    fn reparameterize_via_netdir() -> Result<()> {
        let map = new_test_state();

        // Set some non-default parameters so that we can tell when an update happens
        let _ = map
            .inner
            .lock()
            .unwrap()
            .channels_params
            .padding
            .start_update()
            .padding_parameters(
                PaddingParametersBuilder::default()
                    .low(1234.into())
                    .build()
                    .unwrap(),
            )
            .finish();

        map.with_channels(|map| {
            map.insert(ch("track"));
        })?;

        let netdir = tor_netdir::testnet::construct_netdir()
            .unwrap_if_sufficient()
            .unwrap();
        let netdir = Arc::new(netdir);

        // Run `f` on the single fake channel inserted above.
        let with_ch = |f: &dyn Fn(&FakeChannel)| {
            let inner = map.inner.lock().unwrap();
            let mut ch = inner.channels.by_ed25519(&str_to_ed("t"));
            let ch = ch.next().unwrap().unwrap_open();
            f(ch);
        };

        eprintln!("-- process a default netdir, which should send an update --");
        map.reconfigure_general(None, None, netdir.clone()).unwrap();
        with_ch(&|ch| {
            assert_eq!(
                format!("{:?}", ch.params_update.lock().unwrap().take().unwrap()),
                // evade field visibility by (ab)using Debug impl
                "ChannelPaddingInstructionsUpdates { padding_enable: None, \
                    padding_parameters: Some(Parameters { \
                        low: IntegerMilliseconds { value: 1500 }, \
                        high: IntegerMilliseconds { value: 9500 } }), \
                    padding_negotiate: None }"
            );
        });
        eprintln!();

        eprintln!("-- process a default netdir again, which should *not* send an update --");
        map.reconfigure_general(None, None, netdir).unwrap();
        with_ch(&|ch| assert!(ch.params_update.lock().unwrap().is_none()));

        Ok(())
    }
    #[test]
    fn expire_channels() -> Result<()> {
        let max_unused = Duration::from_secs(180);

        let map = new_test_state();

        // Channel that has been unused beyond max duration allowed is expired
        map.with_channels(|map| {
            map.insert(ch_with_details("wello", max_unused, Some(181)));
        })?;

        // Minimum value of max unused duration is 180 seconds
        assert_eq!(180, map.expire_channels().as_secs());
        map.with_channels(|map| {
            assert_eq!(map.by_ed25519(&str_to_ed("w")).len(), 0);
        })?;

        let map = new_test_state();

        map.with_channels(|map| {
            // Channels that have been unused for shorter than max unused duration
            map.insert(ch_with_details("wello", max_unused, Some(120)));
            map.insert(ch_with_details("yello", max_unused, Some(170)));

            // Channel that has been unused beyond max duration allowed is expired
            map.insert(ch_with_details("gello", max_unused, Some(181)));

            // Closed channel should be retained
            map.insert(closed("hello"));
        })?;

        // Return duration until next channel expires
        assert_eq!(10, map.expire_channels().as_secs());
        map.with_channels(|map| {
            assert_eq!(map.by_ed25519(&str_to_ed("w")).len(), 1);
            assert_eq!(map.by_ed25519(&str_to_ed("y")).len(), 1);
            assert_eq!(map.by_ed25519(&str_to_ed("h")).len(), 1);
            assert_eq!(map.by_ed25519(&str_to_ed("g")).len(), 0);
        })?;

        Ok(())
    }
}