fix syncapi tests

This commit is contained in:
Roman Isaev 2025-01-09 03:03:10 +00:00
parent 78457f30ed
commit 80ee52e092
No known key found for this signature in database
GPG key ID: 7BE2B6A6C89AEC7F

View file

@ -11,7 +11,6 @@ import (
"testing" "testing"
"time" "time"
"github.com/element-hq/dendrite/federationapi/statistics"
"github.com/element-hq/dendrite/internal/caching" "github.com/element-hq/dendrite/internal/caching"
"github.com/element-hq/dendrite/internal/httputil" "github.com/element-hq/dendrite/internal/httputil"
"github.com/element-hq/dendrite/internal/sqlutil" "github.com/element-hq/dendrite/internal/sqlutil"
@ -19,6 +18,7 @@ import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/spec" "github.com/matrix-org/gomatrixserverlib/spec"
"github.com/matrix-org/util"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tidwall/gjson" "github.com/tidwall/gjson"
@ -28,7 +28,6 @@ import (
"github.com/element-hq/dendrite/syncapi/storage" "github.com/element-hq/dendrite/syncapi/storage"
"github.com/element-hq/dendrite/syncapi/synctypes" "github.com/element-hq/dendrite/syncapi/synctypes"
"github.com/element-hq/dendrite/clientapi/auth"
"github.com/element-hq/dendrite/clientapi/producers" "github.com/element-hq/dendrite/clientapi/producers"
"github.com/element-hq/dendrite/roomserver" "github.com/element-hq/dendrite/roomserver"
"github.com/element-hq/dendrite/roomserver/api" "github.com/element-hq/dendrite/roomserver/api"
@ -37,14 +36,9 @@ import (
"github.com/element-hq/dendrite/syncapi/types" "github.com/element-hq/dendrite/syncapi/types"
"github.com/element-hq/dendrite/test" "github.com/element-hq/dendrite/test"
"github.com/element-hq/dendrite/test/testrig" "github.com/element-hq/dendrite/test/testrig"
usrapi "github.com/element-hq/dendrite/userapi"
userapi "github.com/element-hq/dendrite/userapi/api" userapi "github.com/element-hq/dendrite/userapi/api"
) )
var testIsBlacklistedOrBackingOff = func(s spec.ServerName) (*statistics.ServerStatistics, error) {
return &statistics.ServerStatistics{}, nil
}
type syncRoomserverAPI struct { type syncRoomserverAPI struct {
rsapi.SyncRoomserverAPI rsapi.SyncRoomserverAPI
rooms []*test.Room rooms []*test.Room
@ -126,6 +120,20 @@ func (s *syncUserAPI) PerformLastSeenUpdate(ctx context.Context, req *userapi.Pe
return nil return nil
} }
type userVerifier struct {
m map[string]struct {
Device *userapi.Device
Response *util.JSONResponse
}
}
func (u *userVerifier) VerifyUserFromRequest(req *http.Request) (*userapi.Device, *util.JSONResponse) {
if pair, ok := u.m[req.URL.Query().Get("access_token")]; ok {
return pair.Device, pair.Response
}
return nil, nil
}
func TestSyncAPIAccessTokens(t *testing.T) { func TestSyncAPIAccessTokens(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
testSyncAccessTokens(t, dbType) testSyncAccessTokens(t, dbType)
@ -153,13 +161,16 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream) jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
msgs := toNATSMsgs(t, cfg, room.Events()...) msgs := toNATSMsgs(t, cfg, room.Events()...)
uv := &userVerifier{}
AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, nil, caching.DisableMetrics) AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, uv, caching.DisableMetrics)
testrig.MustPublishMsgs(t, jsctx, msgs...) testrig.MustPublishMsgs(t, jsctx, msgs...)
testCases := []struct { testCases := []struct {
name string name string
req *http.Request req *http.Request
device *userapi.Device
response *util.JSONResponse
wantCode int wantCode int
wantJoinedRooms []string wantJoinedRooms []string
}{ }{
@ -168,6 +179,11 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"timeout": "0", "timeout": "0",
})), })),
device: nil,
response: &util.JSONResponse{
Code: http.StatusUnauthorized,
JSON: spec.UnknownToken("Unknown token"),
},
wantCode: 401, wantCode: 401,
}, },
{ {
@ -176,6 +192,11 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
"access_token": "foo", "access_token": "foo",
"timeout": "0", "timeout": "0",
})), })),
device: nil,
response: &util.JSONResponse{
Code: http.StatusUnauthorized,
JSON: spec.UnknownToken("Unknown token"),
},
wantCode: 401, wantCode: 401,
}, },
{ {
@ -184,11 +205,25 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
"access_token": alice.AccessToken, "access_token": alice.AccessToken,
"timeout": "0", "timeout": "0",
})), })),
device: &alice,
response: nil,
wantCode: 200, wantCode: 200,
wantJoinedRooms: []string{room.ID}, wantJoinedRooms: []string{room.ID},
}, },
} }
uv.m = make(map[string]struct {
Device *userapi.Device
Response *util.JSONResponse
}, len(testCases))
for _, tc := range testCases {
uv.m[tc.req.URL.Query().Get("access_token")] = struct {
Device *userapi.Device
Response *util.JSONResponse
}{Device: tc.device, Response: tc.response}
}
syncUntil(t, routers, alice.AccessToken, false, func(syncBody string) bool { syncUntil(t, routers, alice.AccessToken, false, func(syncBody string) bool {
// wait for the last sent eventID to come down sync // wait for the last sent eventID to come down sync
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID()) path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID())
@ -249,14 +284,20 @@ func testSyncEventFormatPowerLevels(t *testing.T, dbType test.DBType) {
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions) cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
userAPI := usrapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff) uv := userVerifier{
userVerifier := auth.DefaultUserVerifier{UserAPI: userAPI} m: map[string]struct {
Device *userapi.Device
Response *util.JSONResponse
}{
alice.AccessToken: {Device: &alice, Response: nil},
},
}
defer close() defer close()
jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream) jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
msgs := toNATSMsgs(t, cfg, room.Events()...) msgs := toNATSMsgs(t, cfg, room.Events()...)
AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, &userVerifier, caching.DisableMetrics) AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, &uv, caching.DisableMetrics)
testrig.MustPublishMsgs(t, jsctx, msgs...) testrig.MustPublishMsgs(t, jsctx, msgs...)
testCases := []struct { testCases := []struct {
@ -409,9 +450,7 @@ func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) {
// m.room.history_visibility // m.room.history_visibility
msgs := toNATSMsgs(t, cfg, room.Events()...) msgs := toNATSMsgs(t, cfg, room.Events()...)
sinceTokens := make([]string, len(msgs)) sinceTokens := make([]string, len(msgs))
userAPI := usrapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff) AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, nil, caching.DisableMetrics)
userVerifier := auth.DefaultUserVerifier{UserAPI: userAPI}
AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, &userVerifier, caching.DisableMetrics)
for i, msg := range msgs { for i, msg := range msgs {
testrig.MustPublishMsgs(t, jsctx, msg) testrig.MustPublishMsgs(t, jsctx, msg)
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
@ -499,9 +538,15 @@ func testSyncAPIUpdatePresenceImmediately(t *testing.T, dbType test.DBType) {
jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream) jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
userAPI := usrapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff) uv := userVerifier{
userVerifier := auth.DefaultUserVerifier{UserAPI: userAPI} m: map[string]struct {
AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches, &userVerifier, caching.DisableMetrics) Device *userapi.Device
Response *util.JSONResponse
}{
alice.AccessToken: {Device: &alice, Response: nil},
},
}
AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches, &uv, caching.DisableMetrics)
w := httptest.NewRecorder() w := httptest.NewRecorder()
routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken, "access_token": alice.AccessToken,
@ -623,9 +668,15 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
// Use the actual internal roomserver API // Use the actual internal roomserver API
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
userAPI := usrapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff) uv := userVerifier{
userVerifier := auth.DefaultUserVerifier{UserAPI: userAPI} m: map[string]struct {
AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches, &userVerifier, caching.DisableMetrics) Device *userapi.Device
Response *util.JSONResponse
}{
bobDev.AccessToken: {Device: &bobDev, Response: nil},
},
}
AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches, &uv, caching.DisableMetrics)
for _, tc := range testCases { for _, tc := range testCases {
testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType) testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType)
@ -894,11 +945,17 @@ func TestGetMembership(t *testing.T) {
// Use an actual roomserver for this // Use an actual roomserver for this
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
uv := userVerifier{
m: map[string]struct {
Device *userapi.Device
Response *util.JSONResponse
}{
aliceDev.AccessToken: {Device: &aliceDev, Response: nil},
bobDev.AccessToken: {Device: &bobDev, Response: nil},
},
}
userAPI := usrapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff) AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches, &uv, caching.DisableMetrics)
userVerifier := auth.DefaultUserVerifier{UserAPI: userAPI}
AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches, &userVerifier, caching.DisableMetrics)
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
@ -965,12 +1022,18 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
defer close() defer close()
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
userAPI := usrapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff) uv := userVerifier{
userVerifier := auth.DefaultUserVerifier{UserAPI: userAPI} m: map[string]struct {
Device *userapi.Device
Response *util.JSONResponse
}{
alice.AccessToken: {Device: &alice, Response: nil},
},
}
jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream) jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches, &userVerifier, caching.DisableMetrics) AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches, &uv, caching.DisableMetrics)
producer := producers.SyncAPIProducer{ producer := producers.SyncAPIProducer{
TopicSendToDeviceEvent: cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), TopicSendToDeviceEvent: cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
@ -1193,10 +1256,16 @@ func testContext(t *testing.T, dbType test.DBType) {
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
userAPI := usrapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff) uv := userVerifier{
userVerifier := auth.DefaultUserVerifier{UserAPI: userAPI} m: map[string]struct {
Device *userapi.Device
Response *util.JSONResponse
}{
alice.AccessToken: {Device: &alice, Response: nil},
},
}
AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI, caches, &userVerifier, caching.DisableMetrics) AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI, caches, &uv, caching.DisableMetrics)
room := test.NewRoom(t, user) room := test.NewRoom(t, user)
@ -1375,12 +1444,17 @@ func TestRemoveEditedEventFromSearchIndex(t *testing.T) {
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
uv := userVerifier{
userAPI := usrapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff) m: map[string]struct {
userVerifier := auth.DefaultUserVerifier{UserAPI: userAPI} Device *userapi.Device
Response *util.JSONResponse
}{
alice.AccessToken: {Device: &alice, Response: nil},
},
}
room := test.NewRoom(t, user) room := test.NewRoom(t, user)
AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, &userVerifier, caching.DisableMetrics) AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, &uv, caching.DisableMetrics)
if err := api.SendEvents(processCtx.Context(), rsAPI, api.KindNew, room.Events(), "test", "test", "test", nil, false); err != nil { if err := api.SendEvents(processCtx.Context(), rsAPI, api.KindNew, room.Events(), "test", "test", "test", nil, false); err != nil {
t.Fatalf("failed to send events: %v", err) t.Fatalf("failed to send events: %v", err)