1
//! Simple implementation for the internal map state of a ChanMgr.
2

            
3
use std::time::Duration;
4

            
5
use super::AbstractChannelFactory;
6
use super::{AbstractChannel, Pending, Sending, select};
7
use crate::{ChannelConfig, Dormancy, Error, Result};
8

            
9
use futures::FutureExt;
10
use std::result::Result as StdResult;
11
use std::sync::Arc;
12
use std::sync::atomic::{AtomicU64, Ordering};
13
use tor_async_utils::oneshot;
14
use tor_basic_utils::RngExt as _;
15
use tor_cell::chancell::msg::PaddingNegotiate;
16
use tor_config::PaddingLevel;
17
use tor_error::{error_report, internal, into_internal};
18
use tor_linkspec::{HasRelayIds, ListByRelayIds, RelayIds};
19
use tor_netdir::{params::CHANNEL_PADDING_TIMEOUT_UPPER_BOUND, params::NetParameters};
20
use tor_proto::ChannelPaddingInstructions;
21
use tor_proto::channel::ChannelPaddingInstructionsUpdates;
22
use tor_proto::channel::kist::{KistMode, KistParams};
23
use tor_proto::channel::padding::Parameters as PaddingParameters;
24
use tor_proto::channel::padding::ParametersBuilder as PaddingParametersBuilder;
25
use tor_units::{BoundedInt32, IntegerMilliseconds};
26
use tracing::{info, instrument};
27
use void::{ResultVoidExt as _, Void};
28

            
29
#[cfg(test)]
30
mod padding_test;
31

            
32
/// All mutable state held by an `AbstractChannelMgr`.
33
///
34
/// One reason that this is an isolated type is that we want to
35
/// to limit the amount of code that can see and
36
/// lock the Mutex here.  (We're using a blocking mutex close to async
37
/// code, so we need to be careful.)
38
pub(crate) struct MgrState<C: AbstractChannelFactory> {
39
    /// The data, within a lock
40
    ///
41
    /// (Danger: this uses a blocking mutex close to async code.  This mutex
42
    /// must never be held while an await is happening.)
43
    inner: std::sync::Mutex<Inner<C>>,
44
}
45

            
46
/// Parameters for channels that we create, and that all existing channels are using
47
struct ChannelParams {
48
    /// Channel padding instructions
49
    padding: ChannelPaddingInstructions,
50

            
51
    /// KIST parameters
52
    kist: KistParams,
53
}
54

            
55
/// A map from channel id to channel state, plus necessary auxiliary state - inside lock
56
struct Inner<C: AbstractChannelFactory> {
57
    /// The channel factory type that we store.
58
    ///
59
    /// In this module we never use this _as_ an AbstractChannelFactory: we just
60
    /// hand out clones of it when asked.
61
    builder: C,
62

            
63
    /// A map from identity to channels, or to pending channel statuses.
64
    channels: ListByRelayIds<ChannelState<C::Channel>>,
65

            
66
    /// A list of unauthenticated channels meaning they are client/bridge -> relay. We populate
67
    /// this list as a relay responder accepting incoming connections.
68
    ///
69
    /// Notice here that [`ChannelState`] is NOT used because we don't need a pending state of
70
    /// unauthenticated channels (inbound client/bridge) because multiple channel from the same
71
    /// peer can coexist. Furthermore, these channels are never looked up for normal relay
72
    /// operations and so a pending state is not needed.
73
    #[cfg(feature = "relay")]
74
    unauth_channels: Vec<Arc<C::Channel>>,
75

            
76
    /// Parameters for channels that we create, and that all existing channels are using
77
    ///
78
    /// Will be updated by a background task, which also notifies all existing
79
    /// `Open` channels via `channels`.
80
    ///
81
    /// (Must be protected by the same lock as `channels`, or a channel might be
82
    /// created using being-replaced parameters, but not get an update.)
83
    channels_params: ChannelParams,
84

            
85
    /// The configuration (from the config file or API caller)
86
    config: ChannelConfig,
87

            
88
    /// Dormancy
89
    ///
90
    /// The last dormancy information we have been told about and passed on to our channels.
91
    /// Updated via `MgrState::set_dormancy` and hence `MgrState::reconfigure_general`,
92
    /// which then uses it to calculate how to reconfigure the channels.
93
    dormancy: Dormancy,
94
}
95

            
96
/// The state of a channel (or channel build attempt) within a map.
97
///
98
/// A ChannelState can be Open (representing a fully negotiated channel) or
99
/// Building (representing a pending attempt to build a channel). Both states
100
/// have a set of RelayIds, but these RelayIds represent slightly different
101
/// things:
102
///  * On a Building channel, the set of RelayIds is all the identities that we
103
///    require the peer to have. (The peer may turn out to have _more_
104
///    identities than this.)
105
///  * On an Open channel, the set of RelayIds is all the identities that
106
///    we were able to successfully authenticate for the peer.
107
pub(crate) enum ChannelState<C> {
108
    /// An open channel.
109
    ///
110
    /// This channel might not be usable: it might be closing or
111
    /// broken.  We need to check its is_usable() method before
112
    /// yielding it to the user.
113
    Open(OpenEntry<C>),
114
    /// A channel that's getting built.
115
    Building(PendingEntry),
116
}
117

            
118
/// An open channel entry.
#[derive(Clone)]
pub(crate) struct OpenEntry<C> {
    /// The underlying open channel.
    pub(crate) channel: Arc<C>,
    /// The maximum unused duration allowed for this channel.
    pub(crate) max_unused_duration: Duration,
}
126

            
127
/// A unique ID for a pending ([`PendingEntry`]) channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct UniqPendingChanId(u64);

impl UniqPendingChanId {
    /// Construct a new `UniqPendingChanId`.
    ///
    /// Each call takes the next value of a process-wide counter.
    ///
    /// # Panics
    ///
    /// Panics if the counter has reached `u64::MAX`.
    pub(crate) fn new() -> Self {
        /// The next unique ID.
        static NEXT_ID: AtomicU64 = AtomicU64::new(0);
        // Relaxed ordering is fine; we don't care about how this
        // is instantiated with respect to other channels.
        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        assert!(id != u64::MAX, "Exhausted the pending channel ID namespace");
        Self(id)
    }
}

impl std::fmt::Display for UniqPendingChanId {
    /// Format this ID as `PendingChan <n>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "PendingChan {}", self.0)
    }
}
149

            
150
/// An entry for a not-yet-build channel
151
#[derive(Clone)]
152
pub(crate) struct PendingEntry {
153
    /// The keys of the relay to which we're trying to open a channel.
154
    pub(crate) ids: RelayIds,
155

            
156
    /// A future we can clone and listen on to learn when this channel attempt
157
    /// is successful or failed.
158
    ///
159
    /// This entry will be removed from the map (and possibly replaced with an
160
    /// `OpenEntry`) _before_ this future becomes ready.
161
    pub(crate) pending: Pending,
162

            
163
    /// A unique ID that allows us to find this exact pending entry later.
164
    pub(crate) unique_id: UniqPendingChanId,
165
}
166

            
167
impl<C> HasRelayIds for ChannelState<C>
168
where
169
    C: HasRelayIds,
170
{
171
920
    fn identity(
172
920
        &self,
173
920
        key_type: tor_linkspec::RelayIdType,
174
920
    ) -> Option<tor_linkspec::RelayIdRef<'_>> {
175
920
        match self {
176
436
            ChannelState::Open(OpenEntry { channel, .. }) => channel.identity(key_type),
177
484
            ChannelState::Building(PendingEntry { ids, .. }) => ids.identity(key_type),
178
        }
179
920
    }
180
}
181

            
182
impl<C: Clone> ChannelState<C> {
183
    /// For testing: either give the Open channel inside this state,
184
    /// or panic if there is none.
185
    #[cfg(test)]
186
4
    fn unwrap_open(&self) -> &C {
187
4
        match self {
188
4
            ChannelState::Open(ent) => &ent.channel,
189
            _ => panic!("Not an open channel"),
190
        }
191
4
    }
192
}
193

            
194
/// Type of the `nf_ito_*` netdir parameters, convenience alias
195
type NfIto = IntegerMilliseconds<BoundedInt32<0, CHANNEL_PADDING_TIMEOUT_UPPER_BOUND>>;
196

            
197
/// Extract from a `NetParameters` which we need, conveniently organized for our processing
198
///
199
/// This type serves two functions at once:
200
///
201
///  1. Being a subset of the parameters, we can copy it out of
202
///     the netdir, before we do more complex processing - and, in particular,
203
///     before we obtain the lock on `inner` (which we need to actually handle the update,
204
///     because we need to combine information from the config with that from the netdir).
205
///
206
///  2. Rather than four separate named fields for the padding options,
207
///     it has arrays, so that it is easy to
208
///     select the values without error-prone recapitulation of field names.
209
#[derive(Debug, Clone)]
210
struct NetParamsExtract {
211
    /// `nf_ito_*`, the padding timeout parameters from the netdir consensus
212
    ///
213
    /// `nf_ito[ 0=normal, 1=reduced ][ 0=low, 1=high ]`
214
    /// are `nf_ito_{low,high}{,_reduced` from `NetParameters`.
215
    // TODO we could use some enum or IndexVec or something to make this less `0` and `1`
216
    nf_ito: [[NfIto; 2]; 2],
217

            
218
    /// The KIST parameters.
219
    kist: KistParams,
220
}
221

            
222
impl From<&NetParameters> for NetParamsExtract {
223
1988
    fn from(p: &NetParameters) -> Self {
224
1988
        let kist_enabled = kist_mode_from_net_parameter(p.kist_enabled);
225
        // NOTE: in theory, this cast shouldn't be needed
226
        // (kist_tcp_notsent_lowat is supposed to be a u32, not an i32).
227
        // In practice, however, the type conversion is needed
228
        // because consensus params are i32s.
229
        //
230
        // See the `NetParameters::kist_tcp_notsent_lowat` docs for more details.
231
1988
        let tcp_notsent_lowat = u32::from(p.kist_tcp_notsent_lowat);
232
1988
        let kist = KistParams::new(kist_enabled, tcp_notsent_lowat);
233

            
234
1988
        NetParamsExtract {
235
1988
            nf_ito: [
236
1988
                [p.nf_ito_low, p.nf_ito_high],
237
1988
                [p.nf_ito_low_reduced, p.nf_ito_high_reduced],
238
1988
            ],
239
1988
            kist,
240
1988
        }
241
1988
    }
242
}
243

            
244
/// Build a `KistMode` from [`NetParameters`].
245
///
246
/// Used for converting [`kist_enabled`](NetParameters::kist_enabled)
247
/// to a corresponding `KistMode`.
248
1988
fn kist_mode_from_net_parameter(val: BoundedInt32<0, 1>) -> KistMode {
249
    caret::caret_int! {
250
        /// KIST flavor, defined by a numerical value read from the consensus.
251
        struct KistType(i32) {
252
            /// KIST disabled
253
            DISABLED = 0,
254
            /// KIST using TCP_NOTSENT_LOWAT.
255
            TCP_NOTSENT_LOWAT = 1,
256
        }
257
    }
258

            
259
1988
    match val.get().into() {
260
1988
        KistType::DISABLED => KistMode::Disabled,
261
        KistType::TCP_NOTSENT_LOWAT => KistMode::TcpNotSentLowat,
262
        _ => unreachable!("BoundedInt32 was not bounded?!"),
263
    }
264
1988
}
265

            
266
impl NetParamsExtract {
267
    /// Return the padding timer parameter low end, for reduced-ness `reduced`, as a `u32`
268
3970
    fn pad_low(&self, reduced: bool) -> IntegerMilliseconds<u32> {
269
3970
        self.pad_get(reduced, 0)
270
3970
    }
271
    /// Return the padding timer parameter high end, for reduced-ness `reduced`, as a `u32`
272
3970
    fn pad_high(&self, reduced: bool) -> IntegerMilliseconds<u32> {
273
3970
        self.pad_get(reduced, 1)
274
3970
    }
275

            
276
    /// Return and converts one padding parameter timer
277
    ///
278
    /// Internal function.
279
7940
    fn pad_get(&self, reduced: bool, low_or_high: usize) -> IntegerMilliseconds<u32> {
280
7940
        self.nf_ito[usize::from(reduced)][low_or_high]
281
8390
            .try_map(|v| Ok::<_, Void>(v.into()))
282
7940
            .void_unwrap()
283
7940
    }
284
}
285

            
286
impl<C: AbstractChannel> ChannelState<C> {
287
    /// Return true if a channel is ready to expire.
288
    /// Update `expire_after` if a smaller duration than
289
    /// the given value is required to expire this channel.
290
10
    fn ready_to_expire(&self, expire_after: &mut Duration) -> bool {
291
10
        let ChannelState::Open(ent) = self else {
292
            return false;
293
        };
294
10
        let Some(unused_duration) = ent.channel.duration_unused() else {
295
            // still in use
296
2
            return false;
297
        };
298
8
        let max_unused_duration = ent.max_unused_duration;
299
8
        let Some(remaining) = max_unused_duration.checked_sub(unused_duration) else {
300
            // no time remaining; drop now.
301
4
            return true;
302
        };
303
4
        if remaining.is_zero() {
304
            // Ignoring this edge case would result in a fairly benign race
305
            // condition outside of Shadow, but deadlock in Shadow.
306
            return true;
307
4
        }
308
4
        *expire_after = std::cmp::min(*expire_after, remaining);
309
4
        false
310
10
    }
311
}
312

            
313
impl<C: AbstractChannelFactory> MgrState<C> {
314
    /// Create a new empty `MgrState`.
315
124
    pub(crate) fn new(
316
124
        builder: C,
317
124
        config: ChannelConfig,
318
124
        dormancy: Dormancy,
319
124
        netparams: &NetParameters,
320
124
    ) -> Self {
321
124
        let mut padding_params = ChannelPaddingInstructions::default();
322
124
        let netparams = NetParamsExtract::from(netparams);
323
124
        let kist_params = netparams.kist;
324
124
        let update = parameterize(&mut padding_params, &config, dormancy, &netparams)
325
124
            .unwrap_or_else(|e: tor_error::Bug| panic!("bug detected on startup: {:?}", e));
326
124
        let _: Option<_> = update; // there are no channels yet, that would need to be told
327

            
328
124
        let channels_params = ChannelParams {
329
124
            padding: padding_params,
330
124
            kist: kist_params,
331
124
        };
332

            
333
124
        MgrState {
334
124
            inner: std::sync::Mutex::new(Inner {
335
124
                builder,
336
124
                channels: ListByRelayIds::new(),
337
124
                #[cfg(feature = "relay")]
338
124
                unauth_channels: Vec::new(),
339
124
                config,
340
124
                channels_params,
341
124
                dormancy,
342
124
            }),
343
124
        }
344
124
    }
345

            
346
    /// Run a function on the [`ListByRelayIds`] that implements the map in this `MgrState`.
    ///
    /// This function grabs a mutex: do not provide a slow function.
    ///
    /// We provide this function rather than exposing the channels set directly,
    /// to make sure that the calling code doesn't await while holding the lock.
    ///
    /// This is only `cfg(test)` since it can deadlock.
    ///
    /// # Errors
    ///
    /// Returns an error if the lock cannot be acquired.
    ///
    /// # Deadlock
    ///
    /// Calling a method on [`MgrState`] from within `func` may cause a deadlock.
    #[cfg(test)]
    pub(crate) fn with_channels<F, T>(&self, func: F) -> Result<T>
    where
        F: FnOnce(&mut ListByRelayIds<ChannelState<C::Channel>>) -> T,
    {
        let mut inner = self.inner.lock()?;
        Ok(func(&mut inner.channels))
    }
366

            
367
    /// Return a copy of the builder stored in this state.
368
74
    pub(crate) fn builder(&self) -> C
369
74
    where
370
74
        C: Clone,
371
    {
372
74
        let inner = self.inner.lock().expect("lock poisoned");
373
74
        inner.builder.clone()
374
74
    }
375

            
376
    /// Run a function to modify the builder stored in this state.
377
    ///
378
    /// # Deadlock
379
    ///
380
    /// Calling a method on [`MgrState`] from within `func` may cause a deadlock.
381
    #[allow(unused)]
382
22
    pub(crate) fn with_mut_builder<F>(&self, func: F)
383
22
    where
384
22
        F: FnOnce(&mut C),
385
    {
386
22
        let mut inner = self.inner.lock().expect("lock poisoned");
387
22
        func(&mut inner.builder);
388
22
    }
389

            
390
    /// Add an open channel into our list.
    ///
    /// A channel with at least one relay identity goes into the main channel map;
    /// any other channel goes into the unauthenticated channel list.
    ///
    /// # Errors
    ///
    /// Returns an error if the lock cannot be acquired.
    #[cfg(feature = "relay")]
    pub(crate) fn add_open(&self, channel: Arc<C::Channel>) -> Result<()> {
        let mut inner = self.inner.lock()?;
        // Make sure this channel has verified relay identities. Else, put it in the
        // unauthenticated channel list (ex: client channel as a relay responder).
        if channel.has_any_identity() {
            inner.channels.insert(ChannelState::Open(OpenEntry {
                channel,
                // TODO(relay): Relay need a different unused duration (if any). We can't use the
                // client timeout value. Need to be figured out before production.
                max_unused_duration: Self::random_max_unused_duration(),
            }));
        } else {
            inner.unauth_channels.push(channel);
        }
        Ok(())
    }
408

            
409
    /// Remove every unusable state from the map in this state.
    ///
    /// Open entries are kept only if their channel reports itself usable;
    /// building entries are always kept.
    ///
    /// # Errors
    ///
    /// Returns an error if the lock cannot be acquired.
    #[cfg(test)]
    pub(crate) fn remove_unusable(&self) -> Result<()> {
        let mut inner = self.inner.lock()?;
        inner.channels.retain(|state| match state {
            ChannelState::Open(ent) => ent.channel.is_usable(),
            ChannelState::Building(_) => true,
        });
        Ok(())
    }
419

            
420
    /// Request an open or pending channel to `target`. If `add_new_entry_if_not_found` is true and
421
    /// an open or pending channel isn't found, a new pending entry will be added and
422
    /// [`ChannelForTarget::NewEntry`] will be returned. This is all done as part of the same method
423
    /// so that all operations are performed under the same lock acquisition.
424
94
    pub(crate) fn request_channel(
425
94
        &self,
426
94
        target: &C::BuildSpec,
427
94
        add_new_entry_if_not_found: bool,
428
94
    ) -> Result<Option<ChannelForTarget<C>>> {
429
        use ChannelState::*;
430

            
431
94
        let mut inner = self.inner.lock()?;
432

            
433
        // The idea here is to choose the channel in two steps:
434
        //
435
        // - Eligibility: Get channels from the channel map and filter them down to only channels
436
        //   which are eligible to be returned.
437
        // - Ranking: From the eligible channels, choose the best channel.
438
        //
439
        // Another way to choose the channel could be something like: first try all canonical open
440
        // channels, then all non-canonical open channels, then all pending channels with all
441
        // matching relay ids, then remaining pending channels, etc. But this ends up being hard to
442
        // follow and inflexible (what if you want to prioritize pending channels over non-canonical
443
        // open channels?).
444

            
445
        // Open channels which are allowed for requests to `target`.
446
94
        let open_channels = inner
447
94
            .channels
448
            // channels with all target relay identifiers
449
94
            .by_all_ids(target)
450
94
            .filter(|entry| match entry {
451
12
                Open(x) => select::open_channel_is_allowed(x, target),
452
10
                Building(_) => false,
453
22
            });
454

            
455
        // Pending channels which will *probably* be allowed for requests to `target` once they
456
        // complete.
457
94
        let pending_channels = inner
458
94
            .channels
459
            // channels that have a subset of the relay ids of `target`
460
94
            .all_subset(target)
461
94
            .into_iter()
462
94
            .filter(|entry| match entry {
463
12
                Open(_) => false,
464
10
                Building(x) => select::pending_channel_maybe_allowed(x, target),
465
22
            });
466

            
467
94
        match select::choose_best_channel(open_channels.chain(pending_channels), target) {
468
10
            Some(Open(OpenEntry { channel, .. })) => {
469
                // This entry is a perfect match for the target keys: we'll return the open
470
                // entry.
471
10
                return Ok(Some(ChannelForTarget::Open(Arc::clone(channel))));
472
            }
473
10
            Some(Building(PendingEntry { pending, .. })) => {
474
                // This entry is potentially a match for the target identities: we'll return the
475
                // pending entry. (We don't know for sure if it will match once it completes,
476
                // since we might discover additional keys beyond those listed for this pending
477
                // entry.)
478
10
                return Ok(Some(ChannelForTarget::Pending(pending.clone())));
479
            }
480
74
            None => {}
481
        }
482

            
483
        // It's possible we know ahead of time that building a channel would be unsuccessful.
484
74
        if inner
485
74
            .channels
486
            // channels with at least one id in common with `target`
487
74
            .all_overlapping(target)
488
74
            .into_iter()
489
            // but not channels which completely satisfy the id requirements of `target`
490
74
            .filter(|entry| !entry.has_all_relay_ids_from(target))
491
74
            .any(|entry| matches!(entry, Open(OpenEntry{ channel, ..}) if channel.is_usable()))
492
        {
493
            // At least one *open, usable* channel has been negotiated that overlaps only
494
            // partially with our target: it has proven itself to have _one_ of our target
495
            // identities, but not all.
496
            //
497
            // Because this channel exists, we know that our target cannot succeed, since relays
498
            // are not allowed to share _any_ identities.
499
            //return Ok(Some(Action::Return(Err(Error::IdentityConflict))));
500
            return Err(Error::IdentityConflict);
501
74
        }
502

            
503
74
        if !add_new_entry_if_not_found {
504
            return Ok(None);
505
74
        }
506

            
507
        // Great, nothing interfered at all.
508
74
        let any_relay_id = target
509
74
            .identities()
510
74
            .next()
511
74
            .ok_or(internal!("relay target had no id"))?
512
74
            .to_owned();
513
74
        let (new_state, send, unique_id) = setup_launch(RelayIds::from_relay_ids(target));
514
74
        inner
515
74
            .channels
516
74
            .try_insert(ChannelState::Building(new_state))?;
517
74
        let handle = PendingChannelHandle::new(any_relay_id, unique_id);
518
74
        Ok(Some(ChannelForTarget::NewEntry((handle, send))))
519
94
    }
520

            
521
    /// Remove the pending channel identified by its `handle`.
522
8
    pub(crate) fn remove_pending_channel(&self, handle: PendingChannelHandle) -> Result<()> {
523
8
        let mut inner = self.inner.lock()?;
524
8
        remove_pending(&mut inner.channels, handle);
525
8
        Ok(())
526
8
    }
527

            
528
    /// Upgrade the pending channel identified by its `handle` by replacing it with a new open
529
    /// `channel`.
530
    #[instrument(skip_all, level = "trace")]
531
66
    pub(crate) fn upgrade_pending_channel_to_open(
532
66
        &self,
533
66
        handle: PendingChannelHandle,
534
66
        channel: Arc<C::Channel>,
535
66
    ) -> Result<()> {
536
        // Do all operations under the same lock acquisition.
537
66
        let mut inner = self.inner.lock()?;
538

            
539
66
        remove_pending(&mut inner.channels, handle);
540

            
541
        // This isn't great.  We context switch to the newly-created
542
        // channel just to tell it how and whether to do padding.  Ideally
543
        // we would pass the params at some suitable point during
544
        // building.  However, that would involve the channel taking a
545
        // copy of the params, and that must happen in the same channel
546
        // manager lock acquisition span as the one where we insert the
547
        // channel into the table so it will receive updates.  I.e.,
548
        // here.
549
66
        let update = inner.channels_params.padding.initial_update();
550
66
        if let Some(update) = update {
551
66
            channel
552
66
                .reparameterize(update.into())
553
66
                .map_err(|_| internal!("failure on new channel"))?;
554
        }
555
66
        let new_entry = ChannelState::Open(OpenEntry {
556
66
            channel,
557
66
            max_unused_duration: Self::random_max_unused_duration(),
558
66
        });
559
66
        inner.channels.insert(new_entry);
560

            
561
66
        Ok(())
562
66
    }
563

            
564
    /// Reconfigure all channels as necessary
565
    ///
566
    /// (By reparameterizing channels as needed)
567
    /// This function will handle
568
    ///   - netdir update
569
    ///   - a reconfiguration
570
    ///   - dormancy
571
    ///
572
    /// For `new_config` and `new_dormancy`, `None` means "no change to previous info".
573
98
    pub(super) fn reconfigure_general(
574
98
        &self,
575
98
        new_config: Option<&ChannelConfig>,
576
98
        new_dormancy: Option<Dormancy>,
577
98
        netparams: Arc<dyn AsRef<NetParameters>>,
578
98
    ) -> StdResult<(), tor_error::Bug> {
579
        use ChannelState as CS;
580

            
581
        // TODO when we support operation as a relay, inter-relay channels ought
582
        // not to get padding.
583
98
        let netdir = {
584
98
            let extract = NetParamsExtract::from((*netparams).as_ref());
585
98
            drop(netparams);
586
98
            extract
587
        };
588

            
589
98
        let mut inner = self
590
98
            .inner
591
98
            .lock()
592
98
            .map_err(|_| internal!("poisoned channel manager"))?;
593
98
        let inner = &mut *inner;
594

            
595
98
        if let Some(new_config) = new_config {
596
28
            inner.config = new_config.clone();
597
70
        }
598
98
        if let Some(new_dormancy) = new_dormancy {
599
42
            inner.dormancy = new_dormancy;
600
70
        }
601

            
602
98
        let update = parameterize(
603
98
            &mut inner.channels_params.padding,
604
98
            &inner.config,
605
98
            inner.dormancy,
606
98
            &netdir,
607
        )?;
608

            
609
98
        let update = update.map(Arc::new);
610

            
611
98
        let new_kist_params = netdir.kist;
612
98
        let kist_params = if new_kist_params != inner.channels_params.kist {
613
            // The KIST params have changed: remember their value,
614
            // and reparameterize_kist()
615
            inner.channels_params.kist = new_kist_params;
616
            Some(new_kist_params)
617
        } else {
618
            // If the new KIST params are identical to the previous ones,
619
            // we don't need to call reparameterize_kist()
620
98
            None
621
        };
622

            
623
98
        if update.is_none() && kist_params.is_none() {
624
            // Return early, nothing to reconfigure
625
36
            return Ok(());
626
62
        }
627

            
628
62
        let channels = inner.channels.values().filter_map(|chan| match chan {
629
62
            CS::Open(OpenEntry { channel, .. }) => Some(channel),
630
            CS::Building(_) => None,
631
62
        });
632
        #[cfg(feature = "relay")]
633
62
        let channels = channels.chain(inner.unauth_channels.iter());
634

            
635
124
        for channel in channels {
636
62
            if let Some(ref update) = update {
637
62
                // Ignore error (which simply means the channel is closed or gone)
638
62
                let _ = channel.reparameterize(Arc::clone(update));
639
62
            }
640

            
641
62
            if let Some(kist) = kist_params {
642
                // Ignore error (which simply means the channel is closed or gone)
643
                let _ = channel.reparameterize_kist(kist);
644
62
            }
645
        }
646

            
647
62
        Ok(())
648
98
    }
649

            
650
    /// Expire all channels that have been unused for too long.
651
    ///
652
    /// Return a Duration until the next time at which
653
    /// a channel _could_ expire.
654
40
    pub(crate) fn expire_channels(&self) -> Duration {
655
        // TODO(relay): First, I don't think dropping the ChannelState<> from the channel list
656
        // actually closes a channel reactor. Because it holds a Arc<Channel>, the ref counter is
657
        // simply decremented but the reactor still goes on. We can't have guarantees on an object
658
        // in an `Arc<>` to be cleaned up when we drop it as it defeats the purpose of using an Arc
659
        // as we don't know if other reference are held elsewhere.
660
        //
661
        // Second, Unauthenticated channels are relay only as in client/bridge connecting inbound.
662
        // We need to expire any unused as well.
663
        //
664
        // Taking both points above, maybe it could be better to move the expiry logic into the
665
        // channel reactor itself and make this function simply retain any channel that are
666
        // `is_usable()`. This would remove the convoluted needs for ChannelDetails shared between
667
        // a `Channel` (reactor handle) and the channel `Reactor`.
668
        //
669
        // We'll address all this in https://gitlab.torproject.org/tpo/core/arti/-/work_items/1600
670

            
671
40
        let mut ret = Duration::from_secs(180);
672
40
        self.inner
673
40
            .lock()
674
40
            .expect("Poisoned lock")
675
40
            .channels
676
46
            .retain(|chan| !chan.ready_to_expire(&mut ret));
677
40
        ret
678
40
    }
679

            
680
    /// Helper: Return the default max unused duration for a channel.
681
66
    fn random_max_unused_duration() -> Duration {
682
66
        Duration::from_secs(
683
66
            rand::rng()
684
66
                .gen_range_checked(180..270)
685
66
                .expect("not 180 < 270 !"),
686
        )
687
66
    }
688
}
689

            
690
/// A channel for a given target relay.
691
pub(crate) enum ChannelForTarget<CF: AbstractChannelFactory> {
692
    /// A channel that is open.
693
    Open(Arc<CF::Channel>),
694
    /// A channel that is building.
695
    Pending(Pending),
696
    /// Information about a new pending channel entry.
697
    NewEntry((PendingChannelHandle, Sending)),
698
}
699

            
700
/// A handle for a pending channel.
701
///
702
/// WARNING: This handle should never be dropped, and should always be passed back into
703
/// [`MgrState::remove_pending_channel`] or [`MgrState::upgrade_pending_channel_to_open`], otherwise
704
/// the pending channel may be left in the channel map forever.
705
///
/// Dropping the handle while `chan_has_been_removed` is still `false` makes its `Drop`
/// implementation report an internal error.
///
706
/// This handle must only be used with the `MgrState` from which it was given.
707
pub(crate) struct PendingChannelHandle {
708
    /// Any relay ID for this pending channel.
    ///
    /// A single ID is enough to locate the entry in the channel map,
    /// even if the channel has several relay IDs.
709
    relay_id: tor_linkspec::RelayId,
710
    /// The unique ID for this pending channel.
    ///
    /// Compared against the `unique_id` of pending entries to pick out exactly this one.
711
    unique_id: UniqPendingChanId,
712
    /// The pending channel has been removed from the channel map.
    ///
    /// Starts out `false`; checked when the handle is dropped.
713
    chan_has_been_removed: bool,
714
}
715

            
716
impl PendingChannelHandle {
717
    /// Create a new [`PendingChannelHandle`].
718
74
    fn new(relay_id: tor_linkspec::RelayId, unique_id: UniqPendingChanId) -> Self {
719
74
        Self {
720
74
            relay_id,
721
74
            unique_id,
722
74
            chan_has_been_removed: false,
723
74
        }
724
74
    }
725

            
726
    /// This should be called when the pending channel has been removed from the pending channel
727
    /// map. Not calling this will result in an error log message (and panic in debug builds) when
728
    /// this handle is dropped.
729
74
    fn chan_has_been_removed(mut self) {
730
74
        self.chan_has_been_removed = true;
731
74
    }
732
}
733

            
734
impl std::ops::Drop for PendingChannelHandle {
735
74
    fn drop(&mut self) {
736
74
        if !self.chan_has_been_removed {
737
            #[allow(clippy::missing_docs_in_private_items)]
738
            const MSG: &str = "Dropped the 'PendingChannelHandle' without removing the channel";
739
            error_report!(
740
                internal!("{MSG}"),
741
                "'PendingChannelHandle' dropped unexpectedly",
742
            );
743
74
        }
744
74
    }
745
}
746

            
747
/// Helper: return the objects used to inform pending tasks about a newly open or failed channel.
748
74
fn setup_launch(ids: RelayIds) -> (PendingEntry, Sending, UniqPendingChanId) {
749
74
    let (snd, rcv) = oneshot::channel();
750
74
    let pending = rcv.shared();
751
74
    let unique_id = UniqPendingChanId::new();
752
74
    let entry = PendingEntry {
753
74
        ids,
754
74
        pending,
755
74
        unique_id,
756
74
    };
757

            
758
74
    (entry, snd, unique_id)
759
74
}
760

            
761
/// Helper: remove the pending channel identified by `handle` from `channel_map`.
762
74
fn remove_pending<C: AbstractChannel>(
763
74
    channel_map: &mut tor_linkspec::ListByRelayIds<ChannelState<C>>,
764
74
    handle: PendingChannelHandle,
765
74
) {
766
    // we need only one relay id to locate it, even if it has multiple relay ids
767
76
    let removed = channel_map.remove_by_id(&handle.relay_id, |c| {
768
76
        let ChannelState::Building(c) = c else {
769
2
            return false;
770
        };
771
74
        c.unique_id == handle.unique_id
772
76
    });
773
74
    debug_assert_eq!(removed.len(), 1, "expected to remove exactly one channel");
774

            
775
74
    handle.chan_has_been_removed();
776
74
}
777

            
778
/// Converts config, dormancy, and netdir, into parameter updates
779
///
780
/// Calculates new parameters, updating `channels_params` as appropriate.
781
/// If anything changed, the corresponding update instruction is returned.
782
///
783
/// `channels_params` is updated with the new parameters,
784
/// and the update message, if one is needed, is returned.
785
///
786
/// This is called in two places:
787
///
788
///  1. During chanmgr creation, it is called once to analyze the initial state
789
///     and construct a corresponding ChannelPaddingInstructions.
790
///
791
///  2. During reconfiguration.
792
1982
fn parameterize(
793
1982
    channels_params: &mut ChannelPaddingInstructions,
794
1982
    config: &ChannelConfig,
795
1982
    dormancy: Dormancy,
796
1982
    netdir: &NetParamsExtract,
797
1982
) -> StdResult<Option<ChannelPaddingInstructionsUpdates>, tor_error::Bug> {
798
    // Everything in this calculation applies to *all* channels, disregarding
799
    // channel usage.  Usage is handled downstream, in the channel frontend.
800
    // See the module doc in `crates/tor-proto/src/channel/padding.rs`.
801

            
802
4075
    let padding_of_level = |level| padding_parameters(level, netdir);
803
1982
    let send_padding = padding_of_level(config.padding)?;
804
1982
    let padding_default = padding_of_level(PaddingLevel::default())?;
805

            
806
1982
    let send_padding = match dormancy {
807
1486
        Dormancy::Active => send_padding,
808
496
        Dormancy::Dormant => None,
809
    };
810

            
811
1982
    let recv_padding = match config.padding {
812
48
        PaddingLevel::Reduced => None,
813
1934
        PaddingLevel::Normal => send_padding,
814
        PaddingLevel::None => None,
815
    };
816

            
817
    // Whether the inbound padding approach we are to use, is the same as the default
818
    // derived from the netdir (disregarding our config and dormancy).
819
    //
820
    // Ie, whether the parameters we want are precisely those that a peer would
821
    // use by default (assuming they have the same view of the netdir as us).
822
1982
    let recv_equals_default = recv_padding == padding_default;
823

            
824
1982
    let padding_negotiate = if recv_equals_default {
825
        // Our padding approach is the same as peers' defaults.  So the PADDING_NEGOTIATE
826
        // message we need to send is the START(0,0).  (The channel frontend elides an
827
        // initial message of this form, - see crates/tor-proto/src/channel.rs::note_usage.)
828
        //
829
        // If the netdir default is no padding, and we previously negotiated
830
        // padding being enabled, and now want to disable it, we would send
831
        // START(0,0) rather than STOP.  That is OK (even, arguably, right).
832
1450
        PaddingNegotiate::start_default()
833
    } else {
834
532
        match recv_padding {
835
532
            None => PaddingNegotiate::stop(),
836
            Some(params) => params.padding_negotiate_cell()?,
837
        }
838
    };
839

            
840
1982
    let mut update = channels_params
841
1982
        .start_update()
842
1982
        .padding_enable(send_padding.is_some())
843
1982
        .padding_negotiate(padding_negotiate);
844
1982
    if let Some(params) = send_padding {
845
1474
        update = update.padding_parameters(params);
846
1474
    }
847
1982
    let update = update.finish();
848

            
849
1982
    Ok(update)
850
1982
}
851

            
852
/// Given a `NetDirExtract` and whether we're reducing padding, return a `PaddingParameters`
853
///
854
/// With `PaddingLevel::None`, or the consensus specifies no padding, will return `None`;
855
/// but does not account for other reasons why padding might be enabled/disabled.
856
3970
fn padding_parameters(
857
3970
    config: PaddingLevel,
858
3970
    netdir: &NetParamsExtract,
859
3970
) -> StdResult<Option<PaddingParameters>, tor_error::Bug> {
860
3970
    let reduced = match config {
861
50
        PaddingLevel::Reduced => true,
862
3920
        PaddingLevel::Normal => false,
863
        PaddingLevel::None => return Ok(None),
864
    };
865

            
866
3970
    padding_parameters_builder(reduced, netdir)
867
3971
        .unwrap_or_else(|e: &str| {
868
2
            info!(
869
                "consensus channel padding parameters wrong, using defaults: {}",
870
                &e,
871
            );
872
2
            Some(PaddingParametersBuilder::default())
873
2
        })
874
4183
        .map(|p| {
875
3946
            p.build()
876
3946
                .map_err(into_internal!("failed to build padding parameters"))
877
3946
        })
878
3970
        .transpose()
879
3970
}
880

            
881
/// Given a `NetDirExtract` and whether we're reducing padding,
882
/// return a `PaddingParametersBuilder`
883
///
884
/// If the consensus specifies no padding, will return `None`;
885
/// but does not account for other reasons why padding might be enabled/disabled.
886
///
887
/// If `Err`, the string is a description of what is wrong with the parameters;
888
/// the caller should use `PaddingParameters::Default`.
889
3970
fn padding_parameters_builder(
890
3970
    reduced: bool,
891
3970
    netdir: &NetParamsExtract,
892
3970
) -> StdResult<Option<PaddingParametersBuilder>, &'static str> {
893
3970
    let mut p = PaddingParametersBuilder::default();
894

            
895
3970
    let low = netdir.pad_low(reduced);
896
3970
    let high = netdir.pad_high(reduced);
897
3970
    if low > high {
898
2
        return Err("low > high");
899
3968
    }
900
3968
    if low.as_millis() == 0 && high.as_millis() == 0 {
901
        // Zeroes for both channel padding consensus parameters means "don't send padding".
902
        // padding-spec.txt s2.6, see description of `nf_ito_high`.
903
24
        return Ok(None);
904
3944
    }
905
3944
    p.low(low);
906
3944
    p.high(high);
907
3944
    Ok::<_, &'static str>(Some(p))
908
3970
}
909

            
910
#[cfg(test)]
911
mod test {
912
    // @@ begin test lint list maintained by maint/add_warning @@
913
    #![allow(clippy::bool_assert_comparison)]
914
    #![allow(clippy::clone_on_copy)]
915
    #![allow(clippy::dbg_macro)]
916
    #![allow(clippy::mixed_attributes_style)]
917
    #![allow(clippy::print_stderr)]
918
    #![allow(clippy::print_stdout)]
919
    #![allow(clippy::single_char_pattern)]
920
    #![allow(clippy::unwrap_used)]
921
    #![allow(clippy::unchecked_time_subtraction)]
922
    #![allow(clippy::useless_vec)]
923
    #![allow(clippy::needless_pass_by_value)]
924
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
925

            
926
    use super::*;
927
    use crate::factory::BootstrapReporter;
928
    use async_trait::async_trait;
929
    #[cfg(feature = "relay")]
930
    use safelog::Sensitive;
931
    use std::sync::{Arc, Mutex};
932
    use tor_llcrypto::pk::ed25519::Ed25519Identity;
933
    use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
934
    use tor_proto::memquota::ChannelAccount;
935

            
936
    fn new_test_state() -> MgrState<FakeChannelFactory> {
937
        MgrState::new(
938
            FakeChannelFactory::default(),
939
            ChannelConfig::default(),
940
            Default::default(),
941
            &Default::default(),
942
        )
943
    }
944

            
945
    /// Stand-in channel factory for these tests.
    ///
    /// It carries no state; its `AbstractChannelFactory` build methods are
    /// `unimplemented!()`, so tests put channels into the map directly.
    #[derive(Clone, Debug, Default)]
945
    struct FakeChannelFactory {}
947

            
948
    // Factory implementation for the tests: it only supplies the associated types.
    // Every build method is `unimplemented!()` and would panic if called.
    #[allow(clippy::diverging_sub_expression)] // for unimplemented!() + async_trait
949
    #[async_trait]
950
    impl AbstractChannelFactory for FakeChannelFactory {
951
        /// Channels handed out by this factory are `FakeChannel`s.
        type Channel = FakeChannel;
952

            
953
        /// Build targets are described by an owned channel target.
        type BuildSpec = tor_linkspec::OwnedChanTarget;
954

            
955
        /// Unit placeholder for the incoming stream type.
        type Stream = ();
956

            
957
        /// Not implemented: panics if called.
        async fn build_channel(
958
            &self,
959
            _target: &Self::BuildSpec,
960
            _reporter: BootstrapReporter,
961
            _memquota: ChannelAccount,
962
        ) -> Result<Arc<FakeChannel>> {
963
            unimplemented!()
964
        }
965

            
966
        /// Not implemented: panics if called.  Only compiled with the "relay" feature.
        #[cfg(feature = "relay")]
967
        async fn build_channel_using_incoming(
968
            &self,
969
            _peer: Sensitive<std::net::SocketAddr>,
970
            _stream: Self::Stream,
971
            _memquota: ChannelAccount,
972
        ) -> Result<Arc<Self::Channel>> {
973
            unimplemented!()
974
        }
975
    }
976

            
977
    /// Stand-in channel for these tests, with directly settable properties.
    #[derive(Clone, Debug)]
978
    struct FakeChannel {
979
        /// The Ed25519 identity reported through `HasRelayIds`.
        ed_ident: Ed25519Identity,
980
        /// The value returned by `is_usable()`.
        usable: bool,
981
        /// How long the channel claims to have been unused, in seconds
        /// (`None` is passed through by `duration_unused()` as `None`).
        unused_duration: Option<u64>,
982
        /// The update most recently passed to `reparameterize()`, if any.
        params_update: Arc<Mutex<Option<Arc<ChannelPaddingInstructionsUpdates>>>>,
983
    }
984
    impl AbstractChannel for FakeChannel {
985
        fn is_canonical(&self) -> bool {
986
            unimplemented!()
987
        }
988
        fn is_canonical_to_peer(&self) -> bool {
989
            unimplemented!()
990
        }
991
        fn is_usable(&self) -> bool {
992
            self.usable
993
        }
994
        fn duration_unused(&self) -> Option<Duration> {
995
            self.unused_duration.map(Duration::from_secs)
996
        }
997
        fn reparameterize(
998
            &self,
999
            update: Arc<ChannelPaddingInstructionsUpdates>,
        ) -> tor_proto::Result<()> {
            *self.params_update.lock().unwrap() = Some(update);
            Ok(())
        }
        fn reparameterize_kist(&self, _kist_params: KistParams) -> tor_proto::Result<()> {
            Ok(())
        }
        fn engage_padding_activities(&self) {}
    }
    impl tor_linkspec::HasRelayIds for FakeChannel {
        /// Return the fake's Ed25519 identity when that key type is requested,
        /// and `None` for every other key type.
        fn identity(
            &self,
            key_type: tor_linkspec::RelayIdType,
        ) -> Option<tor_linkspec::RelayIdRef<'_>> {
            if matches!(key_type, tor_linkspec::RelayIdType::Ed25519) {
                Some((&self.ed_ident).into())
            } else {
                None
            }
        }
    }
    /// Make a fake ed25519 identity whose 32 bytes all equal the first byte of `s`.
    ///
    /// Panics if `s` is empty.
    fn str_to_ed(s: &str) -> Ed25519Identity {
        let id_bytes = [s.as_bytes()[0]; 32];
        id_bytes.into()
    }
    fn ch(ident: &'static str) -> ChannelState<FakeChannel> {
        let channel = FakeChannel {
            ed_ident: str_to_ed(ident),
            usable: true,
            unused_duration: None,
            params_update: Arc::new(Mutex::new(None)),
        };
        ChannelState::Open(OpenEntry {
            channel: Arc::new(channel),
            max_unused_duration: Duration::from_secs(180),
        })
    }
    fn ch_with_details(
        ident: &'static str,
        max_unused_duration: Duration,
        unused_duration: Option<u64>,
    ) -> ChannelState<FakeChannel> {
        let channel = FakeChannel {
            ed_ident: str_to_ed(ident),
            usable: true,
            unused_duration,
            params_update: Arc::new(Mutex::new(None)),
        };
        ChannelState::Open(OpenEntry {
            channel: Arc::new(channel),
            max_unused_duration,
        })
    }
    fn closed(ident: &'static str) -> ChannelState<FakeChannel> {
        let channel = FakeChannel {
            ed_ident: str_to_ed(ident),
            usable: false,
            unused_duration: None,
            params_update: Arc::new(Mutex::new(None)),
        };
        ChannelState::Open(OpenEntry {
            channel: Arc::new(channel),
            max_unused_duration: Duration::from_secs(180),
        })
    }
    /// `remove_unusable` must drop exactly the entries whose channel is not usable.
    #[test]
    fn rmv_unusable() -> Result<()> {
        let state = new_test_state();

        state.with_channels(|channels| {
            let entries = [
                closed("machen"),
                closed("wir"),
                ch("wir"),
                ch("feinen"),
                ch("Fug"),
                ch("Fug"),
            ];
            for entry in entries {
                channels.insert(entry);
            }
        })?;

        state.remove_unusable().unwrap();

        state.with_channels(|channels| {
            // Identities come from the first byte only, so "machen" is "m", and so on.
            for (ident, expected) in [("m", 0), ("w", 1), ("f", 1), ("F", 2)] {
                assert_eq!(channels.by_id(&str_to_ed(ident)).len(), expected);
            }
        })?;

        Ok(())
    }
    /// Reconfiguring with a netdir sends a padding update to an open channel the first
    /// time, and sends nothing when the same netdir is processed again.
    #[test]
    fn reparameterize_via_netdir() -> Result<()> {
        let map = new_test_state();
        // Set some non-default parameters so that we can tell when an update happens
        // (the value returned by `finish()` is deliberately discarded).
        let _ = map
            .inner
            .lock()
            .unwrap()
            .channels_params
            .padding
            .start_update()
            .padding_parameters(
                PaddingParametersBuilder::default()
                    .low(1234.into())
                    .build()
                    .unwrap(),
            )
            .finish();
        // The single channel whose received updates we will inspect.
        map.with_channels(|map| {
            map.insert(ch("track"));
        })?;
        let netdir = tor_netdir::testnet::construct_netdir()
            .unwrap_if_sufficient()
            .unwrap();
        let netdir = Arc::new(netdir);
        // Helper: run `f` on the "track" channel (identity derived from "t"),
        // holding the state lock only for the duration of the call.
        let with_ch = |f: &dyn Fn(&FakeChannel)| {
            let inner = map.inner.lock().unwrap();
            let mut ch = inner.channels.by_ed25519(&str_to_ed("t"));
            let ch = ch.next().unwrap().unwrap_open();
            f(ch);
        };
        eprintln!("-- process a default netdir, which should send an update --");
        map.reconfigure_general(None, None, netdir.clone()).unwrap();
        with_ch(&|ch| {
            // `take()` also clears the stored update, so the second check below
            // only sees updates sent after this point.
            assert_eq!(
                format!("{:?}", ch.params_update.lock().unwrap().take().unwrap()),
                // evade field visibility by (ab)using Debug impl
                "ChannelPaddingInstructionsUpdates { padding_enable: None, \
                    padding_parameters: Some(Parameters { \
                        low: IntegerMilliseconds { value: 1500 }, \
                        high: IntegerMilliseconds { value: 9500 } }), \
                    padding_negotiate: None }"
            );
        });
        eprintln!();
        eprintln!("-- process a default netdir again, which should *not* send an update --");
        map.reconfigure_general(None, None, netdir).unwrap();
        with_ch(&|ch| assert!(ch.params_update.lock().unwrap().is_none()));
        Ok(())
    }
    /// `expire_channels` removes channels that have been unused for longer than their
    /// max unused duration, and returns the time until the next channel expires.
    #[test]
    fn expire_channels() -> Result<()> {
        const MAX_UNUSED: Duration = Duration::from_secs(180);

        // Scenario 1: a lone channel that has been unused beyond the allowed maximum.
        let state = new_test_state();
        state.with_channels(|channels| {
            channels.insert(ch_with_details("wello", MAX_UNUSED, Some(181)));
        })?;

        // Minimum value of max unused duration is 180 seconds
        assert_eq!(180, state.expire_channels().as_secs());
        state.with_channels(|channels| {
            assert_eq!(channels.by_ed25519(&str_to_ed("w")).len(), 0);
        })?;

        // Scenario 2: a mix of channels.
        let state = new_test_state();
        state.with_channels(|channels| {
            // "wello" and "yello" have been unused for shorter than the max unused
            // duration; "gello" has been unused beyond it and is expired.
            for (ident, unused_secs) in [("wello", 120), ("yello", 170), ("gello", 181)] {
                channels.insert(ch_with_details(ident, MAX_UNUSED, Some(unused_secs)));
            }
            // Closed channel should be retained
            channels.insert(closed("hello"));
        })?;

        // Return duration until next channel expires
        assert_eq!(10, state.expire_channels().as_secs());
        state.with_channels(|channels| {
            for (ident, expected) in [("w", 1), ("y", 1), ("h", 1), ("g", 0)] {
                assert_eq!(channels.by_ed25519(&str_to_ed(ident)).len(), expected);
            }
        })?;

        Ok(())
    }
}