Skip to content
Snippets Groups Projects
Verified Commit 1d05d27a authored by Jonathan Weth's avatar Jonathan Weth :keyboard:
Browse files

Sync room members

parent e76bde5a
No related branches found
No related tags found
No related merge requests found
import re
from typing import Any, Dict, Optional, Union
from django.db import models
from django.db.models import Q
from django.template.defaultfilters import slugify
from django.utils.translation import gettext_lazy as _
......@@ -8,6 +10,7 @@ import requests
from aleksis.core.mixins import ExtensibleModel, ExtensiblePolymorphicModel
from aleksis.core.models import Group, Person
from aleksis.core.util.core_helpers import get_site_preferences
class MatrixProfile(ExtensibleModel):
......@@ -23,6 +26,26 @@ class MatrixProfile(ExtensibleModel):
related_name="matrix_profile",
)
@classmethod
def build_matrix_id(cls, username, homeserver: Optional[str] = None):
homeserver = homeserver or get_site_preferences()["matrix__homeserver_ids"]
return f"@{username}:{homeserver}"
@classmethod
def from_person(cls, person: Person, commit: bool = False) -> Union["MatrixProfile", None]:
if hasattr(person, "matrix_profile"):
return person.matrix_profile
if not person.user:
raise ValueError("Person must have a user.")
if not get_site_preferences()["matrix__homeserver_ids"]:
return None
new_profile = MatrixProfile(
matrix_id=cls.build_matrix_id(person.user.username), person=person
)
if commit:
new_profile.save()
return new_profile
class Meta:
verbose_name = _("Matrix profile")
verbose_name_plural = _("Matrix profiles")
......@@ -92,6 +115,86 @@ class MatrixRoom(ExtensiblePolymorphicModel):
return r
@property
def power_levels(self) -> Dict[str, int]:
"""Return the power levels for this room."""
from aleksis.apps.matrix.matrix import MatrixException, build_url, get_headers
r = requests.get(build_url(f"rooms/{self.room_id}/state"), headers=get_headers())
if r.status_code != requests.codes.ok:
raise MatrixException(r.text)
event = list(filter(lambda x: x["type"] == "m.room.power_levels", r.json()))
user_levels = event[0]["content"]["users"]
return user_levels
def _invite(self, profile: MatrixProfile) -> Dict[str, Any]:
"""Invite a user to this room."""
from aleksis.apps.matrix.matrix import MatrixException, build_url, get_headers
r = requests.post(
build_url(f"rooms/{self.room_id}/invite"),
headers=get_headers(),
json={"user_id": profile.matrix_id},
)
if not r.status_code == requests.codes.ok:
raise MatrixException(r.text)
return r.json()
def _set_power_levels(self, power_levels: Dict[str, int]) -> Dict[str, Any]:
"""Set the power levels for this room."""
from aleksis.apps.matrix.matrix import MatrixException, build_url, get_headers
r = requests.put(
build_url(f"rooms/{self.room_id}/state/m.room.power_levels/"),
headers=get_headers(),
json={"users": power_levels},
)
print(r.text, r.status_code)
if not r.status_code == requests.codes.ok:
raise MatrixException(r.text)
return r.json()
def sync_profiles(self):
"""Sync profiles for this room."""
existing_profiles = MatrixProfile.objects.filter(
Q(person__member_of=self.group) | Q(person__owner_of=self.group)
)
profiles_to_create = []
for person in (
Person.objects.filter(user__isnull=False)
.filter(Q(member_of=self.group) | Q(owner_of=self.group))
.exclude(matrix_profile__in=existing_profiles)
.distinct()
):
new_profile = MatrixProfile.from_person(person)
if new_profile:
profiles_to_create.append(new_profile)
MatrixProfile.objects.bulk_create(profiles_to_create)
all_profiles = MatrixProfile.objects.filter(
Q(person__in=self.group.members.all()) | Q(person__in=self.group.owners.all())
)
user_levels = self.power_levels
# Invite all users who are not in the room yet
for profile in all_profiles:
if profile.matrix_id not in user_levels:
# Now invite
self._invite(profile)
# Set power levels for all users
# Mod = 50 = Owners
# User = 0 = Members
user_levels = self.power_levels
for profile in all_profiles:
if profile.person in self.group.owners.all():
user_levels[profile.matrix_id] = 50
elif profile.person in self.group.members.all():
user_levels[profile.matrix_id] = 0
self._set_power_levels(user_levels)
class Meta:
verbose_name = _("Matrix room")
verbose_name_plural = _("Matrix rooms")
......
......@@ -16,6 +16,15 @@ class Homeserver(StringPreference):
default = ""
@site_preferences_registry.register
class HomeserverForIDs(StringPreference):
section = matrix
name = "homeserver_ids"
verbose_name = _("Name of Matrix homeserver used for auto-generating Matrix IDs")
help_text = _("Leave empty to not create Matrix IDs automatically")
default = ""
@site_preferences_registry.register
class AccessToken(StringPreference):
section = matrix
......
from datetime import date
from django.contrib.auth.models import User
import pytest
import requests
from aleksis.apps.matrix.models import MatrixRoom
from aleksis.core.models import Group, SchoolTerm
from aleksis.apps.matrix.models import MatrixProfile, MatrixRoom
from aleksis.core.models import Group, Person, SchoolTerm
from aleksis.core.util.core_helpers import get_site_preferences
pytestmark = pytest.mark.django_db
......@@ -105,3 +107,117 @@ def test_room_alias_collision_school_term(matrix_bot_user):
room = MatrixRoom.from_group(g2)
assert room.alias == "#test-room-2:matrix.aleksis.example.org"
def test_sync_room_members(matrix_bot_user):
from aleksis.apps.matrix.matrix import build_url, get_headers
get_site_preferences()["matrix__homeserver_ids"] = "matrix.aleksis.example.org"
g = Group.objects.create(name="Test Room")
u1 = User.objects.create_user("test1", "test1@example.org", "test1")
u2 = User.objects.create_user("test2", "test2@example.org", "test2")
u3 = User.objects.create_user("test3", "test3@example.org", "test3")
u4 = User.objects.create_user("test4", "test4@example.org", "test4")
u5 = User.objects.create_user("test5", "test5@example.org", "test5")
p1 = Person.objects.create(first_name="Test", last_name="Person", user=u1)
p2 = Person.objects.create(first_name="Test 2", last_name="Person", user=u2)
p3 = Person.objects.create(first_name="Test 3", last_name="Person", user=u3)
p4 = Person.objects.create(first_name="Test 4", last_name="Person", user=u4)
p5 = Person.objects.create(first_name="Test 5", last_name="Person", user=u5)
g.members.set([p1, p2, p3])
g.owners.set([p4, p5])
room = MatrixRoom.from_group(g)
room.sync_profiles()
assert MatrixProfile.objects.all().count() == 5
assert p1.matrix_profile
assert p1.matrix_profile.matrix_id == "@test1:matrix.aleksis.example.org"
assert p2.matrix_profile
assert p2.matrix_profile.matrix_id == "@test2:matrix.aleksis.example.org"
assert p3.matrix_profile
assert p3.matrix_profile.matrix_id == "@test3:matrix.aleksis.example.org"
assert p4.matrix_profile
assert p4.matrix_profile.matrix_id == "@test4:matrix.aleksis.example.org"
assert p5.matrix_profile
assert p5.matrix_profile.matrix_id == "@test5:matrix.aleksis.example.org"
# Check members
r = requests.get(
build_url(f"rooms/{room.room_id}/members"),
headers=get_headers(),
json={"membership": ["join", "invite"]},
)
assert r.status_code == requests.codes.ok
matrix_ids = [x["state_key"] for x in r.json()["chunk"]]
assert p1.matrix_profile.matrix_id in matrix_ids
assert p2.matrix_profile.matrix_id in matrix_ids
assert p3.matrix_profile.matrix_id in matrix_ids
assert p4.matrix_profile.matrix_id in matrix_ids
assert p5.matrix_profile.matrix_id in matrix_ids
# Get power levels
r = requests.get(build_url(f"rooms/{room.room_id}/state"), headers=get_headers())
assert r.status_code == requests.codes.ok
for event in r.json():
if not event["type"] == "m.room.power_levels":
continue
current_power_levels = event["content"]["users"]
assert current_power_levels[p1.matrix_profile.matrix_id] == 0
assert current_power_levels[p2.matrix_profile.matrix_id] == 0
assert current_power_levels[p3.matrix_profile.matrix_id] == 0
assert current_power_levels[p4.matrix_profile.matrix_id] == 50
assert current_power_levels[p5.matrix_profile.matrix_id] == 50
break
def test_sync_room_members_without_user(matrix_bot_user):
get_site_preferences()["matrix__homeserver_ids"] = "matrix.aleksis.example.org"
g = Group.objects.create(name="Test Room")
u1 = User.objects.create_user("test1", "test1@example.org", "test1")
p1 = Person.objects.create(first_name="Test", last_name="Person", user=u1)
p2 = Person.objects.create(first_name="Test 2", last_name="Person")
g.members.set([p1, p2])
room = MatrixRoom.from_group(g)
room.sync_profiles()
assert MatrixProfile.objects.all().count() == 1
assert p1.matrix_profile
assert p1.matrix_profile.matrix_id == "@test1:matrix.aleksis.example.org"
assert not hasattr(p2, "matrix_profile")
# test no homeserver for ids
def test_sync_room_members_without_homeserver(matrix_bot_user):
get_site_preferences()["matrix__homeserver_ids"] = ""
g = Group.objects.create(name="Test Room")
u1 = User.objects.create_user("test1", "test1@example.org", "test1")
p1 = Person.objects.create(first_name="Test", last_name="Person", user=u1)
p2 = Person.objects.create(first_name="Test 2", last_name="Person")
MatrixProfile.objects.create(person=p2, matrix_id="@test2:matrix.aleksis.example.org")
g.members.set([p1, p2])
room = MatrixRoom.from_group(g)
room.sync_profiles()
assert MatrixProfile.objects.all().count() == 1
assert not hasattr(p1, "matrix_profile")
assert p2.matrix_profile
assert p2.matrix_profile.matrix_id == "@test2:matrix.aleksis.example.org"
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment