import time from datetime import date from django.contrib.auth.models import User import pytest import requests from celery.result import AsyncResult from aleksis.apps.matrix.matrix import do_matrix_request from aleksis.apps.matrix.models import MatrixProfile, MatrixRoom, MatrixSpace from aleksis.core.models import Group, Person, SchoolTerm from aleksis.core.util.core_helpers import get_site_preferences pytestmark = pytest.mark.django_db SERVER_URL = "http://127.0.0.1:8008" def test_connection(synapse): assert synapse["listeners"][0]["port"] == 8008 assert requests.get(SERVER_URL).status_code == requests.codes.ok @pytest.fixture def matrix_bot_user(synapse): from aleksis.apps.matrix.matrix import build_url body = {"username": "aleksis-bot", "password": "test", "auth": {"type": "m.login.dummy"}} get_site_preferences()["matrix__homeserver"] = SERVER_URL r = requests.post(build_url("register"), json=body) print(r.text, build_url("register")) assert r.status_code == requests.codes.ok user = r.json() get_site_preferences()["matrix__user"] = user["user_id"] get_site_preferences()["matrix__device_id"] = user["device_id"] get_site_preferences()["matrix__access_token"] = user["access_token"] yield user def test_matrix_bot_user(matrix_bot_user): print(matrix_bot_user) assert True def test_create_room_for_group(matrix_bot_user): from aleksis.apps.matrix.matrix import build_url, get_headers g = Group.objects.create(name="Test Room") assert not MatrixRoom.objects.all().exists() room = MatrixRoom.from_group(g) assert ":matrix.aleksis.example.org" in room.room_id assert room.alias == "#test-room:matrix.aleksis.example.org" # On second get, it should be the same matrix room assert MatrixRoom.from_group(g) == room r = do_matrix_request("GET", f"rooms/{room.room_id}/aliases") aliases = r["aliases"] assert "#test-room:matrix.aleksis.example.org" in aliases # def test_create_room_for_group_short_name(matrix_bot_user): g = Group.objects.create(name="Test Room", short_name="test") assert not MatrixRoom.objects.all().exists() room = MatrixRoom.from_group(g) assert room.alias == "#test:matrix.aleksis.example.org" def test_room_alias_collision_same_name(matrix_bot_user): from aleksis.apps.matrix.matrix import MatrixException g1 = Group.objects.create(name="Test Room") g2 = Group.objects.create(name="test-room") g3 = Group.objects.create(name="Test-Room") g4 = Group.objects.create(name="test room") get_site_preferences()["matrix__disambiguate_room_aliases"] = True room = MatrixRoom.from_group(g1) assert room.alias == "#test-room:matrix.aleksis.example.org" room = MatrixRoom.from_group(g2) assert room.alias == "#test-room-2:matrix.aleksis.example.org" room = MatrixRoom.from_group(g3) assert room.alias == "#test-room-3:matrix.aleksis.example.org" get_site_preferences()["matrix__disambiguate_room_aliases"] = False with pytest.raises(MatrixException): MatrixRoom.from_group(g4) def test_room_alias_collision_school_term(matrix_bot_user): get_site_preferences()["matrix__disambiguate_room_aliases"] = True school_term_a = SchoolTerm.objects.create( name="Test Term A", date_start=date(2020, 1, 1), date_end=date(2020, 12, 31) ) school_term_b = SchoolTerm.objects.create( name="Test Term B", date_start=date(2021, 1, 1), date_end=date(2021, 12, 31) ) g1 = Group.objects.create(name="Test Room", school_term=school_term_a) g2 = Group.objects.create(name="Test Room", school_term=school_term_b) room = MatrixRoom.from_group(g1) assert room.alias == "#test-room:matrix.aleksis.example.org" room = MatrixRoom.from_group(g2) assert room.alias == "#test-room-2:matrix.aleksis.example.org" def test_sync_room_members(matrix_bot_user): from aleksis.apps.matrix.matrix import build_url, get_headers get_site_preferences()["matrix__homeserver_ids"] = "matrix.aleksis.example.org" g = Group.objects.create(name="Test Room") u1 = User.objects.create_user("test1", "test1@example.org", "test1") u2 = User.objects.create_user("test2", "test2@example.org", "test2") u3 = User.objects.create_user("test3", "test3@example.org", "test3") u4 = User.objects.create_user("test4", "test4@example.org", "test4") u5 = User.objects.create_user("test5", "test5@example.org", "test5") p1 = Person.objects.create(first_name="Test", last_name="Person", user=u1) p2 = Person.objects.create(first_name="Test 2", last_name="Person", user=u2) p3 = Person.objects.create(first_name="Test 3", last_name="Person", user=u3) p4 = Person.objects.create(first_name="Test 4", last_name="Person", user=u4) p5 = Person.objects.create(first_name="Test 5", last_name="Person", user=u5) g.members.set([p1, p2, p3]) g.owners.set([p4, p5]) room = MatrixRoom.from_group(g) room.sync_profiles() assert MatrixProfile.objects.all().count() == 5 assert p1.matrix_profile assert p1.matrix_profile.matrix_id == "@test1:matrix.aleksis.example.org" assert p2.matrix_profile assert p2.matrix_profile.matrix_id == "@test2:matrix.aleksis.example.org" assert p3.matrix_profile assert p3.matrix_profile.matrix_id == "@test3:matrix.aleksis.example.org" assert p4.matrix_profile assert p4.matrix_profile.matrix_id == "@test4:matrix.aleksis.example.org" assert p5.matrix_profile assert p5.matrix_profile.matrix_id == "@test5:matrix.aleksis.example.org" # Check members r = do_matrix_request( "GET", f"rooms/{room.room_id}/members", body={"membership": ["join", "invite"]}, ) matrix_ids = [x["state_key"] for x in r["chunk"]] assert p1.matrix_profile.matrix_id in matrix_ids assert p2.matrix_profile.matrix_id in matrix_ids assert p3.matrix_profile.matrix_id in matrix_ids assert p4.matrix_profile.matrix_id in matrix_ids assert p5.matrix_profile.matrix_id in matrix_ids # Get power levels r = do_matrix_request("GET", f"rooms/{room.room_id}/state") for event in r: if not event["type"] == "m.room.power_levels": continue current_power_levels = event["content"]["users"] assert current_power_levels[p1.matrix_profile.matrix_id] == 0 assert current_power_levels[p2.matrix_profile.matrix_id] == 0 assert current_power_levels[p3.matrix_profile.matrix_id] == 0 assert current_power_levels[p4.matrix_profile.matrix_id] == 50 assert current_power_levels[p5.matrix_profile.matrix_id] == 50 break def test_sync_room_members_without_user(matrix_bot_user): get_site_preferences()["matrix__homeserver_ids"] = "matrix.aleksis.example.org" g = Group.objects.create(name="Test Room") u1 = User.objects.create_user("test1", "test1@example.org", "test1") p1 = Person.objects.create(first_name="Test", last_name="Person", user=u1) p2 = Person.objects.create(first_name="Test 2", last_name="Person") g.members.set([p1, p2]) room = MatrixRoom.from_group(g) room.sync_profiles() assert MatrixProfile.objects.all().count() == 1 assert p1.matrix_profile assert p1.matrix_profile.matrix_id == "@test1:matrix.aleksis.example.org" assert not hasattr(p2, "matrix_profile") # test no homeserver for ids def test_sync_room_members_without_homeserver(matrix_bot_user): get_site_preferences()["matrix__homeserver_ids"] = "" g = Group.objects.create(name="Test Room") u1 = User.objects.create_user("test1", "test1@example.org", "test1") p1 = Person.objects.create(first_name="Test", last_name="Person", user=u1) p2 = Person.objects.create(first_name="Test 2", last_name="Person") MatrixProfile.objects.create(person=p2, matrix_id="@test2:matrix.aleksis.example.org") g.members.set([p1, p2]) room = MatrixRoom.from_group(g) room.sync_profiles() assert MatrixProfile.objects.all().count() == 1 assert not hasattr(p1, "matrix_profile") assert p2.matrix_profile assert p2.matrix_profile.matrix_id == "@test2:matrix.aleksis.example.org" def test_use_room_sync(matrix_bot_user): from aleksis.apps.matrix.matrix import build_url, get_headers get_site_preferences()["matrix__homeserver_ids"] = "matrix.aleksis.example.org" g = Group.objects.create(name="Test Room") u1 = User.objects.create_user("test1", "test1@example.org", "test1") p1 = Person.objects.create(first_name="Test", last_name="Person", user=u1) g.members.add(p1) r = g.use_in_matrix(sync=True) assert isinstance(r, MatrixRoom) assert MatrixProfile.objects.all().count() == 1 assert p1.matrix_profile assert p1.matrix_profile.matrix_id == "@test1:matrix.aleksis.example.org" from django.test import TransactionTestCase, override_settings @pytest.mark.usefixtures("celery_worker", "matrix_bot_user") @override_settings(CELERY_BROKER_URL="memory://localhost//") @override_settings(HAYSTACK_SIGNAL_PROCESSOR="") class MatrixCeleryTest(TransactionTestCase): serialized_rollback = True def test_use_room_async(self): get_site_preferences()["matrix__homeserver_ids"] = "matrix.aleksis.example.org" g = Group.objects.create(name="Test Room") u1 = User.objects.create_user("test1", "test1@example.org", "test1") p1 = Person.objects.create(first_name="Test", last_name="Person", user=u1) g.members.add(p1) r = g.use_in_matrix(sync=False) assert isinstance(r, AsyncResult) time.sleep(3) assert MatrixProfile.objects.all().count() == 1 assert p1.matrix_profile assert p1.matrix_profile.matrix_id == "@test1:matrix.aleksis.example.org" def test_space_creation(matrix_bot_user): parent_group = Group.objects.create(name="Test Group") child_1 = Group.objects.create(name="Test Group 1") child_2 = Group.objects.create(name="Test Group 2") child_3 = Group.objects.create(name="Test Group 3") parent_group.child_groups.set([child_1, child_2, child_3]) parent_group.use_in_matrix(sync=True) get_site_preferences()["matrix__use_spaces"] = True space = MatrixSpace.from_group(parent_group) r = do_matrix_request("GET", f"rooms/{space.room_id}/state") events = {x["type"]: x for x in r} assert events["m.room.create"]["content"]["type"] == "m.space" space.ensure_children() rooms = MatrixRoom.get_queryset().values_list("group_id", flat=True) assert child_1.pk in rooms assert child_2.pk in rooms assert child_3.pk in rooms space.sync_children() r = do_matrix_request("GET", f"rooms/{space.room_id}/state") interesting_events = [x["state_key"] for x in r if x["type"] == "m.space.child"] assert len(interesting_events) == 4 rooms = list( MatrixRoom.get_queryset() .filter(group__in=[parent_group, child_1, child_2, child_3]) .values_list("room_id", flat=True) ) assert len(rooms) == 4 assert set(interesting_events) == set(rooms) def test_space_creation_with_child_spaces(matrix_bot_user): parent_group = Group.objects.create(name="Test Group") child_1 = Group.objects.create(name="Test Group 1") child_1_child_1 = Group.objects.create(name="Test Group 1 1") child_1_child_2 = Group.objects.create(name="Test Group 1 2") child_1.child_groups.set([child_1_child_1, child_1_child_2]) child_2 = Group.objects.create(name="Test Group 2") child_3 = Group.objects.create(name="Test Group 3") parent_group.child_groups.set([child_1, child_2, child_3]) parent_group.use_in_matrix(sync=True) get_site_preferences()["matrix__use_spaces"] = True space = MatrixSpace.from_group(parent_group) space.ensure_children() rooms = MatrixRoom.get_queryset().values_list("group_id", flat=True) assert child_1.pk in rooms assert child_2.pk in rooms assert child_3.pk in rooms assert child_1_child_1.pk in rooms assert child_1_child_2.pk in rooms spaces = MatrixSpace.get_queryset().values_list("group_id", flat=True) assert parent_group.pk in spaces assert child_1.pk in spaces space.sync_children() r = do_matrix_request("GET", f"rooms/{space.room_id}/state") interesting_events = [x["state_key"] for x in r if x["type"] == "m.space.child"] assert len(interesting_events) == 4 rooms = list( MatrixRoom.get_queryset() .filter(group__in=[parent_group, child_2, child_3]) .values_list("room_id", flat=True) ) + list(MatrixSpace.objects.filter(group=child_1).values_list("room_id", flat=True)) assert len(rooms) == 4 assert set(interesting_events) == set(rooms) space = MatrixSpace.objects.get(group=child_1) r = do_matrix_request("GET", f"rooms/{space.room_id}/state") interesting_events = [x["state_key"] for x in r if x["type"] == "m.space.child"] assert len(interesting_events) == 3 rooms = list( MatrixRoom.get_queryset() .filter(group__in=[child_1, child_1_child_1, child_1_child_2]) .values_list("room_id", flat=True) ) assert len(rooms) == 3 assert set(interesting_events) == set(rooms) def test_alias_room_id_using_group(matrix_bot_user): g = Group.objects.create(name="Test Room") room = MatrixRoom.from_group(g) child_1 = Group.objects.create(name="Test Group 1") g.child_groups.set([child_1]) room.sync() assert MatrixSpace.objects.get_queryset().count() == 1 assert g.matrix_room_id == room.room_id assert g.matrix_alias == room.alias