Skip to content

Commit 8a4fac1

Browse files
stevendannayuzefovich
authored andcommitted
kvnemesis: add FlushLockTableOperation
This adds FlushLockTable to the set of KVNemsis operations. It doesn't add any particularly interesting validations. Epic: none Release note: None
1 parent 8cf7e0c commit 8a4fac1

File tree

8 files changed

+117
-3
lines changed

8 files changed

+117
-3
lines changed

pkg/kv/batch.go

+22
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,7 @@ func (b *Batch) fillResults(ctx context.Context) {
300300
case *kvpb.MigrateRequest:
301301
case *kvpb.QueryResolvedTimestampRequest:
302302
case *kvpb.BarrierRequest:
303+
case *kvpb.FlushLockTableRequest:
303304
case *kvpb.LinkExternalSSTableRequest:
304305
case *kvpb.ExciseRequest:
305306
default:
@@ -1147,6 +1148,27 @@ func (b *Batch) barrier(s, e interface{}, withLAI bool) {
11471148
b.initResult(1, 0, notRaw, nil)
11481149
}
11491150

1151+
func (b *Batch) flushLockTable(s, e interface{}) {
1152+
begin, err := marshalKey(s)
1153+
if err != nil {
1154+
b.initResult(0, 0, notRaw, err)
1155+
return
1156+
}
1157+
end, err := marshalKey(e)
1158+
if err != nil {
1159+
b.initResult(0, 0, notRaw, err)
1160+
return
1161+
}
1162+
req := &kvpb.FlushLockTableRequest{
1163+
RequestHeader: kvpb.RequestHeader{
1164+
Key: begin,
1165+
EndKey: end,
1166+
},
1167+
}
1168+
b.appendReqs(req)
1169+
b.initResult(1, 0, notRaw, nil)
1170+
}
1171+
11501172
func (b *Batch) bulkRequest(
11511173
numKeys int, requestFactory func() (req kvpb.RequestUnion, kvSize int),
11521174
) {

pkg/kv/db.go

+17
Original file line numberDiff line numberDiff line change
@@ -899,6 +899,23 @@ func (db *DB) Barrier(ctx context.Context, begin, end interface{}) (hlc.Timestam
899899
return resp.Timestamp, nil
900900
}
901901

902+
func (db *DB) FlushLockTable(ctx context.Context, begin, end interface{}) error {
903+
b := &Batch{}
904+
b.flushLockTable(begin, end)
905+
if err := getOneErr(db.Run(ctx, b), b); err != nil {
906+
return err
907+
}
908+
if l := len(b.response.Responses); l != 1 {
909+
return errors.Errorf("got %d responses for FlushLockTable", l)
910+
}
911+
resp := b.response.Responses[0].GetFlushLockTable()
912+
if resp == nil {
913+
return errors.Errorf("unexpected response %T for FlushLockTable",
914+
b.response.Responses[0].GetInner())
915+
}
916+
return nil
917+
}
918+
902919
// BarrierWithLAI is like Barrier, but also returns the lease applied index and
903920
// range descriptor at which the barrier was applied. In this case, the barrier
904921
// can't span multiple ranges, otherwise a RangeKeyMismatchError is returned.

pkg/kv/kvnemesis/applier.go

+14
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,8 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
143143
_, err = db.Barrier(ctx, o.Key, o.EndKey)
144144
}
145145
o.Result = resultInit(ctx, err)
146+
case *FlushLockTableOperation:
147+
o.Result = resultInit(ctx, db.FlushLockTable(ctx, o.Key, o.EndKey))
146148
case *ClosureTxnOperation:
147149
// Use a backoff loop to avoid thrashing on txn aborts. Don't wait between
148150
// epochs of the same transaction to avoid waiting while holding locks.
@@ -449,6 +451,16 @@ func applyClientOp(
449451
})
450452
})
451453
o.Result = resultInit(ctx, err)
454+
case *FlushLockTableOperation:
455+
_, _, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
456+
b.AddRawRequest(&kvpb.FlushLockTableRequest{
457+
RequestHeader: kvpb.RequestHeader{
458+
Key: o.Key,
459+
EndKey: o.EndKey,
460+
},
461+
})
462+
})
463+
o.Result = resultInit(ctx, err)
452464
case *BatchOperation:
453465
b := &kv.Batch{}
454466
applyBatchOp(ctx, b, db.Run, o)
@@ -569,6 +581,8 @@ func applyBatchOp(
569581
panic(errors.AssertionFailedf(`AddSSTable cannot be used in batches`))
570582
case *BarrierOperation:
571583
panic(errors.AssertionFailedf(`Barrier cannot be used in batches`))
584+
case *FlushLockTableOperation:
585+
panic(errors.AssertionFailedf(`FlushLockOperation cannot be used in batches`))
572586
default:
573587
panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, subO, subO))
574588
}

pkg/kv/kvnemesis/generator.go

+31
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,10 @@ type ClientOperationConfig struct {
261261
AddSSTable int
262262
// Barrier is an operation that waits for in-flight writes to complete.
263263
Barrier int
264+
265+
// FlushLockTable is an operation that moves unreplicated locks in the
266+
// in-memory lock table into the
267+
FlushLockTable int
264268
}
265269

266270
// BatchOperationConfig configures the relative probability of generating a
@@ -401,6 +405,7 @@ func newAllOperationsConfig() GeneratorConfig {
401405
DeleteRangeUsingTombstone: 1,
402406
AddSSTable: 1,
403407
Barrier: 1,
408+
FlushLockTable: 1,
404409
}
405410
batchOpConfig := BatchOperationConfig{
406411
Batch: 4,
@@ -536,6 +541,11 @@ func NewDefaultConfig() GeneratorConfig {
536541
config.Ops.ClosureTxn.CommitBatchOps.Barrier = 0
537542
config.Ops.ClosureTxn.TxnClientOps.Barrier = 0
538543
config.Ops.ClosureTxn.TxnBatchOps.Ops.Barrier = 0
544+
545+
config.Ops.Batch.Ops.FlushLockTable = 0
546+
config.Ops.ClosureTxn.CommitBatchOps.FlushLockTable = 0
547+
config.Ops.ClosureTxn.TxnClientOps.FlushLockTable = 0
548+
config.Ops.ClosureTxn.TxnBatchOps.Ops.FlushLockTable = 0
539549
return config
540550
}
541551

@@ -833,6 +843,7 @@ func (g *generator) registerClientOps(allowed *[]opGen, c *ClientOperationConfig
833843
addOpGen(allowed, randDelRangeUsingTombstone, c.DeleteRangeUsingTombstone)
834844
addOpGen(allowed, randAddSSTable, c.AddSSTable)
835845
addOpGen(allowed, randBarrier, c.Barrier)
846+
addOpGen(allowed, randFlushLockTable, c.FlushLockTable)
836847
}
837848

838849
func (g *generator) registerBatchOps(allowed *[]opGen, c *BatchOperationConfig) {
@@ -1138,6 +1149,19 @@ func randBarrier(g *generator, rng *rand.Rand) Operation {
11381149
return barrier(key, endKey, withLAI)
11391150
}
11401151

1152+
func randFlushLockTable(g *generator, rng *rand.Rand) Operation {
1153+
// FlushLockTable can't span multiple ranges. We want to test a combination of
1154+
// requests that span the entire range and those that span part of a range.
1155+
key, endKey := randRangeSpan(rng, g.currentSplits)
1156+
1157+
wholeRange := rng.Float64() < 0.5
1158+
if !wholeRange {
1159+
key = randKeyBetween(rng, key, endKey)
1160+
}
1161+
1162+
return flushLockTable(key, endKey)
1163+
}
1164+
11411165
func randScan(g *generator, rng *rand.Rand) Operation {
11421166
key, endKey := randSpan(rng)
11431167
return scan(key, endKey)
@@ -1976,6 +2000,13 @@ func barrier(key, endKey string, withLAI bool) Operation {
19762000
}}
19772001
}
19782002

2003+
func flushLockTable(key, endKey string) Operation {
2004+
return Operation{FlushLockTable: &FlushLockTableOperation{
2005+
Key: []byte(key),
2006+
EndKey: []byte(endKey),
2007+
}}
2008+
}
2009+
19792010
func createSavepoint(id int) Operation {
19802011
return Operation{SavepointCreate: &SavepointCreateOperation{ID: int32(id)}}
19812012
}

pkg/kv/kvnemesis/generator_test.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,8 @@ func TestRandStep(t *testing.T) {
248248
client.AddSSTable++
249249
case *BarrierOperation:
250250
client.Barrier++
251+
case *FlushLockTableOperation:
252+
client.FlushLockTable++
251253
case *BatchOperation:
252254
batch.Batch++
253255
countClientOps(&batch.Ops, nil, o.Ops...)
@@ -284,7 +286,8 @@ func TestRandStep(t *testing.T) {
284286
*DeleteRangeOperation,
285287
*DeleteRangeUsingTombstoneOperation,
286288
*AddSSTableOperation,
287-
*BarrierOperation:
289+
*BarrierOperation,
290+
*FlushLockTableOperation:
288291
countClientOps(&counts.DB, &counts.Batch, step.Op)
289292
case *ClosureTxnOperation:
290293
countClientOps(&counts.ClosureTxn.TxnClientOps, &counts.ClosureTxn.TxnBatchOps, o.Ops...)

pkg/kv/kvnemesis/operations.go

+9
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ func (op Operation) Result() *Result {
3838
return &o.Result
3939
case *BarrierOperation:
4040
return &o.Result
41+
case *FlushLockTableOperation:
42+
return &o.Result
4143
case *SplitOperation:
4244
return &o.Result
4345
case *MergeOperation:
@@ -138,6 +140,8 @@ func (op Operation) format(w *strings.Builder, fctx formatCtx) {
138140
o.format(w, fctx)
139141
case *BarrierOperation:
140142
o.format(w, fctx)
143+
case *FlushLockTableOperation:
144+
o.format(w, fctx)
141145
case *SplitOperation:
142146
o.format(w, fctx)
143147
case *MergeOperation:
@@ -364,6 +368,11 @@ func (op BarrierOperation) format(w *strings.Builder, fctx formatCtx) {
364368
op.Result.format(w)
365369
}
366370

371+
func (op FlushLockTableOperation) format(w *strings.Builder, fctx formatCtx) {
372+
fmt.Fprintf(w, `%s.FlushLockTable(ctx, %s, %s)`, fctx.receiver, fmtKey(op.Key), fmtKey(op.EndKey))
373+
op.Result.format(w)
374+
}
375+
367376
func (op SplitOperation) format(w *strings.Builder, fctx formatCtx) {
368377
fmt.Fprintf(w, `%s.AdminSplit(ctx, %s, hlc.MaxTimestamp)`, fctx.receiver, fmtKey(op.Key))
369378
op.Result.format(w)

pkg/kv/kvnemesis/operations.proto

+7
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,12 @@ message BarrierOperation {
9797
Result result = 4 [(gogoproto.nullable) = false];
9898
}
9999

100+
message FlushLockTableOperation {
101+
bytes key = 1;
102+
bytes end_key = 2;
103+
Result result = 3 [(gogoproto.nullable) = false];
104+
}
105+
100106
message SplitOperation {
101107
bytes key = 1;
102108
Result result = 2 [(gogoproto.nullable) = false];
@@ -182,6 +188,7 @@ message Operation {
182188
SavepointReleaseOperation savepoint_release = 21;
183189
SavepointRollbackOperation savepoint_rollback = 22;
184190
BarrierOperation barrier = 23;
191+
FlushLockTableOperation flush_lock_table = 24;
185192
}
186193

187194
enum ResultType {

pkg/kv/kvnemesis/validator.go

+13-2
Original file line numberDiff line numberDiff line change
@@ -733,8 +733,19 @@ func (v *validator) processOp(op Operation) {
733733
// Fail or retry on other errors, depending on type.
734734
v.checkNonAmbError(op, t.Result, exceptUnhandledRetry)
735735
}
736-
// We don't yet actually check the barrier guarantees here, i.e. that all
737-
// concurrent writes are applied by the time it completes. Maybe later.
736+
// We don't yet actually check the barrier guarantees here, i.e. that all
737+
// concurrent writes are applied by the time it completes. Maybe later.
738+
case *FlushLockTableOperation:
739+
// TODO(ssd): Should this be true? Right now it needs
740+
// to be or the test fails; but shoujld we be
741+
// returning a timestamp?
742+
execTimestampStrictlyOptional = true
743+
if resultHasErrorType(t.Result, &kvpb.RangeKeyMismatchError{}) {
744+
// FlushLockTableOperation may race with a split.
745+
} else {
746+
// Fail or retry on other errors, depending on type.
747+
v.checkNonAmbError(op, t.Result, exceptUnhandledRetry)
748+
}
738749
case *ScanOperation:
739750
if _, isErr := v.checkError(op, t.Result); isErr {
740751
break

0 commit comments

Comments
 (0)