@@ -34,24 +34,21 @@ type sagaInvocation struct {
34
34
startedByRPCID string
35
35
}
36
36
37
- func (si * sagaInvocation ) setCorrelationIDs (message * gbus.BusMessage , isEvent bool , targetService string ) {
37
+ func (si * sagaInvocation ) setCorrelationIDs (message * gbus.BusMessage , targetService string , semantics gbus. Semantics ) {
38
38
39
- message .CorrelationID = si .inboundMsg .ID
40
39
message .SagaID = si .sagaID
41
40
42
- if ! isEvent {
43
- //support saga-to-saga communication
44
- if si .inboundMsg .SagaID != "" {
45
- message . SagaCorrelationID = si . inboundMsg . SagaID
46
- }
41
+ if semantics == gbus . REPLY {
42
+ message . CorrelationID = si . inboundMsg . ID
43
+ message . SagaCorrelationID = si .inboundMsg .SagaID
44
+
45
+ } else if semantics == gbus . CMD {
47
46
//if the saga is potentially invoking itself then set the SagaCorrelationID to reflect that
48
47
//https://github.com/wework/grabbit/issues/64
49
48
if targetService == si .hostingSvc {
50
49
message .SagaCorrelationID = message .SagaID
51
50
}
52
-
53
51
}
54
-
55
52
}
56
53
func (si * sagaInvocation ) HostingSvc () string {
57
54
return si .hostingSvc
@@ -63,13 +60,13 @@ func (si *sagaInvocation) InvokingSvc() string {
63
60
64
61
func (si * sagaInvocation ) Reply (ctx context.Context , message * gbus.BusMessage ) error {
65
62
_ , targetService := si .decoratedInvocation .Routing ()
66
- si .setCorrelationIDs (message , false , targetService )
63
+ si .setCorrelationIDs (message , targetService , gbus . REPLY )
67
64
return si .decoratedInvocation .Reply (ctx , message )
68
65
}
69
66
70
67
func (si * sagaInvocation ) ReplyToInitiator (ctx context.Context , message * gbus.BusMessage ) error {
71
68
72
- si .setCorrelationIDs (message , false , si .startedBy )
69
+ si .setCorrelationIDs (message , si .startedBy , gbus . REPLY )
73
70
74
71
//override the correlation ids to those of the message creating the saga
75
72
message .SagaCorrelationID = si .startedBySaga
@@ -92,13 +89,13 @@ func (si *sagaInvocation) Ctx() context.Context {
92
89
93
90
func (si * sagaInvocation ) Send (ctx context.Context , toService string ,
94
91
command * gbus.BusMessage , policies ... gbus.MessagePolicy ) error {
95
- si .setCorrelationIDs (command , false , toService )
92
+ si .setCorrelationIDs (command , toService , gbus . CMD )
96
93
return si .decoratedBus .Send (ctx , toService , command , policies ... )
97
94
}
98
95
99
96
func (si * sagaInvocation ) Publish (ctx context.Context , exchange , topic string ,
100
97
event * gbus.BusMessage , policies ... gbus.MessagePolicy ) error {
101
- si .setCorrelationIDs (event , true , "" )
98
+ si .setCorrelationIDs (event , "" , gbus . EVT )
102
99
return si .decoratedBus .Publish (ctx , exchange , topic , event , policies ... )
103
100
}
104
101
0 commit comments