diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 46cd206a..3f0b19c3 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -264,7 +264,11 @@ func (w *worker) _next() { }) msgs, err := w.subscription.Fetch(1, nats.Context(ctx)) switch err { - case nil: + case nil, nats.ErrTimeout, context.DeadlineExceeded, context.Canceled: + // Is the server shutting down? If so, stop processing. + if w.r.ProcessContext.Context().Err() != nil { + return + } // Make sure that once we're done here, we queue up another call // to _next in the inbox. defer w.Act(nil, w._next) @@ -275,11 +279,11 @@ func (w *worker) _next() { return } - case context.DeadlineExceeded, context.Canceled: - // The context exceeded, so we've been waiting for more than a - // minute for activity in this room. At this point we will shut - // down the subscriber to free up resources. It'll get started - // again if new activity happens. + case nats.ErrConsumerDeleted, nats.ErrConsumerNotFound: + // The consumer is gone, therefore it's reached the inactivity + // threshold. Clean up and stop processing at this point, if a + // new event comes in for this room then the ordered consumer + // over the entire stream will recreate this anyway. if err = w.subscription.Unsubscribe(); err != nil { logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID) }