Stanford CS144 Lab 2

Lab Checkpoint 2: the TCP receiver

上一个 Checkpoint 的 Reassembler 维护 ByteStream,实现了现实世界中 TCP 接受组件和应用程序之间“接受缓冲区”的功能。这一个 Checkpoint 中,我们实现 TCPReceiver,向接受缓冲区中写入收到的数据。

2 Checkpoint 2: The TCP Receiver #

2.1 Translating between 64-bit indexes and 32-bit seqnos #

这不是我们 Rust 里的 std::num::Wrapping 吗(

简单来说就是 TCP 包头的 sequence number 是 32 位的,每个字节的序列号在模 $2^{32}$ 意义下 $+1$。每个流的初始 sequence number 是随机的,称为 ISN。需要我们写一个新的类型,能够在【模意义下 32 位从 ISN 开始的序列号】和【64 位从 0 开始的绝对序列号】之间互相转换。

总体代码不难。但需要注意,unwrap 方法需要返回离 checkpoint 最近的序列号,如果直接 $\pm 2^{32}$ 生成三个值比较和 checkpoint 的大小关系,可能会由于减到负数 u64 下溢得到很大的数,改变相对大小关系。需要留意特判,最好直接计算相对距离来实现。

Wrap32 Wrap32::wrap( uint64_t n, Wrap32 zero_point )
{
  return Wrap32 { static_cast<uint32_t>( n + zero_point.raw_value_ ) };
}

uint64_t Wrap32::unwrap( Wrap32 zero_point, uint64_t checkpoint ) const
{
  uint64_t offset = raw_value_ - zero_point.raw_value_;
  uint64_t base = checkpoint & 0xFFFFFFFF00000000;
  uint64_t absolute = base + offset;
  uint64_t pre = absolute - numeric_limits<uint32_t>::max() - 1;
  uint64_t post = absolute + numeric_limits<uint32_t>::max() + 1;
  uint64_t dis_1 = ( checkpoint > pre ) ? checkpoint - pre : pre - checkpoint;
  uint64_t dis_2 = ( checkpoint > post ) ? checkpoint - post : post - checkpoint;
  uint64_t dis_3 = ( checkpoint > absolute ) ? checkpoint - absolute : absolute - checkpoint;
  if ( dis_1 <= dis_2 && dis_1 <= dis_3 ) {
    return pre;
  } else if ( dis_2 <= dis_1 && dis_2 <= dis_3 ) {
    return post;
  } else {
    return absolute;
  }
}

2.2 Implementing the TCP receiver #

要求我们实现 TCPReceiver 类,但其实也就是两个接口,一个 receive 负责接受信息,设置 seqno,最终插入 Reassembler,另一个接口 send 负责发出 ACK 信息,包含 window size 和希望收到的下一个 seqno。

void TCPReceiver::receive( TCPSenderMessage message )
{
  if ( message.RST ) {
    reassembler_.reader().set_error();
    isn_set_ = false;
    return;
  }
  if ( message.SYN && !isn_set_ ) {
    isn_ = message.seqno;
    isn_set_ = true;
  }
  if ( !isn_set_ ) {
    return;
  }
  uint64_t checkpoint = writer().bytes_pushed();
  uint64_t abs_stream_index = message.seqno.unwrap( isn_, checkpoint );
  if ( message.SYN ) {
    abs_stream_index = 0;
  } else {
    abs_stream_index -= 1;
  }
  reassembler_.insert( abs_stream_index, std::move( message.payload ), message.FIN );
}

TCPReceiverMessage TCPReceiver::send() const
{
  TCPReceiverMessage msg;
  if ( reassembler_.reader().has_error() ) {
    msg.RST = true;
    return msg;
  }
  uint16_t window_size = static_cast<uint64_t>(
    std::min( static_cast<uint64_t>( UINT16_MAX ), reassembler_.writer().available_capacity() ) );
  msg.window_size = window_size;
  if ( isn_set_ ) {
    uint64_t ackno = writer().bytes_pushed() + 1 + static_cast<uint64_t>( writer().is_closed() );
    msg.ackno = Wrap32::wrap( ackno, isn_ );
  }
  return msg;
}

总体实现不难,需要特别注意 SYN 和 FIN 也占据 seqno,需要加减 1。但是框架对于 ByteStream 的读写接口引起了我的一些疑问。在 TCPReceiver 类的 public 接口中,有如下定义:

// Access the output
const Reassembler& reassembler() const { return reassembler_; }
Reader& reader() { return reassembler_.reader(); }
const Reader& reader() const { return reassembler_.reader(); }
const Writer& writer() const { return reassembler_.writer(); }

可以看到,只有 reader() 是 non-const 的。这样设置的意图也很好理解,希望我们通过调用 reader().set_error() 以在收到 RST 消息时,将整个流置于错误状态。然而,在流的另一头,应用程序也会调用 reader,这会引起竞争。

我认为这是本实验中简化的一点,在单线程环境中使用单线程模拟测试,代码是交替执行的,不存在竞争问题。而在真实的操作系统中,TCP 协议栈和用户程序通过 syscall 进行沟通,TCP 协议栈需要用某种方法锁定 buffer,用户在进行 syscall 时,操作系统会保证竞争问题不出现。