Skip to content

Commit 0540664

Browse files
jsvisagzliudan
authored andcommitted
rpc: improve performance of subscription notification encoding (ethereum#28328)
It turns out that encoding json.RawMessage is slow because package json basically parses the message again to ensure it is valid. We can avoid the slowdown by encoding the entire RPC notification once, which yields a 30% speedup.
1 parent 80abc58 commit 0540664

File tree

3 files changed

+82
-16
lines changed

3 files changed

+82
-16
lines changed

rpc/json.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,17 @@ type subscriptionResult struct {
4646
Result json.RawMessage `json:"result,omitempty"`
4747
}
4848

49+
type subscriptionResultEnc struct {
50+
ID string `json:"subscription"`
51+
Result any `json:"result"`
52+
}
53+
54+
type jsonrpcSubscriptionNotification struct {
55+
Version string `json:"jsonrpc"`
56+
Method string `json:"method"`
57+
Params subscriptionResultEnc `json:"params"`
58+
}
59+
4960
// A value of this type can a JSON-RPC request, notification, successful response or
5061
// error response. Which one it is depends on the fields.
5162
type jsonrpcMessage struct {

rpc/subscription.go

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ type Notifier struct {
105105

106106
mu sync.Mutex
107107
sub *Subscription
108-
buffer []json.RawMessage
108+
buffer []any
109109
callReturned bool
110110
activated bool
111111
}
@@ -129,12 +129,7 @@ func (n *Notifier) CreateSubscription() *Subscription {
129129

130130
// Notify sends a notification to the client with the given data as payload.
131131
// If an error occurs the RPC connection is closed and the error is returned.
132-
func (n *Notifier) Notify(id ID, data interface{}) error {
133-
enc, err := json.Marshal(data)
134-
if err != nil {
135-
return err
136-
}
137-
132+
func (n *Notifier) Notify(id ID, data any) error {
138133
n.mu.Lock()
139134
defer n.mu.Unlock()
140135

@@ -144,9 +139,9 @@ func (n *Notifier) Notify(id ID, data interface{}) error {
144139
panic("Notify with wrong ID")
145140
}
146141
if n.activated {
147-
return n.send(n.sub, enc)
142+
return n.send(n.sub, data)
148143
}
149-
n.buffer = append(n.buffer, enc)
144+
n.buffer = append(n.buffer, data)
150145
return nil
151146
}
152147

@@ -181,16 +176,16 @@ func (n *Notifier) activate() error {
181176
return nil
182177
}
183178

184-
func (n *Notifier) send(sub *Subscription, data json.RawMessage) error {
185-
params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data})
186-
ctx := context.Background()
187-
188-
msg := &jsonrpcMessage{
179+
func (n *Notifier) send(sub *Subscription, data any) error {
180+
msg := jsonrpcSubscriptionNotification{
189181
Version: vsn,
190182
Method: n.namespace + notificationMethodSuffix,
191-
Params: params,
183+
Params: subscriptionResultEnc{
184+
ID: string(sub.ID),
185+
Result: data,
186+
},
192187
}
193-
return n.h.conn.writeJSON(ctx, msg, false)
188+
return n.h.conn.writeJSON(context.Background(), &msg, false)
194189
}
195190

196191
// A Subscription is created by a notifier and tied to that notifier. The client can use

rpc/subscription_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,19 @@
1717
package rpc
1818

1919
import (
20+
"bytes"
21+
"context"
2022
"encoding/json"
2123
"fmt"
24+
"io"
25+
"math/big"
2226
"net"
2327
"strings"
2428
"testing"
2529
"time"
30+
31+
"github.com/XinFinOrg/XDPoSChain/common"
32+
"github.com/XinFinOrg/XDPoSChain/core/types"
2633
)
2734

2835
func TestNewID(t *testing.T) {
@@ -220,3 +227,56 @@ func readAndValidateMessage(in *json.Decoder) (*subConfirmation, *subscriptionRe
220227
return nil, nil, fmt.Errorf("unrecognized message: %v", msg)
221228
}
222229
}
230+
231+
type mockConn struct {
232+
enc *json.Encoder
233+
}
234+
235+
// writeJSON writes a message to the connection.
236+
func (c *mockConn) writeJSON(ctx context.Context, msg interface{}, isError bool) error {
237+
return c.enc.Encode(msg)
238+
}
239+
240+
// Closed returns a channel which is closed when the connection is closed.
241+
func (c *mockConn) closed() <-chan interface{} { return nil }
242+
243+
// RemoteAddr returns the peer address of the connection.
244+
func (c *mockConn) remoteAddr() string { return "" }
245+
246+
// BenchmarkNotify benchmarks the performance of notifying a subscription.
247+
func BenchmarkNotify(b *testing.B) {
248+
id := ID("test")
249+
notifier := &Notifier{
250+
h: &handler{conn: &mockConn{json.NewEncoder(io.Discard)}},
251+
sub: &Subscription{ID: id},
252+
activated: true,
253+
}
254+
msg := &types.Header{
255+
ParentHash: common.HexToHash("0x01"),
256+
Number: big.NewInt(100),
257+
}
258+
b.ResetTimer()
259+
for i := 0; i < b.N; i++ {
260+
notifier.Notify(id, msg)
261+
}
262+
}
263+
264+
func TestNotify(t *testing.T) {
265+
out := new(bytes.Buffer)
266+
id := ID("test")
267+
notifier := &Notifier{
268+
h: &handler{conn: &mockConn{json.NewEncoder(out)}},
269+
sub: &Subscription{ID: id},
270+
activated: true,
271+
}
272+
msg := &types.Header{
273+
ParentHash: common.HexToHash("0x01"),
274+
Number: big.NewInt(100),
275+
}
276+
notifier.Notify(id, msg)
277+
have := strings.TrimSpace(out.String())
278+
want := `{"jsonrpc":"2.0","method":"_subscription","params":{"subscription":"test","result":{"parentHash":"0x0000000000000000000000000000000000000000000000000000000000000001","sha3Uncles":"0x0000000000000000000000000000000000000000000000000000000000000000","miner":"0x0000000000000000000000000000000000000000","stateRoot":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionsRoot":"0x0000000000000000000000000000000000000000000000000000000000000000","receiptsRoot":"0x0000000000000000000000000000000000000000000000000000000000000000","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","difficulty":null,"number":"0x64","gasLimit":"0x0","gasUsed":"0x0","timestamp":null,"extraData":"0x","mixHash":"0x0000000000000000000000000000000000000000000000000000000000000000","nonce":"0x0000000000000000","validators":null,"validator":null,"penalties":null,"baseFeePerGas":null,"hash":"0x40f9f4f62eba4f75e54e5168d0fe28302f06f2780cfaf45356a52131f2addf77"}}}`
279+
if have != want {
280+
t.Errorf("have:\n%v\nwant:\n%v\n", have, want)
281+
}
282+
}

0 commit comments

Comments
 (0)