Skip to content

Commit b99f5dc

Browse files
committed
insights: insights and sqlstats subsystem should consume the same types
Previously, insights consumed its own data type containing per-execution statisitcs about statements and transactions for outlier analysis. These should only be created if we determine that the execution should be cached in our insights registry. SQL stats and insights will now consume the same data types to process and perform their respective recordings: `sqlstats.RecordedStmtStats` and `sqlstats.RecordedTxnStats`, which contain per execution statistics. This is supporting work in moving sql stats recording to to occur in the same async routine as insights. This commit mostly deals with changing logic in the insights package and related testing to perform analysis on `sqlstats.Recorded{Stmt,Txn}Stats` instead of `insights.{Statement,Transaction}` types. We also change the function signatures for the sqlstats containers to accept pointers to `sqlstats.Recorded*` objects instead of by value. Epic: none Part of: cockroachdb#141024 Release note: None
1 parent 879f32a commit b99f5dc

20 files changed

+437
-295
lines changed

Diff for: pkg/sql/conn_executor_exec.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -4329,7 +4329,7 @@ func (ex *connExecutor) recordTransactionFinish(
43294329
txnRetryLat := ex.phaseTimes.GetTransactionRetryLatency()
43304330
commitLat := ex.phaseTimes.GetCommitLatency()
43314331

4332-
recordedTxnStats := sqlstats.RecordedTxnStats{
4332+
recordedTxnStats := &sqlstats.RecordedTxnStats{
43334333
FingerprintID: transactionFingerprintID,
43344334
SessionID: ex.planner.extendedEvalCtx.SessionID,
43354335
TransactionID: ev.txnID,
@@ -4373,7 +4373,7 @@ func (ex *connExecutor) recordTransactionFinish(
43734373
ex.planner.logTransaction(ctx,
43744374
int(ex.extraTxnState.txnCounter.Load()),
43754375
transactionFingerprintID,
4376-
&recordedTxnStats,
4376+
recordedTxnStats,
43774377
ex.extraTxnState.telemetrySkippedTxns,
43784378
)
43794379
}

Diff for: pkg/sql/exec_util.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1745,7 +1745,7 @@ type ExecutorTestingKnobs struct {
17451745

17461746
// OnRecordTxnFinish, if set, will be called as we record a transaction
17471747
// finishing.
1748-
OnRecordTxnFinish func(isInternal bool, phaseTimes *sessionphase.Times, stmt string, txnStats sqlstats.RecordedTxnStats)
1748+
OnRecordTxnFinish func(isInternal bool, phaseTimes *sessionphase.Times, stmt string, txnStats *sqlstats.RecordedTxnStats)
17491749

17501750
// UseTransactionDescIDGenerator is used to force descriptor ID generation
17511751
// to use a transaction, and, in doing so, more deterministically allocate

Diff for: pkg/sql/executor_statement_metrics.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ func (ex *connExecutor) recordStatementSummary(
167167
implicitTxn := flags.IsSet(planFlagImplicitTxn)
168168
stmtFingerprintID := appstatspb.ConstructStatementFingerprintID(
169169
stmt.StmtNoConstants, implicitTxn, planner.SessionData().Database)
170-
recordedStmtStats := sqlstats.RecordedStmtStats{
170+
recordedStmtStats := &sqlstats.RecordedStmtStats{
171171
FingerprintID: stmtFingerprintID,
172172
QuerySummary: stmt.StmtSummary,
173173
DistSQL: flags.ShouldBeDistributed(),
@@ -207,6 +207,7 @@ func (ex *connExecutor) recordStatementSummary(
207207
}
208208

209209
err := ex.statsCollector.RecordStatement(ctx, recordedStmtStats)
210+
210211
if err != nil {
211212
if log.V(1) {
212213
log.Warningf(ctx, "failed to record statement: %s", err)

Diff for: pkg/sql/instrumentation_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -207,12 +207,12 @@ func TestSampledStatsCollectionOnNewFingerprint(t *testing.T) {
207207

208208
testApp := `sampling-test`
209209
ctx := context.Background()
210-
var collectedTxnStats []sqlstats.RecordedTxnStats
210+
var collectedTxnStats []*sqlstats.RecordedTxnStats
211211
s := serverutils.StartServerOnly(t, base.TestServerArgs{
212212
Knobs: base.TestingKnobs{
213213
SQLExecutor: &ExecutorTestingKnobs{
214214
DisableProbabilisticSampling: true,
215-
OnRecordTxnFinish: func(isInternal bool, _ *sessionphase.Times, stmt string, txnStats sqlstats.RecordedTxnStats) {
215+
OnRecordTxnFinish: func(isInternal bool, _ *sessionphase.Times, stmt string, txnStats *sqlstats.RecordedTxnStats) {
216216
// We won't run into a race here because we'll only observe
217217
// txns from a single connection.
218218
if txnStats.Application == testApp {

Diff for: pkg/sql/sqlstats/insights/BUILD.bazel

+4
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ go_test(
5454
"//pkg/settings/cluster",
5555
"//pkg/sql/appstatspb",
5656
"//pkg/sql/clusterunique",
57+
"//pkg/sql/execstats",
58+
"//pkg/sql/pgwire/pgcode",
59+
"//pkg/sql/pgwire/pgerror",
60+
"//pkg/sql/sqlstats",
5761
"//pkg/util/leaktest",
5862
"//pkg/util/log",
5963
"//pkg/util/stop",

Diff for: pkg/sql/sqlstats/insights/detector.go

+19-20
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,14 @@ import (
1111

1212
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1313
"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
14+
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
1415
"github.com/cockroachdb/cockroach/pkg/util/quantile"
1516
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
1617
)
1718

1819
type detector interface {
1920
enabled() bool
20-
isSlow(*Statement) bool
21+
isSlow(stats *sqlstats.RecordedStmtStats) bool
2122
}
2223

2324
var _ detector = &compositeDetector{}
@@ -37,7 +38,7 @@ func (d *compositeDetector) enabled() bool {
3738
return false
3839
}
3940

40-
func (d *compositeDetector) isSlow(statement *Statement) bool {
41+
func (d *compositeDetector) isSlow(statement *sqlstats.RecordedStmtStats) bool {
4142
// Because some detectors may need to observe all statements to build up
4243
// their baseline sense of what "normal" is, we avoid short-circuiting.
4344
result := false
@@ -69,18 +70,18 @@ func (d *AnomalyDetector) enabled() bool {
6970
return AnomalyDetectionEnabled.Get(&d.settings.SV)
7071
}
7172

72-
func (d *AnomalyDetector) isSlow(stmt *Statement) (decision bool) {
73+
func (d *AnomalyDetector) isSlow(stmt *sqlstats.RecordedStmtStats) (decision bool) {
7374
if !d.enabled() {
7475
return
7576
}
7677

77-
d.withFingerprintLatencySummary(stmt, func(latencySummary *quantile.Stream) {
78-
latencySummary.Insert(stmt.LatencyInSeconds)
78+
d.withFingerprintLatencySummary(stmt.FingerprintID, stmt.ServiceLatencySec, func(latencySummary *quantile.Stream) {
79+
latencySummary.Insert(stmt.ServiceLatencySec)
7980
p50 := latencySummary.Query(0.5, true)
8081
p99 := latencySummary.Query(0.99, true)
81-
decision = stmt.LatencyInSeconds >= p99 &&
82-
stmt.LatencyInSeconds >= 2*p50 &&
83-
stmt.LatencyInSeconds >= AnomalyDetectionLatencyThreshold.Get(&d.settings.SV).Seconds()
82+
decision = stmt.ServiceLatencySec >= p99 &&
83+
stmt.ServiceLatencySec >= 2*p50 &&
84+
stmt.ServiceLatencySec >= AnomalyDetectionLatencyThreshold.Get(&d.settings.SV).Seconds()
8485
})
8586

8687
return
@@ -104,21 +105,23 @@ func (d *AnomalyDetector) GetPercentileValues(id appstatspb.StmtFingerprintID) P
104105
}
105106

106107
func (d *AnomalyDetector) withFingerprintLatencySummary(
107-
stmt *Statement, consumer func(latencySummary *quantile.Stream),
108+
stmtFingerprintID appstatspb.StmtFingerprintID,
109+
latencySecs float64,
110+
consumer func(latencySummary *quantile.Stream),
108111
) {
109112
d.mu.Lock()
110113
defer d.mu.Unlock()
111114
var latencySummary *quantile.Stream
112115

113-
if element, ok := d.mu.index[stmt.FingerprintID]; ok {
116+
if element, ok := d.mu.index[stmtFingerprintID]; ok {
114117
// We are already tracking latencies for this fingerprint.
115118
latencySummary = element.Value.(latencySummaryEntry).value
116119
d.store.MoveToFront(element) // Mark this latency summary as recently used.
117-
} else if stmt.LatencyInSeconds >= AnomalyDetectionLatencyThreshold.Get(&d.settings.SV).Seconds() {
120+
} else if latencySecs >= AnomalyDetectionLatencyThreshold.Get(&d.settings.SV).Seconds() {
118121
// We want to start tracking latencies for this fingerprint.
119122
latencySummary = quantile.NewTargeted(desiredQuantiles)
120-
entry := latencySummaryEntry{key: stmt.FingerprintID, value: latencySummary}
121-
d.mu.index[stmt.FingerprintID] = d.store.PushFront(entry)
123+
entry := latencySummaryEntry{key: stmtFingerprintID, value: latencySummary}
124+
d.mu.index[stmtFingerprintID] = d.store.PushFront(entry)
122125
d.metrics.Fingerprints.Inc(1)
123126
d.metrics.Memory.Inc(latencySummary.ByteSize())
124127
} else {
@@ -160,18 +163,14 @@ func (d *latencyThresholdDetector) enabled() bool {
160163
return LatencyThreshold.Get(&d.st.SV) > 0
161164
}
162165

163-
func (d *latencyThresholdDetector) isSlow(s *Statement) bool {
164-
return d.enabled() && s.LatencyInSeconds >= LatencyThreshold.Get(&d.st.SV).Seconds()
165-
}
166-
167-
func isFailed(s *Statement) bool {
168-
return s.Status == Statement_Failed
166+
func (d *latencyThresholdDetector) isSlow(s *sqlstats.RecordedStmtStats) bool {
167+
return d.enabled() && s.ServiceLatencySec >= LatencyThreshold.Get(&d.st.SV).Seconds()
169168
}
170169

171170
var prefixesToIgnore = []string{"SET ", "EXPLAIN "}
172171

173172
// shouldIgnoreStatement returns true if we don't want to analyze the statement.
174-
func shouldIgnoreStatement(s *Statement) bool {
173+
func shouldIgnoreStatement(s *sqlstats.RecordedStmtStats) bool {
175174
for _, start := range prefixesToIgnore {
176175
if strings.HasPrefix(s.Query, start) {
177176
return true

Diff for: pkg/sql/sqlstats/insights/detector_test.go

+27-27
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1515
"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
16+
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
1617
"github.com/stretchr/testify/require"
1718
)
1819

@@ -34,17 +35,17 @@ func TestAnyDetector(t *testing.T) {
3435

3536
t.Run("isSlow is false without any detectors", func(t *testing.T) {
3637
detector := &compositeDetector{}
37-
require.False(t, detector.isSlow(&Statement{}))
38+
require.False(t, detector.isSlow(&sqlstats.RecordedStmtStats{}))
3839
})
3940

4041
t.Run("isSlow is false without any concerned detectors", func(t *testing.T) {
4142
detector := &compositeDetector{[]detector{&fakeDetector{}, &fakeDetector{}}}
42-
require.False(t, detector.isSlow(&Statement{}))
43+
require.False(t, detector.isSlow(&sqlstats.RecordedStmtStats{}))
4344
})
4445

4546
t.Run("isSlow is true with at least one concerned detector", func(t *testing.T) {
4647
detector := &compositeDetector{[]detector{&fakeDetector{stubIsSlow: true}, &fakeDetector{}}}
47-
require.True(t, detector.isSlow(&Statement{}))
48+
require.True(t, detector.isSlow(&sqlstats.RecordedStmtStats{}))
4849
})
4950

5051
t.Run("isSlow consults all detectors without short-circuiting", func(t *testing.T) {
@@ -55,7 +56,7 @@ func TestAnyDetector(t *testing.T) {
5556
d2 := &fakeDetector{stubIsSlow: true}
5657

5758
detector := &compositeDetector{[]detector{d1, d2}}
58-
detector.isSlow(&Statement{})
59+
detector.isSlow(&sqlstats.RecordedStmtStats{})
5960
require.True(t, d1.isSlowCalled, "the first detector should be consulted")
6061
require.True(t, d2.isSlowCalled, "the second detector should be consulted")
6162
})
@@ -106,16 +107,18 @@ func TestLatencyQuantileDetector(t *testing.T) {
106107
for _, test := range tests {
107108
t.Run(test.name, func(t *testing.T) {
108109
d := newAnomalyDetector(st, NewMetrics())
110+
stmtWithSeedLatency := &sqlstats.RecordedStmtStats{ServiceLatencySec: test.seedLatency.Seconds()}
109111
for i := 0; i < 1000; i++ {
110-
d.isSlow(&Statement{LatencyInSeconds: test.seedLatency.Seconds()})
112+
d.isSlow(stmtWithSeedLatency)
111113
}
112-
require.Equal(t, test.isSlow, d.isSlow(&Statement{LatencyInSeconds: test.candidateLatency.Seconds()}))
114+
// Now determine if the candidate latency is slow.
115+
require.Equal(t, test.isSlow, d.isSlow(&sqlstats.RecordedStmtStats{ServiceLatencySec: test.candidateLatency.Seconds()}))
113116
})
114117
}
115118
})
116119

117120
// Testing the slow and failure detectors at the same time.
118-
t.Run("isSlow and isFailed", func(t *testing.T) {
121+
t.Run("isSlow with slow and failed statement", func(t *testing.T) {
119122
ctx := context.Background()
120123
st := cluster.MakeTestingClusterSettings()
121124
AnomalyDetectionEnabled.Override(ctx, &st.SV, true)
@@ -127,46 +130,41 @@ func TestLatencyQuantileDetector(t *testing.T) {
127130
candidateLatency time.Duration
128131
status Statement_Status
129132
isSlow bool
130-
isFailed bool
131133
}{{
132134
name: "slow and failed statement",
133135
seedLatency: 100 * time.Millisecond,
134136
candidateLatency: 200 * time.Millisecond,
135137
status: Statement_Failed,
136138
isSlow: true,
137-
isFailed: true,
138139
}, {
139140
name: "slow and non-failed statement",
140141
seedLatency: 100 * time.Millisecond,
141142
candidateLatency: 200 * time.Millisecond,
142143
status: Statement_Completed,
143144
isSlow: true,
144-
isFailed: false,
145145
}, {
146146
name: "fast and non-failed statement",
147147
seedLatency: 100 * time.Millisecond,
148148
candidateLatency: 50 * time.Millisecond,
149149
status: Statement_Completed,
150150
isSlow: false,
151-
isFailed: false,
152151
}, {
153152
name: "fast and failed statement",
154153
seedLatency: 100 * time.Millisecond,
155154
candidateLatency: 50 * time.Millisecond,
156155
status: Statement_Failed,
157156
isSlow: false,
158-
isFailed: true,
159157
}}
160158

161159
for _, test := range tests {
162160
t.Run(test.name, func(t *testing.T) {
163161
d := newAnomalyDetector(st, NewMetrics())
162+
stmtWithSeedLatency := &sqlstats.RecordedStmtStats{ServiceLatencySec: test.seedLatency.Seconds()}
164163
for i := 0; i < 1000; i++ {
165-
d.isSlow(&Statement{LatencyInSeconds: test.seedLatency.Seconds()})
164+
d.isSlow(stmtWithSeedLatency)
166165
}
167-
stmt := &Statement{LatencyInSeconds: test.candidateLatency.Seconds(), Status: test.status}
168-
require.Equal(t, test.isSlow, d.isSlow(stmt))
169-
require.Equal(t, test.isFailed, isFailed(stmt))
166+
// Now determine if the candidate latency is slow.
167+
require.Equal(t, test.isSlow, d.isSlow(&sqlstats.RecordedStmtStats{ServiceLatencySec: test.candidateLatency.Seconds()}))
170168
})
171169
}
172170
})
@@ -223,10 +221,11 @@ func TestLatencyQuantileDetector(t *testing.T) {
223221
d := newAnomalyDetector(st, metrics)
224222
// Show the detector `test.fingerprints` distinct fingerprints.
225223
for i := 0; i < test.fingerprints; i++ {
226-
d.isSlow(&Statement{
227-
LatencyInSeconds: AnomalyDetectionLatencyThreshold.Get(&st.SV).Seconds(),
228-
FingerprintID: appstatspb.StmtFingerprintID(i),
229-
})
224+
stmt := &sqlstats.RecordedStmtStats{
225+
ServiceLatencySec: AnomalyDetectionLatencyThreshold.Get(&st.SV).Seconds(),
226+
FingerprintID: appstatspb.StmtFingerprintID(i),
227+
}
228+
d.isSlow(stmt)
230229
}
231230
test.assertion(t, metrics)
232231
})
@@ -241,10 +240,11 @@ func BenchmarkLatencyQuantileDetector(b *testing.B) {
241240
settings := cluster.MakeTestingClusterSettings()
242241
AnomalyDetectionEnabled.Override(context.Background(), &settings.SV, true)
243242
d := newAnomalyDetector(settings, NewMetrics())
243+
stmt := &sqlstats.RecordedStmtStats{
244+
ServiceLatencySec: random.Float64(),
245+
}
244246
for i := 0; i < b.N; i++ {
245-
d.isSlow(&Statement{
246-
LatencyInSeconds: random.Float64(),
247-
})
247+
d.isSlow(stmt)
248248
}
249249
}
250250

@@ -267,21 +267,21 @@ func TestLatencyThresholdDetector(t *testing.T) {
267267
st := cluster.MakeTestingClusterSettings()
268268
LatencyThreshold.Override(context.Background(), &st.SV, 0)
269269
detector := latencyThresholdDetector{st: st}
270-
require.False(t, detector.isSlow(&Statement{LatencyInSeconds: 1}))
270+
require.False(t, detector.isSlow(&sqlstats.RecordedStmtStats{ServiceLatencySec: 1}))
271271
})
272272

273273
t.Run("isSlow false when fast enough", func(t *testing.T) {
274274
st := cluster.MakeTestingClusterSettings()
275275
LatencyThreshold.Override(context.Background(), &st.SV, 1*time.Second)
276276
detector := latencyThresholdDetector{st: st}
277-
require.False(t, detector.isSlow(&Statement{LatencyInSeconds: 0.5}))
277+
require.False(t, detector.isSlow(&sqlstats.RecordedStmtStats{ServiceLatencySec: 0.5}))
278278
})
279279

280280
t.Run("isSlow true beyond threshold", func(t *testing.T) {
281281
st := cluster.MakeTestingClusterSettings()
282282
LatencyThreshold.Override(context.Background(), &st.SV, 1*time.Second)
283283
detector := latencyThresholdDetector{st: st}
284-
require.True(t, detector.isSlow(&Statement{LatencyInSeconds: 1}))
284+
require.True(t, detector.isSlow(&sqlstats.RecordedStmtStats{ServiceLatencySec: 1}))
285285
})
286286
}
287287

@@ -295,7 +295,7 @@ func (f *fakeDetector) enabled() bool {
295295
return f.stubEnabled
296296
}
297297

298-
func (f *fakeDetector) isSlow(*Statement) bool {
298+
func (f *fakeDetector) isSlow(stats *sqlstats.RecordedStmtStats) bool {
299299
f.isSlowCalled = true
300300
return f.stubIsSlow
301301
}

Diff for: pkg/sql/sqlstats/insights/ingester.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
1515
"github.com/cockroachdb/cockroach/pkg/sql/contention/contentionutils"
16+
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
1617
"github.com/cockroachdb/cockroach/pkg/util/stop"
1718
)
1819

@@ -69,8 +70,8 @@ var eventBufferPool = sync.Pool{
6970

7071
type event struct {
7172
sessionID clusterunique.ID
72-
transaction *Transaction
73-
statement *Statement
73+
transaction *sqlstats.RecordedTxnStats
74+
statement *sqlstats.RecordedStmtStats
7475
}
7576

7677
type BufferOpt func(i *ConcurrentBufferIngester)
@@ -167,7 +168,7 @@ func (i *ConcurrentBufferIngester) ingest(events *eventBuffer) {
167168
}
168169

169170
func (i *ConcurrentBufferIngester) ObserveStatement(
170-
sessionID clusterunique.ID, statement *Statement,
171+
sessionID clusterunique.ID, statement *sqlstats.RecordedStmtStats,
171172
) {
172173
if !i.registry.enabled() {
173174
return
@@ -187,7 +188,7 @@ func (i *ConcurrentBufferIngester) ObserveStatement(
187188
}
188189

189190
func (i *ConcurrentBufferIngester) ObserveTransaction(
190-
sessionID clusterunique.ID, transaction *Transaction,
191+
sessionID clusterunique.ID, transaction *sqlstats.RecordedTxnStats,
191192
) {
192193
if !i.registry.enabled() {
193194
return

0 commit comments

Comments
 (0)