diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index fdf77640..5f120626 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -52,7 +52,7 @@ jobs: - name: Build "build" image if: github.ref_name == 'main' || github.event_name == 'release' id: docker_build_cache - uses: docker/build-push-action@v3 + uses: docker/build-push-action@v6 with: target: build cache-from: type=registry,ref=ghcr.io/${{ env.GHCR_NAMESPACE }}/dendrite-monolith:buildcache @@ -66,7 +66,7 @@ jobs: - name: Build main monolith image if: github.ref_name == 'main' id: docker_build_monolith - uses: docker/build-push-action@v3 + uses: docker/build-push-action@v6 with: cache-from: type=registry,ref=ghcr.io/${{ env.GHCR_NAMESPACE }}/dendrite-monolith:buildcache context: . @@ -79,7 +79,7 @@ jobs: - name: Build release monolith image if: github.event_name == 'release' # Only for GitHub releases id: docker_build_monolith_release - uses: docker/build-push-action@v3 + uses: docker/build-push-action@v6 with: context: . platforms: ${{ env.PLATFORMS }} @@ -135,7 +135,7 @@ jobs: - name: Build main Pinecone demo image if: github.ref_name == 'main' id: docker_build_demo_pinecone - uses: docker/build-push-action@v3 + uses: docker/build-push-action@v6 with: cache-from: type=registry,ref=ghcr.io/${{ env.GHCR_NAMESPACE }}/dendrite-monolith:buildcache context: . @@ -149,7 +149,7 @@ jobs: - name: Build release Pinecone demo image if: github.event_name == 'release' # Only for GitHub releases id: docker_build_demo_pinecone_release - uses: docker/build-push-action@v3 + uses: docker/build-push-action@v6 with: cache-from: type=registry,ref=ghcr.io/${{ env.GHCR_NAMESPACE }}/dendrite-monolith:buildcache context: . @@ -195,7 +195,7 @@ jobs: - name: Build main Yggdrasil demo image if: github.ref_name == 'main' id: docker_build_demo_yggdrasil - uses: docker/build-push-action@v3 + uses: docker/build-push-action@v6 with: cache-from: type=registry,ref=ghcr.io/${{ env.GHCR_NAMESPACE }}/dendrite-monolith:buildcache context: . @@ -209,7 +209,7 @@ jobs: - name: Build release Yggdrasil demo image if: github.event_name == 'release' # Only for GitHub releases id: docker_build_demo_yggdrasil_release - uses: docker/build-push-action@v3 + uses: docker/build-push-action@v6 with: cache-from: type=registry,ref=ghcr.io/${{ env.GHCR_NAMESPACE }}/dendrite-monolith:buildcache context: . diff --git a/.github/workflows/gh-pages.yml b/.github/workflows/gh-pages.yml index aa5f024b..24a5cba6 100644 --- a/.github/workflows/gh-pages.yml +++ b/.github/workflows/gh-pages.yml @@ -30,7 +30,7 @@ jobs: - name: Checkout uses: actions/checkout@v4 - name: Setup Pages - uses: actions/configure-pages@v2 + uses: actions/configure-pages@v5 - name: Build with Jekyll uses: actions/jekyll-build-pages@v1 with: @@ -49,4 +49,4 @@ jobs: steps: - name: Deploy to GitHub Pages id: deployment - uses: actions/deploy-pages@v1 + uses: actions/deploy-pages@v4 diff --git a/cmd/dendrite-demo-pinecone/monolith/monolith.go b/cmd/dendrite-demo-pinecone/monolith/monolith.go index 2fb5e675..562d559d 100644 --- a/cmd/dendrite-demo-pinecone/monolith/monolith.go +++ b/cmd/dendrite-demo-pinecone/monolith/monolith.go @@ -138,6 +138,7 @@ func (p *P2PMonolith) SetupDendrite( rsAPI.SetFederationAPI(fsAPI, keyRing) userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federation, enableMetrics, fsAPI.IsBlacklistedOrBackingOff) + rsAPI.SetUserAPI(userAPI) asAPI := appservice.NewInternalAPI(processCtx, cfg, &natsInstance, userAPI, rsAPI) diff --git a/go.mod b/go.mod index 1e1511d7..b0b9e7a5 100644 --- a/go.mod +++ b/go.mod @@ -29,15 +29,15 @@ require ( github.com/matrix-org/pinecone v0.11.1-0.20230810010612-ea4c33717fd7 github.com/matrix-org/util v0.0.0-20221111132719-399730281e66 github.com/mattn/go-sqlite3 v1.14.24 - github.com/nats-io/nats-server/v2 v2.10.23 + github.com/nats-io/nats-server/v2 v2.10.24 github.com/nats-io/nats.go v1.38.0 github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 github.com/opentracing/opentracing-go v1.2.0 github.com/patrickmn/go-cache v2.1.0+incompatible github.com/pkg/errors v0.9.1 - github.com/prometheus/client_golang v1.19.1 + github.com/prometheus/client_golang v1.20.5 github.com/sirupsen/logrus v1.9.3 - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.10.0 github.com/tidwall/gjson v1.18.0 github.com/tidwall/sjson v1.2.5 github.com/uber/jaeger-client-go v2.30.0+incompatible @@ -53,7 +53,7 @@ require ( golang.org/x/term v0.27.0 gopkg.in/h2non/bimg.v1 v1.1.9 gopkg.in/yaml.v2 v2.4.0 - gotest.tools/v3 v3.4.0 + gotest.tools/v3 v3.5.1 maunium.net/go/mautrix v0.15.1 modernc.org/sqlite v1.34.5 ) @@ -83,7 +83,7 @@ require ( github.com/blevesearch/zapx/v14 v14.3.10 // indirect github.com/blevesearch/zapx/v15 v15.3.13 // indirect github.com/blevesearch/zapx/v16 v16.0.12 // indirect - github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/containerd/log v0.1.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/distribution/reference v0.6.0 // indirect @@ -114,7 +114,8 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/morikuni/aec v1.0.0 // indirect github.com/mschoch/smat v0.2.0 // indirect - github.com/nats-io/jwt/v2 v2.5.8 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/nats-io/jwt/v2 v2.7.3 // indirect github.com/nats-io/nkeys v0.4.9 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/ncruces/go-strftime v0.1.9 // indirect @@ -122,9 +123,9 @@ require ( github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_model v0.5.0 // indirect - github.com/prometheus/common v0.48.0 // indirect - github.com/prometheus/procfs v0.12.0 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.55.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect github.com/quic-go/quic-go v0.46.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rogpeppe/go-internal v1.13.1 // indirect diff --git a/go.sum b/go.sum index dedb8053..19381492 100644 --- a/go.sum +++ b/go.sum @@ -65,8 +65,8 @@ github.com/blevesearch/zapx/v16 v16.0.12 h1:Uccxvjmn+hQ6ywQP+wIiTpdq9LnAviGoryJO github.com/blevesearch/zapx/v16 v16.0.12/go.mod h1:MYnOshRfSm4C4drxx1LGRI+MVFByykJ2anDY1fxdk9Q= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= -github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= -github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/codeclysm/extract v2.2.0+incompatible h1:q3wyckoA30bhUSiwdQezMqVhwd8+WGE64/GL//LtUhI= github.com/codeclysm/extract v2.2.0+incompatible/go.mod h1:2nhFMPHiU9At61hz+12bfrlpXSUrOnK+wR+KlGO4Uks= github.com/coder/websocket v1.8.12 h1:5bUXkEPPIbewrnkU8LTCLVaxi4N4J8ahufH2vlo4NAo= @@ -219,6 +219,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w= github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= @@ -264,10 +266,12 @@ github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= -github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE= -github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= -github.com/nats-io/nats-server/v2 v2.10.23 h1:jvfb9cEi5h8UG6HkZgJGdn9f1UPaX3Dohk0PohEekJI= -github.com/nats-io/nats-server/v2 v2.10.23/go.mod h1:hMFnpDT2XUXsvHglABlFl/uroQCCOcW6X/0esW6GpBk= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE= +github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4= +github.com/nats-io/nats-server/v2 v2.10.24 h1:KcqqQAD0ZZcG4yLxtvSFJY7CYKVYlnlWoAiVZ6i/IY4= +github.com/nats-io/nats-server/v2 v2.10.24/go.mod h1:olvKt8E5ZlnjyqBGbAXtxvSQKsPodISK5Eo/euIta4s= github.com/nats-io/nats.go v1.38.0 h1:A7P+g7Wjp4/NWqDOOP/K6hfhr54DvdDQUznt5JFg9XA= github.com/nats-io/nats.go v1.38.0/go.mod h1:IGUM++TwokGnXPs82/wCuiHS02/aKrdYUQkU8If6yjw= github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0= @@ -300,14 +304,14 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= -github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= -github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= -github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= -github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE= -github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= -github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= -github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= +github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y= +github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= +github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= +github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc= +github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= +github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/quic-go/quic-go v0.46.0 h1:uuwLClEEyk1DNvchH8uCByQVjo3yKL9opKulExNDs7Y= github.com/quic-go/quic-go v0.46.0/go.mod h1:1dLehS7TIR64+vxGR70GDcatWTOtMX2PUtnKsjbTurI= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= @@ -329,8 +333,8 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= @@ -430,7 +434,6 @@ golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -461,7 +464,6 @@ golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.23.0 h1:SGsXPZ+2l4JsgaCKkx+FQ9YZ5XEtA1GZYuoDjenLjvg= golang.org/x/tools v0.23.0/go.mod h1:pnu6ufv6vQkll6szChhK3C3L/ruaIv5eBeztNG8wtsI= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -499,8 +501,8 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gotest.tools/v3 v3.4.0 h1:ZazjZUfuVeZGLAmlKKuyv3IKP5orXcwtOwDQH6YVr6o= -gotest.tools/v3 v3.4.0/go.mod h1:CtbdzLSsqVhDgMtKsx03ird5YTGB3ar27v0u/yKBW5g= +gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU= +gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU= maunium.net/go/maulogger/v2 v2.4.1 h1:N7zSdd0mZkB2m2JtFUsiGTQQAdP0YeFWT7YMc80yAL8= maunium.net/go/maulogger/v2 v2.4.1/go.mod h1:omPuYwYBILeVQobz8uO3XC8DIRuEb5rXYlQSuqrbCho= maunium.net/go/mautrix v0.15.1 h1:pmCtMjYRpd83+2UL+KTRFYQo5to0373yulimvLK+1k0= diff --git a/setup/config/config_jetstream.go b/setup/config/config_jetstream.go index a048e4d0..c37f917c 100644 --- a/setup/config/config_jetstream.go +++ b/setup/config/config_jetstream.go @@ -15,6 +15,8 @@ type JetStream struct { // The prefix to use for stream names for this homeserver - really only // useful if running more than one Dendrite on the same NATS deployment. TopicPrefix string `yaml:"topic_prefix"` + // The JetStream domain, if needed. + JetStreamDomain string `yaml:"js_domain"` // Keep all storage in memory. This is mostly useful for unit tests. InMemory bool `yaml:"in_memory"` // Disable logging. This is mostly useful for unit tests. diff --git a/setup/jetstream/helpers.go b/setup/jetstream/helpers.go index 53365216..672f3e6a 100644 --- a/setup/jetstream/helpers.go +++ b/setup/jetstream/helpers.go @@ -22,7 +22,7 @@ func JetStreamConsumer( f func(ctx context.Context, msgs []*nats.Msg) bool, opts ...nats.SubOpt, ) error { - defer func() { + defer func(durable string) { // If there are existing consumers from before they were pull // consumers, we need to clean up the old push consumers. However, // in order to not affect the interest-based policies, we need to @@ -33,86 +33,93 @@ func JetStreamConsumer( logrus.WithContext(ctx).Warnf("Failed to clean up old consumer %q", durable) } } - }() + }(durable) - name := durable + "Pull" - sub, err := js.PullSubscribe(subj, name, opts...) + durable = durable + "Pull" + sub, err := js.PullSubscribe(subj, durable, opts...) if err != nil { sentry.CaptureException(err) - return fmt.Errorf("nats.SubscribeSync: %w", err) + logrus.WithContext(ctx).WithError(err).Warnf("Failed to configure durable %q", durable) + return err } - go func() { - for { - // If the parent context has given up then there's no point in - // carrying on doing anything, so stop the listener. - select { - case <-ctx.Done(): - if err := sub.Unsubscribe(); err != nil { - logrus.WithContext(ctx).Warnf("Failed to unsubscribe %q", durable) - } - return - default: - } - // The context behaviour here is surprising — we supply a context - // so that we can interrupt the fetch if we want, but NATS will still - // enforce its own deadline (roughly 5 seconds by default). Therefore - // it is our responsibility to check whether our context expired or - // not when a context error is returned. Footguns. Footguns everywhere. - msgs, err := sub.Fetch(batch, nats.Context(ctx)) - if err != nil { - if err == context.Canceled || err == context.DeadlineExceeded { - // Work out whether it was the JetStream context that expired - // or whether it was our supplied context. - select { - case <-ctx.Done(): - // The supplied context expired, so we want to stop the - // consumer altogether. - return - default: - // The JetStream context expired, so the fetch probably - // just timed out and we should try again. - continue - } - } else if errors.Is(err, nats.ErrConsumerDeleted) { - // The consumer was deleted so stop. + go jetStreamConsumerWorker(ctx, sub, subj, batch, f) + return nil +} + +func jetStreamConsumerWorker( + ctx context.Context, sub *nats.Subscription, subj string, batch int, + f func(ctx context.Context, msgs []*nats.Msg) bool, +) { + for { + // If the parent context has given up then there's no point in + // carrying on doing anything, so stop the listener. + select { + case <-ctx.Done(): + return + default: + } + // The context behaviour here is surprising — we supply a context + // so that we can interrupt the fetch if we want, but NATS will still + // enforce its own deadline (roughly 5 seconds by default). Therefore + // it is our responsibility to check whether our context expired or + // not when a context error is returned. Footguns. Footguns everywhere. + msgs, err := sub.Fetch(batch, nats.Context(ctx)) + if err != nil { + if err == context.Canceled || err == context.DeadlineExceeded { + // Work out whether it was the JetStream context that expired + // or whether it was our supplied context. + select { + case <-ctx.Done(): + // The supplied context expired, so we want to stop the + // consumer altogether. return - } else { - // Unfortunately, there's no ErrServerShutdown or similar, so we need to compare the string - if err.Error() == "nats: Server Shutdown" { - logrus.WithContext(ctx).Warn("nats server shutting down") - return - } - // Something else went wrong, so we'll panic. - sentry.CaptureException(err) - logrus.WithContext(ctx).WithField("subject", subj).Fatal(err) - } - } - if len(msgs) < 1 { - continue - } - for _, msg := range msgs { - if err = msg.InProgress(nats.Context(ctx)); err != nil { - logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err)) - sentry.CaptureException(err) + default: + // The JetStream context expired, so the fetch probably + // just timed out and we should try again. continue } - } - if f(ctx, msgs) { - for _, msg := range msgs { - if err = msg.AckSync(nats.Context(ctx)); err != nil { - logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.AckSync: %w", err)) - sentry.CaptureException(err) - } - } + } else if errors.Is(err, nats.ErrTimeout) { + // Pull request was invalidated, try again. + continue + } else if errors.Is(err, nats.ErrConsumerLeadershipChanged) { + // Leadership changed so pending pull requests became invalidated, + // just try again. + continue + } else if err.Error() == "nats: Server Shutdown" { + // The server is shutting down, but we'll rely on reconnect + // behaviour to try and either connect us to another node (if + // clustered) or to reconnect when the server comes back up. + continue } else { - for _, msg := range msgs { - if err = msg.Nak(nats.Context(ctx)); err != nil { - logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Nak: %w", err)) - sentry.CaptureException(err) - } + // Something else went wrong. + logrus.WithContext(ctx).WithField("subject", subj).WithError(err).Warn("Error on pull subscriber fetch") + return + } + } + if len(msgs) < 1 { + continue + } + for _, msg := range msgs { + if err = msg.InProgress(nats.Context(ctx)); err != nil { + logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err)) + sentry.CaptureException(err) + continue + } + } + if f(ctx, msgs) { + for _, msg := range msgs { + if err = msg.AckSync(nats.Context(ctx)); err != nil { + logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.AckSync: %w", err)) + sentry.CaptureException(err) + } + } + } else { + for _, msg := range msgs { + if err = msg.Nak(nats.Context(ctx)); err != nil { + logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Nak: %w", err)) + sentry.CaptureException(err) } } } - }() - return nil + } } diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index cc896f8e..e28688af 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -8,7 +8,6 @@ import ( "sync" "time" - "github.com/getsentry/sentry-go" "github.com/sirupsen/logrus" "github.com/element-hq/dendrite/setup/config" @@ -36,17 +35,20 @@ func DeleteAllStreams(js natsclient.JetStreamContext, cfg *config.JetStream) { func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) { natsLock.Lock() defer natsLock.Unlock() - // check if we need an in-process NATS Server - if len(cfg.Addresses) != 0 { - // reuse existing connections - if s.nc != nil { - return s.js, s.nc - } + var err error + + // If an existing connection exists, return it. + if s.nc != nil && s.js != nil { + return s.js, s.nc + } + + // For connecting to an external NATS server. + if len(cfg.Addresses) > 0 { s.js, s.nc = setupNATS(process, cfg, nil) return s.js, s.nc } - if s.Server == nil { - var err error + + if len(cfg.Addresses) == 0 && s.Server == nil { opts := &natsserver.Options{ ServerName: "monolith", DontListen: true, @@ -58,8 +60,7 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS NoLog: cfg.NoLog, SyncAlways: true, } - s.Server, err = natsserver.NewServer(opts) - if err != nil { + if s.Server, err = natsserver.NewServer(opts); err != nil { panic(err) } if !cfg.NoLog { @@ -75,29 +76,42 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS s.WaitForShutdown() process.ComponentFinished() }() + if !s.ReadyForConnections(time.Second * 60) { + logrus.Fatalln("NATS did not start in time") + } } - if !s.ReadyForConnections(time.Second * 60) { - logrus.Fatalln("NATS did not start in time") - } - // reuse existing connections - if s.nc != nil { - return s.js, s.nc - } - nc, err := natsclient.Connect("", natsclient.InProcessServer(s)) - if err != nil { + + // No existing process connection, create a new one. + if s.nc, err = natsclient.Connect("", natsclient.InProcessServer(s.Server)); err != nil { logrus.Fatalln("Failed to create NATS client") } - js, _ := setupNATS(process, cfg, nc) - s.js = js - s.nc = nc - return js, nc + s.js, s.nc = setupNATS(process, cfg, s.nc) + return s.js, s.nc } // nolint:gocyclo func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) { + jsOpts := []natsclient.JSOpt{} + if cfg.JetStreamDomain != "" { + jsOpts = append(jsOpts, natsclient.Domain(cfg.JetStreamDomain)) + } + if nc == nil { var err error - opts := []natsclient.Option{} + opts := []natsclient.Option{ + natsclient.Name("Dendrite"), + natsclient.MaxReconnects(-1), // Try forever + natsclient.ReconnectJitter(time.Second, time.Second), + natsclient.ReconnectWait(time.Second * 10), + natsclient.ReconnectHandler(func(c *natsclient.Conn) { + js, jerr := c.JetStream(jsOpts...) + if jerr != nil { + logrus.WithError(jerr).Panic("Unable to get JetStream context in reconnect handler") + return + } + checkAndConfigureStreams(process, cfg, js) + }), + } if cfg.DisableTLSValidation { opts = append(opts, natsclient.Secure(&tls.Config{ InsecureSkipVerify: true, @@ -113,15 +127,19 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc } } - s, err := nc.JetStream() + js, err := nc.JetStream(jsOpts...) if err != nil { logrus.WithError(err).Panic("Unable to get JetStream context") return nil, nil } + checkAndConfigureStreams(process, cfg, js) + return js, nc +} +func checkAndConfigureStreams(process *process.ProcessContext, cfg *config.JetStream, js natsclient.JetStreamContext) { for _, stream := range streams { // streams are defined in streams.go name := cfg.Prefixed(stream.Name) - info, err := s.StreamInfo(name) + info, err := js.StreamInfo(name) if err != nil && err != natsclient.ErrStreamNotFound { logrus.WithError(err).Fatal("Unable to get stream info") } @@ -153,11 +171,11 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc case info.Config.MaxAge != stream.MaxAge: // Try updating the stream first, as many things can be updated // non-destructively. - if info, err = s.UpdateStream(stream); err != nil { + if info, err = js.UpdateStream(stream); err != nil { logrus.WithError(err).Warnf("Unable to update stream %q, recreating...", name) // We failed to update the stream, this is a last attempt to get // things working but may result in data loss. - if err = s.DeleteStream(name); err != nil { + if err = js.DeleteStream(name); err != nil { logrus.WithError(err).Fatalf("Unable to delete stream %q", name) } info = nil @@ -176,7 +194,7 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc namespaced := *stream namespaced.Name = name namespaced.Subjects = subjects - if _, err = s.AddStream(&namespaced); err != nil { + if _, err = js.AddStream(&namespaced); err != nil { logger := logrus.WithError(err).WithFields(logrus.Fields{ "stream": namespaced.Name, "subjects": namespaced.Subjects, @@ -193,10 +211,9 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc // we can't recover anything that was queued on the disk but we // will still be able to start and run hopefully in the meantime. logger.WithError(err).Error("Unable to add stream") - sentry.CaptureException(fmt.Errorf("Unable to add stream %q: %w", namespaced.Name, err)) namespaced.Storage = natsclient.MemoryStorage - if _, err = s.AddStream(&namespaced); err != nil { + if _, err = js.AddStream(&namespaced); err != nil { // We tried to add the stream in-memory instead but something // went wrong. That's an unrecoverable situation so we will // give up at this point. @@ -208,7 +225,6 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc // disk will be left alone, but our ability to recover from a // future crash will be limited. Yell about it. err := fmt.Errorf("Stream %q is running in-memory; this may be due to data corruption in the JetStream storage directory", namespaced.Name) - sentry.CaptureException(err) process.Degraded(err) } } @@ -229,15 +245,13 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc streamName := cfg.Matrix.JetStream.Prefixed(stream) for _, consumer := range consumers { consumerName := cfg.Matrix.JetStream.Prefixed(consumer) + "Pull" - consumerInfo, err := s.ConsumerInfo(streamName, consumerName) + consumerInfo, err := js.ConsumerInfo(streamName, consumerName) if err != nil || consumerInfo == nil { continue } - if err = s.DeleteConsumer(streamName, consumerName); err != nil { + if err = js.DeleteConsumer(streamName, consumerName); err != nil { logrus.WithError(err).Errorf("Unable to clean up old consumer %q for stream %q", consumer, stream) } } } - - return s, nc }