Skip to content

Commit 8117599

Browse files
author
Chao Xu
committed
using dynamic client
1 parent d3a61c4 commit 8117599

File tree

3 files changed

+256
-157
lines changed

3 files changed

+256
-157
lines changed

pkg/migrator/core.go

+73-71
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,24 @@ import (
2020
"fmt"
2121
"reflect"
2222
"sync"
23+
"time"
2324

2425
"k8s.io/apimachinery/pkg/api/errors"
2526
"k8s.io/apimachinery/pkg/api/meta"
2627
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27-
"k8s.io/apimachinery/pkg/runtime"
28+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
29+
"k8s.io/apimachinery/pkg/runtime/schema"
2830
utilerrors "k8s.io/apimachinery/pkg/util/errors"
29-
"k8s.io/client-go/rest"
31+
"k8s.io/client-go/dynamic"
3032
)
3133

3234
var metadataAccessor = meta.NewAccessor()
3335

36+
const (
37+
defaultChunkLimit = 500
38+
defaultConcurrency = 1
39+
)
40+
3441
type progressTracker interface {
3542
save(continueToken string) error
3643
load() (continueToken string, err error)
@@ -47,55 +54,43 @@ func (d *dummyProgress) save(continueToken string) error {
4754
}
4855

4956
type migrator struct {
50-
resource string
51-
namespaceScoped bool
52-
// Creator is responsible to configure a rest client. This includes
53-
// creating a working serializer.
54-
// We don't use dynamic client because it only supports json, which
55-
// increases the memory usage of the apiserver. When we support
56-
// migrating CRD or aggregated apis, we need to plumb in a dynamic
57-
// client as a backup.
58-
client rest.Interface
57+
resource schema.GroupVersionResource
58+
client dynamic.Interface
5959
progress progressTracker
6060
concurrency int
6161
}
6262

6363
// NewMigrator creates a migrator that can migrate a single resource type.
64-
func NewMigrator(resource string, namespaceScoped bool, client rest.Interface) *migrator {
64+
func NewMigrator(resource schema.GroupVersionResource, client dynamic.Interface) *migrator {
6565
return &migrator{
66-
resource: resource,
67-
namespaceScoped: namespaceScoped,
68-
client: client,
69-
progress: &dummyProgress{},
70-
concurrency: 5,
66+
resource: resource,
67+
client: client,
68+
progress: &dummyProgress{},
69+
concurrency: defaultConcurrency,
7170
}
7271
}
7372

74-
// get executes the GET request, but it does not decode the result.
75-
func (m *migrator) get(namespace, name string) rest.Result {
76-
req := m.client.Get().
77-
NamespaceIfScoped(namespace, m.namespaceScoped).
73+
func (m *migrator) get(namespace, name string) (*unstructured.Unstructured, error) {
74+
// if namespace is empty, .Namespace(namespace) is ineffective.
75+
return m.client.
7876
Resource(m.resource).
79-
Name(name)
80-
return req.Do()
77+
Namespace(namespace).
78+
Get(name, metav1.GetOptions{})
8179
}
8280

83-
// put executes the PUT request, but it does not decode the result.
84-
func (m *migrator) put(namespace, name string, data []byte) rest.Result {
85-
req := m.client.Put().
86-
NamespaceIfScoped(namespace, m.namespaceScoped).
81+
func (m *migrator) put(namespace, name string, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
82+
// if namespace is empty, .Namespace(namespace) is ineffective.
83+
return m.client.
8784
Resource(m.resource).
88-
Name(name).
89-
Body(data)
90-
return req.Do()
85+
Namespace(namespace).
86+
Update(obj)
9187
}
9288

93-
func (m *migrator) list(options *metav1.ListOptions) (runtime.Object, error) {
94-
req := m.client.Get().
95-
NamespaceIfScoped(metav1.NamespaceAll, m.namespaceScoped).
89+
func (m *migrator) list(options metav1.ListOptions) (*unstructured.UnstructuredList, error) {
90+
return m.client.
9691
Resource(m.resource).
97-
VersionedParams(options, metav1.ParameterCodec)
98-
return req.Do().Get()
92+
Namespace(metav1.NamespaceAll).
93+
List(options)
9994
}
10095

10196
// Run migrates all the instances of the resource type managed by the migrator.
@@ -106,13 +101,13 @@ func (m *migrator) Run() error {
106101
}
107102
for {
108103
list, listError := m.list(
109-
&metav1.ListOptions{
110-
Limit: 500,
104+
metav1.ListOptions{
105+
Limit: defaultChunkLimit,
111106
Continue: continueToken,
112107
},
113108
)
114109
if listError != nil && !errors.IsResourceExpired(listError) {
115-
if canRetry(retriable(listError)) {
110+
if canRetry(listError) {
116111
continue
117112
}
118113
return listError
@@ -141,19 +136,15 @@ func (m *migrator) Run() error {
141136
}
142137
}
143138

144-
func (m *migrator) migrateList(l runtime.Object) error {
139+
func (m *migrator) migrateList(l *unstructured.UnstructuredList) error {
145140
stop := make(chan struct{})
146141
defer close(stop)
147-
items, err := meta.ExtractList(l)
148-
if err != nil {
149-
return err
150-
}
151-
workc := make(chan runtime.Object)
142+
workc := make(chan *unstructured.Unstructured)
152143
go func() {
153144
defer close(workc)
154-
for _, item := range items {
145+
for i := range l.Items {
155146
select {
156-
case workc <- item:
147+
case workc <- &l.Items[i]:
157148
case <-stop:
158149
return
159150
}
@@ -182,16 +173,10 @@ func (m *migrator) migrateList(l runtime.Object) error {
182173
return utilerrors.NewAggregate(errors)
183174
}
184175

185-
func (m *migrator) worker(stop <-chan struct{}, workc <-chan runtime.Object, errc chan<- error) {
176+
func (m *migrator) worker(stop <-chan struct{}, workc <-chan *unstructured.Unstructured, errc chan<- error) {
186177
for item := range workc {
187-
for {
188-
err := m.try(item)
189-
if err == nil {
190-
break
191-
}
192-
if canRetry(err) {
193-
continue
194-
}
178+
err := m.migrateOneItem(item)
179+
if err != nil {
195180
select {
196181
case errc <- err:
197182
continue
@@ -202,37 +187,54 @@ func (m *migrator) worker(stop <-chan struct{}, workc <-chan runtime.Object, err
202187
}
203188
}
204189

205-
// try tries to migrate the single object by GET and then PUT.
206-
func (m *migrator) try(work runtime.Object) error {
207-
namespace, err := metadataAccessor.Namespace(work)
190+
func (m *migrator) migrateOneItem(item *unstructured.Unstructured) error {
191+
namespace, err := metadataAccessor.Namespace(item)
208192
if err != nil {
209193
return err
210194
}
211-
name, err := metadataAccessor.Name(work)
195+
name, err := metadataAccessor.Name(item)
212196
if err != nil {
213197
return err
214198
}
215-
get := m.get(namespace, name)
216-
data, err := get.Raw()
217-
if err != nil {
218-
return retriable(get.Error())
199+
getBeforePut := false
200+
for {
201+
getBeforePut, err = m.try(namespace, name, item, getBeforePut)
202+
if err == nil || errors.IsNotFound(err) {
203+
return nil
204+
}
205+
if canRetry(err) {
206+
if seconds, delay := errors.SuggestsClientDelay(err); delay {
207+
time.Sleep(time.Duration(seconds) * time.Second)
208+
}
209+
continue
210+
}
211+
// error is not retriable
212+
return err
219213
}
214+
}
220215

221-
put := m.put(namespace, name, data)
222-
if err := put.Error(); err != nil {
223-
return retriable(put.Error())
216+
// try tries to migrate the single object by PUT. It refreshes the object via
217+
// GET if "get" is true. If the PUT fails due to conflicts, or the GET fails,
218+
// the function requests the next try to GET the new object.
219+
func (m *migrator) try(namespace, name string, item *unstructured.Unstructured, get bool) (bool, error) {
220+
var err error
221+
if get {
222+
item, err = m.get(namespace, name)
223+
if err != nil {
224+
return true, err
225+
}
224226
}
225-
226-
// TODO: The oc migrator tried to count if the update causes changes.
227-
// To do so it needs to decode the PUT response and extract the
228-
// resourceVersion.
227+
_, err = m.put(namespace, name, item)
228+
if err == nil {
229+
return false, nil
230+
}
231+
return errors.IsConflict(err), err
229232

230233
// TODO: The oc admin uses a defer function to do bandwidth limiting
231234
// after doing all operations. The rate limiter is marked as an alpha
232235
// feature. Is it better than the built-in qps limit in the REST
233236
// client? Maybe it's necessary because not all resource types are of
234237
// the same size?
235-
return nil
236238
}
237239

238240
// TODO: move this helper to "k8s.io/apimachinery/pkg/api/errors"

0 commit comments

Comments
 (0)