Skip to content

Commit d2a4cf5

Browse files
committed
sqlstats,insights: remove sessionID param in Observe* fn signatures
This parameter is redundant as we should read the session id from the provided stats object. Epic: none Release note: None
1 parent 4bc4946 commit d2a4cf5

File tree

6 files changed

+69
-43
lines changed

6 files changed

+69
-43
lines changed

pkg/sql/sqlstats/insights/ingester.go

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ var eventBufferPool = sync.Pool{
6868
New: func() interface{} { return new(eventBuffer) },
6969
}
7070

71+
// event is a single event that can be observed by the ingester.
72+
// At most one of transaction or statement will be non-nil.
7173
type event struct {
7274
sessionID clusterunique.ID
7375
transaction *sqlstats.RecordedTxnStats
@@ -157,7 +159,7 @@ func (i *ConcurrentBufferIngester) ingest(events *eventBuffer) {
157159
break
158160
}
159161
if e.statement != nil {
160-
i.registry.ObserveStatement(e.sessionID, e.statement)
162+
i.registry.ObserveStatement(e.statement)
161163
} else if e.transaction != nil {
162164
i.registry.ObserveTransaction(e.sessionID, e.transaction)
163165
} else if e.sessionID != (clusterunique.ID{}) {
@@ -167,41 +169,36 @@ func (i *ConcurrentBufferIngester) ingest(events *eventBuffer) {
167169
}
168170
}
169171

170-
func (i *ConcurrentBufferIngester) ObserveStatement(
171-
sessionID clusterunique.ID, statement *sqlstats.RecordedStmtStats,
172-
) {
172+
func (i *ConcurrentBufferIngester) ObserveStatement(statement *sqlstats.RecordedStmtStats) {
173173
if !i.registry.enabled() {
174174
return
175175
}
176176

177177
if i.testingKnobs != nil && i.testingKnobs.InsightsWriterStmtInterceptor != nil {
178-
i.testingKnobs.InsightsWriterStmtInterceptor(sessionID, statement)
178+
i.testingKnobs.InsightsWriterStmtInterceptor(statement.SessionID, statement)
179179
return
180180
}
181181

182182
i.guard.AtomicWrite(func(writerIdx int64) {
183183
i.guard.eventBuffer[writerIdx] = event{
184-
sessionID: sessionID,
185184
statement: statement,
186185
}
187186
})
188187
}
189188

190-
func (i *ConcurrentBufferIngester) ObserveTransaction(
191-
sessionID clusterunique.ID, transaction *sqlstats.RecordedTxnStats,
192-
) {
189+
func (i *ConcurrentBufferIngester) ObserveTransaction(transaction *sqlstats.RecordedTxnStats) {
193190
if !i.registry.enabled() {
194191
return
195192
}
196193

197194
if i.testingKnobs != nil && i.testingKnobs.InsightsWriterTxnInterceptor != nil {
198-
i.testingKnobs.InsightsWriterTxnInterceptor(sessionID, transaction)
195+
i.testingKnobs.InsightsWriterTxnInterceptor(transaction.SessionID, transaction)
199196
return
200197
}
201198

202199
i.guard.AtomicWrite(func(writerIdx int64) {
203200
i.guard.eventBuffer[writerIdx] = event{
204-
sessionID: sessionID,
201+
sessionID: transaction.SessionID,
205202
transaction: transaction,
206203
}
207204
})

pkg/sql/sqlstats/insights/ingester_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,9 @@ func TestIngester(t *testing.T) {
8181
ingester.Start(ctx, stopper, WithFlushInterval(10))
8282
for _, e := range tc.observations {
8383
if e.statementID != 0 {
84-
ingester.ObserveStatement(e.SessionID(), &sqlstats.RecordedStmtStats{StatementID: e.StatementID()})
84+
ingester.ObserveStatement(&sqlstats.RecordedStmtStats{SessionID: e.SessionID(), StatementID: e.StatementID()})
8585
} else {
86-
ingester.ObserveTransaction(e.SessionID(), &sqlstats.RecordedTxnStats{TransactionID: e.TransactionID()})
86+
ingester.ObserveTransaction(&sqlstats.RecordedTxnStats{SessionID: e.SessionID(), TransactionID: e.TransactionID()})
8787
}
8888
}
8989

@@ -145,9 +145,9 @@ func TestIngester_Clear(t *testing.T) {
145145
}
146146
for _, o := range ingesterObservations {
147147
if o.statementID != 0 {
148-
ingester.ObserveStatement(o.SessionID(), &sqlstats.RecordedStmtStats{StatementID: o.StatementID()})
148+
ingester.ObserveStatement(&sqlstats.RecordedStmtStats{SessionID: o.SessionID(), StatementID: o.StatementID()})
149149
} else {
150-
ingester.ObserveTransaction(o.SessionID(), &sqlstats.RecordedTxnStats{TransactionID: o.TransactionID()})
150+
ingester.ObserveTransaction(&sqlstats.RecordedTxnStats{SessionID: o.SessionID(), TransactionID: o.TransactionID()})
151151
}
152152
}
153153
empty := event{}
@@ -178,8 +178,8 @@ func TestIngester_Disabled(t *testing.T) {
178178
st := cluster.MakeTestingClusterSettings()
179179

180180
ingester := newConcurrentBufferIngester(newRegistry(st, &fakeDetector{}, newStore(st), nil))
181-
ingester.ObserveStatement(clusterunique.ID{}, &sqlstats.RecordedStmtStats{})
182-
ingester.ObserveTransaction(clusterunique.ID{}, &sqlstats.RecordedTxnStats{})
181+
ingester.ObserveStatement(&sqlstats.RecordedStmtStats{})
182+
ingester.ObserveTransaction(&sqlstats.RecordedTxnStats{})
183183
require.Equal(t, event{}, ingester.guard.eventBuffer[0])
184184
}
185185

@@ -211,7 +211,7 @@ func TestIngester_DoesNotBlockWhenReceivingManyObservationsAfterShutdown(t *test
211211
// twice. With no consumer of the channel running and no safeguards in
212212
// place, this operation would block, which would be bad.
213213
for i := 0; i < 2*bufferSize+1; i++ {
214-
ingester.ObserveStatement(clusterunique.ID{}, &sqlstats.RecordedStmtStats{})
214+
ingester.ObserveStatement(&sqlstats.RecordedStmtStats{})
215215
}
216216
done <- struct{}{}
217217
}()

pkg/sql/sqlstats/insights/insights_test.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,10 @@ func BenchmarkInsights(b *testing.B) {
5555
statements := make([]sqlstats.RecordedStmtStats, b.N)
5656
transactions := make([]sqlstats.RecordedTxnStats, b.N)
5757
for i := 0; i < numSessions; i++ {
58+
sessionID := clusterunique.ID{Uint128: uint128.FromInts(rand.Uint64(), uint64(i))}
5859
for j := 0; j < numTransactionsPerSession; j++ {
5960
statements[numTransactionsPerSession*i+j] = sqlstats.RecordedStmtStats{
61+
SessionID: sessionID,
6062
// Spread across 6 different statement fingerprints.
6163
FingerprintID: appstatspb.StmtFingerprintID(j % 6),
6264
// Choose latencies in 20ms, 40ms, 60ms, 80ms, 100ms, 120ms, 140ms.
@@ -65,17 +67,19 @@ func BenchmarkInsights(b *testing.B) {
6567
ServiceLatencySec: float64(j%7+1) * 0.02,
6668
}
6769
}
70+
transactions[i] = sqlstats.RecordedTxnStats{
71+
SessionID: sessionID,
72+
}
6873
}
6974

7075
b.ResetTimer()
7176
for i := 0; i < numSessions; i++ {
72-
sessionID := clusterunique.ID{Uint128: uint128.FromInts(rand.Uint64(), uint64(i))}
7377
go func(i int) {
7478
defer sessions.Done()
7579
for j := 0; j < numTransactionsPerSession; j++ {
7680
idx := numTransactionsPerSession*i + j
77-
writer.ObserveStatement(sessionID, &statements[idx])
78-
writer.ObserveTransaction(sessionID, &transactions[idx])
81+
writer.ObserveStatement(&statements[idx])
82+
writer.ObserveTransaction(&transactions[idx])
7983
}
8084
}(i)
8185
}

pkg/sql/sqlstats/insights/registry.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,14 @@ func (r *lockingRegistry) Clear() {
3131
r.statements = make(map[clusterunique.ID]*statementBuf)
3232
}
3333

34-
func (r *lockingRegistry) ObserveStatement(
35-
sessionID clusterunique.ID, statement *sqlstats.RecordedStmtStats,
36-
) {
34+
func (r *lockingRegistry) ObserveStatement(statement *sqlstats.RecordedStmtStats) {
3735
if !r.enabled() {
3836
return
3937
}
40-
b, ok := r.statements[sessionID]
38+
b, ok := r.statements[statement.SessionID]
4139
if !ok {
4240
b = statementsBufPool.Get().(*statementBuf)
43-
r.statements[sessionID] = b
41+
r.statements[statement.SessionID] = b
4442
}
4543
b.append(statement)
4644
}

pkg/sql/sqlstats/insights/registry_test.go

Lines changed: 42 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ func TestRegistry(t *testing.T) {
5353
},
5454
stmts: []*sqlstats.RecordedStmtStats{
5555
{
56+
SessionID: session.ID,
5657
StatementID: clusterunique.IDFromBytes([]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")),
5758
FingerprintID: appstatspb.StmtFingerprintID(100),
5859
ServiceLatencySec: 2,
@@ -72,7 +73,7 @@ func TestRegistry(t *testing.T) {
7273
store := newStore(st)
7374
registry := newRegistry(st, &latencyThresholdDetector{st: st}, store, nil)
7475

75-
registry.ObserveStatement(session.ID, txns[0].stmts[0])
76+
registry.ObserveStatement(txns[0].stmts[0])
7677
registry.ObserveTransaction(session.ID, txns[0].txn)
7778

7879
expected := []*Insight{{
@@ -97,6 +98,7 @@ func TestRegistry(t *testing.T) {
9798
// when the transaction does not have this information.
9899
txn := &sqlstats.RecordedTxnStats{TransactionID: uuid.MakeV4(), Committed: false}
99100
stmt := &sqlstats.RecordedStmtStats{
101+
SessionID: session.ID,
100102
StatementID: clusterunique.IDFromBytes([]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")),
101103
FingerprintID: appstatspb.StmtFingerprintID(100),
102104
ServiceLatencySec: 2,
@@ -123,7 +125,7 @@ func TestRegistry(t *testing.T) {
123125
LatencyThreshold.Override(ctx, &st.SV, 1*time.Second)
124126
store := newStore(st)
125127
registry := newRegistry(st, &latencyThresholdDetector{st: st}, store, nil)
126-
registry.ObserveStatement(session.ID, stmt)
128+
registry.ObserveStatement(stmt)
127129
// Transaction status is set during transaction stats recorded based on
128130
// if the transaction committed. We'll inject the failure here to align
129131
// it with the test. The insights integration tests will verify that this
@@ -152,6 +154,7 @@ func TestRegistry(t *testing.T) {
152154
t.Run("disabled", func(t *testing.T) {
153155
transaction := &sqlstats.RecordedTxnStats{TransactionID: uuid.MakeV4(), Committed: true}
154156
statement := &sqlstats.RecordedStmtStats{
157+
SessionID: session.ID,
155158
StatementID: clusterunique.IDFromBytes([]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")),
156159
FingerprintID: appstatspb.StmtFingerprintID(100),
157160
ServiceLatencySec: 2,
@@ -160,7 +163,7 @@ func TestRegistry(t *testing.T) {
160163
LatencyThreshold.Override(ctx, &st.SV, 0)
161164
store := newStore(st)
162165
registry := newRegistry(st, &latencyThresholdDetector{st: st}, store, nil)
163-
registry.ObserveStatement(session.ID, statement)
166+
registry.ObserveStatement(statement)
164167
registry.ObserveTransaction(session.ID, transaction)
165168

166169
var actual []*Insight
@@ -178,13 +181,14 @@ func TestRegistry(t *testing.T) {
178181
st := cluster.MakeTestingClusterSettings()
179182
LatencyThreshold.Override(ctx, &st.SV, 1*time.Second)
180183
stmt := &sqlstats.RecordedStmtStats{
184+
SessionID: session.ID,
181185
StatementID: clusterunique.IDFromBytes([]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")),
182186
FingerprintID: appstatspb.StmtFingerprintID(100),
183187
ServiceLatencySec: 0.5,
184188
}
185189
store := newStore(st)
186190
registry := newRegistry(st, &latencyThresholdDetector{st: st}, store, nil)
187-
registry.ObserveStatement(session.ID, stmt)
191+
registry.ObserveStatement(stmt)
188192
registry.ObserveTransaction(session.ID, transaction)
189193

190194
var actual []*Insight
@@ -210,6 +214,7 @@ func TestRegistry(t *testing.T) {
210214
},
211215
stmts: []*sqlstats.RecordedStmtStats{
212216
{
217+
SessionID: session.ID,
213218
StatementID: clusterunique.IDFromBytes([]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")),
214219
FingerprintID: appstatspb.StmtFingerprintID(100),
215220
ServiceLatencySec: 2,
@@ -224,6 +229,7 @@ func TestRegistry(t *testing.T) {
224229
},
225230
stmts: []*sqlstats.RecordedStmtStats{
226231
{
232+
SessionID: otherSession.ID,
227233
StatementID: clusterunique.IDFromBytes([]byte("dddddddddddddddddddddddddddddddd")),
228234
FingerprintID: appstatspb.StmtFingerprintID(101),
229235
ServiceLatencySec: 3,
@@ -265,7 +271,7 @@ func TestRegistry(t *testing.T) {
265271

266272
for _, txn := range txns {
267273
for _, stmt := range txn.stmts {
268-
registry.ObserveStatement(txn.sessionID, stmt)
274+
registry.ObserveStatement(stmt)
269275
}
270276
registry.ObserveTransaction(txn.sessionID, txn.txn)
271277
}
@@ -289,11 +295,13 @@ func TestRegistry(t *testing.T) {
289295
t.Run("sibling statements without problems", func(t *testing.T) {
290296
transaction := &sqlstats.RecordedTxnStats{TransactionID: uuid.MakeV4(), Committed: true}
291297
statement := &sqlstats.RecordedStmtStats{
298+
SessionID: session.ID,
292299
StatementID: clusterunique.IDFromBytes([]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")),
293300
FingerprintID: appstatspb.StmtFingerprintID(100),
294301
ServiceLatencySec: 2,
295302
}
296303
siblingStatement := &sqlstats.RecordedStmtStats{
304+
SessionID: session.ID,
297305
StatementID: clusterunique.IDFromBytes([]byte("dddddddddddddddddddddddddddddddd")),
298306
FingerprintID: appstatspb.StmtFingerprintID(101),
299307
}
@@ -322,8 +330,8 @@ func TestRegistry(t *testing.T) {
322330
LatencyThreshold.Override(ctx, &st.SV, 1*time.Second)
323331
store := newStore(st)
324332
registry := newRegistry(st, &latencyThresholdDetector{st: st}, store, nil)
325-
registry.ObserveStatement(session.ID, statement)
326-
registry.ObserveStatement(session.ID, siblingStatement)
333+
registry.ObserveStatement(statement)
334+
registry.ObserveStatement(siblingStatement)
327335
registry.ObserveTransaction(session.ID, transaction)
328336

329337
var actual []*Insight
@@ -350,6 +358,7 @@ func TestRegistry(t *testing.T) {
350358
registry := newRegistry(st, &latencyThresholdDetector{st: st}, store, nil)
351359
contentionDuration := 10 * time.Second
352360
statement := &sqlstats.RecordedStmtStats{
361+
SessionID: session.ID,
353362
StatementID: clusterunique.IDFromBytes([]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")),
354363
FingerprintID: appstatspb.StmtFingerprintID(100),
355364
ServiceLatencySec: 0.00001,
@@ -362,7 +371,7 @@ func TestRegistry(t *testing.T) {
362371
},
363372
}
364373

365-
registry.ObserveStatement(session.ID, statement)
374+
registry.ObserveStatement(statement)
366375
registry.ObserveTransaction(session.ID, txnHighContention)
367376

368377
expected := []*Insight{
@@ -398,18 +407,21 @@ func TestRegistry(t *testing.T) {
398407
stmts := []*sqlstats.RecordedStmtStats{
399408
// copy the statement objects below:
400409
{
410+
SessionID: session.ID,
401411
StatementID: clusterunique.IDFromBytes([]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")),
402412
FingerprintID: appstatspb.StmtFingerprintID(100),
403413
ServiceLatencySec: 2,
404414
Query: "SELECT * FROM users",
405415
},
406416
{
417+
SessionID: session.ID,
407418
StatementID: clusterunique.IDFromBytes([]byte("dddddddddddddddddddddddddddddddd")),
408419
FingerprintID: appstatspb.StmtFingerprintID(101),
409420
ServiceLatencySec: 2,
410421
Query: "SET vectorize = '_'",
411422
},
412423
{
424+
SessionID: session.ID,
413425
StatementID: clusterunique.IDFromBytes([]byte("eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee")),
414426
FingerprintID: appstatspb.StmtFingerprintID(102),
415427
ServiceLatencySec: 2,
@@ -442,7 +454,7 @@ func TestRegistry(t *testing.T) {
442454
store := newStore(st)
443455
registry := newRegistry(st, &latencyThresholdDetector{st: st}, store, nil)
444456
for _, s := range stmts {
445-
registry.ObserveStatement(session.ID, s)
457+
registry.ObserveStatement(s)
446458
}
447459
registry.ObserveTransaction(session.ID, transaction)
448460

@@ -481,14 +493,21 @@ func TestInsightsRegistry_Clear(t *testing.T) {
481493
// Create some test data.
482494
sessionA := Session{ID: clusterunique.IDFromBytes([]byte("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"))}
483495
sessionB := Session{ID: clusterunique.IDFromBytes([]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"))}
484-
statement := &sqlstats.RecordedStmtStats{
496+
statementA := &sqlstats.RecordedStmtStats{
497+
SessionID: sessionA.ID,
498+
StatementID: clusterunique.IDFromBytes([]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")),
499+
FingerprintID: appstatspb.StmtFingerprintID(100),
500+
ServiceLatencySec: 2,
501+
}
502+
statementB := &sqlstats.RecordedStmtStats{
503+
SessionID: sessionB.ID,
485504
StatementID: clusterunique.IDFromBytes([]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")),
486505
FingerprintID: appstatspb.StmtFingerprintID(100),
487506
ServiceLatencySec: 2,
488507
}
489508
// Record the test data, assert it's cached.
490-
registry.ObserveStatement(sessionA.ID, statement)
491-
registry.ObserveStatement(sessionB.ID, statement)
509+
registry.ObserveStatement(statementA)
510+
registry.ObserveStatement(statementB)
492511
expLenStmts := 2
493512
// No need to acquire the lock here, as the registry is not attached to anything.
494513
require.Len(t, registry.statements, expLenStmts)
@@ -510,16 +529,24 @@ func TestInsightsRegistry_ClearSession(t *testing.T) {
510529
// Create some test data.
511530
sessionA := Session{ID: clusterunique.IDFromBytes([]byte("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"))}
512531
sessionB := Session{ID: clusterunique.IDFromBytes([]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"))}
513-
statement := &sqlstats.RecordedStmtStats{
532+
statementA := &sqlstats.RecordedStmtStats{
533+
Failed: false,
534+
SessionID: sessionA.ID,
535+
StatementID: clusterunique.IDFromBytes([]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")),
536+
FingerprintID: appstatspb.StmtFingerprintID(100),
537+
ServiceLatencySec: 2,
538+
}
539+
statementB := &sqlstats.RecordedStmtStats{
514540
Failed: false,
541+
SessionID: sessionB.ID,
515542
StatementID: clusterunique.IDFromBytes([]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")),
516543
FingerprintID: appstatspb.StmtFingerprintID(100),
517544
ServiceLatencySec: 2,
518545
}
519546

520547
// Record the test data, assert it's cached.
521-
registry.ObserveStatement(sessionA.ID, statement)
522-
registry.ObserveStatement(sessionB.ID, statement)
548+
registry.ObserveStatement(statementA)
549+
registry.ObserveStatement(statementB)
523550
// No need to acquire the lock here, as the registry is not attached to anything.
524551
require.Len(t, registry.statements, 2)
525552

pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ func (s *StatsCollector) RecordStatement(
221221
ctx context.Context, value *sqlstats.RecordedStmtStats,
222222
) error {
223223
if s.sendInsights && s.insightsWriter != nil {
224-
s.insightsWriter.ObserveStatement(value.SessionID, value)
224+
s.insightsWriter.ObserveStatement(value)
225225
}
226226

227227
// TODO(xinhaoz): This isn't the best place to set this, but we'll clean this up
@@ -243,7 +243,7 @@ func (s *StatsCollector) RecordTransaction(
243243
ctx context.Context, value *sqlstats.RecordedTxnStats,
244244
) error {
245245
if s.sendInsights && s.insightsWriter != nil {
246-
s.insightsWriter.ObserveTransaction(value.SessionID, value)
246+
s.insightsWriter.ObserveTransaction(value)
247247
}
248248

249249
// TODO(117690): Unify StmtStatsEnable and TxnStatsEnable into a single cluster setting.

0 commit comments

Comments
 (0)