"""Integration tests for the Matrix app, run against a local Synapse server."""

from datetime import date

from django.contrib.auth.models import User

import pytest
import requests

from aleksis.apps.matrix.models import MatrixProfile, MatrixRoom
from aleksis.core.models import Group, Person, SchoolTerm
from aleksis.core.util.core_helpers import get_site_preferences

pytestmark = pytest.mark.django_db

SERVER_URL = "http://127.0.0.1:8008"

# Upper bound (seconds) for each HTTP call so a stalled homeserver fails the
# test instead of hanging the whole test session.
REQUEST_TIMEOUT = 30


def test_connection(synapse):
    """Check that the Synapse fixture listens on port 8008 and answers HTTP."""
    assert synapse["listeners"][0]["port"] == 8008

    assert requests.get(SERVER_URL, timeout=REQUEST_TIMEOUT).status_code == requests.codes.ok


@pytest.fixture
def matrix_bot_user(synapse):
    """Register the ``aleksis-bot`` user and store its credentials.

    The homeserver URL is written to the site preferences before the
    registration request is built; the returned user ID, device ID and access
    token are written afterwards. Yields the parsed registration response.
    """
    from aleksis.apps.matrix.matrix import build_url

    body = {"username": "aleksis-bot", "password": "test", "auth": {"type": "m.login.dummy"}}

    preferences = get_site_preferences()
    preferences["matrix__homeserver"] = SERVER_URL

    r = requests.post(build_url("register"), json=body, timeout=REQUEST_TIMEOUT)
    # Surface the server's answer only when registration fails.
    assert r.status_code == requests.codes.ok, r.text

    user = r.json()

    preferences["matrix__user"] = user["user_id"]
    preferences["matrix__device_id"] = user["device_id"]
    preferences["matrix__access_token"] = user["access_token"]

    yield user


def test_create_room_for_group(matrix_bot_user):
    """Create a room for a group and check its ID, alias and idempotency."""
    from aleksis.apps.matrix.matrix import build_url, get_headers

    g = Group.objects.create(name="Test Room")
    assert not MatrixRoom.objects.all().exists()
    room = MatrixRoom.from_group(g)

    assert ":matrix.aleksis.example.org" in room.room_id
    assert room.alias == "#test-room:matrix.aleksis.example.org"

    # On second get, it should be the same matrix room
    assert MatrixRoom.from_group(g) == room

    r = requests.get(
        build_url(f"rooms/{room.room_id}/aliases"),
        headers=get_headers(),
        timeout=REQUEST_TIMEOUT,
    )
    # Fail with the server's answer rather than a KeyError on "aliases".
    assert r.status_code == requests.codes.ok, r.text
    aliases = r.json()["aliases"]
    assert "#test-room:matrix.aleksis.example.org" in aliases


def test_create_room_for_group_short_name(matrix_bot_user):
    """Check that the alias is built from the group's short name when set."""
    g = Group.objects.create(name="Test Room", short_name="test")
    assert not MatrixRoom.objects.all().exists()
    room = MatrixRoom.from_group(g)
    assert room.alias == "#test:matrix.aleksis.example.org"


def test_room_alias_collision_same_name(matrix_bot_user):
    """Check alias handling for groups whose names map to the same alias.

    Colliding aliases get a numeric suffix; once the
    ``matrix__disambiguate_room_aliases`` preference is switched off, a
    further collision raises ``MatrixException``.
    """
    from aleksis.apps.matrix.matrix import MatrixException

    g1 = Group.objects.create(name="Test Room")
    g2 = Group.objects.create(name="test-room")
    g3 = Group.objects.create(name="Test-Room")
    g4 = Group.objects.create(name="test room")

    expected_aliases = [
        (g1, "#test-room:matrix.aleksis.example.org"),
        (g2, "#test-room-2:matrix.aleksis.example.org"),
        (g3, "#test-room-3:matrix.aleksis.example.org"),
    ]
    for group, alias in expected_aliases:
        room = MatrixRoom.from_group(group)
        assert room.alias == alias

    get_site_preferences()["matrix__disambiguate_room_aliases"] = False

    with pytest.raises(MatrixException):
        MatrixRoom.from_group(g4)


def test_room_alias_collision_school_term(matrix_bot_user):
    """Check that same-named groups in different school terms get distinct aliases."""
    school_term_a = SchoolTerm.objects.create(
        name="Test Term A", date_start=date(2020, 1, 1), date_end=date(2020, 12, 31)
    )
    school_term_b = SchoolTerm.objects.create(
        name="Test Term B", date_start=date(2021, 1, 1), date_end=date(2021, 12, 31)
    )

    g1 = Group.objects.create(name="Test Room", school_term=school_term_a)
    g2 = Group.objects.create(name="Test Room", school_term=school_term_b)

    room = MatrixRoom.from_group(g1)
    assert room.alias == "#test-room:matrix.aleksis.example.org"

    room = MatrixRoom.from_group(g2)
    assert room.alias == "#test-room-2:matrix.aleksis.example.org"


def test_sync_room_members(matrix_bot_user):
    """Sync a group with three members and two owners into its Matrix room.

    Verifies that a Matrix profile is created for each person, that all of
    them show up in the room's member list, and that members have power
    level 0 while owners have power level 50.
    """
    from aleksis.apps.matrix.matrix import build_url, get_headers

    get_site_preferences()["matrix__homeserver_ids"] = "matrix.aleksis.example.org"

    g = Group.objects.create(name="Test Room")

    persons = []
    for number in range(1, 6):
        login = f"test{number}"
        user = User.objects.create_user(login, f"{login}@example.org", login)
        # The first person is named plain "Test"; the others carry their number.
        first_name = "Test" if number == 1 else f"Test {number}"
        persons.append(Person.objects.create(first_name=first_name, last_name="Person", user=user))

    members, owners = persons[:3], persons[3:]
    g.members.set(members)
    g.owners.set(owners)

    room = MatrixRoom.from_group(g)
    room.sync_profiles()

    assert MatrixProfile.objects.all().count() == 5
    for number, person in enumerate(persons, start=1):
        assert person.matrix_profile
        assert person.matrix_profile.matrix_id == f"@test{number}:matrix.aleksis.example.org"

    # Check members
    # NOTE(review): the Matrix client-server spec documents `membership` as a
    # query parameter; this JSON body on a GET is presumably ignored by the
    # server, so the list may be unfiltered -- confirm.
    r = requests.get(
        build_url(f"rooms/{room.room_id}/members"),
        headers=get_headers(),
        json={"membership": ["join", "invite"]},
        timeout=REQUEST_TIMEOUT,
    )
    assert r.status_code == requests.codes.ok

    matrix_ids = [x["state_key"] for x in r.json()["chunk"]]
    for person in persons:
        assert person.matrix_profile.matrix_id in matrix_ids

    # Get power levels
    r = requests.get(
        build_url(f"rooms/{room.room_id}/state"),
        headers=get_headers(),
        timeout=REQUEST_TIMEOUT,
    )
    assert r.status_code == requests.codes.ok

    power_level_events = [event for event in r.json() if event["type"] == "m.room.power_levels"]
    # Without this check the test would pass vacuously if the event is missing.
    assert power_level_events, "room state contains no m.room.power_levels event"

    current_power_levels = power_level_events[0]["content"]["users"]
    for person in members:
        assert current_power_levels[person.matrix_profile.matrix_id] == 0
    for person in owners:
        assert current_power_levels[person.matrix_profile.matrix_id] == 50


def test_sync_room_members_without_user(matrix_bot_user):
    """Check that a person without a linked user gets no Matrix profile."""
    get_site_preferences()["matrix__homeserver_ids"] = "matrix.aleksis.example.org"

    g = Group.objects.create(name="Test Room")

    u1 = User.objects.create_user("test1", "test1@example.org", "test1")

    p1 = Person.objects.create(first_name="Test", last_name="Person", user=u1)
    p2 = Person.objects.create(first_name="Test 2", last_name="Person")

    g.members.set([p1, p2])

    room = MatrixRoom.from_group(g)
    room.sync_profiles()

    assert MatrixProfile.objects.all().count() == 1
    assert p1.matrix_profile
    assert p1.matrix_profile.matrix_id == "@test1:matrix.aleksis.example.org"
    assert not hasattr(p2, "matrix_profile")


def test_sync_room_members_without_homeserver(matrix_bot_user):
    """Check syncing when no homeserver for Matrix IDs is configured.

    With ``matrix__homeserver_ids`` empty, no profile is created for the
    person with a user, while the manually created profile is kept.
    """
    get_site_preferences()["matrix__homeserver_ids"] = ""

    g = Group.objects.create(name="Test Room")

    u1 = User.objects.create_user("test1", "test1@example.org", "test1")

    p1 = Person.objects.create(first_name="Test", last_name="Person", user=u1)
    p2 = Person.objects.create(first_name="Test 2", last_name="Person")

    MatrixProfile.objects.create(person=p2, matrix_id="@test2:matrix.aleksis.example.org")

    g.members.set([p1, p2])

    room = MatrixRoom.from_group(g)
    room.sync_profiles()

    assert MatrixProfile.objects.all().count() == 1
    assert not hasattr(p1, "matrix_profile")
    assert p2.matrix_profile
    assert p2.matrix_profile.matrix_id == "@test2:matrix.aleksis.example.org"