Skip to content
Snippets Groups Projects
models.py 11.56 KiB
import re
from typing import Any, Optional, Union

from django.db import models
from django.db.models import Q, QuerySet
from django.template.defaultfilters import slugify
from django.utils.translation import gettext_lazy as _

from aleksis.core.mixins import ExtensibleModel, ExtensiblePolymorphicModel
from aleksis.core.models import Group, Person
from aleksis.core.util.core_helpers import get_site_preferences

from .util.matrix import MatrixException, do_matrix_request


class MatrixProfile(ExtensibleModel):
    """Model for a Matrix profile."""

    matrix_id = models.CharField(max_length=255, verbose_name=_("Matrix ID"), unique=True)
    person = models.OneToOneField(
        Person,
        on_delete=models.CASCADE,
        verbose_name=_("Person"),
        null=True,
        blank=True,
        related_name="matrix_profile",
    )

    @classmethod
    def build_matrix_id(cls, username: str, homeserver: Optional[str] = None) -> str:
        """Build a Matrix ID from a username."""
        homeserver = homeserver or get_site_preferences()["matrix__homeserver_ids"]
        return f"@{username}:{homeserver}"

    @classmethod
    def from_person(cls, person: Person, commit: bool = False) -> Union["MatrixProfile", None]:
        """Get or create a Matrix profile from a person."""
        if hasattr(person, "matrix_profile"):
            return person.matrix_profile
        if not person.user:
            raise ValueError("Person must have a user.")
        if not get_site_preferences()["matrix__homeserver_ids"]:
            return None
        new_profile = MatrixProfile(
            matrix_id=cls.build_matrix_id(person.user.username), person=person
        )
        if commit:
            new_profile.save()
        return new_profile

    class Meta:
        verbose_name = _("Matrix profile")
        verbose_name_plural = _("Matrix profiles")


class MatrixRoom(ExtensiblePolymorphicModel):
    """Model for a Matrix room."""

    room_id = models.CharField(max_length=255, verbose_name=_("Room ID"), unique=True)
    alias = models.CharField(max_length=255, verbose_name=_("Alias"), unique=True, blank=True)
    group = models.ForeignKey(
        Group,
        on_delete=models.CASCADE,
        verbose_name=_("Group"),
        related_name="matrix_rooms",
    )

    @classmethod
    def get_queryset(cls):
        """Get a queryset for only Matrix rooms."""
        return cls.objects.not_instance_of(MatrixSpace)

    @classmethod
    def build_alias(cls, group: Group) -> str:
        """Build a room alias from a group."""
        return slugify(group.short_name or group.name)

    @classmethod
    def from_group(cls, group: Group) -> "MatrixRoom":
        """Get or create a Matrix room from a group."""
        try:
            room = cls.get_queryset().get(group=group)
        except cls.DoesNotExist:
            room = cls(group=group)

        if room.room_id:
            # Existing room, check if still accessible
            r = do_matrix_request("GET", f"directory/list/room/{room.room_id}")
        else:
            # Room does not exist, create it
            alias = cls.build_alias(group)
            profiles_to_invite = list(
                cls.get_profiles_for_group(group).values_list("matrix_id", flat=True)
            )

            alias_found = False
            while not alias_found:
                try:
                    r = cls._create_room(group.name, alias, profiles_to_invite)
                    alias_found = True
                except MatrixException as e:
                    if (
                        not get_site_preferences()["matrix__disambiguate_room_aliases"]
                        or e.args[0].get("errcode") != "M_ROOM_IN_USE"
                    ):
                        raise

                match = re.match(r"^(.*)-(\d+)$", alias)
                if match:
                    # Counter found, increase
                    prefix = match.group(1)
                    counter = int(match.group(2)) + 1
                    alias = f"{prefix}-{counter}"
                else:
                    # Counter not found, add one
                    alias = f"{alias}-2"

            room.room_id = r["room_id"]
            room.alias = r["room_alias"]
            room.save()
        return room

    @classmethod
    def _create_room(
        self,
        name: str,
        alias: str,
        invite: Optional[list[str]] = None,
        creation_content: Optional[dict] = None,
    ) -> dict[str, Any]:
        """Create a Matrix room."""
        body = {"preset": "private_chat", "name": name, "room_alias_name": alias}

        if invite:
            body["invite"] = invite

        if creation_content:
            body["creation_content"] = creation_content

        r = do_matrix_request("POST", "createRoom", body=body)

        return r

    def get_power_levels(self) -> dict[str, int]:
        """Return the power levels for this room."""
        r = do_matrix_request("GET", f"rooms/{self.room_id}/state")

        event = list(filter(lambda x: x["type"] == "m.room.power_levels", r))
        user_levels = event[0]["content"]["users"]

        return user_levels

    def get_members(self) -> list[str]:
        """Get all members of this room."""
        r = do_matrix_request("GET", f"rooms/{self.room_id}/members")
        return [
            m["state_key"]
            for m in filter(lambda c: c["content"]["membership"] in ("join", "invite"), r["chunk"])
        ]

    def _invite(self, profile: MatrixProfile) -> dict[str, Any]:
        """Invite a user to this room."""
        r = do_matrix_request(
            "POST",
            f"rooms/{self.room_id}/invite",
            body={"user_id": profile.matrix_id},
        )
        return r

    def _set_power_levels(self, power_levels: dict[str, int]) -> dict[str, Any]:
        """Set the power levels for this room."""
        r = do_matrix_request(
            "PUT",
            f"rooms/{self.room_id}/state/m.room.power_levels/",
            body={"users": power_levels},
        )
        return r

    @classmethod
    def get_profiles_for_group(cls, group: Group) -> QuerySet:
        """Get all profile objects for the members/owners of a group."""
        existing_profiles = MatrixProfile.objects.filter(
            Q(person__member_of=group) | Q(person__owner_of=group)
        )
        profiles_to_create = []
        for person in (
            Person.objects.filter(user__isnull=False)
            .filter(Q(member_of=group) | Q(owner_of=group))
            .exclude(matrix_profile__in=existing_profiles)
            .distinct()
        ):
            new_profile = MatrixProfile.from_person(person)
            if new_profile:
                profiles_to_create.append(new_profile)
        MatrixProfile.objects.bulk_create(profiles_to_create)

        all_profiles = MatrixProfile.objects.filter(
            Q(person__in=group.members.all()) | Q(person__in=group.owners.all())
        ).distinct()

        return all_profiles

    def get_profiles(self) -> QuerySet:
        """Get all profile objects for the members/owners of this group."""
        return self.get_profiles_for_group(self.group)

    def sync_profiles(self):
        """Sync profiles for this room."""
        all_profiles = self.get_profiles()
        members = self.get_members()

        # Invite all users who are not in the room yet
        for profile in all_profiles:
            if profile.matrix_id not in members:
                # Now invite
                self._invite(profile)

        # Set power levels for all users
        user_levels = self.get_power_levels()
        for profile in all_profiles:
            if profile.person in self.group.owners.all():
                power_level = get_site_preferences()["matrix__power_level_for_owners"]
            else:
                power_level = get_site_preferences()["matrix__power_level_for_members"]

            if (
                profile.matrix_id not in user_levels
                or user_levels[profile.matrix_id] < power_level
                or get_site_preferences()["matrix__reduce_power_levels"]
            ):
                user_levels[profile.matrix_id] = power_level
        self._set_power_levels(user_levels)

    def sync_space(self):
        """Sync the space for this room."""
        if self.group.child_groups.all():
            # Do space stuff
            space = MatrixSpace.from_group(self.group)
            space.sync()
        return None

    def sync(self):
        """Sync this room."""
        self.sync_profiles()
        if get_site_preferences()["matrix__use_spaces"]:
            self.sync_space()

    class Meta:
        verbose_name = _("Matrix room")
        verbose_name_plural = _("Matrix rooms")
        permissions = (("provision_group_in_matrix", "Can provision group in Matrix"),)


class MatrixSpace(MatrixRoom):
    children = models.ManyToManyField(
        to=MatrixRoom, verbose_name=_("Child rooms/spaces"), blank=True, related_name="parents"
    )

    @classmethod
    def get_queryset(cls):
        """Get a queryset with only Matrix spaces."""
        return cls.objects.instance_of(MatrixSpace)

    @classmethod
    def build_alias(cls, group: Group) -> str:
        """Build an alias for this space."""
        return slugify(group.short_name or group.name) + "-space"

    @classmethod
    def _create_room(
        self,
        name: str,
        alias: str,
        invite: Optional[list[str]] = None,
        creation_content: Optional[dict] = None,
    ) -> dict[str, Any]:
        """Create a Matrix space."""
        if not creation_content:
            creation_content = {}
        creation_content["type"] = "m.space"
        return super()._create_room(name, alias, invite, creation_content)

    def get_children(self) -> list[str]:
        """Get all children (rooms/spaces) of this space."""
        r = do_matrix_request("GET", f"rooms/{self.room_id}/state")
        return [c["state_key"] for c in r if c["type"] == "m.space.child"]

    def _add_child(self, room_id: str) -> dict[str, Any]:
        """Add a child room to this space."""
        r = do_matrix_request(
            "PUT",
            f"/_matrix/client/v3/rooms/{self.room_id}/state/m.space.child/{room_id}",
            body={"via": [get_site_preferences()["matrix__homeserver_ids"]]},
        )
        return r

    def sync_children(self):
        """Sync membership of child spaces and rooms."""
        current_children = self.get_children()
        child_spaces = (
            MatrixSpace.get_queryset()
            .filter(group__in=self.group.child_groups.filter(child_groups__isnull=False))
            .values_list("room_id", flat=True)
        )
        child_rooms = (
            MatrixRoom.get_queryset()
            .filter(
                Q(group__in=self.group.child_groups.filter(child_groups__isnull=True))
                | Q(group=self.group)
            )
            .values_list("room_id", flat=True)
        )

        child_ids = list(child_spaces) + list(child_rooms)

        missing_ids = set(child_ids).difference(set(current_children))

        for missing_id in missing_ids:
            self._add_child(missing_id)

    def ensure_children(self):
        """Ensure that all child rooms/spaces exist."""
        for group in self.group.child_groups.all().prefetch_related("child_groups"):
            group.provision_in_matrix(sync=True)
            if group.child_groups.all():
                space = MatrixSpace.from_group(group)
                space.ensure_children()
                space.sync_children()

    def sync(self):
        """Sync this space."""
        self.ensure_children()
        self.sync_children()

    class Meta:
        verbose_name = _("Matrix space")
        verbose_name_plural = _("Matrix spaces")