diff --git a/context/context.go b/context/context.go index 483a293..69da43c 100644 --- a/context/context.go +++ b/context/context.go @@ -517,6 +517,8 @@ func (ctx *FunctionContext) GetNativeContext() context.Context { } func (ctx *FunctionContext) SetNativeContext(c context.Context) { + ctx.mu.Lock() + defer ctx.mu.Unlock() ctx.Ctx = c } diff --git a/runtime/async/async.go b/runtime/async/async.go index 3913595..efc8c87 100644 --- a/runtime/async/async.go +++ b/runtime/async/async.go @@ -10,7 +10,7 @@ import ( daprruntime "github.com/dapr/dapr/pkg/runtime" dapr "github.com/dapr/go-sdk/service/common" grpcsvc "github.com/dapr/go-sdk/service/grpc" - httpsvc "github.com/dapr/go-sdk/service/grpc" + httpsvc "github.com/dapr/go-sdk/service/http" "k8s.io/klog/v2" ofctx "github.com/OpenFunction/functions-framework-go/context" @@ -25,6 +25,7 @@ const ( ) type Runtime struct { + protocol string port string pattern string handler dapr.Service @@ -49,24 +50,23 @@ func NewAsyncRuntime(port string, pattern string) (*Runtime, error) { }, nil } - var newService func(address string) (s dapr.Service, err error) + var handler dapr.Service protocol := daprruntime.Protocol(os.Getenv(protocolEnvVar)) switch protocol { case daprruntime.HTTPProtocol: - newService = httpsvc.NewService - case daprruntime.GRPCProtocol: - newService = grpcsvc.NewService + handler = httpsvc.NewService(fmt.Sprintf(":%s", port)) default: protocol = daprruntime.GRPCProtocol - newService = grpcsvc.NewService - } - handler, err := newService(fmt.Sprintf(":%s", port)) - if err != nil { - klog.Errorf("failed to create dapr %v service: %v\n", protocol, err) - return nil, err + service, err := grpcsvc.NewService(fmt.Sprintf(":%s", port)) + if err != nil { + klog.Errorf("failed to create dapr grpc service: %v\n", err) + return nil, err + } + handler = service } return &Runtime{ + protocol: string(protocol), port: port, pattern: pattern, handler: handler, @@ -75,7 +75,7 @@ func NewAsyncRuntime(port string, pattern string) (*Runtime, error) { } func (r *Runtime) Start(ctx context.Context) error { - klog.Infof("Async Function serving grpc: listening on port %s", r.port) + klog.Infof("Async Function serving %s: listening on port %s", r.protocol, r.port) klog.Fatal(r.handler.Start()) return nil } @@ -121,6 +121,7 @@ func (r *Runtime) RegisterOpenFunction( input.Uri = input.ComponentName funcErr = r.handler.AddBindingInvocationHandler(input.Uri, func(c context.Context, in *dapr.BindingEvent) (out []byte, err error) { rm := runtime.NewRuntimeManager(ctx, prePlugins, postPlugins) + rm.FuncContext.SetNativeContext(c) rm.FuncContext.SetEvent(n, in) rm.FunctionRunWrapperWithHooks(rf.GetOpenFunctionFunction()) @@ -141,8 +142,12 @@ func (r *Runtime) RegisterOpenFunction( PubsubName: input.ComponentName, Topic: input.Uri, } + if r.protocol == string(daprruntime.HTTPProtocol) { + sub.Route = fmt.Sprintf("/%s", input.Uri) + } funcErr = r.handler.AddTopicEventHandler(sub, func(c context.Context, e *dapr.TopicEvent) (retry bool, err error) { rm := runtime.NewRuntimeManager(ctx, prePlugins, postPlugins) + rm.FuncContext.SetNativeContext(c) rm.FuncContext.SetEvent(n, e) rm.FunctionRunWrapperWithHooks(rf.GetOpenFunctionFunction())