Skip to content

Commit 004b3d4

Browse files
craig[bot]DarrylWongyuzefovichxinhaozkyle-a-wong
committed
140604: roachtest: use atomic pointer for logger r=srosenberg,herkolategan a=DarrylWong The test runner swaps out the test logger when running post test artifacts collection and checks. However, in the case of a timeout, the test goroutine may still be running and have access to the test logger. This leads to a race condition where the logger is replaced as it's being used by the test. This change switches the test logger to use an atomic pointer instead. Fixes: none Epic: none Release note: none 141044: physicalplan: minor fixes to pooling of FlowSpecs r=yuzefovich a=yuzefovich **physicalplan: ensure that FlowSpecs are released** We pool `FlowSpec` allocations in `GenerateFlowSpecs`, but previously we would only release them back to the pool on the main query path. This commit fixes that oversight. **physicalplan: fix minor leak with pooling of FlowSpecs** The only thing that we reuse when pooling `FlowSpec` objects is the capacity of `Processors` slice. Previously, we forgot to unset each element of that slice, so we could have hold onto to some processor specs which in turn could have hold onto some large objects (like `roachpb.Span`s in the TableReader) until a particular index of the slice is overwritten. This oversight is now fixed. Found while looking at #140326. Epic: None Release note: None 141045: sqlstats: mutex improvements r=xinhaoz a=xinhaoz Previously, when attempting to get a stmt or txn entry in the sql stats containers, we pass a flag to the get* functions specifying whether or not to create the entry if it does not exist. This commit cleans up this interface by splitting the function up into a pure get or tryCreate functions. Note that originally we were pursuing a double checked lock approach, however Golang's RWMutex does not scale well with the number of CPUs. Instead this patch just simplifies the code here a bit to make some incoming changes that will eventually move this stats recording step out of the execution path easier. Once that happens we can investigate further improvment or removal of these mutexes. Epic: none Release note: None Part of: #141024 ### sqlstats: convert RWMutex to Mutex This commit changes the RWMutex on the sql stats containers to the regular exclusive Mutex struct. The motivation behind this change is that RWMutex scales poorly with CPU count. See benchmarks below. Epic: none Part of: #140590 141146: sqlstats: delete unused iterators r=kyle-a-wong a=kyle-a-wong deletes unused iterators in persistedsqlstats Epic: None Release note: None Co-authored-by: DarrylWong <[email protected]> Co-authored-by: Yahor Yuzefovich <[email protected]> Co-authored-by: Xin Hao Zhang <[email protected]> Co-authored-by: Kyle Wong <[email protected]>
5 parents 037f9bf + 3ca9845 + 59a572e + 61127a4 + 5847eda commit 004b3d4

22 files changed

+171
-1328
lines changed

pkg/ccl/changefeedccl/changefeed_dist.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -555,7 +555,8 @@ func makePlan(
555555

556556
// Log the plan diagram URL so that we don't have to rely on it being in system.job_info.
557557
const maxLenDiagURL = 1 << 20 // 1 MiB
558-
flowSpecs := p.GenerateFlowSpecs()
558+
flowSpecs, cleanup := p.GenerateFlowSpecs()
559+
defer cleanup(flowSpecs)
559560
if _, diagURL, err := execinfrapb.GeneratePlanDiagramURL(
560561
fmt.Sprintf("changefeed: %d", jobID),
561562
flowSpecs,

pkg/cmd/roachtest/github_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,12 +139,12 @@ func TestCreatePostRequest(t *testing.T) {
139139

140140
ti := &testImpl{
141141
spec: testSpec,
142-
l: nilLogger(),
143142
start: time.Date(2023, time.July, 21, 16, 34, 3, 817, time.UTC),
144143
end: time.Date(2023, time.July, 21, 16, 42, 13, 137, time.UTC),
145144
cockroach: "cockroach",
146145
cockroachEA: "cockroach-ea",
147146
}
147+
ti.ReplaceL(nilLogger())
148148

149149
testClusterImpl := &clusterImpl{spec: clusterSpec, arch: vm.ArchAMD64, name: "foo"}
150150
vo := vm.DefaultCreateOpts()

pkg/cmd/roachtest/test_impl.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"regexp"
1515
"strings"
1616
"sync"
17+
"sync/atomic"
1718
"time"
1819

1920
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
@@ -77,7 +78,11 @@ type testImpl struct {
7778
buildVersion *version.Version
7879

7980
// l is the logger that the test will use for its output.
80-
l *logger.Logger
81+
//
82+
// N.B. We need to use an atomic pointer here since the test
83+
// runner can swap the logger out when running post test assertions
84+
// and artifacts collection.
85+
l atomic.Pointer[logger.Logger]
8186

8287
// taskManager manages tasks (goroutines) for tests.
8388
taskManager task.Manager
@@ -172,7 +177,7 @@ func (t *testImpl) Cockroach() string {
172177
// If the test is a benchmark test, we don't want to enable assertions
173178
// as it will slow down performance.
174179
if t.spec.Benchmark {
175-
t.l.Printf("Benchmark test, running with standard cockroach")
180+
t.L().Printf("Benchmark test, running with standard cockroach")
176181
t.randomizedCockroach = t.StandardCockroach()
177182
return
178183
}
@@ -181,20 +186,20 @@ func (t *testImpl) Cockroach() string {
181186
// The build with runtime assertions should exist in every nightly
182187
// CI build, but we can't assume it exists in every roachtest call.
183188
if path := t.RuntimeAssertionsCockroach(); path != "" {
184-
t.l.Printf("Runtime assertions enabled")
189+
t.L().Printf("Runtime assertions enabled")
185190
t.randomizedCockroach = path
186191
return
187192
} else {
188-
t.l.Printf("WARNING: running without runtime assertions since the corresponding binary was not specified")
193+
t.L().Printf("WARNING: running without runtime assertions since the corresponding binary was not specified")
189194
}
190195
}
191-
t.l.Printf("Runtime assertions disabled")
196+
t.L().Printf("Runtime assertions disabled")
192197
t.randomizedCockroach = t.StandardCockroach()
193198
case registry.StandardCockroach:
194-
t.l.Printf("Runtime assertions disabled: registry.StandardCockroach set")
199+
t.L().Printf("Runtime assertions disabled: registry.StandardCockroach set")
195200
t.randomizedCockroach = t.StandardCockroach()
196201
case registry.RuntimeAssertionsCockroach:
197-
t.l.Printf("Runtime assertions enabled: registry.RuntimeAssertionsCockroach set")
202+
t.L().Printf("Runtime assertions enabled: registry.RuntimeAssertionsCockroach set")
198203
t.randomizedCockroach = t.RuntimeAssertionsCockroach()
199204
default:
200205
t.Fatal("Specified cockroach binary does not exist.")
@@ -259,13 +264,12 @@ func (t *testImpl) SnapshotPrefix() string {
259264

260265
// L returns the test's logger.
261266
func (t *testImpl) L() *logger.Logger {
262-
return t.l
267+
return t.l.Load()
263268
}
264269

265270
// ReplaceL replaces the test's logger.
266271
func (t *testImpl) ReplaceL(l *logger.Logger) {
267-
// TODO(tbg): get rid of this, this is racy & hacky.
268-
t.l = l
272+
t.l.Store(l)
269273
}
270274

271275
func (t *testImpl) status(ctx context.Context, id int64, args ...interface{}) {

pkg/cmd/roachtest/test_impl_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -179,9 +179,8 @@ func Test_failuresMatchingError(t *testing.T) {
179179
}
180180

181181
func Test_failureSpecifyOwnerAndAddFailureCombination(t *testing.T) {
182-
ti := testImpl{
183-
l: nilLogger(),
184-
}
182+
ti := testImpl{}
183+
ti.ReplaceL(nilLogger())
185184
ti.addFailure(0, "", vmPreemptionError("my_VM"))
186185
errWithOwnership := failuresAsErrorWithOwnership(ti.failures())
187186

pkg/cmd/roachtest/test_runner.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -830,14 +830,14 @@ func (r *testRunner) runWorker(
830830
buildVersion: binaryVersion,
831831
artifactsDir: testArtifactsDir,
832832
artifactsSpec: artifactsSpec,
833-
l: testL,
834833
versionsBinaryOverride: topt.versionsBinaryOverride,
835834
skipInit: topt.skipInit,
836835
debug: clustersOpt.debugMode.IsDebug(),
837836
goCoverEnabled: topt.goCoverEnabled,
838837
exportOpenmetrics: topt.exportOpenMetrics,
839838
runID: generateRunID(clustersOpt),
840839
}
840+
t.ReplaceL(testL)
841841
github := newGithubIssues(r.config.disableIssue, c, vmCreateOpts)
842842

843843
// handleClusterCreationFailure can be called when the `err` given

pkg/cmd/roachtest/test_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -831,3 +831,38 @@ func TestVMPreemptionPolling(t *testing.T) {
831831
require.NoError(t, err)
832832
})
833833
}
834+
835+
// TestRunnerFailureAfterTimeout checks that a test has a failure added
836+
// after the test has timed out works as expected.
837+
//
838+
// Specifically, this is a regression test that replacing the test logger
839+
// for post test artifacts collection or assertion checks is atomic and
840+
// doesn't race with the logger potentially still being used by the test.
841+
func TestRunnerFailureAfterTimeout(t *testing.T) {
842+
ctx := context.Background()
843+
stopper := stop.NewStopper()
844+
defer stopper.Stop(ctx)
845+
cr := newClusterRegistry()
846+
runner := newUnitTestRunner(cr, stopper)
847+
848+
var buf syncedBuffer
849+
copt := defaultClusterOpt()
850+
lopt := defaultLoggingOpt(&buf)
851+
test := registry.TestSpec{
852+
Name: `timeout`,
853+
Owner: OwnerUnitTest,
854+
// Set the timeout very low so we can observe the timeout
855+
// and error racing.
856+
Timeout: 1 * time.Nanosecond,
857+
Cluster: spec.MakeClusterSpec(0),
858+
CompatibleClouds: registry.AllExceptAWS,
859+
Suites: registry.Suites(registry.Nightly),
860+
CockroachBinary: registry.StandardCockroach,
861+
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
862+
t.Error("test failed")
863+
},
864+
}
865+
err := runner.Run(ctx, []registry.TestSpec{test}, 1, /* count */
866+
defaultParallelism, copt, testOpts{}, lopt)
867+
require.Error(t, err)
868+
}

pkg/jobs/jobsprofiler/profiler.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ func StorePlanDiagram(
4141
ctx, cancel = stopper.WithCancelOnQuiesce(ctx)
4242
defer cancel()
4343

44-
flowSpecs := p.GenerateFlowSpecs()
44+
flowSpecs, cleanup := p.GenerateFlowSpecs()
45+
defer cleanup(flowSpecs)
4546
_, diagURL, err := execinfrapb.GeneratePlanDiagramURL(fmt.Sprintf("job:%d", jobID), flowSpecs,
4647
execinfrapb.DiagramFlags{})
4748
if err != nil {

pkg/sql/distsql_plan_bulk.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,10 @@ type PhysicalPlanMaker func(context.Context, *DistSQLPlanner) (*PhysicalPlan, *P
121121
// the number in the old one.
122122
func calculatePlanGrowth(before, after *PhysicalPlan) (int, float64) {
123123
var changed int
124-
beforeSpecs := before.GenerateFlowSpecs()
125-
afterSpecs := after.GenerateFlowSpecs()
124+
beforeSpecs, beforeCleanup := before.GenerateFlowSpecs()
125+
defer beforeCleanup(beforeSpecs)
126+
afterSpecs, afterCleanup := after.GenerateFlowSpecs()
127+
defer afterCleanup(afterSpecs)
126128

127129
// How many nodes are in after that are not in before, or are in both but
128130
// have changed their spec?

pkg/sql/distsql_running.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -675,7 +675,8 @@ func (dsp *DistSQLPlanner) Run(
675675
evalCtx *extendedEvalContext,
676676
finishedSetupFn func(localFlow flowinfra.Flow),
677677
) {
678-
flows := plan.GenerateFlowSpecs()
678+
// Ignore the cleanup function since we will release each spec separately.
679+
flows, _ := plan.GenerateFlowSpecs()
679680
gatewayFlowSpec, ok := flows[dsp.gatewaySQLInstanceID]
680681
if !ok {
681682
recv.SetError(errors.Errorf("expected to find gateway flow"))

pkg/sql/explain_plan.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ func (e *explainPlanNode) startExec(params runParams) error {
8888
// cause an error or panic, so swallow the error. See #40677 for example.
8989
finalizePlanWithRowCount(params.ctx, planCtx, physicalPlan, plan.mainRowCount)
9090
ob.AddDistribution(physicalPlan.Distribution.String())
91-
flows := physicalPlan.GenerateFlowSpecs()
91+
flows, flowsCleanup := physicalPlan.GenerateFlowSpecs()
92+
defer flowsCleanup(flows)
9293

9394
ctxSessionData := planCtx.EvalContext().SessionData()
9495
var willVectorize bool

pkg/sql/explain_vec.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ func (n *explainVecNode) startExec(params runParams) error {
5656
}
5757

5858
finalizePlanWithRowCount(params.ctx, planCtx, physPlan, n.plan.mainRowCount)
59-
flows := physPlan.GenerateFlowSpecs()
59+
flows, flowsCleanup := physPlan.GenerateFlowSpecs()
60+
defer flowsCleanup(flows)
6061
flowCtx := newFlowCtxForExplainPurposes(params.ctx, params.p)
6162

6263
// We want to get the vectorized plan which would be executed with the

pkg/sql/physicalplan/physical_plan.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -979,7 +979,14 @@ func (p *PhysicalPlan) PopulateEndpoints() {
979979

980980
// GenerateFlowSpecs takes a plan (with populated endpoints) and generates the
981981
// set of FlowSpecs (one per node involved in the plan).
982-
func (p *PhysicalPlan) GenerateFlowSpecs() map[base.SQLInstanceID]*execinfrapb.FlowSpec {
982+
//
983+
// The returned function should be called whenever the caller is done with all
984+
// FlowSpecs. The caller is free to ignore it if the specs will be released
985+
// separately.
986+
func (p *PhysicalPlan) GenerateFlowSpecs() (
987+
_ map[base.SQLInstanceID]*execinfrapb.FlowSpec,
988+
cleanup func(map[base.SQLInstanceID]*execinfrapb.FlowSpec),
989+
) {
983990
flowID := execinfrapb.FlowID{
984991
UUID: p.FlowID,
985992
}
@@ -993,7 +1000,15 @@ func (p *PhysicalPlan) GenerateFlowSpecs() map[base.SQLInstanceID]*execinfrapb.F
9931000
}
9941001
flowSpec.Processors = append(flowSpec.Processors, proc.Spec)
9951002
}
996-
return flows
1003+
// Note that we don't return an anonymous function with no arguments to not
1004+
// incur an allocation.
1005+
return flows, releaseAll
1006+
}
1007+
1008+
func releaseAll(flows map[base.SQLInstanceID]*execinfrapb.FlowSpec) {
1009+
for _, flowSpec := range flows {
1010+
ReleaseFlowSpec(flowSpec)
1011+
}
9971012
}
9981013

9991014
// SetRowEstimates updates p according to the row estimates of left and right

pkg/sql/physicalplan/specs.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ func ReleaseFlowSpec(spec *execinfrapb.FlowSpec) {
3434
if tr := spec.Processors[i].Core.TableReader; tr != nil {
3535
releaseTableReaderSpec(tr)
3636
}
37+
spec.Processors[i] = execinfrapb.ProcessorSpec{}
3738
}
3839
*spec = execinfrapb.FlowSpec{
3940
Processors: spec.Processors[:0],

pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,12 @@ go_library(
66
name = "persistedsqlstats",
77
srcs = [
88
"cluster_settings.go",
9-
"combined_iterator.go",
109
"compaction_exec.go",
1110
"compaction_scheduling.go",
1211
"controller.go",
1312
"flush.go",
14-
"mem_iterator.go",
1513
"provider.go",
1614
"scheduled_job_monitor.go",
17-
"stmt_reader.go",
18-
"txn_reader.go",
1915
],
2016
embed = [":persistedsqlstats_go_proto"],
2117
importpath = "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats",
@@ -40,7 +36,6 @@ go_library(
4036
"//pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil",
4137
"//pkg/sql/sqlstats/sslocal",
4238
"//pkg/sql/types",
43-
"//pkg/util",
4439
"//pkg/util/admission/admissionpb",
4540
"//pkg/util/log",
4641
"//pkg/util/metric",
@@ -66,7 +61,6 @@ go_test(
6661
"datadriven_test.go",
6762
"flush_test.go",
6863
"main_test.go",
69-
"reader_test.go",
7064
"scheduled_sql_stats_compaction_test.go",
7165
],
7266
data = glob(["testdata/**"]),

0 commit comments

Comments
 (0)