Skip to content

Commit 03813fc

Browse files
Merge pull request kubernetes#18055 from sjenning/sync-cpu-manager
Automatic merge from submit-queue. Backport cpumanager checkpointing fixes kubernetes#56191 and kubernetes#54410. This, plus openshift/origin#18051, fully syncs cpumanager with kube upstream. @derekwaynecarr Origin-commit: 9e98436e109dc2e39022748ff09f86f9d41868c7
2 parents a2b3288 + 83d7038 commit 03813fc

File tree

5 files changed

+117
-80
lines changed

5 files changed

+117
-80
lines changed

pkg/kubelet/cm/cpumanager/cpu_manager.go

+21-2
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,8 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo
152152
}
153153

154154
func (m *manager) Start(activePods ActivePodsFunc, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService) {
155-
glog.Infof("[cpumanger] starting with %s policy", m.policy.Name())
156-
glog.Infof("[cpumanger] reconciling every %v", m.reconcilePeriod)
155+
glog.Infof("[cpumanager] starting with %s policy", m.policy.Name())
156+
glog.Infof("[cpumanager] reconciling every %v", m.reconcilePeriod)
157157

158158
m.activePods = activePods
159159
m.podStatusProvider = podStatusProvider
@@ -234,6 +234,25 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
234234
continue
235235
}
236236

237+
// Check whether the container is present in state; there may be 3 reasons why it's not present:
238+
// - policy does not want to track the container
239+
// - kubelet has just been restarted - and there is no previous state file
240+
// - container has been removed from state by RemoveContainer call (DeletionTimestamp is set)
241+
if _, ok := m.state.GetCPUSet(containerID); !ok {
242+
if status.Phase == v1.PodRunning && pod.DeletionTimestamp == nil {
243+
glog.V(4).Infof("[cpumanager] reconcileState: container is not present in state - trying to add (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID)
244+
err := m.AddContainer(pod, &container, containerID)
245+
if err != nil {
246+
glog.Errorf("[cpumanager] reconcileState: failed to add container (pod: %s, container: %s, container id: %s, error: %v)", pod.Name, container.Name, containerID, err)
247+
failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID})
248+
}
249+
} else {
250+
// if DeletionTimestamp is set, pod has already been removed from state
251+
// skip the pod/container since it's not running and will be deleted soon
252+
continue
253+
}
254+
}
255+
237256
cset := m.state.GetCPUSetOrDefault(containerID)
238257
if cset.IsEmpty() {
239258
// NOTE: This should not happen outside of tests.

pkg/kubelet/cm/cpumanager/policy.go

+2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import (
2525
type Policy interface {
2626
Name() string
2727
Start(s state.State)
28+
// AddContainer call is idempotent
2829
AddContainer(s state.State, pod *v1.Pod, container *v1.Container, containerID string) error
30+
// RemoveContainer call is idempotent
2931
RemoveContainer(s state.State, containerID string) error
3032
}

pkg/kubelet/cm/cpumanager/policy_static.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -156,9 +156,15 @@ func (p *staticPolicy) assignableCPUs(s state.State) cpuset.CPUSet {
156156
}
157157

158158
func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container, containerID string) error {
159-
glog.Infof("[cpumanager] static policy: AddContainer (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID)
160159
if numCPUs := guaranteedCPUs(pod, container); numCPUs != 0 {
160+
glog.Infof("[cpumanager] static policy: AddContainer (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID)
161161
// container belongs in an exclusively allocated pool
162+
163+
if _, ok := s.GetCPUSet(containerID); ok {
164+
glog.Infof("[cpumanager] static policy: container already present in state, skipping (container: %s, container id: %s)", container.Name, containerID)
165+
return nil
166+
}
167+
162168
cpuset, err := p.allocateCPUs(s, numCPUs)
163169
if err != nil {
164170
glog.Errorf("[cpumanager] unable to allocate %d CPUs (container id: %s, error: %v)", numCPUs, containerID, err)

pkg/kubelet/cm/cpumanager/state/state_file.go

+41-34
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,10 @@ func NewFileState(filePath string, policyName string) State {
5151

5252
if err := stateFile.tryRestoreState(); err != nil {
5353
// could not restore state, init new state file
54-
glog.Infof("[cpumanager] state file: initializing empty state file - reason: \"%s\"", err)
55-
stateFile.cache.ClearState()
56-
stateFile.storeState()
54+
msg := fmt.Sprintf("[cpumanager] state file: unable to restore state from disk (%s)\n", err.Error()) +
55+
"Panicking because we cannot guarantee sane CPU affinity for existing containers.\n" +
56+
fmt.Sprintf("Please drain this node and delete the CPU manager state file \"%s\" before restarting Kubelet.", stateFile.stateFilePath)
57+
panic(msg)
5758
}
5859

5960
return stateFile
@@ -73,45 +74,51 @@ func (sf *stateFile) tryRestoreState() error {
7374

7475
var content []byte
7576

76-
if content, err = ioutil.ReadFile(sf.stateFilePath); os.IsNotExist(err) {
77-
// Create file
78-
if _, err = os.Create(sf.stateFilePath); err != nil {
79-
glog.Errorf("[cpumanager] state file: unable to create state file \"%s\":%s", sf.stateFilePath, err.Error())
80-
panic("[cpumanager] state file not created")
81-
}
82-
glog.Infof("[cpumanager] state file: created empty state file \"%s\"", sf.stateFilePath)
83-
} else {
84-
// File exists - try to read
85-
var readState stateFileData
77+
content, err = ioutil.ReadFile(sf.stateFilePath)
8678

87-
if err = json.Unmarshal(content, &readState); err != nil {
88-
glog.Warningf("[cpumanager] state file: could not unmarshal, corrupted state file - \"%s\"", sf.stateFilePath)
89-
return err
90-
}
79+
// If the state file does not exist or has zero length, write a new file.
80+
if os.IsNotExist(err) || len(content) == 0 {
81+
sf.storeState()
82+
glog.Infof("[cpumanager] state file: created new state file \"%s\"", sf.stateFilePath)
83+
return nil
84+
}
9185

92-
if sf.policyName != readState.PolicyName {
93-
return fmt.Errorf("policy configured \"%s\" != policy from state file \"%s\"", sf.policyName, readState.PolicyName)
94-
}
86+
// Fail on any other file read error.
87+
if err != nil {
88+
return err
89+
}
90+
91+
// File exists; try to read it.
92+
var readState stateFileData
93+
94+
if err = json.Unmarshal(content, &readState); err != nil {
95+
glog.Errorf("[cpumanager] state file: could not unmarshal, corrupted state file - \"%s\"", sf.stateFilePath)
96+
return err
97+
}
98+
99+
if sf.policyName != readState.PolicyName {
100+
return fmt.Errorf("policy configured \"%s\" != policy from state file \"%s\"", sf.policyName, readState.PolicyName)
101+
}
102+
103+
if tmpDefaultCPUSet, err = cpuset.Parse(readState.DefaultCPUSet); err != nil {
104+
glog.Errorf("[cpumanager] state file: could not parse state file - [defaultCpuSet:\"%s\"]", readState.DefaultCPUSet)
105+
return err
106+
}
95107

96-
if tmpDefaultCPUSet, err = cpuset.Parse(readState.DefaultCPUSet); err != nil {
97-
glog.Warningf("[cpumanager] state file: could not parse state file - [defaultCpuSet:\"%s\"]", readState.DefaultCPUSet)
108+
for containerID, cpuString := range readState.Entries {
109+
if tmpContainerCPUSet, err = cpuset.Parse(cpuString); err != nil {
110+
glog.Errorf("[cpumanager] state file: could not parse state file - container id: %s, cpuset: \"%s\"", containerID, cpuString)
98111
return err
99112
}
113+
tmpAssignments[containerID] = tmpContainerCPUSet
114+
}
100115

101-
for containerID, cpuString := range readState.Entries {
102-
if tmpContainerCPUSet, err = cpuset.Parse(cpuString); err != nil {
103-
glog.Warningf("[cpumanager] state file: could not parse state file - container id: %s, cpuset: \"%s\"", containerID, cpuString)
104-
return err
105-
}
106-
tmpAssignments[containerID] = tmpContainerCPUSet
107-
}
116+
sf.cache.SetDefaultCPUSet(tmpDefaultCPUSet)
117+
sf.cache.SetCPUAssignments(tmpAssignments)
108118

109-
sf.cache.SetDefaultCPUSet(tmpDefaultCPUSet)
110-
sf.cache.SetCPUAssignments(tmpAssignments)
119+
glog.V(2).Infof("[cpumanager] state file: restored state from state file \"%s\"", sf.stateFilePath)
120+
glog.V(2).Infof("[cpumanager] state file: defaultCPUSet: %s", tmpDefaultCPUSet.String())
111121

112-
glog.V(2).Infof("[cpumanager] state file: restored state from state file \"%s\"", sf.stateFilePath)
113-
glog.V(2).Infof("[cpumanager] state file: defaultCPUSet: %s", tmpDefaultCPUSet.String())
114-
}
115122
return nil
116123
}
117124

pkg/kubelet/cm/cpumanager/state/state_file_test.go

+46-43
Original file line numberDiff line numberDiff line change
@@ -77,33 +77,31 @@ func TestFileStateTryRestore(t *testing.T) {
7777
stateFileContent string
7878
policyName string
7979
expErr string
80+
expPanic bool
8081
expectedState *stateMemory
8182
}{
8283
{
83-
"Invalid JSON - empty file",
84+
"Invalid JSON - one byte file",
8485
"\n",
8586
"none",
86-
"state file: could not unmarshal, corrupted state file",
87-
&stateMemory{
88-
assignments: ContainerCPUAssignments{},
89-
defaultCPUSet: cpuset.NewCPUSet(),
90-
},
87+
"[cpumanager] state file: unable to restore state from disk (unexpected end of JSON input)",
88+
true,
89+
&stateMemory{},
9190
},
9291
{
9392
"Invalid JSON - invalid content",
9493
"{",
9594
"none",
96-
"state file: could not unmarshal, corrupted state file",
97-
&stateMemory{
98-
assignments: ContainerCPUAssignments{},
99-
defaultCPUSet: cpuset.NewCPUSet(),
100-
},
95+
"[cpumanager] state file: unable to restore state from disk (unexpected end of JSON input)",
96+
true,
97+
&stateMemory{},
10198
},
10299
{
103100
"Try restore defaultCPUSet only",
104101
`{"policyName": "none", "defaultCpuSet": "4-6"}`,
105102
"none",
106103
"",
104+
false,
107105
&stateMemory{
108106
assignments: ContainerCPUAssignments{},
109107
defaultCPUSet: cpuset.NewCPUSet(4, 5, 6),
@@ -113,11 +111,9 @@ func TestFileStateTryRestore(t *testing.T) {
113111
"Try restore defaultCPUSet only - invalid name",
114112
`{"policyName": "none", "defaultCpuSet" "4-6"}`,
115113
"none",
116-
"",
117-
&stateMemory{
118-
assignments: ContainerCPUAssignments{},
119-
defaultCPUSet: cpuset.NewCPUSet(),
120-
},
114+
`[cpumanager] state file: unable to restore state from disk (invalid character '"' after object key)`,
115+
true,
116+
&stateMemory{},
121117
},
122118
{
123119
"Try restore assignments only",
@@ -130,6 +126,7 @@ func TestFileStateTryRestore(t *testing.T) {
130126
}`,
131127
"none",
132128
"",
129+
false,
133130
&stateMemory{
134131
assignments: ContainerCPUAssignments{
135132
"container1": cpuset.NewCPUSet(4, 5, 6),
@@ -146,21 +143,17 @@ func TestFileStateTryRestore(t *testing.T) {
146143
"entries": {}
147144
}`,
148145
"B",
149-
"policy configured \"B\" != policy from state file \"A\"",
150-
&stateMemory{
151-
assignments: ContainerCPUAssignments{},
152-
defaultCPUSet: cpuset.NewCPUSet(),
153-
},
146+
`[cpumanager] state file: unable to restore state from disk (policy configured "B" != policy from state file "A")`,
147+
true,
148+
&stateMemory{},
154149
},
155150
{
156151
"Try restore invalid assignments",
157152
`{"entries": }`,
158153
"none",
159-
"state file: could not unmarshal, corrupted state file",
160-
&stateMemory{
161-
assignments: ContainerCPUAssignments{},
162-
defaultCPUSet: cpuset.NewCPUSet(),
163-
},
154+
"[cpumanager] state file: unable to restore state from disk (invalid character '}' looking for beginning of value)",
155+
true,
156+
&stateMemory{},
164157
},
165158
{
166159
"Try restore valid file",
@@ -174,6 +167,7 @@ func TestFileStateTryRestore(t *testing.T) {
174167
}`,
175168
"none",
176169
"",
170+
false,
177171
&stateMemory{
178172
assignments: ContainerCPUAssignments{
179173
"container1": cpuset.NewCPUSet(4, 5, 6),
@@ -189,11 +183,9 @@ func TestFileStateTryRestore(t *testing.T) {
189183
"defaultCpuSet": "2-sd"
190184
}`,
191185
"none",
192-
"state file: could not parse state file",
193-
&stateMemory{
194-
assignments: ContainerCPUAssignments{},
195-
defaultCPUSet: cpuset.NewCPUSet(),
196-
},
186+
`[cpumanager] state file: unable to restore state from disk (strconv.Atoi: parsing "sd": invalid syntax)`,
187+
true,
188+
&stateMemory{},
197189
},
198190
{
199191
"Try restore un-parsable assignments",
@@ -206,17 +198,16 @@ func TestFileStateTryRestore(t *testing.T) {
206198
}
207199
}`,
208200
"none",
209-
"state file: could not parse state file",
210-
&stateMemory{
211-
assignments: ContainerCPUAssignments{},
212-
defaultCPUSet: cpuset.NewCPUSet(),
213-
},
201+
`[cpumanager] state file: unable to restore state from disk (strconv.Atoi: parsing "p": invalid syntax)`,
202+
true,
203+
&stateMemory{},
214204
},
215205
{
216-
"TryRestoreState creates empty state file",
206+
"tryRestoreState creates empty state file",
217207
"",
218208
"none",
219209
"",
210+
false,
220211
&stateMemory{
221212
assignments: ContainerCPUAssignments{},
222213
defaultCPUSet: cpuset.NewCPUSet(),
@@ -226,11 +217,23 @@ func TestFileStateTryRestore(t *testing.T) {
226217

227218
for idx, tc := range testCases {
228219
t.Run(tc.description, func(t *testing.T) {
220+
defer func() {
221+
if tc.expPanic {
222+
r := recover()
223+
panicMsg := r.(string)
224+
if !strings.HasPrefix(panicMsg, tc.expErr) {
225+
t.Fatalf(`expected panic "%s" but got "%s"`, tc.expErr, panicMsg)
226+
} else {
227+
t.Logf(`got expected panic "%s"`, panicMsg)
228+
}
229+
}
230+
}()
231+
229232
sfilePath, err := ioutil.TempFile("/tmp", fmt.Sprintf("cpumanager_state_file_test_%d", idx))
230233
if err != nil {
231234
t.Errorf("cannot create temporary file: %q", err.Error())
232235
}
233-
// Don't create state file, let TryRestoreState figure out that is should create
236+
// Don't create state file, let tryRestoreState figure out that it should create one
234237
if tc.stateFileContent != "" {
235238
writeToStateFile(sfilePath.Name(), tc.stateFileContent)
236239
}
@@ -245,11 +248,11 @@ func TestFileStateTryRestore(t *testing.T) {
245248
if tc.expErr != "" {
246249
if logData.String() != "" {
247250
if !strings.Contains(logData.String(), tc.expErr) {
248-
t.Errorf("TryRestoreState() error = %v, wantErr %v", logData.String(), tc.expErr)
251+
t.Errorf("tryRestoreState() error = %v, wantErr %v", logData.String(), tc.expErr)
249252
return
250253
}
251254
} else {
252-
t.Errorf("TryRestoreState() error = nil, wantErr %v", tc.expErr)
255+
t.Errorf("tryRestoreState() error = nil, wantErr %v", tc.expErr)
253256
return
254257
}
255258
}
@@ -268,7 +271,7 @@ func TestFileStateTryRestorePanic(t *testing.T) {
268271
}{
269272
"Panic creating file",
270273
true,
271-
"[cpumanager] state file not created",
274+
"[cpumanager] state file not written",
272275
}
273276

274277
t.Run(testCase.description, func(t *testing.T) {
@@ -277,10 +280,10 @@ func TestFileStateTryRestorePanic(t *testing.T) {
277280
if err := recover(); err != nil {
278281
if testCase.wantPanic {
279282
if testCase.panicMessage == err {
280-
t.Logf("TryRestoreState() got expected panic = %v", err)
283+
t.Logf("tryRestoreState() got expected panic = %v", err)
281284
return
282285
}
283-
t.Errorf("TryRestoreState() unexpected panic = %v, wantErr %v", err, testCase.panicMessage)
286+
t.Errorf("tryRestoreState() unexpected panic = %v, wantErr %v", err, testCase.panicMessage)
284287
}
285288
}
286289
}()

0 commit comments

Comments
 (0)