intmain(int argc, char *argv[]){ try { if (argc <= 0) { abort(); // For sticklers: don't try to access argv[0] if argc <= 0. }
// The program takes two command-line arguments: the hostname and "path" part of the URL. // Print the usage message unless there are these two arguments (plus the program name // itself, so arg count = 3 in total). if (argc != 3) { cerr << "Usage: " << argv[0] << " HOST PATH\n"; cerr << "\tExample: " << argv[0] << " stanford.edu /class/cs144\n"; return EXIT_FAILURE; }
// Get the command-line arguments. const string host = argv[1]; const string path = argv[2];
//! Bytes are written on the "input" side and read from the "output" //! side. The byte stream is finite: the writer can end the input, //! and then no more bytes can be written. classByteStream { private: // Your code here -- add private members as necessary. BufferList _buffer = {}; size_t _capacity = 0; size_t _read_count = 0; size_t _write_count = 0; bool _input_ended_flag = false; bool _error = false; //!< Flag indicating that the stream suffered an error.
public: //! Construct a stream with room for `capacity` bytes. ByteStream(constsize_t capacity);
//! \name "Input" interface for the writer //!@{
//! Write a string of bytes into the stream. Write as many //! as will fit, and return how many were written. //! \returns the number of bytes accepted into the stream size_twrite(const std::string &data);
//! \returns the number of additional bytes that the stream has space for size_tremaining_capacity()const;
//! Signal that the byte stream has reached its ending voidend_input();
//! Indicate that the stream suffered an error. voidset_error(){ _error = true; } //!@}
//! \name "Output" interface for the reader //!@{
//! Peek at next "len" bytes of the stream //! \returns a string std::string peek_output(constsize_t len)const;
//! Remove bytes from the buffer voidpop_output(constsize_t len);
//! Read (i.e., copy and then pop) the next "len" bytes of the stream //! \returns a vector of bytes read std::string read(constsize_t len){ constauto ret = peek_output(len); pop_output(len); return ret; }
//! \returns `true` if the stream input has ended boolinput_ended()const;
//! \returns `true` if the stream has suffered an error boolerror()const{ return _error; }
//! \returns the maximum amount that can currently be read from the stream size_tbuffer_size()const;
//! \returns `true` if the buffer is empty boolbuffer_empty()const;
//! \returns `true` if the output has reached the ending booleof()const; //!@}
// ! \name General accounting //!@{
//! Total number of bytes written size_tbytes_written()const;
//! Total number of bytes popped size_tbytes_read()const; //!@} };
//! \param[in] len bytes will be copied from the output side of the buffer string ByteStream::peek_output(constsize_t len)const{ size_t length = len; if (length > _buffer.size()) { length = _buffer.size(); } string s = _buffer.concatenate(); returnstring().assign(s.begin(), s.begin() + length); }
//! \param[in] len bytes will be removed from the output side of the buffer voidByteStream::pop_output(constsize_t len){ size_t length = len; if (length > _buffer.size()) { length = _buffer.size(); } _read_count += length; _buffer.remove_prefix(length); return; }
git clone https://gitee.com/kangyupl/sponge git checkout -b lab1-startercode origin/lab1-startercode mkdir build && cd build cmake .. make format make -j4 && make check_lab1
// Construct a `StreamReassembler` that will store up to `capacity` bytes. // 构造一个“StreamReassembler”,最多可存储“capacity”字节。 StreamReassembler(constsize_t capacity); // Receive a substring and write any newly contiguous bytes into the stream, // while staying within the memory limits of the `capacity`. Bytes that would // exceed the capacity are silently discarded. // // `data`: the substring // `index` indicates the index (place in sequence) of the first byte in `data` // `eof`: the last byte of this substring will be the last byte in the entire stream // 接收子字符串并将任何新的连续字节写入流中, // 同时保持在“容量”的内存限制内。 // 超出容量的字节将被默默丢弃。 // // `data`:子字符串 // `index` 表示 `data` 中第一个字节的索引(按顺序排列) // `eof`:该子字符串的最后一个字节将是该子字符串中的最后一个字节整个流 voidpush_substring(const string &data, constuint64_t index, constbool eof); // Access the reassembled ByteStream (your code from Lab 0) // 访问重新组装的字节流(来自lab 0 的代码) ByteStream &stream_out(); // The number of bytes in the substrings stored but not yet reassembled // 已存储但尚未重组的子字符串中的字节数 size_tunassembled_bytes()const; // Is the internal state empty (other than the output stream)? // 内部状态是否为空(除了输出流)? boolempty()const;
//! \brief type of unassembled elm classtypeUnassembled { public: size_t _index; std::string _data; typeUnassembled(size_t index, std::string data) : _index(index), _data(data){}; booloperator <(typeUnassembled t)const{ return _index < t._index; } };
//! \brief A class that assembles a series of excerpts from a byte stream (possibly out of order, //! possibly overlapping) into an in-order byte stream. classStreamReassembler { private: // Your code here -- add private members as necessary. /// @brief std::set<typeUnassembled> _Unassembled; size_t _firstUnassembled; size_t _nUnassembled; bool _eof; ByteStream _output; //!< The reassembled in-order byte stream size_t _capacity; //!< The maximum number of bytes
intmerge_substring(size_t & index, std::string &data, size_t index2, const std::string &data2); public: //! \brief Construct a `StreamReassembler` that will store up to `capacity` bytes. //! \note This capacity limits both the bytes that have been reassembled, //! and those that have not yet been reassembled. StreamReassembler(constsize_t capacity);
//! \brief Receives a substring and writes any newly contiguous bytes into the stream. //! //! If accepting all the data would overflow the `capacity` of this //! `StreamReassembler`, then only the part of the data that fits will be //! accepted. If the substring is only partially accepted, then the `eof` //! will be disregarded. //! //! \param data the string being added //! \param index the index of the first byte in `data` //! \param eof whether or not this segment ends with the end of the stream voidpush_substring(const std::string &data, constuint64_t index, constbool eof);
//! The number of bytes in the substrings stored but not yet reassembled //! //! \note If the byte at a particular index has been submitted twice, it //! should only be counted once for the purpose of this function. size_tunassembled_bytes()const;
//! \brief Is the internal state empty (other than the output stream)? //! \returns `true` if no substrings are waiting to be assembled boolempty()const; };
classTCPReceiver { //! Our data structure for re-assembling bytes. StreamReassembler _reassembler; bool _syn_received; bool _fin_received; WrappingInt32 _isn; WrappingInt32 _ackno; uint64_t _checkpoint; //! The maximum number of bytes we'll store. size_t _capacity;
public: //! \brief Construct a TCP receiver //! //! \param capacity the maximum number of bytes that the receiver will //! store in its buffers at any give time. TCPReceiver(constsize_t capacity) : _reassembler(capacity), _syn_received(0), _fin_received(0), _isn(0), _ackno(0), _checkpoint(0) {} ...... };
if (hdr.fin && !_fin_received) { _fin_received = 1; // if flags = SF and payload_size = 0, we need to end_input() the stream manually if (hdr.syn && seg.length_in_sequence_space() == 2) { stream_out().end_input(); } }
// second syn or fin will be rejected if ((inbound) || (hdr.fin && !old_fin_received) || (hdr.syn && !old_syn_received)) { returntrue; } returnfalse; }
//! \brief The "sender" part of a TCP implementation.
//! Accepts a ByteStream, divides it up into segments and sends the //! segments, keeps track of which segments are still in-flight, //! maintains the Retransmission Timer, and retransmits in-flight //! segments if the retransmission timer expires. classTCPRetransmissionTimer { public: //! retransmission timer for the connection unsignedint _initial_RTO;
//! retransmission timeout unsignedint _RTO;
//! timeout unsignedint _TO;
//! state of the timer, 1:open, 0:close bool _open;
//! \name "Input" interface for the writer //!@{ ByteStream &stream_in(){ return _stream; } const ByteStream &stream_in()const{ return _stream; } //!@}
//! \name Methods that can cause the TCPSender to send a segment //!@{
//! \brief A new acknowledgment was received boolack_received(const WrappingInt32 ackno, constuint16_t window_size);
//! \brief Generate an empty-payload segment (useful for creating empty ACK segments) voidsend_empty_segment();
//! \brief create and send segments to fill as much of the window as possible voidfill_window();
//! \brief Notifies the TCPSender of the passage of time voidtick(constsize_t ms_since_last_tick); //!@}
//! \name Accessors //!@{
//! \brief How many sequence numbers are occupied by segments sent but not yet acknowledged? //! \note count is in "sequence space," i.e. SYN and FIN each count for one byte //! (see TCPSegment::length_in_sequence_space()) size_tbytes_in_flight()const;
//! \brief Number of consecutive retransmissions that have occurred in a row unsignedintconsecutive_retransmissions()const;
//! \brief TCPSegments that the TCPSender has enqueued for transmission. //! \note These must be dequeued and sent by the TCPConnection, //! which will need to fill in the fields that are set by the TCPReceiver //! (ackno and window size) before sending. std::queue<TCPSegment> &segments_out(){ return _segments_out; } //!@}
//! \name What is the next sequence number? (used for testing) //!@{
//! \brief absolute seqno for the next byte to be sent uint64_tnext_seqno_absolute()const{ return _next_seqno; }
//! \brief relative seqno for the next byte to be sent WrappingInt32 next_seqno()const{ returnwrap(_next_seqno, _isn); } //!@}
//! \param[in] capacity the capacity of the outgoing byte stream //! \param[in] retx_timeout the initial amount of time to wait before retransmitting the oldest outstanding segment //! \param[in] fixed_isn the Initial Sequence Number to use, if set (otherwise uses a random ISN) TCPSender::TCPSender(constsize_t capacity, constuint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn) : _isn(fixed_isn.value_or(WrappingInt32{random_device()()})) , _segments_out{} , _segments_outstanding{} , _nBytes_inflight(0) , _recv_ackno(0) , _timer{retx_timeout} , _window_size(1) , _consecutive_retransmissions{0} , _stream(capacity) , _next_seqno(0) , _syn_sent(0) , _fin_sent(0) {}
voidTCPSender::fill_window(){ assert(!_stream.error()); TCPSegment seg; if (_next_seqno == 0) { // state is CLOSE, need to send SYN seg.header().syn = 1; _syn_sent = 1; send_non_empty_segment(seg); return; } elseif (_next_seqno == _nBytes_inflight) { // state is SYN SENT, don't send SYN return; }
// zero window probing uint16_t window_size = _window_size ? _window_size : 1; uint64_t remaining; while ((remaining = static_cast<uint64_t>(window_size) + (_recv_ackno - _next_seqno))) { // FIN flag occupies space in window TCPSegment new_seg; if (_stream.eof() && !_fin_sent) { new_seg.header().fin = 1; _fin_sent = 1; send_non_empty_segment(new_seg); return; } elseif (_stream.eof()) { return; } size_t size = min(static_cast<size_t>(remaining), TCPConfig::MAX_PAYLOAD_SIZE); new_seg.payload() = Buffer(_stream.read(size)); if (new_seg.length_in_sequence_space() < window_size && _stream.eof()) { // piggy-back FIN new_seg.header().fin = 1; _fin_sent = 1; } if (new_seg.length_in_sequence_space() == 0) { return; } send_non_empty_segment(new_seg); } }
//! \param ackno The remote receiver's ackno (acknowledgment number) //! \param window_size The remote receiver's advertised window size //! \returns `false` if the ackno appears invalid (acknowledges something the TCPSender hasn't sent yet) boolTCPSender::ack_received(const WrappingInt32 ackno, constuint16_t window_size){ if (ackno - next_seqno() > 0) { returnfalse; } //如果window_size为0,需要记录下来,"zero window probing", 影响tick()和fill_window()的行为 _window_size = window_size; uint64_t abs_ackno = unwrap(ackno, _isn, _recv_ackno); if (abs_ackno <= _recv_ackno) { returntrue; }
_recv_ackno = abs_ackno; // acknowledges the successful receipt of new data _timer._RTO = _timer._initial_RTO; _timer._TO = 0; _consecutive_retransmissions = 0;
// any outstanding segment, restart the timer. // [RFC6298](5.3) if (!_segments_outstanding.empty()) { _timer.start(); } returntrue; }
//! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method voidTCPSender::tick(constsize_t ms_since_last_tick){ size_t time_left = ms_since_last_tick; if (_timer.tick(time_left)) { // Notice: remove fill_the_window() here to fix the test fsm_retx_relaxed // timer has expired, retransmit at most ONE outstanding segment if (!_segments_outstanding.empty()) { // retransmit the outstanding segment with the lowest sequence number _segments_out.push(_segments_outstanding.front()); if (_window_size) { _consecutive_retransmissions++; _timer._RTO *= 2; // double the RTO, exponential backoff, it slows down retransmissions on lousy // networks to avoid further gumming up the works } if (!_timer.open()) { //[RFC6298](5.1) _timer.start(); } if (syn_sent() && (_next_seqno == _nBytes_inflight) && (_timer._RTO < 3000)) { _timer._RTO = 3000; // SYN_SENT, [RFC6298](5.7) } } if (_segments_outstanding.empty()) { _timer.close(); } } }