
Commit 03145fb

insights: insights and sqlstats subsystem should consume the same types
Previously, insights consumed its own data type containing per-execution statistics about statements and transactions for outlier analysis. These should only be created if we determine that the execution should be cached in our insights registry. SQL stats and insights will now consume the same data types to process and perform their respective recordings: `sqlstats.RecordedStmtStats` and `sqlstats.RecordedTxnStats`, which contain per-execution statistics. This is supporting work in moving sql stats recording to occur in the same async routine as insights.

For reviewers: this commit mostly deals with changing logic in the insights package and related testing to perform analysis on `sqlstats.Recorded{Stmt,Txn}Stats` instead of `insights.{Statement,Transaction}` types. We also change the function signatures for the sqlstats containers to accept pointers to `sqlstats.Recorded*` objects instead of values.

Epic: none
Part of: cockroachdb#141024
Release note: None
1 parent: 879f32a · commit: 03145fb

20 files changed: +446 −305 lines
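To illustrate the direction of this change before the per-file diffs, here is a minimal, self-contained Go sketch (not CockroachDB code) of the pattern the commit moves toward: one per-execution stats value, built once and handed by pointer to both the sqlstats recorder and the insights analysis. The struct below mirrors only a few fields of `sqlstats.RecordedStmtStats` that appear in the diffs; the two consumer functions are hypothetical stand-ins.

```go
package main

import "fmt"

// RecordedStmtStats mirrors a tiny subset of sqlstats.RecordedStmtStats from
// the diffs below; the real struct carries many more fields.
type RecordedStmtStats struct {
	FingerprintID     uint64
	Query             string
	ServiceLatencySec float64
}

// recordStatement stands in for the sqlstats container's RecordStatement,
// which after this commit accepts a pointer instead of a value.
func recordStatement(stats *RecordedStmtStats) error {
	fmt.Printf("recorded %q (%.3fs)\n", stats.Query, stats.ServiceLatencySec)
	return nil
}

// observeStatement stands in for the insights registry, which now consumes
// the same type instead of its own insights.Statement.
func observeStatement(stats *RecordedStmtStats) {
	if stats.ServiceLatencySec >= 0.1 { // placeholder slowness check
		fmt.Printf("insight candidate: %q\n", stats.Query)
	}
}

func main() {
	// One value per execution, shared by both consumers.
	stats := &RecordedStmtStats{
		FingerprintID:     42,
		Query:             "SELECT * FROM t WHERE id = _",
		ServiceLatencySec: 0.25,
	}
	_ = recordStatement(stats)
	observeStatement(stats)
}
```

Passing a single pointer also matches the signature changes below, where the sqlstats containers and testing knobs now take `*sqlstats.Recorded*` arguments instead of values.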

Diff for: pkg/sql/conn_executor_exec.go

+2 −2

@@ -4329,7 +4329,7 @@ func (ex *connExecutor) recordTransactionFinish(
 	txnRetryLat := ex.phaseTimes.GetTransactionRetryLatency()
 	commitLat := ex.phaseTimes.GetCommitLatency()

-	recordedTxnStats := sqlstats.RecordedTxnStats{
+	recordedTxnStats := &sqlstats.RecordedTxnStats{
 		FingerprintID: transactionFingerprintID,
 		SessionID:     ex.planner.extendedEvalCtx.SessionID,
 		TransactionID: ev.txnID,
@@ -4373,7 +4373,7 @@ func (ex *connExecutor) recordTransactionFinish(
 		ex.planner.logTransaction(ctx,
 			int(ex.extraTxnState.txnCounter.Load()),
 			transactionFingerprintID,
-			&recordedTxnStats,
+			recordedTxnStats,
 			ex.extraTxnState.telemetrySkippedTxns,
 		)
 	}

Diff for: pkg/sql/exec_util.go

+1 −1

@@ -1745,7 +1745,7 @@ type ExecutorTestingKnobs struct {

 	// OnRecordTxnFinish, if set, will be called as we record a transaction
 	// finishing.
-	OnRecordTxnFinish func(isInternal bool, phaseTimes *sessionphase.Times, stmt string, txnStats sqlstats.RecordedTxnStats)
+	OnRecordTxnFinish func(isInternal bool, phaseTimes *sessionphase.Times, stmt string, txnStats *sqlstats.RecordedTxnStats)

 	// UseTransactionDescIDGenerator is used to force descriptor ID generation
 	// to use a transaction, and, in doing so, more deterministically allocate

Diff for: pkg/sql/executor_statement_metrics.go

+14 −11

@@ -167,13 +167,18 @@ func (ex *connExecutor) recordStatementSummary(
 	implicitTxn := flags.IsSet(planFlagImplicitTxn)
 	stmtFingerprintID := appstatspb.ConstructStatementFingerprintID(
 		stmt.StmtNoConstants, implicitTxn, planner.SessionData().Database)
-	recordedStmtStats := sqlstats.RecordedStmtStats{
-		FingerprintID: stmtFingerprintID,
-		QuerySummary:  stmt.StmtSummary,
-		DistSQL:       flags.ShouldBeDistributed(),
-		Vec:           flags.IsSet(planFlagVectorized),
-		ImplicitTxn:   implicitTxn,
-		PlanHash:      planner.instrumentation.planGist.Hash(),
+	recordedStmtStats := &sqlstats.RecordedStmtStats{
+		FingerprintID: stmtFingerprintID,
+		Query:         stmt.StmtNoConstants,
+		QuerySummary:  stmt.StmtSummary,
+		DistSQL:       flags.ShouldBeDistributed(),
+		Vec:           flags.IsSet(planFlagVectorized),
+		ImplicitTxn:   implicitTxn,
+		FullScan:      fullScan,
+		Database:      planner.SessionData().Database,
+		PlanHash:      planner.instrumentation.planGist.Hash(),
+
+		// Per-execution metrics.
 		SessionID:      ex.planner.extendedEvalCtx.SessionID,
 		StatementID:    stmt.QueryID,
 		AutoRetryCount: automaticRetryCount,
@@ -195,18 +200,16 @@ func (ex *connExecutor) recordStatementSummary(
 		PlanGist:             planner.instrumentation.planGist.String(),
 		StatementError:       stmtErr,
 		IndexRecommendations: idxRecommendations,
-		Query:                stmt.StmtNoConstants,
 		StartTime:            startTime,
 		EndTime:              startTime.Add(svcLatRaw),
-		FullScan:             fullScan,
 		ExecStats:            queryLevelStats,
 		// TODO(mgartner): Use a slice of struct{uint64, uint64} instead of
 		// converting to strings.
-		Indexes:              planner.instrumentation.indexesUsed.Strings(),
-		Database:             planner.SessionData().Database,
+		Indexes: planner.instrumentation.indexesUsed.Strings(),
 	}

 	err := ex.statsCollector.RecordStatement(ctx, recordedStmtStats)
+
 	if err != nil {
 		if log.V(1) {
 			log.Warningf(ctx, "failed to record statement: %s", err)

Diff for: pkg/sql/instrumentation_test.go

+2 −2

@@ -207,12 +207,12 @@ func TestSampledStatsCollectionOnNewFingerprint(t *testing.T) {

 	testApp := `sampling-test`
 	ctx := context.Background()
-	var collectedTxnStats []sqlstats.RecordedTxnStats
+	var collectedTxnStats []*sqlstats.RecordedTxnStats
 	s := serverutils.StartServerOnly(t, base.TestServerArgs{
 		Knobs: base.TestingKnobs{
 			SQLExecutor: &ExecutorTestingKnobs{
 				DisableProbabilisticSampling: true,
-				OnRecordTxnFinish: func(isInternal bool, _ *sessionphase.Times, stmt string, txnStats sqlstats.RecordedTxnStats) {
+				OnRecordTxnFinish: func(isInternal bool, _ *sessionphase.Times, stmt string, txnStats *sqlstats.RecordedTxnStats) {
 					// We won't run into a race here because we'll only observe
 					// txns from a single connection.
 					if txnStats.Application == testApp {

Diff for: pkg/sql/sqlstats/insights/BUILD.bazel

+4 −0

@@ -54,6 +54,10 @@ go_test(
         "//pkg/settings/cluster",
         "//pkg/sql/appstatspb",
         "//pkg/sql/clusterunique",
+        "//pkg/sql/execstats",
+        "//pkg/sql/pgwire/pgcode",
+        "//pkg/sql/pgwire/pgerror",
+        "//pkg/sql/sqlstats",
         "//pkg/util/leaktest",
         "//pkg/util/log",
         "//pkg/util/stop",

Diff for: pkg/sql/sqlstats/insights/detector.go

+19 −20

@@ -11,13 +11,14 @@ import (

 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
+	"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
 	"github.com/cockroachdb/cockroach/pkg/util/quantile"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 )

 type detector interface {
 	enabled() bool
-	isSlow(*Statement) bool
+	isSlow(stats *sqlstats.RecordedStmtStats) bool
 }

 var _ detector = &compositeDetector{}
@@ -37,7 +38,7 @@ func (d *compositeDetector) enabled() bool {
 	return false
 }

-func (d *compositeDetector) isSlow(statement *Statement) bool {
+func (d *compositeDetector) isSlow(statement *sqlstats.RecordedStmtStats) bool {
 	// Because some detectors may need to observe all statements to build up
 	// their baseline sense of what "normal" is, we avoid short-circuiting.
 	result := false
@@ -69,18 +70,18 @@ func (d *AnomalyDetector) enabled() bool {
 	return AnomalyDetectionEnabled.Get(&d.settings.SV)
 }

-func (d *AnomalyDetector) isSlow(stmt *Statement) (decision bool) {
+func (d *AnomalyDetector) isSlow(stmt *sqlstats.RecordedStmtStats) (decision bool) {
 	if !d.enabled() {
 		return
 	}

-	d.withFingerprintLatencySummary(stmt, func(latencySummary *quantile.Stream) {
-		latencySummary.Insert(stmt.LatencyInSeconds)
+	d.withFingerprintLatencySummary(stmt.FingerprintID, stmt.ServiceLatencySec, func(latencySummary *quantile.Stream) {
+		latencySummary.Insert(stmt.ServiceLatencySec)
 		p50 := latencySummary.Query(0.5, true)
 		p99 := latencySummary.Query(0.99, true)
-		decision = stmt.LatencyInSeconds >= p99 &&
-			stmt.LatencyInSeconds >= 2*p50 &&
-			stmt.LatencyInSeconds >= AnomalyDetectionLatencyThreshold.Get(&d.settings.SV).Seconds()
+		decision = stmt.ServiceLatencySec >= p99 &&
+			stmt.ServiceLatencySec >= 2*p50 &&
+			stmt.ServiceLatencySec >= AnomalyDetectionLatencyThreshold.Get(&d.settings.SV).Seconds()
 	})

 	return
@@ -104,21 +105,23 @@ func (d *AnomalyDetector) GetPercentileValues(id appstatspb.StmtFingerprintID) P
 }

 func (d *AnomalyDetector) withFingerprintLatencySummary(
-	stmt *Statement, consumer func(latencySummary *quantile.Stream),
+	stmtFingerprintID appstatspb.StmtFingerprintID,
+	latencySecs float64,
+	consumer func(latencySummary *quantile.Stream),
 ) {
 	d.mu.Lock()
 	defer d.mu.Unlock()
 	var latencySummary *quantile.Stream

-	if element, ok := d.mu.index[stmt.FingerprintID]; ok {
+	if element, ok := d.mu.index[stmtFingerprintID]; ok {
 		// We are already tracking latencies for this fingerprint.
 		latencySummary = element.Value.(latencySummaryEntry).value
 		d.store.MoveToFront(element) // Mark this latency summary as recently used.
-	} else if stmt.LatencyInSeconds >= AnomalyDetectionLatencyThreshold.Get(&d.settings.SV).Seconds() {
+	} else if latencySecs >= AnomalyDetectionLatencyThreshold.Get(&d.settings.SV).Seconds() {
 		// We want to start tracking latencies for this fingerprint.
 		latencySummary = quantile.NewTargeted(desiredQuantiles)
-		entry := latencySummaryEntry{key: stmt.FingerprintID, value: latencySummary}
-		d.mu.index[stmt.FingerprintID] = d.store.PushFront(entry)
+		entry := latencySummaryEntry{key: stmtFingerprintID, value: latencySummary}
+		d.mu.index[stmtFingerprintID] = d.store.PushFront(entry)
 		d.metrics.Fingerprints.Inc(1)
 		d.metrics.Memory.Inc(latencySummary.ByteSize())
 	} else {
@@ -160,18 +163,14 @@ func (d *latencyThresholdDetector) enabled() bool {
 	return LatencyThreshold.Get(&d.st.SV) > 0
 }

-func (d *latencyThresholdDetector) isSlow(s *Statement) bool {
-	return d.enabled() && s.LatencyInSeconds >= LatencyThreshold.Get(&d.st.SV).Seconds()
-}
-
-func isFailed(s *Statement) bool {
-	return s.Status == Statement_Failed
+func (d *latencyThresholdDetector) isSlow(s *sqlstats.RecordedStmtStats) bool {
+	return d.enabled() && s.ServiceLatencySec >= LatencyThreshold.Get(&d.st.SV).Seconds()
 }

 var prefixesToIgnore = []string{"SET ", "EXPLAIN "}

 // shouldIgnoreStatement returns true if we don't want to analyze the statement.
-func shouldIgnoreStatement(s *Statement) bool {
+func shouldIgnoreStatement(s *sqlstats.RecordedStmtStats) bool {
 	for _, start := range prefixesToIgnore {
 		if strings.HasPrefix(s.Query, start) {
 			return true

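As a rough illustration of the anomaly rule carried over unchanged in `AnomalyDetector.isSlow` above: an execution is flagged only when its latency is at or above the fingerprint's p99, at least twice its p50, and above an absolute threshold (a cluster setting in the real code). The sketch below reproduces just that three-part decision, using a naive sorted-slice percentile in place of `quantile.Stream`; all of it is simplified stand-in code, not the real detector.

```go
package main

import (
	"fmt"
	"sort"
)

// decisionRule mirrors the three-part check in AnomalyDetector.isSlow above:
// the latency must reach the fingerprint's p99, be at least twice its p50,
// and clear an absolute threshold.
func decisionRule(latencySec, p50, p99, thresholdSec float64) bool {
	return latencySec >= p99 &&
		latencySec >= 2*p50 &&
		latencySec >= thresholdSec
}

// percentile is a naive stand-in for util/quantile.Stream, which the real
// detector uses to track per-fingerprint latencies in bounded memory.
func percentile(sorted []float64, q float64) float64 {
	if len(sorted) == 0 {
		return 0
	}
	idx := int(q * float64(len(sorted)-1))
	return sorted[idx]
}

func main() {
	// Seed a fingerprint's history with 1000 executions of ~100ms each.
	history := make([]float64, 0, 1000)
	for i := 0; i < 1000; i++ {
		history = append(history, 0.1)
	}
	sort.Float64s(history)

	p50 := percentile(history, 0.5)
	p99 := percentile(history, 0.99)

	// A 250ms execution is >= p99, >= 2*p50, and above a 50ms threshold.
	fmt.Println(decisionRule(0.25, p50, p99, 0.05)) // true
	// A 120ms execution clears p99 but fails the 2*p50 check.
	fmt.Println(decisionRule(0.12, p50, p99, 0.05)) // false
}
```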
Diff for: pkg/sql/sqlstats/insights/detector_test.go

+27 −27

@@ -13,6 +13,7 @@ import (

 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
+	"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
 	"github.com/stretchr/testify/require"
 )

@@ -34,17 +35,17 @@ func TestAnyDetector(t *testing.T) {

 	t.Run("isSlow is false without any detectors", func(t *testing.T) {
 		detector := &compositeDetector{}
-		require.False(t, detector.isSlow(&Statement{}))
+		require.False(t, detector.isSlow(&sqlstats.RecordedStmtStats{}))
 	})

 	t.Run("isSlow is false without any concerned detectors", func(t *testing.T) {
 		detector := &compositeDetector{[]detector{&fakeDetector{}, &fakeDetector{}}}
-		require.False(t, detector.isSlow(&Statement{}))
+		require.False(t, detector.isSlow(&sqlstats.RecordedStmtStats{}))
 	})

 	t.Run("isSlow is true with at least one concerned detector", func(t *testing.T) {
 		detector := &compositeDetector{[]detector{&fakeDetector{stubIsSlow: true}, &fakeDetector{}}}
-		require.True(t, detector.isSlow(&Statement{}))
+		require.True(t, detector.isSlow(&sqlstats.RecordedStmtStats{}))
 	})

 	t.Run("isSlow consults all detectors without short-circuiting", func(t *testing.T) {
@@ -55,7 +56,7 @@ func TestAnyDetector(t *testing.T) {
 		d2 := &fakeDetector{stubIsSlow: true}

 		detector := &compositeDetector{[]detector{d1, d2}}
-		detector.isSlow(&Statement{})
+		detector.isSlow(&sqlstats.RecordedStmtStats{})
 		require.True(t, d1.isSlowCalled, "the first detector should be consulted")
 		require.True(t, d2.isSlowCalled, "the second detector should be consulted")
 	})
@@ -106,16 +107,18 @@ func TestLatencyQuantileDetector(t *testing.T) {
 		for _, test := range tests {
 			t.Run(test.name, func(t *testing.T) {
 				d := newAnomalyDetector(st, NewMetrics())
+				stmtWithSeedLatency := &sqlstats.RecordedStmtStats{ServiceLatencySec: test.seedLatency.Seconds()}
 				for i := 0; i < 1000; i++ {
-					d.isSlow(&Statement{LatencyInSeconds: test.seedLatency.Seconds()})
+					d.isSlow(stmtWithSeedLatency)
 				}
-				require.Equal(t, test.isSlow, d.isSlow(&Statement{LatencyInSeconds: test.candidateLatency.Seconds()}))
+				// Now determine if the candidate latency is slow.
+				require.Equal(t, test.isSlow, d.isSlow(&sqlstats.RecordedStmtStats{ServiceLatencySec: test.candidateLatency.Seconds()}))
 			})
 		}
 	})

 	// Testing the slow and failure detectors at the same time.
-	t.Run("isSlow and isFailed", func(t *testing.T) {
+	t.Run("isSlow with slow and failed statement", func(t *testing.T) {
 		ctx := context.Background()
 		st := cluster.MakeTestingClusterSettings()
 		AnomalyDetectionEnabled.Override(ctx, &st.SV, true)
@@ -127,46 +130,41 @@ func TestLatencyQuantileDetector(t *testing.T) {
 			candidateLatency time.Duration
 			status           Statement_Status
 			isSlow           bool
-			isFailed         bool
 		}{{
 			name:             "slow and failed statement",
 			seedLatency:      100 * time.Millisecond,
 			candidateLatency: 200 * time.Millisecond,
 			status:           Statement_Failed,
 			isSlow:           true,
-			isFailed:         true,
 		}, {
 			name:             "slow and non-failed statement",
 			seedLatency:      100 * time.Millisecond,
 			candidateLatency: 200 * time.Millisecond,
 			status:           Statement_Completed,
 			isSlow:           true,
-			isFailed:         false,
 		}, {
 			name:             "fast and non-failed statement",
 			seedLatency:      100 * time.Millisecond,
 			candidateLatency: 50 * time.Millisecond,
 			status:           Statement_Completed,
 			isSlow:           false,
-			isFailed:         false,
 		}, {
 			name:             "fast and failed statement",
 			seedLatency:      100 * time.Millisecond,
 			candidateLatency: 50 * time.Millisecond,
 			status:           Statement_Failed,
 			isSlow:           false,
-			isFailed:         true,
 		}}

 		for _, test := range tests {
 			t.Run(test.name, func(t *testing.T) {
 				d := newAnomalyDetector(st, NewMetrics())
+				stmtWithSeedLatency := &sqlstats.RecordedStmtStats{ServiceLatencySec: test.seedLatency.Seconds()}
 				for i := 0; i < 1000; i++ {
-					d.isSlow(&Statement{LatencyInSeconds: test.seedLatency.Seconds()})
+					d.isSlow(stmtWithSeedLatency)
 				}
-				stmt := &Statement{LatencyInSeconds: test.candidateLatency.Seconds(), Status: test.status}
-				require.Equal(t, test.isSlow, d.isSlow(stmt))
-				require.Equal(t, test.isFailed, isFailed(stmt))
+				// Now determine if the candidate latency is slow.
+				require.Equal(t, test.isSlow, d.isSlow(&sqlstats.RecordedStmtStats{ServiceLatencySec: test.candidateLatency.Seconds()}))
 			})
 		}
 	})
@@ -223,10 +221,11 @@ func TestLatencyQuantileDetector(t *testing.T) {
 			d := newAnomalyDetector(st, metrics)
 			// Show the detector `test.fingerprints` distinct fingerprints.
 			for i := 0; i < test.fingerprints; i++ {
-				d.isSlow(&Statement{
-					LatencyInSeconds: AnomalyDetectionLatencyThreshold.Get(&st.SV).Seconds(),
-					FingerprintID:    appstatspb.StmtFingerprintID(i),
-				})
+				stmt := &sqlstats.RecordedStmtStats{
+					ServiceLatencySec: AnomalyDetectionLatencyThreshold.Get(&st.SV).Seconds(),
+					FingerprintID:     appstatspb.StmtFingerprintID(i),
+				}
+				d.isSlow(stmt)
 			}
 			test.assertion(t, metrics)
 		})
@@ -241,10 +240,11 @@ func BenchmarkLatencyQuantileDetector(b *testing.B) {
 	settings := cluster.MakeTestingClusterSettings()
 	AnomalyDetectionEnabled.Override(context.Background(), &settings.SV, true)
 	d := newAnomalyDetector(settings, NewMetrics())
+	stmt := &sqlstats.RecordedStmtStats{
+		ServiceLatencySec: random.Float64(),
+	}
 	for i := 0; i < b.N; i++ {
-		d.isSlow(&Statement{
-			LatencyInSeconds: random.Float64(),
-		})
+		d.isSlow(stmt)
 	}
 }

@@ -267,21 +267,21 @@ func TestLatencyThresholdDetector(t *testing.T) {
 		st := cluster.MakeTestingClusterSettings()
 		LatencyThreshold.Override(context.Background(), &st.SV, 0)
 		detector := latencyThresholdDetector{st: st}
-		require.False(t, detector.isSlow(&Statement{LatencyInSeconds: 1}))
+		require.False(t, detector.isSlow(&sqlstats.RecordedStmtStats{ServiceLatencySec: 1}))
 	})

 	t.Run("isSlow false when fast enough", func(t *testing.T) {
 		st := cluster.MakeTestingClusterSettings()
 		LatencyThreshold.Override(context.Background(), &st.SV, 1*time.Second)
 		detector := latencyThresholdDetector{st: st}
-		require.False(t, detector.isSlow(&Statement{LatencyInSeconds: 0.5}))
+		require.False(t, detector.isSlow(&sqlstats.RecordedStmtStats{ServiceLatencySec: 0.5}))
 	})

 	t.Run("isSlow true beyond threshold", func(t *testing.T) {
 		st := cluster.MakeTestingClusterSettings()
 		LatencyThreshold.Override(context.Background(), &st.SV, 1*time.Second)
 		detector := latencyThresholdDetector{st: st}
-		require.True(t, detector.isSlow(&Statement{LatencyInSeconds: 1}))
+		require.True(t, detector.isSlow(&sqlstats.RecordedStmtStats{ServiceLatencySec: 1}))
 	})
 }

@@ -295,7 +295,7 @@ func (f *fakeDetector) enabled() bool {
 	return f.stubEnabled
 }

-func (f *fakeDetector) isSlow(*Statement) bool {
+func (f *fakeDetector) isSlow(stats *sqlstats.RecordedStmtStats) bool {
 	f.isSlowCalled = true
 	return f.stubIsSlow
 }

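For readers skimming the tests above, this standalone sketch shows the composite-detector behavior they exercise: every detector is consulted, without short-circuiting, so baseline-learning detectors still observe each execution. The names and signatures here are simplified stand-ins for the `detector` interface in `detector.go`, not the real code.

```go
package main

import "fmt"

// slownessDetector mirrors the shape of the insights detector interface:
// each detector decides independently whether an execution is slow.
type slownessDetector interface {
	enabled() bool
	isSlow(latencySec float64) bool
}

// thresholdDetector flags anything at or above a fixed latency threshold,
// like latencyThresholdDetector (which reads a cluster setting).
type thresholdDetector struct{ thresholdSec float64 }

func (d thresholdDetector) enabled() bool { return d.thresholdSec > 0 }
func (d thresholdDetector) isSlow(latencySec float64) bool {
	return d.enabled() && latencySec >= d.thresholdSec
}

// composite consults every detector without short-circuiting, so detectors
// that build a baseline still see each execution.
type composite struct{ detectors []slownessDetector }

func (c composite) isSlow(latencySec float64) bool {
	result := false
	for _, d := range c.detectors {
		if d.isSlow(latencySec) {
			result = true // keep going; do not return early
		}
	}
	return result
}

func main() {
	c := composite{detectors: []slownessDetector{
		thresholdDetector{thresholdSec: 0.1},
		thresholdDetector{thresholdSec: 1.0},
	}}
	fmt.Println(c.isSlow(0.5))  // true: the 100ms detector fires
	fmt.Println(c.isSlow(0.05)) // false: no detector fires
}
```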