voidReassembler::insert(uint64_t first_index, string data, bool is_last_substring, Writer &output) { if (is_last_substring) { closed_ = true; } 待处理的超出可容纳范围 || Data have been transferred || data为空 || 可用capacity为0 if (first_index >= unassembled_index_ + output.available_capacity() || /* Out of bound */ first_index + data.length() - 1 < unassembled_index_ || /* Data have been transferred */ data.empty() || output.available_capacity() == 0) { if (is_closed()) { output.close(); } return; }
constuint64_t cap = output.available_capacity(); // new_index actually distinguish where the current data start, the start index uint64_t new_index = first_index;
// Data needs to fit the capability limitation if (first_index <= unassembled_index_) { // 有重复 new_index = unassembled_index_; // overlapped_length = 70 - 60; constuint64_t overlapped_length = unassembled_index_ - first_index; // substr(10, 20 - 10); data.size = [60, 80] => [0, 20] data = data.substr(overlapped_length, min(data.size() - overlapped_length, cap)); } else { // 没有重复 // get 全部数据or可容纳的部分 data = data.substr(0, min(data.size(), cap)); if (first_index + data.size() - 1 > unassembled_index_ + cap - 1) { // data = data.substr(0, unassembled_index_ + cap - first_index); } }
// 主要是处理 unassembled_substrings_ 和 // 获取 >=new_index的最小索引 auto rear_iter = unassembled_substrings_.lower_bound(new_index); while (rear_iter != unassembled_substrings_.end()) { auto &[rear_index, rear_data] = *rear_iter; // 和rear_index没有重叠 if (new_index + data.size() - 1 < rear_index) { break; } // No overlap conflict
// 否则就是有重叠 uint64_t rear_overlapped_length = 0; if (new_index + data.size() - 1 < rear_index + rear_data.size() - 1) { // the last index of current data less than counterpart of the rear_data // denote that current: [. [ ]. ], then cut from [new_index, new_index + overlap_len],这种case新的或者旧的应该包含重叠部分,这里采用的是旧的包含重叠部分,新的不包含重叠部分 rear_overlapped_length = new_index + data.size() - rear_index; } else { // else current: [. [ ] ]. 这种case旧的substring就不应该需要了,新的替换 rear_overlapped_length = rear_data.size(); } // Prepare for next rear early, because the data may be erased afterwards. constuint64_t next_rear = rear_index + rear_data.size() - 1; // last index of rear_data if (rear_overlapped_length == rear_data.size()) { // 走了上面的else,就会走这里 unassembled_bytes_ -= rear_data.size(); unassembled_substrings_.erase(rear_index); // 抹除旧的 } else { // 走了上面的if,就会走这里 // We don't combine current data and rear data. // Erase the overlapped part in current data is more efficient. data.erase(data.end() - static_cast<int64_t>(rear_overlapped_length), data.end()); } // 寻找下一个 >= next_rear的substring rear_iter = unassembled_substrings_.lower_bound(next_rear); }
// 同样地主要是处理 unassembled_substrings_ 和最大索引 // if the current substring behind the unassembled_index_ if (first_index > unassembled_index_) { // 获取>new_index的最小值 auto front_iter = unassembled_substrings_.upper_bound(new_index); if (front_iter != unassembled_substrings_.begin()) { // 递减front_iter front_iter--; constauto &[front_index, front_data] = *front_iter; // if first_index, ] if (front_index + front_data.size() - 1 >= first_index) { uint64_t overlapped_length = 0; // if ] last_index if (front_index + front_data.size() <= first_index + data.size()) { // 一次性把之前的全部delete了,所以不需要遍历 overlapped_length = front_index + front_data.size() - first_index; } else { // if last_index ] overlapped_length = data.size(); } if (overlapped_length == front_data.size()) { unassembled_bytes_ -= front_data.size(); unassembled_substrings_.erase(front_index); } else { data.erase(data.begin(), data.begin() + static_cast<int64_t>(overlapped_length)); // Don't forget to update the inserted location new_index = first_index + overlapped_length; } } } }
// If the processed data is empty, no need to insert it. if (data.size() > 0) { unassembled_bytes_ += data.size(); unassembled_substrings_.insert(make_pair(new_index, std::move(data))); }
// 从unassembled_strings中取出合理的(这里的合理指有序取,所以顺序遍历从unassembled_strings中取和上一层未组装的地方)插入发送缓冲区 for (auto iter = unassembled_substrings_.begin(); iter != unassembled_substrings_.end();) { auto &[sub_index, sub_data] = *iter; if (sub_index == unassembled_index_) { // 找到了 constuint64_t prev_bytes_pushed = output.bytes_pushed(); output.push(sub_data); constuint64_t bytes_pushed = output.bytes_pushed(); // 但是可能装不下 // which case can go this if condition path ? when the Writer has no available space to store data. if (bytes_pushed != prev_bytes_pushed + sub_data.size()) { // Cannot push all data, we need to reserve the un-pushed part. constuint64_t pushed_length = bytes_pushed - prev_bytes_pushed; // 已经装进去的 unassembled_index_ += pushed_length; // 未装进去的 unassembled_bytes_ -= pushed_length; // 把没有装进去的放回缓存区(unassembled_substrings_) unassembled_substrings_.insert(make_pair(unassembled_index_, sub_data.substr(pushed_length))); // Don't forget to remove the previous incompletely transferred data unassembled_substrings_.erase(sub_index); break; } unassembled_index_ += sub_data.size(); unassembled_bytes_ -= sub_data.size(); unassembled_substrings_.erase(sub_index); iter = unassembled_substrings_.find(unassembled_index_); } else { break; // No need to do more. Data has been discontinuous. } }
if (is_closed()) { // if it is the last substring and bytes_pending === 0, then close output.close(); }
voidTCPReceiver::receive(TCPSenderMessage message, Reassembler &reassembler, Writer &inbound_stream) { // 没有设置SYN,此时不应该接受数据 if (!set_syn_) { // 且当前报道文也不是SYN的 if (!message.SYN) { return; // drop all data if SYN isn't received } // 当前报道文是SYN的,设置ISN:随机的32位数字 isn_ = message.seqno; // FIN occupied one seqno set_syn_ = true; // 设置SYN标志位 }
// inbound_stream.bytes_pushed()即unassembled_index, + 1即为下一个期待接入的序号(first_index),需要基于ISN转为abs_seqno => stram_index // stream_index = abs_seqno - 1 + SYN constuint64_t checkpoint = inbound_stream.bytes_pushed() + 1; constuint64_t abs_seqno = message.seqno.unwrap(isn_, checkpoint); // unwrap function starts from isn_, which occupies one seqno. // We calculate one index more so we need to minus it. // But if SYN exists in this message, compensation is needed. constuint64_t stream_index = abs_seqno - 1 + message.SYN; // 调用上一节实现的insert方法 reassembler.insert(stream_index, message.payload.release(), message.FIN, inbound_stream); }
// ethernet_address: Ethernet (what ARP calls "hardware") address of the interface // ip_address: IP (what ARP calls "protocol") address of the interface // cppcheck-suppress uninitMemberVar NetworkInterface::NetworkInterface(const EthernetAddress ðernet_address, const Address &ip_address) : ethernet_address_(ethernet_address), ip_address_(ip_address) { cerr << "DEBUG: Network interface has Ethernet address " << to_string(ethernet_address_) << " and IP address " << ip_address.ip() << "\n"; }
// dgram: the IPv4 datagram to be sent // next_hop: the IP address of the interface to send it to (typically a router or default gateway, but // may also be another host if directly connected to the same network as the destination)
// Note: the Address type can be converted to a uint32_t (raw 32-bit IP address) by using the // Address::ipv4_numeric() method. voidNetworkInterface::send_datagram(const InternetDatagram &dgram, const Address &next_hop) { // 下一跳的地址 constuint32_t addr_numeric = next_hop.ipv4_numeric();
/* ARP Table has stored the mapping info, we send the datagram directly */ // 如果ARP Table包含下一跳的地址,直接 if (arp_table_.contains(addr_numeric)) { EthernetFrame eth_frame; // 当前物理机以太网地址 eth_frame.header.src = ethernet_address_; // 目标物理机以太网地址 eth_frame.header.dst = arp_table_.at(addr_numeric).eth_addr; eth_frame.header.type = EthernetHeader::TYPE_IPv4; // 序列化 eth_frame.payload = serialize(dgram); // 发乳待发送缓冲区 outbound_frames_.push(eth_frame); } else { // 没有则发送arp请求 /* ARP Table has no such mapping and we haven't send an ARP request for target ip */ if (arp_requests_lifetime_.find(addr_numeric) == arp_requests_lifetime_.end()) { // next hop ipv4 addr is not contained in the arp requests waiting list ARPMessage arp_msg; arp_msg.opcode = ARPMessage::OPCODE_REQUEST; arp_msg.sender_ip_address = ip_address_.ipv4_numeric(); arp_msg.sender_ethernet_address = ethernet_address_; arp_msg.target_ip_address = addr_numeric; arp_msg.target_ethernet_address = {/* empty */};
// 广播请求 arp_requests_lifetime_.emplace(std::make_pair(addr_numeric, ARP_REQUEST_DEFAULT_TTL)); } // We need to store the datagram in the list. After we know the eth addr, we can queue // the corresponding dgrams. // 广播下一跳的以太网地址的 ARP 请求, 并将 IP 报文放入队列中待 ARP 回复收到后能将其发送出去。 arp_datagrams_waiting_list_.emplace_back(std::pair {next_hop, dgram}); } }
// we can get arp info from either ARP request or ARP reply if (is_arp_request || is_arp_response) { // 更新arp_table arp_table_.emplace(std::make_pair(arp_msg.sender_ip_address, arp_t {arp_msg.sender_ethernet_address, ARP_DEFAULT_TTL})); // delete arp datagrams waiting list,并且发送该存储的请求 for (auto iter = arp_datagrams_waiting_list_.begin(); iter != arp_datagrams_waiting_list_.end();) { constauto &[ipv4_addr, datagram] = *iter; if (ipv4_addr.ipv4_numeric() == arp_msg.sender_ip_address) { send_datagram(datagram, ipv4_addr); iter = arp_datagrams_waiting_list_.erase(iter); } else { iter++; } } arp_requests_lifetime_.erase(arp_msg.sender_ip_address); } }
returnnullopt; }
// ms_since_last_tick: the number of milliseconds since the last call to this method voidNetworkInterface::tick(constsize_t ms_since_last_tick) { /* delete expired ARP items in ARP Table */ // FIXME: Don't use 'iter++' if we have erase current iter's data! for (auto iter = arp_table_.begin(); iter != arp_table_.end(); /* nop */) { auto &[ipv4_addr_numeric, arp] = *iter; if (arp.ttl <= ms_since_last_tick) { iter = arp_table_.erase(iter); } else { arp.ttl -= ms_since_last_tick; iter++; } }
/* delete expired ARP requests */ for (auto &[ipv4_addr, arp_ttl] : arp_requests_lifetime_) { /* resent ARP request if this request has expired,使得任何已经过期的 IP 地址到 Ethernet 地址的映射失效 */ if (arp_ttl <= ms_since_last_tick) { ARPMessage arp_msg; arp_msg.opcode = ARPMessage::OPCODE_REQUEST; arp_msg.sender_ip_address = ip_address_.ipv4_numeric(); arp_msg.sender_ethernet_address = ethernet_address_; arp_msg.target_ip_address = ipv4_addr; arp_msg.target_ethernet_address = {/* empty */};