@@ -44,6 +44,9 @@ type Conn struct {
44
44
// base network connection
45
45
conn net.Conn
46
46
47
+ // number of inflight requests on the connection.
48
+ inflight int32
49
+
47
50
// offset management (synchronized on the mutex field)
48
51
mutex sync.Mutex
49
52
offset int64
@@ -1236,6 +1239,18 @@ func (c *Conn) writeOperation(write func(time.Time, int32) error, read func(time
1236
1239
return c .do (& c .wdeadline , write , read )
1237
1240
}
1238
1241
1242
+ func (c * Conn ) enter () {
1243
+ atomic .AddInt32 (& c .inflight , + 1 )
1244
+ }
1245
+
1246
+ func (c * Conn ) leave () {
1247
+ atomic .AddInt32 (& c .inflight , - 1 )
1248
+ }
1249
+
1250
+ func (c * Conn ) concurrency () int {
1251
+ return int (atomic .LoadInt32 (& c .inflight ))
1252
+ }
1253
+
1239
1254
func (c * Conn ) do (d * connDeadline , write func (time.Time , int32 ) error , read func (time.Time , int ) error ) error {
1240
1255
id , err := c .doRequest (d , write )
1241
1256
if err != nil {
@@ -1261,6 +1276,7 @@ func (c *Conn) do(d *connDeadline, write func(time.Time, int32) error, read func
1261
1276
}
1262
1277
1263
1278
func (c * Conn ) doRequest (d * connDeadline , write func (time.Time , int32 ) error ) (id int32 , err error ) {
1279
+ c .enter ()
1264
1280
c .wlock .Lock ()
1265
1281
c .correlationID ++
1266
1282
id = c .correlationID
@@ -1272,67 +1288,53 @@ func (c *Conn) doRequest(d *connDeadline, write func(time.Time, int32) error) (i
1272
1288
// recoverable state so we're better off just giving up at this point to
1273
1289
// avoid any risk of corrupting the following operations.
1274
1290
c .conn .Close ()
1291
+ c .leave ()
1275
1292
}
1276
1293
1277
1294
c .wlock .Unlock ()
1278
1295
return
1279
1296
}
1280
1297
1281
1298
func (c * Conn ) waitResponse (d * connDeadline , id int32 ) (deadline time.Time , size int , lock * sync.Mutex , err error ) {
1282
- // I applied exactly zero scientific process to choose this value,
1283
- // it seemed to worked fine in practice tho.
1284
- //
1285
- // My guess is 100 iterations where the goroutine gets descheduled
1286
- // by calling runtime.Gosched() may end up on a wait of ~10ms to ~1s
1287
- // (if the programs is heavily CPU bound and has lots of goroutines),
1288
- // so it should allow for bailing quickly without taking too much risk
1289
- // to get false positives.
1290
- const maxAttempts = 100
1291
- var lastID int32
1292
-
1293
- for attempt := 0 ; attempt < maxAttempts ; {
1299
+ for {
1294
1300
var rsz int32
1295
1301
var rid int32
1296
1302
1297
1303
c .rlock .Lock ()
1298
1304
deadline = d .setConnReadDeadline (c .conn )
1305
+ rsz , rid , err = c .peekResponseSizeAndID ()
1299
1306
1300
- if rsz , rid , err = c . peekResponseSizeAndID (); err != nil {
1307
+ if err != nil {
1301
1308
d .unsetConnReadDeadline ()
1302
1309
c .conn .Close ()
1303
1310
c .rlock .Unlock ()
1304
- return
1311
+ break
1305
1312
}
1306
1313
1307
1314
if id == rid {
1308
1315
c .skipResponseSizeAndID ()
1309
1316
size , lock = int (rsz - 4 ), & c .rlock
1310
- return
1317
+ // Don't unlock the read mutex to yield ownership to the caller.
1318
+ break
1319
+ }
1320
+
1321
+ if c .concurrency () == 1 {
1322
+ // If the goroutine is the only one waiting on this connection it
1323
+ // should be impossible to read a correlation id different from the
1324
+ // one it expects. This is a sign that the data we are reading on
1325
+ // the wire is corrupted and the connection needs to be closed.
1326
+ err = io .ErrNoProgress
1327
+ c .rlock .Unlock ()
1328
+ break
1311
1329
}
1312
1330
1313
1331
// Optimistically release the read lock if a response has already
1314
1332
// been received but the current operation is not the target for it.
1315
1333
c .rlock .Unlock ()
1316
1334
runtime .Gosched ()
1317
-
1318
- // This check is a safety mechanism, if we make too many loop
1319
- // iterations and always draw the same id then we could be facing
1320
- // corrupted data on the wire, or the goroutine(s) sharing ownership
1321
- // of this connection may have panicked and therefore will not be able
1322
- // to participate in consuming bytes from the wire. To prevent entering
1323
- // an infinite loop which reads the same value over and over we bail
1324
- // with the uncommon io.ErrNoProgress error which should give a good
1325
- // enough signal about what is going wrong.
1326
- if rid != lastID {
1327
- attempt ++
1328
- } else {
1329
- attempt = 0
1330
- }
1331
-
1332
- lastID = rid
1333
1335
}
1334
1336
1335
- err = io . ErrNoProgress
1337
+ c . leave ()
1336
1338
return
1337
1339
}
1338
1340
0 commit comments