From 80ee52e09242b488c799c0646c01e4fbd30f5467 Mon Sep 17 00:00:00 2001 From: Roman Isaev Date: Thu, 9 Jan 2025 03:03:10 +0000 Subject: [PATCH] fix syncapi tests --- syncapi/syncapi_test.go | 142 ++++++++++++++++++++++++++++++---------- 1 file changed, 108 insertions(+), 34 deletions(-) diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go index b3dbf86c..bd322f66 100644 --- a/syncapi/syncapi_test.go +++ b/syncapi/syncapi_test.go @@ -11,7 +11,6 @@ import ( "testing" "time" - "github.com/element-hq/dendrite/federationapi/statistics" "github.com/element-hq/dendrite/internal/caching" "github.com/element-hq/dendrite/internal/httputil" "github.com/element-hq/dendrite/internal/sqlutil" @@ -19,6 +18,7 @@ import ( "github.com/gorilla/mux" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib/spec" + "github.com/matrix-org/util" "github.com/nats-io/nats.go" "github.com/stretchr/testify/assert" "github.com/tidwall/gjson" @@ -28,7 +28,6 @@ import ( "github.com/element-hq/dendrite/syncapi/storage" "github.com/element-hq/dendrite/syncapi/synctypes" - "github.com/element-hq/dendrite/clientapi/auth" "github.com/element-hq/dendrite/clientapi/producers" "github.com/element-hq/dendrite/roomserver" "github.com/element-hq/dendrite/roomserver/api" @@ -37,14 +36,9 @@ import ( "github.com/element-hq/dendrite/syncapi/types" "github.com/element-hq/dendrite/test" "github.com/element-hq/dendrite/test/testrig" - usrapi "github.com/element-hq/dendrite/userapi" userapi "github.com/element-hq/dendrite/userapi/api" ) -var testIsBlacklistedOrBackingOff = func(s spec.ServerName) (*statistics.ServerStatistics, error) { - return &statistics.ServerStatistics{}, nil -} - type syncRoomserverAPI struct { rsapi.SyncRoomserverAPI rooms []*test.Room @@ -126,6 +120,20 @@ func (s *syncUserAPI) PerformLastSeenUpdate(ctx context.Context, req *userapi.Pe return nil } +type userVerifier struct { + m map[string]struct { + Device *userapi.Device + Response *util.JSONResponse + } +} + +func (u *userVerifier) VerifyUserFromRequest(req *http.Request) (*userapi.Device, *util.JSONResponse) { + if pair, ok := u.m[req.URL.Query().Get("access_token")]; ok { + return pair.Device, pair.Response + } + return nil, nil +} + func TestSyncAPIAccessTokens(t *testing.T) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { testSyncAccessTokens(t, dbType) @@ -153,13 +161,16 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) { jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream) msgs := toNATSMsgs(t, cfg, room.Events()...) + uv := &userVerifier{} - AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, nil, caching.DisableMetrics) + AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, uv, caching.DisableMetrics) testrig.MustPublishMsgs(t, jsctx, msgs...) testCases := []struct { name string req *http.Request + device *userapi.Device + response *util.JSONResponse wantCode int wantJoinedRooms []string }{ @@ -168,6 +179,11 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) { req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ "timeout": "0", })), + device: nil, + response: &util.JSONResponse{ + Code: http.StatusUnauthorized, + JSON: spec.UnknownToken("Unknown token"), + }, wantCode: 401, }, { @@ -176,6 +192,11 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) { "access_token": "foo", "timeout": "0", })), + device: nil, + response: &util.JSONResponse{ + Code: http.StatusUnauthorized, + JSON: spec.UnknownToken("Unknown token"), + }, wantCode: 401, }, { @@ -184,11 +205,25 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) { "access_token": alice.AccessToken, "timeout": "0", })), + device: &alice, + response: nil, wantCode: 200, wantJoinedRooms: []string{room.ID}, }, } + uv.m = make(map[string]struct { + Device *userapi.Device + Response *util.JSONResponse + }, len(testCases)) + for _, tc := range testCases { + + uv.m[tc.req.URL.Query().Get("access_token")] = struct { + Device *userapi.Device + Response *util.JSONResponse + }{Device: tc.device, Response: tc.response} + } + syncUntil(t, routers, alice.AccessToken, false, func(syncBody string) bool { // wait for the last sent eventID to come down sync path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID()) @@ -249,14 +284,20 @@ func testSyncEventFormatPowerLevels(t *testing.T, dbType test.DBType) { cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions) caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) natsInstance := jetstream.NATSInstance{} - userAPI := usrapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff) - userVerifier := auth.DefaultUserVerifier{UserAPI: userAPI} + uv := userVerifier{ + m: map[string]struct { + Device *userapi.Device + Response *util.JSONResponse + }{ + alice.AccessToken: {Device: &alice, Response: nil}, + }, + } defer close() jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream) msgs := toNATSMsgs(t, cfg, room.Events()...) - AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, &userVerifier, caching.DisableMetrics) + AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, &uv, caching.DisableMetrics) testrig.MustPublishMsgs(t, jsctx, msgs...) testCases := []struct { @@ -409,9 +450,7 @@ func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) { // m.room.history_visibility msgs := toNATSMsgs(t, cfg, room.Events()...) sinceTokens := make([]string, len(msgs)) - userAPI := usrapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff) - userVerifier := auth.DefaultUserVerifier{UserAPI: userAPI} - AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, &userVerifier, caching.DisableMetrics) + AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, nil, caching.DisableMetrics) for i, msg := range msgs { testrig.MustPublishMsgs(t, jsctx, msg) time.Sleep(100 * time.Millisecond) @@ -499,9 +538,15 @@ func testSyncAPIUpdatePresenceImmediately(t *testing.T, dbType test.DBType) { jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream) - userAPI := usrapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff) - userVerifier := auth.DefaultUserVerifier{UserAPI: userAPI} - AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches, &userVerifier, caching.DisableMetrics) + uv := userVerifier{ + m: map[string]struct { + Device *userapi.Device + Response *util.JSONResponse + }{ + alice.AccessToken: {Device: &alice, Response: nil}, + }, + } + AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches, &uv, caching.DisableMetrics) w := httptest.NewRecorder() routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ "access_token": alice.AccessToken, @@ -623,9 +668,15 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) { // Use the actual internal roomserver API rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI.SetFederationAPI(nil, nil) - userAPI := usrapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff) - userVerifier := auth.DefaultUserVerifier{UserAPI: userAPI} - AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches, &userVerifier, caching.DisableMetrics) + uv := userVerifier{ + m: map[string]struct { + Device *userapi.Device + Response *util.JSONResponse + }{ + bobDev.AccessToken: {Device: &bobDev, Response: nil}, + }, + } + AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches, &uv, caching.DisableMetrics) for _, tc := range testCases { testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType) @@ -894,11 +945,17 @@ func TestGetMembership(t *testing.T) { // Use an actual roomserver for this rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI.SetFederationAPI(nil, nil) + uv := userVerifier{ + m: map[string]struct { + Device *userapi.Device + Response *util.JSONResponse + }{ + aliceDev.AccessToken: {Device: &aliceDev, Response: nil}, + bobDev.AccessToken: {Device: &bobDev, Response: nil}, + }, + } - userAPI := usrapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff) - userVerifier := auth.DefaultUserVerifier{UserAPI: userAPI} - - AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches, &userVerifier, caching.DisableMetrics) + AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches, &uv, caching.DisableMetrics) for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { @@ -965,12 +1022,18 @@ func testSendToDevice(t *testing.T, dbType test.DBType) { caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) defer close() natsInstance := jetstream.NATSInstance{} - userAPI := usrapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff) - userVerifier := auth.DefaultUserVerifier{UserAPI: userAPI} + uv := userVerifier{ + m: map[string]struct { + Device *userapi.Device + Response *util.JSONResponse + }{ + alice.AccessToken: {Device: &alice, Response: nil}, + }, + } jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream) - AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches, &userVerifier, caching.DisableMetrics) + AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches, &uv, caching.DisableMetrics) producer := producers.SyncAPIProducer{ TopicSendToDeviceEvent: cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), @@ -1193,10 +1256,16 @@ func testContext(t *testing.T, dbType test.DBType) { rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI.SetFederationAPI(nil, nil) - userAPI := usrapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff) - userVerifier := auth.DefaultUserVerifier{UserAPI: userAPI} + uv := userVerifier{ + m: map[string]struct { + Device *userapi.Device + Response *util.JSONResponse + }{ + alice.AccessToken: {Device: &alice, Response: nil}, + }, + } - AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI, caches, &userVerifier, caching.DisableMetrics) + AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI, caches, &uv, caching.DisableMetrics) room := test.NewRoom(t, user) @@ -1375,12 +1444,17 @@ func TestRemoveEditedEventFromSearchIndex(t *testing.T) { rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics) rsAPI.SetFederationAPI(nil, nil) - - userAPI := usrapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff) - userVerifier := auth.DefaultUserVerifier{UserAPI: userAPI} + uv := userVerifier{ + m: map[string]struct { + Device *userapi.Device + Response *util.JSONResponse + }{ + alice.AccessToken: {Device: &alice, Response: nil}, + }, + } room := test.NewRoom(t, user) - AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, &userVerifier, caching.DisableMetrics) + AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, &uv, caching.DisableMetrics) if err := api.SendEvents(processCtx.Context(), rsAPI, api.KindNew, room.Events(), "test", "test", "test", nil, false); err != nil { t.Fatalf("failed to send events: %v", err)