Skip to content
This repository was archived by the owner on Apr 24, 2025. It is now read-only.

Commit a3bf20b

Browse files
authored
Merge pull request #56 from tetratelabs/stream-example
connection counter on network filter example
2 parents 71c771b + 168160b commit a3bf20b

File tree

6 files changed

+78
-26
lines changed

6 files changed

+78
-26
lines changed

e2e/e2e_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ func TestE2E_network(t *testing.T) {
154154
assert.True(t, strings.Contains(out, "new connection!"))
155155
assert.True(t, strings.Contains(out, "downstream connection close!"))
156156
assert.True(t, strings.Contains(out, "upstream data received"))
157+
assert.True(t, strings.Contains(out, "connection complete!"))
157158
}
158159

159160
func TestE2E_metrics(t *testing.T) {

examples/http_auth_random/main_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,11 @@ func TestHttpHeaders_OnHttpCallResponse(t *testing.T) {
5353
id = host.InitContext()
5454
body = []byte(`{"uuid": "aaaaaaaa-1c67-4199-835b-cbefcd4a63d4"}`)
5555
host.PutCalloutResponse(id, headers, nil, body)
56-
assert.NotNil(t, host.GetSentLocalResponse(id))
56+
localResponse := host.GetSentLocalResponse(id) // check local responses
57+
assert.NotNil(t, localResponse)
5758
logs = host.GetLogs(types.LogLevelInfo)
5859
assert.Equal(t, "access forbidden", logs[len(logs)-1])
5960

60-
localResponse := host.GetSentLocalResponse(id) // check local responses
61-
require.NotNil(t, localResponse)
6261
assert.Equal(t, uint32(403), localResponse.StatusCode)
6362
assert.Equal(t, []byte("access forbidden"), localResponse.Data)
6463
require.Len(t, localResponse.Headers, 1)

examples/network/main.go

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,33 @@ import (
1919
"github.com/tetratelabs/proxy-wasm-go-sdk/proxywasm/types"
2020
)
2121

22+
var (
23+
connectionCounterName = "proxy_wasm_go.connection_counter"
24+
counter proxywasm.MetricCounter
25+
)
26+
2227
func main() {
23-
proxywasm.SetNewStreamContext(newHelloWorld)
28+
proxywasm.SetNewStreamContext(func(contextID uint32) proxywasm.StreamContext { return context{} })
29+
proxywasm.SetNewRootContext(func(contextID uint32) proxywasm.RootContext { return context{} })
2430
}
2531

26-
type network struct{ proxywasm.DefaultContext }
32+
type context struct{ proxywasm.DefaultContext }
2733

28-
func newHelloWorld(contextID uint32) proxywasm.StreamContext {
29-
return network{}
34+
func (ctx context) OnVMStart(int) bool {
35+
var err error
36+
counter, err = proxywasm.DefineCounterMetric(connectionCounterName)
37+
if err != nil {
38+
proxywasm.LogCritical("failed to initialize connection counter: ", err.Error())
39+
}
40+
return true
3041
}
3142

32-
func (ctx network) OnNewConnection() types.Action {
43+
func (ctx context) OnNewConnection() types.Action {
3344
proxywasm.LogInfo("new connection!")
3445
return types.ActionContinue
3546
}
3647

37-
func (ctx network) OnDownstreamData(dataSize int, _ bool) types.Action {
48+
func (ctx context) OnDownstreamData(dataSize int, _ bool) types.Action {
3849
// TODO: dispatch http call
3950

4051
if dataSize == 0 {
@@ -50,12 +61,12 @@ func (ctx network) OnDownstreamData(dataSize int, _ bool) types.Action {
5061
return types.ActionContinue
5162
}
5263

53-
func (ctx network) OnDownstreamClose(types.PeerType) {
64+
func (ctx context) OnDownstreamClose(types.PeerType) {
5465
proxywasm.LogInfo("downstream connection close!")
5566
return
5667
}
5768

58-
func (ctx network) OnUpstreamData(dataSize int, _ bool) types.Action {
69+
func (ctx context) OnUpstreamData(dataSize int, _ bool) types.Action {
5970

6071
if dataSize == 0 {
6172
return types.ActionContinue
@@ -69,3 +80,12 @@ func (ctx network) OnUpstreamData(dataSize int, _ bool) types.Action {
6980
proxywasm.LogInfo("upstream data received: ", string(data))
7081
return types.ActionContinue
7182
}
83+
84+
func (ctx context) OnDone() bool {
85+
err := counter.Increment(1)
86+
if err != nil {
87+
proxywasm.LogCritical("failed to increment connection counter: ", err.Error())
88+
}
89+
proxywasm.LogInfo("connection complete!")
90+
return true
91+
}

examples/network/main_test.go

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,16 @@ import (
2121
"github.com/stretchr/testify/require"
2222

2323
"github.com/tetratelabs/proxy-wasm-go-sdk/proxytest"
24+
"github.com/tetratelabs/proxy-wasm-go-sdk/proxywasm"
2425
"github.com/tetratelabs/proxy-wasm-go-sdk/proxywasm/types"
2526
)
2627

28+
func newStreamContext(uint32) proxywasm.StreamContext {
29+
return context{}
30+
}
31+
2732
func TestNetwork_OnNewConnection(t *testing.T) {
28-
host, done := proxytest.NewNetworkFilterHost(newHelloWorld)
33+
host, done := proxytest.NewNetworkFilterHost(newStreamContext)
2934
defer done() // release the host emulation lock so that other test cases can insert their own host emulation
3035

3136
_ = host.InitConnection() // OnNewConnection is called
@@ -35,7 +40,7 @@ func TestNetwork_OnNewConnection(t *testing.T) {
3540
}
3641

3742
func TestNetwork_OnDownstreamClose(t *testing.T) {
38-
host, done := proxytest.NewNetworkFilterHost(newHelloWorld)
43+
host, done := proxytest.NewNetworkFilterHost(newStreamContext)
3944
defer done() // release the host emulation lock so that other test cases can insert their own host emulation
4045

4146
contextID := host.InitConnection() // OnNewConnection is called
@@ -47,7 +52,7 @@ func TestNetwork_OnDownstreamClose(t *testing.T) {
4752
}
4853

4954
func TestNetwork_OnDownstreamData(t *testing.T) {
50-
host, done := proxytest.NewNetworkFilterHost(newHelloWorld)
55+
host, done := proxytest.NewNetworkFilterHost(newStreamContext)
5156
defer done() // release the host emulation lock so that other test cases can insert their own host emulation
5257

5358
contextID := host.InitConnection() // OnNewConnection is called
@@ -61,7 +66,7 @@ func TestNetwork_OnDownstreamData(t *testing.T) {
6166
}
6267

6368
func TestNetwork_OnUpstreamData(t *testing.T) {
64-
host, done := proxytest.NewNetworkFilterHost(newHelloWorld)
69+
host, done := proxytest.NewNetworkFilterHost(newStreamContext)
6570
defer done() // release the host emulation lock so that other test cases can insert their own host emulation
6671

6772
contextID := host.InitConnection() // OnNewConnection is called
@@ -73,3 +78,21 @@ func TestNetwork_OnUpstreamData(t *testing.T) {
7378
logs := host.GetLogs(types.LogLevelInfo) // retrieve logs emitted to Envoy
7479
assert.Equal(t, "upstream data received: "+msg, logs[len(logs)-1])
7580
}
81+
82+
func TestNetwork_counter(t *testing.T) {
83+
host, done := proxytest.NewNetworkFilterHost(newStreamContext)
84+
defer done() // release the host emulation lock so that other test cases can insert their own host emulation
85+
86+
context{}.OnVMStart(0) // init metric
87+
88+
contextID := host.InitConnection()
89+
host.CompleteConnection(contextID) // call OnDone on contextID -> increment the connection counter
90+
91+
logs := host.GetLogs(types.LogLevelInfo)
92+
require.Greater(t, len(logs), 0)
93+
94+
assert.Equal(t, "connection complete!", logs[len(logs)-1])
95+
actual, err := counter.Get()
96+
require.NoError(t, err)
97+
assert.Equal(t, uint64(1), actual)
98+
}

proxytest/http.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,6 @@ func (h *HttpFilterHost) ProxyAddHeaderMapValue(mapType types.MapType, keyData *
229229
case types.MapTypeHttpResponseTrailers:
230230
ctx.responseTrailers = addMapValue(ctx.responseTrailers, key, value)
231231
default:
232-
// is it allowed to manipulate callout headers?
233232
panic("unimplemented")
234233
}
235234

@@ -263,7 +262,6 @@ func (h *HttpFilterHost) ProxyReplaceHeaderMapValue(mapType types.MapType, keyDa
263262
case types.MapTypeHttpResponseTrailers:
264263
ctx.responseTrailers = replaceMapValue(ctx.responseTrailers, key, value)
265264
default:
266-
// is it allowed to manipulate callout headers?
267265
panic("unimplemented")
268266
}
269267
return types.StatusOK
@@ -293,7 +291,6 @@ func (h *HttpFilterHost) ProxyRemoveHeaderMapValue(mapType types.MapType, keyDat
293291
case types.MapTypeHttpResponseTrailers:
294292
ctx.responseTrailers = removeHeaderMapValue(ctx.responseTrailers, key)
295293
default:
296-
// is it allowed to manipulate callout headers?
297294
panic("unimplemented")
298295
}
299296
return types.StatusOK
@@ -348,7 +345,6 @@ func (h *HttpFilterHost) ProxySetHeaderMapPairs(mapType types.MapType, mapData *
348345
case types.MapTypeHttpResponseTrailers:
349346
ctx.responseTrailers = m
350347
default:
351-
// is it allowed to manipulate callout headers?
352348
panic("unimplemented")
353349
}
354350
return types.StatusOK
@@ -385,3 +381,7 @@ func (h *HttpFilterHost) ProxySendLocalResponse(statusCode uint32,
385381
func (h *HttpFilterHost) GetSentLocalResponse(contextID uint32) *LocalHttpResponse {
386382
return h.contexts[contextID].sentLocalResponse
387383
}
384+
385+
func (h *HttpFilterHost) GetContext(contextID uint32) proxywasm.HttpContext {
386+
return h.contexts[contextID].context
387+
}

proxytest/network.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type NetworkFilterHost struct {
3030

3131
type streamState struct {
3232
upstream, downstream []byte
33-
ctx proxywasm.StreamContext
33+
context proxywasm.StreamContext
3434
}
3535

3636
func NewNetworkFilterHost(f func(contextID uint32) proxywasm.StreamContext) (*NetworkFilterHost, func()) {
@@ -44,7 +44,7 @@ func NewNetworkFilterHost(f func(contextID uint32) proxywasm.StreamContext) (*Ne
4444
if !ok {
4545
log.Fatalf("invalid context id for callback: %d", contextID)
4646
}
47-
stream.ctx.OnHttpCallResponse(numHeaders, bodySize, numTrailers)
47+
stream.context.OnHttpCallResponse(numHeaders, bodySize, numTrailers)
4848
})
4949
hostMux.Lock() // acquire the lock of host emulation
5050
rawhostcall.RegisterMockWASMHost(host)
@@ -64,7 +64,7 @@ func (n *NetworkFilterHost) PutUpstreamData(contextID uint32, data []byte) {
6464
}
6565

6666
n.currentContextID = contextID
67-
action := stream.ctx.OnUpstreamData(len(stream.upstream), false)
67+
action := stream.context.OnUpstreamData(len(stream.upstream), false)
6868
switch action {
6969
case types.ActionPause:
7070
return
@@ -86,7 +86,7 @@ func (n *NetworkFilterHost) PutDownstreamData(contextID uint32, data []byte) {
8686
}
8787

8888
n.currentContextID = contextID
89-
action := stream.ctx.OnDownstreamData(len(stream.downstream), false)
89+
action := stream.context.OnDownstreamData(len(stream.downstream), false)
9090
switch action {
9191
case types.ActionPause:
9292
return
@@ -101,19 +101,24 @@ func (n *NetworkFilterHost) PutDownstreamData(contextID uint32, data []byte) {
101101
func (n *NetworkFilterHost) InitConnection() (contextID uint32) {
102102
contextID = uint32(len(n.streams) + 1)
103103
ctx := n.newContext(contextID)
104-
n.streams[contextID] = &streamState{ctx: ctx}
104+
n.streams[contextID] = &streamState{context: ctx}
105105

106106
n.currentContextID = contextID
107107
ctx.OnNewConnection()
108108
return
109109
}
110110

111111
func (n *NetworkFilterHost) CloseUpstreamConnection(contextID uint32) {
112-
n.streams[contextID].ctx.OnUpstreamClose(types.PeerTypeLocal) // peerType will be removed in the next ABI
112+
n.streams[contextID].context.OnUpstreamClose(types.PeerTypeLocal) // peerType will be removed in the next ABI
113113
}
114114

115115
func (n *NetworkFilterHost) CloseDownstreamConnection(contextID uint32) {
116-
n.streams[contextID].ctx.OnDownstreamClose(types.PeerTypeLocal) // peerType will be removed in the next ABI
116+
n.streams[contextID].context.OnDownstreamClose(types.PeerTypeLocal) // peerType will be removed in the next ABI
117+
}
118+
119+
func (n *NetworkFilterHost) CompleteConnection(contextID uint32) {
120+
n.streams[contextID].context.OnDone()
121+
delete(n.streams, contextID)
117122
}
118123

119124
func (n *NetworkFilterHost) ProxyGetBufferBytes(bt types.BufferType, start int, maxSize int,
@@ -153,3 +158,7 @@ func (n *NetworkFilterHost) ProxyGetHeaderMapPairs(mapType types.MapType, return
153158
returnValueSize *int) types.Status {
154159
return n.getMapPairs(mapType, returnValueData, returnValueSize)
155160
}
161+
162+
func (n *NetworkFilterHost) GetContext(contextID uint32) proxywasm.StreamContext {
163+
return n.streams[contextID].context
164+
}

0 commit comments

Comments
 (0)