Newer
Older
from typing import Any, Dict, List, Optional, Union
from django.template.defaultfilters import slugify
from django.utils.translation import gettext_lazy as _
from aleksis.apps.matrix.matrix import do_matrix_request
from aleksis.core.mixins import ExtensibleModel, ExtensiblePolymorphicModel
from aleksis.core.models import Group, Person
from aleksis.core.util.core_helpers import get_site_preferences
class MatrixProfile(ExtensibleModel):
    """Model for a Matrix profile."""

    # Matrix user ID of this profile; unique across all profiles.
    matrix_id = models.CharField(max_length=255, verbose_name=_("Matrix ID"), unique=True)
    # Optional one-to-one link to a person, reachable as ``person.matrix_profile``.
    person = models.OneToOneField(
        Person,
        on_delete=models.CASCADE,
        verbose_name=_("Person"),
        null=True,
        blank=True,
        related_name="matrix_profile",
    )

    @classmethod
    def build_matrix_id(cls, username, homeserver: Optional[str] = None):
        """Build a Matrix ID of the form ``@<username>:<homeserver>``.

        If ``homeserver`` is not given (or is falsy), the value of the site
        preference ``matrix__homeserver_ids`` is used instead.
        """
        if not homeserver:
            homeserver = get_site_preferences()["matrix__homeserver_ids"]
        return f"@{username}:{homeserver}"

    @classmethod
    def from_person(cls, person: Person, commit: bool = False) -> Union["MatrixProfile", None]:
        """Return the Matrix profile for a person, building a new one if needed.

        An existing ``person.matrix_profile`` is returned as is. Otherwise a
        new profile is built from the username of the person's user and only
        saved if ``commit`` is true.

        Returns ``None`` if the site preference ``matrix__homeserver_ids`` is
        not set.

        Raises ``ValueError`` if the person has no profile yet and no user.
        """
        if hasattr(person, "matrix_profile"):
            return person.matrix_profile

        if not person.user:
            raise ValueError("Person must have a user.")

        # Without a configured homeserver ID no Matrix ID can be built.
        if not get_site_preferences()["matrix__homeserver_ids"]:
            return None

        matrix_id = cls.build_matrix_id(person.user.username)
        profile = MatrixProfile(matrix_id=matrix_id, person=person)
        if commit:
            profile.save()
        return profile

    class Meta:
        verbose_name = _("Matrix profile")
        verbose_name_plural = _("Matrix profiles")
class MatrixRoom(ExtensiblePolymorphicModel):
"""Model for a Matrix room."""
room_id = models.CharField(max_length=255, verbose_name=_("Room ID"), unique=True)
alias = models.CharField(max_length=255, verbose_name=_("Alias"), unique=True, blank=True)
Group,
on_delete=models.CASCADE,
verbose_name=_("Group"),
@classmethod
def get_queryset(cls):
    """Return this model's objects that are not instances of ``MatrixSpace``."""
    # Must be a classmethod: it is called on the class itself
    # (``cls.get_queryset()`` / ``MatrixRoom.get_queryset()``).
    return cls.objects.not_instance_of(MatrixSpace)
@classmethod
def build_alias(cls, group: Group) -> str:
    """Build a room alias by slugifying the group's short name, or its name if that is empty."""
    alias_source = group.short_name or group.name
    return slugify(alias_source)
@classmethod
def from_group(cls, group: Group) -> "MatrixRoom":
"""Create a Matrix room from a group."""
from .matrix import MatrixException, do_matrix_request
# Look up an existing room object for this group; fall back to a new, unsaved one.
# NOTE(review): the ``try:`` that pairs with the ``except`` below is not visible here — confirm.
room = cls.get_queryset().get(group=group)
except cls.DoesNotExist:
room = cls(group=group)
if room.room_id:
# Existing room, check if still accessible
r = do_matrix_request("GET", f"directory/list/room/{room.room_id}")
else:
# Room does not exist, create it
# NOTE(review): the statements binding ``alias`` and ``profiles_to_invite``
# (both used below) are not visible here — confirm.
cls.get_profiles_for_group(group).values_list("matrix_id", flat=True)
)
# Retry room creation with a new alias until the alias is accepted
alias_found = False
while not alias_found:
try:
r = cls._create_room(group.name, alias, profiles_to_invite)
alias_found = True
except MatrixException as e:
# NOTE(review): debugging output written with print() — consider logging instead.
print(e.args, get_site_preferences()["matrix__disambiguate_room_aliases"])
# Re-raise unless disambiguation is enabled and the error is M_ROOM_IN_USE
if (
not get_site_preferences()["matrix__disambiguate_room_aliases"]
or e.args[0].get("errcode") != "M_ROOM_IN_USE"
):
raise MatrixException(*e.args)
# Check whether the alias already ends with a "-<number>" counter
match = re.match(r"^(.*)-(\d+)$", alias)
if match:
# Counter found, increase
prefix = match.group(1)
counter = int(match.group(2)) + 1
alias = f"{prefix}-{counter}"
else:
# Counter not found, add one
alias = f"{alias}-2"
# Store the IDs taken from the response and persist the room
room.room_id = r["room_id"]
room.alias = r["room_alias"]
room.save()
return room
@classmethod
def _create_room(
    cls,
    name,
    alias,
    invite: Optional[List[str]] = None,
    creation_content: Optional[dict] = None,
) -> Dict[str, Any]:
    """Create a room via a ``POST createRoom`` request and return the response.

    The request body always contains the ``private_chat`` preset, the room
    name and the alias name; ``invite`` and ``creation_content`` are only
    included if they are non-empty.
    """
    from .matrix import do_matrix_request

    body = {"preset": "private_chat", "name": name, "room_alias_name": alias}
    if invite:
        body["invite"] = invite
    if creation_content:
        body["creation_content"] = creation_content

    # Callers read ``room_id``/``room_alias`` from the response, so it must be returned.
    return do_matrix_request("POST", "createRoom", body=body)
@property
def power_levels(self) -> Dict[str, int]:
    """Return the ``users`` mapping of this room's ``m.room.power_levels`` state event.

    The full room state is requested and the first event of that type is used.
    """
    from aleksis.apps.matrix.matrix import do_matrix_request

    state_events = do_matrix_request("GET", f"rooms/{self.room_id}/state")
    power_level_events = [
        state_event
        for state_event in state_events
        if state_event["type"] == "m.room.power_levels"
    ]
    return power_level_events[0]["content"]["users"]
@property
def members(self) -> List[str]:
    """Return the state keys of this room's membership events.

    The members endpoint is requested with the memberships ``join`` and
    ``invite``; the state keys are read from the ``chunk`` of the response.
    """
    from aleksis.apps.matrix.matrix import do_matrix_request

    membership_filter = {"membership": ["join", "invite"]}
    response = do_matrix_request(
        "GET", f"rooms/{self.room_id}/members", body=membership_filter
    )
    return [member_event["state_key"] for member_event in response["chunk"]]
def _invite(self, profile: MatrixProfile) -> Dict[str, Any]:
    """Invite a user to this room.

    Sends a ``POST`` request to the room's invite endpoint with the Matrix ID
    of the given profile and returns the response.
    """
    from aleksis.apps.matrix.matrix import do_matrix_request

    return do_matrix_request(
        "POST",
        f"rooms/{self.room_id}/invite",
        body={"user_id": profile.matrix_id},
    )
def _set_power_levels(self, power_levels: Dict[str, int]) -> Dict[str, Any]:
    """Set the power levels for this room.

    Sends a ``PUT`` request for the ``m.room.power_levels`` state event with
    ``power_levels`` as its ``users`` mapping and returns the response.
    """
    # NOTE(review): only the ``users`` key is sent; confirm that the homeserver
    # keeps the other power level settings of the room in that case.
    return do_matrix_request(
        "PUT",
        f"rooms/{self.room_id}/state/m.room.power_levels/",
        body={"users": power_levels},
    )
@classmethod
def get_profiles_for_group(cls, group: Group):
    """Get all profile objects for the members/owners of a group.

    Profiles are first created in bulk for all members/owners who have a user
    but no profile yet. Afterwards, a queryset with the profiles of all
    members and owners of the group is returned.
    """
    existing_profiles = MatrixProfile.objects.filter(
        Q(person__member_of=group) | Q(person__owner_of=group)
    )

    persons_without_profile = (
        Person.objects.filter(user__isnull=False)
        .filter(Q(member_of=group) | Q(owner_of=group))
        .exclude(matrix_profile__in=existing_profiles)
        .distinct()
    )

    # from_person returns None if no homeserver ID is configured; skip those.
    built_profiles = (MatrixProfile.from_person(person) for person in persons_without_profile)
    profiles_to_create = [profile for profile in built_profiles if profile]
    MatrixProfile.objects.bulk_create(profiles_to_create)

    return MatrixProfile.objects.filter(
        Q(person__in=group.members.all()) | Q(person__in=group.owners.all())
    )
def get_profiles(self):
    """Return the profiles of the members/owners of this room's group."""
    room_group = self.group
    return self.get_profiles_for_group(room_group)
def sync_profiles(self):
    """Sync profiles for this room.

    Invites every profile of the group that is not yet among the room's
    members and then sets the power levels: 50 for owners of the group,
    0 for members of the group.
    """
    all_profiles = self.get_profiles()
    # Fetch the member list once; a set gives constant-time lookups.
    members = set(self.members)

    # Invite all users who are not in the room yet
    for profile in all_profiles:
        if profile.matrix_id not in members:
            self._invite(profile)

    # Resolve owners/members once instead of querying them for every profile.
    owner_ids = set(self.group.owners.values_list("pk", flat=True))
    member_ids = set(self.group.members.values_list("pk", flat=True))

    # Set power levels for all users
    # Mod = 50 = Owners
    # User = 0 = Members
    user_levels = self.power_levels
    for profile in all_profiles:
        if profile.person_id in owner_ids:
            user_levels[profile.matrix_id] = 50
        elif profile.person_id in member_ids:
            user_levels[profile.matrix_id] = 0
    self._set_power_levels(user_levels)
def sync_space(self):
    """Sync the space belonging to this room's group.

    Nothing is done if the group has no child groups. Always returns ``None``.
    """
    # exists() avoids loading all child groups just for a truthiness check.
    if self.group.child_groups.exists():
        # Do space stuff
        space = MatrixSpace.from_group(self.group)
        space.sync()
    return None
def sync(self):
    """Sync this room: always the profiles, the space only if spaces are enabled."""
    self.sync_profiles()

    # Spaces are only synced if the site preference ``matrix__use_spaces`` is set.
    spaces_enabled = get_site_preferences()["matrix__use_spaces"]
    if spaces_enabled:
        self.sync_space()
# Model options: display names and an additional custom permission.
class Meta:
verbose_name = _("Matrix room")
verbose_name_plural = _("Matrix rooms")
# Custom permission with codename ``use_group_in_matrix``.
permissions = (("use_group_in_matrix", "Can use group in Matrix"),)
class MatrixSpace(MatrixRoom):
"""Model for a Matrix space."""
# Rooms/spaces contained in this space; reachable from a room as ``parents``.
children = models.ManyToManyField(
to=MatrixRoom, verbose_name=_("Child rooms/spaces"), blank=True, related_name="parents"
)
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
@classmethod
def get_queryset(cls):
    """Return this model's objects that are instances of ``MatrixSpace``."""
    manager = cls.objects
    return manager.instance_of(MatrixSpace)
@classmethod
def build_alias(cls, group: Group) -> str:
    """Build an alias for this space.

    The alias is the slugified short name of the group (or its name if the
    short name is empty) followed by ``-space``.
    """
    base_alias = slugify(group.short_name or group.name)
    return f"{base_alias}-space"
@classmethod
def _create_room(
    cls,
    name,
    alias,
    invite: Optional[List[str]] = None,
    creation_content: Optional[dict] = None,
) -> Dict[str, Any]:
    """Create a room of type ``m.space``.

    Works like the parent implementation, but always sets ``type`` to
    ``m.space`` in the creation content.
    """
    # Build a new dict so that the caller's ``creation_content`` is not mutated.
    space_content = {**(creation_content or {}), "type": "m.space"}
    return super()._create_room(name, alias, invite, space_content)
@property
def child_spaces(self) -> List[str]:
    """Return the state keys of all ``m.space.child`` state events of this space."""
    from aleksis.apps.matrix.matrix import do_matrix_request

    state_events = do_matrix_request("GET", f"rooms/{self.room_id}/state")
    child_keys = []
    for state_event in state_events:
        if state_event["type"] == "m.space.child":
            child_keys.append(state_event["state_key"])
    return child_keys
def _add_child(self, room_id: str):
    """Add a child room to this space.

    Sends a ``PUT`` request for an ``m.space.child`` state event keyed by
    ``room_id``, with the configured homeserver ID as the only ``via`` entry,
    and returns the response.
    """
    # NOTE(review): this path is absolute (``/_matrix/client/v3/...``) while the
    # other requests in this file use relative paths — confirm that
    # do_matrix_request handles both forms.
    path = f"/_matrix/client/v3/rooms/{self.room_id}/state/m.space.child/{room_id}"
    via_servers = [get_site_preferences()["matrix__homeserver_ids"]]
    response = do_matrix_request("PUT", path, body={"via": via_servers})
    return response
def sync_children(self):
    """Sync membership of child spaces and rooms.

    Expected children are the spaces of direct child groups that have child
    groups themselves, the rooms of direct child groups without child groups,
    and the room of this space's own group. Every expected child that is not
    yet a child of this space is added.
    """
    already_linked = set(self.child_spaces)

    groups_with_children = self.group.child_groups.filter(child_groups__isnull=False)
    groups_without_children = self.group.child_groups.filter(child_groups__isnull=True)

    child_spaces = MatrixSpace.get_queryset().filter(group__in=groups_with_children)
    child_rooms = MatrixRoom.get_queryset().filter(
        Q(group__in=groups_without_children) | Q(group=self.group)
    )

    expected_ids = {space.room_id for space in child_spaces}
    expected_ids.update(room.room_id for room in child_rooms)

    for missing_id in expected_ids - already_linked:
        self._add_child(missing_id)
def ensure_children(self):
    """Ensure that all child rooms/spaces exist.

    Child groups without child groups of their own are set up via
    ``use_in_matrix(sync=True)``; for all other child groups a space is
    obtained via ``from_group`` and its children are ensured recursively.
    """
    child_groups = self.group.child_groups.all().prefetch_related("child_groups")
    for child_group in child_groups:
        if not child_group.child_groups.all():
            child_group.use_in_matrix(sync=True)
            continue
        child_space = MatrixSpace.from_group(child_group)
        child_space.ensure_children()
def sync(self):
    """Sync this space: ensure all children exist, then sync the child links."""
    for step in (self.ensure_children, self.sync_children):
        step()
# Model options: display names for this model.
class Meta:
verbose_name = _("Matrix space")
verbose_name_plural = _("Matrix spaces")