# models.py - Django models for the AlekSIS Matrix integration.
import re
from typing import Any, Dict, List, Optional, Union

from django.db import models
from django.db.models import Q
from django.template.defaultfilters import slugify
from django.utils.translation import gettext_lazy as _

from aleksis.apps.matrix.matrix import do_matrix_request
from aleksis.core.mixins import ExtensibleModel, ExtensiblePolymorphicModel
from aleksis.core.models import Group, Person
from aleksis.core.util.core_helpers import get_site_preferences


class MatrixProfile(ExtensibleModel):
    """Model for a Matrix profile."""

    matrix_id = models.CharField(max_length=255, verbose_name=_("Matrix ID"), unique=True)
    person = models.OneToOneField(
        Person,
        on_delete=models.CASCADE,
        verbose_name=_("Person"),
        null=True,
        blank=True,
        related_name="matrix_profile",
    )

    @classmethod
    def build_matrix_id(cls, username: str, homeserver: Optional[str] = None) -> str:
        """Build a Matrix ID of the form ``@username:homeserver``.

        :param username: Local part of the Matrix ID.
        :param homeserver: Server part of the Matrix ID. If empty or ``None``,
            the site preference ``matrix__homeserver_ids`` is used instead.
        :return: The assembled Matrix ID.
        """
        homeserver = homeserver or get_site_preferences()["matrix__homeserver_ids"]
        return f"@{username}:{homeserver}"

    @classmethod
    def from_person(cls, person: Person, commit: bool = False) -> Union["MatrixProfile", None]:
        """Return the Matrix profile of a person, building a new one if necessary.

        :param person: Person to get the profile for.
        :param commit: If true, a newly built profile is saved before it is
            returned; otherwise the caller is responsible for saving it.
        :return: The existing or newly built profile, or ``None`` if no profile
            exists and the site preference ``matrix__homeserver_ids`` is empty.
        :raises ValueError: If the person has no profile and no linked user.
        """
        # Accessing a missing reverse one-to-one relation raises, so ``hasattr``
        # is the check for "a profile already exists".
        if hasattr(person, "matrix_profile"):
            return person.matrix_profile
        if not person.user:
            raise ValueError("Person must have a user.")
        if not get_site_preferences()["matrix__homeserver_ids"]:
            return None

        new_profile = cls(matrix_id=cls.build_matrix_id(person.user.username), person=person)
        if commit:
            new_profile.save()
        return new_profile

    class Meta:
        verbose_name = _("Matrix profile")
        verbose_name_plural = _("Matrix profiles")


class MatrixRoom(ExtensiblePolymorphicModel):
    """Model for a Matrix room."""

    room_id = models.CharField(max_length=255, verbose_name=_("Room ID"), unique=True)
    alias = models.CharField(max_length=255, verbose_name=_("Alias"), unique=True, blank=True)
    # NOTE(review): this field definition was unterminated in the reviewed copy;
    # only the closing parenthesis was restored. Confirm that no further field
    # options were lost.
    group = models.OneToOneField(
        Group,
        on_delete=models.CASCADE,
        verbose_name=_("Group"),
        related_name="matrix_room",
    )

    @classmethod
    def from_group(cls, group: Group) -> "MatrixRoom":
        """Return the Matrix room of a group, creating it via the Matrix API if needed.

        If a room object with a room ID already exists for the group, a request
        against the room directory is made for it. Otherwise, a new room is
        created with an alias derived from the group's short name (or name), and
        the profiles of all members/owners are invited.

        :param group: Group to get or create the room for.
        :return: The (saved) room object for the group.
        :raises MatrixException: If creating the room fails and either the site
            preference ``matrix__disambiguate_room_aliases`` is disabled or the
            error code is not ``M_ROOM_IN_USE``.
        """
        from .matrix import MatrixException, do_matrix_request

        try:
            room = MatrixRoom.objects.get(group=group)
        except MatrixRoom.DoesNotExist:
            room = MatrixRoom(group=group)

        if room.room_id:
            # Existing room, check if still accessible. The response itself is
            # not used; presumably the request helper raises on failure.
            do_matrix_request("GET", f"directory/list/room/{room.room_id}")
            return room

        # Room does not exist, create it
        alias = slugify(group.short_name or group.name)
        profiles_to_invite = list(
            cls.get_profiles_for_group(group).values_list("matrix_id", flat=True)
        )

        while True:
            try:
                response = cls._create_room(group.name, alias, profiles_to_invite)
            except MatrixException as e:
                if (
                    not get_site_preferences()["matrix__disambiguate_room_aliases"]
                    or e.args[0].get("errcode") != "M_ROOM_IN_USE"
                ):
                    # Re-raise the original exception to keep its traceback
                    raise
            else:
                break
            # Alias is already taken: try again with the next candidate
            alias = cls._next_alias(alias)

        room.room_id = response["room_id"]
        room.alias = response["room_alias"]
        room.save()
        return room

    @staticmethod
    def _next_alias(alias: str) -> str:
        """Return the next alias candidate by adding or increasing a ``-<n>`` suffix."""
        match = re.match(r"^(.*)-(\d+)$", alias)
        if match:
            # Counter found, increase
            prefix = match.group(1)
            counter = int(match.group(2)) + 1
            return f"{prefix}-{counter}"
        # Counter not found, add one
        return f"{alias}-2"

    @classmethod
    def _create_room(cls, name, alias, invite: List[str]) -> Dict[str, Any]:
        """Create a private room via the Matrix API and return the response.

        :param name: Display name of the room.
        :param alias: Local part of the room alias to request.
        :param invite: Matrix IDs to invite into the new room.
        """
        from .matrix import do_matrix_request

        body = {"preset": "private_chat", "name": name, "room_alias_name": alias, "invite": invite}
        return do_matrix_request("POST", "createRoom", body=body)

    @property
    def power_levels(self) -> Dict[str, int]:
        """Return the power levels for this room.

        The levels are read from the ``users`` map of the room's
        ``m.room.power_levels`` state event.
        """
        from aleksis.apps.matrix.matrix import do_matrix_request

        state = do_matrix_request("GET", f"rooms/{self.room_id}/state")
        power_level_events = [event for event in state if event["type"] == "m.room.power_levels"]
        return power_level_events[0]["content"]["users"]

    @property
    def members(self) -> List[str]:
        """Return the state keys of all ``join``/``invite`` membership events of this room."""
        from aleksis.apps.matrix.matrix import do_matrix_request

        r = do_matrix_request(
            "GET", f"rooms/{self.room_id}/members", body={"membership": ["join", "invite"]}
        )
        return [m["state_key"] for m in r["chunk"]]

    def _invite(self, profile: MatrixProfile) -> Dict[str, Any]:
        """Invite a user to this room."""
        from aleksis.apps.matrix.matrix import do_matrix_request

        return do_matrix_request(
            "POST",
            f"rooms/{self.room_id}/invite",
            body={"user_id": profile.matrix_id},
        )

    def _set_power_levels(self, power_levels: Dict[str, int]) -> Dict[str, Any]:
        """Set the power levels for this room."""
        # NOTE(review): only the ``users`` map is sent. If the homeserver
        # replaces the whole state event content, other power level settings
        # of the room would be reset - confirm.
        return do_matrix_request(
            "PUT",
            f"rooms/{self.room_id}/state/m.room.power_levels/",
            body={"users": power_levels},
        )

    @classmethod
    def get_profiles_for_group(cls, group: Group):
        """Get all profile objects for the members/owners of a group.

        Profiles are created for members/owners who have a user but no profile yet.
        """
        existing_profiles = MatrixProfile.objects.filter(
            Q(person__member_of=group) | Q(person__owner_of=group)
        )
        persons_without_profile = (
            Person.objects.filter(user__isnull=False)
            .filter(Q(member_of=group) | Q(owner_of=group))
            .exclude(matrix_profile__in=existing_profiles)
            .distinct()
        )

        profiles_to_create = []
        for person in persons_without_profile:
            # ``from_person`` returns nothing if no homeserver is configured
            new_profile = MatrixProfile.from_person(person)
            if new_profile:
                profiles_to_create.append(new_profile)
        MatrixProfile.objects.bulk_create(profiles_to_create)

        return MatrixProfile.objects.filter(
            Q(person__in=group.members.all()) | Q(person__in=group.owners.all())
        )

    def get_profiles(self):
        """Get all profile objects for the members/owners of this group."""
        return self.get_profiles_for_group(self.group)

    def sync_profiles(self):
        """Sync profiles for this room.

        Invites all group members/owners who are not in the room yet and
        sets their power levels.
        """
        all_profiles = self.get_profiles()
        current_members = set(self.members)

        # Invite all users who are not in the room yet
        for profile in all_profiles:
            if profile.matrix_id not in current_members:
                self._invite(profile)

        # Fetch owner/member IDs once instead of querying them for every profile
        owner_ids = set(self.group.owners.values_list("pk", flat=True))
        member_ids = set(self.group.members.values_list("pk", flat=True))

        # Set power levels for all users
        # Mod = 50 = Owners
        # User = 0 = Members
        user_levels = self.power_levels
        for profile in all_profiles:
            if profile.person_id in owner_ids:
                user_levels[profile.matrix_id] = 50
            elif profile.person_id in member_ids:
                user_levels[profile.matrix_id] = 0
        self._set_power_levels(user_levels)

    def sync(self):
        """Sync this room."""
        self.sync_profiles()

    class Meta:
        verbose_name = _("Matrix room")
        verbose_name_plural = _("Matrix rooms")
        permissions = (("use_group_in_matrix", "Can use group in Matrix"),)


class MatrixSpace(MatrixRoom):
    """Model for a Matrix space, i.e. a room which can have child rooms/spaces."""

    children = models.ManyToManyField(
        to=MatrixRoom,
        related_name="parents",
        blank=True,
        verbose_name=_("Child rooms/spaces"),
    )

    class Meta:
        verbose_name_plural = _("Matrix spaces")
        verbose_name = _("Matrix space")