@@ -74,24 +74,32 @@ go(DbName, AllDocs0, Opts) ->
74
74
rexi_monitor :stop (RexiMon )
75
75
end .
76
76
77
- handle_message ({rexi_DOWN , _ , {_ , NodeRef }, _ }, _Worker , # acc {} = Acc0 ) ->
77
+ handle_message ({rexi_DOWN , _ , {_ , NodeRef }, _ }, Worker , # acc {} = Acc0 ) ->
78
78
# acc {grouped_docs = GroupedDocs } = Acc0 ,
79
79
NewGrpDocs = [X || {# shard {node = N }, _ } = X <- GroupedDocs , N =/= NodeRef ],
80
- skip_message (Acc0 # acc {waiting_count = length (NewGrpDocs ), grouped_docs = NewGrpDocs });
80
+ Acc1 = Acc0 # acc {waiting_count = length (NewGrpDocs ), grouped_docs = NewGrpDocs },
81
+ Acc2 = start_followers (Worker , Acc1 ),
82
+ skip_message (Acc2 );
81
83
handle_message ({rexi_EXIT , _ }, Worker , # acc {} = Acc0 ) ->
82
84
# acc {waiting_count = WC , grouped_docs = GrpDocs } = Acc0 ,
83
85
NewGrpDocs = lists :keydelete (Worker , 1 , GrpDocs ),
84
- skip_message (Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs });
86
+ Acc1 = Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs },
87
+ Acc2 = start_followers (Worker , Acc1 ),
88
+ skip_message (Acc2 );
85
89
handle_message ({error , all_dbs_active }, Worker , # acc {} = Acc0 ) ->
86
90
% treat it like rexi_EXIT, the hope at least one copy will return successfully
87
91
# acc {waiting_count = WC , grouped_docs = GrpDocs } = Acc0 ,
88
92
NewGrpDocs = lists :keydelete (Worker , 1 , GrpDocs ),
89
- skip_message (Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs });
93
+ Acc1 = Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs },
94
+ Acc2 = start_followers (Worker , Acc1 ),
95
+ skip_message (Acc2 );
90
96
handle_message (internal_server_error , Worker , # acc {} = Acc0 ) ->
91
97
% happens when we fail to load validation functions in an RPC worker
92
98
# acc {waiting_count = WC , grouped_docs = GrpDocs } = Acc0 ,
93
99
NewGrpDocs = lists :keydelete (Worker , 1 , GrpDocs ),
94
- skip_message (Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs });
100
+ Acc1 = Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs },
101
+ Acc2 = start_followers (Worker , Acc1 ),
102
+ skip_message (Acc2 );
95
103
handle_message (attachment_chunk_received , _Worker , # acc {} = Acc0 ) ->
96
104
{ok , Acc0 };
97
105
handle_message ({ok , Replies }, Worker , # acc {} = Acc0 ) ->
@@ -102,34 +110,42 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
102
110
grouped_docs = GroupedDocs ,
103
111
reply = DocReplyDict0
104
112
} = Acc0 ,
105
- {value , {_ , Docs }, NewGrpDocs } = lists :keytake (Worker , 1 , GroupedDocs ),
106
- Acc1 = start_followers (Worker , Acc0 ),
107
- DocReplyDict = append_update_replies (Docs , Replies , DocReplyDict0 ),
108
- case {WaitingCount , dict :size (DocReplyDict )} of
109
- {1 , _ } ->
110
- % last message has arrived, we need to conclude things
111
- {Health , W , Reply } = dict :fold (
112
- fun force_reply /3 ,
113
- {ok , W , []},
114
- DocReplyDict
115
- ),
116
- {stop , {Health , Reply }};
117
- {_ , DocCount } ->
118
- % we've got at least one reply for each document, let's take a look
119
- case dict :fold (fun maybe_reply /3 , {stop , W , []}, DocReplyDict ) of
120
- continue ->
121
- {ok , Acc1 # acc {
122
- waiting_count = WaitingCount - 1 ,
123
- grouped_docs = NewGrpDocs ,
124
- reply = DocReplyDict
125
- }};
126
- {stop , W , FinalReplies } ->
127
- {stop , {ok , FinalReplies }}
128
- end ;
129
- _ ->
130
- {ok , Acc1 # acc {
131
- waiting_count = WaitingCount - 1 , grouped_docs = NewGrpDocs , reply = DocReplyDict
132
- }}
113
+ {value , {_ , Docs }, NewGrpDocs0 } = lists :keytake (Worker , 1 , GroupedDocs ),
114
+ DocReplyDict = append_update_replies (Docs , Replies , W , DocReplyDict0 ),
115
+ Acc1 = Acc0 # acc {grouped_docs = NewGrpDocs0 , reply = DocReplyDict },
116
+ Acc2 = remove_conflicts (Docs , Replies , Acc1 ),
117
+ NewGrpDocs = Acc2 # acc .grouped_docs ,
118
+ case skip_message (Acc2 ) of
119
+ {stop , Msg } ->
120
+ {stop , Msg };
121
+ {ok , Acc3 } ->
122
+ Acc4 = start_followers (Worker , Acc3 ),
123
+ case {Acc4 # acc .waiting_count , dict :size (DocReplyDict )} of
124
+ {1 , _ } ->
125
+ % last message has arrived, we need to conclude things
126
+ {Health , W , Reply } = dict :fold (
127
+ fun force_reply /3 ,
128
+ {ok , W , []},
129
+ DocReplyDict
130
+ ),
131
+ {stop , {Health , Reply }};
132
+ {_ , DocCount } ->
133
+ % we've got at least one reply for each document, let's take a look
134
+ case dict :fold (fun maybe_reply /3 , {stop , W , []}, DocReplyDict ) of
135
+ continue ->
136
+ {ok , Acc4 # acc {
137
+ waiting_count = Acc4 # acc .waiting_count - 1 ,
138
+ grouped_docs = NewGrpDocs
139
+ }};
140
+ {stop , W , FinalReplies } ->
141
+ {stop , {ok , FinalReplies }}
142
+ end ;
143
+ _ ->
144
+ {ok , Acc4 # acc {
145
+ waiting_count = Acc4 # acc .waiting_count - 1 ,
146
+ grouped_docs = NewGrpDocs
147
+ }}
148
+ end
133
149
end ;
134
150
handle_message ({missing_stub , Stub }, _ , _ ) ->
135
151
throw ({missing_stub , Stub });
@@ -368,13 +384,43 @@ start_worker(#shard{ref = undefined}, _Docs, #acc{}) ->
368
384
% for unit tests below.
369
385
ok .
370
386
371
- append_update_replies ([], [], DocReplyDict ) ->
387
+ append_update_replies ([], [], _W , DocReplyDict ) ->
372
388
DocReplyDict ;
373
- append_update_replies ([Doc | Rest ], [], Dict0 ) ->
389
+ append_update_replies ([Doc | Rest ], [], W , Dict0 ) ->
374
390
% icky, if replicated_changes only errors show up in result
375
- append_update_replies (Rest , [], dict :append (Doc , noreply , Dict0 ));
376
- append_update_replies ([Doc | Rest1 ], [Reply | Rest2 ], Dict0 ) ->
377
- append_update_replies (Rest1 , Rest2 , dict :append (Doc , Reply , Dict0 )).
391
+ append_update_replies (Rest , [], W , dict :append (Doc , noreply , Dict0 ));
392
+ append_update_replies ([Doc | Rest1 ], [conflict | Rest2 ], W , Dict0 ) ->
393
+ % % fake conflict replies from followers as we won't ask them
394
+ append_update_replies (
395
+ Rest1 , Rest2 , W , dict :append_list (Doc , lists :duplicate (W , conflict ), Dict0 )
396
+ );
397
+ append_update_replies ([Doc | Rest1 ], [Reply | Rest2 ], W , Dict0 ) ->
398
+ append_update_replies (Rest1 , Rest2 , W , dict :append (Doc , Reply , Dict0 )).
399
+
400
+ % % leader found a conflict, remove that doc from the other (follower) workers,
401
+ % % removing the worker entirely if no docs remain.
402
+ remove_conflicts ([], [], # acc {} = Acc0 ) ->
403
+ Acc0 ;
404
+ remove_conflicts ([Doc | DocRest ], [conflict | ReplyRest ], # acc {} = Acc0 ) ->
405
+ # acc {grouped_docs = GroupedDocs0 } = Acc0 ,
406
+ GroupedDocs1 = lists :foldl (
407
+ fun ({Worker , Docs }, FoldAcc ) ->
408
+ case lists :delete (Doc , Docs ) of
409
+ [] ->
410
+ FoldAcc ;
411
+ Rest ->
412
+ [{Worker , Rest } | FoldAcc ]
413
+ end
414
+ end ,
415
+ [],
416
+ GroupedDocs0
417
+ ),
418
+ Acc1 = Acc0 # acc {waiting_count = length (GroupedDocs1 ), grouped_docs = GroupedDocs1 },
419
+ remove_conflicts (DocRest , ReplyRest , Acc1 );
420
+ remove_conflicts ([_Doc | DocRest ], [_Reply | ReplyRest ], # acc {} = Acc0 ) ->
421
+ remove_conflicts (DocRest , ReplyRest , Acc0 );
422
+ remove_conflicts ([_Doc | DocRest ], [], # acc {} = Acc0 ) ->
423
+ remove_conflicts (DocRest , [], Acc0 ).
378
424
379
425
skip_message (# acc {waiting_count = 0 , w = W , reply = DocReplyDict }) ->
380
426
{Health , W , Reply } = dict :fold (fun force_reply /3 , {ok , W , []}, DocReplyDict ),
0 commit comments