Skip to content

Commit f041b32

Browse files
committed
sqlstats: move sql stats writing to be async
This commit removes the recording of sql stats at the end of statement and transaction execution. The per-node SQLStats container has been added as a sink to the async sql stats ingester. Instead of having sslocal.StatsCollector record sql stats at the end of execution, it will now record the stats asynchronously as part of event processing in sslocal.SQLStatsIngester's ingest routine. Epic: none Closes: cockroachdb#141024 Release note: None
1 parent 615456f commit f041b32

9 files changed

+99
-153
lines changed

Diff for: pkg/sql/conn_executor.go

-11
Original file line numberDiff line numberDiff line change
@@ -1230,7 +1230,6 @@ func (s *Server) newConnExecutor(
12301230
s.sqlStatsIngester,
12311231
ex.phaseTimes,
12321232
s.localSqlStats.GetCounters(),
1233-
underOuterTxn,
12341233
s.cfg.SQLStatsTestingKnobs,
12351234
)
12361235
ex.dataMutatorIterator.onApplicationNameChange = func(newName string) {
@@ -4079,17 +4078,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
40794078
return advanceInfo{}, err
40804079
}
40814080
if advInfo.txnEvent.eventType == txnUpgradeToExplicit {
4082-
// TODO (xinhaoz): This is a temporary hook until
4083-
// https://github.com/cockroachdb/cockroach/issues/141024
4084-
// is resolved. The reason this exists is because we
4085-
// need to recompute the statement fingerprint id for
4086-
// statements currently in the stats collector, which
4087-
// were computed once already for insights. There is an
4088-
// acknowledgement that this means the fingerprint id
4089-
// given to insights for upgraded transactions are
4090-
// currently incorrect.
40914081
ex.extraTxnState.txnFinishClosure.implicit = false
4092-
ex.statsCollector.UpgradeToExplicitTransaction()
40934082
}
40944083
}
40954084
case txnStart:

Diff for: pkg/sql/conn_executor_exec.go

+7-9
Original file line numberDiff line numberDiff line change
@@ -4181,11 +4181,6 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent, txnErr err
41814181
}
41824182
}
41834183

4184-
discardedStats := ex.statsCollector.EndTransaction(ctx, transactionFingerprintID)
4185-
if discardedStats > 0 {
4186-
ex.server.ServerMetrics.StatsMetrics.DiscardedStatsCount.Inc(discardedStats)
4187-
}
4188-
41894184
if ex.server.cfg.TestingKnobs.BeforeTxnStatsRecorded != nil {
41904185
ex.server.cfg.TestingKnobs.BeforeTxnStatsRecorded(
41914186
ex.sessionData(),
@@ -4330,6 +4325,8 @@ func (ex *connExecutor) recordTransactionFinish(
43304325
txnRetryLat := ex.phaseTimes.GetTransactionRetryLatency()
43314326
commitLat := ex.phaseTimes.GetCommitLatency()
43324327

4328+
isInternaleExec := ex.executorType == executorTypeInternal
4329+
43334330
recordedTxnStats := &sqlstats.RecordedTxnStats{
43344331
FingerprintID: transactionFingerprintID,
43354332
SessionID: ex.planner.extendedEvalCtx.SessionID,
@@ -4357,14 +4354,15 @@ func (ex *connExecutor) recordTransactionFinish(
43574354
// TODO(107318): add qos
43584355
// TODO(107318): add asoftime or ishistorical
43594356
// TODO(107318): add readonly
4360-
TxnErr: txnErr,
4361-
Application: ex.applicationName.Load().(string),
4362-
UserNormalized: ex.sessionData().User().Normalized(),
4357+
TxnErr: txnErr,
4358+
Application: ex.applicationName.Load().(string),
4359+
UserNormalized: ex.sessionData().User().Normalized(),
4360+
InternalExecutor: isInternaleExec,
43634361
}
43644362

43654363
if ex.server.cfg.TestingKnobs.OnRecordTxnFinish != nil {
43664364
ex.server.cfg.TestingKnobs.OnRecordTxnFinish(
4367-
ex.executorType == executorTypeInternal, ex.phaseTimes, ex.planner.stmt.SQL, recordedTxnStats,
4365+
isInternaleExec, ex.phaseTimes, ex.planner.stmt.SQL, recordedTxnStats,
43684366
)
43694367
}
43704368

Diff for: pkg/sql/executor_statement_metrics.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -202,8 +202,9 @@ func (ex *connExecutor) recordStatementSummary(
202202
ExecStats: queryLevelStats,
203203
// TODO(mgartner): Use a slice of struct{uint64, uint64} instead of
204204
// converting to strings.
205-
Indexes: planner.instrumentation.indexesUsed.Strings(),
206-
Database: planner.SessionData().Database,
205+
Indexes: planner.instrumentation.indexesUsed.Strings(),
206+
Database: planner.SessionData().Database,
207+
UnderOuterTxn: ex.extraTxnState.underOuterTxn,
207208
}
208209

209210
err := ex.statsCollector.RecordStatement(ctx, recordedStmtStats)

Diff for: pkg/sql/sqlstats/insights/provider.go

+3
Original file line numberDiff line numberDiff line change
@@ -35,5 +35,8 @@ func (p *Provider) ObserveTransaction(
3535
transactionStats *sqlstats.RecordedTxnStats,
3636
statements []*sqlstats.RecordedStmtStats,
3737
) {
38+
if transactionStats == nil || transactionStats.InternalExecutor {
39+
return
40+
}
3841
p.registry.observeTransaction(transactionStats, statements)
3942
}

Diff for: pkg/sql/sqlstats/sslocal/sql_stats.go

+32
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ type SQLStats struct {
4747
knobs *sqlstats.TestingKnobs
4848
}
4949

50+
var _ SQLStatsSink = &SQLStats{}
51+
5052
func newSQLStats(
5153
st *cluster.Settings,
5254
uniqueStmtFingerprintLimit *settings.IntSetting,
@@ -177,3 +179,33 @@ func (s *SQLStats) MaybeDumpStatsToLog(
177179
func (s *SQLStats) GetClusterSettings() *cluster.Settings {
178180
return s.st
179181
}
182+
183+
// ObserveTransaction implements the sslocal.SQLStatsSink interface.
184+
func (s *SQLStats) ObserveTransaction(
185+
ctx context.Context,
186+
transaction *sqlstats.RecordedTxnStats,
187+
statements []*sqlstats.RecordedStmtStats,
188+
) {
189+
if transaction != nil {
190+
// Write statements.
191+
appStats := s.GetApplicationStats(transaction.Application)
192+
for _, stmt := range statements {
193+
err := appStats.RecordStatement(ctx, stmt)
194+
if err != nil {
195+
// todo
196+
}
197+
}
198+
err := appStats.RecordTransaction(ctx, transaction)
199+
if err != nil {
200+
// todo
201+
}
202+
} else {
203+
for _, stmt := range statements {
204+
appStats := s.GetApplicationStats(stmt.App)
205+
err := appStats.RecordStatement(ctx, stmt)
206+
if err != nil {
207+
// todo
208+
}
209+
}
210+
}
211+
}

Diff for: pkg/sql/sqlstats/sslocal/sql_stats_ingestor.go

+30-11
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"sync/atomic"
1212
"time"
1313

14+
"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
1415
"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
1516
"github.com/cockroachdb/cockroach/pkg/sql/contention/contentionutils"
1617
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
@@ -22,14 +23,15 @@ import (
2223
const defaultFlushInterval = time.Millisecond * 500
2324

2425
type SQLStatsSink interface {
25-
// ObserveTransaction is called by the ingester to pass along a transaction and its statementsBySessionID.
26+
// ObserveTransaction is called by the ingester to pass along a transaction event (possibly nil) and its
27+
// statementsBySessionID.
2628
// Note that the sink should transform the transaction and statementsBySessionID into the appropriate format
2729
// as these objects will be returned to the pool.
2830
ObserveTransaction(ctx context.Context, transaction *sqlstats.RecordedTxnStats, statements []*sqlstats.RecordedStmtStats)
2931
}
3032

31-
// SQLStatsIngester amortizes the locking cost of writing to
32-
// the sql stats container concurrently from multiple goroutines.
33+
// SQLStatsIngester collects and buffers statements and transactions per session
34+
// before passing them to the sinks when we receive complete information for a transaction.
3335
// Built around contentionutils.ConcurrentBufferGuard.
3436
type SQLStatsIngester struct {
3537
guard struct {
@@ -66,6 +68,9 @@ type eventBufChPayload struct {
6668
type statementBuf []*sqlstats.RecordedStmtStats
6769

6870
func (b *statementBuf) release() {
71+
for i, n := 0, len(*b); i < n; i++ {
72+
(*b)[i] = nil
73+
}
6974
*b = (*b)[:0]
7075
statementsBufPool.Put(b)
7176
}
@@ -76,7 +81,7 @@ var statementsBufPool = sync.Pool{
7681
},
7782
}
7883

79-
// SQLStatsIngester buffers the "events" it sees (via ObserveStatement
84+
// SQLStatsIngester buffers the "events" it sees (via IngestStatement
8085
// and IngestTransaction) and passes them along to the underlying registry
8186
// once its buffer is full. (Or once a timeout has passed, for low-traffic
8287
// clusters and tests.)
@@ -184,8 +189,13 @@ func (i *SQLStatsIngester) ingest(ctx context.Context, events *eventBuffer) {
184189
}
185190
if e.statement != nil {
186191
i.processStatement(e.statement)
192+
// When under an outer transaction, we don't have a txn to associate
193+
// the stmts with so we can send immediately to the sinks.
194+
if e.statement.UnderOuterTxn {
195+
i.flushBuffer(ctx, e.statement.SessionID, nil)
196+
}
187197
} else if e.transaction != nil {
188-
i.flushBuffer(ctx, e.transaction)
198+
i.flushBuffer(ctx, e.transaction.SessionID, e.transaction)
189199
} else {
190200
i.clearSession(e.sessionID)
191201
}
@@ -293,11 +303,10 @@ func (i *SQLStatsIngester) processStatement(statement *sqlstats.RecordedStmtStat
293303
}
294304

295305
// flushBuffer sends the buffered statementsBySessionID and provided transaction
296-
// to the registered sinks.
306+
// to the registered sinks. The transaction may be nil.
297307
func (i *SQLStatsIngester) flushBuffer(
298-
ctx context.Context, transaction *sqlstats.RecordedTxnStats,
308+
ctx context.Context, sessionID clusterunique.ID, transaction *sqlstats.RecordedTxnStats,
299309
) {
300-
sessionID := transaction.SessionID
301310
statements, ok := func() (*statementBuf, bool) {
302311
statements, ok := i.statementsBySessionID[sessionID]
303312
if !ok {
@@ -315,9 +324,19 @@ func (i *SQLStatsIngester) flushBuffer(
315324
return
316325
}
317326

318-
// Set the transaction fingerprint ID for each statement.
319-
for _, s := range *statements {
320-
s.TransactionFingerprintID = transaction.FingerprintID
327+
if transaction != nil {
328+
// Set the transaction fingerprint ID for each statement.
329+
// These values are only known at the time of the transaction.
330+
for _, s := range *statements {
331+
s.TransactionFingerprintID = transaction.FingerprintID
332+
if s.ImplicitTxn == transaction.ImplicitTxn {
333+
continue
334+
}
335+
// We need to recompute the fingerprint ID.
336+
s.ImplicitTxn = transaction.ImplicitTxn
337+
s.FingerprintID = appstatspb.ConstructStatementFingerprintID(
338+
s.Query, s.ImplicitTxn, s.Database)
339+
}
321340
}
322341

323342
for _, sink := range i.sinks {

Diff for: pkg/sql/sqlstats/sslocal/sql_stats_test.go

+3-9
Original file line numberDiff line numberDiff line change
@@ -455,9 +455,7 @@ func TestExplicitTxnFingerprintAccounting(t *testing.T) {
455455
nil, /* knobs */
456456
)
457457

458-
// TODO(xinhaoz): We'll come back and add the sql stats sink once we
459-
// enable the SQL stats ingestion for sql stats.
460-
ingester := sslocal.NewSQLStatsIngester(nil /* testing knobs */)
458+
ingester := sslocal.NewSQLStatsIngester(nil /* knobs */, sqlStats)
461459

462460
appStats := sqlStats.GetApplicationStats("" /* appName */)
463461
statsCollector := sslocal.NewStatsCollector(
@@ -466,16 +464,14 @@ func TestExplicitTxnFingerprintAccounting(t *testing.T) {
466464
ingester,
467465
sessionphase.NewTimes(),
468466
sqlStats.GetCounters(),
469-
false, /* underOuterTxn */
470-
nil, /* knobs */
467+
nil, /* knobs */
471468
)
472469

473470
recordStats := func(testCase *tc) {
474471
var txnFingerprintID appstatspb.TransactionFingerprintID
475472
txnFingerprintIDHash := util.MakeFNV64()
476473
statsCollector.StartTransaction()
477474
defer func() {
478-
statsCollector.EndTransaction(ctx, txnFingerprintID)
479475
require.NoError(t,
480476
statsCollector.RecordTransaction(ctx, &sqlstats.RecordedTxnStats{
481477
FingerprintID: txnFingerprintID,
@@ -586,8 +582,7 @@ func TestAssociatingStmtStatsWithTxnFingerprint(t *testing.T) {
586582
ingester,
587583
sessionphase.NewTimes(),
588584
sqlStats.GetCounters(),
589-
false, /* underOuterTxn */
590-
nil, /* knobs */
585+
nil, /* knobs */
591586
)
592587

593588
for _, txn := range simulatedTxns {
@@ -604,7 +599,6 @@ func TestAssociatingStmtStatsWithTxnFingerprint(t *testing.T) {
604599
}
605600

606601
transactionFingerprintID := appstatspb.TransactionFingerprintID(txnFingerprintIDHash.Sum())
607-
statsCollector.EndTransaction(ctx, transactionFingerprintID)
608602
err := statsCollector.RecordTransaction(ctx, &sqlstats.RecordedTxnStats{
609603
FingerprintID: transactionFingerprintID,
610604
UserNormalized: username.RootUser,

0 commit comments

Comments
 (0)