Skip to content

Commit 695432f

Browse files
committed
sqlstats,insights: adjust testing and move insights knobs
Now that the sql exec stats ingester lives in the sqlstats package we move the relevant testing knobs from the insights package to the sql stats knobs. This commit also adds the `OnIngesterFlush` testing knob to synchronously test the ingester flush. Epic: none Release note: None
1 parent fbe6dd9 commit 695432f

16 files changed

+143
-124
lines changed

pkg/base/testing_knobs.go

-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ type TestingKnobs struct {
5454
KeyVisualizer ModuleTestingKnobs
5555
TenantCapabilitiesTestingKnobs ModuleTestingKnobs
5656
TableStatsKnobs ModuleTestingKnobs
57-
Insights ModuleTestingKnobs
5857
TableMetadata ModuleTestingKnobs
5958
LicenseTestingKnobs ModuleTestingKnobs
6059
VecIndexTestingKnobs ModuleTestingKnobs

pkg/server/server_sql.go

-5
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,6 @@ import (
105105
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
106106
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slprovider"
107107
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
108-
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/insights"
109108
"github.com/cockroachdb/cockroach/pkg/sql/stats"
110109
"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
111110
"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilegecache"
@@ -1144,10 +1143,6 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
11441143
execCfg.ExternalConnectionTestingKnobs = externalConnKnobs.(*externalconn.TestingKnobs)
11451144
}
11461145

1147-
if insightsKnobs := cfg.TestingKnobs.Insights; insightsKnobs != nil {
1148-
execCfg.InsightsTestingKnobs = insightsKnobs.(*insights.TestingKnobs)
1149-
1150-
}
11511146
var tableStatsTestingKnobs *stats.TableStatsTestingKnobs
11521147
if tableStatsKnobs := cfg.TestingKnobs.TableStatsKnobs; tableStatsKnobs != nil {
11531148
tableStatsTestingKnobs = tableStatsKnobs.(*stats.TableStatsTestingKnobs)

pkg/sql/conn_executor.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ type ServerMetrics struct {
441441
func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server {
442442
metrics := makeMetrics(false /* internal */, &cfg.Settings.SV)
443443
serverMetrics := makeServerMetrics(cfg)
444-
insightsProvider := insights.New(cfg.Settings, serverMetrics.InsightsMetrics, cfg.InsightsTestingKnobs)
444+
insightsProvider := insights.New(cfg.Settings, serverMetrics.InsightsMetrics)
445445
reportedSQLStats := sslocal.New(
446446
cfg.Settings,
447447
sqlstats.MaxMemReportedSQLStatsStmtFingerprints,
@@ -463,7 +463,7 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server {
463463
reportedSQLStats,
464464
cfg.SQLStatsTestingKnobs,
465465
)
466-
sqlStatsIngester := sslocal.NewSQLStatsIngester(insightsProvider)
466+
sqlStatsIngester := sslocal.NewSQLStatsIngester(cfg.SQLStatsTestingKnobs, insightsProvider)
467467
// TODO(117690): Unify StmtStatsEnable and TxnStatsEnable into a single cluster setting.
468468
sqlstats.TxnStatsEnable.SetOnChange(&cfg.Settings.SV, func(_ context.Context) {
469469
if !sqlstats.TxnStatsEnable.Get(&cfg.Settings.SV) {

pkg/sql/exec_util.go

-2
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@ import (
9898
"github.com/cockroachdb/cockroach/pkg/sql/sessionphase"
9999
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
100100
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
101-
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/insights"
102101
"github.com/cockroachdb/cockroach/pkg/sql/stats"
103102
"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
104103
"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilegecache"
@@ -1407,7 +1406,6 @@ type ExecutorConfig struct {
14071406
UnusedIndexRecommendationsKnobs *idxusage.UnusedIndexRecommendationTestingKnobs
14081407
ExternalConnectionTestingKnobs *externalconn.TestingKnobs
14091408
EventLogTestingKnobs *EventLogTestingKnobs
1410-
InsightsTestingKnobs *insights.TestingKnobs
14111409
TableMetadataKnobs *tablemetadatacache_util.TestingKnobs
14121410

14131411
// HistogramWindowInterval is (server.Config).HistogramWindowInterval.

pkg/sql/sqlstats/insights/BUILD.bazel

-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ go_library(
1212
"provider.go",
1313
"registry.go",
1414
"store.go",
15-
"test_utils.go",
1615
"util.go",
1716
],
1817
embed = [":insights_go_proto"],

pkg/sql/sqlstats/insights/insights.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ type PercentileValues struct {
133133
}
134134

135135
// New builds a new Provider.
136-
func New(st *cluster.Settings, metrics Metrics, knobs *TestingKnobs) *Provider {
136+
func New(st *cluster.Settings, metrics Metrics) *Provider {
137137
store := newStore(st)
138138
anomalyDetector := newAnomalyDetector(st, metrics)
139139

@@ -142,7 +142,7 @@ func New(st *cluster.Settings, metrics Metrics, knobs *TestingKnobs) *Provider {
142142
registry: newRegistry(st, &compositeDetector{detectors: []detector{
143143
&latencyThresholdDetector{st: st},
144144
anomalyDetector,
145-
}}, store, knobs),
145+
}}, store),
146146
anomalyDetector: anomalyDetector,
147147
}
148148
}

pkg/sql/sqlstats/insights/insights_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func BenchmarkInsights(b *testing.B) {
4141
// down, guiding us as we tune buffer sizes, etc.
4242
for _, numSessions := range []int{1, 10, 100, 1000, 10000} {
4343
b.Run(fmt.Sprintf("numSessions=%d", numSessions), func(b *testing.B) {
44-
provider := insights.New(settings, insights.NewMetrics(), nil)
44+
provider := insights.New(settings, insights.NewMetrics())
4545

4646
// Spread the b.N work across the simulated SQL sessions, so that we
4747
// can make apples-to-apples comparisons in the benchmark reports:

pkg/sql/sqlstats/insights/integration/BUILD.bazel

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ go_test(
1515
"//pkg/sql/clusterunique",
1616
"//pkg/sql/contention",
1717
"//pkg/sql/sessiondata",
18+
"//pkg/sql/sqlstats",
1819
"//pkg/sql/sqlstats/insights",
1920
"//pkg/testutils",
2021
"//pkg/testutils/serverutils",

pkg/sql/sqlstats/insights/integration/insights_test.go

+21-21
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
2727
"github.com/cockroachdb/cockroach/pkg/sql/contention"
2828
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
29+
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
2930
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/insights"
3031
"github.com/cockroachdb/cockroach/pkg/testutils"
3132
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
@@ -89,7 +90,7 @@ func TestInsightsIntegration(t *testing.T) {
8990
require.NoError(t, err)
9091

9192
// Eventually see one recorded insight.
92-
testutils.SucceedsWithin(t, func() error {
93+
testutils.SucceedsSoon(t, func() error {
9394
row = conn.QueryRowContext(ctx, "SELECT count(*), coalesce(string_agg(query, ';'),'') "+
9495
"FROM crdb_internal.cluster_execution_insights where app_name = $1 ", appName)
9596
if err = row.Scan(&count, &queryText); err != nil {
@@ -99,10 +100,10 @@ func TestInsightsIntegration(t *testing.T) {
99100
return fmt.Errorf("expected 1, but was %d, queryText:%s", count, queryText)
100101
}
101102
return nil
102-
}, 1*time.Second)
103+
})
103104

104105
// Verify the table content is valid.
105-
testutils.SucceedsWithin(t, func() error {
106+
testutils.SucceedsSoon(t, func() error {
106107
row = conn.QueryRowContext(ctx, "SELECT "+
107108
"query, "+
108109
"status, "+
@@ -147,12 +148,12 @@ func TestInsightsIntegration(t *testing.T) {
147148
}
148149

149150
return nil
150-
}, 1*time.Second)
151+
})
151152

152153
// TODO (xzhang) Turn this into a datadriven test
153154
// https://github.com/cockroachdb/cockroach/issues/95010
154155
// Verify the txn table content is valid.
155-
testutils.SucceedsWithin(t, func() error {
156+
testutils.SucceedsSoon(t, func() error {
156157
row = conn.QueryRowContext(ctx, "SELECT "+
157158
"query, "+
158159
"start_time, "+
@@ -194,7 +195,7 @@ func TestInsightsIntegration(t *testing.T) {
194195
}
195196

196197
return nil
197-
}, 1*time.Second)
198+
})
198199
}
199200

200201
func TestFailedInsights(t *testing.T) {
@@ -275,8 +276,7 @@ func TestFailedInsights(t *testing.T) {
275276
_, _ = conn.ExecContext(ctx, tc.stmt)
276277

277278
var query, status, problem, errorCode, errorMsg string
278-
testutils.SucceedsWithin(t, func() error {
279-
279+
testutils.SucceedsSoon(t, func() error {
280280
// Query the node execution insights table.
281281
row := conn.QueryRowContext(ctx, `
282282
SELECT query,
@@ -289,7 +289,7 @@ WHERE query = $1 AND app_name = $2 `,
289289
tc.fingerprint, appName)
290290

291291
return row.Scan(&query, &status, &problem, &errorCode, &errorMsg)
292-
}, 1*time.Second)
292+
})
293293

294294
require.Equal(t, tc.status, status)
295295
require.Equal(t, tc.problem, problem)
@@ -362,7 +362,7 @@ WHERE query = $1 AND app_name = $2 `,
362362
}
363363

364364
var query, problems, status, errorCode, errorMsg string
365-
testutils.SucceedsWithin(t, func() error {
365+
testutils.SucceedsSoon(t, func() error {
366366

367367
// Query the node txn execution insights table.
368368
row := conn.QueryRowContext(ctx, `
@@ -375,7 +375,7 @@ FROM crdb_internal.node_txn_execution_insights
375375
WHERE query = $1 AND app_name = $2`, tc.fingerprint, appName)
376376

377377
return row.Scan(&query, &problems, &status, &errorCode, &errorMsg)
378-
}, 1*time.Second)
378+
})
379379

380380
require.Equal(t, tc.txnStatus, status)
381381
require.Equal(t, tc.errorCode, errorCode)
@@ -587,7 +587,7 @@ func TestInsightsPriorityIntegration(t *testing.T) {
587587
_, err = conn.ExecContext(ctx, "SELECT pg_sleep(.11)")
588588
require.NoError(t, err)
589589

590-
testutils.SucceedsWithin(t, func() error {
590+
testutils.SucceedsSoon(t, func() error {
591591
row := conn.QueryRowContext(ctx, "SELECT "+
592592
"implicit_txn "+
593593
"FROM crdb_internal.node_execution_insights where "+
@@ -604,7 +604,7 @@ func TestInsightsPriorityIntegration(t *testing.T) {
604604
}
605605

606606
return nil
607-
}, 2*time.Second)
607+
})
608608

609609
var priorities = []struct {
610610
setPriorityQuery string
@@ -639,7 +639,7 @@ func TestInsightsPriorityIntegration(t *testing.T) {
639639
}
640640

641641
for _, p := range priorities {
642-
testutils.SucceedsWithin(t, func() error {
642+
testutils.SucceedsSoon(t, func() error {
643643
tx, errTxn := conn.BeginTx(ctx, &gosql.TxOptions{})
644644
require.NoError(t, errTxn)
645645

@@ -654,9 +654,9 @@ func TestInsightsPriorityIntegration(t *testing.T) {
654654
errTxn = tx.Commit()
655655
require.NoError(t, errTxn)
656656
return nil
657-
}, 2*time.Second)
657+
})
658658

659-
testutils.SucceedsWithin(t, func() error {
659+
testutils.SucceedsSoon(t, func() error {
660660
row := conn.QueryRowContext(ctx, "SELECT "+
661661
"query, "+
662662
"priority, "+
@@ -685,7 +685,7 @@ func TestInsightsPriorityIntegration(t *testing.T) {
685685
}
686686

687687
return nil
688-
}, 2*time.Second)
688+
})
689689
}
690690
}
691691

@@ -911,7 +911,7 @@ func TestInsightsIndexRecommendationIntegration(t *testing.T) {
911911
}
912912

913913
// Verify the table content is valid.
914-
testutils.SucceedsWithin(t, func() error {
914+
testutils.SucceedsSoon(t, func() error {
915915
rows, err := sqlConn.QueryContext(ctx, "SELECT "+
916916
"query, "+
917917
"array_to_string(index_recommendations, ';') as cmb_index_recommendations "+
@@ -952,7 +952,7 @@ func TestInsightsIndexRecommendationIntegration(t *testing.T) {
952952
}
953953

954954
return nil
955-
}, 1*time.Second)
955+
})
956956
}
957957

958958
// TestInsightsClearsPerSessionMemory ensures that memory allocated
@@ -966,8 +966,8 @@ func TestInsightsClearsPerSessionMemory(t *testing.T) {
966966
clearedSessionID := clusterunique.ID{}
967967
ts := serverutils.StartServerOnly(t, base.TestServerArgs{
968968
Knobs: base.TestingKnobs{
969-
Insights: &insights.TestingKnobs{
970-
OnSessionClear: func(sessionID clusterunique.ID) {
969+
SQLStatsKnobs: &sqlstats.TestingKnobs{
970+
OnIngesterSessionClear: func(sessionID clusterunique.ID) {
971971
defer close(sessionClosedCh)
972972
clearedSessionID = sessionID
973973
},

pkg/sql/sqlstats/insights/registry.go

+7-11
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@ import (
1717
// statement execution to determine which statements are outliers and
1818
// writes insights into the provided sink.
1919
type lockingRegistry struct {
20-
detector detector
21-
causes *causes
22-
store *LockingStore
23-
testingKnobs *TestingKnobs
20+
detector detector
21+
causes *causes
22+
store *LockingStore
2423
}
2524

2625
// Instead of creating and allocating a map to track duplicate
@@ -150,13 +149,10 @@ func (r *lockingRegistry) enabled() bool {
150149
return r.detector.enabled()
151150
}
152151

153-
func newRegistry(
154-
st *cluster.Settings, detector detector, store *LockingStore, knobs *TestingKnobs,
155-
) *lockingRegistry {
152+
func newRegistry(st *cluster.Settings, detector detector, store *LockingStore) *lockingRegistry {
156153
return &lockingRegistry{
157-
detector: detector,
158-
causes: &causes{st: st},
159-
store: store,
160-
testingKnobs: knobs,
154+
detector: detector,
155+
causes: &causes{st: st},
156+
store: store,
161157
}
162158
}

pkg/sql/sqlstats/insights/registry_test.go

+9-10
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,9 @@ func TestRegistry(t *testing.T) {
7070
st := cluster.MakeTestingClusterSettings()
7171
LatencyThreshold.Override(ctx, &st.SV, 1*time.Second)
7272
store := newStore(st)
73-
registry := newRegistry(st, &latencyThresholdDetector{st: st}, store, nil)
73+
registry := newRegistry(st, &latencyThresholdDetector{st: st}, store)
7474

7575
registry.observeTransaction(txns[0].txn, txns[0].stmts)
76-
t.Log("aaaaaaaaaah ", store.stmtCount.Load())
7776

7877
expected := []*Insight{{
7978
Session: session,
@@ -121,7 +120,7 @@ func TestRegistry(t *testing.T) {
121120
st := cluster.MakeTestingClusterSettings()
122121
LatencyThreshold.Override(ctx, &st.SV, 1*time.Second)
123122
store := newStore(st)
124-
registry := newRegistry(st, &latencyThresholdDetector{st: st}, store, nil)
123+
registry := newRegistry(st, &latencyThresholdDetector{st: st}, store)
125124
// Transaction status is set during expectedTxnInsight stats recorded based on
126125
// if the transaction committed. We'll inject the failure here to align
127126
// it with the test. The insights integration tests will verify that this
@@ -156,7 +155,7 @@ func TestRegistry(t *testing.T) {
156155
st := cluster.MakeTestingClusterSettings()
157156
LatencyThreshold.Override(ctx, &st.SV, 0)
158157
store := newStore(st)
159-
registry := newRegistry(st, &latencyThresholdDetector{st: st}, store, nil)
158+
registry := newRegistry(st, &latencyThresholdDetector{st: st}, store)
160159
registry.observeTransaction(transaction, []*sqlstats.RecordedStmtStats{statement})
161160

162161
var actual []*Insight
@@ -180,7 +179,7 @@ func TestRegistry(t *testing.T) {
180179
ServiceLatencySec: 0.5,
181180
}
182181
store := newStore(st)
183-
registry := newRegistry(st, &latencyThresholdDetector{st: st}, store, nil)
182+
registry := newRegistry(st, &latencyThresholdDetector{st: st}, store)
184183
registry.observeTransaction(transaction, []*sqlstats.RecordedStmtStats{stmt})
185184

186185
var actual []*Insight
@@ -234,7 +233,7 @@ func TestRegistry(t *testing.T) {
234233
st := cluster.MakeTestingClusterSettings()
235234
LatencyThreshold.Override(ctx, &st.SV, 1*time.Second)
236235
store := newStore(st)
237-
registry := newRegistry(st, &latencyThresholdDetector{st: st}, store, nil)
236+
registry := newRegistry(st, &latencyThresholdDetector{st: st}, store)
238237

239238
expected := []*Insight{{
240239
Session: session,
@@ -317,7 +316,7 @@ func TestRegistry(t *testing.T) {
317316
st := cluster.MakeTestingClusterSettings()
318317
LatencyThreshold.Override(ctx, &st.SV, 1*time.Second)
319318
store := newStore(st)
320-
registry := newRegistry(st, &latencyThresholdDetector{st: st}, store, nil)
319+
registry := newRegistry(st, &latencyThresholdDetector{st: st}, store)
321320

322321
registry.observeTransaction(transaction, []*sqlstats.RecordedStmtStats{
323322
statement, siblingStatement,
@@ -335,14 +334,14 @@ func TestRegistry(t *testing.T) {
335334
t.Run("txn with no stmts", func(t *testing.T) {
336335
transaction := &sqlstats.RecordedTxnStats{TransactionID: uuid.MakeV4(), Committed: true, SessionID: session.ID}
337336
st := cluster.MakeTestingClusterSettings()
338-
registry := newRegistry(st, &latencyThresholdDetector{st: st}, newStore(st), nil)
337+
registry := newRegistry(st, &latencyThresholdDetector{st: st}, newStore(st))
339338
require.NotPanics(t, func() { registry.observeTransaction(transaction, nil) })
340339
})
341340

342341
t.Run("txn with high accumulated contention without high single stmt contention", func(t *testing.T) {
343342
st := cluster.MakeTestingClusterSettings()
344343
store := newStore(st)
345-
registry := newRegistry(st, &latencyThresholdDetector{st: st}, store, nil)
344+
registry := newRegistry(st, &latencyThresholdDetector{st: st}, store)
346345
contentionDuration := 10 * time.Second
347346
statement := &sqlstats.RecordedStmtStats{
348347
SessionID: session.ID,
@@ -439,7 +438,7 @@ func TestRegistry(t *testing.T) {
439438
st := cluster.MakeTestingClusterSettings()
440439
LatencyThreshold.Override(ctx, &st.SV, 1*time.Second)
441440
store := newStore(st)
442-
registry := newRegistry(st, &latencyThresholdDetector{st: st}, store, nil)
441+
registry := newRegistry(st, &latencyThresholdDetector{st: st}, store)
443442
registry.observeTransaction(transaction, stmts)
444443

445444
expected := []*Insight{

0 commit comments

Comments
 (0)