From 8d6df03c1d6e2aeff49c9fd482ec70d7ae46b1da Mon Sep 17 00:00:00 2001 From: Andrew Huang Date: Sat, 16 Feb 2019 22:42:02 -0500 Subject: [PATCH 1/9] Add incremental updating of open streams count and closed_streams state --- h2/connection.py | 42 +++++++++++++------- h2/stream.py | 85 +++++++++++++++++++++++++++++++++++++++- test/test_basic_logic.py | 2 +- 3 files changed, 111 insertions(+), 18 deletions(-) diff --git a/h2/connection.py b/h2/connection.py index 20975e3bb..e1c6b0d7e 100644 --- a/h2/connection.py +++ b/h2/connection.py @@ -292,6 +292,9 @@ def __init__(self, config=None): self.encoder = Encoder() self.decoder = Decoder() + self._open_outbound_stream_count = 0 + self._open_inbound_stream_count = 0 + # This won't always actually do anything: for versions of HPACK older # than 2.3.0 it does nothing. However, we have to try! self.decoder.max_header_list_size = self.DEFAULT_MAX_HEADER_LIST_SIZE @@ -362,6 +365,8 @@ def __init__(self, config=None): size_limit=self.MAX_CLOSED_STREAMS ) + self._streams_to_close = list() + # The flow control window manager for the connection. self._inbound_flow_control_window_manager = WindowManager( max_window_size=self.local_settings.initial_window_size @@ -383,6 +388,15 @@ def __init__(self, config=None): ExtensionFrame: self._receive_unknown_frame } + def _increment_open_streams(self, stream_id, incr): + if stream_id % 2 == 0: + self._open_inbound_stream_count += incr + elif stream_id % 2 == 1: + self._open_outbound_stream_count += incr + + def _close_stream(self, stream_id): + self._streams_to_close.append(stream_id) + def _prepare_for_sending(self, frames): if not frames: return @@ -393,22 +407,18 @@ def _open_streams(self, remainder): """ A common method of counting number of open streams. Returns the number of streams that are open *and* that have (stream ID % 2) == remainder. - While it iterates, also deletes any closed streams. + Also cleans up closed streams. """ - count = 0 - to_delete = [] - - for stream_id, stream in self.streams.items(): - if stream.open and (stream_id % 2 == remainder): - count += 1 - elif stream.closed: - to_delete.append(stream_id) - - for stream_id in to_delete: + for stream_id in self._streams_to_close: stream = self.streams.pop(stream_id) self._closed_streams[stream_id] = stream.closed_by + self._streams_to_close = list() - return count + if remainder == 0: + return self._open_inbound_stream_count + elif remainder == 1: + return self._open_outbound_stream_count + return 0 @property def open_outbound_streams(self): @@ -467,7 +477,9 @@ def _begin_new_stream(self, stream_id, allowed_ids): stream_id, config=self.config, inbound_window_size=self.local_settings.initial_window_size, - outbound_window_size=self.remote_settings.initial_window_size + outbound_window_size=self.remote_settings.initial_window_size, + increment_open_stream_count_callback=self._increment_open_streams, + close_stream_callback=self._close_stream, ) self.config.logger.debug("Stream ID %d created", stream_id) s.max_inbound_frame_size = self.max_inbound_frame_size @@ -1542,8 +1554,8 @@ def _receive_headers_frame(self, frame): max_open_streams = self.local_settings.max_concurrent_streams if (self.open_inbound_streams + 1) > max_open_streams: raise TooManyStreamsError( - "Max outbound streams is %d, %d open" % - (max_open_streams, self.open_outbound_streams) + "Max inbound streams is %d, %d open" % + (max_open_streams, self.open_inbound_streams) ) # Let's decode the headers. We handle headers as bytes internally up diff --git a/h2/stream.py b/h2/stream.py index 827e65a71..20ec754bd 100644 --- a/h2/stream.py +++ b/h2/stream.py @@ -767,6 +767,58 @@ def send_alt_svc(self, previous_state): (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), } +""" +Wraps a stream state change function to ensure that we keep +the parent H2Connection's state in sync +""" +def sync_state_change(func): + def wrapper(self, *args, **kwargs): + # Collect state at the beginning. + start_state = self.state_machine.state + started_open = self.open + started_closed = not started_open + + # Do the state change (if any). + result = func(self, *args, **kwargs) + + # Collect state at the end. + end_state = self.state_machine.state + ended_open = self.open + ended_closed = not ended_open + + # If at any point we've tranwsitioned to the CLOSED state + # from any other state, close our stream. + if end_state == StreamState.CLOSED and start_state != end_state: + if self._close_stream_callback: + self._close_stream_callback(self.stream_id) + # Clear callback so we only call this once per stream + self._close_stream_callback = None + + # If we were open, but are now closed, decrement + # the open stream count, and call the close callback. + if started_open and ended_closed: + if self._decrement_open_stream_count_callback: + self._decrement_open_stream_count_callback(self.stream_id, + -1,) + # Clear callback so we only call this once per stream + self._decrement_open_stream_count_callback = None + + if self._close_stream_callback: + self._close_stream_callback(self.stream_id) + # Clear callback so we only call this once per stream + self._close_stream_callback = None + + # If we were closed, but are now open, increment + # the open stream count. + elif started_closed and ended_open: + if self._increment_open_stream_count_callback: + self._increment_open_stream_count_callback(self.stream_id, + 1,) + # Clear callback so we only call this once per stream + self._increment_open_stream_count_callback = None + return result + return wrapper + class H2Stream(object): """ @@ -782,18 +834,29 @@ def __init__(self, stream_id, config, inbound_window_size, - outbound_window_size): + outbound_window_size, + increment_open_stream_count_callback, + close_stream_callback,): self.state_machine = H2StreamStateMachine(stream_id) self.stream_id = stream_id self.max_outbound_frame_size = None self.request_method = None - # The current value of the outbound stream flow control window + # The current value of the outbound stream flow control window. self.outbound_flow_control_window = outbound_window_size # The flow control manager. self._inbound_window_manager = WindowManager(inbound_window_size) + # Callback to increment open stream count for the H2Connection. + self._increment_open_stream_count_callback = increment_open_stream_count_callback + + # Callback to decrement open stream count for the H2Connection. + self._decrement_open_stream_count_callback = increment_open_stream_count_callback + + # Callback to clean up state for the H2Connection once we're closed. + self._close_stream_callback = close_stream_callback + # The expected content length, if any. self._expected_content_length = None @@ -850,6 +913,7 @@ def closed_by(self): """ return self.state_machine.stream_closed_by + @sync_state_change def upgrade(self, client_side): """ Called by the connection to indicate that this stream is the initial @@ -868,6 +932,7 @@ def upgrade(self, client_side): self.state_machine.process_input(input_) return + @sync_state_change def send_headers(self, headers, encoder, end_stream=False): """ Returns a list of HEADERS/CONTINUATION frames to emit as either headers @@ -917,6 +982,7 @@ def send_headers(self, headers, encoder, end_stream=False): return frames + @sync_state_change def push_stream_in_band(self, related_stream_id, headers, encoder): """ Returns a list of PUSH_PROMISE/CONTINUATION frames to emit as a pushed @@ -941,6 +1007,7 @@ def push_stream_in_band(self, related_stream_id, headers, encoder): return frames + @sync_state_change def locally_pushed(self): """ Mark this stream as one that was pushed by this peer. Must be called @@ -954,6 +1021,7 @@ def locally_pushed(self): assert not events return [] + @sync_state_change def send_data(self, data, end_stream=False, pad_length=None): """ Prepare some data frames. Optionally end the stream. @@ -981,6 +1049,7 @@ def send_data(self, data, end_stream=False, pad_length=None): return [df] + @sync_state_change def end_stream(self): """ End a stream without sending data. @@ -992,6 +1061,7 @@ def end_stream(self): df.flags.add('END_STREAM') return [df] + @sync_state_change def advertise_alternative_service(self, field_value): """ Advertise an RFC 7838 alternative service. The semantics of this are @@ -1005,6 +1075,7 @@ def advertise_alternative_service(self, field_value): asf.field = field_value return [asf] + @sync_state_change def increase_flow_control_window(self, increment): """ Increase the size of the flow control window for the remote side. @@ -1020,6 +1091,7 @@ def increase_flow_control_window(self, increment): wuf.window_increment = increment return [wuf] + @sync_state_change def receive_push_promise_in_band(self, promised_stream_id, headers, @@ -1044,6 +1116,7 @@ def receive_push_promise_in_band(self, ) return [], events + @sync_state_change def remotely_pushed(self, pushed_headers): """ Mark this stream as one that was pushed by the remote peer. Must be @@ -1057,6 +1130,7 @@ def remotely_pushed(self, pushed_headers): self._authority = authority_from_headers(pushed_headers) return [], events + @sync_state_change def receive_headers(self, headers, end_stream, header_encoding): """ Receive a set of headers (or trailers). @@ -1091,6 +1165,7 @@ def receive_headers(self, headers, end_stream, header_encoding): ) return [], events + @sync_state_change def receive_data(self, data, end_stream, flow_control_len): """ Receive some data. @@ -1114,6 +1189,7 @@ def receive_data(self, data, end_stream, flow_control_len): events[0].flow_controlled_length = flow_control_len return [], events + @sync_state_change def receive_window_update(self, increment): """ Handle a WINDOW_UPDATE increment. @@ -1150,6 +1226,7 @@ def receive_window_update(self, increment): return frames, events + @sync_state_change def receive_continuation(self): """ A naked CONTINUATION frame has been received. This is always an error, @@ -1162,6 +1239,7 @@ def receive_continuation(self): ) assert False, "Should not be reachable" + @sync_state_change def receive_alt_svc(self, frame): """ An Alternative Service frame was received on the stream. This frame @@ -1189,6 +1267,7 @@ def receive_alt_svc(self, frame): return [], events + @sync_state_change def reset_stream(self, error_code=0): """ Close the stream locally. Reset the stream with an error code. @@ -1202,6 +1281,7 @@ def reset_stream(self, error_code=0): rsf.error_code = error_code return [rsf] + @sync_state_change def stream_reset(self, frame): """ Handle a stream being reset remotely. @@ -1217,6 +1297,7 @@ def stream_reset(self, frame): return [], events + @sync_state_change def acknowledge_received_data(self, acknowledged_size): """ The user has informed us that they've processed some amount of data diff --git a/test/test_basic_logic.py b/test/test_basic_logic.py index 7df99a6a5..adecad2e4 100644 --- a/test/test_basic_logic.py +++ b/test/test_basic_logic.py @@ -1851,7 +1851,7 @@ def test_stream_repr(self): """ Ensure stream string representation is appropriate. """ - s = h2.stream.H2Stream(4, None, 12, 14) + s = h2.stream.H2Stream(4, None, 12, 14, None, None) assert repr(s) == ">" From 9a6effdca783e553b4101e1582a1c38997f527af Mon Sep 17 00:00:00 2001 From: Andrew Huang Date: Sun, 17 Feb 2019 08:09:32 -0500 Subject: [PATCH 2/9] Add concurrent stream open performance test --- test/test_concurrent_stream_open.py | 50 +++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 test/test_concurrent_stream_open.py diff --git a/test/test_concurrent_stream_open.py b/test/test_concurrent_stream_open.py new file mode 100644 index 000000000..4d3af06db --- /dev/null +++ b/test/test_concurrent_stream_open.py @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- +""" +test_flow_control +~~~~~~~~~~~~~~~~~ + +Tests of the flow control management in h2 +""" +import pytest +import time + +from hypothesis import given +from hypothesis.strategies import integers + +import h2.config +import h2.connection +import h2.errors +import h2.events +import h2.exceptions +import h2.settings + + +class TestConcurrentStreamOpenPerformance(object): + """ + Tests the performance of concurrently opening streams + """ + example_request_headers = [ + (':authority', 'example.com'), + (':path', '/'), + (':scheme', 'https'), + (':method', 'GET'), + ] + server_config = h2.config.H2Configuration(client_side=False) + + DEFAULT_FLOW_WINDOW = 65535 + + def test_concurrent_stream_open_performance(self, frame_factory): + """ + Opening many concurrent streams does not take too long + """ + c = h2.connection.H2Connection() + c.initiate_connection() + start = time.time() + for i in xrange(5000): + c.send_headers(1 + (2 * i), self.example_request_headers, end_stream=False) + c.clear_outbound_data_buffer() + end = time.time() + + print end-start + assert end-start < 3 + From 862dd4c90acd6df641a97e4e19309f1d228800da Mon Sep 17 00:00:00 2001 From: Andrew Huang Date: Sun, 17 Feb 2019 08:41:33 -0500 Subject: [PATCH 3/9] Don't log the stream keys unless debug logging is enabled, to prevent quadratic performance with large number of open streams --- h2/config.py | 6 ++++++ h2/connection.py | 6 +++++- test/test_concurrent_stream_open.py | 10 +++++----- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/h2/config.py b/h2/config.py index 08129a406..cdf7f8c78 100644 --- a/h2/config.py +++ b/h2/config.py @@ -36,6 +36,12 @@ class DummyLogger(object): def __init__(self, *vargs): pass + def isEnabledFor(self, lvl): + """ + Dummy logger, so nothing is enabled. + """ + return False + def debug(self, *vargs, **kwargs): """ No-op logging. Only level needed for now. diff --git a/h2/connection.py b/h2/connection.py index e1c6b0d7e..d9bda1de4 100644 --- a/h2/connection.py +++ b/h2/connection.py @@ -6,6 +6,7 @@ An implementation of a HTTP/2 connection. """ import base64 +import logging from enum import Enum, IntEnum @@ -486,7 +487,10 @@ def _begin_new_stream(self, stream_id, allowed_ids): s.max_outbound_frame_size = self.max_outbound_frame_size self.streams[stream_id] = s - self.config.logger.debug("Current streams: %s", self.streams.keys()) + # Disable this log if we're not in debug mode, as it can be expensive + # when there are many concurrently open streams + if self.config.logger.isEnabledFor(logging.DEBUG): + self.config.logger.debug("Current streams: %s", self.streams.keys()) if outbound: self.highest_outbound_stream_id = stream_id diff --git a/test/test_concurrent_stream_open.py b/test/test_concurrent_stream_open.py index 4d3af06db..0294a80fa 100644 --- a/test/test_concurrent_stream_open.py +++ b/test/test_concurrent_stream_open.py @@ -35,16 +35,16 @@ class TestConcurrentStreamOpenPerformance(object): def test_concurrent_stream_open_performance(self, frame_factory): """ - Opening many concurrent streams does not take too long + Opening many concurrent streams is constant time operation """ + num_concurrent_streams = 5000 c = h2.connection.H2Connection() c.initiate_connection() start = time.time() - for i in xrange(5000): + for i in xrange(num_concurrent_streams): c.send_headers(1 + (2 * i), self.example_request_headers, end_stream=False) c.clear_outbound_data_buffer() end = time.time() - - print end-start - assert end-start < 3 + + assert end - start < 3 From 40aabf4f83ec3ff299592c7a9772f51d90583d12 Mon Sep 17 00:00:00 2001 From: Andrew Huang Date: Sun, 17 Feb 2019 09:39:14 -0500 Subject: [PATCH 4/9] Code coverage tests, refactor some code to make coverage tool happy --- h2/config.py | 10 ++++++++-- h2/connection.py | 17 +++++------------ h2/stream.py | 5 ----- test/test_concurrent_stream_open.py | 16 ++++++++++++++-- 4 files changed, 27 insertions(+), 21 deletions(-) diff --git a/h2/config.py b/h2/config.py index cdf7f8c78..47ab27a6d 100644 --- a/h2/config.py +++ b/h2/config.py @@ -6,6 +6,8 @@ Objects for controlling the configuration of the HTTP/2 stack. """ +import logging + class _BooleanConfigOption(object): """ @@ -34,13 +36,17 @@ class DummyLogger(object): logging functions when no logger is passed into the corresponding object. """ def __init__(self, *vargs): - pass + # Disable all logging + self.lvl = logging.CRITICAL + 1 def isEnabledFor(self, lvl): """ Dummy logger, so nothing is enabled. """ - return False + return lvl >= self.lvl + + def setLevel(self, lvl): + self.lvl = lvl def debug(self, *vargs, **kwargs): """ diff --git a/h2/connection.py b/h2/connection.py index d9bda1de4..9183e18d8 100644 --- a/h2/connection.py +++ b/h2/connection.py @@ -293,9 +293,7 @@ def __init__(self, config=None): self.encoder = Encoder() self.decoder = Decoder() - self._open_outbound_stream_count = 0 - self._open_inbound_stream_count = 0 - + self._open_stream_counts = {0 : 0, 1 : 0} # This won't always actually do anything: for versions of HPACK older # than 2.3.0 it does nothing. However, we have to try! self.decoder.max_header_list_size = self.DEFAULT_MAX_HEADER_LIST_SIZE @@ -390,10 +388,8 @@ def __init__(self, config=None): } def _increment_open_streams(self, stream_id, incr): - if stream_id % 2 == 0: - self._open_inbound_stream_count += incr - elif stream_id % 2 == 1: - self._open_outbound_stream_count += incr + remainder = stream_id % 2 + self._open_stream_counts[remainder] += incr def _close_stream(self, stream_id): self._streams_to_close.append(stream_id) @@ -412,14 +408,11 @@ def _open_streams(self, remainder): """ for stream_id in self._streams_to_close: stream = self.streams.pop(stream_id) + assert stream.closed self._closed_streams[stream_id] = stream.closed_by self._streams_to_close = list() - if remainder == 0: - return self._open_inbound_stream_count - elif remainder == 1: - return self._open_outbound_stream_count - return 0 + return self._open_stream_counts[remainder] @property def open_outbound_streams(self): diff --git a/h2/stream.py b/h2/stream.py index 20ec754bd..65241d86f 100644 --- a/h2/stream.py +++ b/h2/stream.py @@ -803,11 +803,6 @@ def wrapper(self, *args, **kwargs): # Clear callback so we only call this once per stream self._decrement_open_stream_count_callback = None - if self._close_stream_callback: - self._close_stream_callback(self.stream_id) - # Clear callback so we only call this once per stream - self._close_stream_callback = None - # If we were closed, but are now open, increment # the open stream count. elif started_closed and ended_open: diff --git a/test/test_concurrent_stream_open.py b/test/test_concurrent_stream_open.py index 0294a80fa..22364a93d 100644 --- a/test/test_concurrent_stream_open.py +++ b/test/test_concurrent_stream_open.py @@ -7,6 +7,7 @@ """ import pytest import time +import logging from hypothesis import given from hypothesis.strategies import integers @@ -37,7 +38,7 @@ def test_concurrent_stream_open_performance(self, frame_factory): """ Opening many concurrent streams is constant time operation """ - num_concurrent_streams = 5000 + num_concurrent_streams = 10000 c = h2.connection.H2Connection() c.initiate_connection() start = time.time() @@ -46,5 +47,16 @@ def test_concurrent_stream_open_performance(self, frame_factory): c.clear_outbound_data_buffer() end = time.time() - assert end - start < 3 + run_time = end - start + assert run_time < 3 + c = h2.connection.H2Connection() + c.initiate_connection() + c.config.logger.setLevel(logging.DEBUG) + start = time.time() + for i in xrange(num_concurrent_streams): + c.send_headers(1 + (2 * i), self.example_request_headers, end_stream=False) + c.clear_outbound_data_buffer() + end = time.time() + + assert end - start > run_time From 814d27b09d951487a5807d0c33b8ae2a78dee853 Mon Sep 17 00:00:00 2001 From: Andrew Huang Date: Sun, 17 Feb 2019 09:57:03 -0500 Subject: [PATCH 5/9] Don't use xrange for py3 compat, reorder tests --- test/test_concurrent_stream_open.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/test/test_concurrent_stream_open.py b/test/test_concurrent_stream_open.py index 22364a93d..564db9735 100644 --- a/test/test_concurrent_stream_open.py +++ b/test/test_concurrent_stream_open.py @@ -39,24 +39,26 @@ def test_concurrent_stream_open_performance(self, frame_factory): Opening many concurrent streams is constant time operation """ num_concurrent_streams = 10000 + c = h2.connection.H2Connection() c.initiate_connection() + c.config.logger.setLevel(logging.DEBUG) start = time.time() - for i in xrange(num_concurrent_streams): + for i in range(num_concurrent_streams): c.send_headers(1 + (2 * i), self.example_request_headers, end_stream=False) c.clear_outbound_data_buffer() end = time.time() - - run_time = end - start - assert run_time < 3 + with_debug_logging = end-start + c = h2.connection.H2Connection() c.initiate_connection() - c.config.logger.setLevel(logging.DEBUG) start = time.time() - for i in xrange(num_concurrent_streams): + for i in range(num_concurrent_streams): c.send_headers(1 + (2 * i), self.example_request_headers, end_stream=False) c.clear_outbound_data_buffer() end = time.time() - assert end - start > run_time + run_time = end - start + assert run_time < 5 + assert with_debug_logging > run_time From aa713d4f0a8d15819c64a6b72e8a99283e868c98 Mon Sep 17 00:00:00 2001 From: Andrew Huang Date: Sun, 17 Feb 2019 11:06:42 -0500 Subject: [PATCH 6/9] Add test to exercise incr conditional in sync_state_change wrapper --- test/test_concurrent_stream_open.py | 38 ++++++++++++++++++++++------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/test/test_concurrent_stream_open.py b/test/test_concurrent_stream_open.py index 564db9735..4e0bbf8a7 100644 --- a/test/test_concurrent_stream_open.py +++ b/test/test_concurrent_stream_open.py @@ -5,12 +5,9 @@ Tests of the flow control management in h2 """ -import pytest -import time import logging +import time -from hypothesis import given -from hypothesis.strategies import integers import h2.config import h2.connection @@ -18,9 +15,10 @@ import h2.events import h2.exceptions import h2.settings +from h2.stream import H2Stream, sync_state_change -class TestConcurrentStreamOpenPerformance(object): +class TestConcurrentStreamOpen(object): """ Tests the performance of concurrently opening streams """ @@ -31,9 +29,29 @@ class TestConcurrentStreamOpenPerformance(object): (':method', 'GET'), ] server_config = h2.config.H2Configuration(client_side=False) + client_config = h2.config.H2Configuration(client_side=True) DEFAULT_FLOW_WINDOW = 65535 + def test_sync_state_change_incr_conditional(self, frame_factory): + + @sync_state_change + def wrap_send_headers(self, *args, **kwargs): + return self.send_headers(*args, **kwargs) + + def dummy_callback(*args, **kwargs): + pass + + c = h2.connection.H2Connection() + s = H2Stream(1, self.client_config, self.DEFAULT_FLOW_WINDOW, + self.DEFAULT_FLOW_WINDOW, dummy_callback, + dummy_callback) + s.max_outbound_frame_size = 65536 + + wrap_send_headers(s, self.example_request_headers, + c.encoder, end_stream=False) + assert s.open + def test_concurrent_stream_open_performance(self, frame_factory): """ Opening many concurrent streams is constant time operation @@ -45,20 +63,22 @@ def test_concurrent_stream_open_performance(self, frame_factory): c.config.logger.setLevel(logging.DEBUG) start = time.time() for i in range(num_concurrent_streams): - c.send_headers(1 + (2 * i), self.example_request_headers, end_stream=False) + c.send_headers( + 1 + (2 * i), self.example_request_headers, end_stream=False) c.clear_outbound_data_buffer() end = time.time() with_debug_logging = end-start - + c = h2.connection.H2Connection() c.initiate_connection() start = time.time() for i in range(num_concurrent_streams): - c.send_headers(1 + (2 * i), self.example_request_headers, end_stream=False) + c.send_headers( + 1 + (2 * i), self.example_request_headers, end_stream=False) c.clear_outbound_data_buffer() end = time.time() - + run_time = end - start assert run_time < 5 assert with_debug_logging > run_time From f08f680600be911052dbec4a88701c192be85784 Mon Sep 17 00:00:00 2001 From: Andrew Huang Date: Sun, 17 Feb 2019 11:08:32 -0500 Subject: [PATCH 7/9] Fix lints --- h2/connection.py | 6 +++--- h2/stream.py | 6 +++++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/h2/connection.py b/h2/connection.py index 9183e18d8..822b9b256 100644 --- a/h2/connection.py +++ b/h2/connection.py @@ -293,7 +293,7 @@ def __init__(self, config=None): self.encoder = Encoder() self.decoder = Decoder() - self._open_stream_counts = {0 : 0, 1 : 0} + self._open_stream_counts = {0: 0, 1: 0} # This won't always actually do anything: for versions of HPACK older # than 2.3.0 it does nothing. However, we have to try! self.decoder.max_header_list_size = self.DEFAULT_MAX_HEADER_LIST_SIZE @@ -483,7 +483,8 @@ def _begin_new_stream(self, stream_id, allowed_ids): # Disable this log if we're not in debug mode, as it can be expensive # when there are many concurrently open streams if self.config.logger.isEnabledFor(logging.DEBUG): - self.config.logger.debug("Current streams: %s", self.streams.keys()) + self.config.logger.debug( + "Current streams: %s", self.streams.keys()) if outbound: self.highest_outbound_stream_id = stream_id @@ -1034,7 +1035,6 @@ def reset_stream(self, stream_id, error_code=0): def close_connection(self, error_code=0, additional_data=None, last_stream_id=None): - """ Close a connection, emitting a GOAWAY frame. diff --git a/h2/stream.py b/h2/stream.py index 65241d86f..e09ca23e1 100644 --- a/h2/stream.py +++ b/h2/stream.py @@ -90,6 +90,7 @@ class H2StreamStateMachine(object): :param stream_id: The stream ID of this stream. This is stored primarily for logging purposes. """ + def __init__(self, stream_id): self.state = StreamState.IDLE self.stream_id = stream_id @@ -771,6 +772,8 @@ def send_alt_svc(self, previous_state): Wraps a stream state change function to ensure that we keep the parent H2Connection's state in sync """ + + def sync_state_change(func): def wrapper(self, *args, **kwargs): # Collect state at the beginning. @@ -825,6 +828,7 @@ class H2Stream(object): Attempts to create frames that cannot be sent will raise a ``ProtocolError``. """ + def __init__(self, stream_id, config, @@ -845,7 +849,7 @@ def __init__(self, # Callback to increment open stream count for the H2Connection. self._increment_open_stream_count_callback = increment_open_stream_count_callback - + # Callback to decrement open stream count for the H2Connection. self._decrement_open_stream_count_callback = increment_open_stream_count_callback From d6f2f42c99e85eb652f9895560cf9498ea3f0603 Mon Sep 17 00:00:00 2001 From: Andrew Huang Date: Sun, 17 Feb 2019 15:08:06 -0500 Subject: [PATCH 8/9] Lints, fix flaky test --- h2/stream.py | 6 ++++-- test/test_concurrent_stream_open.py | 23 +++++++++++------------ 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/h2/stream.py b/h2/stream.py index e09ca23e1..270b10160 100644 --- a/h2/stream.py +++ b/h2/stream.py @@ -848,10 +848,12 @@ def __init__(self, self._inbound_window_manager = WindowManager(inbound_window_size) # Callback to increment open stream count for the H2Connection. - self._increment_open_stream_count_callback = increment_open_stream_count_callback + self._increment_open_stream_count_callback = \ + increment_open_stream_count_callback # Callback to decrement open stream count for the H2Connection. - self._decrement_open_stream_count_callback = increment_open_stream_count_callback + self._decrement_open_stream_count_callback = \ + increment_open_stream_count_callback # Callback to clean up state for the H2Connection once we're closed. self._close_stream_callback = close_stream_callback diff --git a/test/test_concurrent_stream_open.py b/test/test_concurrent_stream_open.py index 4e0bbf8a7..210f2a0d0 100644 --- a/test/test_concurrent_stream_open.py +++ b/test/test_concurrent_stream_open.py @@ -54,13 +54,12 @@ def dummy_callback(*args, **kwargs): def test_concurrent_stream_open_performance(self, frame_factory): """ - Opening many concurrent streams is constant time operation + Opening many concurrent streams isn't prohibitively expensive """ num_concurrent_streams = 10000 c = h2.connection.H2Connection() c.initiate_connection() - c.config.logger.setLevel(logging.DEBUG) start = time.time() for i in range(num_concurrent_streams): c.send_headers( @@ -68,17 +67,17 @@ def test_concurrent_stream_open_performance(self, frame_factory): c.clear_outbound_data_buffer() end = time.time() - with_debug_logging = end-start + run_time = end - start + assert run_time < 5 + def test_stream_open_with_debug_logging(self, frame_factory): + """ + Test that opening a stream with debug logging works + """ c = h2.connection.H2Connection() c.initiate_connection() - start = time.time() - for i in range(num_concurrent_streams): - c.send_headers( - 1 + (2 * i), self.example_request_headers, end_stream=False) - c.clear_outbound_data_buffer() - end = time.time() + c.config.logger.setLevel(logging.DEBUG) + c.send_headers( + 1, self.example_request_headers, end_stream=False) + c.clear_outbound_data_buffer() - run_time = end - start - assert run_time < 5 - assert with_debug_logging > run_time From 8ceafdf3eefd510a8bc0b6edacc134855a0060cc Mon Sep 17 00:00:00 2001 From: Andrew Huang Date: Sun, 17 Feb 2019 15:14:31 -0500 Subject: [PATCH 9/9] One more lint :) --- test/test_concurrent_stream_open.py | 1 - 1 file changed, 1 deletion(-) diff --git a/test/test_concurrent_stream_open.py b/test/test_concurrent_stream_open.py index 210f2a0d0..d95a7287c 100644 --- a/test/test_concurrent_stream_open.py +++ b/test/test_concurrent_stream_open.py @@ -80,4 +80,3 @@ def test_stream_open_with_debug_logging(self, frame_factory): c.send_headers( 1, self.example_request_headers, end_stream=False) c.clear_outbound_data_buffer() -