Skip to content

network: stateless vote compression #6276

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 67 commits into from
May 7, 2025
Merged
Show file tree
Hide file tree
Changes from 61 commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
1660433
vpack static initial version
cce Mar 14, 2025
a2b670a
compression benchmarking for algodump
cce Mar 14, 2025
ddfa52e
use vpack/gen.go to make parser and static table
cce Mar 14, 2025
894ab1b
add options to protocol.RandomizeObject
cce Mar 14, 2025
8c27738
fix license check
cce Mar 14, 2025
a1b6d14
fix linter
cce Mar 14, 2025
e29e203
slightly optimize generated code
cce Mar 14, 2025
4a53502
unexport static table indices and stop sharing msgp varuint markers w…
cce Mar 14, 2025
636d418
replace sync.Mutex with deadlock.Mutex
cce Mar 14, 2025
fc5c807
support vpack_size tag for fixed-size structs
cce Mar 14, 2025
daef6e9
update makeVoteCheck
cce Mar 14, 2025
a4b6a2d
fix encoding of fixints broken in 4a535028f402c9f0750756746823dd154b5…
cce Mar 14, 2025
05e8218
adapt TestEncodingTest to stricter parsing rules
cce Mar 14, 2025
52a70c3
fix gen.go comments a little
cce Mar 15, 2025
dda786e
remove testMakeVoteCheckMu
cce Mar 15, 2025
937f440
increase code coverage for vpack package
cce Mar 17, 2025
8dd3754
remove getTestMakeVoteCheck()
cce Mar 17, 2025
6e88ce1
add fuzz tests
cce Mar 18, 2025
597a0d6
add tests for DecompressStatic
cce Mar 19, 2025
82e6e04
100% coverage for vpack package
cce Mar 19, 2025
5dc091e
Merge remote-tracking branch 'upstream/master' into vote-compression
cce Mar 19, 2025
d2a2dc8
add -useVpack flag to algodump
cce Mar 27, 2025
c5f6b2e
change compressWriter to take static idx
cce Mar 28, 2025
3990f81
make StaticEncoder/Decoder pos math easier to read
cce Mar 29, 2025
2c7b46f
implement BitmaskEncoder/BitmaskDecoder
cce Mar 29, 2025
59c2c2b
WIP tests
cce Mar 30, 2025
b9718bc
WIP tests 2
cce Mar 30, 2025
1d0cf5f
WIP test 3
cce Mar 30, 2025
9da7f27
WIP test use slices.Concat
cce Mar 30, 2025
ae9b9f0
WIP tests more fuzz
cce Mar 30, 2025
c3ef431
remove bin64/bin80/bin32 markers
cce Mar 30, 2025
bbec669
WIP
cce Apr 5, 2025
e228116
WIP gen
cce Apr 17, 2025
7891d15
WIP
cce Apr 17, 2025
b6d1318
new gen
cce Apr 17, 2025
ede2811
WIP tests pass
cce Apr 18, 2025
b5b0058
reorg
cce Apr 18, 2025
ff009f8
rename Stateless
cce Apr 19, 2025
1abff36
100% coverage for parse_gen.go
cce Apr 24, 2025
5e323fe
remove code generation
cce Apr 28, 2025
f8c1471
reorganize
cce Apr 28, 2025
9ca9f65
integrate with network
cce Apr 28, 2025
1d8ba36
fix linter
cce Apr 28, 2025
d86c4a0
remove struct tags in other packages
cce Apr 28, 2025
9f29e87
CR changes
cce Apr 29, 2025
9714946
Merge remote-tracking branch 'upstream/master' into vote-compression
cce Apr 29, 2025
614ca6e
remove algodump changes (will go to different PR)
cce Apr 29, 2025
6d2e54c
fix TestPreparePeerData
cce Apr 29, 2025
c8571b8
fix lint
cce Apr 29, 2025
d2351bb
update comment
cce Apr 29, 2025
127cdfc
CR fixes
cce Apr 30, 2025
9b864ad
README and more fixes
cce Apr 30, 2025
1248bd5
replace vpack.CompressBound with const
cce Apr 30, 2025
2cf236c
fix comment on testMakeVoteCheck
cce May 1, 2025
ca96957
upgrade rapid package so MakeFuzz works
cce May 1, 2025
3c07a40
improve coverage
cce May 1, 2025
7c790a9
improve decoder error test
cce May 1, 2025
a102e00
Merge remote-tracking branch 'upstream/master' into vote-compression
cce May 1, 2025
5a9e996
add config.EnableVoteCompression and TestWebsocketVoteCompression
cce May 2, 2025
f6cceb0
Update network/msgCompressor.go
cce May 2, 2025
26b0dae
fix TestPreparePeerData
cce May 2, 2025
58fa026
Fix TestWebsocketVoteCompression
cce May 2, 2025
cadbbb2
PR feedback from @yossigi
cce May 6, 2025
ed0e60b
Merge remote-tracking branch 'upstream/master' into vote-compression
cce May 6, 2025
4170d91
update README
cce May 6, 2025
6bdbfde
Merge remote-tracking branch 'upstream/master' into vote-compression
cce May 6, 2025
245cfec
fix merge
cce May 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion agreement/vote.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ func (uv unauthenticatedVote) verify(l LedgerReader) (vote, error) {
return vote{R: rv, Cred: cred, Sig: uv.Sig}, nil
}

var (
// testMakeVoteCheck is a function that can be set to check every
// unauthenticatedVote before it is returned by makeVote. It is only set by tests.
testMakeVoteCheck func(*unauthenticatedVote) error
)

// makeVote creates a new unauthenticated vote from its constituent components.
//
// makeVote returns an error if it fails.
Expand Down Expand Up @@ -178,7 +184,15 @@ func makeVote(rv rawVote, voting crypto.OneTimeSigner, selection *crypto.VRFSecr
}

cred := committee.MakeCredential(&selection.SK, m.Selector)
return unauthenticatedVote{R: rv, Cred: cred, Sig: sig}, nil
ret := unauthenticatedVote{R: rv, Cred: cred, Sig: sig}

// for use when running in tests
if testMakeVoteCheck != nil {
if testErr := testMakeVoteCheck(&ret); testErr != nil {
return unauthenticatedVote{}, fmt.Errorf("makeVote: testMakeVoteCheck failed: %w", testErr)
}
}
return ret, nil
}

// ToBeHashed implements the Hashable interface.
Expand Down
32 changes: 32 additions & 0 deletions agreement/vote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
package agreement

import (
"bytes"
"encoding/base64"
"encoding/hex"
"fmt"
"os"
"testing"

Expand All @@ -27,10 +31,38 @@ import (
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/committee"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/network/vpack"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
)

func init() {
testMakeVoteCheck = testVPackMakeVote
}

func testVPackMakeVote(v *unauthenticatedVote) error {
vbuf := protocol.Encode(v)
enc := vpack.NewStatelessEncoder()
dec := vpack.NewStatelessDecoder()
encBuf, err := enc.CompressVote(nil, vbuf)
if err != nil {
return fmt.Errorf("makeVote: failed to parse vote msgpack: %v", err)
}
decBuf, err := dec.DecompressVote(nil, encBuf)
if err != nil {
return fmt.Errorf("makeVote: failed to decompress vote msgpack: %v", err)
}
if !bytes.Equal(vbuf, decBuf) {
fmt.Printf("vote: %+v\n", v)
fmt.Printf("oldbuf: %s\n", hex.EncodeToString(vbuf))
fmt.Printf("decbuf: %s\n", hex.EncodeToString(decBuf))
fmt.Printf("base64 oldbuf: %s\n", base64.StdEncoding.EncodeToString(vbuf))
fmt.Printf("base64 decbuf: %s\n", base64.StdEncoding.EncodeToString(decBuf))
return fmt.Errorf("makeVote: decompressed vote msgpack does not match original")
}
return nil
}

// error is set if this address is not selected
func makeVoteTesting(addr basics.Address, vrfSecs *crypto.VRFSecrets, otSecs crypto.OneTimeSigner, ledger Ledger, round basics.Round, period period, step step, digest crypto.Digest) (vote, error) {
var proposal proposalValue
Expand Down
5 changes: 4 additions & 1 deletion config/localTemplate.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type Local struct {
// Version tracks the current version of the defaults so we can migrate old -> new
// This is specifically important whenever we decide to change the default value
// for an existing parameter. This field tag must be updated any time we add a new version.
Version uint32 `version[0]:"0" version[1]:"1" version[2]:"2" version[3]:"3" version[4]:"4" version[5]:"5" version[6]:"6" version[7]:"7" version[8]:"8" version[9]:"9" version[10]:"10" version[11]:"11" version[12]:"12" version[13]:"13" version[14]:"14" version[15]:"15" version[16]:"16" version[17]:"17" version[18]:"18" version[19]:"19" version[20]:"20" version[21]:"21" version[22]:"22" version[23]:"23" version[24]:"24" version[25]:"25" version[26]:"26" version[27]:"27" version[28]:"28" version[29]:"29" version[30]:"30" version[31]:"31" version[32]:"32" version[33]:"33" version[34]:"34" version[35]:"35"`
Version uint32 `version[0]:"0" version[1]:"1" version[2]:"2" version[3]:"3" version[4]:"4" version[5]:"5" version[6]:"6" version[7]:"7" version[8]:"8" version[9]:"9" version[10]:"10" version[11]:"11" version[12]:"12" version[13]:"13" version[14]:"14" version[15]:"15" version[16]:"16" version[17]:"17" version[18]:"18" version[19]:"19" version[20]:"20" version[21]:"21" version[22]:"22" version[23]:"23" version[24]:"24" version[25]:"25" version[26]:"26" version[27]:"27" version[28]:"28" version[29]:"29" version[30]:"30" version[31]:"31" version[32]:"32" version[33]:"33" version[34]:"34" version[35]:"35" version[36]:"36"`

// Archival nodes retain a full copy of the block history. Non-Archival nodes will delete old blocks and only retain what's need to properly validate blockchain messages (the precise number of recent blocks depends on the consensus parameters. Currently the last 1321 blocks are required). This means that non-Archival nodes require significantly less storage than Archival nodes. If setting this to true for the first time, the existing ledger may need to be deleted to get the historical values stored as the setting only affects current blocks forward. To do this, shutdown the node and delete all .sqlite files within the data/testnet-version directory, except the crash.sqlite file. Restart the node and wait for the node to sync.
Archival bool `version[0]:"false"`
Expand Down Expand Up @@ -643,6 +643,9 @@ type Local struct {
// GoMemLimit provides the Go runtime with a soft memory limit. The default behavior is no limit,
// unless the GOMEMLIMIT environment variable is set.
GoMemLimit uint64 `version[34]:"0"`

// EnableVoteCompression controls whether vote compression is enabled for websocket networks
EnableVoteCompression bool `version[36]:"true"`
}

// DNSBootstrapArray returns an array of one or more DNS Bootstrap identifiers
Expand Down
3 changes: 2 additions & 1 deletion config/local_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package config

var defaultLocal = Local{
Version: 35,
Version: 36,
AccountUpdatesStatsInterval: 5000000000,
AccountsRebuildSynchronousMode: 1,
AgreementIncomingBundlesQueueLength: 15,
Expand Down Expand Up @@ -89,6 +89,7 @@ var defaultLocal = Local{
EnableTxnEvalTracer: false,
EnableUsageLog: false,
EnableVerbosedTransactionSyncLogging: false,
EnableVoteCompression: true,
EndpointAddress: "127.0.0.1:0",
FallbackDNSResolverAddress: "",
ForceFetchTransactions: false,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ require (
golang.org/x/sys v0.30.0
golang.org/x/text v0.22.0
gopkg.in/sohlich/elogrus.v3 v3.0.0-20180410122755-1fa29e2f2009
pgregory.net/rapid v0.6.2
pgregory.net/rapid v1.2.0
)

require (
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -970,8 +970,8 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
lukechampine.com/blake3 v1.3.0 h1:sJ3XhFINmHSrYCgl958hscfIa3bw8x4DqMP3u1YvoYE=
lukechampine.com/blake3 v1.3.0/go.mod h1:0OFRp7fBtAylGVCO40o87sbupkyIGgbpv1+M1k1LM6k=
pgregory.net/rapid v0.6.2 h1:ErW5sL+UKtfBfUTsWHDCoeB+eZKLKMxrSd1VJY6W4bw=
pgregory.net/rapid v0.6.2/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04=
pgregory.net/rapid v1.2.0 h1:keKAYRcjm+e1F0oAuU5F5+YPAWcyxNNRK2wud503Gnk=
pgregory.net/rapid v1.2.0/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04=
rsc.io/tmplfunc v0.0.3 h1:53XFQh69AfOa8Tw0Jm7t+GV7KZhOi6jzsCzTtKbMvzU=
rsc.io/tmplfunc v0.0.3/go.mod h1:AG3sTPzElb1Io3Yg4voV9AGZJuleGAwaVRxL9M49PhA=
sourcegraph.com/sourcegraph/go-diff v0.5.0/go.mod h1:kuch7UrkMzY0X+p9CRK03kfuPQ2zzQcaEFbx8wA8rck=
Expand Down
3 changes: 2 additions & 1 deletion installer/config.json.example
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"Version": 35,
"Version": 36,
"AccountUpdatesStatsInterval": 5000000000,
"AccountsRebuildSynchronousMode": 1,
"AgreementIncomingBundlesQueueLength": 15,
Expand Down Expand Up @@ -68,6 +68,7 @@
"EnableTxnEvalTracer": false,
"EnableUsageLog": false,
"EnableVerbosedTransactionSyncLogging": false,
"EnableVoteCompression": true,
"EndpointAddress": "127.0.0.1:0",
"FallbackDNSResolverAddress": "",
"ForceFetchTransactions": false,
Expand Down
58 changes: 52 additions & 6 deletions network/msgCompressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"github.com/DataDog/zstd"

"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/network/vpack"
"github.com/algorand/go-algorand/protocol"
)

Expand Down Expand Up @@ -52,19 +53,39 @@
return mbytesComp, ""
}

func vpackCompressVote(tbytes []byte, d []byte) ([]byte, string) {
var enc vpack.StatelessEncoder
bound := vpack.MaxCompressedVoteSize
// Pre-allocate buffer for tag bytes and compressed data
mbytesComp := make([]byte, len(tbytes)+bound)
copy(mbytesComp, tbytes)
comp, err := enc.CompressVote(mbytesComp[len(tbytes):], d)
if err != nil {
// fallback and reuse non-compressed original data
logMsg := fmt.Sprintf("failed to compress vote into buffer of len %d: %v", len(d), err)
copied := copy(mbytesComp[len(tbytes):], d)
return mbytesComp[:len(tbytes)+copied], logMsg
}

result := mbytesComp[:len(tbytes)+len(comp)]
return result, ""
}

// MaxDecompressedMessageSize defines a maximum decompressed data size
// to prevent zip bombs. This depends on MaxTxnBytesPerBlock consensus parameter
// and should be larger.
const MaxDecompressedMessageSize = 20 * 1024 * 1024 // some large enough value

// wsPeerMsgDataConverter performs optional incoming messages conversion.
// At the moment it only supports zstd decompression for payload proposal
type wsPeerMsgDataConverter struct {
// wsPeerMsgDataDecoder performs optional incoming messages conversion.
// At the moment it only supports zstd decompression for payload proposal,
// and vpack decompression for votes.
type wsPeerMsgDataDecoder struct {
log logging.Logger
origin string

// actual converter(s)
ppdec zstdProposalDecompressor
avdec vpackVoteDecompressor
}

type zstdProposalDecompressor struct{}
Expand All @@ -73,6 +94,15 @@
return len(data) > 4 && bytes.Equal(data[:4], zstdCompressionMagic[:])
}

type vpackVoteDecompressor struct {
enabled bool
dec *vpack.StatelessDecoder
}

func (dec vpackVoteDecompressor) convert(data []byte) ([]byte, error) {
return dec.dec.DecompressVote(nil, data)
}

func (dec zstdProposalDecompressor) convert(data []byte) ([]byte, error) {
r := zstd.NewReader(bytes.NewReader(data))
defer r.Close()
Expand All @@ -96,7 +126,7 @@
}
}

func (c *wsPeerMsgDataConverter) convert(tag protocol.Tag, data []byte) ([]byte, error) {
func (c *wsPeerMsgDataDecoder) convert(tag protocol.Tag, data []byte) ([]byte, error) {
if tag == protocol.ProposalPayloadTag {
// sender might support compressed payload but fail to compress for whatever reason,
// in this case it sends non-compressed payload - the receiver decompress only if it is compressed.
Expand All @@ -108,16 +138,32 @@
return res, nil
}
c.log.Warnf("peer %s supported zstd but sent non-compressed data", c.origin)
} else if tag == protocol.AgreementVoteTag {
if c.avdec.enabled {
res, err := c.avdec.convert(data)
if err != nil {
c.log.Warnf("peer %s vote decompress error: %v", c.origin, err)
// fall back to original data
return data, nil
}
return res, nil

Check warning on line 149 in network/msgCompressor.go

View check run for this annotation

Codecov / codecov/patch

network/msgCompressor.go#L149

Added line #L149 was not covered by tests
}
}
return data, nil
}

func makeWsPeerMsgDataConverter(wp *wsPeer) *wsPeerMsgDataConverter {
c := wsPeerMsgDataConverter{
func makeWsPeerMsgDataDecoder(wp *wsPeer) *wsPeerMsgDataDecoder {
c := wsPeerMsgDataDecoder{
log: wp.log,
origin: wp.originAddress,
}

c.ppdec = zstdProposalDecompressor{}
if wp.vpackVoteCompressionSupported() {
c.avdec = vpackVoteDecompressor{
enabled: true,
dec: vpack.NewStatelessDecoder(),
}
}
return &c
}
2 changes: 1 addition & 1 deletion network/msgCompressor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (cl *converterTestLogger) Warnf(s string, args ...interface{}) {
func TestWsPeerMsgDataConverterConvert(t *testing.T) {
partitiontest.PartitionTest(t)

c := wsPeerMsgDataConverter{}
c := wsPeerMsgDataDecoder{}
c.ppdec = zstdProposalDecompressor{}
tag := protocol.AgreementVoteTag
data := []byte("data")
Expand Down
65 changes: 65 additions & 0 deletions network/vpack/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Stateless *vpack* wire format

This document specifies the byte‑level (on‑wire) layout produced by `StatelessEncoder.CompressVote` and accepted by `StatelessDecoder.DecompressVote`.
The goal is to minimize vote size while retaining a 1‑to‑1, loss‑free mapping to the canonical msgpack representation of `agreement.UnauthenticatedVote`.

---

## 1. High‑level structure

```
+---------+-----------------+---------------------+--------------------------+
| Header | VrfProof ("pf") | rawVote ("r") | OneTimeSignature ("sig") |
| 2 bytes | 80 bytes | variable length | 256 bytes |
+---------+-----------------+------------------------------------------------+
```

All fields appear exactly once, and in the fixed order above. The presence of optional sub‑fields inside `rawVote` are indicated by a 1‑byte bitmask in the header.

---

## 2. Header (2 bytes)

| Offset | Description |
| ------ | -------------------------------------------------------------- |
| `0` | Presence flags for optional values (LSB first, see table). |
| `1` | Reserved, currently zero. |

### 2.1 Bit‑mask layout (byte 0)

| Bit | Flag | Field enabled | Size needed |
| --- | ----------- | -------------------------------- | ----------- |
| 0 | `bitPer` | `r.per` (varuint) | 1 – 9 |
| 1 | `bitDig` | `r.prop.dig` (32-byte digest) | 32 |
| 2 | `bitEncDig` | `r.prop.encdig` (32-byte digest) | 32 |
| 3 | `bitOper` | `r.prop.oper` (varuint) | 1 – 9 |
| 4 | `bitOprop` | `r.prop.oprop` (32-byte address) | 32 |
| 5 | `bitStep` | `r.step` (varuint) | 1 - 9 |

*Variable‑length integers* use msgpack varuint encoding:
- `fixint` (≤ 127), 1 byte in length
- `uint8` 2 bytes in length (1 for marker, 1 for value)
- `uint16` 3 bytes in length (1 for marker, 2 for value)
- `uint32` 5 bytes in length (1 for marker, 4 for value)
- `uint64` 9 bytes in length (1 for marker, 8 for value)

---

## 3. Field serialization order

After the 2-byte header, the encoder emits values in the following order:

1. `pf` VRF credential (80 bytes), always present.
1. `r.per` period (varuint), if `bitPer` is set.
1. `r.prop.dig` proposal's digest (32 bytes), if `bitDig` is set.
1. `r.prop.encdig` encoded proposal's digest (32 bytes), if `bitEncDig` is set.
1. `r.prop.oper` proposal's original period (32 bytes), if `bitOper` is set.
1. `r.prop.oprop` proposal's original proposer (32 bytes), if `bitOprop` is set.
1. `r.rnd` round number (varuint), always present.
1. `r.snd` sender address (32 bytes) always present.
1. `r.step` step (varuint), if `bitStep` is set.
1. `sig.p` public key (32 bytes), always present.
1. `sig.p1s` signature of offset ID (64 bytes), always present.
1. `sig.p2` second-tier public key (32 bytes), always present.
1. `sig.p2s` signature of batch ID (64 bytes), always present.
1. `sig.s` signature of message under key p (64 bytes), always present.
Loading
Loading