// Coverage report residue (not part of the module source):
// Lines 100 %, Functions 80 %, Branches (no figure present).
//! Type and code for handling a "half-closed" stream.
//!
//! A half-closed stream is one that we've sent an END on, but where
//! we might still receive some cells.
use crate::Result;
use crate::stream::cmdcheck::{AnyCmdChecker, StreamStatus};
use crate::stream::flow_ctrl::state::{HalfStreamFlowCtrl, HalfStreamFlowCtrlHooks};
use tor_cell::relaycell::UnparsedRelayMsg;
/// Type to track state of half-closed streams.
///
/// A half-closed stream is one where we've sent an END cell, but where
/// the other side might still send us data.
///
/// We need to track these streams instead of forgetting about them entirely,
/// since otherwise we'd be vulnerable to a class of "DropMark" attacks;
/// see <https://gitlab.torproject.org/tpo/core/tor/-/issues/25573>.
///
/// Each incoming message is offered to `flow_control` first; only messages
/// that flow control does not consume are passed on to `cmd_checker`.
#[derive(Debug)]
pub(crate) struct HalfStream {
/// Flow control for this stream.
///
/// Used to process incoming flow control messages (SENDME, XON, etc).
flow_control: HalfStreamFlowCtrl,
/// Object to tell us which cells to accept on this stream.
///
/// Consulted only for messages that `flow_control` did not consume.
cmd_checker: AnyCmdChecker,
}
impl HalfStream {
/// Create a new half-closed stream.
pub(crate) fn new(flow_control: HalfStreamFlowCtrl, cmd_checker: AnyCmdChecker) -> Self {
HalfStream {
flow_control,
cmd_checker,
/// Process an incoming message and adjust this HalfStream accordingly.
/// Give an error if the protocol has been violated.
/// The caller must handle END cells; it is an internal error to pass
/// END cells to this method.
/// no ends here.
pub(crate) fn handle_msg(&mut self, msg: UnparsedRelayMsg) -> Result<StreamStatus> {
use StreamStatus::*;
let Some(msg) = self.flow_control.handle_incoming_msg(msg)? else {
// The flow control code consumed the message,
// which means that it was a flow control message.
// We don't give flow control messages to the checker below.
return Ok(Open);
};
let status = self.cmd_checker.check_msg(&msg)?;
self.cmd_checker.consume_checked_msg(msg)?;
Ok(status)
#[cfg(test)]
mod test {
// @@ begin test lint list maintained by maint/add_warning @@
#![allow(clippy::bool_assert_comparison)]
#![allow(clippy::clone_on_copy)]
#![allow(clippy::dbg_macro)]
#![allow(clippy::mixed_attributes_style)]
#![allow(clippy::print_stderr)]
#![allow(clippy::print_stdout)]
#![allow(clippy::single_char_pattern)]
#![allow(clippy::unwrap_used)]
#![allow(clippy::unchecked_time_subtraction)]
#![allow(clippy::useless_vec)]
#![allow(clippy::needless_pass_by_value)]
//! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
use super::*;
use crate::stream::RECV_WINDOW_INIT;
use crate::stream::flow_ctrl::state::StreamFlowCtrl;
use crate::{client::stream::OutboundDataCmdChecker, congestion::sendme::StreamSendWindow};
use rand::{CryptoRng, Rng};
use tor_basic_utils::test_rng::testing_rng;
use tor_cell::relaycell::{
AnyRelayMsgOuter, RelayCellFormat, StreamId,
msg::{self, AnyRelayMsg},
fn to_unparsed<R: Rng + CryptoRng>(rng: &mut R, val: AnyRelayMsg) -> UnparsedRelayMsg {
UnparsedRelayMsg::from_singleton_body(
RelayCellFormat::V0,
AnyRelayMsgOuter::new(StreamId::new(77), val)
.encode(RelayCellFormat::V0, rng)
.expect("encoding failed"),
)
.unwrap()
/// A half-closed stream accepts a stream-level SENDME only while the send
/// window has room for it; a SENDME that would push the window past its
/// maximum is reported as a protocol violation.
#[test]
fn halfstream_sendme() {
    let mut rng = testing_rng();

    // Stream level SENDMEs are not authenticated and so the only way to make sure we were not
    // expecting one is if the window busts its maximum.
    //
    // Starting the window at 450, the first SENDME will increment it to 500 (the maximum)
    // meaning that the second SENDME will bust that and we'll notice that it was unexpected.
    let sendw = StreamSendWindow::new(450);

    let mut hs = HalfStream::new(
        StreamFlowCtrl::new_window(sendw).half_stream(),
        OutboundDataCmdChecker::new_any(),
    );

    // one sendme is fine
    let m = msg::Sendme::new_empty();
    assert!(
        hs.handle_msg(to_unparsed(&mut rng, m.clone().into()))
            .is_ok()
    );

    // but no more were expected!
    let e = hs
        .handle_msg(to_unparsed(&mut rng, m.into()))
        .err()
        .unwrap();
    assert_eq!(
        format!("{}", e),
        "Circuit protocol violation: Unexpected stream SENDME"
    );
}
// NOTE(review): this helper is incomplete as shown — the `HalfStream::new(`
// call stops after its first argument, so the `cmd_checker` argument and the
// closing delimiters are missing. Confirm against the original file.
/// Test helper: build a `HalfStream` whose flow control is derived from a
/// `StreamSendWindow` of 20.
fn hs_new() -> HalfStream {
HalfStream::new(
StreamFlowCtrl::new_window(StreamSendWindow::new(20)).half_stream(),
// NOTE(review): this test is incomplete as shown — `rng` is used but never
// bound here, the first `handle_msg` call has no terminator or result check,
// the `for` loop has no body, and the expected error string is not attached
// to any assertion. Confirm against the original file.
/// Per the inline comments: after a CONNECTED message, `RECV_WINDOW_INIT`
/// data cells are accepted on a half-closed stream, and one more data cell is
/// a protocol violation.
fn halfstream_data() {
let mut hs = hs_new();
// we didn't give a connected cell during setup, so do it now.
hs.handle_msg(to_unparsed(&mut rng, msg::Connected::new_empty().into()))
// `RECV_WINDOW_INIT` (500) data cells are okay.
let m = msg::Data::new(&b"this offer is unrepeatable"[..]).unwrap();
// Send exactly `RECV_WINDOW_INIT` cells (loop body not shown in this span).
for _ in 0_u16..RECV_WINDOW_INIT {
// But one more is a protocol violation.
"Circuit protocol violation: Received a data cell in violation of a window"
// NOTE(review): this test is incomplete as shown — `rng` and the stream under
// test are never bound here, `.is_err()` has no receiver, the inner block is
// never closed, and the expected error string is not attached to any
// assertion. Confirm against the original file.
/// Per the inline comments and the expected error text: a half-closed stream
/// accepts one CONNECTED message and rejects a second one.
fn halfstream_connected() {
// We were told to accept a connected, so we'll accept one
// and no more.
let m = msg::Connected::new_empty();
.is_err()
// If we try that again _after getting a connected_,
// accept any.
let mut cmd_checker = OutboundDataCmdChecker::new_any();
{
// Feed a CONNECTED message to the checker directly, before it is handed to
// a stream.
cmd_checker
.check_msg(&to_unparsed(&mut rng, msg::Connected::new_empty().into()))
"Stream protocol violation: Received CONNECTED twice on a stream."
// NOTE(review): this test is incomplete as shown — the stream under test and
// `rng` are never bound, `m` is never sent, and the expected error string is
// not attached to any assertion. Confirm against the original file.
/// Per the expected error text: an EXTENDED2 message is not acceptable on a
/// data stream.
fn halfstream_other() {
let m = msg::Extended2::new(Vec::new());
"Stream protocol violation: Unexpected EXTENDED2 on a data stream!"