Skip to content

Commit 48f6c3e

Browse files
author
Chao Xu
committed
adding the core migrator that is responsible to migrate a single resource type
1 parent 4d1d7d4 commit 48f6c3e

File tree

3 files changed

+332
-0
lines changed

3 files changed

+332
-0
lines changed

pkg/migrator/core.go

+242
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package migrator
18+
19+
import (
20+
"fmt"
21+
"reflect"
22+
"sync"
23+
24+
"k8s.io/apimachinery/pkg/api/errors"
25+
"k8s.io/apimachinery/pkg/api/meta"
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+
"k8s.io/apimachinery/pkg/runtime"
28+
utilerrors "k8s.io/apimachinery/pkg/util/errors"
29+
"k8s.io/client-go/rest"
30+
)
31+
32+
var metadataAccessor = meta.NewAccessor()
33+
34+
type progressTracker interface {
35+
save(continueToken string) error
36+
load() (continueToken string, err error)
37+
}
38+
39+
type dummyProgress struct{}
40+
41+
func (d *dummyProgress) load() (string, error) {
42+
return "", nil
43+
}
44+
45+
func (d *dummyProgress) save(continueToken string) error {
46+
return nil
47+
}
48+
49+
type migrator struct {
50+
resource string
51+
namespaceScoped bool
52+
client *rest.RESTClient
53+
progress progressTracker
54+
concurrency int
55+
}
56+
57+
// NewMigrator creates a migrator that can migrate a single resource type.
58+
func NewMigrator(resource string, namespaceScoped bool, client *rest.RESTClient) *migrator {
59+
return &migrator{
60+
resource: resource,
61+
namespaceScoped, namespaceScoped,
62+
client: client,
63+
progress: dummyProgress{},
64+
concurrency: 5,
65+
}
66+
}
67+
68+
// get executes the GET request, but it does not decode the result.
69+
func (m *migrator) get(namespace, name string) rest.Result {
70+
req := m.client.Get().
71+
NamespaceIfScoped(namespace, m.NamespaceScoped).
72+
Resource(m.Resource).
73+
Name(name)
74+
return req.Do()
75+
}
76+
77+
// put executes the PUT request, but it does not decode the result.
78+
func (m *migrator) put(namespace, name string) rest.Result {
79+
req := m.client.Put().
80+
NamespaceIfScoped(namespace, m.NamespaceScoped).
81+
Resource(m.Resource).
82+
Name(name)
83+
return req.Do()
84+
}
85+
86+
func (m *migrator) list(namespace, options *metav1.ListOptions) (runtime.Object, error) {
87+
req := m.client.Get().
88+
NamespaceIfScoped(metav1.NamespaceAll, m.NamespaceScoped).
89+
Resource(m.Resource).
90+
VersionedParams(options, metav1.ParameterCodec)
91+
return req.Do().Get()
92+
}
93+
94+
// Run migrates all the instances of the resource type managed by the migrator.
95+
func (w *migrator) Run() error {
96+
continueToken, err := w.progress.load()
97+
if err != nil {
98+
return err
99+
}
100+
for {
101+
list, listError := w.list(
102+
w.resource,
103+
&metav1.ListOptions{
104+
Limit: 500,
105+
Continue: continueToken,
106+
},
107+
)
108+
if listError != nil && !errors.IsResourceExpired(listError) {
109+
if canRetry(retriable(listError)) {
110+
continue
111+
}
112+
return listError
113+
}
114+
if listError != nil && errors.IsResourceExpired(listError) {
115+
token, err := inconsistentContinueToken(listError)
116+
if err != nil {
117+
return err
118+
}
119+
continueToken = token
120+
w.progress.save(continueToken)
121+
continue
122+
}
123+
if err := w.migrateList(list); err != nil {
124+
return err
125+
}
126+
token, err := metadataAccessor.Continue(list)
127+
if err != nil {
128+
return err
129+
}
130+
if len(token) == 0 {
131+
return nil
132+
}
133+
continueToken = token
134+
w.progress.save(continueToken)
135+
}
136+
}
137+
138+
func (w *migrator) migrateList(l runtime.Object) error {
139+
stop := make(chan struct{})
140+
defer close(stop)
141+
items, err := meta.ExtractList(l)
142+
if err != nil {
143+
return err
144+
}
145+
workc := make(chan runtime.Object)
146+
go func() {
147+
defer close(workc)
148+
for _, item := range items {
149+
select {
150+
case workc <- item:
151+
case <-stop:
152+
return
153+
}
154+
}
155+
}()
156+
157+
var wg sync.WaitGroup
158+
wg.Add(w.concurrency)
159+
errc := make(chan error)
160+
for i := 0; i < w.concurrency; i++ {
161+
go func() {
162+
defer wg.Done()
163+
worker(stop, workc, errc)
164+
}()
165+
}
166+
167+
go func() {
168+
wg.Wait()
169+
close(errc)
170+
}()
171+
172+
var errors []error
173+
for err := range errc {
174+
errors = append(errors, err)
175+
}
176+
return utilerrors.NewAggregate(errors)
177+
}
178+
179+
func (w *migrator) worker(stop <-chan struct{}, workc <-chan runtime.Object, errc chan<- error) {
180+
for item := range workc {
181+
for {
182+
err := w.try(item)
183+
if err == nil {
184+
break
185+
}
186+
if canRetry(err) {
187+
continue
188+
}
189+
select {
190+
case errc <- err:
191+
continue
192+
case <-stop:
193+
return
194+
}
195+
}
196+
}
197+
}
198+
199+
// try tries to migrate the single object by GET and then PUT.
200+
func (w *migrator) try(work runtime.Object) error {
201+
namespace, err := metadataAccessor.Namespace(work)
202+
if err != nil {
203+
return err
204+
}
205+
name, err := metadataAccessor.Name(work)
206+
if err != nil {
207+
return err
208+
}
209+
get := w.client.get(namespace, name)
210+
data, err := get.Raw()
211+
if err != nil {
212+
return retriable(get.Error())
213+
}
214+
215+
put := w.client.put(namespace, name)
216+
if err := put.Error(); err != nil {
217+
return retriable(put.Error())
218+
}
219+
220+
// TODO: The oc migrator tried to count if the update causes changes.
221+
// To do so it needs to decode the PUT response and extract the
222+
// resourceVersion.
223+
224+
// TODO: The oc admin uses a defer function to do bandwidth limiting
225+
// after doing all operations. The rate limiter is marked as an alpha
226+
// feature. Is it better than the built-in qps limit in the REST
227+
// client? Maybe it's necessary because not all resource types are of
228+
// the same size?
229+
}
230+
231+
// TODO: move this helper to "k8s.io/apimachinery/pkg/api/errors"
232+
func inconsistentContinueToken(err error) (string, error) {
233+
status, ok := err.(errors.APIStatus)
234+
if !ok {
235+
return "", fmt.Errorf("expected error to implement the APIStatus interface, got %v", reflect.TypeOf(err))
236+
}
237+
token = status.Status().ListMeta.Continue
238+
if len(token) == 0 {
239+
return "", fmt.Errorf("expected non empty continue token")
240+
}
241+
return token, nil
242+
}

pkg/migrator/doc.go

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package migrator provides migrator that migrates a single resource.
18+
package migrator

pkg/migrator/errors.go

+72
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package migrator
18+
19+
import (
20+
"k8s.io/apimachinery/pkg/api/errors"
21+
)
22+
23+
// ErrRetriable is a wrapper for an error that a migrator may use to indicate the
24+
// specific error can be retried.
25+
type ErrRetriable struct {
26+
error
27+
}
28+
29+
func (ErrRetriable) Temporary() bool { return true }
30+
31+
// ErrNotRetriable is a wrapper for an error that a migrator may use to indicate the
32+
// specific error cannot be retried.
33+
type ErrNotRetriable struct {
34+
error
35+
}
36+
37+
func (ErrNotRetriable) Temporary() bool { return false }
38+
39+
// TemporaryError is a wrapper interface that is used to determine if an error can be retried.
40+
type TemporaryError interface {
41+
error
42+
// Temporary should return true if this is a temporary error
43+
Temporary() bool
44+
}
45+
46+
// retriable adds retry information to the provided error.
47+
func retriable(err error) error {
48+
switch {
49+
case err == nil:
50+
return nil
51+
case errors.IsNotFound(err):
52+
// tolerate the deletion of resources during migration
53+
// report unchanged since we did not actually migrate this object
54+
return nil
55+
case errors.IsMethodNotSupported(err):
56+
return ErrNotRetriable{err}
57+
case errors.IsConflict(err):
58+
return ErrRetriable{err}
59+
case errors.IsServerTimeout(err):
60+
return ErrRetriable{err}
61+
default:
62+
return err
63+
}
64+
}
65+
66+
// canRetry returns true if the provided error indicates a retry is possible.
67+
func canRetry(err error) bool {
68+
if temp, ok := err.(TemporaryError); ok && temp.Temporary() {
69+
return true
70+
}
71+
return false
72+
}

0 commit comments

Comments
 (0)