Skip to content

Commit 02a04c7

Browse files
cceCopilot
andcommitted
network: stateless vote compression (algorand#6276)
Co-authored-by: Copilot <[email protected]>
1 parent 5c3c253 commit 02a04c7

23 files changed

+2270
-57
lines changed

agreement/vote.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,12 @@ func (uv unauthenticatedVote) verify(l LedgerReader) (vote, error) {
146146
return vote{R: rv, Cred: cred, Sig: uv.Sig}, nil
147147
}
148148

149+
var (
150+
// testMakeVoteCheck is a function that can be set to check every
151+
// unauthenticatedVote before it is returned by makeVote. It is only set by tests.
152+
testMakeVoteCheck func(*unauthenticatedVote) error
153+
)
154+
149155
// makeVote creates a new unauthenticated vote from its constituent components.
150156
//
151157
// makeVote returns an error if it fails.
@@ -178,7 +184,15 @@ func makeVote(rv rawVote, voting crypto.OneTimeSigner, selection *crypto.VRFSecr
178184
}
179185

180186
cred := committee.MakeCredential(&selection.SK, m.Selector)
181-
return unauthenticatedVote{R: rv, Cred: cred, Sig: sig}, nil
187+
ret := unauthenticatedVote{R: rv, Cred: cred, Sig: sig}
188+
189+
// for use when running in tests
190+
if testMakeVoteCheck != nil {
191+
if testErr := testMakeVoteCheck(&ret); testErr != nil {
192+
return unauthenticatedVote{}, fmt.Errorf("makeVote: testMakeVoteCheck failed: %w", testErr)
193+
}
194+
}
195+
return ret, nil
182196
}
183197

184198
// ToBeHashed implements the Hashable interface.

agreement/vote_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717
package agreement
1818

1919
import (
20+
"bytes"
21+
"encoding/base64"
22+
"encoding/hex"
23+
"fmt"
2024
"os"
2125
"testing"
2226

@@ -27,10 +31,38 @@ import (
2731
"github.com/algorand/go-algorand/data/basics"
2832
"github.com/algorand/go-algorand/data/committee"
2933
"github.com/algorand/go-algorand/logging"
34+
"github.com/algorand/go-algorand/network/vpack"
3035
"github.com/algorand/go-algorand/protocol"
3136
"github.com/algorand/go-algorand/test/partitiontest"
3237
)
3338

39+
func init() {
40+
testMakeVoteCheck = testVPackMakeVote
41+
}
42+
43+
func testVPackMakeVote(v *unauthenticatedVote) error {
44+
vbuf := protocol.Encode(v)
45+
enc := vpack.NewStatelessEncoder()
46+
dec := vpack.NewStatelessDecoder()
47+
encBuf, err := enc.CompressVote(nil, vbuf)
48+
if err != nil {
49+
return fmt.Errorf("makeVote: failed to parse vote msgpack: %v", err)
50+
}
51+
decBuf, err := dec.DecompressVote(nil, encBuf)
52+
if err != nil {
53+
return fmt.Errorf("makeVote: failed to decompress vote msgpack: %v", err)
54+
}
55+
if !bytes.Equal(vbuf, decBuf) {
56+
fmt.Printf("vote: %+v\n", v)
57+
fmt.Printf("oldbuf: %s\n", hex.EncodeToString(vbuf))
58+
fmt.Printf("decbuf: %s\n", hex.EncodeToString(decBuf))
59+
fmt.Printf("base64 oldbuf: %s\n", base64.StdEncoding.EncodeToString(vbuf))
60+
fmt.Printf("base64 decbuf: %s\n", base64.StdEncoding.EncodeToString(decBuf))
61+
return fmt.Errorf("makeVote: decompressed vote msgpack does not match original")
62+
}
63+
return nil
64+
}
65+
3466
// error is set if this address is not selected
3567
func makeVoteTesting(addr basics.Address, vrfSecs *crypto.VRFSecrets, otSecs crypto.OneTimeSigner, ledger Ledger, round basics.Round, period period, step step, digest crypto.Digest) (vote, error) {
3668
var proposal proposalValue

config/localTemplate.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ type Local struct {
4444
// Version tracks the current version of the defaults so we can migrate old -> new
4545
// This is specifically important whenever we decide to change the default value
4646
// for an existing parameter. This field tag must be updated any time we add a new version.
47-
Version uint32 `version[0]:"0" version[1]:"1" version[2]:"2" version[3]:"3" version[4]:"4" version[5]:"5" version[6]:"6" version[7]:"7" version[8]:"8" version[9]:"9" version[10]:"10" version[11]:"11" version[12]:"12" version[13]:"13" version[14]:"14" version[15]:"15" version[16]:"16" version[17]:"17" version[18]:"18" version[19]:"19" version[20]:"20" version[21]:"21" version[22]:"22" version[23]:"23" version[24]:"24" version[25]:"25" version[26]:"26" version[27]:"27" version[28]:"28" version[29]:"29" version[30]:"30" version[31]:"31" version[32]:"32" version[33]:"33" version[34]:"34" version[35]:"35"`
47+
Version uint32 `version[0]:"0" version[1]:"1" version[2]:"2" version[3]:"3" version[4]:"4" version[5]:"5" version[6]:"6" version[7]:"7" version[8]:"8" version[9]:"9" version[10]:"10" version[11]:"11" version[12]:"12" version[13]:"13" version[14]:"14" version[15]:"15" version[16]:"16" version[17]:"17" version[18]:"18" version[19]:"19" version[20]:"20" version[21]:"21" version[22]:"22" version[23]:"23" version[24]:"24" version[25]:"25" version[26]:"26" version[27]:"27" version[28]:"28" version[29]:"29" version[30]:"30" version[31]:"31" version[32]:"32" version[33]:"33" version[34]:"34" version[35]:"35" version[36]:"36"`
4848

4949
// Archival nodes retain a full copy of the block history. Non-Archival nodes will delete old blocks and only retain what's need to properly validate blockchain messages (the precise number of recent blocks depends on the consensus parameters. Currently the last 1321 blocks are required). This means that non-Archival nodes require significantly less storage than Archival nodes. If setting this to true for the first time, the existing ledger may need to be deleted to get the historical values stored as the setting only affects current blocks forward. To do this, shutdown the node and delete all .sqlite files within the data/testnet-version directory, except the crash.sqlite file. Restart the node and wait for the node to sync.
5050
Archival bool `version[0]:"false"`
@@ -643,6 +643,9 @@ type Local struct {
643643
// GoMemLimit provides the Go runtime with a soft memory limit. The default behavior is no limit,
644644
// unless the GOMEMLIMIT environment variable is set.
645645
GoMemLimit uint64 `version[34]:"0"`
646+
647+
// EnableVoteCompression controls whether vote compression is enabled for websocket networks
648+
EnableVoteCompression bool `version[36]:"true"`
646649
}
647650

648651
// DNSBootstrapArray returns an array of one or more DNS Bootstrap identifiers

config/local_defaults.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
package config
2121

2222
var defaultLocal = Local{
23-
Version: 35,
23+
Version: 36,
2424
AccountUpdatesStatsInterval: 5000000000,
2525
AccountsRebuildSynchronousMode: 1,
2626
AgreementIncomingBundlesQueueLength: 15,
@@ -89,6 +89,7 @@ var defaultLocal = Local{
8989
EnableTxnEvalTracer: false,
9090
EnableUsageLog: false,
9191
EnableVerbosedTransactionSyncLogging: false,
92+
EnableVoteCompression: true,
9293
EndpointAddress: "127.0.0.1:0",
9394
FallbackDNSResolverAddress: "",
9495
ForceFetchTransactions: false,

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ require (
5858
golang.org/x/sys v0.30.0
5959
golang.org/x/text v0.22.0
6060
gopkg.in/sohlich/elogrus.v3 v3.0.0-20180410122755-1fa29e2f2009
61-
pgregory.net/rapid v0.6.2
61+
pgregory.net/rapid v1.2.0
6262
)
6363

6464
require (

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -970,8 +970,8 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
970970
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
971971
lukechampine.com/blake3 v1.3.0 h1:sJ3XhFINmHSrYCgl958hscfIa3bw8x4DqMP3u1YvoYE=
972972
lukechampine.com/blake3 v1.3.0/go.mod h1:0OFRp7fBtAylGVCO40o87sbupkyIGgbpv1+M1k1LM6k=
973-
pgregory.net/rapid v0.6.2 h1:ErW5sL+UKtfBfUTsWHDCoeB+eZKLKMxrSd1VJY6W4bw=
974-
pgregory.net/rapid v0.6.2/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04=
973+
pgregory.net/rapid v1.2.0 h1:keKAYRcjm+e1F0oAuU5F5+YPAWcyxNNRK2wud503Gnk=
974+
pgregory.net/rapid v1.2.0/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04=
975975
rsc.io/tmplfunc v0.0.3 h1:53XFQh69AfOa8Tw0Jm7t+GV7KZhOi6jzsCzTtKbMvzU=
976976
rsc.io/tmplfunc v0.0.3/go.mod h1:AG3sTPzElb1Io3Yg4voV9AGZJuleGAwaVRxL9M49PhA=
977977
sourcegraph.com/sourcegraph/go-diff v0.5.0/go.mod h1:kuch7UrkMzY0X+p9CRK03kfuPQ2zzQcaEFbx8wA8rck=

installer/config.json.example

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"Version": 35,
2+
"Version": 36,
33
"AccountUpdatesStatsInterval": 5000000000,
44
"AccountsRebuildSynchronousMode": 1,
55
"AgreementIncomingBundlesQueueLength": 15,
@@ -68,6 +68,7 @@
6868
"EnableTxnEvalTracer": false,
6969
"EnableUsageLog": false,
7070
"EnableVerbosedTransactionSyncLogging": false,
71+
"EnableVoteCompression": true,
7172
"EndpointAddress": "127.0.0.1:0",
7273
"FallbackDNSResolverAddress": "",
7374
"ForceFetchTransactions": false,

network/msgCompressor.go

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/DataDog/zstd"
2525

2626
"github.com/algorand/go-algorand/logging"
27+
"github.com/algorand/go-algorand/network/vpack"
2728
"github.com/algorand/go-algorand/protocol"
2829
)
2930

@@ -52,19 +53,39 @@ func zstdCompressMsg(tbytes []byte, d []byte) ([]byte, string) {
5253
return mbytesComp, ""
5354
}
5455

56+
func vpackCompressVote(tbytes []byte, d []byte) ([]byte, string) {
57+
var enc vpack.StatelessEncoder
58+
bound := vpack.MaxCompressedVoteSize
59+
// Pre-allocate buffer for tag bytes and compressed data
60+
mbytesComp := make([]byte, len(tbytes)+bound)
61+
copy(mbytesComp, tbytes)
62+
comp, err := enc.CompressVote(mbytesComp[len(tbytes):], d)
63+
if err != nil {
64+
// fallback and reuse non-compressed original data
65+
logMsg := fmt.Sprintf("failed to compress vote into buffer of len %d: %v", len(d), err)
66+
copied := copy(mbytesComp[len(tbytes):], d)
67+
return mbytesComp[:len(tbytes)+copied], logMsg
68+
}
69+
70+
result := mbytesComp[:len(tbytes)+len(comp)]
71+
return result, ""
72+
}
73+
5574
// MaxDecompressedMessageSize defines a maximum decompressed data size
5675
// to prevent zip bombs. This depends on MaxTxnBytesPerBlock consensus parameter
5776
// and should be larger.
5877
const MaxDecompressedMessageSize = 20 * 1024 * 1024 // some large enough value
5978

60-
// wsPeerMsgDataConverter performs optional incoming messages conversion.
61-
// At the moment it only supports zstd decompression for payload proposal
62-
type wsPeerMsgDataConverter struct {
79+
// wsPeerMsgDataDecoder performs optional incoming messages conversion.
80+
// At the moment it only supports zstd decompression for payload proposal,
81+
// and vpack decompression for votes.
82+
type wsPeerMsgDataDecoder struct {
6383
log logging.Logger
6484
origin string
6585

6686
// actual converter(s)
6787
ppdec zstdProposalDecompressor
88+
avdec vpackVoteDecompressor
6889
}
6990

7091
type zstdProposalDecompressor struct{}
@@ -73,6 +94,15 @@ func (dec zstdProposalDecompressor) accept(data []byte) bool {
7394
return len(data) > 4 && bytes.Equal(data[:4], zstdCompressionMagic[:])
7495
}
7596

97+
type vpackVoteDecompressor struct {
98+
enabled bool
99+
dec *vpack.StatelessDecoder
100+
}
101+
102+
func (dec vpackVoteDecompressor) convert(data []byte) ([]byte, error) {
103+
return dec.dec.DecompressVote(nil, data)
104+
}
105+
76106
func (dec zstdProposalDecompressor) convert(data []byte) ([]byte, error) {
77107
r := zstd.NewReader(bytes.NewReader(data))
78108
defer r.Close()
@@ -96,7 +126,7 @@ func (dec zstdProposalDecompressor) convert(data []byte) ([]byte, error) {
96126
}
97127
}
98128

99-
func (c *wsPeerMsgDataConverter) convert(tag protocol.Tag, data []byte) ([]byte, error) {
129+
func (c *wsPeerMsgDataDecoder) convert(tag protocol.Tag, data []byte) ([]byte, error) {
100130
if tag == protocol.ProposalPayloadTag {
101131
// sender might support compressed payload but fail to compress for whatever reason,
102132
// in this case it sends non-compressed payload - the receiver decompress only if it is compressed.
@@ -108,16 +138,33 @@ func (c *wsPeerMsgDataConverter) convert(tag protocol.Tag, data []byte) ([]byte,
108138
return res, nil
109139
}
110140
c.log.Warnf("peer %s supported zstd but sent non-compressed data", c.origin)
141+
} else if tag == protocol.AgreementVoteTag {
142+
if c.avdec.enabled {
143+
res, err := c.avdec.convert(data)
144+
if err != nil {
145+
c.log.Warnf("peer %s vote decompress error: %v", c.origin, err)
146+
// fall back to original data
147+
return data, nil
148+
}
149+
return res, nil
150+
}
111151
}
112152
return data, nil
113153
}
114154

115-
func makeWsPeerMsgDataConverter(wp *wsPeer) *wsPeerMsgDataConverter {
116-
c := wsPeerMsgDataConverter{
155+
func makeWsPeerMsgDataDecoder(wp *wsPeer) *wsPeerMsgDataDecoder {
156+
c := wsPeerMsgDataDecoder{
117157
log: wp.log,
118158
origin: wp.originAddress,
119159
}
120160

121161
c.ppdec = zstdProposalDecompressor{}
162+
// have both ends advertised support for compression?
163+
if wp.enableVoteCompression && wp.vpackVoteCompressionSupported() {
164+
c.avdec = vpackVoteDecompressor{
165+
enabled: true,
166+
dec: vpack.NewStatelessDecoder(),
167+
}
168+
}
122169
return &c
123170
}

network/msgCompressor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func (cl *converterTestLogger) Warnf(s string, args ...interface{}) {
7676
func TestWsPeerMsgDataConverterConvert(t *testing.T) {
7777
partitiontest.PartitionTest(t)
7878

79-
c := wsPeerMsgDataConverter{}
79+
c := wsPeerMsgDataDecoder{}
8080
c.ppdec = zstdProposalDecompressor{}
8181
tag := protocol.AgreementVoteTag
8282
data := []byte("data")

network/vpack/README.md

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
# Stateless *vpack* wire format
2+
3+
This document specifies the byte‑level (on‑wire) layout produced by `StatelessEncoder.CompressVote` and accepted by `StatelessDecoder.DecompressVote`.
4+
The goal is to minimize vote size while retaining a 1‑to‑1, loss‑free mapping to the canonical msgpack representation of `agreement.UnauthenticatedVote`.
5+
The canonical msgpack representation we rely on is provided by agreement/msgp_gen.go, generated by our [custom msgpack code generator](https://github.com/algorand/msgp)
6+
which ensures fields are generated in lexicographic order, omit empty key-value pairs, and use specific formats for certain types as defined in
7+
[our specification](https://github.com/algorandfoundation/specs/blob/c0331123148971e4705f25b9c937cb23e5ee28d1/dev/crypto.md#L22-L40).
8+
9+
---
10+
11+
## 1. High‑level structure
12+
13+
```
14+
+---------+-----------------+---------------------+--------------------------+
15+
| Header | VrfProof ("pf") | rawVote ("r") | OneTimeSignature ("sig") |
16+
| 2 bytes | 80 bytes | variable length | 256 bytes |
17+
+---------+-----------------+---------------------+--------------------------+
18+
```
19+
20+
All fields appear exactly once, and in the fixed order above. The presence of optional sub‑fields inside `rawVote` are indicated by a 1‑byte bitmask in the header.
21+
No field names appear, only values.
22+
23+
---
24+
25+
## 2. Header (2 bytes)
26+
27+
| Offset | Description |
28+
| ------ | -------------------------------------------------------------- |
29+
| `0` | Presence flags for optional values (LSB first, see table). |
30+
| `1` | Reserved, currently zero. |
31+
32+
### 2.1 Bit‑mask layout (byte 0)
33+
34+
| Bit | Flag | Field enabled | Encoded size |
35+
| --- | ----------- | -------------------------------- | ------------ |
36+
| 0 | `bitPer` | `r.per` (varuint) | 1 - 9 bytes |
37+
| 1 | `bitDig` | `r.prop.dig` (digest) | 32 bytes |
38+
| 2 | `bitEncDig` | `r.prop.encdig` (digest) | 32 bytes |
39+
| 3 | `bitOper` | `r.prop.oper` (varuint) | 1 - 9 bytes |
40+
| 4 | `bitOprop` | `r.prop.oprop` (address) | 32 bytes |
41+
| 5 | `bitStep` | `r.step` (varuint) | 1 - 9 bytes |
42+
43+
Binary fields are represented by their 32-, 64-, and 80-byte values without markers.
44+
Integers use msgpack's variable-length unsigned integer encoding:
45+
- `fixint` (≤ 127), 1 byte in length (values 0x00-0x7f)
46+
- `uint8` 2 bytes in length (marker byte 0xcc + 1-byte value)
47+
- `uint16` 3 bytes in length (marker byte 0xcd + 2-byte value)
48+
- `uint32` 5 bytes in length (marker byte 0xce + 4-byte value)
49+
- `uint64` 9 bytes in length (marker byte 0xcf + 8-byte value)
50+
51+
---
52+
53+
## 3. Field serialization order
54+
55+
After the 2-byte header, the encoder emits values in the following order:
56+
57+
| Field | Type | Encoded size | Presence flag |
58+
| -------------- | ------------------------------ | ------------ | ------------- |
59+
| `pf` | VRF credential | 80 bytes | Required |
60+
| `r.per` | Period | varuint | `bitPer` |
61+
| `r.prop.dig` | Proposal digest | 32 bytes | `bitDig` |
62+
| `r.prop.encdig`| Digest of encoded proposal | 32 bytes | `bitEncDig` |
63+
| `r.prop.oper` | Proposal's original period | varuint | `bitOper` |
64+
| `r.prop.oprop` | Proposal's original proposer | 32 bytes | `bitOprop` |
65+
| `r.rnd` | Round number | varuint | Required |
66+
| `r.snd` | Voter's (sender) address | 32 bytes | Required |
67+
| `r.step` | Step | varuint | `bitStep` |
68+
| `sig.p` | Ed25519 public key | 32 bytes | Required |
69+
| `sig.p1s` | Signature over offset ID | 64 bytes | Required |
70+
| `sig.p2` | Second-tier Ed25519 public key | 32 bytes | Required |
71+
| `sig.p2s` | Signature over batch ID | 64 bytes | Required |
72+
| `sig.s` | Signature over vote using `p` | 64 bytes | Required |

0 commit comments

Comments
 (0)