From d4c5804ad22f9acdbca30d90c721bf988d465b6b Mon Sep 17 00:00:00 2001 From: Kegan Dougal <7190048+kegsay@users.noreply.github.com> Date: Tue, 12 Aug 2025 20:08:30 +0100 Subject: [PATCH] bugfix: ensure we release the lock (#3628) The `case nats.ErrConsumerDeleted, nats.ErrConsumerNotFound:` bit was merge conflicted in https://github.com/element-hq/dendrite/pull/3588 so it broke the locking order. --- roomserver/internal/input/input.go | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 0e2d71c4..232f4feb 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -271,7 +271,7 @@ func (w *worker) _next() { }) msgs, err := w.subscription.Fetch(1, nats.Context(ctx)) switch err { - case nil, nats.ErrTimeout, context.DeadlineExceeded, context.Canceled: + case nil: // Is the server shutting down? If so, stop processing. if w.r.ProcessContext.Context().Err() != nil { return @@ -279,26 +279,27 @@ func (w *worker) _next() { // Make sure that once we're done here, we queue up another call // to _next in the inbox. defer w.Act(nil, w._next) - - // If no error was reported, but we didn't get exactly one message, - // then skip over this and try again on the next iteration. - if len(msgs) != 1 { + case nats.ErrTimeout, context.DeadlineExceeded, context.Canceled: + // Is the server shutting down? If so, stop processing. + if w.r.ProcessContext.Context().Err() != nil { return } - case context.DeadlineExceeded, context.Canceled: // The context exceeded, so we've been waiting for more than a // minute for activity in this room. At this point we will shut // down the subscriber to free up resources. It'll get started // again if new activity happens. w.Lock() - defer w.Unlock() // inside the lock, let's check if the ephemeral consumer saw something new! // If so, we do have new messages after all, they just came at a bad time. if w.ephemeralSeq > w.durableSeq { w.Act(nil, w._next) + w.Unlock() return } + w.Unlock() case nats.ErrConsumerDeleted, nats.ErrConsumerNotFound: + w.Lock() + defer w.Unlock() // The consumer is gone, therefore it's reached the inactivity // threshold. Clean up and stop processing at this point, if a // new event comes in for this room then the ordered consumer @@ -308,7 +309,6 @@ func (w *worker) _next() { } w.subscription = nil return - default: // Something went wrong while trying to fetch the next event // from the queue. In which case, we'll shut down the subscriber @@ -329,6 +329,12 @@ func (w *worker) _next() { // Since we either Ack() or Term() the message at this point, we can defer decrementing the room backpressure defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec() + // If no error was reported, but we didn't get exactly one message, + // then skip over this and try again on the next iteration. + if len(msgs) != 1 { + return + } + // Try to unmarshal the input room event. If the JSON unmarshalling // fails then we'll terminate the message — this notifies NATS that // we are done with the message and never want to see it again.