CS144-总结

Check 0: 内存可靠字节流性能优化

吞吐量优化目标

为满足最终基准测试要求,系统需实现至少0.2Gbit/s的持续吞吐量。经性能分析发现,数据弹出操作(pop)的实现方式可能成为主要性能瓶颈。

关键技术优化

传统实现直接弹出指定长度的字节数据会导致以下问题:

  1. 字符串操作需频繁移动内存数据(时间复杂度O(n))
  2. 内存碎片化加剧
  3. 缓存局部性降低

采用延迟批量处理策略:

  • 累计多个删除操作请求
  • 单次批量完成内存数据迁移
  • 通过逻辑地址映射替代物理数据移动(参考CS61B课程中的虚拟化思想)

Check 1: 缓冲区管理系统演进

实现记录

我首次实现时尝试通过一个 Buffer 类来管理整个缓冲区

指针确实很难用:

1
2
3
4
5
6
7
std::string s("abcd");
std::string* pr=&s;
while(....){
    std::string new_s(".....");
    pr=&new_s;
}
const size_t size= pr->size();

由于 new_s 是局部变量,出了循环就会被销毁,导致 pr 的后续调用出错。

贪吃蛇合并

一开始试图使用快慢迭代器来处理buffer的合并问题,发现需要管理两个迭代器并且稍有不慎修改了容器迭代器就失效了。 后改为使用“贪吃蛇”方案:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void Reassembler::merge_buffer()
{
    if (buffer_.empty()) return;
    
    auto current = buffer_.begin();
    while (current != buffer_.end()) {
        auto next = std::next(current);
        if (next == buffer_.end()) break;

        if (current->second.can_merge(next->second)) {
            // 合并处理逻辑
            auto merged = Segment::merge(/* ... */);
            
            // 更新当前节点值
            current->second = std::move(merged);
            
            // 删除下一个节点
            buffer_.erase(next);
            
            // 不需要移动current,继续检查合并后的current和新的next
        } else {
            ++current; // 仅当无法合并时前进
        }
    }
}

设计变更记录

由于受“符合请求数据,立即写入 writer ”的影响,一开始试图先写入 writer,有剩余数据判断是否写入 buffer。但发现这样很难实现以下测试:

  • 先写writer后缓存剩余数据,导致:
    • 接收窗口外的高优先级数据无法覆盖旧缓存(如测试用例中第4次插入的数据被容量限制阻挡)
    • 需同时维护writer头指针和缓存头指针,出现"中间空洞"维护难题
1
2
// 原架构示意图
___writer__|____空闲空间_____|_____buffer_____
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
    {
      // Credit: Tanmay Garg
      ReassemblerTestHarness test { "overlapping multiple unassembled sections 3", 30 };

      test.execute( Insert { "hello", 15 } );
      test.execute( Insert { "world!", 21 } );
      test.execute( Insert { "I am sentient", 0 } );
      test.execute( Insert { "sentient, hello world", 5 } );

      test.execute( BytesPending( 0 ) );
      test.execute( BytesPushed( 27 ) );
      test.execute( ReadAll( "I am sentient, hello world!" ) );
    }

具体来说,最后一次数据比之前数据“更正确”,即补全了 helloworld 的空格。但如果按先写入 writer 后入缓存的逻辑,会发现 "sentient, hello world" 因为容量限制根本不会如缓存然后触发缓存的进行合并替换。

新架构优化

  • 统一写入buffer后再批量写入writer,实现:

    • 自动合并重叠数据段(如测试用例中"sentient, hello world"补全空缺)
    • 仅需维护接收窗口
    1
    2
    
    // 新架构示意图
    ____writer____|____buffer____|____空闲空间_______
    

设计启示

本来就有数据写入缓存区,合并后写入 writer 的步骤。干脆直接全部先写入缓冲区,而不是维护两步:先写入 writer,然后剩余数据写入 buffer,又从 buffer 写回 writer。


Check 2

Wrap_int

基础概念

  • 序列号回绕:32 位序列号达到最大值(0xFFFFFFFF)后归零的特性
  • 绝对序列号:理论上的无限增长计数器(实际用 uint64_t 表示)
  • checkpoint:最近已知的绝对序列号,用于解决回绕歧义

wrap( uint64_t n, Wrap32 zero_point )

将绝对序列号(uint64_t)转换为 32 位序列号时,通过 static_cast<uint32_t> 实现隐式模 2³² 运算,这与无符号整型的自然溢出特性完全等价。

unwrap( Wrap32 zero_point, uint64_t checkpoint )

目的是解封装,将原始seq转为绝对序列号。

由于 uint32_t 存在回绕的特性,即累计至最大值后继续增加将变为0。因此需要通过 checkpoint 辅助确定当前是第几次回绕。

首先直接计算 raw_value 和 zero.raw_value 的差值,如果大于 checkpoint 认为没有发生回绕。然而如果小于 checkpoint,根据 TCP seq 只增不减的特点认为是发生了回绕。

由于要求计算最接近 checkpoint 的可能值,采用四舍五入的方式估计当前是第 k 次回绕。

1
2
3
constexpr uint64_t HALF_UINT32 = 1UL << 31; // 中点阈值
constexpr uint64_t UINT32_RANGE = 1UL << 32;
const uint64_t wrap_count = ( checkpoint - offset + HALF_UINT32 ) / UINT32_RANGE;
为什么需要四舍五入

将数轴按 MOD 长度分段后,四舍五入操作相当于寻找距离 checkpoint - offset 最近的 MOD 分界点:

1
2
3
4
|-----|-----|-----|-----|-----|-----|
0     MOD/2  MOD  3MOD/2 2MOD ...
     中点:HALF_UINT32

在计算中,由于除法是整数除法。直接进行 (checkpoint-offset)/UINT32_RANGE 会丢弃小数部分(5.4->5; 5.6->6),进而出现一个周期的偏差。

以下代码可以模拟这种情况:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
MOD = 2**32
checkpoint = 5*MOD + 1500  # 第5周期中的序列
offset = 3000              # 相对偏移

# 正确四舍五入计算:
k_true = (checkpoint - offset + MOD//2) // MOD  # → 5
abs_seq = offset + k_true*MOD # 21474839480

# 错误截断计算:
k_wrong = (checkpoint - offset) // MOD  # → 4
abs_seq_wrong = offset + k_wrong*MOD # 17179872184

TCP Receiver

ackno

根据RFC 793,控制标志的序列号消耗与数据载荷无关,每个控制标志占用1个序列号位置。因此 ackno 中,需要留意判断是否徐娅在已经写入字节数中加上SYN占用的1序号。

同时,对于FIN的情况要复杂一些,因为FIN抵达后,如果实际数据没有全部抵达,FIN 的 1 序号不应加入 ACKNO,需要实际完成写入后加上。


Check 3

TCP Sender

设计任务

  1. 跟踪接收窗口的大小。
  2. 从 ByteStream 中读取数据,尽可能的填充接收窗口直到接收窗口为0或字节流结束。
  3. 必要时添加 SYN 和 FIN 标志。
  4. 跟踪发送了但未返回 ACK 的 segment。
  5. 实现超时重传(定时检查最旧的已发送未确认报文是否被接收)。

一句话:什么时候发送什么东西,要不要重发?

有用的函数

  1. TCPSender tick 将被定时调用,用来传递上次调用 tick 过去的时间。使用其来维护整个 TCPSender 的生存时间。
  2. TCPSender 构造函数将赋予 RTO,即超时重传时间,但 RTO 可能会变化。
  3. 实现一个 timer,用于标志数据报超时。
  4. 当接收到 ACK 时,停止对应的 timer。
  5. tick 的工作方式如下:
    • 重传最旧的未确认报文。
    • 需要跟踪连续重传次数,必要时进行增加以便 TCPConnection 能决定是否中断 TCP。
    • 超时发送时,将 RTO 翻倍,执行“指数避退”。
    • 发送超时报文时需要重置超时计时器,使其在 RTO 后过期。
  6. 当接收到一个新的 ACK 时,需要进行以下步骤:
    • 将 RTO 改为初始值。
    • 重设所有还在等待的超时定时器。
    • 清空连续重传次数。

注意事项:

push
  • 通过 transmit 发送数据,同时数据包尽可能接近 TCPConfig::MAX_PAYLOAD_SIZE 大小。
  • 当接收窗口为 0 时,继续发送大小为 1 的探测报文。同时,首次发送时假定窗口大小为 1,发送空报文。
  • 本 lab 不考虑数据报的部分被接收的情况。
  • 不考虑为了优化性能,重组数据报的情况。

区分“满窗口”与“零窗口”

  • 若飞行中的数据量等于接收窗口,则禁止发送新数据(包括探测报文);
  • 零窗口探测仅在ACK明确表示接收窗口为 0 时触发。

关于探测报文:

  • 探测报文不会导致指数避退。
  • 实现中,只有当上一个探测报文被接受才继续发送新的探测报文。
receive

返回ACK和窗口大小。注意,ACK 为累计确认。

tick

通过 ms_since_last_tick 返回距离上一次调用过去的时间,用于实现超时重传。


以下为框架代码的解读。

事件循环

事件循环有两个关键功能:

  1. 事件注册与分发。
  2. 事件对象的管理。

在本项目的实现中,BasicRule 与 FDRule 负责实现“待处理事件”。

1
2
3
4
5
6
7
  struct BasicRule
  {
    size_t category_id; // 事件ID,用以分类事件
    InterestT interest; // 函数对象,当对应事件发生时判断是否指向 callback
    CallbackT callback;
    bool cancel_requested {}; // 原子位,用来取消事件
  };

维护选择使用 std::shared_ptr 管理规则,而不是直接使用 std::list 或 std::reference_wrapper?

1
2
  std::list<std::shared_ptr<FDRule>> _fd_rules {};
  std::list<std::shared_ptr<BasicRule>> _non_fd_rules {};

对象切片角度——排除直接使用 std::list

在 C++ 中,当一个派生对象被复制到基类时,由于基类的空间布局不包含派生对象特有的功能,导致派生对象的一部分丢失,如同“切去”一般。为此,常常使用基类指针传递对象,由于指针不会导致对象复制,所以可以避免对象切片。

因此,直接使用 std::list 不合理。

使用 shared_ptr 的必要性

  1. 事件循环需要频繁的添加或删除事件,智能指针能够保证对象持续存在。
  2. reference_wrapper 多态支持不足:无法像 weak_ptr 那样自动处理继承关系(BasicRule ← FDRule)
  3. 所有权观点:
    • 事件循环应该持有事件的所有权,同时RuleHandle需要非拥有式的弱引用(weak_ptr)。
    • std::reference_wrapper无法实现这种共享所有权模型,它只是简单的引用包装。

wait_next_event

poll

对于涉及文件句柄的事件,wait_next_event 采用了系统调用 poll 实现IO多路复用机制。

具体来说,首先清洗 _fd_rules 并生成准备监控的 poll 文件数组。

清洗是去除以下事件:

  • 已经取消的文件。
  • 已经读取到 EOF的文件。
  • 已经关闭的文件。

接着,调用 poll 监控文件事件。而 poll 是 unix 提供的系统调用,将需要监控的文件传入后操作系统内核会监控是否有文件可用。避免我们直接使用 read、write 时阻塞线程导致一个线程只能监控一个文件。

通过 poll,我们可以使用一个线程监控大量文件,并在有任一文件可用时结束阻塞继续执行。

关键参数说明

  • timeout:-1表示阻塞等待,0立即返回,>0表示超时时间(ms)
  • events字段可组合以下标志:
    • POLLIN:数据可读
    • POLLOUT:写缓冲区可用
    • POLLERR:错误条件发生

接着是两个术语 边缘触发水平触发

  • 边缘触发:当有可用文件时用户进程将苏醒一次,无论是否清空数据。就像快递放到快递柜后发送一条短信,无论是否取件也不会再次通知你。
  • 水平触发:当有文件可用时,用户进程会一直苏醒,直到read清空数据。就像快递柜不停的提醒你取件。
水平触发(LT) 边缘触发(ET)
事件通知频率 条件持续满足即通知 状态变化时仅通知一次
数据读取要求 可部分读取 必须完全读取
典型系统调用 poll, select epoll(ET模式)
本实现处理策略 单次处理立即返回 不适用(因poll特性)

从而,使用边缘触发时需要在一次苏醒时尽可能的处理任务。而水平触发时,可以快速处理然后推出。二者有各自的优点,比如边缘触发可以减少系统调用从而应对大量任务,而水平触发可以提高程序的敏捷性,快速反应变化。

当然,poll 只支持水平触发,因此 wait_next_event 被设计为处理一次任务后直接返回的模式。

用户态 TCP 栈实现(tcp_minnow_socket)

该部分使用双线程实现,即主线程监听事件循环拉起另一个线程执行实际操作。

事件循环的注册

有三类事件需要处理:

  1. 接收到下层的数据报。
  2. 本地应用调用 write 写入数据。
  3. Reassembler 发来接收的数据。

并发

主线程

通过 connect 函数中的事件循环机制完成三次握手。

1
2
3
4
5
6
7
 _tcp->push( [&]( auto x ) { _datagram_adapter.write( x ); } );

  if ( _tcp->sender().sequence_numbers_in_flight() != 1 ) {
    throw std::runtime_error( "After TCPConnection::connect(), expected sequence_numbers_in_flight() == 1" );
  }

  _tcp_loop( [&] { return _tcp->sender().sequence_numbers_in_flight() == 1; } );  // 判断三次握手是否完成

然后调用 _tcp_thread = std::thread( &TCPMinnowSocket::_tcp_main, this ); 实际执行后台的处理线程,用于完成传输数据工作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
template<TCPDatagramAdapter AdaptT>
void TCPMinnowSocket<AdaptT>::_tcp_main()
{
  try {
    if ( not _tcp.has_value() ) {
      throw std::runtime_error( "no TCP" );
    }
    _tcp_loop( [] { return true; } ); //持续事件循环,直到TCP关闭
    shutdown( SHUT_RDWR );
    if ( not _tcp.value().active() ) {
      std::cerr << "DEBUG: minnow TCP connection finished "
                << ( _tcp->inbound_reader().has_error() ? "uncleanly.\n" : "cleanly.\n" );
    }
    _tcp.reset();
  } catch ( const std::exception& e ) {
    std::cerr << "Exception in TCPConnection runner thread: " << e.what() << "\n";
    throw e;
  }
}
updatedupdated2025-03-242025-03-24