Refactor some JetStream helper code, add support for specifying JetStream domain (#3485)

This should gracefully handle some more potential errors that the
consumer fetches can return with retries, as well as setting some client
settings for reconnects etc when using an external NATS Server.

Also allow specifying the JetStream domain in case of a leafnode
scenario and better manage client reuse across Dendrite. And also update
NATS Server to 2.10.24 for good measure.

This code is backported from Harmony.

Signed-off-by: Neil Alexander <git@neilalexander.dev>

---------

Signed-off-by: Neil Alexander <git@neilalexander.dev>
Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Co-authored-by: Till <2353100+S7evinK@users.noreply.github.com>
This commit is contained in:
Neil 2025-01-19 09:09:58 +00:00 committed by GitHub
parent 9de3e84fff
commit f4506a0d82
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 138 additions and 115 deletions

4
go.mod
View file

@ -29,7 +29,7 @@ require (
github.com/matrix-org/pinecone v0.11.1-0.20230810010612-ea4c33717fd7 github.com/matrix-org/pinecone v0.11.1-0.20230810010612-ea4c33717fd7
github.com/matrix-org/util v0.0.0-20221111132719-399730281e66 github.com/matrix-org/util v0.0.0-20221111132719-399730281e66
github.com/mattn/go-sqlite3 v1.14.24 github.com/mattn/go-sqlite3 v1.14.24
github.com/nats-io/nats-server/v2 v2.10.23 github.com/nats-io/nats-server/v2 v2.10.24
github.com/nats-io/nats.go v1.38.0 github.com/nats-io/nats.go v1.38.0
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
github.com/opentracing/opentracing-go v1.2.0 github.com/opentracing/opentracing-go v1.2.0
@ -115,7 +115,7 @@ require (
github.com/morikuni/aec v1.0.0 // indirect github.com/morikuni/aec v1.0.0 // indirect
github.com/mschoch/smat v0.2.0 // indirect github.com/mschoch/smat v0.2.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nats-io/jwt/v2 v2.5.8 // indirect github.com/nats-io/jwt/v2 v2.7.3 // indirect
github.com/nats-io/nkeys v0.4.9 // indirect github.com/nats-io/nkeys v0.4.9 // indirect
github.com/nats-io/nuid v1.0.1 // indirect github.com/nats-io/nuid v1.0.1 // indirect
github.com/ncruces/go-strftime v0.1.9 // indirect github.com/ncruces/go-strftime v0.1.9 // indirect

8
go.sum
View file

@ -268,10 +268,10 @@ github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE= github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE=
github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4=
github.com/nats-io/nats-server/v2 v2.10.23 h1:jvfb9cEi5h8UG6HkZgJGdn9f1UPaX3Dohk0PohEekJI= github.com/nats-io/nats-server/v2 v2.10.24 h1:KcqqQAD0ZZcG4yLxtvSFJY7CYKVYlnlWoAiVZ6i/IY4=
github.com/nats-io/nats-server/v2 v2.10.23/go.mod h1:hMFnpDT2XUXsvHglABlFl/uroQCCOcW6X/0esW6GpBk= github.com/nats-io/nats-server/v2 v2.10.24/go.mod h1:olvKt8E5ZlnjyqBGbAXtxvSQKsPodISK5Eo/euIta4s=
github.com/nats-io/nats.go v1.38.0 h1:A7P+g7Wjp4/NWqDOOP/K6hfhr54DvdDQUznt5JFg9XA= github.com/nats-io/nats.go v1.38.0 h1:A7P+g7Wjp4/NWqDOOP/K6hfhr54DvdDQUznt5JFg9XA=
github.com/nats-io/nats.go v1.38.0/go.mod h1:IGUM++TwokGnXPs82/wCuiHS02/aKrdYUQkU8If6yjw= github.com/nats-io/nats.go v1.38.0/go.mod h1:IGUM++TwokGnXPs82/wCuiHS02/aKrdYUQkU8If6yjw=
github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0= github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0=

View file

@ -15,6 +15,8 @@ type JetStream struct {
// The prefix to use for stream names for this homeserver - really only // The prefix to use for stream names for this homeserver - really only
// useful if running more than one Dendrite on the same NATS deployment. // useful if running more than one Dendrite on the same NATS deployment.
TopicPrefix string `yaml:"topic_prefix"` TopicPrefix string `yaml:"topic_prefix"`
// The JetStream domain, if needed.
JetStreamDomain string `yaml:"js_domain"`
// Keep all storage in memory. This is mostly useful for unit tests. // Keep all storage in memory. This is mostly useful for unit tests.
InMemory bool `yaml:"in_memory"` InMemory bool `yaml:"in_memory"`
// Disable logging. This is mostly useful for unit tests. // Disable logging. This is mostly useful for unit tests.

View file

@ -22,7 +22,7 @@ func JetStreamConsumer(
f func(ctx context.Context, msgs []*nats.Msg) bool, f func(ctx context.Context, msgs []*nats.Msg) bool,
opts ...nats.SubOpt, opts ...nats.SubOpt,
) error { ) error {
defer func() { defer func(durable string) {
// If there are existing consumers from before they were pull // If there are existing consumers from before they were pull
// consumers, we need to clean up the old push consumers. However, // consumers, we need to clean up the old push consumers. However,
// in order to not affect the interest-based policies, we need to // in order to not affect the interest-based policies, we need to
@ -33,23 +33,28 @@ func JetStreamConsumer(
logrus.WithContext(ctx).Warnf("Failed to clean up old consumer %q", durable) logrus.WithContext(ctx).Warnf("Failed to clean up old consumer %q", durable)
} }
} }
}() }(durable)
name := durable + "Pull" durable = durable + "Pull"
sub, err := js.PullSubscribe(subj, name, opts...) sub, err := js.PullSubscribe(subj, durable, opts...)
if err != nil { if err != nil {
sentry.CaptureException(err) sentry.CaptureException(err)
return fmt.Errorf("nats.SubscribeSync: %w", err) logrus.WithContext(ctx).WithError(err).Warnf("Failed to configure durable %q", durable)
return err
} }
go func() { go jetStreamConsumerWorker(ctx, sub, subj, batch, f)
return nil
}
func jetStreamConsumerWorker(
ctx context.Context, sub *nats.Subscription, subj string, batch int,
f func(ctx context.Context, msgs []*nats.Msg) bool,
) {
for { for {
// If the parent context has given up then there's no point in // If the parent context has given up then there's no point in
// carrying on doing anything, so stop the listener. // carrying on doing anything, so stop the listener.
select { select {
case <-ctx.Done(): case <-ctx.Done():
if err := sub.Unsubscribe(); err != nil {
logrus.WithContext(ctx).Warnf("Failed to unsubscribe %q", durable)
}
return return
default: default:
} }
@ -73,19 +78,23 @@ func JetStreamConsumer(
// just timed out and we should try again. // just timed out and we should try again.
continue continue
} }
} else if errors.Is(err, nats.ErrConsumerDeleted) { } else if errors.Is(err, nats.ErrTimeout) {
// The consumer was deleted so stop. // Pull request was invalidated, try again.
return continue
} else if errors.Is(err, nats.ErrConsumerLeadershipChanged) {
// Leadership changed so pending pull requests became invalidated,
// just try again.
continue
} else if err.Error() == "nats: Server Shutdown" {
// The server is shutting down, but we'll rely on reconnect
// behaviour to try and either connect us to another node (if
// clustered) or to reconnect when the server comes back up.
continue
} else { } else {
// Unfortunately, there's no ErrServerShutdown or similar, so we need to compare the string // Something else went wrong.
if err.Error() == "nats: Server Shutdown" { logrus.WithContext(ctx).WithField("subject", subj).WithError(err).Warn("Error on pull subscriber fetch")
logrus.WithContext(ctx).Warn("nats server shutting down")
return return
} }
// Something else went wrong, so we'll panic.
sentry.CaptureException(err)
logrus.WithContext(ctx).WithField("subject", subj).Fatal(err)
}
} }
if len(msgs) < 1 { if len(msgs) < 1 {
continue continue
@ -113,6 +122,4 @@ func JetStreamConsumer(
} }
} }
} }
}()
return nil
} }

View file

@ -8,7 +8,6 @@ import (
"sync" "sync"
"time" "time"
"github.com/getsentry/sentry-go"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/element-hq/dendrite/setup/config" "github.com/element-hq/dendrite/setup/config"
@ -36,17 +35,20 @@ func DeleteAllStreams(js natsclient.JetStreamContext, cfg *config.JetStream) {
func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) { func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
natsLock.Lock() natsLock.Lock()
defer natsLock.Unlock() defer natsLock.Unlock()
// check if we need an in-process NATS Server var err error
if len(cfg.Addresses) != 0 {
// reuse existing connections // If an existing connection exists, return it.
if s.nc != nil { if s.nc != nil && s.js != nil {
return s.js, s.nc return s.js, s.nc
} }
// For connecting to an external NATS server.
if len(cfg.Addresses) > 0 {
s.js, s.nc = setupNATS(process, cfg, nil) s.js, s.nc = setupNATS(process, cfg, nil)
return s.js, s.nc return s.js, s.nc
} }
if s.Server == nil {
var err error if len(cfg.Addresses) == 0 && s.Server == nil {
opts := &natsserver.Options{ opts := &natsserver.Options{
ServerName: "monolith", ServerName: "monolith",
DontListen: true, DontListen: true,
@ -58,8 +60,7 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS
NoLog: cfg.NoLog, NoLog: cfg.NoLog,
SyncAlways: true, SyncAlways: true,
} }
s.Server, err = natsserver.NewServer(opts) if s.Server, err = natsserver.NewServer(opts); err != nil {
if err != nil {
panic(err) panic(err)
} }
if !cfg.NoLog { if !cfg.NoLog {
@ -75,29 +76,42 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS
s.WaitForShutdown() s.WaitForShutdown()
process.ComponentFinished() process.ComponentFinished()
}() }()
}
if !s.ReadyForConnections(time.Second * 60) { if !s.ReadyForConnections(time.Second * 60) {
logrus.Fatalln("NATS did not start in time") logrus.Fatalln("NATS did not start in time")
} }
// reuse existing connections
if s.nc != nil {
return s.js, s.nc
} }
nc, err := natsclient.Connect("", natsclient.InProcessServer(s))
if err != nil { // No existing process connection, create a new one.
if s.nc, err = natsclient.Connect("", natsclient.InProcessServer(s.Server)); err != nil {
logrus.Fatalln("Failed to create NATS client") logrus.Fatalln("Failed to create NATS client")
} }
js, _ := setupNATS(process, cfg, nc) s.js, s.nc = setupNATS(process, cfg, s.nc)
s.js = js return s.js, s.nc
s.nc = nc
return js, nc
} }
// nolint:gocyclo // nolint:gocyclo
func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) { func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) {
jsOpts := []natsclient.JSOpt{}
if cfg.JetStreamDomain != "" {
jsOpts = append(jsOpts, natsclient.Domain(cfg.JetStreamDomain))
}
if nc == nil { if nc == nil {
var err error var err error
opts := []natsclient.Option{} opts := []natsclient.Option{
natsclient.Name("Dendrite"),
natsclient.MaxReconnects(-1), // Try forever
natsclient.ReconnectJitter(time.Second, time.Second),
natsclient.ReconnectWait(time.Second * 10),
natsclient.ReconnectHandler(func(c *natsclient.Conn) {
js, jerr := c.JetStream(jsOpts...)
if jerr != nil {
logrus.WithError(jerr).Panic("Unable to get JetStream context in reconnect handler")
return
}
checkAndConfigureStreams(process, cfg, js)
}),
}
if cfg.DisableTLSValidation { if cfg.DisableTLSValidation {
opts = append(opts, natsclient.Secure(&tls.Config{ opts = append(opts, natsclient.Secure(&tls.Config{
InsecureSkipVerify: true, InsecureSkipVerify: true,
@ -113,15 +127,19 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc
} }
} }
s, err := nc.JetStream() js, err := nc.JetStream(jsOpts...)
if err != nil { if err != nil {
logrus.WithError(err).Panic("Unable to get JetStream context") logrus.WithError(err).Panic("Unable to get JetStream context")
return nil, nil return nil, nil
} }
checkAndConfigureStreams(process, cfg, js)
return js, nc
}
func checkAndConfigureStreams(process *process.ProcessContext, cfg *config.JetStream, js natsclient.JetStreamContext) {
for _, stream := range streams { // streams are defined in streams.go for _, stream := range streams { // streams are defined in streams.go
name := cfg.Prefixed(stream.Name) name := cfg.Prefixed(stream.Name)
info, err := s.StreamInfo(name) info, err := js.StreamInfo(name)
if err != nil && err != natsclient.ErrStreamNotFound { if err != nil && err != natsclient.ErrStreamNotFound {
logrus.WithError(err).Fatal("Unable to get stream info") logrus.WithError(err).Fatal("Unable to get stream info")
} }
@ -153,11 +171,11 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc
case info.Config.MaxAge != stream.MaxAge: case info.Config.MaxAge != stream.MaxAge:
// Try updating the stream first, as many things can be updated // Try updating the stream first, as many things can be updated
// non-destructively. // non-destructively.
if info, err = s.UpdateStream(stream); err != nil { if info, err = js.UpdateStream(stream); err != nil {
logrus.WithError(err).Warnf("Unable to update stream %q, recreating...", name) logrus.WithError(err).Warnf("Unable to update stream %q, recreating...", name)
// We failed to update the stream, this is a last attempt to get // We failed to update the stream, this is a last attempt to get
// things working but may result in data loss. // things working but may result in data loss.
if err = s.DeleteStream(name); err != nil { if err = js.DeleteStream(name); err != nil {
logrus.WithError(err).Fatalf("Unable to delete stream %q", name) logrus.WithError(err).Fatalf("Unable to delete stream %q", name)
} }
info = nil info = nil
@ -176,7 +194,7 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc
namespaced := *stream namespaced := *stream
namespaced.Name = name namespaced.Name = name
namespaced.Subjects = subjects namespaced.Subjects = subjects
if _, err = s.AddStream(&namespaced); err != nil { if _, err = js.AddStream(&namespaced); err != nil {
logger := logrus.WithError(err).WithFields(logrus.Fields{ logger := logrus.WithError(err).WithFields(logrus.Fields{
"stream": namespaced.Name, "stream": namespaced.Name,
"subjects": namespaced.Subjects, "subjects": namespaced.Subjects,
@ -193,10 +211,9 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc
// we can't recover anything that was queued on the disk but we // we can't recover anything that was queued on the disk but we
// will still be able to start and run hopefully in the meantime. // will still be able to start and run hopefully in the meantime.
logger.WithError(err).Error("Unable to add stream") logger.WithError(err).Error("Unable to add stream")
sentry.CaptureException(fmt.Errorf("Unable to add stream %q: %w", namespaced.Name, err))
namespaced.Storage = natsclient.MemoryStorage namespaced.Storage = natsclient.MemoryStorage
if _, err = s.AddStream(&namespaced); err != nil { if _, err = js.AddStream(&namespaced); err != nil {
// We tried to add the stream in-memory instead but something // We tried to add the stream in-memory instead but something
// went wrong. That's an unrecoverable situation so we will // went wrong. That's an unrecoverable situation so we will
// give up at this point. // give up at this point.
@ -208,7 +225,6 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc
// disk will be left alone, but our ability to recover from a // disk will be left alone, but our ability to recover from a
// future crash will be limited. Yell about it. // future crash will be limited. Yell about it.
err := fmt.Errorf("Stream %q is running in-memory; this may be due to data corruption in the JetStream storage directory", namespaced.Name) err := fmt.Errorf("Stream %q is running in-memory; this may be due to data corruption in the JetStream storage directory", namespaced.Name)
sentry.CaptureException(err)
process.Degraded(err) process.Degraded(err)
} }
} }
@ -229,15 +245,13 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc
streamName := cfg.Matrix.JetStream.Prefixed(stream) streamName := cfg.Matrix.JetStream.Prefixed(stream)
for _, consumer := range consumers { for _, consumer := range consumers {
consumerName := cfg.Matrix.JetStream.Prefixed(consumer) + "Pull" consumerName := cfg.Matrix.JetStream.Prefixed(consumer) + "Pull"
consumerInfo, err := s.ConsumerInfo(streamName, consumerName) consumerInfo, err := js.ConsumerInfo(streamName, consumerName)
if err != nil || consumerInfo == nil { if err != nil || consumerInfo == nil {
continue continue
} }
if err = s.DeleteConsumer(streamName, consumerName); err != nil { if err = js.DeleteConsumer(streamName, consumerName); err != nil {
logrus.WithError(err).Errorf("Unable to clean up old consumer %q for stream %q", consumer, stream) logrus.WithError(err).Errorf("Unable to clean up old consumer %q for stream %q", consumer, stream)
} }
} }
} }
return s, nc
} }