Skip to content

Commit 27b9ab6

Browse files
committed
MFOJTIK: remove kubectl dependency from apps apiserver
1 parent 482efe8 commit 27b9ab6

File tree

3 files changed

+120
-63
lines changed

3 files changed

+120
-63
lines changed

pkg/apps/apiserver/apiserver.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"k8s.io/apimachinery/pkg/runtime/serializer"
99
"k8s.io/apiserver/pkg/registry/rest"
1010
genericapiserver "k8s.io/apiserver/pkg/server"
11+
"k8s.io/client-go/kubernetes"
1112
restclient "k8s.io/client-go/rest"
1213
"k8s.io/kubernetes/pkg/api/legacyscheme"
1314
kclientsetinternal "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
@@ -114,6 +115,10 @@ func (c *completedConfig) newV1RESTStorage() (map[string]rest.Storage, error) {
114115
if err != nil {
115116
return nil, err
116117
}
118+
kubeClient, err := kubernetes.NewForConfig(c.ExtraConfig.KubeAPIServerClientConfig)
119+
if err != nil {
120+
return nil, err
121+
}
117122

118123
deployConfigStorage, deployConfigStatusStorage, deployConfigScaleStorage, err := deployconfigetcd.NewREST(c.GenericConfig.RESTOptionsGetter)
119124
if err != nil {
@@ -133,7 +138,7 @@ func (c *completedConfig) newV1RESTStorage() (map[string]rest.Storage, error) {
133138
v1Storage["deploymentConfigs/scale"] = deployConfigScaleStorage
134139
v1Storage["deploymentConfigs/status"] = deployConfigStatusStorage
135140
v1Storage["deploymentConfigs/rollback"] = deployConfigRollbackStorage
136-
v1Storage["deploymentConfigs/log"] = deploylogregistry.NewREST(openshiftInternalAppsClient.Apps(), kubeInternalClient.Core(), kubeInternalClient.Core())
141+
v1Storage["deploymentConfigs/log"] = deploylogregistry.NewREST(openshiftInternalAppsClient.Apps(), kubeInternalClient.Core(), kubeClient.CoreV1())
137142
v1Storage["deploymentConfigs/instantiate"] = dcInstantiateStorage
138143
return v1Storage, nil
139144
}

pkg/apps/registry/deploylog/rest.go

+114-24
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,28 @@ package deploylog
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"sort"
78
"time"
89

910
"github.com/golang/glog"
10-
kapiv1 "k8s.io/api/core/v1"
11-
"k8s.io/apimachinery/pkg/api/errors"
11+
12+
corev1 "k8s.io/api/core/v1"
13+
apierrors "k8s.io/apimachinery/pkg/api/errors"
1214
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15+
"k8s.io/apimachinery/pkg/fields"
1316
"k8s.io/apimachinery/pkg/labels"
1417
"k8s.io/apimachinery/pkg/runtime"
1518
"k8s.io/apimachinery/pkg/util/wait"
19+
"k8s.io/apimachinery/pkg/watch"
1620
apirequest "k8s.io/apiserver/pkg/endpoints/request"
1721
genericrest "k8s.io/apiserver/pkg/registry/generic/rest"
1822
"k8s.io/apiserver/pkg/registry/rest"
23+
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
1924
kapi "k8s.io/kubernetes/pkg/apis/core"
2025
kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
2126
"k8s.io/kubernetes/pkg/controller"
22-
kcmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
2327

2428
apiserverrest "github.com/openshift/origin/pkg/apiserver/rest"
2529
appsapi "github.com/openshift/origin/pkg/apps/apis/apps"
@@ -40,12 +44,12 @@ const (
4044
type REST struct {
4145
dcClient appsclient.DeploymentConfigsGetter
4246
rcClient kcoreclient.ReplicationControllersGetter
43-
podClient kcoreclient.PodsGetter
47+
podClient corev1client.PodsGetter
4448
timeout time.Duration
4549
interval time.Duration
4650

4751
// for unit testing
48-
getLogsFn func(podNamespace, podName string, logOpts *kapi.PodLogOptions) (runtime.Object, error)
52+
getLogsFn func(podNamespace, podName string, logOpts *corev1.PodLogOptions) (runtime.Object, error)
4953
}
5054

5155
// REST implements GetterWithOptions
@@ -55,7 +59,7 @@ var _ = rest.GetterWithOptions(&REST{})
5559
// one for deployments (replication controllers) and one for pods to get the necessary
5660
// attributes to assemble the URL to which the request shall be redirected in order to
5761
// get the deployment logs.
58-
func NewREST(dcClient appsclient.DeploymentConfigsGetter, rcClient kcoreclient.ReplicationControllersGetter, podClient kcoreclient.PodsGetter) *REST {
62+
func NewREST(dcClient appsclient.DeploymentConfigsGetter, rcClient kcoreclient.ReplicationControllersGetter, podClient corev1client.PodsGetter) *REST {
5963
r := &REST{
6064
dcClient: dcClient,
6165
rcClient: rcClient,
@@ -83,27 +87,27 @@ func (r *REST) Get(ctx context.Context, name string, opts runtime.Object) (runti
8387
// Ensure we have a namespace in the context
8488
namespace, ok := apirequest.NamespaceFrom(ctx)
8589
if !ok {
86-
return nil, errors.NewBadRequest("namespace parameter required.")
90+
return nil, apierrors.NewBadRequest("namespace parameter required.")
8791
}
8892

8993
// Validate DeploymentLogOptions
9094
deployLogOpts, ok := opts.(*appsapi.DeploymentLogOptions)
9195
if !ok {
92-
return nil, errors.NewBadRequest("did not get an expected options.")
96+
return nil, apierrors.NewBadRequest("did not get an expected options.")
9397
}
9498
if errs := validation.ValidateDeploymentLogOptions(deployLogOpts); len(errs) > 0 {
95-
return nil, errors.NewInvalid(appsapi.Kind("DeploymentLogOptions"), "", errs)
99+
return nil, apierrors.NewInvalid(appsapi.Kind("DeploymentLogOptions"), "", errs)
96100
}
97101

98102
// Fetch deploymentConfig and check latest version; if 0, there are no deployments
99103
// for this config
100104
config, err := r.dcClient.DeploymentConfigs(namespace).Get(name, metav1.GetOptions{})
101105
if err != nil {
102-
return nil, errors.NewNotFound(appsapi.Resource("deploymentconfig"), name)
106+
return nil, apierrors.NewNotFound(appsapi.Resource("deploymentconfig"), name)
103107
}
104108
desiredVersion := config.Status.LatestVersion
105109
if desiredVersion == 0 {
106-
return nil, errors.NewBadRequest(fmt.Sprintf("no deployment exists for deploymentConfig %q", config.Name))
110+
return nil, apierrors.NewBadRequest(fmt.Sprintf("no deployment exists for deploymentConfig %q", config.Name))
107111
}
108112

109113
// Support retrieving logs for older deployments
@@ -113,12 +117,12 @@ func (r *REST) Get(ctx context.Context, name string, opts runtime.Object) (runti
113117
if deployLogOpts.Previous {
114118
desiredVersion--
115119
if desiredVersion < 1 {
116-
return nil, errors.NewBadRequest(fmt.Sprintf("no previous deployment exists for deploymentConfig %q", config.Name))
120+
return nil, apierrors.NewBadRequest(fmt.Sprintf("no previous deployment exists for deploymentConfig %q", config.Name))
117121
}
118122
}
119123
case *deployLogOpts.Version <= 0 || *deployLogOpts.Version > config.Status.LatestVersion:
120124
// Invalid version
121-
return nil, errors.NewBadRequest(fmt.Sprintf("invalid version for deploymentConfig %q: %d", config.Name, *deployLogOpts.Version))
125+
return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid version for deploymentConfig %q: %d", config.Name, *deployLogOpts.Version))
122126
default:
123127
desiredVersion = *deployLogOpts.Version
124128
}
@@ -143,16 +147,16 @@ func (r *REST) Get(ctx context.Context, name string, opts runtime.Object) (runti
143147
}
144148
glog.V(4).Infof("Deployment %s is in %s state, waiting for it to start...", appsutil.LabelForDeployment(target), status)
145149

146-
if err := appsutil.WaitForRunningDeployerPod(r.podClient, target, r.timeout); err != nil {
147-
return nil, errors.NewBadRequest(fmt.Sprintf("failed to run deployer pod %s: %v", podName, err))
150+
if err := WaitForRunningDeployerPod(r.podClient, target, r.timeout); err != nil {
151+
return nil, apierrors.NewBadRequest(fmt.Sprintf("failed to run deployer pod %s: %v", podName, err))
148152
}
149153

150154
latest, ok, err := registry.WaitForRunningDeployment(r.rcClient, target, r.timeout)
151155
if err != nil {
152-
return nil, errors.NewBadRequest(fmt.Sprintf("unable to wait for deployment %s to run: %v", appsutil.LabelForDeployment(target), err))
156+
return nil, apierrors.NewBadRequest(fmt.Sprintf("unable to wait for deployment %s to run: %v", appsutil.LabelForDeployment(target), err))
153157
}
154158
if !ok {
155-
return nil, errors.NewServerTimeout(kapi.Resource("ReplicationController"), "get", 2)
159+
return nil, apierrors.NewServerTimeout(kapi.Resource("ReplicationController"), "get", 2)
156160
}
157161
if appsutil.IsCompleteDeployment(latest) {
158162
podName, err = r.returnApplicationPodName(target)
@@ -167,11 +171,11 @@ func (r *REST) Get(ctx context.Context, name string, opts runtime.Object) (runti
167171
}
168172
}
169173

170-
logOpts := appsapi.DeploymentToPodLogOptions(deployLogOpts)
174+
logOpts := DeploymentToPodLogOptions(deployLogOpts)
171175
return r.getLogsFn(namespace, podName, logOpts)
172176
}
173177

174-
func (r *REST) getLogs(podNamespace, podName string, logOpts *kapi.PodLogOptions) (runtime.Object, error) {
178+
func (r *REST) getLogs(podNamespace, podName string, logOpts *corev1.PodLogOptions) (runtime.Object, error) {
175179
logRequest := r.podClient.Pods(podNamespace).GetLogs(podName, logOpts)
176180

177181
readerCloser, err := logRequest.Stream()
@@ -196,7 +200,7 @@ func (r *REST) waitForExistingDeployment(namespace, name string) (*kapi.Replicat
196200
condition := func() (bool, error) {
197201
target, err = r.rcClient.ReplicationControllers(namespace).Get(name, metav1.GetOptions{})
198202
switch {
199-
case errors.IsNotFound(err):
203+
case apierrors.IsNotFound(err):
200204
return false, nil
201205
case err != nil:
202206
return false, err
@@ -206,7 +210,7 @@ func (r *REST) waitForExistingDeployment(namespace, name string) (*kapi.Replicat
206210

207211
err = wait.PollImmediate(r.interval, r.timeout, condition)
208212
if err == wait.ErrWaitTimeout {
209-
err = errors.NewNotFound(kapi.Resource("replicationcontrollers"), name)
213+
err = apierrors.NewNotFound(kapi.Resource("replicationcontrollers"), name)
210214
}
211215
return target, err
212216
}
@@ -215,11 +219,97 @@ func (r *REST) waitForExistingDeployment(namespace, name string) (*kapi.Replicat
215219
// view its logs.
216220
func (r *REST) returnApplicationPodName(target *kapi.ReplicationController) (string, error) {
217221
selector := labels.SelectorFromValidatedSet(labels.Set(target.Spec.Selector))
218-
sortBy := func(pods []*kapiv1.Pod) sort.Interface { return controller.ByLogging(pods) }
222+
sortBy := func(pods []*corev1.Pod) sort.Interface { return controller.ByLogging(pods) }
219223

220-
firstPod, _, err := kcmdutil.GetFirstPod(r.podClient, target.Namespace, selector.String(), r.timeout, sortBy)
224+
firstPod, _, err := GetFirstPod(r.podClient, target.Namespace, selector.String(), r.timeout, sortBy)
221225
if err != nil {
222-
return "", errors.NewInternalError(err)
226+
return "", apierrors.NewInternalError(err)
223227
}
224228
return firstPod.Name, nil
225229
}
230+
231+
// GetFirstPod returns a pod matching the namespace and label selector
232+
// and the number of all pods that match the label selector.
233+
func GetFirstPod(client corev1client.PodsGetter, namespace string, selector string, timeout time.Duration, sortBy func([]*corev1.Pod) sort.Interface) (*corev1.Pod, int, error) {
234+
options := metav1.ListOptions{LabelSelector: selector}
235+
236+
podList, err := client.Pods(namespace).List(options)
237+
if err != nil {
238+
return nil, 0, err
239+
}
240+
pods := []*corev1.Pod{}
241+
for i := range podList.Items {
242+
pods = append(pods, &podList.Items[i])
243+
}
244+
if len(pods) > 0 {
245+
sort.Sort(sortBy(pods))
246+
return pods[0], len(podList.Items), nil
247+
}
248+
249+
// Watch until we observe a pod
250+
options.ResourceVersion = podList.ResourceVersion
251+
w, err := client.Pods(namespace).Watch(options)
252+
if err != nil {
253+
return nil, 0, err
254+
}
255+
defer w.Stop()
256+
257+
condition := func(event watch.Event) (bool, error) {
258+
return event.Type == watch.Added || event.Type == watch.Modified, nil
259+
}
260+
event, err := watch.Until(timeout, w, condition)
261+
if err != nil {
262+
return nil, 0, err
263+
}
264+
pod, ok := event.Object.(*corev1.Pod)
265+
if !ok {
266+
return nil, 0, fmt.Errorf("%#v is not a pod event", event)
267+
}
268+
return pod, 1, nil
269+
}
270+
271+
// WaitForRunningDeployerPod waits a given period of time until the deployer pod
272+
// for given replication controller is not running.
273+
func WaitForRunningDeployerPod(podClient corev1client.PodsGetter, rc *kapi.ReplicationController, timeout time.Duration) error {
274+
podName := appsutil.DeployerPodNameForDeployment(rc.Name)
275+
canGetLogs := func(p *corev1.Pod) bool {
276+
return corev1.PodSucceeded == p.Status.Phase || corev1.PodFailed == p.Status.Phase || corev1.PodRunning == p.Status.Phase
277+
}
278+
pod, err := podClient.Pods(rc.Namespace).Get(podName, metav1.GetOptions{})
279+
if err == nil && canGetLogs(pod) {
280+
return nil
281+
}
282+
watcher, err := podClient.Pods(rc.Namespace).Watch(
283+
metav1.ListOptions{
284+
FieldSelector: fields.OneTermEqualSelector("metadata.name", podName).String(),
285+
},
286+
)
287+
if err != nil {
288+
return err
289+
}
290+
291+
defer watcher.Stop()
292+
_, err = watch.Until(timeout, watcher, func(e watch.Event) (bool, error) {
293+
if e.Type == watch.Error {
294+
return false, fmt.Errorf("encountered error while watching for pod: %v", e.Object)
295+
}
296+
obj, isPod := e.Object.(*corev1.Pod)
297+
if !isPod {
298+
return false, errors.New("received unknown object while watching for pods")
299+
}
300+
return canGetLogs(obj), nil
301+
})
302+
return err
303+
}
304+
305+
func DeploymentToPodLogOptions(opts *appsapi.DeploymentLogOptions) *corev1.PodLogOptions {
306+
return &corev1.PodLogOptions{
307+
Container: opts.Container,
308+
Follow: opts.Follow,
309+
SinceSeconds: opts.SinceSeconds,
310+
SinceTime: opts.SinceTime,
311+
Timestamps: opts.Timestamps,
312+
TailLines: opts.TailLines,
313+
LimitBytes: opts.LimitBytes,
314+
}
315+
}

pkg/apps/util/util.go

-38
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package util
22

33
import (
4-
"errors"
54
"fmt"
65
"reflect"
76
"sort"
@@ -12,13 +11,11 @@ import (
1211
"k8s.io/api/core/v1"
1312
"k8s.io/apimachinery/pkg/api/meta"
1413
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15-
"k8s.io/apimachinery/pkg/fields"
1614
"k8s.io/apimachinery/pkg/labels"
1715
"k8s.io/apimachinery/pkg/runtime"
1816
"k8s.io/apimachinery/pkg/runtime/schema"
1917
"k8s.io/apimachinery/pkg/util/diff"
2018
"k8s.io/apimachinery/pkg/util/sets"
21-
"k8s.io/apimachinery/pkg/watch"
2219
"k8s.io/client-go/dynamic"
2320
scaleclient "k8s.io/client-go/scale"
2421
"k8s.io/kubernetes/pkg/api/legacyscheme"
@@ -27,7 +24,6 @@ import (
2724
kapi "k8s.io/kubernetes/pkg/apis/core"
2825
kapiv1 "k8s.io/kubernetes/pkg/apis/core/v1"
2926
kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
30-
kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
3127
kdeplutil "k8s.io/kubernetes/pkg/controller/deployment/util"
3228
"k8s.io/kubernetes/pkg/kubectl"
3329

@@ -783,40 +779,6 @@ func RolloutExceededTimeoutSeconds(config *appsapi.DeploymentConfig, latestRC *v
783779
return int64(time.Since(latestRC.CreationTimestamp.Time).Seconds()) > timeoutSeconds
784780
}
785781

786-
// WaitForRunningDeployerPod waits a given period of time until the deployer pod
787-
// for given replication controller is not running.
788-
func WaitForRunningDeployerPod(podClient kcoreclient.PodsGetter, rc *api.ReplicationController, timeout time.Duration) error {
789-
podName := DeployerPodNameForDeployment(rc.Name)
790-
canGetLogs := func(p *api.Pod) bool {
791-
return api.PodSucceeded == p.Status.Phase || api.PodFailed == p.Status.Phase || api.PodRunning == p.Status.Phase
792-
}
793-
pod, err := podClient.Pods(rc.Namespace).Get(podName, metav1.GetOptions{})
794-
if err == nil && canGetLogs(pod) {
795-
return nil
796-
}
797-
watcher, err := podClient.Pods(rc.Namespace).Watch(
798-
metav1.ListOptions{
799-
FieldSelector: fields.OneTermEqualSelector("metadata.name", podName).String(),
800-
},
801-
)
802-
if err != nil {
803-
return err
804-
}
805-
806-
defer watcher.Stop()
807-
_, err = watch.Until(timeout, watcher, func(e watch.Event) (bool, error) {
808-
if e.Type == watch.Error {
809-
return false, fmt.Errorf("encountered error while watching for pod: %v", e.Object)
810-
}
811-
obj, isPod := e.Object.(*api.Pod)
812-
if !isPod {
813-
return false, errors.New("received unknown object while watching for pods")
814-
}
815-
return canGetLogs(obj), nil
816-
})
817-
return err
818-
}
819-
820782
// ByLatestVersionAsc sorts deployments by LatestVersion ascending.
821783
type ByLatestVersionAsc []*api.ReplicationController
822784

0 commit comments

Comments
 (0)