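Computer Networking (5)

Congestion control

What is congestion control?

Network traffic can spike sharply over a short period. When that happens, the buffers in switches and routers cannot absorb the load and overflow, so packets are dropped. The drops trigger retransmissions, which add even more traffic and make the congestion worse.

A set of strategies, collectively called congestion control, is needed to relieve this. Congestion control resembles the flow control discussed earlier: flow control keeps traffic from exceeding what the end point can handle, while congestion control keeps traffic from exceeding what the intermediate nodes of the network can handle.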
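The relationship between the two limits can be sketched in a few lines. This is only an illustration, not code from the labs: the names `rwnd` (the window advertised by the receiver), `cwnd` (a congestion window kept by the sender) and `sendable_bytes` are assumptions introduced here.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical helper: how many more bytes the sender may put on the wire.
// rwnd:      window advertised by the receiver (flow control, protects the end point)
// cwnd:      congestion window kept by the sender (congestion control, protects the network)
// in_flight: bytes already sent but not yet acknowledged
uint64_t sendable_bytes( uint64_t rwnd, uint64_t cwnd, uint64_t in_flight )
{
  // The tighter of the two limits wins.
  const uint64_t window = std::min( rwnd, cwnd );
  const uint64_t result = window > in_flight ? window - in_flight : 0;
  assert( result <= window ); // sanity check: never exceed the effective window
  return result;
}
```

For example, with a 1000-byte receiver window, a 400-byte congestion window and 100 bytes in flight, `sendable_bytes( 1000, 400, 100 )` allows 300 more bytes: the network, not the receiver, is the bottleneck.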

Network Address Translation

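NAT (Network Address Translation) was proposed in 1994. It is used when hosts inside a private network already have local IP addresses (private addresses that are valid only inside that network) but also need to communicate with hosts on the Internet, without requiring encryption.

The method requires NAT software on the router that connects the private network to the Internet. A router running NAT software is called a NAT router, and it has at least one valid external global IP address. Every host that uses a local address must have that address translated into a global IP address at the NAT router before it can reach the Internet.

What NAT does:

NAT eases the shortage of IP addresses, and because it hides the machines on the internal network it also gives them some protection against attacks from outside. It translates private internal addresses into public external addresses, so that hosts on the internal network (configured with private IP addresses) can access the Internet.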
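A minimal sketch of the translation step, assuming a port-based NAT that shares one public address. The class `NatTable`, the struct `Endpoint` and the starting port 40000 are hypothetical names and values chosen for illustration; a real NAT also rewrites checksums, distinguishes protocols and expires idle entries.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <optional>
#include <utility>

// An (IPv4 address, port) pair. Hypothetical type for this sketch.
struct Endpoint
{
  uint32_t ip;
  uint16_t port;
  bool operator<( const Endpoint& o ) const { return std::pair( ip, port ) < std::pair( o.ip, o.port ); }
};

class NatTable
{
  uint32_t public_ip_;
  uint16_t next_port_ = 40000;       // arbitrary first public port (assumption)
  std::map<Endpoint, uint16_t> out_; // private endpoint -> public port
  std::map<uint16_t, Endpoint> in_;  // public port -> private endpoint

public:
  explicit NatTable( uint32_t public_ip ) : public_ip_( public_ip ) {}

  // Outbound packet: replace the private source endpoint with (public IP, mapped port).
  Endpoint translate_outbound( Endpoint src )
  {
    auto it = out_.find( src );
    if ( it == out_.end() ) {
      assert( next_port_ != 0 ); // the toy port pool must not wrap around
      it = out_.emplace( src, next_port_ ).first;
      in_.emplace( next_port_, src );
      ++next_port_;
    }
    return { public_ip_, it->second };
  }

  // Inbound packet: find the private endpoint behind a public port, if any.
  std::optional<Endpoint> translate_inbound( uint16_t public_port ) const
  {
    const auto it = in_.find( public_port );
    if ( it == in_.end() ) {
      return std::nullopt; // unsolicited traffic has no mapping and is dropped
    }
    return it->second;
  }
};
```

The empty result for an unknown public port is what hides internal hosts: a packet from outside only gets through if an internal host opened the mapping first.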

Reference: 网络地址转换协议NAT功能详解及NAT基础知识介绍 (Zhihu, zhihu.com)

Lab 0

Lab 0: An in-memory reliable byte stream

By now, you’ve seen how the abstraction of a reliable byte stream can be useful in communicating across the Internet, even though the Internet itself only provides the service of “best-effort” (unreliable) datagrams.

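The task is to implement an object that offers a write interface and a read interface. Bytes are stored in order and the storage is bounded: once it is full, nothing more can be written. On the read side, the stream reaches "EOF" once the writer has closed it and the buffer has been drained.

The object is constructed with one parameter, the maximum size, which is its capacity. This lab is single-threaded, so concurrency does not need to be considered.

The storage is bounded, but the bound only limits how many bytes are held in memory at any one moment. The stream itself can carry input of any length, until the input reaches EOF or reading stops.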
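The last point, that a small buffer can still carry an arbitrarily long input, can be illustrated with a toy stream. This is a sketch under assumptions: `MiniStream` and `transfer` are hypothetical names and are not the lab's `ByteStream` interface.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>

// Toy stand-in for a bounded byte stream (hypothetical, not the lab's class).
class MiniStream
{
  std::string buf_;
  uint64_t capacity_;
  bool closed_ = false;

public:
  explicit MiniStream( uint64_t capacity ) : capacity_( capacity ) {}

  // Accept as many bytes as currently fit and report how many were accepted.
  uint64_t push( const std::string& data )
  {
    const uint64_t n = std::min<uint64_t>( data.size(), capacity_ - buf_.size() );
    buf_.append( data, 0, n );
    return n;
  }

  void close() { closed_ = true; }

  // Read everything that is buffered, freeing the whole capacity again.
  std::string pop_all()
  {
    std::string out;
    out.swap( buf_ );
    return out;
  }

  // EOF: the writer has closed the stream and the buffer is drained.
  bool finished() const { return closed_ && buf_.empty(); }
};

// Move `input` through a stream of the given capacity by alternating writes and reads.
std::string transfer( const std::string& input, uint64_t capacity )
{
  assert( capacity > 0 ); // with zero capacity no byte could ever get through
  MiniStream stream( capacity );
  std::string output;
  uint64_t sent = 0;
  while ( !stream.finished() ) {
    sent += stream.push( input.substr( sent ) );
    if ( sent == input.size() ) {
      stream.close();
    }
    output += stream.pop_all();
  }
  return output;
}
```

Calling `transfer( "hello, reliable byte stream", 4 )` returns the full input even though at most 4 bytes are buffered at any moment.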

(Figure: the interface to implement in this lab.)

ByteStream::ByteStream( uint64_t capacity ) : capacity_( capacity ) {}

void Writer::push( const std::string& data )
{
  /* Throw away the incoming data */
  if ( error_ || is_closed() || data.empty() ) {
    return;
  }

  const size_t len = min( data.length(), available_capacity() );
  queue_.append( data.substr( 0, len ) );
  pushed_len_ += len;
}

void Writer::close()
{
  closed_ = true;
}

void Writer::set_error()
{
  error_ = true;
}

bool Writer::is_closed() const
{
  return closed_;
}

uint64_t Writer::available_capacity() const
{
  return capacity_ - pushed_len_ + popped_len_;
}

uint64_t Writer::bytes_pushed() const
{
  return pushed_len_;
}

string_view Reader::peek() const
{
  return { queue_ };
}

bool Reader::is_finished() const
{
  return closed_ && queue_.empty();
}

bool Reader::has_error() const
{
  return error_;
}

void Reader::pop( uint64_t len )
{
  if ( queue_.empty() ) {
    return;
  }

  len = std::min( len, queue_.size() );
  queue_.erase( queue_.begin(), queue_.begin() + static_cast<int64_t>( len ) );
  popped_len_ += len;
}

uint64_t Reader::bytes_buffered() const
{
  return pushed_len_ - popped_len_;
}

uint64_t Reader::bytes_popped() const
{
  return popped_len_;
}

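Lab 1: stitching substrings into a byte stream

The previous lab implemented a storage structure. On the receiving side it holds the data that the application layer will read. The reassembler (the object to implement in this lab) assembles incoming substrings into ordered data and writes them into it; in other words, the ByteStream exposes a write interface for the reassembler to use.

The input of the ByteStream comes from the green region of the lab's diagram: bytes that are buffered but not yet reassembled. At any moment the ByteStream has a maximum capacity of capacity and holds bytes_pushed - bytes_popped bytes in memory. The window size (ws) is the difference between the two. Anything beyond the window is dropped, so the receiver has to let the sender know its ws.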
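The window arithmetic above can be written down directly. The sketch below is illustrative only: `window_size` and `clip_to_window` are hypothetical helpers, not functions from the lab, and they only show which bytes of an incoming substring survive.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>

// ws = capacity - (bytes_pushed - bytes_popped)
uint64_t window_size( uint64_t capacity, uint64_t bytes_pushed, uint64_t bytes_popped )
{
  assert( bytes_pushed >= bytes_popped );
  assert( bytes_pushed - bytes_popped <= capacity );
  return capacity - ( bytes_pushed - bytes_popped );
}

// Keep only the part of `data` (whose first byte has stream index `first_index`)
// that falls inside [first_unassembled, first_unassembled + window).
std::string clip_to_window( uint64_t first_index,
                            const std::string& data,
                            uint64_t first_unassembled,
                            uint64_t window )
{
  const uint64_t lo = std::max<uint64_t>( first_index, first_unassembled );
  const uint64_t hi = std::min<uint64_t>( first_index + data.size(), first_unassembled + window );
  if ( lo >= hi ) {
    return {}; // entirely before or entirely beyond the window
  }
  return data.substr( lo - first_index, hi - lo );
}
```

With the next expected index at 2 and a window of 3, the substring "abcdef" starting at index 0 is clipped to "cde": "ab" has already been assembled and "f" lies beyond the window.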

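Lab 2:

This lab implements the TCP receiver. It mainly uses the insert interface from the previous lab and converts between sequence numbers and absolute sequence numbers.

  • seqno: the sequence number carried in a TCP segment, identifying each byte. It starts at the ISN and is 32 bits wide.
  • absolute seqno: the seqno rebased so that it starts at 0, 64 bits wide. It is converted to and from a seqno with wrap and unwrap.
  • stream index: the index of each byte in the byte stream that is actually received, 64 bits wide. This is the index of the real payload data (the index used in the StreamReassembler, starting at 0). SYN and FIN do not occupy a stream index.

Every byte of the substrings handled by the reassembler has a 64-bit stream index, and the first byte of the stream always has index 0. A 64-bit index is large enough that we can treat it as never overflowing. In the TCP header, however, space is precious, so the index of a byte is not represented with 64 bits but with a 32-bit "sequence number", or "seqno".

  1. Why does the TCP sequence number start at a random value?

    For security, and to avoid confusion with old segments from an earlier connection between the same endpoints, TCP sequence numbers start at a random value. This avoids collisions and keeps an attacker from guessing them. The first sequence number of the stream is a random 32-bit number called the Initial Sequence Number (ISN). It is the sequence number of the SYN (the start of the stream). The bytes that follow are numbered as usual: (ISN + 1) mod 2^32, (ISN + 2) mod 2^32, ...

  2. The logical beginning and end of the byte stream each occupy one sequence number. In TCP, the SYN (start of stream) and FIN (end of stream) control flags are assigned sequence numbers, one each (the sequence number occupied by the SYN flag is the ISN).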
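A small worked example of point 2, assuming a simplified segment type. `Segment`, `sequence_length` and `next_seqno` are hypothetical names for this sketch; they only count how many sequence numbers a segment consumes.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Simplified segment: just the two flags and the payload (hypothetical type).
struct Segment
{
  bool SYN = false;
  std::string payload;
  bool FIN = false;
};

// SYN and FIN each consume one sequence number, every payload byte consumes one.
uint64_t sequence_length( const Segment& seg )
{
  return seg.SYN + seg.payload.size() + seg.FIN;
}

// Sequence number that follows the segment. uint32_t arithmetic wraps modulo 2^32.
uint32_t next_seqno( uint32_t seqno, const Segment& seg )
{
  return seqno + static_cast<uint32_t>( sequence_length( seg ) );
}
```

Take an ISN of 2^32 - 2 and a single segment carrying SYN, the payload "cat" and FIN. The SYN gets seqno 2^32 - 2, 'c' gets 2^32 - 1, 'a' wraps to 0, 't' gets 1 and FIN gets 2, so the segment occupies 5 sequence numbers and the next seqno is 3.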

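A seqno has fewer bits than an absolute seqno, so it is bound to wrap around. As a result one seqno corresponds to many absolute seqnos. For example, with an ISN of 0, seqno 15 may stand for absolute seqno 15, 2^32 + 15, 2 * 2^32 + 15, and so on.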

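64 => 32

Wrap32( static_cast<uint32_t>( n ) + isn.raw_value_ )

32 => 64

Converting from 32 to 64 bits needs a checkpoint. Because the 32-bit value wraps around, an offset measured from the ISN alone cannot tell how many times the sequence number has already wrapped.

So the offset is computed on the 32-bit sequence numbers and read as a signed 32-bit value: offset = this->raw_value_ - wrap( checkpoint, isn ).raw_value_

The answer is then checkpoint + offset. Note that this sum can be negative: the checkpoint is close to 0 and the offset points backwards across the wrap point. That is not a valid absolute seqno, so 2^32 has to be added to cancel it out.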
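The two conversions can be tried out in isolation. The sketch below uses plain integers instead of the lab's Wrap32 class; the free functions `wrap` and `unwrap` and their parameter order are assumptions made for this example.

```cpp
#include <cassert>
#include <cstdint>

// 64 => 32: keep the low 32 bits of n and shift them by the ISN (wraps modulo 2^32).
uint32_t wrap( uint64_t n, uint32_t isn )
{
  return isn + static_cast<uint32_t>( n );
}

// 32 => 64: pick the absolute seqno closest to `checkpoint` that wraps to `seqno`.
uint64_t unwrap( uint32_t seqno, uint32_t isn, uint64_t checkpoint )
{
  // Distance from the checkpoint's seqno to this seqno, read as a signed 32-bit
  // value so that it lands in [-2^31, 2^31).
  const auto offset = static_cast<int32_t>( seqno - wrap( checkpoint, isn ) );
  const int64_t res = static_cast<int64_t>( checkpoint ) + offset;
  // Negative means the candidate lies below 0; the next one up is 2^32 higher.
  return res >= 0 ? static_cast<uint64_t>( res ) : static_cast<uint64_t>( res + ( int64_t { 1 } << 32 ) );
}
```

With an ISN of 0, `unwrap( 15, 0, 0 )` gives 15, while the same seqno with a checkpoint of 2^32 gives 2^32 + 15. `unwrap( 0xFFFFFFFF, 0, 0 )` hits the negative case: the offset is -1, so 2^32 is added and the result is 2^32 - 1.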

#include "reassembler.hh"
#include <cassert>
#include <cmath>
#include <iostream>

using namespace std;

bool Reassembler::is_closed() const { return closed_ && bytes_pending() == 0; }

void Reassembler::insert(uint64_t first_index, string data, bool is_last_substring, Writer &output)
{
  if (is_last_substring) {
    closed_ = true;
  }

  // unassembled_index_ is the index of the next byte the output stream expects.
  // The substring is rejected when:
  // 1. Unacceptable index: first_index lies beyond the available capacity.
  // 2. Fully overlapped: the end index of the substring is smaller than unassembled_index_.
  // 3. data is empty.
  // 4. No available space.
  if (first_index >= unassembled_index_ + output.available_capacity() || /* Out of bound */
      first_index + data.length() - 1 < unassembled_index_ ||            /* Data has already been transferred */
      data.empty() || output.available_capacity() == 0) {
    if (is_closed()) {
      output.close();
    }
    return;
  }

  const uint64_t cap = output.available_capacity();
  // new_index is the stream index at which the (trimmed) data starts
  uint64_t new_index = first_index;

  // Trim the data so that it fits within the available capacity
  if (first_index <= unassembled_index_) {
    new_index = unassembled_index_;
    const uint64_t overlapped_length = unassembled_index_ - first_index;
    data = data.substr(overlapped_length, min(data.size() - overlapped_length, cap));
  } else {
    data = data.substr(0, min(data.size(), cap));
    if (first_index + data.size() - 1 > unassembled_index_ + cap - 1) {
      data = data.substr(0, unassembled_index_ + cap - first_index);
    }
  }
  // Get the rear substring and merge the overlapped part
  auto rear_iter = unassembled_substrings_.lower_bound(new_index);
  while (rear_iter != unassembled_substrings_.end()) {
    auto &[rear_index, rear_data] = *rear_iter;
    if (new_index + data.size() - 1 < rear_index) {
      break;
    } // No overlap conflict
    uint64_t rear_overlapped_length = 0;
    if (new_index + data.size() - 1 < rear_index + rear_data.size() - 1) {
      rear_overlapped_length = new_index + data.size() - rear_index;
    } else {
      rear_overlapped_length = rear_data.size();
    }
    // Prepare for next rear early, because the data may be erased afterwards.
    const uint64_t next_rear = rear_index + rear_data.size() - 1;
    if (rear_overlapped_length == rear_data.size()) {
      unassembled_bytes_ -= rear_data.size();
      unassembled_substrings_.erase(rear_index);
    } else {
      // We don't combine current data and rear data.
      // Erasing the overlapped part of the current data is more efficient.
      data.erase(data.end() - static_cast<int64_t>(rear_overlapped_length), data.end());
    }
    rear_iter = unassembled_substrings_.lower_bound(next_rear);
  }

  if (first_index > unassembled_index_) {
    auto front_iter = unassembled_substrings_.upper_bound(new_index);
    if (front_iter != unassembled_substrings_.begin()) {
      front_iter--;
      const auto &[front_index, front_data] = *front_iter;
      if (front_index + front_data.size() - 1 >= first_index) {
        uint64_t overlapped_length = 0;
        if (front_index + front_data.size() <= first_index + data.size()) {
          overlapped_length = front_index + front_data.size() - first_index;
        } else {
          overlapped_length = data.size();
        }
        if (overlapped_length == front_data.size()) {
          unassembled_bytes_ -= front_data.size();
          unassembled_substrings_.erase(front_index);
        } else {
          data.erase(data.begin(), data.begin() + static_cast<int64_t>(overlapped_length));
          // Don't forget to update the inserted location
          new_index = first_index + overlapped_length;
        }
      }
    }
  }

  // If the processed data is empty, no need to insert it.
  if (data.size() > 0) {
    unassembled_bytes_ += data.size();
    unassembled_substrings_.insert(make_pair(new_index, std::move(data)));
  }

  for (auto iter = unassembled_substrings_.begin(); iter != unassembled_substrings_.end(); /* nop */) {
    auto &[sub_index, sub_data] = *iter;
    if (sub_index == unassembled_index_) {
      const uint64_t prev_bytes_pushed = output.bytes_pushed();
      output.push(sub_data);
      const uint64_t bytes_pushed = output.bytes_pushed();
      if (bytes_pushed != prev_bytes_pushed + sub_data.size()) {
        // Cannot push all data, we need to keep the un-pushed part.
        const uint64_t pushed_length = bytes_pushed - prev_bytes_pushed;
        unassembled_index_ += pushed_length;
        unassembled_bytes_ -= pushed_length;
        unassembled_substrings_.insert(make_pair(unassembled_index_, sub_data.substr(pushed_length)));
        // Don't forget to remove the previous incompletely transferred data
        unassembled_substrings_.erase(sub_index);
        break;
      }
      unassembled_index_ += sub_data.size();
      unassembled_bytes_ -= sub_data.size();
      unassembled_substrings_.erase(sub_index);
      iter = unassembled_substrings_.find(unassembled_index_);
    } else {
      break; // No need to do more. The data is no longer contiguous.
    }
  }

  if (is_closed()) {
    output.close();
  }
}

uint64_t Reassembler::bytes_pending() const { return unassembled_bytes_; }

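#include "wrapping_integers.hh"

using namespace std;

Wrap32 Wrap32::wrap( uint64_t n, Wrap32 zero_point )
{
  // Keep the low 32 bits of n and shift them by the ISN (the addition wraps modulo 2^32).
  return Wrap32( static_cast<uint32_t>( n & 0x00000000ffffffff ) + zero_point.raw_value_ );
}

uint64_t Wrap32::unwrap( Wrap32 zero_point, uint64_t checkpoint ) const
{
  // Signed 32-bit distance from the checkpoint's seqno to this seqno.
  const auto offset = static_cast<int32_t>( this->raw_value_ - wrap( checkpoint, zero_point ).raw_value_ );
  const int64_t res = static_cast<int64_t>( checkpoint ) + offset;
  // A negative result means the candidate lies below 0; the next one up is 2^32 higher.
  return res >= 0 ? static_cast<uint64_t>( res ) : static_cast<uint64_t>( res + ( int64_t { 1 } << 32 ) );
}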

receiver

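Lab 3: sender

What the sender does

  1. Fill the window: send as much data as the window size allows
  2. Keep track of data that has been sent but not yet acknowledged (outstanding segments) and retransmit it when necessary

How is a timeout detected and a retransmission triggered? The sender's tick function is called periodically. The timeout (RTO) is typically set to about one RTT. When a segment is sent, the timer starts at 0 and the segment is put into the outstanding segments, meaning that no acknowledgment has arrived for it yet. If no acknowledgment arrives before the timeout elapses, the segment is retransmitted.

Reading the code shows that there is only one timer. I originally thought every segment would have its own. In the code below the timer is restarted when a segment is sent while nothing is outstanding, and again whenever an ACK acknowledges an outstanding segment, so in effect it tracks the oldest outstanding segment. A single timer is reasonable, but it is important that the timeout is slightly larger than the RTT. Imagine that two segments, 1 and 2, are sent one after the other. If 2 arrives but 1 is lost, the receiver's ACK still asks for 1, nothing is removed from the outstanding segments, and the timer keeps running until it expires and 1 is retransmitted. If 1 arrives but 2 is lost, the ACK for 1 removes it from the outstanding segments and restarts the timer, which now covers 2. If no further ACK ever arrives, the timer expires and 2 is retransmitted.

RTO too short: segments are retransmitted too early, which wastes resources

RTO too long: lost segments are still retransmitted, but only after a long wait, so the connection stalls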
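The single-timer behaviour described above can be sketched on its own. This is a simplified model, not the author's class: `RetransmissionTimer` and its member names are hypothetical, and doubling the RTO on expiry (exponential backoff) follows what the `tick` function in the code below does when the window is non-zero.

```cpp
#include <cassert>
#include <cstdint>

class RetransmissionTimer
{
  uint64_t initial_rto_ms_;
  uint64_t rto_ms_;
  uint64_t elapsed_ms_ = 0;
  bool running_ = false;

public:
  explicit RetransmissionTimer( uint64_t initial_rto_ms )
    : initial_rto_ms_( initial_rto_ms ), rto_ms_( initial_rto_ms )
  {}

  // Start (or restart) counting from zero, e.g. when a segment is sent with nothing outstanding.
  void start()
  {
    running_ = true;
    elapsed_ms_ = 0;
  }

  // Stop when nothing is outstanding any more.
  void stop() { running_ = false; }

  // An ACK for new data puts the RTO back to its initial value.
  void reset_rto() { rto_ms_ = initial_rto_ms_; }

  uint64_t rto_ms() const { return rto_ms_; }

  // Advance time. Returns true when the timer expired, in which case the caller
  // should retransmit the oldest outstanding segment.
  bool tick( uint64_t ms_since_last_tick, bool window_nonzero = true )
  {
    if ( !running_ ) {
      return false;
    }
    elapsed_ms_ += ms_since_last_tick;
    if ( elapsed_ms_ < rto_ms_ ) {
      return false;
    }
    if ( window_nonzero ) {
      rto_ms_ *= 2; // back off: the network may be congested
    }
    elapsed_ms_ = 0; // restart for the retransmitted segment
    return true;
  }
};
```

With an initial RTO of 100 ms, the first expiry happens after 100 ms and the next one only 200 ms later. The doubling keeps a sender from flooding a network that is already dropping packets.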

/* TCPSender constructor (uses a random ISN if none given) */
TCPSender::TCPSender(uint64_t initial_RTO_ms, optional<Wrap32> fixed_isn)
  : isn_(fixed_isn.value_or(Wrap32 {random_device()()})), initial_RTO_ms_(initial_RTO_ms)
{}

uint64_t TCPSender::sequence_numbers_in_flight() const { return outstanding_seqno_; }

uint64_t TCPSender::consecutive_retransmissions() const { return consecutive_retransmission_times_; }

optional<TCPSenderMessage> TCPSender::maybe_send()
{
  if (!segments_out_.empty() && set_syn_) {
    TCPSenderMessage segment = std::move(segments_out_.front());
    segments_out_.pop();
    return segment;
  }

  return nullopt;
}

// Assemble segments and put them into both outstanding_seg_ and segments_out_
void TCPSender::push(Reader &outbound_stream)
{
  // Use the window size from the latest ACK; treat a zero window as 1
  const uint64_t curr_window_size = window_size_ ? window_size_ : 1;

  // Keep filling until the window is full
  while (curr_window_size > outstanding_seqno_) {
    TCPSenderMessage msg;

    // The very first segment has to carry the SYN flag
    if (!set_syn_) {
      msg.SYN = true;
      set_syn_ = true;
    }

    // Starting sequence number of the next segment
    msg.seqno = get_next_seqno();
    const uint64_t payload_size
      = min(TCPConfig::MAX_PAYLOAD_SIZE, curr_window_size - outstanding_seqno_ - msg.SYN);
    std::string payload = std::string(outbound_stream.peek()).substr(0, payload_size);

    // Remove from the ByteStream exactly the bytes that were copied
    outbound_stream.pop(payload.size());

    // Set FIN if it has not been sent yet, the stream is finished,
    // and the window still has room for the flag
    if (!set_fin_ && outbound_stream.is_finished()
        && payload.size() + outstanding_seqno_ + msg.SYN < curr_window_size) {
      msg.FIN = true;
      set_fin_ = true;
    }

    msg.payload = Buffer(std::move(payload));

    // no data, stop sending
    if (msg.sequence_length() == 0) {
      break;
    }

    // no outstanding segments, restart timer
    if (outstanding_seg_.empty()) {
      RTO_timeout_ = initial_RTO_ms_;
      timer_ = 0;
    }
    // Queue the segment for sending
    segments_out_.push(msg);

    // Record the segment as not yet acknowledged
    outstanding_seqno_ += msg.sequence_length();
    outstanding_seg_.insert(std::make_pair(next_abs_seqno_, msg));

    // Advance the sequence number, i.e. the ackno expected in the next reply
    next_abs_seqno_ += msg.sequence_length();

    // Nothing can follow a FIN
    if (msg.FIN) {
      break;
    }
  }
}

TCPSenderMessage TCPSender::send_empty_message() const
{
  TCPSenderMessage segment;
  segment.seqno = get_next_seqno();

  return segment;
}

// Called when a message arrives from the receiver
void TCPSender::receive(const TCPReceiverMessage &msg)
{
  // Without an ackno there is nothing to acknowledge, but the window size
  // below still has to be updated, so don't return early
  if (msg.ackno.has_value()) {
    // Convert the received ackno to an absolute seqno
    const uint64_t recv_abs_seqno = msg.ackno.value().unwrap(isn_, next_abs_seqno_);

    // The ackno is beyond anything sent so far; this should not normally happen
    if (recv_abs_seqno > next_abs_seqno_) {
      // Impossible, we couldn't transmit future data
      return;
    }
    // Walk outstanding_seg_ from the oldest segment to the newest and drop
    // every segment that is now fully acknowledged
    for (auto iter = outstanding_seg_.begin(); iter != outstanding_seg_.end();) {
      const auto &[abs_seqno, segment] = *iter;
      if (abs_seqno + segment.sequence_length() <= recv_abs_seqno) {
        outstanding_seqno_ -= segment.sequence_length();
        iter = outstanding_seg_.erase(iter);
        // reset RTO and if outstanding data is not empty, start timer
        RTO_timeout_ = initial_RTO_ms_;
        if (!outstanding_seg_.empty()) {
          timer_ = 0;
        }
      } else {
        break;
      }
    }
    consecutive_retransmission_times_ = 0;
  }
  window_size_ = msg.window_size;
}

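void TCPSender::tick(const size_t ms_since_last_tick)
{
  // Accumulate time. ms_since_last_tick is the time elapsed since the last call;
  // tick is called periodically
  timer_ += ms_since_last_tick;
  auto iter = outstanding_seg_.begin();
  // The timeout has expired and there is something outstanding
  if (timer_ >= RTO_timeout_ && iter != outstanding_seg_.end()) {
    // Retransmit the earliest outstanding segment
    segments_out_.push(iter->second);
    if (window_size_ > 0) {
      RTO_timeout_ *= 2;
    }
    timer_ = 0;
    // One more consecutive retransmission. TCPConnection uses this number to judge how
    // reliable the connection is: too many consecutive retransmissions mean the
    // connection is unstable and should be terminated.
    consecutive_retransmission_times_++;
  }
}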

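Lab 4: the network interface

This lab implements ARP (the Address Resolution Protocol).

network interface structure
  • TCP-in-UDP-in-IP: the TCP segment is carried in the payload of a user datagram. In user space this is the simplest approach: Linux provides an interface (such as UDPSocket), the user supplies only the payload and the destination address, and the Linux kernel assembles the UDP header, IP header, and Ethernet header and sends the packet to the next hop. The kernel guarantees that every socket has its own local and remote addresses and port numbers, and keeps the data of different applications isolated from each other.

  • TCP-in-IP: normally the TCP segment is placed directly in an Internet datagram, which is what is usually called "TCP/IP". Linux provides the TUN device interface for this: the application supplies the entire Internet datagram and the Linux kernel handles the rest. The application then has to build the whole IP header and the payload itself.

  • TCP-in-IP-in-Ethernet: the approaches above rely on the Linux kernel for part of the protocol stack. Every time the user writes an IP datagram to the TUN device, Linux has to build a correct Ethernet frame with the IP datagram as its payload. This means Linux has to know the Ethernet destination address of the next hop, given its IP address. If it does not, Linux broadcasts a request for it.

This is the job of the Network Interface: it wraps IP datagrams in Ethernet frames (among other things) and passes them to a TAP device (similar to a TUN device but lower-level), which transmits the link-layer frames.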

// ethernet_address: Ethernet (what ARP calls "hardware") address of the interface
// ip_address: IP (what ARP calls "protocol") address of the interface
// cppcheck-suppress uninitMemberVar
NetworkInterface::NetworkInterface(const EthernetAddress &ethernet_address, const Address &ip_address)
  : ethernet_address_(ethernet_address), ip_address_(ip_address)
{
  cerr << "DEBUG: Network interface has Ethernet address " << to_string(ethernet_address_) << " and IP address "
       << ip_address.ip() << "\n";
}

// dgram: the IPv4 datagram to be sent
// next_hop: the IP address of the interface to send it to (typically a router or default gateway, but
// may also be another host if directly connected to the same network as the destination)

// Note: the Address type can be converted to a uint32_t (raw 32-bit IP address) by using the
// Address::ipv4_numeric() method.
// Wrap the outgoing Internet (IP) datagram in an Ethernet frame and queue it for sending
void NetworkInterface::send_datagram(const InternetDatagram &dgram /* IP datagram */, const Address &next_hop /* next-hop IP address */)
{
  // Numeric IPv4 address of the next hop
  const uint32_t addr_numeric = next_hop.ipv4_numeric();

  /* ARP Table has stored the mapping info, we send the datagram directly */
  if (arp_table_.contains(addr_numeric)) {
    EthernetFrame eth_frame;
    // source MAC address
    eth_frame.header.src = ethernet_address_;
    // destination MAC address
    eth_frame.header.dst = arp_table_.at(addr_numeric).eth_addr;
    // the payload is an IPv4 datagram
    eth_frame.header.type = EthernetHeader::TYPE_IPv4;
    // serialize the datagram into the frame
    eth_frame.payload = serialize(dgram);
    // queue the frame for sending
    outbound_frames_.push(eth_frame);
  } else {
    /* ARP Table has no such mapping and we haven't sent an ARP request for the target IP */
    // Broadcast a request for the MAC address of the next hop
    if (arp_requests_lifetime_.find(addr_numeric) == arp_requests_lifetime_.end()) {
      // next hop ipv4 addr is not contained in the arp requests waiting list
      ARPMessage arp_msg;
      arp_msg.opcode = ARPMessage::OPCODE_REQUEST;
      arp_msg.sender_ip_address = ip_address_.ipv4_numeric();
      arp_msg.sender_ethernet_address = ethernet_address_;
      arp_msg.target_ip_address = addr_numeric;
      arp_msg.target_ethernet_address = {/* empty */};

      EthernetFrame arp_eth_frame;
      arp_eth_frame.header.src = ethernet_address_;
      arp_eth_frame.header.dst = ETHERNET_BROADCAST; // broadcast frame
      arp_eth_frame.header.type = EthernetHeader::TYPE_ARP;
      arp_eth_frame.payload = serialize(arp_msg);
      outbound_frames_.push(arp_eth_frame);
      // The request is built and queued; also record how long it stays pending
      arp_requests_lifetime_.emplace(std::make_pair(addr_numeric, ARP_REQUEST_DEFAULT_TTL));
    }
    // We need to store the datagram in the list. After we know the eth addr, we can queue
    // the corresponding dgrams.
    arp_datagrams_waiting_list_.emplace_back(std::pair {next_hop, dgram});
  }
}

// frame: the incoming Ethernet frame
// React to an Ethernet frame that has been received
optional<InternetDatagram> NetworkInterface::recv_frame(const EthernetFrame &frame)
{
  // Ignore frames that are neither addressed to this interface nor broadcast
  if (frame.header.dst != ethernet_address_ && frame.header.dst != ETHERNET_BROADCAST) {
    return nullopt;
  }

  /* IPv4 datagram: hand it to the caller, or drop it on a parse error */
  if (frame.header.type == EthernetHeader::TYPE_IPv4) {
    InternetDatagram datagram;
    if (not parse(datagram, frame.payload)) {
      // printf("[NetworkInterface ERROR]: 'recv_frame' IPV4 parse error\n");
      return nullopt;
    }
    return datagram;
  }

  /* ARP datagrams */
  if (frame.header.type == EthernetHeader::TYPE_ARP) {
    ARPMessage arp_msg;
    if (not parse(arp_msg, frame.payload)) {
      printf("[NetworkInterface ERROR]: 'recv_frame' ARP parse error\n");
      return nullopt;
    }

    const bool is_arp_request = arp_msg.opcode == ARPMessage::OPCODE_REQUEST
                                && arp_msg.target_ip_address == ip_address_.ipv4_numeric();
    if (is_arp_request) {
      ARPMessage arp_reply_msg;
      arp_reply_msg.opcode = ARPMessage::OPCODE_REPLY;
      arp_reply_msg.sender_ip_address = ip_address_.ipv4_numeric();
      arp_reply_msg.sender_ethernet_address = ethernet_address_;
      arp_reply_msg.target_ip_address = arp_msg.sender_ip_address;
      arp_reply_msg.target_ethernet_address = arp_msg.sender_ethernet_address;

      EthernetFrame arp_reply_eth_frame;
      arp_reply_eth_frame.header.src = ethernet_address_;
      arp_reply_eth_frame.header.dst = arp_msg.sender_ethernet_address;
      arp_reply_eth_frame.header.type = EthernetHeader::TYPE_ARP;
      arp_reply_eth_frame.payload = serialize(arp_reply_msg);
      outbound_frames_.push(arp_reply_eth_frame);
    }

    const bool is_arp_response
      = arp_msg.opcode == ARPMessage::OPCODE_REPLY && arp_msg.target_ethernet_address == ethernet_address_;

    // we can get arp info from either ARP request or ARP reply
    if (is_arp_request || is_arp_response) {
      arp_table_.emplace(std::make_pair(arp_msg.sender_ip_address,
                                        arp_t {arp_msg.sender_ethernet_address, ARP_DEFAULT_TTL}));
      // send the datagrams that were waiting for this address and remove them from the waiting list
      for (auto iter = arp_datagrams_waiting_list_.begin(); iter != arp_datagrams_waiting_list_.end();) {
        const auto &[ipv4_addr, datagram] = *iter;
        if (ipv4_addr.ipv4_numeric() == arp_msg.sender_ip_address) {
          send_datagram(datagram, ipv4_addr);
          iter = arp_datagrams_waiting_list_.erase(iter);
        } else {
          iter++;
        }
      }
      arp_requests_lifetime_.erase(arp_msg.sender_ip_address);
    }
  }

  return nullopt;
}

// ms_since_last_tick: the number of milliseconds since the last call to this method
// Track time so that expired IP-to-Ethernet address mappings are invalidated
void NetworkInterface::tick(const size_t ms_since_last_tick)
{
  /* delete expired ARP items in ARP Table */
  // Note: don't use 'iter++' after erasing the current element; use the iterator returned by erase
  for (auto iter = arp_table_.begin(); iter != arp_table_.end(); /* nop */) {
    auto &[ipv4_addr_numeric, arp] = *iter;
    if (arp.ttl <= ms_since_last_tick) {
      // the entry has expired: remove it
      iter = arp_table_.erase(iter);
    } else {
      // update the remaining lifetime of the entry
      arp.ttl -= ms_since_last_tick;
      iter++;
    }
  }

  /* handle expired ARP requests */
  for (auto &[ipv4_addr, arp_ttl] : arp_requests_lifetime_) {
    /* resend the ARP request if it has expired */
    if (arp_ttl <= ms_since_last_tick) {
      ARPMessage arp_msg;
      arp_msg.opcode = ARPMessage::OPCODE_REQUEST;
      arp_msg.sender_ip_address = ip_address_.ipv4_numeric();
      arp_msg.sender_ethernet_address = ethernet_address_;
      arp_msg.target_ip_address = ipv4_addr;
      arp_msg.target_ethernet_address = {/* empty */};

      EthernetFrame arp_eth_frame;
      arp_eth_frame.header.src = ethernet_address_;
      arp_eth_frame.header.dst = ETHERNET_BROADCAST;
      arp_eth_frame.header.type = EthernetHeader::TYPE_ARP;
      arp_eth_frame.payload = serialize(arp_msg);
      outbound_frames_.push(arp_eth_frame);

      /* reset ARP ttl for this request */
      arp_ttl = ARP_REQUEST_DEFAULT_TTL;
    } else {
      // update the remaining lifetime of the request
      arp_ttl -= ms_since_last_tick;
    }
  }
}

optional<EthernetFrame> NetworkInterface::maybe_send()
{
  if (!outbound_frames_.empty()) {
    EthernetFrame eth_frame = std::move(outbound_frames_.front());
    outbound_frames_.pop();
    return eth_frame;
  }

  return nullopt;
}
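
The expiry logic of the ARP table can be tried out in isolation. The following is a simplified sketch rather than the lab code: `ArpCache`, `Mac` and the 30000 ms default lifetime are assumptions made for this example. Unlike the `emplace` call above, `learn` here uses `insert_or_assign`, so a repeated announcement refreshes the lifetime of an existing entry.

```cpp
#include <array>
#include <cassert>
#include <cstdint>
#include <optional>
#include <unordered_map>

using Mac = std::array<uint8_t, 6>;

// Hypothetical stand-alone ARP cache: IPv4 address -> (MAC address, remaining lifetime).
class ArpCache
{
  struct Entry
  {
    Mac mac;
    uint64_t ttl_ms;
  };
  std::unordered_map<uint32_t, Entry> table_;
  uint64_t default_ttl_ms_;

public:
  explicit ArpCache( uint64_t default_ttl_ms = 30000 ) : default_ttl_ms_( default_ttl_ms ) {}

  // Remember (or refresh) a mapping learned from an ARP request or reply.
  void learn( uint32_t ip, const Mac& mac ) { table_.insert_or_assign( ip, Entry { mac, default_ttl_ms_ } ); }

  std::optional<Mac> lookup( uint32_t ip ) const
  {
    const auto it = table_.find( ip );
    if ( it == table_.end() ) {
      return std::nullopt;
    }
    return it->second.mac;
  }

  // Age every entry; erase() returns the next valid iterator, so the loop stays safe.
  void tick( uint64_t ms_since_last_tick )
  {
    for ( auto it = table_.begin(); it != table_.end(); ) {
      if ( it->second.ttl_ms <= ms_since_last_tick ) {
        it = table_.erase( it );
      } else {
        it->second.ttl_ms -= ms_since_last_tick;
        ++it;
      }
    }
  }
};
```

An entry learned with a 1000 ms lifetime is still found after 999 ms and is gone once the full 1000 ms have passed.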

Lab 5:

// route_prefix: The "up-to-32-bit" IPv4 address prefix to match the datagram's destination address against
// prefix_length: For this route to be applicable, how many high-order (most-significant) bits of
// the route_prefix will need to match the corresponding bits of the datagram's destination address?
// next_hop: The IP address of the next hop. Will be empty if the network is directly attached to the router (in
// which case, the next hop address should be the datagram's final destination).
// interface_num: The index of the interface to send the datagram out on.
void Router::add_route(const uint32_t route_prefix, const uint8_t prefix_length, const optional<Address> next_hop, const size_t interface_num)
{
  cerr << "DEBUG: adding route " << Address::from_ipv4_numeric(route_prefix).ip() << "/"
       << static_cast<int>(prefix_length) << " => " << (next_hop.has_value() ? next_hop->ip() : "(direct)")
       << " on interface " << interface_num << "\n";

  routing_table_.emplace_back(route_t {route_prefix, prefix_length, next_hop, interface_num});
}

void Router::route()
{
  // scan interfaces to receive InternetDatagram
  for (auto &net_interface : interfaces_) {
    while (std::optional<InternetDatagram> datagram = net_interface.maybe_receive()) {
      if (datagram) {
        InternetDatagram dgram = datagram.value();
        const uint32_t dst_ipaddr_numeric = dgram.header.dst;
        auto largest_matched_iter = routing_table_.end();
        for (auto route = routing_table_.begin(); route != routing_table_.end(); route++) {
          // zero prefix_length means match all
          if (route->prefix_length == 0
              || ((route->route_prefix ^ dst_ipaddr_numeric)
                  >> (static_cast<uint8_t>(32) - route->prefix_length)) == 0) {
            // update longest prefix matched route
            if (largest_matched_iter == routing_table_.end()
                || route->prefix_length > largest_matched_iter->prefix_length) {
              largest_matched_iter = route;
            }
          }
        }

        // check the legitimacy of the incoming datagram
        uint8_t &ttl = dgram.header.ttl;
        if (largest_matched_iter != routing_table_.end() && ttl-- > 1) {
          // We have changed the dgram content. Checksum needs to be recomputed.
          dgram.header.compute_checksum();
          AsyncNetworkInterface &outbound_interface = interface(largest_matched_iter->interface_id);
          const Address next_addr = largest_matched_iter->next_hop.has_value()
                                      ? largest_matched_iter->next_hop.value()
                                      : Address::from_ipv4_numeric(dst_ipaddr_numeric);
          outbound_interface.send_datagram(dgram, next_addr);
        }; // no route matched (increase code readability)
      }
    }
  }
}
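
The matching rule inside `route()` is longest-prefix match. Below is a self-contained sketch of just that rule; `Route`, `matches` and `longest_prefix_match` are hypothetical names, and the three-entry table in the note afterwards is made up for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

// One routing-table entry, reduced to what matching needs (hypothetical type).
struct Route
{
  uint32_t prefix;  // e.g. 0x0A000000 for 10.0.0.0
  uint8_t length;   // number of leading bits that must match, 0..32
  size_t interface; // index of the outgoing interface
};

// True if the top `length` bits of dst equal those of the route prefix.
bool matches( const Route& r, uint32_t dst )
{
  assert( r.length <= 32 );
  if ( r.length == 0 ) {
    return true; // default route; also avoids shifting a 32-bit value by 32, which is undefined
  }
  return ( ( r.prefix ^ dst ) >> ( 32 - r.length ) ) == 0;
}

// Index of the matching route with the longest prefix, or nullopt if nothing matches.
std::optional<size_t> longest_prefix_match( const std::vector<Route>& table, uint32_t dst )
{
  std::optional<size_t> best;
  for ( size_t i = 0; i < table.size(); ++i ) {
    if ( matches( table[i], dst ) && ( !best || table[i].length > table[*best].length ) ) {
      best = i;
    }
  }
  return best;
}
```

With the routes 0.0.0.0/0, 10.0.0.0/8 and 10.1.0.0/16 (in that order), the destination 10.1.2.3 matches all three and the /16 entry wins, 10.2.3.4 falls back to the /8 entry, and 8.8.8.8 only matches the default route.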

Lab 6:

client(192.168.0.1) => router1(10.0.0.192) => router2(10.0.0.172) => server(172.16.0.1)

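Routing tables:

router1 (client side)

IP             next hop
192.168.0.0    direct (not forwarded)
10.0.0.0       direct (not forwarded)
172.16.0.0     10.0.0.172

router2 (server side)

IP             next hop
172.16.0.0     direct (not forwarded)
10.0.0.0       direct (not forwarded)
192.168.0.0    10.0.0.192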

endtoend

EthernetAddress random_host_ethernet_address()
{
  EthernetAddress addr;
  for (auto &byte : addr) {
    byte = random_device()(); // use a random local Ethernet address
  }
  addr.at(0) |= 0x02; // "10" in last two binary digits marks a private Ethernet address
  addr.at(0) &= 0xfe;

  return addr;
}

EthernetAddress random_router_ethernet_address()
{
  EthernetAddress addr;
  for (auto &byte : addr) {
    byte = random_device()(); // use a random local Ethernet address
  }
  addr.at(0) = 0x02; // "10" in last two binary digits marks a private Ethernet address
  addr.at(1) = 0;
  addr.at(2) = 0;

  return addr;
}

optional<EthernetFrame> maybe_receive_frame(FileDescriptor &fd)
{
  vector<string> strs(3);
  strs.at(0).resize(EthernetHeader::LENGTH);
  strs.at(1).resize(IPv4Header::LENGTH);
  fd.read(strs);

  EthernetFrame frame;
  vector<Buffer> buffers;
  ranges::transform(strs, back_inserter(buffers), identity());
  if (not parse(frame, buffers)) {
    return {};
  }

  return frame;
}

class NetworkInterfaceAdapter : public TCPOverIPv4Adapter
{
private:
  NetworkInterface _interface;
  Address _next_hop;
  pair<FileDescriptor, FileDescriptor> _data_socket_pair = socket_pair_helper(SOCK_DGRAM);

  void send_pending()
  {
    while (auto frame = _interface.maybe_send()) {
      _data_socket_pair.first.write(serialize(frame.value()));
    }
  }

public:
  NetworkInterfaceAdapter(const Address &ip_address, const Address &next_hop) // NOLINT(*-swappable-*)
    : _interface(random_host_ethernet_address(), ip_address), _next_hop(next_hop)
  {}

  optional<TCPSegment> read()
  {
    auto frame_opt = maybe_receive_frame(_data_socket_pair.first);
    if (not frame_opt) {
      return {};
    }
    EthernetFrame frame = move(frame_opt.value());

    // Give the frame to the NetworkInterface. Get back an Internet datagram if frame was carrying one.
    optional<InternetDatagram> ip_dgram = _interface.recv_frame(frame);

    // The incoming frame may have caused the NetworkInterface to send a frame
    send_pending();

    // Try to interpret IPv4 datagram as TCP
    if (ip_dgram) {
      return unwrap_tcp_in_ip(ip_dgram.value());
    }

    return {};
  }
  void write(TCPSegment &seg)
  {
    _interface.send_datagram(wrap_tcp_in_ip(seg), _next_hop);
    send_pending();
  }
  void tick(const size_t ms_since_last_tick)
  {
    _interface.tick(ms_since_last_tick);
    send_pending();
  }
  NetworkInterface &interface() { return _interface; }

  FileDescriptor &fd() { return _data_socket_pair.first; }
  FileDescriptor &frame_fd() { return _data_socket_pair.second; }
};

class TCPSocketEndToEnd : public TCPMinnowSocket<NetworkInterfaceAdapter>
{
  Address _local_address;

public:
  TCPSocketEndToEnd(const Address &ip_address, const Address &next_hop)
    : TCPMinnowSocket<NetworkInterfaceAdapter>(NetworkInterfaceAdapter(ip_address, next_hop)),
      _local_address(ip_address)
  {}

  void connect(const Address &address)
  {
    FdAdapterConfig multiplexer_config;

    _local_address = Address {_local_address.ip(), uint16_t(random_device()())};
    cerr << "DEBUG: Connecting from " << _local_address.to_string() << "...\n";
    multiplexer_config.source = _local_address;
    multiplexer_config.destination = address;

    TCPMinnowSocket<NetworkInterfaceAdapter>::connect({}, multiplexer_config);
  }

  void bind(const Address &address)
  {
    if (address.ip() != _local_address.ip()) {
      throw runtime_error("Cannot bind to " + address.to_string());
    }
    _local_address = Address {_local_address.ip(), address.port()};
  }

  void listen_and_accept()
  {
    FdAdapterConfig multiplexer_config;
    multiplexer_config.source = _local_address;
    TCPMinnowSocket<NetworkInterfaceAdapter>::listen_and_accept({}, multiplexer_config);
  }

  NetworkInterfaceAdapter &adapter() { return _datagram_adapter; }
};

// NOLINTBEGIN(*-cognitive-complexity)
void program_body(bool is_client, const string &bounce_host, const string &bounce_port, const bool debug)
{
// UDP socket,用于接收IP发送的IP datagram
UDPSocket internet_socket;
Address bounce_address {bounce_host, bounce_port};

/* let bouncer know where we are */
internet_socket.sendto(bounce_address, "");
internet_socket.sendto(bounce_address, "");
internet_socket.sendto(bounce_address, "");
internet_socket.connect(bounce_address);

/* set up the router */
Router router;

unsigned int host_side {};
unsigned int internet_side {};

// 分别为client和server添加路由,此实验仅两个转发路由
if (is_client) {
host_side = router.add_interface({random_router_ethernet_address(), Address {"192.168.0.1"}});
internet_side = router.add_interface({random_router_ethernet_address(), Address {"10.0.0.192"}});
router.add_route(Address {"192.168.0.0"}.ipv4_numeric(), 16, {}, host_side);
router.add_route(Address {"10.0.0.0"}.ipv4_numeric(), 8, {}, internet_side);
router.add_route(Address {"172.16.0.0"}.ipv4_numeric(), 12, Address {"10.0.0.172"}, internet_side);
} else {
host_side = router.add_interface({random_router_ethernet_address(), Address {"172.16.0.1"}});
internet_side = router.add_interface({random_router_ethernet_address(), Address {"10.0.0.172"}});
router.add_route(Address {"172.16.0.0"}.ipv4_numeric(), 12, {}, host_side);
router.add_route(Address {"10.0.0.0"}.ipv4_numeric(), 8, {}, internet_side);
router.add_route(Address {"192.168.0.0"}.ipv4_numeric(), 16, Address {"10.0.0.192"}, internet_side);
}

/* set up the client */
// 建立TCPSocketEndToEnd对象,这个对象负责了连接等过程
TCPSocketEndToEnd sock = is_client ? TCPSocketEndToEnd {Address {"192.168.0.50"}, Address {"192.168.0.1"}}
: TCPSocketEndToEnd {Address {"172.16.0.100"}, Address {"172.16.0.1"}};

atomic<bool> exit_flag {};

queue<EthernetFrame> router_to_host;
queue<EthernetFrame> router_to_internet;

/* set up the network */
// Event loop with four rules:
/*
1. client => router
2. router => client
3. router => Internet
4. Internet => router
When the corresponding event fires, the matching callback runs.
*/
thread network_thread([&]() {
try {
EventLoop event_loop;
// Frames from host to router
event_loop.add_rule("frames from host to router", sock.adapter().frame_fd(), Direction::In, [&] {
// Receive an Ethernet frame from the socket
auto frame_opt = maybe_receive_frame(sock.adapter().frame_fd());
if (not frame_opt) {
return;
}
EthernetFrame frame = move(frame_opt.value());
if (debug) {
cerr << " Host->router: " << summary(frame) << "\n";
}
// Ethernet link layer => IP layer
router.interface(host_side).recv_frame(frame);
// Route the datagram at the IP layer
router.route();
});

// Frames from router to host
event_loop.add_rule(
"frames from router to host", sock.adapter().frame_fd(), Direction::Out,
[&] {
// f holds the frames waiting to be sent to the host
auto &f = router_to_host;
if (debug) {
cerr << " Router->host: " << summary(f.front()) << "\n";
}
// Write the frame to the socket
sock.adapter().frame_fd().write(serialize(f.front()));
f.pop();
},
[&] { return not router_to_host.empty(); });

// Frames from router to Internet
event_loop.add_rule(
"frames from router to Internet", internet_socket, Direction::Out,
[&] {
// f holds the frames waiting to be sent to the Internet
auto &f = router_to_internet;
if (debug) {
cerr << " Router->Internet: " << summary(f.front()) << "\n";
}
// IP => frame => internet_socket
internet_socket.write(serialize(f.front()));
f.pop();
},
[&] { return not router_to_internet.empty(); });

// Frames from Internet to router
event_loop.add_rule("frames from Internet to router", internet_socket, Direction::In, [&] {
auto frame_opt = maybe_receive_frame(internet_socket);
if (not frame_opt) {
return;
}
EthernetFrame frame = move(frame_opt.value());
if (debug) {
cerr << " Internet->router: " << summary(frame) << "\n";
}
// frame => IP datagram => router (forwarded by route())
router.interface(internet_side).recv_frame(frame);
router.route();
});

while (true) {
if (EventLoop::Result::Exit == event_loop.wait_next_event(10)) {
cerr << "Exiting...\n";
return;
}
router.interface(host_side).tick(10);
router.interface(internet_side).tick(10);
while (auto frame = router.interface(host_side).maybe_send()) {
router_to_host.push(move(frame.value()));
}
while (auto frame = router.interface(internet_side).maybe_send()) {
router_to_internet.push(move(frame.value()));
}

if (exit_flag) {
return;
}
}
} catch (const exception &e) {
cerr << "Thread ending from exception: " << e.what() << "\n";
}
});

try {
if (is_client) {
sock.connect(Address {"172.16.0.100", 1234});
} else {
sock.bind(Address {"172.16.0.100", 1234});
sock.listen_and_accept();
}

bidirectional_stream_copy(sock);
sock.wait_until_closed();
} catch (const exception &e) {
cerr << "Exception: " << e.what() << "\n";
}

cerr << "Exiting... ";
exit_flag = true;
network_thread.join();
cerr << "done.\n";
}
// NOLINTEND(*-cognitive-complexity)
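
The routes registered in program_body() only make sense together with the lookup rule: for each datagram the router in this lab is expected to pick the matching route with the longest prefix. The following is a minimal sketch of that lookup over the client-side table. The `Route` struct and the helpers `ipv4`, `matches`, `lookup` and `client_table` are hypothetical names chosen for illustration, not the lab's own types.

```cpp
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical route entry (illustrative; not the lab's Router internals).
struct Route {
    uint32_t prefix;                  // network prefix, host byte order
    uint8_t prefix_length;            // number of leading bits that must match
    std::optional<uint32_t> next_hop; // empty: destination is directly attached
    unsigned interface_num;           // index of the outgoing interface
};

// Build a host-order IPv4 address from four octets.
constexpr uint32_t ipv4(uint32_t a, uint32_t b, uint32_t c, uint32_t d) {
    return (a << 24) | (b << 16) | (c << 8) | d;
}

// True if dst lies inside prefix/prefix_length.
bool matches(const Route& r, uint32_t dst) {
    if (r.prefix_length == 0) {
        return true; // default route; also avoids an undefined shift by 32
    }
    const uint32_t mask = ~uint32_t{0} << (32 - r.prefix_length);
    return (dst & mask) == (r.prefix & mask);
}

// Longest-prefix match: among all matching routes, keep the most specific one.
std::optional<Route> lookup(const std::vector<Route>& table, uint32_t dst) {
    std::optional<Route> best;
    for (const auto& r : table) {
        if (matches(r, dst) && (!best || r.prefix_length > best->prefix_length)) {
            best = r;
        }
    }
    return best;
}

// The three client-side routes from program_body(); interface 0 plays the
// role of host_side and interface 1 the role of internet_side (assumed numbering).
std::vector<Route> client_table() {
    return {
        {ipv4(192, 168, 0, 0), 16, std::nullopt, 0},
        {ipv4(10, 0, 0, 0), 8, std::nullopt, 1},
        {ipv4(172, 16, 0, 0), 12, ipv4(10, 0, 0, 172), 1},
    };
}
```

With this table, a datagram for 172.16.0.100 leaves through the Internet-side interface with 10.0.0.172 as next hop, a datagram for 192.168.0.50 is delivered directly on the host side, and a destination that matches no route yields an empty result, in which case the datagram would be dropped.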



_eventloop.add_rule(
// IP => TCP
"receive TCP segment from the network", _datagram_adapter.fd(), Direction::In,
[&] {
if (auto seg = _datagram_adapter.read()) {
_tcp->receive(move(seg.value()));
collect_segments();
}

// debugging output:
if (_thread_data.eof() and _tcp.value().sender().sequence_numbers_in_flight() == 0
and not _fully_acked) {
cerr << "DEBUG: Outbound stream to " << _datagram_adapter.config().destination.to_string()
<< " has been fully acknowledged.\n";
_fully_acked = true;
}
},
[&] { return _tcp->active(); });

// rule 2: read from pipe into outbound buffer
_eventloop.add_rule(
// prepare send
"push bytes to TCPPeer", _thread_data, Direction::In,
[&] {
string data;
data.resize(_tcp->outbound_writer().available_capacity());
_thread_data.read(data);
_tcp->outbound_writer().push(move(data));

if (_thread_data.eof()) {
_tcp->outbound_writer().close();
_outbound_shutdown = true;

// debugging output:
cerr << "DEBUG: Outbound stream to " << _datagram_adapter.config().destination.to_string()
<< " finished (" << _tcp.value().sender().sequence_numbers_in_flight() << " seqno"
<< (_tcp.value().sender().sequence_numbers_in_flight() == 1 ? "" : "s")
<< " still in flight).\n";
}

_tcp->push();
collect_segments();
},
[&] {
return (_tcp->active()) and (not _outbound_shutdown)
and (_tcp->outbound_writer().available_capacity() > 0);
},
[&] {
_tcp->outbound_writer().close();
_outbound_shutdown = true;
});

// rule 3: read from inbound buffer into pipe
_eventloop.add_rule(
// read
"read bytes from inbound stream", _thread_data, Direction::Out,
[&] {
Reader &inbound = _tcp->inbound_reader();
// Write from the inbound_stream into
// the pipe, handling the possibility of a partial
// write (i.e., only pop what was actually written).
if (inbound.bytes_buffered()) {
const std::string_view buffer = inbound.peek();
const auto bytes_written = _thread_data.write(buffer);
inbound.pop(bytes_written);
}

if (inbound.is_finished() or inbound.has_error()) {
_thread_data.shutdown(SHUT_WR);
_inbound_shutdown = true;

// debugging output:
cerr << "DEBUG: Inbound stream from " << _datagram_adapter.config().destination.to_string()
<< " finished " << (inbound.has_error() ? "with an error/reset.\n" : "cleanly.\n");
}
},
[&] {
return _tcp->inbound_reader().bytes_buffered()
or ((_tcp->inbound_reader().is_finished() or _tcp->inbound_reader().has_error())
and not _inbound_shutdown);
});

// rule 4: read outbound segments from TCPConnection and send as datagrams
_eventloop.add_rule(
// send TCP segment
"send TCP segment", _datagram_adapter.fd(), Direction::Out,
[&] {
while (not outgoing_segments_.empty()) {
// The segment is wrapped directly in an IP datagram
_datagram_adapter.write(outgoing_segments_.front());
outgoing_segments_.pop();
}
},
[&] { return not outgoing_segments_.empty(); });
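
Rule 3 above pops only as many bytes as the pipe actually accepted, so nothing is lost when a write is partial. Here is a small sketch of that pattern under simplified assumptions: `LimitedSink`, `MiniReader`, `forward_once` and `forward_all` are hypothetical stand-ins for the pipe and the inbound Reader, not the lab's classes.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>
#include <string_view>
#include <utility>

// Hypothetical sink that accepts at most `limit` bytes per write,
// similar to a pipe that is nearly full.
struct LimitedSink {
    std::size_t limit;
    std::string received;

    // Returns the number of bytes actually accepted.
    std::size_t write(std::string_view buf) {
        const std::size_t n = std::min(limit, buf.size());
        received.append(buf.substr(0, n));
        return n;
    }
};

// Hypothetical stand-in for the inbound Reader: peek() shows the buffered
// bytes without consuming them, pop() discards the first n bytes.
struct MiniReader {
    std::string buffer;

    std::string_view peek() const { return buffer; }
    void pop(std::size_t n) { buffer.erase(0, std::min(n, buffer.size())); }
    std::size_t bytes_buffered() const { return buffer.size(); }
};

// One pass of the rule: write what is buffered, pop only what was written.
void forward_once(MiniReader& inbound, LimitedSink& sink) {
    if (inbound.bytes_buffered() > 0) {
        const std::size_t written = sink.write(inbound.peek());
        inbound.pop(written);
    }
}

// Repeat until the buffer is empty; returns everything the sink received.
std::string forward_all(std::string data, std::size_t limit) {
    if (limit == 0) {
        return {}; // a sink that accepts nothing would never drain the buffer
    }
    MiniReader inbound{std::move(data)};
    LimitedSink sink{limit, {}};
    while (inbound.bytes_buffered() > 0) {
        forward_once(inbound, sink);
    }
    return sink.received;
}
```

Because the unread tail stays in the buffer after each pass, the bytes arrive at the sink complete and in order regardless of how small each individual write turns out to be.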


TCP => wrapped in IP (written to the TUN device) => socket (rule 1 of the event loop above) => router (frame => socket (event loop) => IP => socket (event loop) => frame) => Internet (frame => IP => frame) => Host
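
Every hop in this chain is driven by the same mechanism: a rule pairs a callback with an "interest" predicate, and the loop runs the callback only while the predicate holds. The sketch below shows that idea in isolation. It is a deliberately simplified assumption: `MiniEventLoop`, `Rule` and `drain_demo` are hypothetical names, and unlike the real EventLoop there are no file descriptors and no polling.

```cpp
#include <cstddef>
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified rule: a name, a predicate and a callback.
struct Rule {
    std::string name;
    std::function<bool()> interest; // should this rule run right now?
    std::function<void()> callback; // what to do when it runs
};

class MiniEventLoop {
    std::vector<Rule> rules_;

public:
    void add_rule(std::string name, std::function<void()> callback, std::function<bool()> interest) {
        rules_.push_back({std::move(name), std::move(interest), std::move(callback)});
    }

    // Run every rule whose predicate currently holds; return how many fired.
    std::size_t run_once() {
        std::size_t fired = 0;
        for (auto& rule : rules_) {
            if (rule.interest()) {
                rule.callback();
                ++fired;
            }
        }
        return fired;
    }
};

// Mirrors the "frames from router to host" rule: one queued item is
// handed over per firing, and the rule goes quiet once the queue is empty.
std::vector<std::string> drain_demo() {
    std::queue<std::string> router_to_host;
    std::vector<std::string> delivered;

    MiniEventLoop loop;
    loop.add_rule(
        "frames from router to host",
        [&] {
            delivered.push_back(router_to_host.front());
            router_to_host.pop();
        },
        [&] { return not router_to_host.empty(); });

    router_to_host.push("frame A");
    router_to_host.push("frame B");
    while (loop.run_once() > 0) {
    }
    return delivered;
}
```

The predicate is what keeps the callback safe: `front()` is never called on an empty queue because the rule is skipped whenever the queue has nothing to send.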


Computer Networks, Part 5
https://mingmingjiang1.github.io/emocoder/2023/07/29/计算机网路五/
Author
迷途知返
Published on
July 29, 2023
License