Skip to content

Commit 617c147

Browse files
authored
Merge pull request #66 from wrongerror/main
add route to subscription when using http service & bugfix
2 parents 28cbda8 + 86f7bcc commit 617c147

File tree

2 files changed

+19
-12
lines changed

2 files changed

+19
-12
lines changed

context/context.go

+2
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,8 @@ func (ctx *FunctionContext) GetNativeContext() context.Context {
517517
}
518518

519519
func (ctx *FunctionContext) SetNativeContext(c context.Context) {
520+
ctx.mu.Lock()
521+
defer ctx.mu.Unlock()
520522
ctx.Ctx = c
521523
}
522524

runtime/async/async.go

+17-12
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
daprruntime "github.com/dapr/dapr/pkg/runtime"
1111
dapr "github.com/dapr/go-sdk/service/common"
1212
grpcsvc "github.com/dapr/go-sdk/service/grpc"
13-
httpsvc "github.com/dapr/go-sdk/service/grpc"
13+
httpsvc "github.com/dapr/go-sdk/service/http"
1414
"k8s.io/klog/v2"
1515

1616
ofctx "github.com/OpenFunction/functions-framework-go/context"
@@ -25,6 +25,7 @@ const (
2525
)
2626

2727
type Runtime struct {
28+
protocol string
2829
port string
2930
pattern string
3031
handler dapr.Service
@@ -49,24 +50,23 @@ func NewAsyncRuntime(port string, pattern string) (*Runtime, error) {
4950
}, nil
5051
}
5152

52-
var newService func(address string) (s dapr.Service, err error)
53+
var handler dapr.Service
5354
protocol := daprruntime.Protocol(os.Getenv(protocolEnvVar))
5455
switch protocol {
5556
case daprruntime.HTTPProtocol:
56-
newService = httpsvc.NewService
57-
case daprruntime.GRPCProtocol:
58-
newService = grpcsvc.NewService
57+
handler = httpsvc.NewService(fmt.Sprintf(":%s", port))
5958
default:
6059
protocol = daprruntime.GRPCProtocol
61-
newService = grpcsvc.NewService
62-
}
63-
handler, err := newService(fmt.Sprintf(":%s", port))
64-
if err != nil {
65-
klog.Errorf("failed to create dapr %v service: %v\n", protocol, err)
66-
return nil, err
60+
service, err := grpcsvc.NewService(fmt.Sprintf(":%s", port))
61+
if err != nil {
62+
klog.Errorf("failed to create dapr grpc service: %v\n", err)
63+
return nil, err
64+
}
65+
handler = service
6766
}
6867

6968
return &Runtime{
69+
protocol: string(protocol),
7070
port: port,
7171
pattern: pattern,
7272
handler: handler,
@@ -75,7 +75,7 @@ func NewAsyncRuntime(port string, pattern string) (*Runtime, error) {
7575
}
7676

7777
func (r *Runtime) Start(ctx context.Context) error {
78-
klog.Infof("Async Function serving grpc: listening on port %s", r.port)
78+
klog.Infof("Async Function serving %s: listening on port %s", r.protocol, r.port)
7979
klog.Fatal(r.handler.Start())
8080
return nil
8181
}
@@ -121,6 +121,7 @@ func (r *Runtime) RegisterOpenFunction(
121121
input.Uri = input.ComponentName
122122
funcErr = r.handler.AddBindingInvocationHandler(input.Uri, func(c context.Context, in *dapr.BindingEvent) (out []byte, err error) {
123123
rm := runtime.NewRuntimeManager(ctx, prePlugins, postPlugins)
124+
rm.FuncContext.SetNativeContext(c)
124125
rm.FuncContext.SetEvent(n, in)
125126
rm.FunctionRunWrapperWithHooks(rf.GetOpenFunctionFunction())
126127

@@ -141,8 +142,12 @@ func (r *Runtime) RegisterOpenFunction(
141142
PubsubName: input.ComponentName,
142143
Topic: input.Uri,
143144
}
145+
if r.protocol == string(daprruntime.HTTPProtocol) {
146+
sub.Route = fmt.Sprintf("/%s", input.Uri)
147+
}
144148
funcErr = r.handler.AddTopicEventHandler(sub, func(c context.Context, e *dapr.TopicEvent) (retry bool, err error) {
145149
rm := runtime.NewRuntimeManager(ctx, prePlugins, postPlugins)
150+
rm.FuncContext.SetNativeContext(c)
146151
rm.FuncContext.SetEvent(n, e)
147152
rm.FunctionRunWrapperWithHooks(rf.GetOpenFunctionFunction())
148153

0 commit comments

Comments
 (0)