From a6c2d5aa6adc22ad8016cd8552e90e36b2aee3db Mon Sep 17 00:00:00 2001 From: Kegan Dougal <7190048+kegsay@users.noreply.github.com> Date: Tue, 12 Aug 2025 19:50:48 +0100 Subject: [PATCH] Ensure we always have a msg before processing --- roomserver/internal/input/input.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 005f4f42..232f4feb 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -271,7 +271,7 @@ func (w *worker) _next() { }) msgs, err := w.subscription.Fetch(1, nats.Context(ctx)) switch err { - case nil, nats.ErrTimeout: + case nil: // Is the server shutting down? If so, stop processing. if w.r.ProcessContext.Context().Err() != nil { return @@ -279,13 +279,7 @@ func (w *worker) _next() { // Make sure that once we're done here, we queue up another call // to _next in the inbox. defer w.Act(nil, w._next) - - // If no error was reported, but we didn't get exactly one message, - // then skip over this and try again on the next iteration. - if len(msgs) != 1 { - return - } - case context.DeadlineExceeded, context.Canceled: + case nats.ErrTimeout, context.DeadlineExceeded, context.Canceled: // Is the server shutting down? If so, stop processing. if w.r.ProcessContext.Context().Err() != nil { return @@ -335,6 +329,12 @@ func (w *worker) _next() { // Since we either Ack() or Term() the message at this point, we can defer decrementing the room backpressure defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec() + // If no error was reported, but we didn't get exactly one message, + // then skip over this and try again on the next iteration. + if len(msgs) != 1 { + return + } + // Try to unmarshal the input room event. If the JSON unmarshalling // fails then we'll terminate the message — this notifies NATS that // we are done with the message and never want to see it again.