Stanford CS144 Lab 3

Lab Checkpoint 3: the TCP sender

上一个 checkpoint 中我们实现了 TCPReceiver,现在我们实现(简化版的)TCPSender,组合成一套可以工作的 TCP 系统。

2 Checkpoint 3: The TCP Sender #

我们在实现 TCPReceiversend 方法的时候,回复的消息包括 ackno(期待的下一个 byte 的序列号),和 window size(能够处理,不被丢弃的序列号范围)。TCPSender 需要在内部维护这两个状态,以便:

  • 尽可能地发送多的数据包以填满 window size
  • 标记发出但没有被 ackno 确认的数据包
  • 在必要的时候(超过一段时间),重传尚未被确认的数据包

2.1 How does the TCPSender know if a segment was lost? #

这里原版实验手册给出了一个比较复杂的规则,基于抽象的 timer 叙述(或许期待我们实现一个 Timer helper class)。我认为这里有些复杂化了。对于我们这样的单线程 FIFO 实现,只需维护一个简单的状态量即可达到一样的效果。

我的 TCPSender 有以下私有成员:

private:
  Reader& reader() { return input_.reader(); }

  ByteStream input_;
  Wrap32 isn_;
  uint64_t initial_RTO_ms_;
  uint64_t cur_RTO_ms_ { initial_RTO_ms_ };
  uint64_t last_tick_ms_ { 0 };
  uint16_t window_size_ { 1 };
  uint64_t consecutive_retx_ { 0 };
  Wrap32 next_seqno_ { isn_ };
  Wrap32 last_ackno_ { isn_ };
  bool syn_sent_ { false };
  bool fin_sent_ { false };

  std::map<uint64_t, TCPSenderMessage> outstanding_segments_ {};

记录初始 RTO 和更新之后的 RTO。last_tick_ms_ 代表 outstanding segments 里时间最早的消息在上一次 tick 之后积累的时间。一旦 last_tick_ms_ 超过了 cur_RTO_ms_,立刻重发最早(序列号最小)的消息,然后清零 last_tick_ms_,这在逻辑上和 timer expire 是一样的。由于重发只会重发序列号最小的消息,因此连续重发次数的统计也变得容易了。

2.2 Implementing the TCP sender #

实现中有几个需要注意的细节:

window size 必须先更新,即使 ackno 未传入。由于 window size 初始化为 1,不即时更新会出现问题。

void TCPSender::receive( const TCPReceiverMessage& msg )
{
  if (msg.RST) {
    input_.set_error();
    syn_sent_ = false;
    fin_sent_ = false;
    next_seqno_ = isn_;
    last_ackno_ = isn_;
    outstanding_segments_.clear();
    consecutive_retx_ = 0;
    return;
  }

  // Must update window size, even before ackno is None
  window_size_ = msg.window_size; 

  if ( msg.ackno == std::nullopt ) {
    return; 
  }

  uint64_t abs_ackno = msg.ackno->unwrap( isn_, writer().bytes_pushed() );
  uint64_t abs_next_seqno = next_seqno_.unwrap( isn_, writer().bytes_pushed() );
  if (abs_ackno > abs_next_seqno) {
    return; // Required ackno is beyond the next sequence number
  }
  last_ackno_ = msg.ackno.value();

  // Remove all acknowledged segments
  for (auto it = outstanding_segments_.begin(); it != outstanding_segments_.end();) {
    uint64_t seg_end = it->first + it->second.sequence_length();
    if (seg_end <= abs_ackno) {
      cur_RTO_ms_ = initial_RTO_ms_;
      last_tick_ms_ = 0;
      consecutive_retx_ = 0;
      it = outstanding_segments_.erase(it);
    } else {
      break;
    }
  }
}

在检测有没有空间放下 FIN 标志位时,需要加上 SYN 标志位(如果有)占用的空间。提供的测试样例很弱,无法发现这个问题。

void TCPSender::push( const TransmitFunction& transmit )
{
  // ...
    if ( !fin_sent_ && reader().is_finished() && msg.payload.size() + msg.SYN + 1 <= available_capacity ) {
      msg.FIN = true;
      fin_sent_ = true;
    }
  // ...
}