Skip to content
Snippets Groups Projects
models.py 7.4 KiB
Newer Older
Jonathan Weth's avatar
Jonathan Weth committed
from typing import Any, Dict, Optional, Union
from django.db import models
Jonathan Weth's avatar
Jonathan Weth committed
from django.db.models import Q
from django.template.defaultfilters import slugify
from django.utils.translation import gettext_lazy as _

from aleksis.core.mixins import ExtensibleModel, ExtensiblePolymorphicModel
from aleksis.core.models import Group, Person
Jonathan Weth's avatar
Jonathan Weth committed
from aleksis.core.util.core_helpers import get_site_preferences


class MatrixProfile(ExtensibleModel):
    """Model for a Matrix profile."""

    matrix_id = models.CharField(max_length=255, verbose_name=_("Matrix ID"), unique=True)
    person = models.OneToOneField(
        Person,
        on_delete=models.CASCADE,
        verbose_name=_("Person"),
        null=True,
        blank=True,
        related_name="matrix_profile",
    )

Jonathan Weth's avatar
Jonathan Weth committed
    @classmethod
    def build_matrix_id(cls, username, homeserver: Optional[str] = None):
        homeserver = homeserver or get_site_preferences()["matrix__homeserver_ids"]
        return f"@{username}:{homeserver}"

    @classmethod
    def from_person(cls, person: Person, commit: bool = False) -> Union["MatrixProfile", None]:
        if hasattr(person, "matrix_profile"):
            return person.matrix_profile
        if not person.user:
            raise ValueError("Person must have a user.")
        if not get_site_preferences()["matrix__homeserver_ids"]:
            return None
        new_profile = MatrixProfile(
            matrix_id=cls.build_matrix_id(person.user.username), person=person
        )
        if commit:
            new_profile.save()
        return new_profile

    class Meta:
        verbose_name = _("Matrix profile")
        verbose_name_plural = _("Matrix profiles")


class MatrixRoom(ExtensiblePolymorphicModel):
    """Model for a Matrix room."""

    room_id = models.CharField(max_length=255, verbose_name=_("Room ID"), unique=True)
    alias = models.CharField(max_length=255, verbose_name=_("Alias"), unique=True, blank=True)
    group = models.ForeignKey(
        Group,
        on_delete=models.CASCADE,
        verbose_name=_("Group"),
        null=True,
        blank=True,
        related_name="matrix_spaces",
    )

    @classmethod
    def from_group(self, group: Group):
        """Create a Matrix room from a group."""
        from .matrix import MatrixException, build_url, get_headers

        try:
            room = MatrixRoom.objects.get(group=group)
        except MatrixRoom.DoesNotExist:
            room = MatrixRoom(group=group)

        if room.room_id:
            # Existing room, check if still accessible
            r = requests.get(
                build_url(f"directory/list/room/{room.room_id}"), headers=get_headers()
            )
            if not r.status_code == requests.codes.ok:
                raise MatrixException()
        else:
            # Room does not exist, create it
            alias = slugify(group.short_name or group.name)
            r = self._create_group(group.name, alias)
            while r.json().get("errcode") == "M_ROOM_IN_USE":
                match = re.match(r"^(.*)-(\d+)$", alias)
                if match:
                    # Counter found, increase
                    prefix = match.group(1)
                    counter = int(match.group(2)) + 1
                    alias = f"{prefix}-{counter}"
                else:
                    # Counter not found, add one
                    alias = f"{alias}-2"
                r = self._create_group(group.name, alias)

            if r.status_code == requests.codes.ok:
                room.room_id = r.json()["room_id"]
                room.alias = r.json()["room_alias"]
                room.save()
            else:
                raise MatrixException(r.text)
        return room

    @classmethod
    def _create_group(self, name, alias):
        from .matrix import build_url, get_headers

        body = {"preset": "private_chat", "name": name, "room_alias_name": alias}
        r = requests.post(build_url("createRoom"), headers=get_headers(), json=body)

        return r

Jonathan Weth's avatar
Jonathan Weth committed
    @property
    def power_levels(self) -> Dict[str, int]:
        """Return the power levels for this room."""
        from aleksis.apps.matrix.matrix import MatrixException, build_url, get_headers

        r = requests.get(build_url(f"rooms/{self.room_id}/state"), headers=get_headers())
        if r.status_code != requests.codes.ok:
            raise MatrixException(r.text)

        event = list(filter(lambda x: x["type"] == "m.room.power_levels", r.json()))
        user_levels = event[0]["content"]["users"]

        return user_levels

    def _invite(self, profile: MatrixProfile) -> Dict[str, Any]:
        """Invite a user to this room."""
        from aleksis.apps.matrix.matrix import MatrixException, build_url, get_headers

        r = requests.post(
            build_url(f"rooms/{self.room_id}/invite"),
            headers=get_headers(),
            json={"user_id": profile.matrix_id},
        )
        if not r.status_code == requests.codes.ok:
            raise MatrixException(r.text)
        return r.json()

    def _set_power_levels(self, power_levels: Dict[str, int]) -> Dict[str, Any]:
        """Set the power levels for this room."""
        from aleksis.apps.matrix.matrix import MatrixException, build_url, get_headers

        r = requests.put(
            build_url(f"rooms/{self.room_id}/state/m.room.power_levels/"),
            headers=get_headers(),
            json={"users": power_levels},
        )
        print(r.text, r.status_code)
        if not r.status_code == requests.codes.ok:
            raise MatrixException(r.text)
        return r.json()

    def sync_profiles(self):
        """Sync profiles for this room."""
        existing_profiles = MatrixProfile.objects.filter(
            Q(person__member_of=self.group) | Q(person__owner_of=self.group)
        )
        profiles_to_create = []
        for person in (
            Person.objects.filter(user__isnull=False)
            .filter(Q(member_of=self.group) | Q(owner_of=self.group))
            .exclude(matrix_profile__in=existing_profiles)
            .distinct()
        ):
            new_profile = MatrixProfile.from_person(person)
            if new_profile:
                profiles_to_create.append(new_profile)
        MatrixProfile.objects.bulk_create(profiles_to_create)

        all_profiles = MatrixProfile.objects.filter(
            Q(person__in=self.group.members.all()) | Q(person__in=self.group.owners.all())
        )
        user_levels = self.power_levels

        # Invite all users who are not in the room yet
        for profile in all_profiles:
            if profile.matrix_id not in user_levels:
                # Now invite
                self._invite(profile)

        # Set power levels for all users
        # Mod = 50 = Owners
        # User = 0 = Members
        user_levels = self.power_levels
        for profile in all_profiles:
            if profile.person in self.group.owners.all():
                user_levels[profile.matrix_id] = 50
            elif profile.person in self.group.members.all():
                user_levels[profile.matrix_id] = 0
        self._set_power_levels(user_levels)

    class Meta:
        verbose_name = _("Matrix room")
        verbose_name_plural = _("Matrix rooms")


class MatrixSpace(MatrixRoom):
    children = models.ManyToManyField(
        to=MatrixRoom, verbose_name=_("Child rooms/spaces"), blank=True, related_name="parents"
    )

    class Meta:
        verbose_name = _("Matrix space")
        verbose_name_plural = _("Matrix spaces")