This repository was archived by the owner on Aug 18, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 631
/
Copy pathRetrieval.hs
384 lines (357 loc) · 16.8 KB
/
Retrieval.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
-- | Server which deals with blocks processing.
module Pos.Network.Block.Retrieval
( retrievalWorker
) where
import Universum
import Control.Concurrent.STM (TVar, newTVar, putTMVar, swapTMVar,
swapTVar, tryReadTBQueue, tryReadTMVar, tryTakeTMVar)
import Control.Exception.Safe (IOException, handleAny)
import Control.Lens (to)
import Control.Monad.STM (retry)
import qualified Data.List.NonEmpty as NE
import Data.Time.Units (Second)
import Formatting (build, int, sformat, shown, (%))
import Pos.Chain.Block (Block, BlockHeader, HasHeaderHash (..),
HeaderHash, headerHashF)
import Pos.Chain.Genesis as Genesis (Config)
import Pos.Chain.Txp (TxpConfiguration)
import Pos.Core (difficultyL, isMoreDifficult)
import Pos.Core.Chrono (NE, OldestFirst (..), _OldestFirst)
import Pos.Core.Conc (delay)
import Pos.Core.Reporting (HasMisbehaviorMetrics)
import Pos.Crypto (shortHashF)
import Pos.DB.Block (ClassifyHeaderRes (..), classifyNewHeader,
getHeadersOlderExp)
import qualified Pos.DB.BlockIndex as DB
import Pos.Infra.Communication.Protocol (NodeId)
import Pos.Infra.Diffusion.Types (Diffusion, StreamBlocks (..))
import qualified Pos.Infra.Diffusion.Types as Diffusion
import Pos.Infra.Recovery.Types (RecoveryHeaderTag)
import Pos.Infra.Reporting (reportOrLogE, reportOrLogW)
import Pos.Network.Block.Logic (BlockNetLogicException (..),
handleBlocks, triggerRecovery)
import Pos.Network.Block.RetrievalQueue (BlockRetrievalQueueTag,
BlockRetrievalTask (..))
import Pos.Network.Block.WorkMode (BlockWorkMode)
import Pos.Util.Util (HasLens (..))
import Pos.Util.Wlog (logDebug, logError, logInfo, logWarning)
-- I really don't like join
{-# ANN retrievalWorker ("HLint: ignore Use join" :: Text) #-}
-- | Worker that queries blocks. It has two jobs:
--
-- * If there are headers in 'BlockRetrievalQueue', this worker retrieves
--   blocks according to that queue.
--
-- * If recovery is in progress, this worker keeps recovery going by asking
--   headers (and then switching to block retrieval on next loop iteration).
--
-- If both happen at the same time, 'BlockRetrievalQueue' takes precedence.
--
-- The worker loops forever: each iteration blocks until either the queue or
-- the recovery header variable has something in it, runs the corresponding
-- handler (with its own exception handler installed) and then repeats.
retrievalWorker
    :: forall ctx m.
       ( BlockWorkMode ctx m
       , HasMisbehaviorMetrics ctx
       )
    => Genesis.Config
    -> TxpConfiguration
    -> Diffusion m -> m ()
retrievalWorker genesisConfig txpConfig diffusion = do
    logInfo "Starting retrievalWorker loop"
    mainLoop
  where
    -- One iteration: pick the next thing to do inside a single STM
    -- transaction, run it outside of STM, then recurse.
    mainLoop = do
        queue <- view (lensOf @BlockRetrievalQueueTag)
        recHeaderVar <- view (lensOf @RecoveryHeaderTag)
        logDebug "Waiting on the block queue or recovery header var"
        -- Reading the queue is a priority, because it sets the recovery
        -- variable in case the header is classified as alternative. So if the
        -- queue contains lots of headers after a long delay, we'll first
        -- iterate over them and set recovery variable to the latest one, and
        -- only then we'll do recovery.
        thingToDoNext <- atomically $ do
            mbQueuedHeadersChunk <- tryReadTBQueue queue
            mbRecHeader <- tryReadTMVar recHeaderVar
            case (mbQueuedHeadersChunk, mbRecHeader) of
                -- Nothing to do yet: block until one of the two changes.
                (Nothing, Nothing) -> retry
                -- Dispatch the task
                (Just (nodeId, task), _) ->
                    pure (handleBlockRetrieval nodeId task)
                -- No tasks & the recovery header is set => do the recovery
                (_, Just (nodeId, rHeader)) ->
                    pure (handleRecoveryWithHandler nodeId rHeader)
        -- Exception handlers are installed locally, on the 'thingToDoNext',
        -- to ensure that network troubles, for instance, do not kill the
        -- worker.
        () <- thingToDoNext
        mainLoop

    -----------------

    -- That's the first queue branch (task dispatching).
    -- Logs the task and hands the header to 'handleContinues' or
    -- 'handleAlternative' depending on the task's 'brtContinues' flag.
    -- Any synchronous exception is passed to 'handleRetrievalE'.
    handleBlockRetrieval nodeId BlockRetrievalTask{..} =
        handleAny (handleRetrievalE nodeId brtHeader) $ do
            logDebug $ sformat
                ("Block retrieval queue task received, nodeId="%build%
                 ", header="%build%", continues="%build)
                nodeId
                (headerHash brtHeader)
                brtContinues
            (if brtContinues then handleContinues else handleAlternative)
                nodeId
                brtHeader

    -- When we have a continuation of the chain, just try to get and apply it.
    -- The header is re-classified first; anything other than 'CHContinues'
    -- is only logged.
    handleContinues nodeId header = do
        let hHash = headerHash header
        logDebug $ "handleContinues: " <> pretty hHash
        classifyNewHeader genesisConfig header >>= \case
            CHContinues ->
                -- The header's own hash is used both as the desired block
                -- and as the only checkpoint.
                void $ getProcessBlocks genesisConfig txpConfig diffusion nodeId hHash [hHash]
            res -> logDebug $
                "processContHeader: expected header to " <>
                "be continuation, but it's " <> show res

    -- When we have an alternative header, we should check whether it's actually
    -- recovery mode (server side should send us headers as a proof) and then
    -- enter recovery mode.
    -- Invalid and useless headers are only logged; every other
    -- classification result updates the recovery header.
    handleAlternative nodeId header = do
        logDebug $ "handleAlternative: " <> pretty (headerHash header)
        classifyNewHeader genesisConfig header >>= \case
            CHInvalid _ ->
                logError "handleAlternative: invalid header got into retrievalWorker queue"
            CHUseless _ ->
                logDebug $
                sformat ("handleAlternative: header "%build%" became useless, ignoring it")
                        header
            _ -> do
                logDebug "handleAlternative: considering header for recovery mode"
                -- CSL-1514
                updateRecoveryHeader nodeId header

    -- Squelch the exception and continue. Used with 'handleAny' from
    -- safe-exceptions so it will let async exceptions pass.
    handleRetrievalE nodeId cHeader e = do
        reportOrLogW (sformat
            ("handleRetrievalE: error handling nodeId="%build%", header="%build%": ")
            nodeId (headerHash cHeader)) e

    -----------------

    -- Second branch: run 'handleRecovery' with 'handleRecoveryE' installed
    -- as the handler for synchronous exceptions.
    handleRecoveryWithHandler nodeId rHeader =
        handleAny (handleRecoveryE nodeId rHeader) $
        handleRecovery nodeId rHeader

    -- We immediately drop recovery mode/header and request tips
    -- again.
    handleRecoveryE nodeId rHeader e = do
        -- REPORT:ERROR 'reportOrLogW' in block retrieval worker/recovery.
        -- If reporting itself fails with an 'IOException', fall back to
        -- plain error logging of the original exception.
        reportOrLogW (sformat errfmt nodeId (headerHash rHeader)) e
            `catch` handleIOException
        dropRecoveryHeaderAndRepeat genesisConfig diffusion nodeId
      where
        errfmt = "handleRecoveryE: error handling nodeId="%build%", header="%headerHashF%": "
        handleIOException :: IOException -> m ()
        handleIOException _ = logError $ sformat (errfmt%shown) nodeId (headerHash rHeader) e

    -- Recovery handling. We assume that header in the recovery variable is
    -- appropriate and just query headers/blocks.
    -- Throws 'DialogUnexpected' if the recovery header is already in the
    -- database.
    handleRecovery :: NodeId -> BlockHeader -> m ()
    handleRecovery nodeId rHeader = do
        logDebug "Block retrieval queue is empty and we're in recovery mode,\
                 \ so we will fetch more blocks"
        whenM (fmap isJust $ DB.getHeader $ headerHash rHeader) $
            -- How did we even get into recovery then?
            throwM $ DialogUnexpected $ "handleRecovery: recovery header is " <>
                                        "already present in db"
        logDebug "handleRecovery: fetching blocks"
        -- Checkpoints come from 'getHeadersOlderExp' and are passed to the
        -- peer together with the recovery header's hash.
        checkpoints <- toList <$> getHeadersOlderExp genesisConfig Nothing
        void $ streamProcessBlocks genesisConfig
                                   txpConfig
                                   diffusion
                                   nodeId
                                   (headerHash rHeader)
                                   checkpoints
----------------------------------------------------------------------------
-- Entering and exiting recovery mode
----------------------------------------------------------------------------
-- | Result of attempt to update recovery header.
--
-- Every constructor carries the peer/header pair(s) involved so that the
-- outcome can be logged after the STM transaction has committed.
data UpdateRecoveryResult
    = RecoveryStarted NodeId BlockHeader
      -- ^ Recovery header was absent, so we've set it. Carries the peer and
      -- header that were stored.
    | RecoveryShifted NodeId BlockHeader NodeId BlockHeader
      -- ^ Header was present, but we've replaced it with another
      -- (more difficult) one. Carries the old peer and header followed by
      -- the new peer and header.
    | RecoveryContinued NodeId BlockHeader
      -- ^ Header is good, but is irrelevant, so recovery variable is
      -- unchanged. Carries the peer and header that remain stored.
-- | Offer a new @(peer, header)@ pair as the recovery target.
--
-- * If no recovery header is set, the pair is stored.
-- * If one is set and the offered header is more difficult, it replaces
--   the stored pair.
-- * Otherwise the stored pair is left untouched.
--
-- The decision is made in a single STM transaction and the outcome is
-- logged afterwards.
--
-- Be careful to run this in the same thread that ends recovery mode
-- (or synchronise those threads with an MVar), otherwise a race
-- condition can occur where we are caught in the recovery mode
-- indefinitely.
updateRecoveryHeader
    :: BlockWorkMode ctx m
    => NodeId
    -> BlockHeader
    -> m ()
updateRecoveryHeader nodeId hdr = do
    recoveryVar <- view (lensOf @RecoveryHeaderTag)
    logDebug "Updating recovery header..."
    outcome <- atomically $ tryReadTMVar recoveryVar >>= \case
        -- Not in recovery yet: start it with the offered pair.
        Nothing ->
            putTMVar recoveryVar (nodeId, hdr) $> RecoveryStarted nodeId hdr
        Just (prevNodeId, prevHdr)
            -- A more difficult header supersedes the stored one.
            | hdr `isMoreDifficult` prevHdr ->
                swapTMVar recoveryVar (nodeId, hdr)
                    $> RecoveryShifted prevNodeId prevHdr nodeId hdr
            | otherwise ->
                pure (RecoveryContinued prevNodeId prevHdr)
    logDebug (describe outcome)
  where
    -- Human-readable description of what happened to the recovery header.
    describe = \case
        RecoveryStarted peer tip -> sformat
            ("Recovery started with nodeId="%build%" and tip="%build)
            peer
            (headerHash tip)
        RecoveryShifted oldPeer oldTip newPeer newTip -> sformat
            ("Recovery shifted from nodeId="%build%" and tip="%build%
             " to nodeId="%build%" and tip="%build)
            oldPeer (headerHash oldTip)
            newPeer (headerHash newTip)
        RecoveryContinued peer tip -> sformat
            ("Recovery continued with nodeId="%build%" and tip="%build)
            peer
            (headerHash tip)
-- | Clear the recovery header if it currently belongs to the given peer.
--
-- The returned 'Bool' signifies whether given peer was kicked and recovery
-- was stopped. It is also 'True' when no recovery header was set at all;
-- it is 'False' only when the header belongs to a different peer, in which
-- case the header is left in place.
--
-- NB. The reason @nodeId@ is passed is that we want to avoid a race
-- condition. If you work with peer P and header H, after failure you want to
-- drop communication with P; however, if at the same time a new block
-- arrives and another thread replaces peer and header to (P2, H2), you want
-- to continue working with P2 and ignore the exception that happened with P.
-- So, @nodeId@ is used to check that the peer wasn't replaced mid-execution.
dropRecoveryHeader
    :: BlockWorkMode ctx m
    => NodeId
    -> m Bool
dropRecoveryHeader nodeId = do
    recoveryVar <- view (lensOf @RecoveryHeaderTag)
    (kicked, currentPeer) <- atomically $ tryReadTMVar recoveryVar >>= \case
        -- Nothing stored: report success without touching the variable.
        Nothing -> pure (True, Nothing)
        Just (peer, _) -> do
            let samePeer = peer == nodeId
            -- Only empty the variable when it still refers to our peer.
            when samePeer $ void $ tryTakeTMVar recoveryVar
            pure (samePeer, Just peer)
    if kicked
        then logWarning $
            sformat ("Recovery mode communication dropped with peer "%build) nodeId
        else logDebug $
            "Recovery mode wasn't disabled: " <>
            maybe "noth" show currentPeer <> " vs " <> show nodeId
    pure kicked
-- | Drops the recovery header and, if it was successful, queries the tips.
--
-- "Successful" is whatever 'dropRecoveryHeader' reports as 'True'. In that
-- case we wait a fixed delay and call 'triggerRecovery'; an exception thrown
-- by 'triggerRecovery' is reported/logged and not rethrown.
dropRecoveryHeaderAndRepeat
    :: BlockWorkMode ctx m
    => Genesis.Config
    -> Diffusion m
    -> NodeId
    -> m ()
dropRecoveryHeaderAndRepeat genesisConfig diffusion nodeId =
    whenM (dropRecoveryHeader nodeId) restartRecovery
  where
    -- FIXME why delay? Why 2 seconds?
    restartDelay :: Second
    restartDelay = 2

    restartRecovery = do
        logDebug "Attempting to restart recovery"
        delay restartDelay
        handleAny reportTriggerFailure (triggerRecovery genesisConfig diffusion)
        logDebug "Attempting to restart recovery over"

    -- REPORT:ERROR 'reportOrLogE' somewhere in block retrieval.
    reportTriggerFailure =
        reportOrLogE $ "Exception happened while trying to trigger " <>
                       "recovery inside dropRecoveryHeaderAndRepeat: "
-- Fetches blocks from the given peer in one batch and applies them.
--
-- Returns only if blocks were successfully downloaded and
-- processed. Throws exception if something goes wrong; in particular,
-- 'DialogUnexpected' is thrown when the peer answers with no blocks.
--
-- After the blocks are handled, recovery mode is exited (the recovery header
-- is cleared) if any received block is at least as difficult as the
-- recovery header.
getProcessBlocks
    :: forall ctx m
     . (BlockWorkMode ctx m, HasMisbehaviorMetrics ctx)
    => Genesis.Config
    -> TxpConfiguration
    -> Diffusion m
    -> NodeId
    -> HeaderHash
    -> [HeaderHash]
    -> m ()
getProcessBlocks genesisConfig txpConfig diffusion nodeId desired checkpoints = do
    fetched <- getOldestFirst <$> Diffusion.getBlocks diffusion nodeId desired checkpoints
    maybe emptyReply (processBatch . OldestFirst) (nonEmpty fetched)
  where
    -- The peer returned nothing for our request.
    emptyReply :: m ()
    emptyReply = throwM $ DialogUnexpected $ sformat
        ("getProcessBlocks: diffusion returned []"%
         " on request to fetch "%shortHashF%" from peer "%build)
        desired nodeId

    -- Apply a non-empty batch and leave recovery mode if it got us far enough.
    processBatch :: OldestFirst NE Block -> m ()
    processBatch batch = do
        recoveryVar <- view (lensOf @RecoveryHeaderTag)
        logDebug $ sformat
            ("Retrieved "%int%" blocks")
            (NE.length (getOldestFirst batch))
        handleBlocks genesisConfig txpConfig batch diffusion
        -- If we're in recovery mode and any downloaded block reaches the
        -- difficulty of the recovery header, gracefully exit recovery mode.
        -- Not being in recovery mode at all is fine too.
        leftRecovery <- atomically $ tryReadTMVar recoveryVar >>= \case
            Just (_, rHeader)
                | any (`atLeastAsDifficultAs` rHeader) batch ->
                    isJust <$> tryTakeTMVar recoveryVar
            _ -> pure False
        when leftRecovery $
            logInfo "Recovery mode exited gracefully on receiving block we needed"

    -- Non-strict comparison of difficulties.
    atLeastAsDifficultAs blk target = blk ^. difficultyL >= target ^. difficultyL
-- Attempts to catch up by streaming blocks from peer.
-- Will fall back to getProcessBlocks if streaming is disabled
-- or not supported by peer (signalled by 'Diffusion.streamBlocks'
-- returning 'Nothing').
--
-- Once streaming finishes, recovery mode is exited if the block remembered
-- from the last streamed batch is at least as difficult as the recovery
-- header; otherwise the recovery header is dropped and recovery is
-- re-triggered via 'dropRecoveryHeaderAndRepeat'.
streamProcessBlocks
    :: forall ctx m
     . (BlockWorkMode ctx m, HasMisbehaviorMetrics ctx)
    => Genesis.Config
    -> TxpConfiguration
    -> Diffusion m
    -> NodeId
    -> HeaderHash
    -> [HeaderHash]
    -> m ()
streamProcessBlocks genesisConfig txpConfig diffusion nodeId desired checkpoints = do
    logInfo "streaming start"
    newestSeen <- atomically (newTVar Nothing)
    outcome <- Diffusion.streamBlocks diffusion nodeId desired checkpoints (consumer newestSeen)
    case outcome of
        Nothing -> do
            logInfo "streaming not supported, reverting to batch mode"
            getProcessBlocks genesisConfig txpConfig diffusion nodeId desired checkpoints
        Just _ -> do
            logInfo "streaming done"
            recoveryVar <- view (lensOf @RecoveryHeaderTag)
            leftRecovery <- atomically $ do
                newest  <- readTVar newestSeen
                pending <- tryReadTMVar recoveryVar
                case (newest, pending) of
                    (Just blk, Just (_, recHeader))
                        | blk ^. difficultyL >= recHeader ^. difficultyL ->
                            isJust <$> tryTakeTMVar recoveryVar
                    -- Either we have not gotten a single block, we were not
                    -- in recovery, or the streamed blocks fell short.
                    _ -> pure False
            if leftRecovery
                then logInfo "Recovery mode exited gracefully on receiving block we needed"
                -- Streaming stopped but we didn't make any progress
                else dropRecoveryHeaderAndRepeat genesisConfig diffusion nodeId
  where
    -- Streaming callback: for every batch, remember its head block, apply
    -- the batch and ask for more. The batch is reversed before being wrapped
    -- in 'OldestFirst', i.e. it is treated as arriving newest-first.
    consumer :: TVar (Maybe Block) -> StreamBlocks Block m ()
    consumer newestSeen = StreamBlocks
        { streamBlocksMore = \newestFirst -> do
            void $ atomically $ swapTVar newestSeen (Just (NE.head newestFirst))
            void $ handleBlocks genesisConfig txpConfig (OldestFirst (NE.reverse newestFirst)) diffusion
            pure (consumer newestSeen)
        , streamBlocksDone = pure ()
        }