This commit is contained in:
Till Faelligen 2025-05-17 19:26:02 +02:00
parent 4ee2ef0c6d
commit a6fe816066
No known key found for this signature in database
GPG key ID: 3DF82D8AB9211D4E

View file

@ -264,7 +264,11 @@ func (w *worker) _next() {
}) })
msgs, err := w.subscription.Fetch(1, nats.Context(ctx)) msgs, err := w.subscription.Fetch(1, nats.Context(ctx))
switch err { switch err {
case nil: case nil, nats.ErrTimeout, context.DeadlineExceeded, context.Canceled:
// Is the server shutting down? If so, stop processing.
if w.r.ProcessContext.Context().Err() != nil {
return
}
// Make sure that once we're done here, we queue up another call // Make sure that once we're done here, we queue up another call
// to _next in the inbox. // to _next in the inbox.
defer w.Act(nil, w._next) defer w.Act(nil, w._next)
@ -275,11 +279,11 @@ func (w *worker) _next() {
return return
} }
case context.DeadlineExceeded, context.Canceled: case nats.ErrConsumerDeleted, nats.ErrConsumerNotFound:
// The context exceeded, so we've been waiting for more than a // The consumer is gone, therefore it's reached the inactivity
// minute for activity in this room. At this point we will shut // threshold. Clean up and stop processing at this point, if a
// down the subscriber to free up resources. It'll get started // new event comes in for this room then the ordered consumer
// again if new activity happens. // over the entire stream will recreate this anyway.
if err = w.subscription.Unsubscribe(); err != nil { if err = w.subscription.Unsubscribe(); err != nil {
logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID) logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID)
} }