Use AckExplicitPolicy instead of AckAllPolicy (#3288)

Fixes https://github.com/matrix-org/dendrite/issues/3240 and potentially
a root cause for state resets.

While testing, I added some more debug logging:
```
time="2023-12-16T18:13:11.319458084Z" level=warning msg="already processed event" event_id="$qFYMl_F2vb1N0yxmvlFAMhqhGhLKq4kA-o_YCQKH7tQ" kind=KindNew times=2
time="2023-12-16T18:13:14.537389126Z" level=warning msg="already processed event" event_id="$EU-LTsKErT6Mt1k12-p_3xOHfiLaK6gtwVDlZ35lSuo" kind=KindNew times=5
time="2023-12-16T18:13:16.789551206Z" level=warning msg="already processed event" event_id="$dIPuAfTL5x0VyG873LKPslQeljCSxFT1WKxUtjIMUGE" kind=KindNew times=5
time="2023-12-16T18:13:17.383838767Z" level=warning msg="already processed event" event_id="$7noSZiCkzerpkz_UBO3iatpRnaOiPx-3IXc0GPDQVGE" kind=KindNew times=2
time="2023-12-16T18:13:22.091946597Z" level=warning msg="already processed event" event_id="$3Lvo3Wbi2ol9-nNbQ93N-E2MuGQCJZo5397KkFH-W6E" kind=KindNew times=1
time="2023-12-16T18:13:23.026417446Z" level=warning msg="already processed event" event_id="$lj1xS46zsLBCChhKOLJEG-bu7z-_pq9i_Y2DUIjzGy4" kind=KindNew times=4
```

So we did receive the same event over and over again. Since they are
`KindNew`, we don't short-circuit when we have already processed them, which
potentially caused the state to be calculated against a state snapshot that
is no longer correct.
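
For context, the relevant JetStream behaviour: with `AckAllPolicy`, acknowledging one message implicitly acknowledges every earlier pending message on the consumer, whereas `AckExplicitPolicy` requires each delivery to be acked on its own, so anything we never explicitly ack is redelivered after `AckWait` expires. Below is a minimal sketch of the kind of consumer configuration this moves to; the stream, consumer and subject names are placeholders for illustration, not Dendrite's actual wiring:

```go
// Hedged sketch of a durable pull consumer using AckExplicitPolicy.
// The stream/consumer/subject names are made up for the example.
package example

import (
	"time"

	"github.com/nats-io/nats.go"
)

func createConsumer(js nats.JetStreamContext) error {
	_, err := js.AddConsumer("EXAMPLE_STREAM", &nats.ConsumerConfig{
		Durable:       "ExampleRoomInput",
		AckPolicy:     nats.AckExplicitPolicy, // previously nats.AckAllPolicy
		DeliverPolicy: nats.DeliverAllPolicy,
		FilterSubject: "example.room.subject",
		// Unacked deliveries are redelivered once AckWait expires.
		AckWait: 5 * time.Minute,
	})
	return err
}
```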

Also fixes the back pressure metric. We now correctly increment the
counter once we have sent the message to NATS and decrement it once we
have actually processed the event.
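
Roughly, the accounting now works as follows (a hedged sketch with simplified gauge and helper names; the real plumbing is in the diff below):

```go
// Hedged sketch of the corrected backpressure accounting: the gauge goes up
// only after a successful publish to NATS and down once the worker is done
// with the message. Names here are placeholders, not Dendrite's real code.
package example

import (
	"github.com/nats-io/nats.go"
	"github.com/prometheus/client_golang/prometheus"
)

var inputBackpressure = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{Name: "example_roomserver_input_backpressure"},
	[]string{"room_id"},
)

// queueEvent increments the gauge only after the publish succeeded.
func queueEvent(js nats.JetStreamContext, msg *nats.Msg, roomID string) error {
	if _, err := js.PublishMsg(msg); err != nil {
		return err
	}
	inputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
	return nil
}

// processEvent decrements the gauge once the message has been handled,
// whether it is ultimately acked or terminated.
func processEvent(msg *nats.Msg, roomID string) {
	defer inputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
	// ... unmarshal and process the event here ...
	_ = msg.AckSync()
}
```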
Till · 2023-12-19 08:25:47 +01:00 (committed by GitHub)
parent d65449c782 · commit f93d1c4790
5 changed files with 158 additions and 47 deletions


@@ -119,9 +119,14 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
    w.Lock()
    defer w.Unlock()
    if !loaded || w.subscription == nil {
        streamName := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent)
        consumer := r.Cfg.Matrix.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(w.roomID))
        subject := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(w.roomID))
        logger := logrus.WithFields(logrus.Fields{
            "stream_name": streamName,
            "consumer":    consumer,
        })
        // Create the consumer. We do this as a specific step rather than
        // letting PullSubscribe create it for us because we need the consumer
        // to outlive the subscription. If we do it this way, we can Bind in the
@@ -135,21 +140,62 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
        // before it. This is necessary because otherwise our consumer will never
        // acknowledge things we filtered out for other subjects and therefore they
        // will linger around forever.
        if _, err := w.r.JetStream.AddConsumer(
            r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
            &nats.ConsumerConfig{
                Durable:           consumer,
                AckPolicy:         nats.AckAllPolicy,
                DeliverPolicy:     nats.DeliverAllPolicy,
                FilterSubject:     subject,
                AckWait:           MaximumMissingProcessingTime + (time.Second * 10),
                InactiveThreshold: inactiveThreshold,
            },
        ); err != nil {
            logrus.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID)
        info, err := w.r.JetStream.ConsumerInfo(streamName, consumer)
        if err != nil && !errors.Is(err, nats.ErrConsumerNotFound) {
            // log and return, we will retry anyway
            logger.WithError(err).Errorf("failed to get consumer info")
            return
        }
        consumerConfig := &nats.ConsumerConfig{
            Durable:           consumer,
            AckPolicy:         nats.AckExplicitPolicy,
            DeliverPolicy:     nats.DeliverAllPolicy,
            FilterSubject:     subject,
            AckWait:           MaximumMissingProcessingTime + (time.Second * 10),
            InactiveThreshold: inactiveThreshold,
        }
        // The consumer already exists, try to update it if necessary.
        if info != nil {
            // Not using reflect.DeepEqual here, since consumerConfig does not explicitly set
            // e.g. the consumerName, which is added by NATS later. So this would result
            // in constantly updating/recreating the consumer.
            switch {
            case info.Config.AckWait.Nanoseconds() != consumerConfig.AckWait.Nanoseconds():
                // Initially we had an AckWait of 2m 10s, now we have 5m 10s, so we need to update
                // existing consumers.
                fallthrough
            case info.Config.AckPolicy != consumerConfig.AckPolicy:
                // We've changed the AckPolicy from AckAll to AckExplicit, which needs a
                // recreation of the consumer. (Note: only a few changes actually need a recreation.)
                logger.Warn("Consumer already exists, trying to update it.")
                // Try updating the consumer first
                if _, err = w.r.JetStream.UpdateConsumer(streamName, consumerConfig); err != nil {
                    // We failed to update the consumer, recreate it
                    logger.WithError(err).Warn("Unable to update consumer, recreating...")
                    if err = w.r.JetStream.DeleteConsumer(streamName, consumer); err != nil {
                        logger.WithError(err).Fatal("Unable to delete consumer")
                        return
                    }
                    // Set info to nil, so it can be recreated with the correct config.
                    info = nil
                }
            }
        }
        if info == nil {
            // Create the consumer with the correct config
            if _, err = w.r.JetStream.AddConsumer(
                r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
                consumerConfig,
            ); err != nil {
                logger.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID)
                return
            }
        }
        // Bind to our durable consumer. We want to receive all messages waiting
        // for this subject and we want to manually acknowledge them, so that we
        // can ensure they are only cleaned up when we are done processing them.
@@ -162,7 +208,7 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
            nats.InactiveThreshold(inactiveThreshold),
        )
        if err != nil {
            logrus.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID)
            logger.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID)
            return
        }
@@ -263,21 +309,23 @@ func (w *worker) _next() {
        return
    }
    // Since we either Ack() or Term() the message at this point, we can defer decrementing the room backpressure
    defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec()
    // Try to unmarshal the input room event. If the JSON unmarshalling
    // fails then we'll terminate the message - this notifies NATS that
    // we are done with the message and never want to see it again.
    msg := msgs[0]
    var inputRoomEvent api.InputRoomEvent
    if err = json.Unmarshal(msg.Data, &inputRoomEvent); err != nil {
        _ = msg.Term()
        // using AckWait here makes the call synchronous; 5 seconds is the default value used by NATS
        _ = msg.Term(nats.AckWait(time.Second * 5))
        return
    }
    if scope := sentry.CurrentHub().Scope(); scope != nil {
        scope.SetTag("event_id", inputRoomEvent.Event.EventID())
    }
    roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Inc()
    defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec()
    // Process the room event. If something goes wrong then we'll tell
    // NATS to terminate the message. We'll store the error result as
@@ -307,10 +355,15 @@ func (w *worker) _next() {
                "type": inputRoomEvent.Event.Type(),
            }).Warn("Roomserver failed to process event")
        }
        _ = msg.Term()
        // Even though we failed to process this message (e.g. due to Dendrite restarting and receiving a canceled context),
        // the message may already have been queued for redelivery, or will be, so this makes sure that we still reprocess the msg
        // after restarting. We only Ack if the context was not yet canceled.
        if w.r.ProcessContext.Context().Err() == nil {
            _ = msg.AckSync()
        }
        errString = err.Error()
    } else {
        _ = msg.Ack()
        _ = msg.AckSync()
    }
    // If it was a synchronous input request then the "sync" field
@@ -381,6 +434,9 @@ func (r *Inputer) queueInputRoomEvents(
            }).Error("Roomserver failed to queue async event")
            return nil, fmt.Errorf("r.JetStream.PublishMsg: %w", err)
        }
        // Now that the event is queued, increment the room backpressure
        roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
    }
    return
}


@@ -48,8 +48,10 @@ import (
    "github.com/matrix-org/dendrite/roomserver/types"
)
// TODO: Does this value make sense?
const MaximumMissingProcessingTime = time.Minute * 2
// MaximumMissingProcessingTime is the maximum time we allow "processRoomEvent" to fetch
// e.g. missing auth/prev events. This duration is used for AckWait, and if it is exceeded
// NATS queues the event for redelivery.
const MaximumMissingProcessingTime = time.Minute * 5
var processRoomEventDuration = prometheus.NewHistogramVec(
    prometheus.HistogramOpts{


@@ -12,7 +12,9 @@ import (
    "github.com/matrix-org/dendrite/internal/eventutil"
    "github.com/matrix-org/dendrite/internal/httputil"
    "github.com/matrix-org/dendrite/internal/sqlutil"
    "github.com/matrix-org/dendrite/roomserver/internal/input"
    "github.com/matrix-org/gomatrixserverlib/spec"
    "github.com/nats-io/nats.go"
    "github.com/stretchr/testify/assert"
    "github.com/tidwall/gjson"
@@ -1231,3 +1233,54 @@ func TestNewServerACLs(t *testing.T) {
        assert.Equal(t, false, banned)
    })
}
// Validate that changing the AckPolicy/AckWait of room consumers
// results in their recreation
func TestRoomConsumerRecreation(t *testing.T) {
    alice := test.NewUser(t)
    room := test.NewRoom(t, alice)
    // As this is DB-unrelated, just use SQLite
    cfg, processCtx, closeDB := testrig.CreateConfig(t, test.DBTypeSQLite)
    defer closeDB()
    cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
    natsInstance := &jetstream.NATSInstance{}
    // Prepare a stream and consumer using the old configuration
    jsCtx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
    streamName := cfg.Global.JetStream.Prefixed(jetstream.InputRoomEvent)
    consumer := cfg.Global.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(room.ID))
    subject := cfg.Global.JetStream.Prefixed(jetstream.InputRoomEventSubj(room.ID))
    consumerConfig := &nats.ConsumerConfig{
        Durable:           consumer,
        AckPolicy:         nats.AckAllPolicy,
        DeliverPolicy:     nats.DeliverAllPolicy,
        FilterSubject:     subject,
        AckWait:           (time.Minute * 2) + (time.Second * 10),
        InactiveThreshold: time.Hour * 24,
    }
    // Create the consumer with the old config
    _, err := jsCtx.AddConsumer(streamName, consumerConfig)
    assert.NoError(t, err)
    caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
    // start JetStream listeners
    rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics)
    rsAPI.SetFederationAPI(nil, nil)
    // let the RS create the events; this also recreates the consumers
    err = api.SendEvents(context.Background(), rsAPI, api.KindNew, room.Events(), "test", "test", "test", nil, false)
    assert.NoError(t, err)
    // Validate that AckPolicy and AckWait have changed
    info, err := jsCtx.ConsumerInfo(streamName, consumer)
    assert.NoError(t, err)
    assert.Equal(t, nats.AckExplicitPolicy, info.Config.AckPolicy)
    wantAckWait := input.MaximumMissingProcessingTime + (time.Second * 10)
    assert.Equal(t, wantAckWait, info.Config.AckWait)
}