Skip to content

add function 'void set_max_message_buffer_size(size_t)' to avoid too many message body cached in queue ( caused OOM) #1159

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion websocketpp/config/core.hpp
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,8 @@ struct core {
* @since 0.3.0
*/
static const size_t max_message_size = 32000000;

static const size_t max_message_buffer_size = 32000000*3;

/// Default maximum http body size
/**
* Default value for the http parser's maximum body size. Maximum body size
Expand Down
68 changes: 36 additions & 32 deletions websocketpp/connection.hpp
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ class connection

/// Get maximum message size
/**
* Get maximum message size. Maximum message size determines the point at
* Get maximum message size. Maximum message size determines the point at
* which the connection will fail with the message_too_big protocol error.
*
* The default is set by the endpoint that creates the connection.
Expand All @@ -559,11 +559,14 @@ class connection
size_t get_max_message_size() const {
return m_max_message_size;
}

void set_max_message_buffser_size(size_t mbs) {
max_buffer_size = mbs;
}

/// Set maximum message size
/**
* Set maximum message size. Maximum message size determines the point at
* which the connection will fail with the message_too_big protocol error.
* Set maximum message size. Maximum message size determines the point at
* which the connection will fail with the message_too_big protocol error.
* This value may be changed during the connection.
*
* The default is set by the endpoint that creates the connection.
Expand All @@ -578,7 +581,7 @@ class connection
m_processor->set_max_message_size(new_value);
}
}

/// Get maximum HTTP message body size
/**
* Get maximum HTTP message body size. Maximum message body size determines
Expand All @@ -594,7 +597,7 @@ class connection
size_t get_max_http_body_size() const {
return m_request.get_max_body_size();
}

/// Set maximum HTTP message body size
/**
* Set maximum HTTP message body size. Maximum message body size determines
Expand Down Expand Up @@ -701,14 +704,14 @@ class connection
* @return An error code
*/
lib::error_code interrupt();

/// Transport inturrupt callback
void handle_interrupt();

/// Pause reading of new data
/**
* Signals to the connection to halt reading of new data. While reading is paused,
* the connection will stop reading from its associated socket. In turn this will
* Signals to the connection to halt reading of new data. While reading is paused,
* the connection will stop reading from its associated socket. In turn this will
* result in TCP based flow control kicking in and slowing data flow from the remote
* endpoint.
*
Expand All @@ -720,7 +723,7 @@ class connection
*
* If supported by the transport this is done asynchronously. As such reading may not
* stop until the current read operation completes. Typically you can expect to
* receive no more bytes after initiating a read pause than the size of the read
* receive no more bytes after initiating a read pause than the size of the read
* buffer.
*
* If reading is paused for this connection already nothing is changed.
Expand Down Expand Up @@ -974,7 +977,7 @@ class connection

/// Get response HTTP status code
/**
* Gets the response status code
* Gets the response status code
*
* @since 0.7.0
*
Expand All @@ -986,7 +989,7 @@ class connection

/// Get response HTTP status message
/**
* Gets the response status message
* Gets the response status message
*
* @since 0.7.0
*
Expand All @@ -995,7 +998,7 @@ class connection
std::string const & get_response_msg() const {
return m_response.get_status_msg();
}

/// Set response status code and message
/**
* Sets the response status code to `code` and looks up the corresponding
Expand Down Expand Up @@ -1102,7 +1105,7 @@ class connection
request_type const & get_request() const {
return m_request;
}

/// Get response object
/**
* Direct access to the HTTP response sent or received as a part of the
Expand All @@ -1121,7 +1124,7 @@ class connection
response_type const & get_response() const {
return m_response;
}

/// Defer HTTP Response until later (Exception free)
/**
* Used in the http handler to defer the HTTP response for this connection
Expand All @@ -1136,7 +1139,7 @@ class connection
* @return A status code, zero on success, non-zero otherwise
*/
lib::error_code defer_http_response();

/// Send deferred HTTP Response (exception free)
/**
* Sends an http response to an HTTP connection that was deferred. This will
Expand All @@ -1148,25 +1151,25 @@ class connection
* @param ec A status code, zero on success, non-zero otherwise
*/
void send_http_response(lib::error_code & ec);

/// Send deferred HTTP Response
void send_http_response();

// TODO HTTPNBIO: write_headers
// function that processes headers + status so far and writes it to the wire
// beginning the HTTP response body state. This method will ignore anything
// in the response body.

// TODO HTTPNBIO: write_body_message
// queues the specified message_buffer for async writing

// TODO HTTPNBIO: finish connection
//

// TODO HTTPNBIO: write_response
// Writes the whole response, headers + body and closes the connection



/////////////////////////////////////////////////////////////
// Pass-through access to the other connection information //
Expand Down Expand Up @@ -1286,7 +1289,7 @@ class connection
// you are doing. //
////////////////////////////////////////////////////////////////////////



void read_handshake(size_t num_bytes);

Expand All @@ -1295,7 +1298,7 @@ class connection
void handle_read_http_response(lib::error_code const & ec,
size_t bytes_transferred);


void handle_write_http_response(lib::error_code const & ec);
void handle_send_http_request(lib::error_code const & ec);

Expand Down Expand Up @@ -1337,8 +1340,8 @@ class connection
*/
void handle_write_frame(lib::error_code const & ec);
// protected:
// This set of methods would really like to be protected, but doing so
// requires that the endpoint be able to friend the connection. This is
// This set of methods would really like to be protected, but doing so
// requires that the endpoint be able to friend the connection. This is
// allowed with C++11, but not prior versions

/// Start the connection state machine
Expand Down Expand Up @@ -1368,7 +1371,7 @@ class connection
/// set m_response and return an error code indicating status.
lib::error_code process_handshake_request();
private:


/// Completes m_response, serializes it, and sends it out on the wire.
void write_http_response(lib::error_code const & ec);
Expand Down Expand Up @@ -1474,7 +1477,7 @@ class connection
* Includes: error code and message for why it was failed
*/
void log_fail_result();

/// Prints information about HTTP connections
/**
* Includes: TODO
Expand Down Expand Up @@ -1516,6 +1519,7 @@ class connection
long m_close_handshake_timeout_dur;
long m_pong_timeout_dur;
size_t m_max_message_size;
size_t max_buffer_size = 0;

/// External connection state
/**
Expand Down Expand Up @@ -1623,11 +1627,11 @@ class connection

/// Detailed internal error code
lib::error_code m_ec;

/// A flag that gets set once it is determined that the connection is an
/// HTTP connection and not a WebSocket one.
bool m_is_http;

/// A flag that gets set when the completion of an http connection is
/// deferred until later.
session::http_state::value m_http_state;
Expand Down
6 changes: 6 additions & 0 deletions websocketpp/endpoint.hpp
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ class endpoint : public config::transport_type, public config::endpoint_base {
, m_close_handshake_timeout_dur(config::timeout_close_handshake)
, m_pong_timeout_dur(config::timeout_pong)
, m_max_message_size(config::max_message_size)
, max_buffer_size(config::max_message_buffer_size)
, m_max_http_body_size(config::max_http_body_size)
, m_is_server(p_is_server)
{
Expand Down Expand Up @@ -143,6 +144,7 @@ class endpoint : public config::transport_type, public config::endpoint_base {
, m_close_handshake_timeout_dur(o.m_close_handshake_timeout_dur)
, m_pong_timeout_dur(o.m_pong_timeout_dur)
, m_max_message_size(o.m_max_message_size)
, max_buffer_size(o.max_buffer_size)
, m_max_http_body_size(o.m_max_http_body_size)

, m_rng(std::move(o.m_rng))
Expand Down Expand Up @@ -432,6 +434,9 @@ class endpoint : public config::transport_type, public config::endpoint_base {
void set_max_message_size(size_t new_value) {
m_max_message_size = new_value;
}
void set_max_message_buffer_size(size_t new_value) {
max_buffer_size = new_value;
}

/// Get maximum HTTP message body size
/**
Expand Down Expand Up @@ -682,6 +687,7 @@ class endpoint : public config::transport_type, public config::endpoint_base {
long m_close_handshake_timeout_dur;
long m_pong_timeout_dur;
size_t m_max_message_size;
size_t max_buffer_size;
size_t m_max_http_body_size;

rng_type m_rng;
Expand Down
21 changes: 19 additions & 2 deletions websocketpp/impl/connection_impl.hpp
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -2205,7 +2205,6 @@ connection<config>::get_processor(int version) const {

return p;
}

template <typename config>
void connection<config>::write_push(typename config::message_type::ptr msg)
{
Expand All @@ -2216,10 +2215,28 @@ void connection<config>::write_push(typename config::message_type::ptr msg)
m_send_buffer_size += msg->get_payload().size();
m_send_queue.push(msg);

if (max_buffer_size>0)
{
while (m_send_buffer_size>max_buffer_size)
{
auto legacy_msg = m_send_queue.front();
if (legacy_msg)
{
m_send_queue.pop();
m_send_buffer_size-=legacy_msg->get_payload().size();
}else
{
break;
}
}
}


if (m_alog->static_test(log::alevel::devel)) {
std::stringstream s;
s << "write_push: message count: " << m_send_queue.size()
<< " buffer size: " << m_send_buffer_size;
<< " buffer size: " << m_send_buffer_size/1024.0/1024.0;
std::cout<<s.str()<<std::endl;
m_alog->write(log::alevel::devel,s.str());
}
}
Expand Down
6 changes: 6 additions & 0 deletions websocketpp/impl/endpoint_impl.hpp
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ endpoint<connection,config>::create_connection() {
if (m_max_message_size != config::max_message_size) {
con->set_max_message_size(m_max_message_size);
}
if (max_buffer_size != config::max_message_buffer_size)
{
con->set_max_message_buffser_size(max_buffer_size);
}


con->set_max_http_body_size(m_max_http_body_size);

lib::error_code ec;
Expand Down