Newer
Older
from django.template.defaultfilters import slugify
from django.utils.translation import gettext_lazy as _
from aleksis.core.mixins import ExtensibleModel, ExtensiblePolymorphicModel
from aleksis.core.models import Group, Person
from aleksis.core.util.core_helpers import get_site_preferences
class MatrixProfile(ExtensibleModel):
    """Matrix account (identified by its Matrix ID) that may be linked to a person."""

    # Full Matrix user ID; unique across all profiles
    matrix_id = models.CharField(
        verbose_name=_("Matrix ID"),
        max_length=255,
        unique=True,
    )
    # Optional one-to-one link; reachable from the person as ``person.matrix_profile``
    person = models.OneToOneField(
        Person,
        verbose_name=_("Person"),
        related_name="matrix_profile",
        on_delete=models.CASCADE,
        blank=True,
        null=True,
    )

    @classmethod
    def build_matrix_id(cls, username, homeserver: Optional[str] = None):
        """Compose a Matrix ID of the form ``@username:homeserver``.

        When no (or an empty) homeserver is passed, the site preference
        ``matrix__homeserver_ids`` is used instead.
        """
        if not homeserver:
            homeserver = get_site_preferences()["matrix__homeserver_ids"]
        return "@{}:{}".format(username, homeserver)

    @classmethod
    def from_person(cls, person: Person, commit: bool = False) -> Union["MatrixProfile", None]:
        """Return the Matrix profile of a person, building a new one if none exists.

        :param person: person to look up or build the profile for
        :param commit: save a newly built profile before returning it
        :return: the existing or newly built profile, or ``None`` if the site
            preference ``matrix__homeserver_ids`` is empty
        :raises ValueError: if the person has no profile yet and no linked user
        """
        try:
            # An already linked profile always wins
            return person.matrix_profile
        except AttributeError:
            pass

        if not person.user:
            raise ValueError("Person must have a user.")

        if not get_site_preferences()["matrix__homeserver_ids"]:
            return None

        matrix_id = cls.build_matrix_id(person.user.username)
        profile = MatrixProfile(matrix_id=matrix_id, person=person)
        if commit:
            profile.save()
        return profile

    class Meta:
        verbose_name = _("Matrix profile")
        verbose_name_plural = _("Matrix profiles")
class MatrixRoom(ExtensiblePolymorphicModel):
    """A Matrix room that can be linked to a group."""

    # Room ID as stored for this room; unique across all rooms
    room_id = models.CharField(
        verbose_name=_("Room ID"),
        max_length=255,
        unique=True,
    )
    # Room alias; unique, may be left blank in forms
    alias = models.CharField(
        verbose_name=_("Alias"),
        max_length=255,
        unique=True,
        blank=True,
    )
    # Optional group link; rooms are deleted together with their group and
    # are reachable from the group via ``matrix_spaces``
    group = models.ForeignKey(
        Group,
        verbose_name=_("Group"),
        related_name="matrix_spaces",
        on_delete=models.CASCADE,
        blank=True,
        null=True,
    )
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
@classmethod
def from_group(cls, group: Group):
    """Return the Matrix room for a group, creating it on the homeserver if needed.

    If a room with a room ID is already stored for the group, it is only
    checked for accessibility through the room directory endpoint.
    Otherwise a new room is created with an alias slugified from the group's
    short name (or name). While the homeserver answers ``M_ROOM_IN_USE``, a
    numeric suffix is appended to the alias (or an existing one is
    incremented) and the creation is retried.

    :param group: group to get or create the room for
    :return: the stored (and, if newly created, saved) room
    :raises MatrixException: if the existing room is not accessible or the
        room could not be created
    """
    from .matrix import MatrixException, build_url, get_headers

    def _error_code(response):
        """Return the ``errcode`` of a response, or None if its body is not a JSON object."""
        try:
            payload = response.json()
        except ValueError:
            # Non-JSON error bodies must fall through to the status check
            # below instead of crashing the retry loop.
            return None
        return payload.get("errcode") if isinstance(payload, dict) else None

    # NOTE(review): ``group`` is a non-unique foreign key, so this lookup can
    # raise MultipleObjectsReturned if several rooms reference the same
    # group - confirm whether that can happen.
    try:
        room = MatrixRoom.objects.get(group=group)
    except MatrixRoom.DoesNotExist:
        room = MatrixRoom(group=group)

    if room.room_id:
        # Existing room, check if still accessible
        r = requests.get(
            build_url(f"directory/list/room/{room.room_id}"), headers=get_headers()
        )
        if r.status_code != requests.codes.ok:
            raise MatrixException()
        return room

    # Room does not exist, create it
    alias = slugify(group.short_name or group.name)
    r = cls._create_group(group.name, alias)
    while _error_code(r) == "M_ROOM_IN_USE":
        match = re.match(r"^(.*)-(\d+)$", alias)
        if match:
            # Counter found, increase
            prefix = match.group(1)
            counter = int(match.group(2)) + 1
            alias = f"{prefix}-{counter}"
        else:
            # Counter not found, add one
            alias = f"{alias}-2"
        r = cls._create_group(group.name, alias)

    if r.status_code != requests.codes.ok:
        raise MatrixException(r.text)

    data = r.json()
    room.room_id = data["room_id"]
    room.alias = data["room_alias"]
    room.save()
    return room
@classmethod
def _create_group(self, name, alias):
    """Send a ``createRoom`` request for a private chat and return the raw response.

    :param name: display name of the room
    :param alias: local part of the room alias to request
    :return: the unchecked HTTP response; callers inspect status and body
    """
    from .matrix import build_url, get_headers

    payload = dict(preset="private_chat", name=name, room_alias_name=alias)
    return requests.post(build_url("createRoom"), headers=get_headers(), json=payload)
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
@property
def power_levels(self) -> Dict[str, int]:
    """Return the per-user power levels of this room.

    Fetches the room state and reads the ``users`` mapping from the content
    of the ``m.room.power_levels`` event.

    :return: mapping of Matrix user ID to power level; empty if the event
        carries no ``users`` key
    :raises MatrixException: if the state request fails or the state
        contains no ``m.room.power_levels`` event
    """
    from aleksis.apps.matrix.matrix import MatrixException, build_url, get_headers

    r = requests.get(build_url(f"rooms/{self.room_id}/state"), headers=get_headers())
    if r.status_code != requests.codes.ok:
        raise MatrixException(r.text)

    event = next((x for x in r.json() if x["type"] == "m.room.power_levels"), None)
    if event is None:
        # Report a domain error instead of an opaque IndexError
        raise MatrixException("Room state contains no m.room.power_levels event.")

    # ``users`` is treated as optional; fall back to an empty mapping
    return event["content"].get("users", {})
def _invite(self, profile: MatrixProfile) -> Dict[str, Any]:
    """Invite the Matrix user behind a profile to this room.

    :param profile: profile whose ``matrix_id`` is invited
    :return: the decoded JSON body of the homeserver's response
    :raises MatrixException: if the homeserver does not answer with status OK
    """
    from aleksis.apps.matrix.matrix import MatrixException, build_url, get_headers

    endpoint = build_url(f"rooms/{self.room_id}/invite")
    response = requests.post(
        endpoint, headers=get_headers(), json={"user_id": profile.matrix_id}
    )
    if response.status_code == requests.codes.ok:
        return response.json()
    raise MatrixException(response.text)
def _set_power_levels(self, power_levels: Dict[str, int]) -> Dict[str, Any]:
    """Set the per-user power levels for this room.

    :param power_levels: mapping of Matrix user ID to power level, sent as
        the ``users`` key of the ``m.room.power_levels`` state event
    :return: the decoded JSON body of the homeserver's response
    :raises MatrixException: if the homeserver does not answer with status OK
    """
    import logging

    from aleksis.apps.matrix.matrix import MatrixException, build_url, get_headers

    # NOTE(review): the body carries only ``users``. Per the Matrix spec a
    # state event PUT replaces the whole event content, so other power-level
    # settings of the room would fall back to defaults - confirm intended.
    r = requests.put(
        build_url(f"rooms/{self.room_id}/state/m.room.power_levels/"),
        headers=get_headers(),
        json={"users": power_levels},
    )
    # Debug logging instead of printing the response to stdout
    logging.getLogger(__name__).debug(
        "Power level update for room %s answered %s: %s",
        self.room_id,
        r.status_code,
        r.text,
    )
    if r.status_code != requests.codes.ok:
        raise MatrixException(r.text)
    return r.json()
def sync_profiles(self):
    """Sync the members and owners of the linked group into this room.

    Steps:

    1. Create Matrix profiles for all group members/owners that have a user
       but no profile yet.
    2. Invite every profile whose Matrix ID is not yet listed in the room's
       power levels.
    3. Set the power level of owners to 50 (moderator) and of members to 0
       (user), keeping all other entries of the current power levels.

    :raises MatrixException: propagated from the power level and invite
        requests
    """
    existing_profiles = MatrixProfile.objects.filter(
        Q(person__member_of=self.group) | Q(person__owner_of=self.group)
    )
    persons_without_profile = (
        Person.objects.filter(user__isnull=False)
        .filter(Q(member_of=self.group) | Q(owner_of=self.group))
        .exclude(matrix_profile__in=existing_profiles)
        .distinct()
    )
    # ``from_person`` returns None when no homeserver is configured
    built = (MatrixProfile.from_person(person) for person in persons_without_profile)
    profiles_to_create = [profile for profile in built if profile]
    MatrixProfile.objects.bulk_create(profiles_to_create)

    # Evaluate once; the result is iterated twice below
    all_profiles = list(
        MatrixProfile.objects.filter(
            Q(person__in=self.group.members.all()) | Q(person__in=self.group.owners.all())
        )
    )

    # Invite all users who are not in the room yet
    user_levels = self.power_levels
    for profile in all_profiles:
        if profile.matrix_id not in user_levels:
            self._invite(profile)

    # Fetch the owner/member ids once instead of querying per profile
    owner_ids = set(self.group.owners.values_list("pk", flat=True))
    member_ids = set(self.group.members.values_list("pk", flat=True))

    # Set power levels for all users
    # Mod = 50 = Owners
    # User = 0 = Members
    user_levels = self.power_levels
    for profile in all_profiles:
        if profile.person_id in owner_ids:
            user_levels[profile.matrix_id] = 50
        elif profile.person_id in member_ids:
            user_levels[profile.matrix_id] = 0
    self._set_power_levels(user_levels)
class Meta:
    # Translatable singular and plural display names
    verbose_name = _("Matrix room")
    verbose_name_plural = _("Matrix rooms")
class MatrixSpace(MatrixRoom):
    """A Matrix room that can hold other rooms or spaces as children."""

    # Child rooms/spaces; reachable from a child via ``parents``
    children = models.ManyToManyField(
        to=MatrixRoom,
        related_name="parents",
        blank=True,
        verbose_name=_("Child rooms/spaces"),
    )

    class Meta:
        verbose_name_plural = _("Matrix spaces")
        verbose_name = _("Matrix space")