diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 005f4f42..232f4feb 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -271,7 +271,7 @@ func (w *worker) _next() { }) msgs, err := w.subscription.Fetch(1, nats.Context(ctx)) switch err { - case nil, nats.ErrTimeout: + case nil: // Is the server shutting down? If so, stop processing. if w.r.ProcessContext.Context().Err() != nil { return @@ -279,13 +279,7 @@ func (w *worker) _next() { // Make sure that once we're done here, we queue up another call // to _next in the inbox. defer w.Act(nil, w._next) - - // If no error was reported, but we didn't get exactly one message, - // then skip over this and try again on the next iteration. - if len(msgs) != 1 { - return - } - case context.DeadlineExceeded, context.Canceled: + case nats.ErrTimeout, context.DeadlineExceeded, context.Canceled: // Is the server shutting down? If so, stop processing. if w.r.ProcessContext.Context().Err() != nil { return @@ -335,6 +329,12 @@ func (w *worker) _next() { // Since we either Ack() or Term() the message at this point, we can defer decrementing the room backpressure defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec() + // If no error was reported, but we didn't get exactly one message, + // then skip over this and try again on the next iteration. + if len(msgs) != 1 { + return + } + // Try to unmarshal the input room event. If the JSON unmarshalling // fails then we'll terminate the message — this notifies NATS that // we are done with the message and never want to see it again.