import re from typing import Any, Dict, List, Optional, Union from django.db import models from django.db.models import Q from django.template.defaultfilters import slugify from django.utils.translation import gettext_lazy as _ from aleksis.apps.matrix.matrix import do_matrix_request from aleksis.core.mixins import ExtensibleModel, ExtensiblePolymorphicModel from aleksis.core.models import Group, Person from aleksis.core.util.core_helpers import get_site_preferences class MatrixProfile(ExtensibleModel): """Model for a Matrix profile.""" matrix_id = models.CharField(max_length=255, verbose_name=_("Matrix ID"), unique=True) person = models.OneToOneField( Person, on_delete=models.CASCADE, verbose_name=_("Person"), null=True, blank=True, related_name="matrix_profile", ) @classmethod def build_matrix_id(cls, username, homeserver: Optional[str] = None): homeserver = homeserver or get_site_preferences()["matrix__homeserver_ids"] return f"@{username}:{homeserver}" @classmethod def from_person(cls, person: Person, commit: bool = False) -> Union["MatrixProfile", None]: if hasattr(person, "matrix_profile"): return person.matrix_profile if not person.user: raise ValueError("Person must have a user.") if not get_site_preferences()["matrix__homeserver_ids"]: return None new_profile = MatrixProfile( matrix_id=cls.build_matrix_id(person.user.username), person=person ) if commit: new_profile.save() return new_profile class Meta: verbose_name = _("Matrix profile") verbose_name_plural = _("Matrix profiles") class MatrixRoom(ExtensiblePolymorphicModel): """Model for a Matrix room.""" room_id = models.CharField(max_length=255, verbose_name=_("Room ID"), unique=True) alias = models.CharField(max_length=255, verbose_name=_("Alias"), unique=True, blank=True) group = models.OneToOneField( Group, on_delete=models.CASCADE, verbose_name=_("Group"), related_name="matrix_room", ) @classmethod def from_group(self, group: Group): """Create a Matrix room from a group.""" from .matrix import MatrixException, do_matrix_request try: room = MatrixRoom.objects.get(group=group) except MatrixRoom.DoesNotExist: room = MatrixRoom(group=group) if room.room_id: # Existing room, check if still accessible r = do_matrix_request("GET", f"directory/list/room/{room.room_id}") else: # Room does not exist, create it alias = slugify(group.short_name or group.name) profiles_to_invite = list( self.get_profiles_for_group(group).values_list("matrix_id", flat=True) ) alias_found = False while not alias_found: try: r = self._create_room(group.name, alias, profiles_to_invite) alias_found = True except MatrixException as e: print(e.args, get_site_preferences()["matrix__disambiguate_room_aliases"]) if ( not get_site_preferences()["matrix__disambiguate_room_aliases"] or e.args[0].get("errcode") != "M_ROOM_IN_USE" ): raise MatrixException(*e.args) match = re.match(r"^(.*)-(\d+)$", alias) if match: # Counter found, increase prefix = match.group(1) counter = int(match.group(2)) + 1 alias = f"{prefix}-{counter}" else: # Counter not found, add one alias = f"{alias}-2" room.room_id = r["room_id"] room.alias = r["room_alias"] room.save() return room @classmethod def _create_room(self, name, alias, invite: List[str]): from .matrix import do_matrix_request body = {"preset": "private_chat", "name": name, "room_alias_name": alias, "invite": invite} r = do_matrix_request("POST", "createRoom", body=body) return r @property def power_levels(self) -> Dict[str, int]: """Return the power levels for this room.""" from aleksis.apps.matrix.matrix import do_matrix_request r = do_matrix_request("GET", f"rooms/{self.room_id}/state") event = list(filter(lambda x: x["type"] == "m.room.power_levels", r)) user_levels = event[0]["content"]["users"] return user_levels @property def members(self) -> List[str]: from aleksis.apps.matrix.matrix import do_matrix_request r = do_matrix_request( "GET", f"rooms/{self.room_id}/members", body={"membership": ["join", "invite"]} ) return [m["state_key"] for m in r["chunk"]] def _invite(self, profile: MatrixProfile) -> Dict[str, Any]: """Invite a user to this room.""" from aleksis.apps.matrix.matrix import do_matrix_request r = do_matrix_request( "POST", f"rooms/{self.room_id}/invite", body={"user_id": profile.matrix_id}, ) return r def _set_power_levels(self, power_levels: Dict[str, int]) -> Dict[str, Any]: """Set the power levels for this room.""" r = do_matrix_request( "PUT", f"rooms/{self.room_id}/state/m.room.power_levels/", body={"users": power_levels}, ) return r @classmethod def get_profiles_for_group(cls, group: Group): """Get all profile objects for the members/owners of a group.""" existing_profiles = MatrixProfile.objects.filter( Q(person__member_of=group) | Q(person__owner_of=group) ) profiles_to_create = [] for person in ( Person.objects.filter(user__isnull=False) .filter(Q(member_of=group) | Q(owner_of=group)) .exclude(matrix_profile__in=existing_profiles) .distinct() ): new_profile = MatrixProfile.from_person(person) if new_profile: profiles_to_create.append(new_profile) MatrixProfile.objects.bulk_create(profiles_to_create) all_profiles = MatrixProfile.objects.filter( Q(person__in=group.members.all()) | Q(person__in=group.owners.all()) ) return all_profiles def get_profiles(self): """Get all profile objects for the members/owners of this group.""" return self.get_profiles_for_group(self.group) def sync_profiles(self): """Sync profiles for this room.""" all_profiles = self.get_profiles() members = self.members # Invite all users who are not in the room yet for profile in all_profiles: if profile.matrix_id not in members: # Now invite self._invite(profile) # Set power levels for all users # Mod = 50 = Owners # User = 0 = Members user_levels = self.power_levels for profile in all_profiles: if profile.person in self.group.owners.all(): user_levels[profile.matrix_id] = 50 elif profile.person in self.group.members.all(): user_levels[profile.matrix_id] = 0 self._set_power_levels(user_levels) def sync(self): """Sync this room.""" self.sync_profiles() class Meta: verbose_name = _("Matrix room") verbose_name_plural = _("Matrix rooms") permissions = (("use_group_in_matrix", "Can use group in Matrix"),) class MatrixSpace(MatrixRoom): children = models.ManyToManyField( to=MatrixRoom, verbose_name=_("Child rooms/spaces"), blank=True, related_name="parents" ) class Meta: verbose_name = _("Matrix space") verbose_name_plural = _("Matrix spaces")