import re
from typing import Any, Dict, Optional, Union

from django.db import models
from django.db.models import Q
from django.template.defaultfilters import slugify
from django.utils.translation import gettext_lazy as _

import requests

from aleksis.core.mixins import ExtensibleModel, ExtensiblePolymorphicModel
from aleksis.core.models import Group, Person
from aleksis.core.util.core_helpers import get_site_preferences


class MatrixProfile(ExtensibleModel):
    """Model for a Matrix profile."""

    matrix_id = models.CharField(max_length=255, verbose_name=_("Matrix ID"), unique=True)
    person = models.OneToOneField(
        Person,
        on_delete=models.CASCADE,
        verbose_name=_("Person"),
        null=True,
        blank=True,
        related_name="matrix_profile",
    )

    @classmethod
    def build_matrix_id(cls, username, homeserver: Optional[str] = None):
        """Build a Matrix ID of the form ``@username:homeserver``.

        If no homeserver is given, the site preference
        ``matrix__homeserver_ids`` is used instead.
        """
        homeserver = homeserver or get_site_preferences()["matrix__homeserver_ids"]
        return f"@{username}:{homeserver}"

    @classmethod
    def from_person(cls, person: Person, commit: bool = False) -> Union["MatrixProfile", None]:
        """Return the Matrix profile of a person, building a new one if needed.

        An already linked profile is returned as is. Otherwise a new profile
        is built from the username of the person's user; it is only saved
        if ``commit`` is true.

        Returns ``None`` if the site preference ``matrix__homeserver_ids``
        is not set.

        Raises ``ValueError`` if the person has no linked user.
        """
        if hasattr(person, "matrix_profile"):
            return person.matrix_profile

        if not person.user:
            raise ValueError("Person must have a user.")

        if not get_site_preferences()["matrix__homeserver_ids"]:
            return None

        new_profile = MatrixProfile(
            matrix_id=cls.build_matrix_id(person.user.username), person=person
        )
        if commit:
            new_profile.save()
        return new_profile

    class Meta:
        verbose_name = _("Matrix profile")
        verbose_name_plural = _("Matrix profiles")


class MatrixRoom(ExtensiblePolymorphicModel):
    """Model for a Matrix room."""

    room_id = models.CharField(max_length=255, verbose_name=_("Room ID"), unique=True)
    alias = models.CharField(max_length=255, verbose_name=_("Alias"), unique=True, blank=True)
    group = models.ForeignKey(
        Group,
        on_delete=models.CASCADE,
        verbose_name=_("Group"),
        null=True,
        blank=True,
        related_name="matrix_spaces",
    )

    @classmethod
    def from_group(cls, group: Group):
        """Return the Matrix room for a group, creating it on the server if needed.

        If a room with a room ID is already stored for the group, it is only
        checked that the server still answers for it. Otherwise a new room is
        created with an alias derived from the group's short name (or name),
        and the resulting room is saved.

        Raises ``MatrixException`` if the server reports an error.
        """
        from .matrix import MatrixException, build_url, get_headers

        try:
            room = MatrixRoom.objects.get(group=group)
        except MatrixRoom.DoesNotExist:
            room = MatrixRoom(group=group)

        if room.room_id:
            # Existing room, check if still accessible
            r = requests.get(
                build_url(f"directory/list/room/{room.room_id}"), headers=get_headers()
            )
            if r.status_code != requests.codes.ok:
                raise MatrixException(r.text)
            return room

        # Room does not exist, create it
        base_alias = slugify(group.short_name or group.name)
        alias = base_alias
        counter = 1
        r = cls._create_group(group.name, alias)
        while r.json().get("errcode") == "M_ROOM_IN_USE":
            # Alias is taken: append a counter to the unchanged base alias.
            # The counter is tracked separately so that a slug which itself
            # ends in a number (e.g. "klasse-5") is not mistaken for an
            # earlier attempt and turned into "klasse-6".
            counter += 1
            alias = f"{base_alias}-{counter}"
            r = cls._create_group(group.name, alias)

        if r.status_code != requests.codes.ok:
            raise MatrixException(r.text)

        data = r.json()
        room.room_id = data["room_id"]
        room.alias = data["room_alias"]
        room.save()
        return room

    @classmethod
    def _create_group(cls, name, alias):
        """Send the room creation request and return the raw response."""
        from .matrix import build_url, get_headers

        body = {"preset": "private_chat", "name": name, "room_alias_name": alias}
        r = requests.post(build_url("createRoom"), headers=get_headers(), json=body)

        return r

    def _get_power_levels_content(self) -> Dict[str, Any]:
        """Fetch the content of this room's ``m.room.power_levels`` state event.

        Raises ``MatrixException`` if the request fails or the room state
        contains no such event.
        """
        from aleksis.apps.matrix.matrix import MatrixException, build_url, get_headers

        r = requests.get(build_url(f"rooms/{self.room_id}/state"), headers=get_headers())
        if r.status_code != requests.codes.ok:
            raise MatrixException(r.text)

        for event in r.json():
            if event["type"] == "m.room.power_levels":
                return event["content"]
        raise MatrixException(f"Room {self.room_id} has no m.room.power_levels event.")

    @property
    def power_levels(self) -> Dict[str, int]:
        """Return the power levels for this room.

        The result maps Matrix IDs to power levels. Every access sends a
        request to the server.

        Raises ``MatrixException`` if the request fails or the room has no
        power levels event.
        """
        # The "users" key may be absent from the event content
        return self._get_power_levels_content().get("users", {})

    def _invite(self, profile: MatrixProfile) -> Dict[str, Any]:
        """Invite a user to this room.

        Returns the decoded JSON response and raises ``MatrixException`` if
        the server does not answer with an OK status.
        """
        from aleksis.apps.matrix.matrix import MatrixException, build_url, get_headers

        r = requests.post(
            build_url(f"rooms/{self.room_id}/invite"),
            headers=get_headers(),
            json={"user_id": profile.matrix_id},
        )
        if r.status_code != requests.codes.ok:
            raise MatrixException(r.text)
        return r.json()

    def _set_power_levels(self, power_levels: Dict[str, int]) -> Dict[str, Any]:
        """Set the power levels for this room.

        ``power_levels`` maps Matrix IDs to power levels and replaces the
        per-user levels of the room.

        Returns the decoded JSON response and raises ``MatrixException`` if
        the server does not answer with an OK status.
        """
        from aleksis.apps.matrix.matrix import MatrixException, build_url, get_headers

        # Sending a state event replaces its whole content, so start from the
        # current content and only swap the per-user levels. Otherwise all
        # other power level settings of the room would be reset.
        content = dict(self._get_power_levels_content())
        content["users"] = power_levels

        r = requests.put(
            build_url(f"rooms/{self.room_id}/state/m.room.power_levels/"),
            headers=get_headers(),
            json=content,
        )
        if r.status_code != requests.codes.ok:
            raise MatrixException(r.text)
        return r.json()

    def sync_profiles(self):
        """Sync profiles for this room.

        Creates missing Matrix profiles for all members and owners of the
        group who have a user, invites every profile that does not yet appear
        in the room's power levels and then sets the power levels according
        to the role in the group.
        """
        existing_profiles = MatrixProfile.objects.filter(
            Q(person__member_of=self.group) | Q(person__owner_of=self.group)
        )
        profiles_to_create = []
        for person in (
            Person.objects.filter(user__isnull=False)
            .filter(Q(member_of=self.group) | Q(owner_of=self.group))
            .exclude(matrix_profile__in=existing_profiles)
            .distinct()
        ):
            new_profile = MatrixProfile.from_person(person)
            if new_profile:
                profiles_to_create.append(new_profile)
        MatrixProfile.objects.bulk_create(profiles_to_create)

        all_profiles = MatrixProfile.objects.filter(
            Q(person__in=self.group.members.all()) | Q(person__in=self.group.owners.all())
        )
        user_levels = self.power_levels

        # Invite all users who are not in the room yet
        for profile in all_profiles:
            if profile.matrix_id not in user_levels:
                # Now invite
                self._invite(profile)

        # Set power levels for all users
        # Mod = 50 = Owners
        # User = 0 = Members
        user_levels = self.power_levels

        # Fetch both roles once instead of querying them for every profile
        owner_ids = set(self.group.owners.values_list("pk", flat=True))
        member_ids = set(self.group.members.values_list("pk", flat=True))
        for profile in all_profiles:
            if profile.person_id in owner_ids:
                user_levels[profile.matrix_id] = 50
            elif profile.person_id in member_ids:
                user_levels[profile.matrix_id] = 0
        self._set_power_levels(user_levels)

    class Meta:
        verbose_name = _("Matrix room")
        verbose_name_plural = _("Matrix rooms")


class MatrixSpace(MatrixRoom):
    """Model for a Matrix space, a room which can contain child rooms/spaces."""

    children = models.ManyToManyField(
        to=MatrixRoom, verbose_name=_("Child rooms/spaces"), blank=True, related_name="parents"
    )

    class Meta:
        verbose_name = _("Matrix space")
        verbose_name_plural = _("Matrix spaces")