By now, you’ve seen how the abstraction of a reliable byte stream can
be useful in communicating across the Internet, even though the Internet
itself only provides the service of “best-effort” (unreliable)
datagrams.
// Remember index_ points to where the current byte located at. // 1. Unacceptable index: first_index overwhelms the capability range. // 2. All overlapped: The end index of the substring is smaller than current index_. // 3. data is empty. // 4. No available space. if (first_index >= unassembled_index_ + output.available_capacity() || /* Out of bound */ first_index + data.length() - 1 < unassembled_index_ || /* Data have been transferred */ data.empty() || output.available_capacity() == 0) { if (is_closed()) { output.close(); } return; }
constuint64_t cap = output.available_capacity(); // new_index actually distinguish where the current data start, the start index uint64_t new_index = first_index;
// Data needs to fit the capability limitation if (first_index <= unassembled_index_) { new_index = unassembled_index_; constuint64_t overlapped_length = unassembled_index_ - first_index; data = data.substr(overlapped_length, min(data.size() - overlapped_length, cap)); } else { data = data.substr(0, min(data.size(), cap)); if (first_index + data.size() - 1 > unassembled_index_ + cap - 1) { data = data.substr(0, unassembled_index_ + cap - first_index); } } // Get the rear substring and merge the overlapped part auto rear_iter = unassembled_substrings_.lower_bound(new_index); while (rear_iter != unassembled_substrings_.end()) { auto &[rear_index, rear_data] = *rear_iter; if (new_index + data.size() - 1 < rear_index) { break; } // No overlap conflict uint64_t rear_overlapped_length = 0; if (new_index + data.size() - 1 < rear_index + rear_data.size() - 1) { rear_overlapped_length = new_index + data.size() - rear_index; } else { rear_overlapped_length = rear_data.size(); } // Prepare for next rear early, because the data may be erased afterwards. constuint64_t next_rear = rear_index + rear_data.size() - 1; if (rear_overlapped_length == rear_data.size()) { unassembled_bytes_ -= rear_data.size(); unassembled_substrings_.erase(rear_index); } else { // We don't combine current data and rear data. // Erase the overlapped part in current data is more efficient. data.erase(data.end() - static_cast<int64_t>(rear_overlapped_length), data.end()); } rear_iter = unassembled_substrings_.lower_bound(next_rear); }
// If the processed data is empty, no need to insert it. if (data.size() > 0) { unassembled_bytes_ += data.size(); unassembled_substrings_.insert(make_pair(new_index, std::move(data))); }
for (auto iter = unassembled_substrings_.begin(); iter != unassembled_substrings_.end(); /* nop */) { auto &[sub_index, sub_data] = *iter; if (sub_index == unassembled_index_) { constuint64_t prev_bytes_pushed = output.bytes_pushed(); output.push(sub_data); constuint64_t bytes_pushed = output.bytes_pushed(); if (bytes_pushed != prev_bytes_pushed + sub_data.size()) { // Cannot push all data, we need to reserve the un-pushed part. constuint64_t pushed_length = bytes_pushed - prev_bytes_pushed; unassembled_index_ += pushed_length; unassembled_bytes_ -= pushed_length; unassembled_substrings_.insert(make_pair(unassembled_index_, sub_data.substr(pushed_length))); // Don't forget to remove the previous incompletely transferred data unassembled_substrings_.erase(sub_index); break; } unassembled_index_ += sub_data.size(); unassembled_bytes_ -= sub_data.size(); unassembled_substrings_.erase(sub_index); iter = unassembled_substrings_.find(unassembled_index_); } else { break; // No need to do more. Data has been discontinuous. } }
TCP-in-UDP-in-IP: TCP 报文会被置于用户的数据报的 payload 中,
在用户空间下这是最简单的实现方式: Linux 提供接口 (如 UDPSocket),
而用户侧仅需要提供 payload, 目标地址, Linux 内核会负责将 UDP 报头, IP
报头, 以太网报头组装起来, 将这个网络包发向下一个 hop。 Linux
内核会保证每个 socket 会有独立的本地与远端地址以及端口号,
并且保证这些数据在应用层的相互隔离。
TCP-in-IP: 一般情况下, TCP 报文会直接放在 Internet datagrams
中, 这通常被称为 “TCP/IP”。 Linux 会提供一个 TUN 设备接口,
需要应用层提供整个 Internet datagram, 而 Linux 内核则会处理剩下的部分。
但此时应用层需要自己构建整个 IP 报头以及 payload 部分。
TCP-in-IP-in-Ethernet:
以上的方法依赖Linux内核来实现的协议栈操作, 每次用户向 TUN 设备写入 IP
datagrams 时, Linux 都需要构建正确的带有 IP datagrams 的以太网帧作为
payload。 这意味着 Linux 需要在已知下一个 hop 的 IP 地址的前提下,
得到其以太网目的地址。 如果尚不知道这一映射, Linux 会以广播的形式请求这些信息。
这些功能是由 Network Interface 实现的, 该组件能将 IP
数据报封装成以太网帧等等, 之后会传入 TAP 设备 (类似 TUN
设备但更底层), 实现对 link-layer 的数据帧的传输。
// ethernet_address: Ethernet (what ARP calls "hardware") address of the interface // ip_address: IP (what ARP calls "protocol") address of the interface // cppcheck-suppress uninitMemberVar NetworkInterface::NetworkInterface(const EthernetAddress ðernet_address, const Address &ip_address) : ethernet_address_(ethernet_address), ip_address_(ip_address) { cerr << "DEBUG: Network interface has Ethernet address " << to_string(ethernet_address_) << " and IP address " << ip_address.ip() << "\n"; }
// dgram: the IPv4 datagram to be sent // next_hop: the IP address of the interface to send it to (typically a router or default gateway, but // may also be another host if directly connected to the same network as the destination)
// Note: the Address type can be converted to a uint32_t (raw 32-bit IP address) by using the // Address::ipv4_numeric() method. // 将待发送的 Internet(IP) datagrams 转义为以太网帧并最终发送出去 voidNetworkInterface::send_datagram(const InternetDatagram &dgram /* IP数据报 */, const Address &next_hop /* 下一跳ip地址 */) { // 获取下一跳的ipv4地址 constuint32_t addr_numeric = next_hop.ipv4_numeric();
/* ARP Table has stored the mapping info, we send the datagram directly */ // 如果缓存表arp_table里有,则直接包裹该IP数据报,然后发送 if (arp_table_.contains(addr_numeric)) { EthernetFrame eth_frame; // source mac addr eth_frame.header.src = ethernet_address_; // source mac addr eth_frame.header.dst = arp_table_.at(addr_numeric).eth_addr; // IPv4类型 eth_frame.header.type = EthernetHeader::TYPE_IPv4; // 序列化 eth_frame.payload = serialize(dgram); // 往待发送缓冲区发送 outbound_frames_.push(eth_frame); } else { /* ARP Table has no such mapping and we haven't send an ARP request for target ip */ // 否则发送广播需求下一跳mac addr if (arp_requests_lifetime_.find(addr_numeric) == arp_requests_lifetime_.end()) { // next hop ipv4 addr is not contained in the arp requests waiting list ARPMessage arp_msg; arp_msg.opcode = ARPMessage::OPCODE_REQUEST; arp_msg.sender_ip_address = ip_address_.ipv4_numeric(); arp_msg.sender_ethernet_address = ethernet_address_; arp_msg.target_ip_address = addr_numeric; arp_msg.target_ethernet_address = {/* empty */};
EthernetFrame arp_eth_frame; arp_eth_frame.header.src = ethernet_address_; arp_eth_frame.header.dst = ETHERNET_BROADCAST; // 广播帧 arp_eth_frame.header.type = EthernetHeader::TYPE_ARP; arp_eth_frame.payload = serialize(arp_msg); outbound_frames_.push(arp_eth_frame); // 上面几步就是构建好发送的请求,往待发送缓冲区发送,同时在队列里记录该请求的存活时间 arp_requests_lifetime_.emplace(std::make_pair(addr_numeric, ARP_REQUEST_DEFAULT_TTL)); } // We need to store the datagram in the list. After we know the eth addr, we can queue // the corresponding dgrams. arp_datagrams_waiting_list_.emplace_back(std::pair {next_hop, dgram}); } }
// we can get arp info from either ARP request or ARP reply if (is_arp_request || is_arp_response) { arp_table_.emplace(std::make_pair(arp_msg.sender_ip_address, arp_t {arp_msg.sender_ethernet_address, ARP_DEFAULT_TTL})); // delete arp datagrams waiting list for (auto iter = arp_datagrams_waiting_list_.begin(); iter != arp_datagrams_waiting_list_.end();) { constauto &[ipv4_addr, datagram] = *iter; if (ipv4_addr.ipv4_numeric() == arp_msg.sender_ip_address) { send_datagram(datagram, ipv4_addr); iter = arp_datagrams_waiting_list_.erase(iter); } else { iter++; } } arp_requests_lifetime_.erase(arp_msg.sender_ip_address); } }
returnnullopt; }
// ms_since_last_tick: the number of milliseconds since the last call to this method // 记录时间, 以使得任何已经过期的 IP 地址到 Ethernet 地址的映射失效 voidNetworkInterface::tick(constsize_t ms_since_last_tick) { /* delete expired ARP items in ARP Table */ // FIXME: Don't use 'iter++' if we have erase current iter's data! for (auto iter = arp_table_.begin(); iter != arp_table_.end(); /* nop */) { auto &[ipv4_addr_numeric, arp] = *iter; if (arp.ttl <= ms_since_last_tick) { // 小于一定阈值,则抹除 iter = arp_table_.erase(iter); } else { // 更新缓存表每一项的存活时间 arp.ttl -= ms_since_last_tick; iter++; } }
/* delete expired ARP requests */ for (auto &[ipv4_addr, arp_ttl] : arp_requests_lifetime_) { /* resent ARP request if this request has expired */ if (arp_ttl <= ms_since_last_tick) { ARPMessage arp_msg; arp_msg.opcode = ARPMessage::OPCODE_REQUEST; arp_msg.sender_ip_address = ip_address_.ipv4_numeric(); arp_msg.sender_ethernet_address = ethernet_address_; arp_msg.target_ip_address = ipv4_addr; arp_msg.target_ethernet_address = {/* empty */};
// route_prefix: The "up-to-32-bit" IPv4 address prefix to match the datagram's destination address against // prefix_length: For this route to be applicable, how many high-order (most-significant) bits of // the route_prefix will need to match the corresponding bits of the datagram's destination address? // next_hop: The IP address of the next hop. Will be empty if the network is directly attached to the router (in // which case, the next hop address should be the datagram's final destination). // interface_num: The index of the interface to send the datagram out on. voidRouter::add_route(constuint32_t route_prefix, constuint8_t prefix_length, const optional<Address> next_hop, constsize_t interface_num) { cerr << "DEBUG: adding route " << Address::from_ipv4_numeric(route_prefix).ip() << "/" << static_cast<int>(prefix_length) << " => " << (next_hop.has_value() ? next_hop->ip() : "(direct)") << " on interface " << interface_num << "\n";
// Build a random Ethernet address for a host.
// Every byte is drawn from random_device; afterwards the two low-order bits of the
// first byte are forced to "10" (bit 1 set, bit 0 cleared).
EthernetAddress random_host_ethernet_address()
{
    EthernetAddress addr;
    // A single device is reused for all bytes instead of constructing a new one per byte.
    random_device entropy;
    for (auto &byte : addr) {
        byte = entropy(); // use a random local Ethernet address
    }
    addr.at(0) |= 0x02; // "10" in last two binary digits marks a private Ethernet address
    addr.at(0) &= 0xfe;

    return addr;
}
EthernetAddress random_router_ethernet_address() { EthernetAddress addr; for (auto &byte : addr) { byte = random_device()(); // use a random local Ethernet address } addr.at(0) = 0x02; // "10" in last two binary digits marks a private Ethernet address addr.at(1) = 0; addr.at(2) = 0;
optional<TCPSegment> read() { auto frame_opt = maybe_receive_frame(_data_socket_pair.first); if (not frame_opt) { return {}; } EthernetFrame frame = move(frame_opt.value());
// Give the frame to the NetworkInterface. Get back an Internet datagram if frame was carrying one. optional<InternetDatagram> ip_dgram = _interface.recv_frame(frame);
// The incoming frame may have caused the NetworkInterface to send a frame send_pending();
// Try to interpret IPv4 datagram as TCP if (ip_dgram) { returnunwrap_tcp_in_ip(ip_dgram.value()); }
/* let bouncer know where we are */ internet_socket.sendto(bounce_address, ""); internet_socket.sendto(bounce_address, ""); internet_socket.sendto(bounce_address, ""); internet_socket.connect(bounce_address);
_eventloop.add_rule( // IP => TCP "receive TCP segment from the network", _datagram_adapter.fd(), Direction::In, [&] { if (auto seg = _datagram_adapter.read()) { _tcp->receive(move(seg.value())); collect_segments(); }
// debugging output: if (_thread_data.eof() and _tcp.value().sender().sequence_numbers_in_flight() == 0 andnot _fully_acked) { cerr << "DEBUG: Outbound stream to " << _datagram_adapter.config().destination.to_string() << " has been fully acknowledged.\n"; _fully_acked = true; } }, [&] { return _tcp->active(); });
// rule 2: read from pipe into outbound buffer _eventloop.add_rule( // prepare send "push bytes to TCPPeer", _thread_data, Direction::In, [&] { string data; data.resize(_tcp->outbound_writer().available_capacity()); _thread_data.read(data); _tcp->outbound_writer().push(move(data));
if (_thread_data.eof()) { _tcp->outbound_writer().close(); _outbound_shutdown = true;
// rule 3: read from inbound buffer into pipe _eventloop.add_rule( // read "read bytes from inbound stream", _thread_data, Direction::Out, [&] { Reader &inbound = _tcp->inbound_reader(); // Write from the inbound_stream into // the pipe, handling the possibility of a partial // write (i.e., only pop what was actually written). if (inbound.bytes_buffered()) { const std::string_view buffer = inbound.peek(); constauto bytes_written = _thread_data.write(buffer); inbound.pop(bytes_written); }
if (inbound.is_finished() or inbound.has_error()) { _thread_data.shutdown(SHUT_WR); _inbound_shutdown = true;
// debugging output: cerr << "DEBUG: Inbound stream from " << _datagram_adapter.config().destination.to_string() << " finished " << (inbound.has_error() ? "with an error/reset.\n" : "cleanly.\n"); } }, [&] { return _tcp->inbound_reader().bytes_buffered() or ((_tcp->inbound_reader().is_finished() or _tcp->inbound_reader().has_error()) andnot _inbound_shutdown); });
// rule 4: read outbound segments from TCPConnection and send as datagrams _eventloop.add_rule( // send TCP segment "send TCP segment", _datagram_adapter.fd(), Direction::Out, [&] { while (not outgoing_segments_.empty()) { // 直接warraped in IP datagram _datagram_adapter.write(outgoing_segments_.front()); outgoing_segments_.pop(); } }, [&] { returnnot outgoing_segments_.empty(); });
TCP => wrapped in IP(写入TUN设备) =>
socket(eventloop里对应上述规则1) => router(frame =>
socket(eventloop) => IP => socket(eventloop) => frame) =>
Internet(frame => IP => frame) => Host