1 package org
.asamk
.signal
.manager
.storage
.groups
;
3 import com
.google
.protobuf
.InvalidProtocolBufferException
;
5 import org
.asamk
.signal
.manager
.groups
.GroupId
;
6 import org
.asamk
.signal
.manager
.groups
.GroupIdV1
;
7 import org
.asamk
.signal
.manager
.groups
.GroupIdV2
;
8 import org
.asamk
.signal
.manager
.groups
.GroupUtils
;
9 import org
.asamk
.signal
.manager
.storage
.Database
;
10 import org
.asamk
.signal
.manager
.storage
.Utils
;
11 import org
.asamk
.signal
.manager
.storage
.recipients
.RecipientId
;
12 import org
.asamk
.signal
.manager
.storage
.recipients
.RecipientIdCreator
;
13 import org
.asamk
.signal
.manager
.storage
.recipients
.RecipientResolver
;
14 import org
.signal
.libsignal
.zkgroup
.InvalidInputException
;
15 import org
.signal
.libsignal
.zkgroup
.groups
.GroupMasterKey
;
16 import org
.signal
.storageservice
.protos
.groups
.local
.DecryptedGroup
;
17 import org
.slf4j
.Logger
;
18 import org
.slf4j
.LoggerFactory
;
19 import org
.whispersystems
.signalservice
.api
.push
.DistributionId
;
20 import org
.whispersystems
.signalservice
.api
.util
.UuidUtil
;
22 import java
.sql
.Connection
;
23 import java
.sql
.ResultSet
;
24 import java
.sql
.SQLException
;
25 import java
.sql
.Types
;
26 import java
.util
.Arrays
;
27 import java
.util
.Collection
;
28 import java
.util
.List
;
29 import java
.util
.Objects
;
31 import java
.util
.stream
.Collectors
;
32 import java
.util
.stream
.Stream
;
34 public class GroupStore
{
36 private final static Logger logger
= LoggerFactory
.getLogger(GroupStore
.class);
37 private static final String TABLE_GROUP_V2
= "group_v2";
38 private static final String TABLE_GROUP_V1
= "group_v1";
39 private static final String TABLE_GROUP_V1_MEMBER
= "group_v1_member";
41 private final Database database
;
42 private final RecipientResolver recipientResolver
;
43 private final RecipientIdCreator recipientIdCreator
;
45 public static void createSql(Connection connection
) throws SQLException
{
46 // When modifying the CREATE statement here, also add a migration in AccountDatabase.java
47 try (final var statement
= connection
.createStatement()) {
48 statement
.executeUpdate("""
49 CREATE TABLE group_v2 (
50 _id INTEGER PRIMARY KEY,
51 group_id BLOB UNIQUE NOT NULL,
52 master_key BLOB NOT NULL,
54 distribution_id BLOB UNIQUE NOT NULL,
55 blocked INTEGER NOT NULL DEFAULT FALSE,
56 permission_denied INTEGER NOT NULL DEFAULT FALSE
58 CREATE TABLE group_v1 (
59 _id INTEGER PRIMARY KEY,
60 group_id BLOB UNIQUE NOT NULL,
61 group_id_v2 BLOB UNIQUE,
64 expiration_time INTEGER NOT NULL DEFAULT 0,
65 blocked INTEGER NOT NULL DEFAULT FALSE,
66 archived INTEGER NOT NULL DEFAULT FALSE
68 CREATE TABLE group_v1_member (
69 _id INTEGER PRIMARY KEY,
70 group_id INTEGER NOT NULL REFERENCES group_v1 (_id) ON DELETE CASCADE,
71 recipient_id INTEGER NOT NULL REFERENCES recipient (_id) ON DELETE CASCADE,
72 UNIQUE(group_id, recipient_id)
/**
 * Creates a group store that works on the given database.
 *
 * NOTE(review): the constructor's header line is not visible in this excerpt; {@code public}
 * visibility is assumed - confirm against the callers.
 *
 * @param database           provider of the JDBC connections used by every query
 * @param recipientResolver  resolver kept for later use by this store
 * @param recipientIdCreator factory used to turn stored member ids into {@link RecipientId}s
 */
public GroupStore(
        final Database database,
        final RecipientResolver recipientResolver,
        final RecipientIdCreator recipientIdCreator
) {
    this.database = database;
    this.recipientResolver = recipientResolver;
    this.recipientIdCreator = recipientIdCreator;
}
88 public void updateGroup(GroupInfo group
) {
89 try (final var connection
= database
.getConnection()) {
90 connection
.setAutoCommit(false);
91 final Long internalId
;
98 ).formatted(group
instanceof GroupInfoV1 ? TABLE_GROUP_V1
: TABLE_GROUP_V2
);
99 try (final var statement
= connection
.prepareStatement(sql
)) {
100 statement
.setBytes(1, group
.getGroupId().serialize());
101 internalId
= Utils
.executeQueryForOptional(statement
, res
-> res
.getLong("_id")).orElse(null);
103 insertOrReplaceGroup(connection
, internalId
, group
);
105 } catch (SQLException e
) {
106 throw new RuntimeException("Failed update recipient store", e
);
110 public void deleteGroup(GroupId groupId
) {
111 if (groupId
instanceof GroupIdV1 groupIdV1
) {
112 deleteGroup(groupIdV1
);
113 } else if (groupId
instanceof GroupIdV2 groupIdV2
) {
114 deleteGroup(groupIdV2
);
118 public void deleteGroup(GroupIdV1 groupIdV1
) {
124 ).formatted(TABLE_GROUP_V1
);
125 try (final var connection
= database
.getConnection()) {
126 try (final var statement
= connection
.prepareStatement(sql
)) {
127 statement
.setBytes(1, groupIdV1
.serialize());
128 statement
.executeUpdate();
130 } catch (SQLException e
) {
131 throw new RuntimeException("Failed update group store", e
);
135 public void deleteGroup(GroupIdV2 groupIdV2
) {
141 ).formatted(TABLE_GROUP_V2
);
142 try (final var connection
= database
.getConnection()) {
143 try (final var statement
= connection
.prepareStatement(sql
)) {
144 statement
.setBytes(1, groupIdV2
.serialize());
145 statement
.executeUpdate();
147 } catch (SQLException e
) {
148 throw new RuntimeException("Failed update group store", e
);
152 public GroupInfo
getGroup(GroupId groupId
) {
153 try (final var connection
= database
.getConnection()) {
154 if (groupId
instanceof GroupIdV1 groupIdV1
) {
155 final var group
= getGroup(connection
, groupIdV1
);
159 return getGroupV2ByV1Id(connection
, groupIdV1
);
160 } else if (groupId
instanceof GroupIdV2 groupIdV2
) {
161 final var group
= getGroup(connection
, groupIdV2
);
165 return getGroupV1ByV2Id(connection
, groupIdV2
);
167 } catch (SQLException e
) {
168 throw new RuntimeException("Failed read from group store", e
);
170 throw new AssertionError("Invalid group id type");
173 public GroupInfoV1
getOrCreateGroupV1(GroupIdV1 groupId
) {
174 try (final var connection
= database
.getConnection()) {
175 var group
= getGroup(connection
, groupId
);
181 if (getGroupV2ByV1Id(connection
, groupId
) == null) {
182 return new GroupInfoV1(groupId
);
186 } catch (SQLException e
) {
187 throw new RuntimeException("Failed read from group store", e
);
191 public List
<GroupInfo
> getGroups() {
192 return Stream
.concat(getGroupsV2().stream(), getGroupsV1().stream()).toList();
195 public void mergeRecipients(final RecipientId recipientId
, final RecipientId toBeMergedRecipientId
) {
200 WHERE recipient_id = ?
202 ).formatted(TABLE_GROUP_V1_MEMBER
);
203 try (final var connection
= database
.getConnection()) {
204 try (final var statement
= connection
.prepareStatement(sql
)) {
205 statement
.setLong(1, recipientId
.id());
206 statement
.setLong(2, toBeMergedRecipientId
.id());
207 final var updatedRows
= statement
.executeUpdate();
208 if (updatedRows
> 0) {
209 logger
.info("Updated {} group members when merging recipients", updatedRows
);
212 } catch (SQLException e
) {
213 throw new RuntimeException("Failed update group store", e
);
217 void addLegacyGroups(final Collection
<GroupInfo
> groups
) {
218 logger
.debug("Migrating legacy groups to database");
219 long start
= System
.nanoTime();
220 try (final var connection
= database
.getConnection()) {
221 connection
.setAutoCommit(false);
222 for (final var group
: groups
) {
223 insertOrReplaceGroup(connection
, null, group
);
226 } catch (SQLException e
) {
227 throw new RuntimeException("Failed update group store", e
);
229 logger
.debug("Complete groups migration took {}ms", (System
.nanoTime() - start
) / 1000000);
232 private void insertOrReplaceGroup(
233 final Connection connection
, Long internalId
, final GroupInfo group
234 ) throws SQLException
{
235 if (group
instanceof GroupInfoV1 groupV1
) {
236 if (internalId
!= null) {
237 final var sqlDeleteMembers
= "DELETE FROM %s where group_id = ?".formatted(TABLE_GROUP_V1_MEMBER
);
238 try (final var statement
= connection
.prepareStatement(sqlDeleteMembers
)) {
239 statement
.setLong(1, internalId
);
240 statement
.executeUpdate();
244 INSERT OR REPLACE INTO %s (_id, group_id, group_id_v2, name, color, expiration_time, blocked, archived)
245 VALUES (?, ?, ?, ?, ?, ?, ?, ?)
246 """.formatted(TABLE_GROUP_V1
);
247 try (final var statement
= connection
.prepareStatement(sql
)) {
248 if (internalId
== null) {
249 statement
.setNull(1, Types
.NUMERIC
);
251 statement
.setLong(1, internalId
);
253 statement
.setBytes(2, groupV1
.getGroupId().serialize());
254 statement
.setBytes(3, groupV1
.getExpectedV2Id().serialize());
255 statement
.setString(4, groupV1
.getTitle());
256 statement
.setString(5, groupV1
.color
);
257 statement
.setLong(6, groupV1
.getMessageExpirationTimer());
258 statement
.setBoolean(7, groupV1
.isBlocked());
259 statement
.setBoolean(8, groupV1
.archived
);
260 statement
.executeUpdate();
262 if (internalId
== null) {
263 final var generatedKeys
= statement
.getGeneratedKeys();
264 if (generatedKeys
.next()) {
265 internalId
= generatedKeys
.getLong(1);
267 throw new RuntimeException("Failed to add new recipient to database");
271 final var sqlInsertMember
= """
272 INSERT OR REPLACE INTO %s (group_id, recipient_id)
274 """.formatted(TABLE_GROUP_V1_MEMBER
);
275 try (final var statement
= connection
.prepareStatement(sqlInsertMember
)) {
276 for (final var recipient
: groupV1
.getMembers()) {
277 statement
.setLong(1, internalId
);
278 statement
.setLong(2, recipient
.id());
279 statement
.executeUpdate();
282 } else if (group
instanceof GroupInfoV2 groupV2
) {
285 INSERT OR REPLACE INTO %s (_id, group_id, master_key, group_data, distribution_id, blocked, distribution_id)
286 VALUES (?, ?, ?, ?, ?, ?, ?)
288 ).formatted(TABLE_GROUP_V2
);
289 try (final var statement
= connection
.prepareStatement(sql
)) {
290 if (internalId
== null) {
291 statement
.setNull(1, Types
.NUMERIC
);
293 statement
.setLong(1, internalId
);
295 statement
.setBytes(2, groupV2
.getGroupId().serialize());
296 statement
.setBytes(3, groupV2
.getMasterKey().serialize());
297 if (groupV2
.getGroup() == null) {
298 statement
.setNull(4, Types
.NUMERIC
);
300 statement
.setBytes(4, groupV2
.getGroup().toByteArray());
302 statement
.setBytes(5, UuidUtil
.toByteArray(groupV2
.getDistributionId().asUuid()));
303 statement
.setBoolean(6, groupV2
.isBlocked());
304 statement
.setBoolean(7, groupV2
.isPermissionDenied());
305 statement
.executeUpdate();
308 throw new AssertionError("Invalid group id type");
312 private List
<GroupInfoV2
> getGroupsV2() {
315 SELECT g.group_id, g.master_key, g.group_data, g.distribution_id, g.blocked, g.permission_denied
318 ).formatted(TABLE_GROUP_V2
);
319 try (final var connection
= database
.getConnection()) {
320 try (final var statement
= connection
.prepareStatement(sql
)) {
321 return Utils
.executeQueryForStream(statement
, this::getGroupInfoV2FromResultSet
)
322 .filter(Objects
::nonNull
)
325 } catch (SQLException e
) {
326 throw new RuntimeException("Failed read from group store", e
);
330 private GroupInfoV2
getGroup(Connection connection
, GroupIdV2 groupIdV2
) throws SQLException
{
333 SELECT g.group_id, g.master_key, g.group_data, g.distribution_id, g.blocked, g.permission_denied
337 ).formatted(TABLE_GROUP_V2
);
338 try (final var statement
= connection
.prepareStatement(sql
)) {
339 statement
.setBytes(1, groupIdV2
.serialize());
340 return Utils
.executeQueryForOptional(statement
, this::getGroupInfoV2FromResultSet
).orElse(null);
344 private GroupInfoV2
getGroupInfoV2FromResultSet(ResultSet resultSet
) throws SQLException
{
346 final var groupId
= resultSet
.getBytes("group_id");
347 final var masterKey
= resultSet
.getBytes("master_key");
348 final var groupData
= resultSet
.getBytes("group_data");
349 final var distributionId
= resultSet
.getBytes("distribution_id");
350 final var blocked
= resultSet
.getBoolean("blocked");
351 final var permissionDenied
= resultSet
.getBoolean("permission_denied");
352 return new GroupInfoV2(GroupId
.v2(groupId
),
353 new GroupMasterKey(masterKey
),
354 groupData
== null ?
null : DecryptedGroup
.parseFrom(groupData
),
355 DistributionId
.from(UuidUtil
.parseOrThrow(distributionId
)),
359 } catch (InvalidInputException
| InvalidProtocolBufferException e
) {
364 private List
<GroupInfoV1
> getGroupsV1() {
367 SELECT g.group_id, g.group_id_v2, g.name, g.color, (select group_concat(gm.recipient_id) from %s gm where gm.group_id = g._id) as members, g.expiration_time, g.blocked, g.archived
370 ).formatted(TABLE_GROUP_V1_MEMBER
, TABLE_GROUP_V1
);
371 try (final var connection
= database
.getConnection()) {
372 try (final var statement
= connection
.prepareStatement(sql
)) {
373 return Utils
.executeQueryForStream(statement
, this::getGroupInfoV1FromResultSet
)
374 .filter(Objects
::nonNull
)
377 } catch (SQLException e
) {
378 throw new RuntimeException("Failed read from group store", e
);
382 private GroupInfoV1
getGroup(Connection connection
, GroupIdV1 groupIdV1
) throws SQLException
{
385 SELECT g.group_id, g.group_id_v2, g.name, g.color, (select group_concat(gm.recipient_id) from %s gm where gm.group_id = g._id) as members, g.expiration_time, g.blocked, g.archived
389 ).formatted(TABLE_GROUP_V1_MEMBER
, TABLE_GROUP_V1
);
390 try (final var statement
= connection
.prepareStatement(sql
)) {
391 statement
.setBytes(1, groupIdV1
.serialize());
392 return Utils
.executeQueryForOptional(statement
, this::getGroupInfoV1FromResultSet
).orElse(null);
396 private GroupInfoV1
getGroupInfoV1FromResultSet(ResultSet resultSet
) throws SQLException
{
397 final var groupId
= resultSet
.getBytes("group_id");
398 final var groupIdV2
= resultSet
.getBytes("group_id_v2");
399 final var name
= resultSet
.getString("name");
400 final var color
= resultSet
.getString("color");
401 final var membersString
= resultSet
.getString("members");
402 final var members
= membersString
== null
403 ? Set
.<RecipientId
>of()
404 : Arrays
.stream(membersString
.split(","))
405 .map(Integer
::valueOf
)
406 .map(recipientIdCreator
::create
)
407 .collect(Collectors
.toSet());
408 final var expirationTime
= resultSet
.getInt("expiration_time");
409 final var blocked
= resultSet
.getBoolean("blocked");
410 final var archived
= resultSet
.getBoolean("archived");
411 return new GroupInfoV1(GroupId
.v1(groupId
),
412 groupIdV2
== null ?
null : GroupId
.v2(groupIdV2
),
421 private GroupInfoV2
getGroupV2ByV1Id(final Connection connection
, final GroupIdV1 groupId
) throws SQLException
{
422 return getGroup(connection
, GroupUtils
.getGroupIdV2(groupId
));
425 private GroupInfoV1
getGroupV1ByV2Id(Connection connection
, GroupIdV2 groupIdV2
) throws SQLException
{
428 SELECT g.group_id, g.group_id_v2, g.name, g.color, (select group_concat(gm.recipient_id) from %s gm where gm.group_id = g._id) as members, g.expiration_time, g.blocked, g.archived
430 WHERE g.group_id_v2 = ?
432 ).formatted(TABLE_GROUP_V1_MEMBER
, TABLE_GROUP_V1
);
433 try (final var statement
= connection
.prepareStatement(sql
)) {
434 statement
.setBytes(1, groupIdV2
.serialize());
435 return Utils
.executeQueryForOptional(statement
, this::getGroupInfoV1FromResultSet
).orElse(null);