Skip to content

Commit 1005161

Browse files
authored
Make the otelhttp Transport handle protocol switching (#1628)
* Make the otelhttp Transport handle protocol switching * Remove ctx field from wrappedBody This is an unused field. * Add tests for the wrappedBody Write method * Add changes to changelog * Add full transport test for #1329 * Fix grammar in changelog entry
1 parent e3db96c commit 1005161

File tree

3 files changed

+128
-3
lines changed

3 files changed

+128
-3
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
1919
### Fixed
2020

2121
- Change the `http-server-duration` instrument in `go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp` to record milliseconds instead of microseconds match what is specified in the OpenTelemetry specification. (#1414, #1537)
22+
- The `"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp".Transport` type now correctly handles protocol switching responses.
23+
The returned response body implements the `io.ReadWriteCloser` interface if the underlying one does.
24+
This ensures that protocol switching requests receive a response body that they can write to. (#1329, #1628)
2225

2326
### Removed
2427

instrumentation/net/http/otelhttp/transport.go

+36-3
Original file line numberDiff line numberDiff line change
@@ -123,18 +123,51 @@ func (t *Transport) RoundTrip(r *http.Request) (*http.Response, error) {
123123

124124
span.SetAttributes(semconv.HTTPAttributesFromHTTPStatusCode(res.StatusCode)...)
125125
span.SetStatus(semconv.SpanStatusFromHTTPStatusCode(res.StatusCode))
126-
res.Body = &wrappedBody{ctx: ctx, span: span, body: res.Body}
126+
res.Body = newWrappedBody(span, res.Body)
127127

128128
return res, err
129129
}
130130

131+
// newWrappedBody returns a new and appropriately scoped *wrappedBody as an
132+
// io.ReadCloser. If the passed body implements io.Writer, the returned value
133+
// will implement io.ReadWriteCloser.
134+
func newWrappedBody(span trace.Span, body io.ReadCloser) io.ReadCloser {
135+
// The successful protocol switch responses will have a body that
136+
// implement an io.ReadWriteCloser. Ensure this interface type continues
137+
// to be satisfied if that is the case.
138+
if _, ok := body.(io.ReadWriteCloser); ok {
139+
return &wrappedBody{span: span, body: body}
140+
}
141+
142+
// Remove the implementation of the io.ReadWriteCloser and only implement
143+
// the io.ReadCloser.
144+
return struct{ io.ReadCloser }{&wrappedBody{span: span, body: body}}
145+
}
146+
147+
// wrappedBody is the response body type returned by the transport
148+
// instrumentation to complete a span. Errors encountered when using the
149+
// response body are recorded in span tracking the response.
150+
//
151+
// The span tracking the response is ended when this body is closed.
152+
//
153+
// If the response body implements the io.Writer interface (i.e. for
154+
// successful protocol switches), the wrapped body also will.
131155
type wrappedBody struct {
132-
ctx context.Context
133156
span trace.Span
134157
body io.ReadCloser
135158
}
136159

137-
var _ io.ReadCloser = &wrappedBody{}
160+
var _ io.ReadWriteCloser = &wrappedBody{}
161+
162+
func (wb *wrappedBody) Write(p []byte) (int, error) {
163+
// This will not panic given the guard in newWrappedBody.
164+
n, err := wb.body.(io.Writer).Write(p)
165+
if err != nil {
166+
wb.span.RecordError(err)
167+
wb.span.SetStatus(codes.Error, err.Error())
168+
}
169+
return n, err
170+
}
138171

139172
func (wb *wrappedBody) Read(b []byte) (int, error) {
140173
n, err := wb.body.Read(b)

instrumentation/net/http/otelhttp/transport_test.go

+89
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@ import (
2222
"io/ioutil"
2323
"net/http"
2424
"net/http/httptest"
25+
"strings"
2526
"testing"
2627

2728
"github.com/stretchr/testify/assert"
29+
"github.com/stretchr/testify/require"
2830

2931
"go.opentelemetry.io/otel/codes"
3032
"go.opentelemetry.io/otel/propagation"
@@ -286,3 +288,90 @@ func TestWrappedBodyCloseError(t *testing.T) {
286288
assert.Equal(t, expectedErr, wb.Close())
287289
s.assert(t, true, nil, codes.Unset, "")
288290
}
291+
292+
type readWriteCloser struct {
293+
readCloser
294+
295+
writeErr error
296+
}
297+
298+
const writeSize = 1
299+
300+
func (rwc readWriteCloser) Write([]byte) (int, error) {
301+
return writeSize, rwc.writeErr
302+
}
303+
304+
func TestNewWrappedBodyReadWriteCloserImplementation(t *testing.T) {
305+
wb := newWrappedBody(nil, readWriteCloser{})
306+
assert.Implements(t, (*io.ReadWriteCloser)(nil), wb)
307+
}
308+
309+
func TestNewWrappedBodyReadCloserImplementation(t *testing.T) {
310+
wb := newWrappedBody(nil, readCloser{})
311+
assert.Implements(t, (*io.ReadCloser)(nil), wb)
312+
313+
_, ok := wb.(io.ReadWriteCloser)
314+
assert.False(t, ok, "wrappedBody should not implement io.ReadWriteCloser")
315+
}
316+
317+
func TestWrappedBodyWrite(t *testing.T) {
318+
s := new(span)
319+
var rwc io.ReadWriteCloser
320+
assert.NotPanics(t, func() {
321+
rwc = newWrappedBody(s, readWriteCloser{}).(io.ReadWriteCloser)
322+
})
323+
324+
n, err := rwc.Write([]byte{})
325+
assert.Equal(t, writeSize, n, "wrappedBody returned wrong bytes")
326+
assert.NoError(t, err)
327+
s.assert(t, false, nil, codes.Unset, "")
328+
}
329+
330+
func TestWrappedBodyWriteError(t *testing.T) {
331+
s := new(span)
332+
expectedErr := errors.New("test")
333+
var rwc io.ReadWriteCloser
334+
assert.NotPanics(t, func() {
335+
rwc = newWrappedBody(s, readWriteCloser{
336+
writeErr: expectedErr,
337+
}).(io.ReadWriteCloser)
338+
})
339+
n, err := rwc.Write([]byte{})
340+
assert.Equal(t, writeSize, n, "wrappedBody returned wrong bytes")
341+
assert.ErrorIs(t, err, expectedErr)
342+
s.assert(t, false, expectedErr, codes.Error, expectedErr.Error())
343+
}
344+
345+
func TestTransportProtocolSwitch(t *testing.T) {
346+
// This test validates the fix to #1329.
347+
348+
// Simulate a "101 Switching Protocols" response from the test server.
349+
response := []byte(strings.Join([]string{
350+
"HTTP/1.1 101 Switching Protocols",
351+
"Upgrade: WebSocket",
352+
"Connection: Upgrade",
353+
"", "", // Needed for extra CRLF.
354+
}, "\r\n"))
355+
356+
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
357+
conn, buf, err := w.(http.Hijacker).Hijack()
358+
require.NoError(t, err)
359+
360+
_, err = buf.Write(response)
361+
require.NoError(t, err)
362+
require.NoError(t, buf.Flush())
363+
require.NoError(t, conn.Close())
364+
}))
365+
defer ts.Close()
366+
367+
ctx := context.Background()
368+
r, err := http.NewRequestWithContext(ctx, http.MethodGet, ts.URL, http.NoBody)
369+
require.NoError(t, err)
370+
371+
c := http.Client{Transport: NewTransport(http.DefaultTransport)}
372+
res, err := c.Do(r)
373+
require.NoError(t, err)
374+
t.Cleanup(func() { require.NoError(t, res.Body.Close()) })
375+
376+
assert.Implements(t, (*io.ReadWriteCloser)(nil), res.Body, "invalid body returned for protocol switch")
377+
}

0 commit comments

Comments
 (0)