import re from typing import Any, Optional, Union from django.db import models from django.db.models import Q from django.template.defaultfilters import slugify from django.utils.translation import gettext_lazy as _ from aleksis.core.mixins import ExtensibleModel, ExtensiblePolymorphicModel from aleksis.core.models import Group, Person from aleksis.core.util.core_helpers import get_site_preferences from .util.matrix import MatrixException, do_matrix_request class MatrixProfile(ExtensibleModel): """Model for a Matrix profile.""" matrix_id = models.CharField(max_length=255, verbose_name=_("Matrix ID"), unique=True) person = models.OneToOneField( Person, on_delete=models.CASCADE, verbose_name=_("Person"), null=True, blank=True, related_name="matrix_profile", ) @classmethod def build_matrix_id(cls, username, homeserver: Optional[str] = None): homeserver = homeserver or get_site_preferences()["matrix__homeserver_ids"] return f"@{username}:{homeserver}" @classmethod def from_person(cls, person: Person, commit: bool = False) -> Union["MatrixProfile", None]: if hasattr(person, "matrix_profile"): return person.matrix_profile if not person.user: raise ValueError("Person must have a user.") if not get_site_preferences()["matrix__homeserver_ids"]: return None new_profile = MatrixProfile( matrix_id=cls.build_matrix_id(person.user.username), person=person ) if commit: new_profile.save() return new_profile class Meta: verbose_name = _("Matrix profile") verbose_name_plural = _("Matrix profiles") class MatrixRoom(ExtensiblePolymorphicModel): """Model for a Matrix room.""" room_id = models.CharField(max_length=255, verbose_name=_("Room ID"), unique=True) alias = models.CharField(max_length=255, verbose_name=_("Alias"), unique=True, blank=True) group = models.ForeignKey( Group, on_delete=models.CASCADE, verbose_name=_("Group"), related_name="matrix_rooms", ) @classmethod def get_queryset(cls): return cls.objects.not_instance_of(MatrixSpace) @classmethod def build_alias(cls, group: Group) -> str: return slugify(group.short_name or group.name) @classmethod def from_group(cls, group: Group) -> "MatrixRoom": """Create a Matrix room from a group.""" try: room = cls.get_queryset().get(group=group) except cls.DoesNotExist: room = cls(group=group) if room.room_id: # Existing room, check if still accessible r = do_matrix_request("GET", f"directory/list/room/{room.room_id}") else: # Room does not exist, create it alias = cls.build_alias(group) profiles_to_invite = list( cls.get_profiles_for_group(group).values_list("matrix_id", flat=True) ) alias_found = False while not alias_found: try: r = cls._create_room(group.name, alias, profiles_to_invite) alias_found = True except MatrixException as e: if ( not get_site_preferences()["matrix__disambiguate_room_aliases"] or e.args[0].get("errcode") != "M_ROOM_IN_USE" ): raise match = re.match(r"^(.*)-(\d+)$", alias) if match: # Counter found, increase prefix = match.group(1) counter = int(match.group(2)) + 1 alias = f"{prefix}-{counter}" else: # Counter not found, add one alias = f"{alias}-2" room.room_id = r["room_id"] room.alias = r["room_alias"] room.save() return room @classmethod def _create_room( self, name, alias, invite: Optional[list[str]] = None, creation_content: Optional[dict] = None, ) -> dict[str, Any]: body = {"preset": "private_chat", "name": name, "room_alias_name": alias} if invite: body["invite"] = invite if creation_content: body["creation_content"] = creation_content r = do_matrix_request("POST", "createRoom", body=body) return r def get_power_levels(self) -> dict[str, int]: """Return the power levels for this room.""" r = do_matrix_request("GET", f"rooms/{self.room_id}/state") event = list(filter(lambda x: x["type"] == "m.room.power_levels", r)) user_levels = event[0]["content"]["users"] return user_levels def get_members(self) -> list[str]: r = do_matrix_request( "GET", f"rooms/{self.room_id}/members", body={"membership": ["join", "invite"]} ) return [m["state_key"] for m in r["chunk"]] def _invite(self, profile: MatrixProfile) -> dict[str, Any]: """Invite a user to this room.""" r = do_matrix_request( "POST", f"rooms/{self.room_id}/invite", body={"user_id": profile.matrix_id}, ) return r def _set_power_levels(self, power_levels: dict[str, int]) -> dict[str, Any]: """Set the power levels for this room.""" r = do_matrix_request( "PUT", f"rooms/{self.room_id}/state/m.room.power_levels/", body={"users": power_levels}, ) return r @classmethod def get_profiles_for_group(cls, group: Group): """Get all profile objects for the members/owners of a group.""" existing_profiles = MatrixProfile.objects.filter( Q(person__member_of=group) | Q(person__owner_of=group) ) profiles_to_create = [] for person in ( Person.objects.filter(user__isnull=False) .filter(Q(member_of=group) | Q(owner_of=group)) .exclude(matrix_profile__in=existing_profiles) .distinct() ): new_profile = MatrixProfile.from_person(person) if new_profile: profiles_to_create.append(new_profile) MatrixProfile.objects.bulk_create(profiles_to_create) all_profiles = MatrixProfile.objects.filter( Q(person__in=group.members.all()) | Q(person__in=group.owners.all()) ).distinct() return all_profiles def get_profiles(self): """Get all profile objects for the members/owners of this group.""" return self.get_profiles_for_group(self.group) def sync_profiles(self): """Sync profiles for this room.""" all_profiles = self.get_profiles() members = self.get_members() # Invite all users who are not in the room yet for profile in all_profiles: if profile.matrix_id not in members: # Now invite self._invite(profile) # Set power levels for all users user_levels = self.get_power_levels() for profile in all_profiles: if profile.person in self.group.owners.all(): power_level = get_site_preferences()["matrix__power_level_for_owners"] else: power_level = get_site_preferences()["matrix__power_level_for_members"] if ( profile.matrix_id not in user_levels or user_levels[profile.matrix_id] < power_level or get_site_preferences()["matrix__reduce_power_levels"] ): user_levels[profile.matrix_id] = power_level self._set_power_levels(user_levels) def sync_space(self): if self.group.child_groups.all(): # Do space stuff space = MatrixSpace.from_group(self.group) space.sync() return None def sync(self): """Sync this room.""" self.sync_profiles() if get_site_preferences()["matrix__use_spaces"]: self.sync_space() class Meta: verbose_name = _("Matrix room") verbose_name_plural = _("Matrix rooms") permissions = (("use_group_in_matrix", "Can use group in Matrix"),) class MatrixSpace(MatrixRoom): children = models.ManyToManyField( to=MatrixRoom, verbose_name=_("Child rooms/spaces"), blank=True, related_name="parents" ) @classmethod def get_queryset(cls): return cls.objects.instance_of(MatrixSpace) @classmethod def build_alias(cls, group: Group) -> str: """Build an alias for this space.""" return slugify(group.short_name or group.name) + "-space" @classmethod def _create_room( self, name, alias, invite: Optional[list[str]] = None, creation_content: Optional[dict] = None, ) -> dict[str, Any]: if not creation_content: creation_content = {} creation_content["type"] = "m.space" return super()._create_room(name, alias, invite, creation_content) def get_children(self) -> list[str]: """Get all children (rooms/spaces) of this space.""" r = do_matrix_request("GET", f"rooms/{self.room_id}/state") return [c["state_key"] for c in r if c["type"] == "m.space.child"] def _add_child(self, room_id: str): """Add a child room to this space.""" r = do_matrix_request( "PUT", f"/_matrix/client/v3/rooms/{self.room_id}/state/m.space.child/{room_id}", body={"via": [get_site_preferences()["matrix__homeserver_ids"]]}, ) return r def sync_children(self): """Sync membership of child spaces and rooms.""" current_children = self.get_children() child_spaces = MatrixSpace.get_queryset().filter( group__in=self.group.child_groups.filter(child_groups__isnull=False) ).values_list("room_id", flat=True) child_rooms = MatrixRoom.get_queryset().filter( Q(group__in=self.group.child_groups.filter(child_groups__isnull=True)) | Q(group=self.group) ).values_list("room_id", flat=True) child_ids = list(child_spaces) + list(child_rooms) missing_ids = set(child_ids).difference(set(current_children)) for missing_id in missing_ids: self._add_child(missing_id) def ensure_children(self): """Ensure that all child rooms/spaces exist.""" for group in self.group.child_groups.all().prefetch_related("child_groups"): group.use_in_matrix(sync=True) if group.child_groups.all(): space = MatrixSpace.from_group(group) space.ensure_children() space.sync_children() def sync(self): """Sync this space.""" self.ensure_children() self.sync_children() class Meta: verbose_name = _("Matrix space") verbose_name_plural = _("Matrix spaces")