1 package org
.asamk
.signal
.manager
.storage
.groups
;
3 import com
.google
.protobuf
.InvalidProtocolBufferException
;
5 import org
.asamk
.signal
.manager
.api
.GroupId
;
6 import org
.asamk
.signal
.manager
.api
.GroupIdV1
;
7 import org
.asamk
.signal
.manager
.api
.GroupIdV2
;
8 import org
.asamk
.signal
.manager
.groups
.GroupUtils
;
9 import org
.asamk
.signal
.manager
.storage
.Database
;
10 import org
.asamk
.signal
.manager
.storage
.Utils
;
11 import org
.asamk
.signal
.manager
.storage
.recipients
.RecipientId
;
12 import org
.asamk
.signal
.manager
.storage
.recipients
.RecipientIdCreator
;
13 import org
.asamk
.signal
.manager
.storage
.recipients
.RecipientResolver
;
14 import org
.signal
.libsignal
.zkgroup
.InvalidInputException
;
15 import org
.signal
.libsignal
.zkgroup
.groups
.GroupMasterKey
;
16 import org
.signal
.storageservice
.protos
.groups
.local
.DecryptedGroup
;
17 import org
.slf4j
.Logger
;
18 import org
.slf4j
.LoggerFactory
;
19 import org
.whispersystems
.signalservice
.api
.push
.DistributionId
;
20 import org
.whispersystems
.signalservice
.api
.util
.UuidUtil
;
22 import java
.sql
.Connection
;
23 import java
.sql
.ResultSet
;
24 import java
.sql
.SQLException
;
25 import java
.sql
.Types
;
26 import java
.util
.Arrays
;
27 import java
.util
.Collection
;
28 import java
.util
.List
;
29 import java
.util
.Objects
;
31 import java
.util
.stream
.Collectors
;
32 import java
.util
.stream
.Stream
;
34 public class GroupStore
{
36 private final static Logger logger
= LoggerFactory
.getLogger(GroupStore
.class);
37 private static final String TABLE_GROUP_V2
= "group_v2";
38 private static final String TABLE_GROUP_V1
= "group_v1";
39 private static final String TABLE_GROUP_V1_MEMBER
= "group_v1_member";
41 private final Database database
;
42 private final RecipientResolver recipientResolver
;
43 private final RecipientIdCreator recipientIdCreator
;
45 public static void createSql(Connection connection
) throws SQLException
{
46 // When modifying the CREATE statement here, also add a migration in AccountDatabase.java
47 try (final var statement
= connection
.createStatement()) {
48 statement
.executeUpdate("""
49 CREATE TABLE group_v2 (
50 _id INTEGER PRIMARY KEY,
51 group_id BLOB UNIQUE NOT NULL,
52 master_key BLOB NOT NULL,
54 distribution_id BLOB UNIQUE NOT NULL,
55 blocked INTEGER NOT NULL DEFAULT FALSE,
56 permission_denied INTEGER NOT NULL DEFAULT FALSE
58 CREATE TABLE group_v1 (
59 _id INTEGER PRIMARY KEY,
60 group_id BLOB UNIQUE NOT NULL,
61 group_id_v2 BLOB UNIQUE,
64 expiration_time INTEGER NOT NULL DEFAULT 0,
65 blocked INTEGER NOT NULL DEFAULT FALSE,
66 archived INTEGER NOT NULL DEFAULT FALSE
68 CREATE TABLE group_v1_member (
69 _id INTEGER PRIMARY KEY,
70 group_id INTEGER NOT NULL REFERENCES group_v1 (_id) ON DELETE CASCADE,
71 recipient_id INTEGER NOT NULL REFERENCES recipient (_id) ON DELETE CASCADE,
72 UNIQUE(group_id, recipient_id)
79 final Database database
,
80 final RecipientResolver recipientResolver
,
81 final RecipientIdCreator recipientIdCreator
83 this.database
= database
;
84 this.recipientResolver
= recipientResolver
;
85 this.recipientIdCreator
= recipientIdCreator
;
88 public void updateGroup(GroupInfo group
) {
89 try (final var connection
= database
.getConnection()) {
90 connection
.setAutoCommit(false);
91 final Long internalId
;
98 ).formatted(group
instanceof GroupInfoV1 ? TABLE_GROUP_V1
: TABLE_GROUP_V2
);
99 try (final var statement
= connection
.prepareStatement(sql
)) {
100 statement
.setBytes(1, group
.getGroupId().serialize());
101 internalId
= Utils
.executeQueryForOptional(statement
, res
-> res
.getLong("_id")).orElse(null);
103 insertOrReplaceGroup(connection
, internalId
, group
);
105 } catch (SQLException e
) {
106 throw new RuntimeException("Failed update recipient store", e
);
110 public void deleteGroup(GroupId groupId
) {
111 if (groupId
instanceof GroupIdV1 groupIdV1
) {
112 deleteGroup(groupIdV1
);
113 } else if (groupId
instanceof GroupIdV2 groupIdV2
) {
114 deleteGroup(groupIdV2
);
118 public void deleteGroup(GroupIdV1 groupIdV1
) {
124 ).formatted(TABLE_GROUP_V1
);
125 try (final var connection
= database
.getConnection()) {
126 try (final var statement
= connection
.prepareStatement(sql
)) {
127 statement
.setBytes(1, groupIdV1
.serialize());
128 statement
.executeUpdate();
130 } catch (SQLException e
) {
131 throw new RuntimeException("Failed update group store", e
);
135 public void deleteGroup(GroupIdV2 groupIdV2
) {
141 ).formatted(TABLE_GROUP_V2
);
142 try (final var connection
= database
.getConnection()) {
143 try (final var statement
= connection
.prepareStatement(sql
)) {
144 statement
.setBytes(1, groupIdV2
.serialize());
145 statement
.executeUpdate();
147 } catch (SQLException e
) {
148 throw new RuntimeException("Failed update group store", e
);
152 public GroupInfo
getGroup(GroupId groupId
) {
153 try (final var connection
= database
.getConnection()) {
154 if (groupId
instanceof GroupIdV1 groupIdV1
) {
155 final var group
= getGroup(connection
, groupIdV1
);
159 return getGroupV2ByV1Id(connection
, groupIdV1
);
160 } else if (groupId
instanceof GroupIdV2 groupIdV2
) {
161 final var group
= getGroup(connection
, groupIdV2
);
165 return getGroupV1ByV2Id(connection
, groupIdV2
);
167 } catch (SQLException e
) {
168 throw new RuntimeException("Failed read from group store", e
);
170 throw new AssertionError("Invalid group id type");
173 public GroupInfoV1
getOrCreateGroupV1(GroupIdV1 groupId
) {
174 try (final var connection
= database
.getConnection()) {
175 var group
= getGroup(connection
, groupId
);
181 if (getGroupV2ByV1Id(connection
, groupId
) == null) {
182 return new GroupInfoV1(groupId
);
186 } catch (SQLException e
) {
187 throw new RuntimeException("Failed read from group store", e
);
191 public List
<GroupInfo
> getGroups() {
192 return Stream
.concat(getGroupsV2().stream(), getGroupsV1().stream()).toList();
195 public void mergeRecipients(
196 final Connection connection
, final RecipientId recipientId
, final RecipientId toBeMergedRecipientId
197 ) throws SQLException
{
202 WHERE recipient_id = ?
204 ).formatted(TABLE_GROUP_V1_MEMBER
);
205 try (final var statement
= connection
.prepareStatement(sql
)) {
206 statement
.setLong(1, recipientId
.id());
207 statement
.setLong(2, toBeMergedRecipientId
.id());
208 final var updatedRows
= statement
.executeUpdate();
209 if (updatedRows
> 0) {
210 logger
.info("Updated {} group members when merging recipients", updatedRows
);
215 void addLegacyGroups(final Collection
<GroupInfo
> groups
) {
216 logger
.debug("Migrating legacy groups to database");
217 long start
= System
.nanoTime();
218 try (final var connection
= database
.getConnection()) {
219 connection
.setAutoCommit(false);
220 for (final var group
: groups
) {
221 insertOrReplaceGroup(connection
, null, group
);
224 } catch (SQLException e
) {
225 throw new RuntimeException("Failed update group store", e
);
227 logger
.debug("Complete groups migration took {}ms", (System
.nanoTime() - start
) / 1000000);
230 private void insertOrReplaceGroup(
231 final Connection connection
, Long internalId
, final GroupInfo group
232 ) throws SQLException
{
233 if (group
instanceof GroupInfoV1 groupV1
) {
234 if (internalId
!= null) {
235 final var sqlDeleteMembers
= "DELETE FROM %s where group_id = ?".formatted(TABLE_GROUP_V1_MEMBER
);
236 try (final var statement
= connection
.prepareStatement(sqlDeleteMembers
)) {
237 statement
.setLong(1, internalId
);
238 statement
.executeUpdate();
242 INSERT OR REPLACE INTO %s (_id, group_id, group_id_v2, name, color, expiration_time, blocked, archived)
243 VALUES (?, ?, ?, ?, ?, ?, ?, ?)
245 """.formatted(TABLE_GROUP_V1
);
246 try (final var statement
= connection
.prepareStatement(sql
)) {
247 if (internalId
== null) {
248 statement
.setNull(1, Types
.NUMERIC
);
250 statement
.setLong(1, internalId
);
252 statement
.setBytes(2, groupV1
.getGroupId().serialize());
253 statement
.setBytes(3, groupV1
.getExpectedV2Id().serialize());
254 statement
.setString(4, groupV1
.getTitle());
255 statement
.setString(5, groupV1
.color
);
256 statement
.setLong(6, groupV1
.getMessageExpirationTimer());
257 statement
.setBoolean(7, groupV1
.isBlocked());
258 statement
.setBoolean(8, groupV1
.archived
);
259 final var generatedKey
= Utils
.executeQueryForOptional(statement
, Utils
::getIdMapper
);
261 if (internalId
== null) {
262 if (generatedKey
.isPresent()) {
263 internalId
= generatedKey
.get();
265 throw new RuntimeException("Failed to add new group to database");
269 final var sqlInsertMember
= """
270 INSERT OR REPLACE INTO %s (group_id, recipient_id)
272 """.formatted(TABLE_GROUP_V1_MEMBER
);
273 try (final var statement
= connection
.prepareStatement(sqlInsertMember
)) {
274 for (final var recipient
: groupV1
.getMembers()) {
275 statement
.setLong(1, internalId
);
276 statement
.setLong(2, recipient
.id());
277 statement
.executeUpdate();
280 } else if (group
instanceof GroupInfoV2 groupV2
) {
283 INSERT OR REPLACE INTO %s (_id, group_id, master_key, group_data, distribution_id, blocked, distribution_id)
284 VALUES (?, ?, ?, ?, ?, ?, ?)
286 ).formatted(TABLE_GROUP_V2
);
287 try (final var statement
= connection
.prepareStatement(sql
)) {
288 if (internalId
== null) {
289 statement
.setNull(1, Types
.NUMERIC
);
291 statement
.setLong(1, internalId
);
293 statement
.setBytes(2, groupV2
.getGroupId().serialize());
294 statement
.setBytes(3, groupV2
.getMasterKey().serialize());
295 if (groupV2
.getGroup() == null) {
296 statement
.setNull(4, Types
.NUMERIC
);
298 statement
.setBytes(4, groupV2
.getGroup().toByteArray());
300 statement
.setBytes(5, UuidUtil
.toByteArray(groupV2
.getDistributionId().asUuid()));
301 statement
.setBoolean(6, groupV2
.isBlocked());
302 statement
.setBoolean(7, groupV2
.isPermissionDenied());
303 statement
.executeUpdate();
306 throw new AssertionError("Invalid group id type");
310 private List
<GroupInfoV2
> getGroupsV2() {
313 SELECT g.group_id, g.master_key, g.group_data, g.distribution_id, g.blocked, g.permission_denied
316 ).formatted(TABLE_GROUP_V2
);
317 try (final var connection
= database
.getConnection()) {
318 try (final var statement
= connection
.prepareStatement(sql
)) {
319 return Utils
.executeQueryForStream(statement
, this::getGroupInfoV2FromResultSet
)
320 .filter(Objects
::nonNull
)
323 } catch (SQLException e
) {
324 throw new RuntimeException("Failed read from group store", e
);
328 private GroupInfoV2
getGroup(Connection connection
, GroupIdV2 groupIdV2
) throws SQLException
{
331 SELECT g.group_id, g.master_key, g.group_data, g.distribution_id, g.blocked, g.permission_denied
335 ).formatted(TABLE_GROUP_V2
);
336 try (final var statement
= connection
.prepareStatement(sql
)) {
337 statement
.setBytes(1, groupIdV2
.serialize());
338 return Utils
.executeQueryForOptional(statement
, this::getGroupInfoV2FromResultSet
).orElse(null);
342 private GroupInfoV2
getGroupInfoV2FromResultSet(ResultSet resultSet
) throws SQLException
{
344 final var groupId
= resultSet
.getBytes("group_id");
345 final var masterKey
= resultSet
.getBytes("master_key");
346 final var groupData
= resultSet
.getBytes("group_data");
347 final var distributionId
= resultSet
.getBytes("distribution_id");
348 final var blocked
= resultSet
.getBoolean("blocked");
349 final var permissionDenied
= resultSet
.getBoolean("permission_denied");
350 return new GroupInfoV2(GroupId
.v2(groupId
),
351 new GroupMasterKey(masterKey
),
352 groupData
== null ?
null : DecryptedGroup
.parseFrom(groupData
),
353 DistributionId
.from(UuidUtil
.parseOrThrow(distributionId
)),
357 } catch (InvalidInputException
| InvalidProtocolBufferException e
) {
362 private List
<GroupInfoV1
> getGroupsV1() {
365 SELECT g.group_id, g.group_id_v2, g.name, g.color, (select group_concat(gm.recipient_id) from %s gm where gm.group_id = g._id) as members, g.expiration_time, g.blocked, g.archived
368 ).formatted(TABLE_GROUP_V1_MEMBER
, TABLE_GROUP_V1
);
369 try (final var connection
= database
.getConnection()) {
370 try (final var statement
= connection
.prepareStatement(sql
)) {
371 return Utils
.executeQueryForStream(statement
, this::getGroupInfoV1FromResultSet
)
372 .filter(Objects
::nonNull
)
375 } catch (SQLException e
) {
376 throw new RuntimeException("Failed read from group store", e
);
380 private GroupInfoV1
getGroup(Connection connection
, GroupIdV1 groupIdV1
) throws SQLException
{
383 SELECT g.group_id, g.group_id_v2, g.name, g.color, (select group_concat(gm.recipient_id) from %s gm where gm.group_id = g._id) as members, g.expiration_time, g.blocked, g.archived
387 ).formatted(TABLE_GROUP_V1_MEMBER
, TABLE_GROUP_V1
);
388 try (final var statement
= connection
.prepareStatement(sql
)) {
389 statement
.setBytes(1, groupIdV1
.serialize());
390 return Utils
.executeQueryForOptional(statement
, this::getGroupInfoV1FromResultSet
).orElse(null);
394 private GroupInfoV1
getGroupInfoV1FromResultSet(ResultSet resultSet
) throws SQLException
{
395 final var groupId
= resultSet
.getBytes("group_id");
396 final var groupIdV2
= resultSet
.getBytes("group_id_v2");
397 final var name
= resultSet
.getString("name");
398 final var color
= resultSet
.getString("color");
399 final var membersString
= resultSet
.getString("members");
400 final var members
= membersString
== null
401 ? Set
.<RecipientId
>of()
402 : Arrays
.stream(membersString
.split(","))
403 .map(Integer
::valueOf
)
404 .map(recipientIdCreator
::create
)
405 .collect(Collectors
.toSet());
406 final var expirationTime
= resultSet
.getInt("expiration_time");
407 final var blocked
= resultSet
.getBoolean("blocked");
408 final var archived
= resultSet
.getBoolean("archived");
409 return new GroupInfoV1(GroupId
.v1(groupId
),
410 groupIdV2
== null ?
null : GroupId
.v2(groupIdV2
),
419 private GroupInfoV2
getGroupV2ByV1Id(final Connection connection
, final GroupIdV1 groupId
) throws SQLException
{
420 return getGroup(connection
, GroupUtils
.getGroupIdV2(groupId
));
423 private GroupInfoV1
getGroupV1ByV2Id(Connection connection
, GroupIdV2 groupIdV2
) throws SQLException
{
426 SELECT g.group_id, g.group_id_v2, g.name, g.color, (select group_concat(gm.recipient_id) from %s gm where gm.group_id = g._id) as members, g.expiration_time, g.blocked, g.archived
428 WHERE g.group_id_v2 = ?
430 ).formatted(TABLE_GROUP_V1_MEMBER
, TABLE_GROUP_V1
);
431 try (final var statement
= connection
.prepareStatement(sql
)) {
432 statement
.setBytes(1, groupIdV2
.serialize());
433 return Utils
.executeQueryForOptional(statement
, this::getGroupInfoV1FromResultSet
).orElse(null);