Merge branch 'main' into kegan/v0.15.0

This commit is contained in:
Kegan Dougal 2025-08-12 20:08:44 +01:00 committed by GitHub
commit 56472da86b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -271,7 +271,7 @@ func (w *worker) _next() {
}) })
msgs, err := w.subscription.Fetch(1, nats.Context(ctx)) msgs, err := w.subscription.Fetch(1, nats.Context(ctx))
switch err { switch err {
case nil, nats.ErrTimeout, context.DeadlineExceeded, context.Canceled: case nil:
// Is the server shutting down? If so, stop processing. // Is the server shutting down? If so, stop processing.
if w.r.ProcessContext.Context().Err() != nil { if w.r.ProcessContext.Context().Err() != nil {
return return
@ -279,26 +279,27 @@ func (w *worker) _next() {
// Make sure that once we're done here, we queue up another call // Make sure that once we're done here, we queue up another call
// to _next in the inbox. // to _next in the inbox.
defer w.Act(nil, w._next) defer w.Act(nil, w._next)
case nats.ErrTimeout, context.DeadlineExceeded, context.Canceled:
// If no error was reported, but we didn't get exactly one message, // Is the server shutting down? If so, stop processing.
// then skip over this and try again on the next iteration. if w.r.ProcessContext.Context().Err() != nil {
if len(msgs) != 1 {
return return
} }
case context.DeadlineExceeded, context.Canceled:
// The context exceeded, so we've been waiting for more than a // The context exceeded, so we've been waiting for more than a
// minute for activity in this room. At this point we will shut // minute for activity in this room. At this point we will shut
// down the subscriber to free up resources. It'll get started // down the subscriber to free up resources. It'll get started
// again if new activity happens. // again if new activity happens.
w.Lock() w.Lock()
defer w.Unlock()
// inside the lock, let's check if the ephemeral consumer saw something new! // inside the lock, let's check if the ephemeral consumer saw something new!
// If so, we do have new messages after all, they just came at a bad time. // If so, we do have new messages after all, they just came at a bad time.
if w.ephemeralSeq > w.durableSeq { if w.ephemeralSeq > w.durableSeq {
w.Act(nil, w._next) w.Act(nil, w._next)
w.Unlock()
return return
} }
w.Unlock()
case nats.ErrConsumerDeleted, nats.ErrConsumerNotFound: case nats.ErrConsumerDeleted, nats.ErrConsumerNotFound:
w.Lock()
defer w.Unlock()
// The consumer is gone, therefore it's reached the inactivity // The consumer is gone, therefore it's reached the inactivity
// threshold. Clean up and stop processing at this point, if a // threshold. Clean up and stop processing at this point, if a
// new event comes in for this room then the ordered consumer // new event comes in for this room then the ordered consumer
@ -308,7 +309,6 @@ func (w *worker) _next() {
} }
w.subscription = nil w.subscription = nil
return return
default: default:
// Something went wrong while trying to fetch the next event // Something went wrong while trying to fetch the next event
// from the queue. In which case, we'll shut down the subscriber // from the queue. In which case, we'll shut down the subscriber
@ -329,6 +329,12 @@ func (w *worker) _next() {
// Since we either Ack() or Term() the message at this point, we can defer decrementing the room backpressure // Since we either Ack() or Term() the message at this point, we can defer decrementing the room backpressure
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec() defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec()
// If no error was reported, but we didn't get exactly one message,
// then skip over this and try again on the next iteration.
if len(msgs) != 1 {
return
}
// Try to unmarshal the input room event. If the JSON unmarshalling // Try to unmarshal the input room event. If the JSON unmarshalling
// fails then we'll terminate the message — this notifies NATS that // fails then we'll terminate the message — this notifies NATS that
// we are done with the message and never want to see it again. // we are done with the message and never want to see it again.