diff --git a/src/peers.c b/src/peers.c index 6f22e1751..e79d9d9d3 100644 --- a/src/peers.c +++ b/src/peers.c @@ -2905,6 +2905,7 @@ static void peer_io_handler(struct appctx *appctx) int repl = 0; unsigned int maj_ver, min_ver; int prev_state; + int msg_done = 0; if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR)))) { co_skip(sc_oc(sc), co_data(sc_oc(sc))); @@ -3107,6 +3108,19 @@ switchstate: applet_wont_consume(appctx); goto out; } + + /* check if we've already hit the rx limit (i.e. we've + * already gone through send_msgs and we don't want to + * process input messages again). We must absolutely + * leave via send_msgs otherwise we can leave the + * connection in a stuck state if acks are missing for + * example. + */ + if (msg_done >= peers_max_updates_at_once) { + applet_have_more_data(appctx); // make sure to come back here + goto send_msgs; + } + applet_will_consume(appctx); /* local peer is assigned of a lesson, start it */ @@ -3128,6 +3142,12 @@ switchstate: /* skip consumed message */ co_skip(sc_oc(sc), totl); + + /* make sure we don't process too many at once */ + if (msg_done >= peers_max_updates_at_once) + goto send_msgs; + msg_done++; + /* loop on that state to peek next message */ goto switchstate;