@@ -853,61 +853,68 @@ static void MtmTransReceiver(Datum arg)
853
853
if (MtmIsCoordinator (ts )) {
854
854
switch (msg -> code ) {
855
855
case MSG_READY :
856
- Assert (ts -> nVotes < Mtm -> nLiveNodes );
857
- Mtm -> nodes [msg -> node - 1 ].transDelay += MtmGetCurrentTime () - ts -> csn ;
858
- ts -> xids [msg -> node - 1 ] = msg -> sxid ;
859
-
860
- if ((~msg -> disabledNodeMask & Mtm -> disabledNodeMask ) != 0 ) {
861
- /* Coordinator's disabled mask is wider than of this node: so reject such transaction to avoid
862
- commit on smaller subset of nodes */
863
- elog (WARNING , "Coordinator of distributed transaction see less nodes than node %d: %lx instead of %lx" ,
864
- msg -> node , (long ) Mtm -> disabledNodeMask , (long ) msg -> disabledNodeMask );
856
+ if (ts -> nVotes >= Mtm -> nLiveNodes ) {
865
857
MtmAbortTransaction (ts );
866
- }
867
-
868
- if (++ ts -> nVotes == Mtm -> nLiveNodes ) {
869
- /* All nodes are finished their transactions */
870
- if (ts -> status == TRANSACTION_STATUS_ABORTED ) {
871
- MtmWakeUpBackend (ts );
872
- } else if (MtmUseDtm ) {
873
- Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
874
- ts -> nVotes = 1 ; /* I voted myself */
875
- MtmSendNotificationMessage (ts , MSG_PREPARE );
876
- } else {
877
- Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
878
- ts -> status = TRANSACTION_STATUS_UNKNOWN ;
879
- MtmWakeUpBackend (ts );
858
+ MtmWakeUpBackend (ts );
859
+ } else {
860
+ Mtm -> nodes [msg -> node - 1 ].transDelay += MtmGetCurrentTime () - ts -> csn ;
861
+ ts -> xids [msg -> node - 1 ] = msg -> sxid ;
862
+
863
+ if ((~msg -> disabledNodeMask & Mtm -> disabledNodeMask ) != 0 ) {
864
+ /* Coordinator's disabled mask is wider than of this node: so reject such transaction to avoid
865
+ commit on smaller subset of nodes */
866
+ elog (WARNING , "Coordinator of distributed transaction see less nodes than node %d: %lx instead of %lx" ,
867
+ msg -> node , (long ) Mtm -> disabledNodeMask , (long ) msg -> disabledNodeMask );
868
+ MtmAbortTransaction (ts );
869
+ }
870
+
871
+ if (++ ts -> nVotes == Mtm -> nLiveNodes ) {
872
+ /* All nodes are finished their transactions */
873
+ if (ts -> status == TRANSACTION_STATUS_ABORTED ) {
874
+ MtmWakeUpBackend (ts );
875
+ } else if (MtmUseDtm ) {
876
+ Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
877
+ ts -> nVotes = 1 ; /* I voted myself */
878
+ MtmSendNotificationMessage (ts , MSG_PREPARE );
879
+ } else {
880
+ Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
881
+ ts -> status = TRANSACTION_STATUS_UNKNOWN ;
882
+ MtmWakeUpBackend (ts );
883
+ }
880
884
}
881
885
}
882
886
break ;
883
887
case MSG_ABORTED :
884
- Assert (ts -> nVotes < Mtm -> nLiveNodes );
885
888
if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
886
889
Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
887
890
MtmAbortTransaction (ts );
888
891
}
889
- if (++ ts -> nVotes = = Mtm -> nLiveNodes ) {
892
+ if (++ ts -> nVotes > = Mtm -> nLiveNodes ) {
890
893
MtmWakeUpBackend (ts );
891
894
}
892
895
break ;
893
896
case MSG_PREPARED :
894
- Assert (ts -> nVotes < Mtm -> nLiveNodes );
895
- if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
896
- Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
897
- if (msg -> csn > ts -> csn ) {
898
- ts -> csn = msg -> csn ;
899
- MtmSyncClock (ts -> csn );
900
- }
901
- if (++ ts -> nVotes == Mtm -> nLiveNodes ) {
902
- ts -> csn = MtmAssignCSN ();
903
- ts -> status = TRANSACTION_STATUS_UNKNOWN ;
904
- MtmWakeUpBackend (ts );
905
- }
897
+ if (ts -> nVotes >= Mtm -> nLiveNodes ) {
898
+ MtmAbortTransaction (ts );
899
+ MtmWakeUpBackend (ts );
906
900
} else {
907
- if (++ ts -> nVotes == Mtm -> nLiveNodes ) {
908
- MtmWakeUpBackend (ts );
909
- }
910
- }
901
+ if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
902
+ Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
903
+ if (msg -> csn > ts -> csn ) {
904
+ ts -> csn = msg -> csn ;
905
+ MtmSyncClock (ts -> csn );
906
+ }
907
+ if (++ ts -> nVotes == Mtm -> nLiveNodes ) {
908
+ ts -> csn = MtmAssignCSN ();
909
+ ts -> status = TRANSACTION_STATUS_UNKNOWN ;
910
+ MtmWakeUpBackend (ts );
911
+ }
912
+ } else {
913
+ if (++ ts -> nVotes == Mtm -> nLiveNodes ) {
914
+ MtmWakeUpBackend (ts );
915
+ }
916
+ }
917
+ }
911
918
break ;
912
919
default :
913
920
Assert (false);
0 commit comments