From a6fe816066210be37e62ae49d14ac2fe3d8299c0 Mon Sep 17 00:00:00 2001 From: Till Faelligen <2353100+S7evinK@users.noreply.github.com> Date: Sat, 17 May 2025 19:26:02 +0200 Subject: [PATCH] Backport https://github.com/neilalexander/harmony/commit/ee17a7071e97eeaaf2dbc9de7bef8a52c4e654a3 --- roomserver/internal/input/input.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 46cd206a..3f0b19c3 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -264,7 +264,11 @@ func (w *worker) _next() { }) msgs, err := w.subscription.Fetch(1, nats.Context(ctx)) switch err { - case nil: + case nil, nats.ErrTimeout, context.DeadlineExceeded, context.Canceled: + // Is the server shutting down? If so, stop processing. + if w.r.ProcessContext.Context().Err() != nil { + return + } // Make sure that once we're done here, we queue up another call // to _next in the inbox. defer w.Act(nil, w._next) @@ -275,11 +279,11 @@ func (w *worker) _next() { return } - case context.DeadlineExceeded, context.Canceled: - // The context exceeded, so we've been waiting for more than a - // minute for activity in this room. At this point we will shut - // down the subscriber to free up resources. It'll get started - // again if new activity happens. + case nats.ErrConsumerDeleted, nats.ErrConsumerNotFound: + // The consumer is gone, therefore it's reached the inactivity + // threshold. Clean up and stop processing at this point, if a + // new event comes in for this room then the ordered consumer + // over the entire stream will recreate this anyway. if err = w.subscription.Unsubscribe(); err != nil { logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID) }