Skip to content

Commit ec53107

Browse files
author
Chao Xu
committed
adding the core migrator that is responsible to migrate a single resource type
1 parent 369ff9b commit ec53107

File tree

4 files changed

+582
-0
lines changed

4 files changed

+582
-0
lines changed

pkg/migrator/core.go

+251
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package migrator
18+
19+
import (
20+
"fmt"
21+
"reflect"
22+
"sync"
23+
"time"
24+
25+
"k8s.io/apimachinery/pkg/api/errors"
26+
"k8s.io/apimachinery/pkg/api/meta"
27+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
29+
"k8s.io/apimachinery/pkg/runtime/schema"
30+
utilerrors "k8s.io/apimachinery/pkg/util/errors"
31+
"k8s.io/client-go/dynamic"
32+
)
33+
34+
var metadataAccessor = meta.NewAccessor()
35+
36+
const (
37+
defaultChunkLimit = 500
38+
defaultConcurrency = 1
39+
)
40+
41+
type progressTracker interface {
42+
save(continueToken string) error
43+
load() (continueToken string, err error)
44+
}
45+
46+
type dummyProgress struct{}
47+
48+
func (d *dummyProgress) load() (string, error) {
49+
return "", nil
50+
}
51+
52+
func (d *dummyProgress) save(continueToken string) error {
53+
return nil
54+
}
55+
56+
type migrator struct {
57+
resource schema.GroupVersionResource
58+
client dynamic.Interface
59+
progress progressTracker
60+
concurrency int
61+
}
62+
63+
// NewMigrator creates a migrator that can migrate a single resource type.
64+
func NewMigrator(resource schema.GroupVersionResource, client dynamic.Interface) *migrator {
65+
return &migrator{
66+
resource: resource,
67+
client: client,
68+
progress: &dummyProgress{},
69+
concurrency: defaultConcurrency,
70+
}
71+
}
72+
73+
func (m *migrator) get(namespace, name string) (*unstructured.Unstructured, error) {
74+
// if namespace is empty, .Namespace(namespace) is ineffective.
75+
return m.client.
76+
Resource(m.resource).
77+
Namespace(namespace).
78+
Get(name, metav1.GetOptions{})
79+
}
80+
81+
func (m *migrator) put(namespace, name string, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
82+
// if namespace is empty, .Namespace(namespace) is ineffective.
83+
return m.client.
84+
Resource(m.resource).
85+
Namespace(namespace).
86+
Update(obj)
87+
}
88+
89+
func (m *migrator) list(options metav1.ListOptions) (*unstructured.UnstructuredList, error) {
90+
return m.client.
91+
Resource(m.resource).
92+
Namespace(metav1.NamespaceAll).
93+
List(options)
94+
}
95+
96+
// Run migrates all the instances of the resource type managed by the migrator.
97+
func (m *migrator) Run() error {
98+
continueToken, err := m.progress.load()
99+
if err != nil {
100+
return err
101+
}
102+
for {
103+
list, listError := m.list(
104+
metav1.ListOptions{
105+
Limit: defaultChunkLimit,
106+
Continue: continueToken,
107+
},
108+
)
109+
if listError != nil && !errors.IsResourceExpired(listError) {
110+
if canRetry(listError) {
111+
continue
112+
}
113+
return listError
114+
}
115+
if listError != nil && errors.IsResourceExpired(listError) {
116+
token, err := inconsistentContinueToken(listError)
117+
if err != nil {
118+
return err
119+
}
120+
continueToken = token
121+
m.progress.save(continueToken)
122+
continue
123+
}
124+
if err := m.migrateList(list); err != nil {
125+
return err
126+
}
127+
token, err := metadataAccessor.Continue(list)
128+
if err != nil {
129+
return err
130+
}
131+
if len(token) == 0 {
132+
return nil
133+
}
134+
continueToken = token
135+
m.progress.save(continueToken)
136+
}
137+
}
138+
139+
func (m *migrator) migrateList(l *unstructured.UnstructuredList) error {
140+
stop := make(chan struct{})
141+
defer close(stop)
142+
workc := make(chan *unstructured.Unstructured)
143+
go func() {
144+
defer close(workc)
145+
for i := range l.Items {
146+
select {
147+
case workc <- &l.Items[i]:
148+
case <-stop:
149+
return
150+
}
151+
}
152+
}()
153+
154+
var wg sync.WaitGroup
155+
wg.Add(m.concurrency)
156+
errc := make(chan error)
157+
for i := 0; i < m.concurrency; i++ {
158+
go func() {
159+
defer wg.Done()
160+
m.worker(stop, workc, errc)
161+
}()
162+
}
163+
164+
go func() {
165+
wg.Wait()
166+
close(errc)
167+
}()
168+
169+
var errors []error
170+
for err := range errc {
171+
errors = append(errors, err)
172+
}
173+
return utilerrors.NewAggregate(errors)
174+
}
175+
176+
func (m *migrator) worker(stop <-chan struct{}, workc <-chan *unstructured.Unstructured, errc chan<- error) {
177+
for item := range workc {
178+
err := m.migrateOneItem(item)
179+
if err != nil {
180+
select {
181+
case errc <- err:
182+
continue
183+
case <-stop:
184+
return
185+
}
186+
}
187+
}
188+
}
189+
190+
func (m *migrator) migrateOneItem(item *unstructured.Unstructured) error {
191+
namespace, err := metadataAccessor.Namespace(item)
192+
if err != nil {
193+
return err
194+
}
195+
name, err := metadataAccessor.Name(item)
196+
if err != nil {
197+
return err
198+
}
199+
getBeforePut := false
200+
for {
201+
getBeforePut, err = m.try(namespace, name, item, getBeforePut)
202+
if err == nil || errors.IsNotFound(err) {
203+
return nil
204+
}
205+
if canRetry(err) {
206+
if seconds, delay := errors.SuggestsClientDelay(err); delay {
207+
time.Sleep(time.Duration(seconds) * time.Second)
208+
}
209+
continue
210+
}
211+
// error is not retriable
212+
return err
213+
}
214+
}
215+
216+
// try tries to migrate the single object by PUT. It refreshes the object via
217+
// GET if "get" is true. If the PUT fails due to conflicts, or the GET fails,
218+
// the function requests the next try to GET the new object.
219+
func (m *migrator) try(namespace, name string, item *unstructured.Unstructured, get bool) (bool, error) {
220+
var err error
221+
if get {
222+
item, err = m.get(namespace, name)
223+
if err != nil {
224+
return true, err
225+
}
226+
}
227+
_, err = m.put(namespace, name, item)
228+
if err == nil {
229+
return false, nil
230+
}
231+
return errors.IsConflict(err), err
232+
233+
// TODO: The oc admin uses a defer function to do bandwidth limiting
234+
// after doing all operations. The rate limiter is marked as an alpha
235+
// feature. Is it better than the built-in qps limit in the REST
236+
// client? Maybe it's necessary because not all resource types are of
237+
// the same size?
238+
}
239+
240+
// TODO: move this helper to "k8s.io/apimachinery/pkg/api/errors"
241+
func inconsistentContinueToken(err error) (string, error) {
242+
status, ok := err.(errors.APIStatus)
243+
if !ok {
244+
return "", fmt.Errorf("expected error to implement the APIStatus interface, got %v", reflect.TypeOf(err))
245+
}
246+
token := status.Status().ListMeta.Continue
247+
if len(token) == 0 {
248+
return "", fmt.Errorf("expected non empty continue token")
249+
}
250+
return token, nil
251+
}

0 commit comments

Comments
 (0)