@@ -77,9 +77,12 @@ type TransactionStreamer struct {
77
77
broadcastServer * broadcaster.Broadcaster
78
78
inboxReader * InboxReader
79
79
delayedBridge * DelayedBridge
80
- espressoClient * espressoClient.Client
81
80
82
- lightClientReader lightclient.LightClientReaderInterface
81
+ // Espresso specific fields. These fields are set from batch poster
82
+ espressoClient * espressoClient.Client
83
+ lightClientReader lightclient.LightClientReaderInterface
84
+ espressoTxnsPollingInterval time.Duration
85
+ espressoSwitchDelayThreshold uint64
83
86
// Public these fields for testing
84
87
HotshotDown bool
85
88
UseEscapeHatch bool
@@ -89,46 +92,26 @@ type TransactionStreamerConfig struct {
89
92
MaxBroadcasterQueueSize int `koanf:"max-broadcaster-queue-size"`
90
93
MaxReorgResequenceDepth int64 `koanf:"max-reorg-resequence-depth" reload:"hot"`
91
94
ExecuteMessageLoopDelay time.Duration `koanf:"execute-message-loop-delay" reload:"hot"`
92
-
93
- // Espresso specific fields
94
- SovereignSequencerEnabled bool `koanf:"sovereign-sequencer-enabled"`
95
- HotShotUrl string `koanf:"hotshot-url"`
96
- EspressoNamespace uint64 `koanf:"espresso-namespace"`
97
- EspressoTxnsPollingInterval time.Duration `koanf:"espresso-txns-polling-interval"`
98
- EspressoSwitchDelayThreshold uint64 `koanf:"espresso-switch-delay-threshold"`
99
95
}
100
96
101
97
type TransactionStreamerConfigFetcher func () * TransactionStreamerConfig
102
98
103
99
var DefaultTransactionStreamerConfig = TransactionStreamerConfig {
104
- MaxBroadcasterQueueSize : 50_000 ,
105
- MaxReorgResequenceDepth : 1024 ,
106
- ExecuteMessageLoopDelay : time .Millisecond * 100 ,
107
- SovereignSequencerEnabled : false ,
108
- HotShotUrl : "" ,
109
- EspressoTxnsPollingInterval : time .Millisecond * 100 ,
110
- EspressoSwitchDelayThreshold : 20 ,
100
+ MaxBroadcasterQueueSize : 50_000 ,
101
+ MaxReorgResequenceDepth : 1024 ,
102
+ ExecuteMessageLoopDelay : time .Millisecond * 100 ,
111
103
}
112
104
113
105
var TestTransactionStreamerConfig = TransactionStreamerConfig {
114
- MaxBroadcasterQueueSize : 10_000 ,
115
- MaxReorgResequenceDepth : 128 * 1024 ,
116
- ExecuteMessageLoopDelay : time .Millisecond ,
117
- SovereignSequencerEnabled : false ,
118
- HotShotUrl : "" ,
119
- EspressoTxnsPollingInterval : time .Millisecond * 100 ,
120
- EspressoSwitchDelayThreshold : 10 ,
106
+ MaxBroadcasterQueueSize : 10_000 ,
107
+ MaxReorgResequenceDepth : 128 * 1024 ,
108
+ ExecuteMessageLoopDelay : time .Millisecond ,
121
109
}
122
110
123
111
func TransactionStreamerConfigAddOptions (prefix string , f * flag.FlagSet ) {
124
112
f .Int (prefix + ".max-broadcaster-queue-size" , DefaultTransactionStreamerConfig .MaxBroadcasterQueueSize , "maximum cache of pending broadcaster messages" )
125
113
f .Int64 (prefix + ".max-reorg-resequence-depth" , DefaultTransactionStreamerConfig .MaxReorgResequenceDepth , "maximum number of messages to attempt to resequence on reorg (0 = never resequence, -1 = always resequence)" )
126
114
f .Duration (prefix + ".execute-message-loop-delay" , DefaultTransactionStreamerConfig .ExecuteMessageLoopDelay , "delay when polling calls to execute messages" )
127
- f .Bool (prefix + ".sovereign-sequencer-enabled" , DefaultTransactionStreamerConfig .SovereignSequencerEnabled , "if true, transactions will be sent to espresso's sovereign sequencer to be notarized by espresso network" )
128
- f .String (prefix + ".hotshot-url" , DefaultTransactionStreamerConfig .HotShotUrl , "url of the hotshot sequencer" )
129
- f .Uint64 (prefix + ".espresso-namespace" , DefaultTransactionStreamerConfig .EspressoNamespace , "espresso namespace that corresponds the L2 chain" )
130
- f .Duration (prefix + ".espresso-txns-polling-interval" , DefaultTransactionStreamerConfig .EspressoTxnsPollingInterval , "interval between polling for transactions to be included in the block" )
131
- f .Uint64 (prefix + ".espresso-switch-delay-threshold" , DefaultTransactionStreamerConfig .EspressoSwitchDelayThreshold , "specifies the switch delay threshold used to determine hotshot liveness" )
132
115
}
133
116
134
117
func NewTransactionStreamer (
@@ -151,12 +134,6 @@ func NewTransactionStreamer(
151
134
snapSyncConfig : snapSyncConfig ,
152
135
}
153
136
154
- if config ().SovereignSequencerEnabled {
155
- espressoClient := espressoClient .NewClient (config ().HotShotUrl )
156
- streamer .espressoClient = espressoClient
157
-
158
- }
159
-
160
137
err := streamer .cleanupInconsistentState ()
161
138
if err != nil {
162
139
return nil , err
@@ -693,6 +670,10 @@ func (s *TransactionStreamer) AddFakeInitMessage() error {
693
670
}})
694
671
}
695
672
673
+ func (s * TransactionStreamer ) isEspressoMode () bool {
674
+ return s .lightClientReader != nil && s .espressoClient != nil
675
+ }
676
+
696
677
// Used in redis tests
697
678
func (s * TransactionStreamer ) GetMessageCountSync (t * testing.T ) (arbutil.MessageIndex , error ) {
698
679
s .insertionMutex .Lock ()
@@ -1297,7 +1278,7 @@ func (s *TransactionStreamer) pollSubmittedTransactionForFinality(ctx context.Co
1297
1278
}
1298
1279
1299
1280
// Verify the namespace proof
1300
- resp , err := s .espressoClient .FetchTransactionsInBlock (ctx , height , s .config (). EspressoNamespace )
1281
+ resp , err := s .espressoClient .FetchTransactionsInBlock (ctx , height , s .chainConfig . ChainID . Uint64 () )
1301
1282
if err != nil {
1302
1283
log .Warn ("failed to fetch the transactions in block, will retry" , "err" , err )
1303
1284
return false
@@ -1627,7 +1608,7 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context) ti
1627
1608
1628
1609
pendingTxnsPos , err := s .getEspressoPendingTxnsPos ()
1629
1610
if err != nil {
1630
- return s .config (). EspressoTxnsPollingInterval
1611
+ return s .espressoTxnsPollingInterval
1631
1612
}
1632
1613
1633
1614
if len (pendingTxnsPos ) > 0 {
@@ -1637,7 +1618,7 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context) ti
1637
1618
msg , err := s .GetMessage (pos )
1638
1619
if err != nil {
1639
1620
log .Error ("failed to get espresso submitted pos" , "err" , err )
1640
- return s .config (). EspressoTxnsPollingInterval
1621
+ return s .espressoTxnsPollingInterval
1641
1622
}
1642
1623
if msg .Message != nil {
1643
1624
msgs = append (msgs , * msg .Message )
@@ -1646,20 +1627,20 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context) ti
1646
1627
payload , msgCnt := arbos .BuildHotShotPayload (& msgs )
1647
1628
if msgCnt == 0 {
1648
1629
log .Error ("failed to build the hotshot transaction: a large message has exceeded the size limit" )
1649
- return s .config (). EspressoTxnsPollingInterval
1630
+ return s .espressoTxnsPollingInterval
1650
1631
}
1651
1632
1652
1633
log .Info ("submitting transaction to espresso using sovereign sequencer" )
1653
1634
1654
1635
// Note: same key should not be used for two namespaces for this to work
1655
1636
hash , err := s .espressoClient .SubmitTransaction (ctx , espressoTypes.Transaction {
1656
1637
Payload : payload ,
1657
- Namespace : s .config (). EspressoNamespace ,
1638
+ Namespace : s .chainConfig . ChainID . Uint64 () ,
1658
1639
})
1659
1640
1660
1641
if err != nil {
1661
1642
log .Error ("failed to submit transaction to espresso" , "err" , err )
1662
- return s .config (). EspressoTxnsPollingInterval
1643
+ return s .espressoTxnsPollingInterval
1663
1644
}
1664
1645
1665
1646
s .espressoTxnsStateInsertionMutex .Lock ()
@@ -1670,32 +1651,32 @@ func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context) ti
1670
1651
err = s .setEspressoSubmittedPos (batch , submittedPos )
1671
1652
if err != nil {
1672
1653
log .Error ("failed to set the submitted txn pos" , "err" , err )
1673
- return s .config (). EspressoTxnsPollingInterval
1654
+ return s .espressoTxnsPollingInterval
1674
1655
}
1675
1656
pendingTxnsPos = pendingTxnsPos [msgCnt :]
1676
1657
err = s .setEspressoPendingTxnsPos (batch , pendingTxnsPos )
1677
1658
if err != nil {
1678
1659
log .Error ("failed to set the pending txns" , "err" , err )
1679
- return s .config (). EspressoTxnsPollingInterval
1660
+ return s .espressoTxnsPollingInterval
1680
1661
}
1681
1662
err = s .setEspressoSubmittedHash (batch , hash )
1682
1663
if err != nil {
1683
1664
log .Error ("failed to set the submitted hash" , "err" , err )
1684
- return s .config (). EspressoTxnsPollingInterval
1665
+ return s .espressoTxnsPollingInterval
1685
1666
}
1686
1667
1687
1668
err = batch .Write ()
1688
1669
if err != nil {
1689
1670
log .Error ("failed to write to db" , "err" , err )
1690
- return s .config (). EspressoTxnsPollingInterval
1671
+ return s .espressoTxnsPollingInterval
1691
1672
}
1692
1673
}
1693
1674
1694
- return s .config (). EspressoTxnsPollingInterval
1675
+ return s .espressoTxnsPollingInterval
1695
1676
}
1696
1677
1697
1678
func (s * TransactionStreamer ) toggleEscapeHatch (ctx context.Context ) error {
1698
- live , err := s .lightClientReader .IsHotShotLive (s .config (). EspressoSwitchDelayThreshold )
1679
+ live , err := s .lightClientReader .IsHotShotLive (s .espressoSwitchDelayThreshold )
1699
1680
if err != nil {
1700
1681
return err
1701
1682
}
@@ -1738,7 +1719,7 @@ func (s *TransactionStreamer) toggleEscapeHatch(ctx context.Context) error {
1738
1719
}
1739
1720
1740
1721
l1Height := header .Header .GetL1Head ()
1741
- hotshotLive , err := s .lightClientReader .IsHotShotLiveAtHeight (l1Height , s .config (). EspressoSwitchDelayThreshold )
1722
+ hotshotLive , err := s .lightClientReader .IsHotShotLiveAtHeight (l1Height , s .espressoSwitchDelayThreshold )
1742
1723
if err != nil {
1743
1724
return err
1744
1725
}
@@ -1790,7 +1771,7 @@ func (s *TransactionStreamer) toggleEscapeHatch(ctx context.Context) error {
1790
1771
}
1791
1772
1792
1773
func (s * TransactionStreamer ) espressoSwitch (ctx context.Context , ignored struct {}) time.Duration {
1793
- retryRate := s .config (). EspressoTxnsPollingInterval * 50
1774
+ retryRate := s .espressoTxnsPollingInterval * 50
1794
1775
config , err := s .exec .GetArbOSConfigAtHeight (0 ) // Pass 0 to get the ArbOS config at current block height.
1795
1776
if err != nil {
1796
1777
log .Error ("Error Obtaining ArbOS Config " , "err" , err )
@@ -1802,7 +1783,7 @@ func (s *TransactionStreamer) espressoSwitch(ctx context.Context, ignored struct
1802
1783
}
1803
1784
// TODO: `SovereignSequencerEnabled` should be removed as it is only the sovereign sequencer
1804
1785
// will use this function.
1805
- if config .ArbitrumChainParams .EnableEspresso && s .config (). SovereignSequencerEnabled {
1786
+ if config .ArbitrumChainParams .EnableEspresso && s .isEspressoMode () {
1806
1787
err := s .toggleEscapeHatch (ctx )
1807
1788
if err != nil {
1808
1789
log .Error ("error checking escape hatch" , "err" , err )
@@ -1813,14 +1794,14 @@ func (s *TransactionStreamer) espressoSwitch(ctx context.Context, ignored struct
1813
1794
return s .submitEspressoTransactions (ctx )
1814
1795
}
1815
1796
1816
- return s .config (). EspressoTxnsPollingInterval
1797
+ return s .espressoTxnsPollingInterval
1817
1798
} else {
1818
1799
return retryRate
1819
1800
}
1820
1801
}
1821
1802
1822
1803
func (s * TransactionStreamer ) shouldSubmitEspressoTransaction () bool {
1823
- if ! s .config (). SovereignSequencerEnabled {
1804
+ if ! s .isEspressoMode () {
1824
1805
// Not using hotshot as finality layer
1825
1806
return false
1826
1807
}
0 commit comments