Newer
Older
from django.template.defaultfilters import slugify
from django.utils.translation import gettext_lazy as _
from aleksis.core.mixins import ExtensibleModel, ExtensiblePolymorphicModel
from aleksis.core.models import Group, Person
class MatrixProfile(ExtensibleModel):
"""Model for a Matrix profile."""
matrix_id = models.CharField(max_length=255, verbose_name=_("Matrix ID"), unique=True)
person = models.OneToOneField(
Person,
on_delete=models.CASCADE,
verbose_name=_("Person"),
null=True,
blank=True,
related_name="matrix_profile",
)
class Meta:
verbose_name = _("Matrix profile")
verbose_name_plural = _("Matrix profiles")
class MatrixRoom(ExtensiblePolymorphicModel):
"""Model for a Matrix room."""
room_id = models.CharField(max_length=255, verbose_name=_("Room ID"), unique=True)
alias = models.CharField(max_length=255, verbose_name=_("Alias"), unique=True, blank=True)
group = models.ForeignKey(
Group,
on_delete=models.CASCADE,
verbose_name=_("Group"),
null=True,
blank=True,
related_name="matrix_spaces",
)
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
@classmethod
def from_group(self, group: Group):
"""Create a Matrix room from a group."""
from .matrix import MatrixException, build_url, get_headers
try:
room = MatrixRoom.objects.get(group=group)
except MatrixRoom.DoesNotExist:
room = MatrixRoom(group=group)
if room.room_id:
# Existing room, check if still accessible
r = requests.get(
build_url(f"directory/list/room/{room.room_id}"), headers=get_headers()
)
if not r.status_code == requests.codes.ok:
raise MatrixException()
else:
# Room does not exist, create it
alias = slugify(group.short_name or group.name)
r = self._create_group(group.name, alias)
while r.json().get("errcode") == "M_ROOM_IN_USE":
match = re.match(r"^(.*)-(\d+)$", alias)
if match:
# Counter found, increase
prefix = match.group(1)
counter = int(match.group(2)) + 1
alias = f"{prefix}-{counter}"
else:
# Counter not found, add one
alias = f"{alias}-2"
r = self._create_group(group.name, alias)
if r.status_code == requests.codes.ok:
room.room_id = r.json()["room_id"]
room.alias = r.json()["room_alias"]
room.save()
else:
raise MatrixException(r.text)
return room
@classmethod
def _create_group(self, name, alias):
from .matrix import build_url, get_headers
body = {"preset": "private_chat", "name": name, "room_alias_name": alias}
r = requests.post(build_url("createRoom"), headers=get_headers(), json=body)
return r
class Meta:
verbose_name = _("Matrix room")
verbose_name_plural = _("Matrix rooms")
class MatrixSpace(MatrixRoom):
children = models.ManyToManyField(
to=MatrixRoom, verbose_name=_("Child rooms/spaces"), blank=True, related_name="parents"
)
class Meta:
verbose_name = _("Matrix space")
verbose_name_plural = _("Matrix spaces")