mirror of
https://github.com/element-hq/dendrite.git
synced 2025-09-14 13:22:26 +03:00
Ensure we always have a msg before processing
This commit is contained in:
parent
5e4d0c736a
commit
a6c2d5aa6a
1 changed files with 8 additions and 8 deletions
|
@ -271,7 +271,7 @@ func (w *worker) _next() {
|
||||||
})
|
})
|
||||||
msgs, err := w.subscription.Fetch(1, nats.Context(ctx))
|
msgs, err := w.subscription.Fetch(1, nats.Context(ctx))
|
||||||
switch err {
|
switch err {
|
||||||
case nil, nats.ErrTimeout:
|
case nil:
|
||||||
// Is the server shutting down? If so, stop processing.
|
// Is the server shutting down? If so, stop processing.
|
||||||
if w.r.ProcessContext.Context().Err() != nil {
|
if w.r.ProcessContext.Context().Err() != nil {
|
||||||
return
|
return
|
||||||
|
@ -279,13 +279,7 @@ func (w *worker) _next() {
|
||||||
// Make sure that once we're done here, we queue up another call
|
// Make sure that once we're done here, we queue up another call
|
||||||
// to _next in the inbox.
|
// to _next in the inbox.
|
||||||
defer w.Act(nil, w._next)
|
defer w.Act(nil, w._next)
|
||||||
|
case nats.ErrTimeout, context.DeadlineExceeded, context.Canceled:
|
||||||
// If no error was reported, but we didn't get exactly one message,
|
|
||||||
// then skip over this and try again on the next iteration.
|
|
||||||
if len(msgs) != 1 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
case context.DeadlineExceeded, context.Canceled:
|
|
||||||
// Is the server shutting down? If so, stop processing.
|
// Is the server shutting down? If so, stop processing.
|
||||||
if w.r.ProcessContext.Context().Err() != nil {
|
if w.r.ProcessContext.Context().Err() != nil {
|
||||||
return
|
return
|
||||||
|
@ -335,6 +329,12 @@ func (w *worker) _next() {
|
||||||
// Since we either Ack() or Term() the message at this point, we can defer decrementing the room backpressure
|
// Since we either Ack() or Term() the message at this point, we can defer decrementing the room backpressure
|
||||||
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec()
|
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec()
|
||||||
|
|
||||||
|
// If no error was reported, but we didn't get exactly one message,
|
||||||
|
// then skip over this and try again on the next iteration.
|
||||||
|
if len(msgs) != 1 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Try to unmarshal the input room event. If the JSON unmarshalling
|
// Try to unmarshal the input room event. If the JSON unmarshalling
|
||||||
// fails then we'll terminate the message — this notifies NATS that
|
// fails then we'll terminate the message — this notifies NATS that
|
||||||
// we are done with the message and never want to see it again.
|
// we are done with the message and never want to see it again.
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue