Newer
Older
from typing import Any, Dict, List, Optional, Union
from django.template.defaultfilters import slugify
from django.utils.translation import gettext_lazy as _
from aleksis.apps.matrix.matrix import do_matrix_request
from aleksis.core.mixins import ExtensibleModel, ExtensiblePolymorphicModel
from aleksis.core.models import Group, Person
from aleksis.core.util.core_helpers import get_site_preferences
class MatrixProfile(ExtensibleModel):
"""Model for a Matrix profile."""
matrix_id = models.CharField(max_length=255, verbose_name=_("Matrix ID"), unique=True)
person = models.OneToOneField(
Person,
on_delete=models.CASCADE,
verbose_name=_("Person"),
null=True,
blank=True,
related_name="matrix_profile",
)
@classmethod
def build_matrix_id(cls, username, homeserver: Optional[str] = None):
homeserver = homeserver or get_site_preferences()["matrix__homeserver_ids"]
return f"@{username}:{homeserver}"
@classmethod
def from_person(cls, person: Person, commit: bool = False) -> Union["MatrixProfile", None]:
if hasattr(person, "matrix_profile"):
return person.matrix_profile
if not person.user:
raise ValueError("Person must have a user.")
if not get_site_preferences()["matrix__homeserver_ids"]:
return None
new_profile = MatrixProfile(
matrix_id=cls.build_matrix_id(person.user.username), person=person
)
if commit:
new_profile.save()
return new_profile
class Meta:
verbose_name = _("Matrix profile")
verbose_name_plural = _("Matrix profiles")
class MatrixRoom(ExtensiblePolymorphicModel):
"""Model for a Matrix room."""
room_id = models.CharField(max_length=255, verbose_name=_("Room ID"), unique=True)
alias = models.CharField(max_length=255, verbose_name=_("Alias"), unique=True, blank=True)
Group,
on_delete=models.CASCADE,
verbose_name=_("Group"),
@classmethod
def from_group(self, group: Group):
"""Create a Matrix room from a group."""
from .matrix import MatrixException, do_matrix_request
try:
room = MatrixRoom.objects.get(group=group)
except MatrixRoom.DoesNotExist:
room = MatrixRoom(group=group)
if room.room_id:
# Existing room, check if still accessible
r = do_matrix_request("GET", f"directory/list/room/{room.room_id}")
else:
# Room does not exist, create it
alias = slugify(group.short_name or group.name)
profiles_to_invite = list(
self.get_profiles_for_group(group).values_list("matrix_id", flat=True)
)
alias_found = False
while not alias_found:
try:
r = self._create_room(group.name, alias, profiles_to_invite)
alias_found = True
except MatrixException as e:
print(e.args, get_site_preferences()["matrix__disambiguate_room_aliases"])
if (
not get_site_preferences()["matrix__disambiguate_room_aliases"]
or e.args[0].get("errcode") != "M_ROOM_IN_USE"
):
raise MatrixException(*e.args)
match = re.match(r"^(.*)-(\d+)$", alias)
if match:
# Counter found, increase
prefix = match.group(1)
counter = int(match.group(2)) + 1
alias = f"{prefix}-{counter}"
else:
# Counter not found, add one
alias = f"{alias}-2"
room.room_id = r["room_id"]
room.alias = r["room_alias"]
room.save()
return room
@classmethod
def _create_room(self, name, alias, invite: List[str]):
from .matrix import do_matrix_request
body = {"preset": "private_chat", "name": name, "room_alias_name": alias, "invite": invite}
r = do_matrix_request("POST", "createRoom", body=body)
@property
def power_levels(self) -> Dict[str, int]:
"""Return the power levels for this room."""
from aleksis.apps.matrix.matrix import do_matrix_request
r = do_matrix_request("GET", f"rooms/{self.room_id}/state")
event = list(filter(lambda x: x["type"] == "m.room.power_levels", r))
user_levels = event[0]["content"]["users"]
return user_levels
@property
def members(self) -> List[str]:
from aleksis.apps.matrix.matrix import do_matrix_request
r = do_matrix_request(
"GET", f"rooms/{self.room_id}/members", body={"membership": ["join", "invite"]}
)
return [m["state_key"] for m in r["chunk"]]
def _invite(self, profile: MatrixProfile) -> Dict[str, Any]:
"""Invite a user to this room."""
from aleksis.apps.matrix.matrix import do_matrix_request
r = do_matrix_request(
"POST",
f"rooms/{self.room_id}/invite",
body={"user_id": profile.matrix_id},
def _set_power_levels(self, power_levels: Dict[str, int]) -> Dict[str, Any]:
"""Set the power levels for this room."""
r = do_matrix_request(
"PUT",
f"rooms/{self.room_id}/state/m.room.power_levels/",
body={"users": power_levels},
@classmethod
def get_profiles_for_group(cls, group: Group):
"""Get all profile objects for the members/owners of a group."""
Q(person__member_of=group) | Q(person__owner_of=group)
)
profiles_to_create = []
for person in (
Person.objects.filter(user__isnull=False)
.filter(Q(member_of=group) | Q(owner_of=group))
.exclude(matrix_profile__in=existing_profiles)
.distinct()
):
new_profile = MatrixProfile.from_person(person)
if new_profile:
profiles_to_create.append(new_profile)
MatrixProfile.objects.bulk_create(profiles_to_create)
all_profiles = MatrixProfile.objects.filter(
Q(person__in=group.members.all()) | Q(person__in=group.owners.all())
return all_profiles
def get_profiles(self):
"""Get all profile objects for the members/owners of this group."""
return self.get_profiles_for_group(self.group)
def sync_profiles(self):
"""Sync profiles for this room."""
all_profiles = self.get_profiles()
members = self.members
# Invite all users who are not in the room yet
for profile in all_profiles:
if profile.matrix_id not in members:
# Now invite
self._invite(profile)
# Set power levels for all users
# Mod = 50 = Owners
# User = 0 = Members
user_levels = self.power_levels
for profile in all_profiles:
if profile.person in self.group.owners.all():
user_levels[profile.matrix_id] = 50
elif profile.person in self.group.members.all():
user_levels[profile.matrix_id] = 0
self._set_power_levels(user_levels)
def sync(self):
"""Sync this room."""
self.sync_profiles()
class Meta:
verbose_name = _("Matrix room")
verbose_name_plural = _("Matrix rooms")
permissions = (("use_group_in_matrix", "Can use group in Matrix"),)
class MatrixSpace(MatrixRoom):
children = models.ManyToManyField(
to=MatrixRoom, verbose_name=_("Child rooms/spaces"), blank=True, related_name="parents"
)
class Meta:
verbose_name = _("Matrix space")
verbose_name_plural = _("Matrix spaces")