Skip to content
This repository was archived by the owner on Jun 7, 2021. It is now read-only.

Knative Event wiring #12

Merged
merged 2 commits into from
Aug 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
318 changes: 205 additions & 113 deletions Gopkg.lock

Large diffs are not rendered by default.

4 changes: 0 additions & 4 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ required = [
"k8s.io/gengo/args",
"sigs.k8s.io/controller-tools/pkg/crd/generator",
]

[[override]]
name = "github.com/knative/serving"
branch = "master"

[[override]]
name = "github.com/tekton/pipeline"
Expand Down
10 changes: 8 additions & 2 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import (
"runtime"

// Import knative types
knv1alpha1 "github.com/knative/serving/pkg/apis/serving/v1alpha1"
knv1beta1 "github.com/knative/serving/pkg/apis/serving/v1beta1"
kneventing "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
knv1alpha1 "knative.dev/serving/pkg/apis/serving/v1alpha1"
knv1beta1 "knative.dev/serving/pkg/apis/serving/v1beta1"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
_ "k8s.io/client-go/plugin/pkg/client/auth"
Expand Down Expand Up @@ -117,6 +118,11 @@ func main() {
os.Exit(1)
}

if err := kneventing.AddToScheme(mgr.GetScheme()); err != nil {
log.Error(err, "Can't register the knative eventing scheme")
os.Exit(1)
}

// Setup all Controllers
if err := controller.AddToManager(mgr); err != nil {
log.Error(err, "")
Expand Down
3 changes: 2 additions & 1 deletion deploy/crds/faas_v1alpha1_jsfunction_crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ spec:
type: object
spec:
properties:
events:
type: boolean
func:
type: string
package:
type: string
required:
- func
- package
type: object
status:
properties:
Expand Down
3 changes: 2 additions & 1 deletion pkg/apis/faas/v1alpha1/jsfunction_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (
// +k8s:openapi-gen=true
type JSFunctionSpec struct {
Func string `json:"func"`
Package string `json:"package"`
Package string `json:"package,omitempty"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for fixing this as part of the PR :)

Events bool `json:"events,omitempty"`
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file
// Add custom validation using kubebuilder tags: https://book.kubebuilder.io/beyond_basics/generating_crd.html
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/faas/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions pkg/apis/faas/v1alpha1/zz_generated.openapi.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
// +build !ignore_autogenerated

// Code generated by openapi-gen. DO NOT EDIT.

// This file was autogenerated by openapi-gen. Do not edit it manually!

package v1alpha1
Expand Down Expand Up @@ -80,8 +78,14 @@ func schema_pkg_apis_faas_v1alpha1_JSFunctionSpec(ref common.ReferenceCallback)
Format: "",
},
},
"events": {
SchemaProps: spec.SchemaProps{
Type: []string{"boolean"},
Format: "",
},
},
},
Required: []string{"func", "package"},
Required: []string{"func"},
},
},
Dependencies: []string{},
Expand Down
154 changes: 145 additions & 9 deletions pkg/controller/jsfunction/jsfunction_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import (
"context"
"fmt"

knv1alpha1 "github.com/knative/serving/pkg/apis/serving/v1alpha1"
knv1beta1 "github.com/knative/serving/pkg/apis/serving/v1beta1"
kneventing "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
knv1alpha1 "knative.dev/serving/pkg/apis/serving/v1alpha1"
knv1beta1 "knative.dev/serving/pkg/apis/serving/v1beta1"

faasv1alpha1 "github.com/openshift-cloud-functions/js-function-operator/pkg/apis/faas/v1alpha1"

Expand Down Expand Up @@ -60,6 +61,22 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
return err
}

// Watch for changes to secondary resources KnChanel and KnSubscription and requeue the owner JSFunction
err = c.Watch(&source.Kind{Type: &kneventing.Channel{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &faasv1alpha1.JSFunction{},
})
if err != nil {
return err
}
err = c.Watch(&source.Kind{Type: &kneventing.Subscription{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &faasv1alpha1.JSFunction{},
})
if err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -100,8 +117,8 @@ func (r *ReconcileJSFunction) Reconcile(request reconcile.Request) (reconcile.Re
}

// Check if a Service for this JSFunction already exists, if not create a new one
found := &knv1alpha1.Service{}
err = r.client.Get(context.TODO(), types.NamespacedName{Name: function.Name, Namespace: function.Namespace}, found)
knService := &knv1alpha1.Service{}
err = r.client.Get(context.TODO(), types.NamespacedName{Name: function.Name, Namespace: function.Namespace}, knService)
if err != nil && errors.IsNotFound(err) {
// No service for this function exists. Create a new one

Expand All @@ -113,7 +130,7 @@ func (r *ReconcileJSFunction) Reconcile(request reconcile.Request) (reconcile.Re
}
err = r.client.Create(context.TODO(), configMap)
if err != nil {
reqLogger.Error(err, "Failed to create new ConfigMap holding function.", "Service.Namespace", configMap.Namespace, "ConfigMap.Name", configMap.Name)
reqLogger.Error(err, "Failed to create new ConfigMap holding function.", "ConfigMap.Namespace", configMap.Namespace, "ConfigMap.Name", configMap.Name)
return reconcile.Result{}, err
}

Expand All @@ -137,18 +154,83 @@ func (r *ReconcileJSFunction) Reconcile(request reconcile.Request) (reconcile.Re
return reconcile.Result{}, err
}

reqLogger.Info("JSFunction Service exists.", "Service.Namespace", found.Namespace, "Service.Name", found.Name)
/////// Knative Eventing section
// Create or delete Channel
knChannel := &kneventing.Channel{}
err = r.client.Get(context.TODO(), types.NamespacedName{Name: function.Name, Namespace: function.Namespace}, knChannel)
if err != nil && errors.IsNotFound(err) {
if function.Spec.Events {
// Create channel
channel, err := r.channelForFunction(function)
if err != nil {
return reconcile.Result{}, err
}
err = r.client.Create(context.TODO(), channel)
if err != nil {
reqLogger.Error(err, "Failed to create new Channel.", "Channel.Namespace", channel.Namespace, "Channel.Name", channel.Name)
return reconcile.Result{}, err
}
}
} else {
if !function.Spec.Events && knChannel.ObjectMeta.DeletionTimestamp == nil {
err = r.client.Delete(context.TODO(), knChannel)
if err != nil && !errors.IsNotFound(err) {
reqLogger.Error(err, "failed to delete Channel")
return reconcile.Result{}, err
}
}
}

// Create or delete Subscription
knSubscription := &kneventing.Subscription{}
err = r.client.Get(context.TODO(), types.NamespacedName{Name: function.Name, Namespace: function.Namespace}, knSubscription)
if err != nil && errors.IsNotFound(err) {
if function.Spec.Events {
// Create subscription
subscription, err := r.subscriptionForFunction(function)
if err != nil {
return reconcile.Result{}, err
}
err = r.client.Create(context.TODO(), subscription)
if err != nil {
reqLogger.Error(err, "Failed to create new Subscription.", "Subscription.Namespace", subscription.Namespace, "Subscription.Name", subscription.Name)
return reconcile.Result{}, err
}
}
} else {
if !function.Spec.Events && knSubscription.ObjectMeta.DeletionTimestamp == nil {
err = r.client.Delete(context.TODO(), knSubscription)
if err != nil && !errors.IsNotFound(err) {
reqLogger.Error(err, "failed to delete Subscription")
return reconcile.Result{}, err
}
}

}
///////

// TODO update the JSFunction status with the pod names
// TODO update status nodes if necessary

reqLogger.Info("JSFunction Service exists.", "Service.Namespace", knService.Namespace, "Service.Name", knService.Name)
return reconcile.Result{}, nil
}

func (r *ReconcileJSFunction) configMapWithFunction(f *faasv1alpha1.JSFunction) (*corev1.ConfigMap, error) {

data := map[string]string{"index.js": f.Spec.Func}

if f.Spec.Package != "" {
data["package.json"] = f.Spec.Package
}

// Create a config map containing the user code
configMap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: f.Name,
Namespace: f.Namespace,
},
Data: map[string]string{"index.js": f.Spec.Func, "package.json": f.Spec.Package},
Data: data,
}
if err := controllerutil.SetControllerReference(f, configMap, r.scheme); err != nil {
return nil, err
Expand Down Expand Up @@ -191,10 +273,10 @@ func createPodSpec(functionName, configMapName string) corev1.PodSpec {
volumeName := fmt.Sprintf("%s-source", functionName)
return corev1.PodSpec{
Containers: []corev1.Container{{
Image: "docker.io/lanceball/js-runtime",
Image: "docker.io/zroubalik/js-runtime",
Name: fmt.Sprintf("nodejs-%s", functionName),
Ports: []corev1.ContainerPort{{
ContainerPort: 8181,
ContainerPort: 8080,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rhuss originally changed this from 8080 to 8181. I subsequently ran into problems using 8080, but can't recall which it was of the many problems I have run into lately. :) Not sure what it should really be.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

8181 didn't worked for me, it was listening on 8080

}},
VolumeMounts: []corev1.VolumeMount{
{
Expand All @@ -221,3 +303,57 @@ func createConfigMapVolume(volumeName, configMapName string) corev1.Volume {
},
}
}

func (r *ReconcileJSFunction) channelForFunction(f *faasv1alpha1.JSFunction) (*kneventing.Channel, error) {

channel := &kneventing.Channel{
ObjectMeta: metav1.ObjectMeta{
Name: f.Name,
Namespace: f.Namespace,
},
Spec: kneventing.ChannelSpec{
Provisioner: &corev1.ObjectReference{
Name: "in-memory",
Kind: "InMemoryChannel",
APIVersion: kneventing.SchemeGroupVersion.String(),
},
},
}

// Set JSFunction instance as the owner and controller
if err := controllerutil.SetControllerReference(f, channel, r.scheme); err != nil {
return nil, err
}
return channel, nil
}

func (r *ReconcileJSFunction) subscriptionForFunction(f *faasv1alpha1.JSFunction) (*kneventing.Subscription, error) {
subscription := &kneventing.Subscription{
ObjectMeta: metav1.ObjectMeta{
Name: f.Name,
Namespace: f.Namespace,
},

Spec: kneventing.SubscriptionSpec{
Channel: corev1.ObjectReference{
Name: f.Name,
Kind: "Channel",
APIVersion: kneventing.SchemeGroupVersion.String(),
},
Subscriber: &kneventing.SubscriberSpec{
Ref: &corev1.ObjectReference{
Name: f.Name,
Kind: "Service",
APIVersion: knv1alpha1.SchemeGroupVersion.String(),
},
},
},
}

// Set JSFunction instance as the owner and controller
if err := controllerutil.SetControllerReference(f, subscription, r.scheme); err != nil {
return nil, err
}

return subscription, nil
}