Implement MessageSendLog for resending after encryption error

This commit is contained in:
AsamK 2022-01-23 20:50:23 +01:00
parent 3491782912
commit 95cc0ae7fd
16 changed files with 960 additions and 80 deletions

View file

@ -19,6 +19,8 @@ dependencies {
implementation("com.google.protobuf", "protobuf-javalite", "3.11.4")
implementation("org.bouncycastle", "bcprov-jdk15on", "1.70")
implementation("org.slf4j", "slf4j-api", "1.7.32")
implementation("org.xerial", "sqlite-jdbc", "3.36.0.3")
implementation("com.zaxxer", "HikariCP", "5.0.1")
}
configurations {

View file

@ -67,6 +67,7 @@ public interface Manager extends Closeable {
throw new NotRegisteredException();
}
account.initDatabase();
final var serviceEnvironmentConfig = ServiceConfig.getServiceEnvironmentConfig(serviceEnvironment, userAgent);
return new ManagerImpl(account, pathConfig, serviceEnvironmentConfig, userAgent);

View file

@ -571,6 +571,17 @@ public class ManagerImpl implements Manager {
) throws IOException, NotAGroupMemberException, GroupNotFoundException, GroupSendingNotAllowedException {
var delete = new SignalServiceDataMessage.RemoteDelete(targetSentTimestamp);
final var messageBuilder = SignalServiceDataMessage.newBuilder().withRemoteDelete(delete);
for (final var recipient : recipients) {
if (recipient instanceof RecipientIdentifier.Single r) {
try {
final var recipientId = context.getRecipientHelper().resolveRecipient(r);
account.getMessageSendLogStore().deleteEntryForRecipientNonGroup(targetSentTimestamp, recipientId);
} catch (UnregisteredRecipientException ignored) {
}
} else if (recipient instanceof RecipientIdentifier.Group r) {
account.getMessageSendLogStore().deleteEntryForGroup(targetSentTimestamp, r.groupId());
}
}
return sendMessage(messageBuilder, recipients);
}

View file

@ -0,0 +1,42 @@
package org.asamk.signal.manager.actions;
import org.asamk.signal.manager.helper.Context;
import org.asamk.signal.manager.storage.recipients.RecipientId;
import org.asamk.signal.manager.storage.sendLog.MessageSendLogEntry;
import java.util.Objects;
public class ResendMessageAction implements HandleAction {
private final RecipientId recipientId;
private final long timestamp;
private final MessageSendLogEntry messageSendLogEntry;
public ResendMessageAction(
final RecipientId recipientId, final long timestamp, final MessageSendLogEntry messageSendLogEntry
) {
this.recipientId = recipientId;
this.timestamp = timestamp;
this.messageSendLogEntry = messageSendLogEntry;
}
@Override
public void execute(Context context) throws Throwable {
context.getSendHelper().resendMessage(recipientId, timestamp, messageSendLogEntry);
}
@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final ResendMessageAction that = (ResendMessageAction) o;
return timestamp == that.timestamp
&& recipientId.equals(that.recipientId)
&& messageSendLogEntry.equals(that.messageSendLogEntry);
}
@Override
public int hashCode() {
return Objects.hash(recipientId, timestamp, messageSendLogEntry);
}
}

View file

@ -7,6 +7,7 @@ import org.asamk.signal.manager.UntrustedIdentityException;
import org.asamk.signal.manager.actions.HandleAction;
import org.asamk.signal.manager.actions.RefreshPreKeysAction;
import org.asamk.signal.manager.actions.RenewSessionAction;
import org.asamk.signal.manager.actions.ResendMessageAction;
import org.asamk.signal.manager.actions.RetrieveProfileAction;
import org.asamk.signal.manager.actions.RetrieveStorageDataAction;
import org.asamk.signal.manager.actions.SendGroupInfoAction;
@ -41,6 +42,7 @@ import org.signal.zkgroup.profiles.ProfileKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.libsignal.SignalProtocolAddress;
import org.whispersystems.libsignal.protocol.DecryptionErrorMessage;
import org.whispersystems.signalservice.api.messages.SignalServiceContent;
import org.whispersystems.signalservice.api.messages.SignalServiceDataMessage;
import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
@ -165,6 +167,13 @@ public final class IncomingMessageHandler {
// address/uuid is validated by unidentified sender certificate
account.getRecipientStore().resolveRecipientTrusted(content.getSender());
}
if (envelope.isReceipt()) {
final var senderPair = getSender(envelope, content);
final var sender = senderPair.first();
final var senderDeviceId = senderPair.second();
account.getMessageSendLogStore().deleteEntryForRecipient(envelope.getTimestamp(), sender, senderDeviceId);
}
if (isMessageBlocked(envelope, content)) {
logger.info("Ignoring a message from blocked user/group: {}", envelope.getTimestamp());
return List.of();
@ -198,6 +207,14 @@ public final class IncomingMessageHandler {
final var sender = senderPair.first();
final var senderDeviceId = senderPair.second();
if (content.getReceiptMessage().isPresent()) {
final var message = content.getReceiptMessage().get();
if (message.isDeliveryReceipt()) {
account.getMessageSendLogStore()
.deleteEntriesForRecipient(message.getTimestamps(), sender, senderDeviceId);
}
}
if (content.getSenderKeyDistributionMessage().isPresent()) {
final var message = content.getSenderKeyDistributionMessage().get();
final var protocolAddress = new SignalProtocolAddress(context.getRecipientHelper()
@ -212,15 +229,10 @@ public final class IncomingMessageHandler {
if (content.getDecryptionErrorMessage().isPresent()) {
var message = content.getDecryptionErrorMessage().get();
logger.debug("Received a decryption error message (resend request for {})", message.getTimestamp());
if (message.getRatchetKey().isPresent()) {
if (message.getDeviceId() == account.getDeviceId() && account.getSessionStore()
.isCurrentRatchetKey(sender, senderDeviceId, message.getRatchetKey().get())) {
logger.debug("Renewing the session with sender");
actions.add(new RenewSessionAction(sender));
}
if (message.getDeviceId() == account.getDeviceId()) {
handleDecryptionErrorMessage(actions, sender, senderDeviceId, message);
} else {
logger.debug("Reset shared sender keys with this recipient");
account.getSenderKeyStore().deleteSharedWith(sender);
logger.debug("Request is for another one of our devices");
}
}
@ -246,6 +258,54 @@ public final class IncomingMessageHandler {
return actions;
}
private void handleDecryptionErrorMessage(
final List<HandleAction> actions,
final RecipientId sender,
final int senderDeviceId,
final DecryptionErrorMessage message
) {
final var logEntries = account.getMessageSendLogStore()
.findMessages(sender, senderDeviceId, message.getTimestamp(), !message.getRatchetKey().isPresent());
for (final var logEntry : logEntries) {
actions.add(new ResendMessageAction(sender, message.getTimestamp(), logEntry));
}
if (message.getRatchetKey().isPresent()) {
if (account.getSessionStore().isCurrentRatchetKey(sender, senderDeviceId, message.getRatchetKey().get())) {
if (logEntries.isEmpty()) {
logger.debug("Renewing the session with sender");
actions.add(new RenewSessionAction(sender));
} else {
logger.trace("Archiving the session with sender, a resend message has already been queued");
context.getAccount().getSessionStore().archiveSessions(sender);
}
}
return;
}
var found = false;
for (final var logEntry : logEntries) {
if (logEntry.groupId().isEmpty()) {
continue;
}
final var group = account.getGroupStore().getGroup(logEntry.groupId().get());
if (group == null) {
continue;
}
found = true;
logger.trace("Deleting shared sender key with {} ({}): {}",
sender,
senderDeviceId,
group.getDistributionId());
account.getSenderKeyStore().deleteSharedWith(sender, senderDeviceId, group.getDistributionId());
}
if (!found) {
logger.debug("Reset all shared sender keys with this recipient, no related message found in send log");
account.getSenderKeyStore().deleteSharedWith(sender);
}
}
private List<HandleAction> handleSyncMessage(
final SignalServiceSyncMessage syncMessage, final RecipientId sender, final boolean ignoreAttachments
) {

View file

@ -1,5 +1,7 @@
package org.asamk.signal.manager.helper;
import com.google.protobuf.ByteString;
import org.asamk.signal.manager.SignalDependencies;
import org.asamk.signal.manager.api.UnregisteredRecipientException;
import org.asamk.signal.manager.groups.GroupId;
@ -11,11 +13,13 @@ import org.asamk.signal.manager.storage.SignalAccount;
import org.asamk.signal.manager.storage.groups.GroupInfo;
import org.asamk.signal.manager.storage.recipients.Profile;
import org.asamk.signal.manager.storage.recipients.RecipientId;
import org.asamk.signal.manager.storage.sendLog.MessageSendLogEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.libsignal.InvalidKeyException;
import org.whispersystems.libsignal.InvalidRegistrationIdException;
import org.whispersystems.libsignal.NoSessionException;
import org.whispersystems.libsignal.SignalProtocolAddress;
import org.whispersystems.libsignal.protocol.DecryptionErrorMessage;
import org.whispersystems.libsignal.util.guava.Optional;
import org.whispersystems.signalservice.api.SignalServiceMessageSender;
@ -45,6 +49,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
public class SendHelper {
@ -74,9 +79,7 @@ public class SendHelper {
messageBuilder.withProfileKey(account.getProfileKey().serialize());
final var message = messageBuilder.build();
final var result = sendMessage(message, recipientId);
handleSendMessageResult(result);
return result;
return sendMessage(message, recipientId);
}
/**
@ -90,29 +93,6 @@ public class SendHelper {
return sendAsGroupMessage(messageBuilder, g);
}
private List<SendMessageResult> sendAsGroupMessage(
final SignalServiceDataMessage.Builder messageBuilder, final GroupInfo g
) throws IOException, GroupSendingNotAllowedException {
GroupUtils.setGroupContext(messageBuilder, g);
messageBuilder.withExpiration(g.getMessageExpirationTimer());
final var message = messageBuilder.build();
final var recipients = g.getMembersWithout(account.getSelfRecipientId());
if (g.isAnnouncementGroup() && !g.isAdmin(account.getSelfRecipientId())) {
if (message.getBody().isPresent()
|| message.getAttachments().isPresent()
|| message.getQuote().isPresent()
|| message.getPreviews().isPresent()
|| message.getMentions().isPresent()
|| message.getSticker().isPresent()) {
throw new GroupSendingNotAllowedException(g.getGroupId(), g.getTitle());
}
}
return sendGroupMessage(message, recipients, g.getDistributionId());
}
/**
* Send a complete group message to the given recipients (should be current/old/new members)
* This method should only be used for create/update/quit group messages.
@ -122,31 +102,7 @@ public class SendHelper {
final Set<RecipientId> recipientIds,
final DistributionId distributionId
) throws IOException {
final var messageSender = dependencies.getMessageSender();
final var results = sendGroupMessageInternal((recipients, unidentifiedAccess, isRecipientUpdate) -> messageSender.sendDataMessage(
recipients,
unidentifiedAccess,
isRecipientUpdate,
ContentHint.DEFAULT,
message,
SignalServiceMessageSender.LegacyGroupEvents.EMPTY,
sendResult -> logger.trace("Partial message send result: {}", sendResult.isSuccess()),
() -> false),
(distId, recipients, unidentifiedAccess, isRecipientUpdate) -> messageSender.sendGroupDataMessage(distId,
recipients,
unidentifiedAccess,
isRecipientUpdate,
ContentHint.DEFAULT,
message,
SignalServiceMessageSender.SenderKeyGroupEvents.EMPTY),
recipientIds,
distributionId);
for (var r : results) {
handleSendMessageResult(r);
}
return results;
return sendGroupMessage(message, recipientIds, distributionId, ContentHint.IMPLICIT);
}
public SendMessageResult sendDeliveryReceipt(
@ -162,10 +118,14 @@ public class SendHelper {
public SendMessageResult sendReceiptMessage(
final SignalServiceReceiptMessage receiptMessage, final RecipientId recipientId
) {
return handleSendMessage(recipientId,
final var messageSendLogStore = account.getMessageSendLogStore();
final var result = handleSendMessage(recipientId,
(messageSender, address, unidentifiedAccess) -> messageSender.sendReceipt(address,
unidentifiedAccess,
receiptMessage));
messageSendLogStore.insertIfPossible(receiptMessage.getWhen(), result, ContentHint.IMPLICIT);
handleSendMessageResult(result);
return result;
}
public SendMessageResult sendRetryReceipt(
@ -175,15 +135,19 @@ public class SendHelper {
errorMessage.getTimestamp(),
recipientId,
errorMessage.getDeviceId());
return handleSendMessage(recipientId,
final var result = handleSendMessage(recipientId,
(messageSender, address, unidentifiedAccess) -> messageSender.sendRetryReceipt(address,
unidentifiedAccess,
groupId.transform(GroupId::serialize),
errorMessage));
handleSendMessageResult(result);
return result;
}
public SendMessageResult sendNullMessage(RecipientId recipientId) {
return handleSendMessage(recipientId, SignalServiceMessageSender::sendNullMessage);
final var result = handleSendMessage(recipientId, SignalServiceMessageSender::sendNullMessage);
handleSendMessageResult(result);
return result;
}
public SendMessageResult sendSelfMessage(
@ -225,10 +189,12 @@ public class SendHelper {
public SendMessageResult sendTypingMessage(
SignalServiceTypingMessage message, RecipientId recipientId
) {
return handleSendMessage(recipientId,
final var result = handleSendMessage(recipientId,
(messageSender, address, unidentifiedAccess) -> messageSender.sendTyping(address,
unidentifiedAccess,
message));
handleSendMessageResult(result);
return result;
}
public List<SendMessageResult> sendGroupTypingMessage(
@ -244,6 +210,142 @@ public class SendHelper {
return sendGroupTypingMessage(message, recipientIds, distributionId);
}
public SendMessageResult resendMessage(
final RecipientId recipientId, final long timestamp, final MessageSendLogEntry messageSendLogEntry
) {
if (messageSendLogEntry.groupId().isEmpty()) {
return handleSendMessage(recipientId,
(messageSender, address, unidentifiedAccess) -> messageSender.resendContent(address,
unidentifiedAccess,
timestamp,
messageSendLogEntry.content(),
messageSendLogEntry.contentHint(),
Optional.absent()));
}
final var groupId = messageSendLogEntry.groupId().get();
final var group = account.getGroupStore().getGroup(groupId);
if (group == null) {
logger.debug("Could not find a matching group for the groupId {}! Skipping message send.",
groupId.toBase64());
return null;
} else if (!group.getMembers().contains(recipientId)) {
logger.warn("The target user is no longer in the group {}! Skipping message send.", groupId.toBase64());
return null;
}
final var senderKeyDistributionMessage = dependencies.getMessageSender()
.getOrCreateNewGroupSession(group.getDistributionId());
final var distributionBytes = ByteString.copyFrom(senderKeyDistributionMessage.serialize());
final var contentToSend = messageSendLogEntry.content()
.toBuilder()
.setSenderKeyDistributionMessage(distributionBytes)
.build();
final var result = handleSendMessage(recipientId,
(messageSender, address, unidentifiedAccess) -> messageSender.resendContent(address,
unidentifiedAccess,
timestamp,
contentToSend,
messageSendLogEntry.contentHint(),
Optional.of(group.getGroupId().serialize())));
if (result.isSuccess()) {
final var address = context.getRecipientHelper().resolveSignalServiceAddress(recipientId);
final var addresses = result.getSuccess()
.getDevices()
.stream()
.map(device -> new SignalProtocolAddress(address.getIdentifier(), device))
.collect(Collectors.toList());
account.getSenderKeyStore().markSenderKeySharedWith(group.getDistributionId(), addresses);
}
return result;
}
private List<SendMessageResult> sendAsGroupMessage(
final SignalServiceDataMessage.Builder messageBuilder, final GroupInfo g
) throws IOException, GroupSendingNotAllowedException {
GroupUtils.setGroupContext(messageBuilder, g);
messageBuilder.withExpiration(g.getMessageExpirationTimer());
final var message = messageBuilder.build();
final var recipients = g.getMembersWithout(account.getSelfRecipientId());
if (g.isAnnouncementGroup() && !g.isAdmin(account.getSelfRecipientId())) {
if (message.getBody().isPresent()
|| message.getAttachments().isPresent()
|| message.getQuote().isPresent()
|| message.getPreviews().isPresent()
|| message.getMentions().isPresent()
|| message.getSticker().isPresent()) {
throw new GroupSendingNotAllowedException(g.getGroupId(), g.getTitle());
}
}
return sendGroupMessage(message, recipients, g.getDistributionId(), ContentHint.RESENDABLE);
}
private List<SendMessageResult> sendGroupMessage(
final SignalServiceDataMessage message,
final Set<RecipientId> recipientIds,
final DistributionId distributionId,
final ContentHint contentHint
) throws IOException {
final var messageSender = dependencies.getMessageSender();
final var messageSendLogStore = account.getMessageSendLogStore();
final AtomicLong entryId = new AtomicLong(-1);
final LegacySenderHandler legacySender = (recipients, unidentifiedAccess, isRecipientUpdate) -> messageSender.sendDataMessage(
recipients,
unidentifiedAccess,
isRecipientUpdate,
contentHint,
message,
SignalServiceMessageSender.LegacyGroupEvents.EMPTY,
sendResult -> {
logger.trace("Partial message send result: {}", sendResult.isSuccess());
synchronized (entryId) {
if (entryId.get() == -1) {
final var newId = messageSendLogStore.insertIfPossible(message.getTimestamp(),
sendResult,
contentHint);
entryId.set(newId);
} else {
messageSendLogStore.addRecipientToExistingEntryIfPossible(entryId.get(), sendResult);
}
}
},
() -> false);
final SenderKeySenderHandler senderKeySender = (distId, recipients, unidentifiedAccess, isRecipientUpdate) -> {
final var res = messageSender.sendGroupDataMessage(distId,
recipients,
unidentifiedAccess,
isRecipientUpdate,
contentHint,
message,
SignalServiceMessageSender.SenderKeyGroupEvents.EMPTY);
synchronized (entryId) {
if (entryId.get() == -1) {
final var newId = messageSendLogStore.insertIfPossible(message.getTimestamp(), res, contentHint);
entryId.set(newId);
} else {
messageSendLogStore.addRecipientToExistingEntryIfPossible(entryId.get(), res);
}
}
return res;
};
final var results = sendGroupMessageInternal(legacySender, senderKeySender, recipientIds, distributionId);
for (var r : results) {
handleSendMessageResult(r);
}
return results;
}
private List<SendMessageResult> sendGroupTypingMessage(
final SignalServiceTypingMessage message,
final Set<RecipientId> recipientIds,
@ -462,12 +564,16 @@ public class SendHelper {
private SendMessageResult sendMessage(
SignalServiceDataMessage message, RecipientId recipientId
) {
return handleSendMessage(recipientId,
final var messageSendLogStore = account.getMessageSendLogStore();
final var result = handleSendMessage(recipientId,
(messageSender, address, unidentifiedAccess) -> messageSender.sendDataMessage(address,
unidentifiedAccess,
ContentHint.DEFAULT,
ContentHint.RESENDABLE,
message,
SignalServiceMessageSender.IndividualSendEvents.EMPTY));
messageSendLogStore.insertIfPossible(message.getTimestamp(), result, ContentHint.RESENDABLE);
handleSendMessageResult(result);
return result;
}
private SendMessageResult handleSendMessage(RecipientId recipientId, SenderHandler s) {

View file

@ -0,0 +1,94 @@
package org.asamk.signal.manager.storage;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.asamk.signal.manager.storage.sendLog.MessageSendLogStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sqlite.SQLiteConfig;
import java.io.File;
import java.sql.Connection;
import java.sql.SQLException;
public class Database implements AutoCloseable {
private final static Logger logger = LoggerFactory.getLogger(SignalAccount.class);
private static final long DATABASE_VERSION = 1;
private final HikariDataSource dataSource;
private Database(final HikariDataSource dataSource) {
this.dataSource = dataSource;
}
public static Database init(File databaseFile) throws SQLException {
HikariDataSource dataSource = null;
try {
dataSource = getHikariDataSource(databaseFile.getAbsolutePath());
try (final var connection = dataSource.getConnection()) {
final var userVersion = getUserVersion(connection);
logger.trace("Current database version: {} Program database version: {}",
userVersion,
DATABASE_VERSION);
if (userVersion > DATABASE_VERSION) {
logger.error("Database has been updated by a newer signal-cli version");
throw new SQLException("Database has been updated by a newer signal-cli version");
} else if (userVersion < DATABASE_VERSION) {
if (userVersion < 1) {
logger.debug("Updating database: Creating message send log tables");
MessageSendLogStore.createSql(connection);
}
setUserVersion(connection, DATABASE_VERSION);
}
final var result = new Database(dataSource);
dataSource = null;
return result;
}
} finally {
if (dataSource != null) {
dataSource.close();
}
}
}
public Connection getConnection() throws SQLException {
return dataSource.getConnection();
}
@Override
public void close() throws SQLException {
dataSource.close();
}
private static long getUserVersion(final Connection connection) throws SQLException {
try (final var statement = connection.createStatement()) {
final var resultSet = statement.executeQuery("PRAGMA user_version");
return resultSet.getLong(1);
}
}
private static void setUserVersion(final Connection connection, long userVersion) throws SQLException {
try (final var statement = connection.createStatement()) {
statement.executeUpdate("PRAGMA user_version = " + userVersion);
}
}
private static HikariDataSource getHikariDataSource(final String databaseFile) {
final var sqliteConfig = new SQLiteConfig();
sqliteConfig.setBusyTimeout(10_000);
sqliteConfig.setTransactionMode(SQLiteConfig.TransactionMode.IMMEDIATE);
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:sqlite:" + databaseFile);
config.setDataSourceProperties(sqliteConfig.toProperties());
config.setMinimumIdle(1);
config.setConnectionInitSql("PRAGMA foreign_keys=ON");
return new HikariDataSource(config);
}
}

View file

@ -27,6 +27,7 @@ import org.asamk.signal.manager.storage.recipients.Profile;
import org.asamk.signal.manager.storage.recipients.RecipientAddress;
import org.asamk.signal.manager.storage.recipients.RecipientId;
import org.asamk.signal.manager.storage.recipients.RecipientStore;
import org.asamk.signal.manager.storage.sendLog.MessageSendLogStore;
import org.asamk.signal.manager.storage.senderKeys.SenderKeyStore;
import org.asamk.signal.manager.storage.sessions.SessionStore;
import org.asamk.signal.manager.storage.stickers.StickerStore;
@ -62,6 +63,7 @@ import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.security.SecureRandom;
import java.sql.SQLException;
import java.util.Base64;
import java.util.Date;
import java.util.HashSet;
@ -120,6 +122,9 @@ public class SignalAccount implements Closeable {
private ConfigurationStore.Storage configurationStoreStorage;
private MessageCache messageCache;
private MessageSendLogStore messageSendLogStore;
private Database database;
private SignalAccount(final FileChannel fileChannel, final FileLock lock) {
this.fileChannel = fileChannel;
@ -227,6 +232,10 @@ public class SignalAccount implements Closeable {
return signalAccount;
}
public void initDatabase() {
getDatabase();
}
private void clearAllPreKeys() {
this.preKeyIdOffset = new SecureRandom().nextInt(Medium.MAX_VALUE);
this.nextSignedPreKeyId = new SecureRandom().nextInt(Medium.MAX_VALUE);
@ -383,6 +392,10 @@ public class SignalAccount implements Closeable {
return new File(getUserPath(dataPath, account), "recipients-store");
}
private static File getDatabaseFile(File dataPath, String account) {
return new File(getUserPath(dataPath, account), "account.db");
}
public static boolean userExists(File dataPath, String account) {
if (account == null) {
return false;
@ -869,6 +882,21 @@ public class SignalAccount implements Closeable {
() -> messageCache = new MessageCache(getMessageCachePath(dataPath, account)));
}
public Database getDatabase() {
return getOrCreate(() -> database, () -> {
try {
database = Database.init(getDatabaseFile(dataPath, account));
} catch (SQLException e) {
throw new RuntimeException(e);
}
});
}
public MessageSendLogStore getMessageSendLogStore() {
return getOrCreate(() -> messageSendLogStore,
() -> messageSendLogStore = new MessageSendLogStore(getRecipientStore(), getDatabase()));
}
public String getAccount() {
return account;
}
@ -1050,6 +1078,16 @@ public class SignalAccount implements Closeable {
@Override
public void close() {
synchronized (fileChannel) {
if (database != null) {
try {
database.close();
} catch (SQLException e) {
logger.warn("Failed to close account database: {}", e.getMessage(), e);
}
}
if (messageSendLogStore != null) {
messageSendLogStore.close();
}
try {
try {
lock.close();

View file

@ -0,0 +1,11 @@
package org.asamk.signal.manager.storage.sendLog;
import org.asamk.signal.manager.groups.GroupId;
import org.whispersystems.signalservice.api.crypto.ContentHint;
import org.whispersystems.signalservice.internal.push.SignalServiceProtos;
import java.util.Optional;
public record MessageSendLogEntry(
Optional<GroupId> groupId, SignalServiceProtos.Content content, ContentHint contentHint
) {}

View file

@ -0,0 +1,396 @@
package org.asamk.signal.manager.storage.sendLog;
import org.asamk.signal.manager.groups.GroupId;
import org.asamk.signal.manager.groups.GroupUtils;
import org.asamk.signal.manager.storage.Database;
import org.asamk.signal.manager.storage.recipients.RecipientId;
import org.asamk.signal.manager.storage.recipients.RecipientResolver;
import org.signal.zkgroup.InvalidInputException;
import org.signal.zkgroup.groups.GroupMasterKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.signalservice.api.crypto.ContentHint;
import org.whispersystems.signalservice.api.messages.SendMessageResult;
import org.whispersystems.signalservice.internal.push.SignalServiceProtos;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class MessageSendLogStore implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(MessageSendLogStore.class);
private static final String TABLE_MESSAGE_SEND_LOG = "message_send_log";
private static final String TABLE_MESSAGE_SEND_LOG_CONTENT = "message_send_log_content";
private static final Duration LOG_DURATION = Duration.ofDays(1);
private final RecipientResolver recipientResolver;
private final Database database;
private final Thread cleanupThread;
public MessageSendLogStore(
final RecipientResolver recipientResolver, final Database database
) {
this.recipientResolver = recipientResolver;
this.database = database;
this.cleanupThread = new Thread(() -> {
try {
final var interval = Duration.ofHours(1).toMillis();
while (true) {
try (final var connection = database.getConnection()) {
deleteOutdatedEntries(connection);
Thread.sleep(interval);
} catch (SQLException e) {
logger.warn("Deleting outdated entries failed");
break;
}
}
} catch (InterruptedException e) {
logger.debug("Stopping msl cleanup thread");
}
});
cleanupThread.setDaemon(true);
cleanupThread.start();
}
public static void createSql(Connection connection) throws SQLException {
try (final var statement = connection.createStatement()) {
statement.executeUpdate("""
CREATE TABLE message_send_log (
_id INTEGER PRIMARY KEY,
content_id INTEGER NOT NULL REFERENCES message_send_log_content (_id) ON DELETE CASCADE,
recipient_id INTEGER NOT NULL,
device_id INTEGER NOT NULL
);
CREATE TABLE message_send_log_content (
_id INTEGER PRIMARY KEY,
group_id BLOB,
timestamp INTEGER NOT NULL,
content BLOB NOT NULL,
content_hint INTEGER NOT NULL
);
CREATE INDEX mslc_timestamp_index ON message_send_log_content (timestamp);
CREATE INDEX msl_recipient_index ON message_send_log (recipient_id, device_id, content_id);
CREATE INDEX msl_content_index ON message_send_log (content_id);
""");
}
}
public List<MessageSendLogEntry> findMessages(
final RecipientId recipientId, final int deviceId, final long timestamp, final boolean isSenderKey
) {
try (final var connection = database.getConnection()) {
deleteOutdatedEntries(connection);
try (final var statement = connection.prepareStatement(
"SELECT group_id, content, content_hint FROM %s l INNER JOIN %s lc ON l.content_id = lc._id WHERE l.recipient_id = ? AND l.device_id = ? AND lc.timestamp = ?".formatted(
TABLE_MESSAGE_SEND_LOG,
TABLE_MESSAGE_SEND_LOG_CONTENT))) {
statement.setLong(1, recipientId.id());
statement.setInt(2, deviceId);
statement.setLong(3, timestamp);
try (var result = executeQueryForStream(statement, resultSet -> {
final var groupId = Optional.ofNullable(resultSet.getBytes("group_id"))
.map(GroupId::unknownVersion);
final SignalServiceProtos.Content content;
try {
content = SignalServiceProtos.Content.parseFrom(resultSet.getBinaryStream("content"));
} catch (IOException e) {
logger.warn("Failed to parse content from message send log", e);
return null;
}
final var contentHint = ContentHint.fromType(resultSet.getInt("content_hint"));
return new MessageSendLogEntry(groupId, content, contentHint);
})) {
return result.filter(Objects::nonNull)
.filter(e -> !isSenderKey || e.groupId().isPresent())
.toList();
}
}
} catch (SQLException e) {
logger.warn("Failed read from message send log", e);
return List.of();
}
}
public long insertIfPossible(
long sentTimestamp, SendMessageResult sendMessageResult, ContentHint contentHint
) {
final RecipientDevices recipientDevice = getRecipientDevices(sendMessageResult);
if (recipientDevice == null) {
return -1;
}
return insert(List.of(recipientDevice),
sentTimestamp,
sendMessageResult.getSuccess().getContent().get(),
contentHint);
}
public long insertIfPossible(
long sentTimestamp, List<SendMessageResult> sendMessageResults, ContentHint contentHint
) {
final var recipientDevices = sendMessageResults.stream()
.map(this::getRecipientDevices)
.filter(Objects::nonNull)
.toList();
if (recipientDevices.isEmpty()) {
return -1;
}
final var content = sendMessageResults.stream()
.filter(r -> r.isSuccess() && r.getSuccess().getContent().isPresent())
.map(r -> r.getSuccess().getContent().get())
.findFirst()
.get();
return insert(recipientDevices, sentTimestamp, content, contentHint);
}
public void addRecipientToExistingEntryIfPossible(final long contentId, final SendMessageResult sendMessageResult) {
final RecipientDevices recipientDevice = getRecipientDevices(sendMessageResult);
if (recipientDevice == null) {
return;
}
insertRecipientsForExistingContent(contentId, List.of(recipientDevice));
}
public void addRecipientToExistingEntryIfPossible(
final long contentId, final List<SendMessageResult> sendMessageResults
) {
final var recipientDevices = sendMessageResults.stream()
.map(this::getRecipientDevices)
.filter(Objects::nonNull)
.toList();
if (recipientDevices.isEmpty()) {
return;
}
insertRecipientsForExistingContent(contentId, recipientDevices);
}
public void deleteEntryForGroup(long sentTimestamp, GroupId groupId) {
try (final var connection = database.getConnection()) {
try (final var statement = connection.prepareStatement(
"DELETE FROM %s AS lc WHERE lc.timestamp = ? AND lc.group_id = ?".formatted(
TABLE_MESSAGE_SEND_LOG_CONTENT))) {
statement.setLong(1, sentTimestamp);
statement.setBytes(2, groupId.serialize());
statement.executeUpdate();
}
} catch (SQLException e) {
logger.warn("Failed delete from message send log", e);
}
}
public void deleteEntryForRecipientNonGroup(long sentTimestamp, RecipientId recipientId) {
try (final var connection = database.getConnection()) {
connection.setAutoCommit(false);
try (final var statement = connection.prepareStatement(
"DELETE FROM %s AS lc WHERE lc.timestamp = ? AND lc.group_id IS NULL AND lc._id IN (SELECT content_id FROM %s l WHERE l.recipient_id = ?)".formatted(
TABLE_MESSAGE_SEND_LOG_CONTENT,
TABLE_MESSAGE_SEND_LOG))) {
statement.setLong(1, sentTimestamp);
statement.setLong(2, recipientId.id());
statement.executeUpdate();
}
deleteOrphanedLogContents(connection);
connection.commit();
} catch (SQLException e) {
logger.warn("Failed delete from message send log", e);
}
}
public void deleteEntryForRecipient(long sentTimestamp, RecipientId recipientId, int deviceId) {
deleteEntriesForRecipient(List.of(sentTimestamp), recipientId, deviceId);
}
public void deleteEntriesForRecipient(List<Long> sentTimestamps, RecipientId recipientId, int deviceId) {
try (final var connection = database.getConnection()) {
connection.setAutoCommit(false);
try (final var statement = connection.prepareStatement(
"DELETE FROM %s AS l WHERE l.content_id IN (SELECT _id FROM %s lc WHERE lc.timestamp = ?) AND l.recipient_id = ? AND l.device_id = ?".formatted(
TABLE_MESSAGE_SEND_LOG,
TABLE_MESSAGE_SEND_LOG_CONTENT))) {
for (final var sentTimestamp : sentTimestamps) {
statement.setLong(1, sentTimestamp);
statement.setLong(2, recipientId.id());
statement.setInt(3, deviceId);
statement.executeUpdate();
}
}
deleteOrphanedLogContents(connection);
connection.commit();
} catch (SQLException e) {
logger.warn("Failed delete from message send log", e);
}
}
@Override
public void close() {
cleanupThread.interrupt();
try {
cleanupThread.join();
} catch (InterruptedException ignored) {
}
}
private RecipientDevices getRecipientDevices(final SendMessageResult sendMessageResult) {
if (sendMessageResult.isSuccess() && sendMessageResult.getSuccess().getContent().isPresent()) {
final var recipientId = recipientResolver.resolveRecipient(sendMessageResult.getAddress());
return new RecipientDevices(recipientId, sendMessageResult.getSuccess().getDevices());
} else {
return null;
}
}
private long insert(
final List<RecipientDevices> recipientDevices,
final long sentTimestamp,
final SignalServiceProtos.Content content,
final ContentHint contentHint
) {
byte[] groupId = getGroupId(content);
try (final var connection = database.getConnection()) {
connection.setAutoCommit(false);
final long contentId;
try (final var statement = connection.prepareStatement(
"INSERT INTO %s (timestamp, group_id, content, content_hint) VALUES (?,?,?,?)".formatted(
TABLE_MESSAGE_SEND_LOG_CONTENT))) {
statement.setLong(1, sentTimestamp);
statement.setBytes(2, groupId);
statement.setBytes(3, content.toByteArray());
statement.setInt(4, contentHint.getType());
statement.executeUpdate();
final var generatedKeys = statement.getGeneratedKeys();
if (generatedKeys.next()) {
contentId = generatedKeys.getLong(1);
} else {
contentId = -1;
}
}
if (contentId == -1) {
logger.warn("Failed to insert message send log content");
return -1;
}
insertRecipientsForExistingContent(contentId, recipientDevices, connection);
connection.commit();
return contentId;
} catch (SQLException e) {
logger.warn("Failed to insert into message send log", e);
return -1;
}
}
private byte[] getGroupId(final SignalServiceProtos.Content content) {
try {
return !content.hasDataMessage()
? null
: content.getDataMessage().hasGroup()
? content.getDataMessage().getGroup().getId().toByteArray()
: content.getDataMessage().hasGroupV2()
? GroupUtils.getGroupIdV2(new GroupMasterKey(content.getDataMessage()
.getGroupV2()
.getMasterKey()
.toByteArray())).serialize()
: null;
} catch (InvalidInputException e) {
logger.warn("Failed to parse groupId id from content");
return null;
}
}
private void insertRecipientsForExistingContent(
final long contentId, final List<RecipientDevices> recipientDevices
) {
try (final var connection = database.getConnection()) {
connection.setAutoCommit(false);
insertRecipientsForExistingContent(contentId, recipientDevices, connection);
connection.commit();
} catch (SQLException e) {
logger.warn("Failed to append recipients to message send log", e);
}
}
private void insertRecipientsForExistingContent(
final long contentId, final List<RecipientDevices> recipientDevices, final Connection connection
) throws SQLException {
try (final var statement = connection.prepareStatement(
"INSERT INTO %s (recipient_id, device_id, content_id) VALUES (?,?,?)".formatted(TABLE_MESSAGE_SEND_LOG))) {
for (final var recipientDevice : recipientDevices) {
for (final var deviceId : recipientDevice.deviceIds()) {
statement.setLong(1, recipientDevice.recipientId().id());
statement.setInt(2, deviceId);
statement.setLong(3, contentId);
statement.executeUpdate();
}
}
}
}
private void deleteOutdatedEntries(final Connection connection) throws SQLException {
try (final var statement = connection.prepareStatement("DELETE FROM %s WHERE timestamp < ?".formatted(
TABLE_MESSAGE_SEND_LOG_CONTENT))) {
statement.setLong(1, System.currentTimeMillis() - LOG_DURATION.toMillis());
final var rowCount = statement.executeUpdate();
if (rowCount > 0) {
logger.debug("Removed {} outdated entries from the message send log", rowCount);
}
}
}
private void deleteOrphanedLogContents(final Connection connection) throws SQLException {
try (final var statement = connection.prepareStatement(
"DELETE FROM %s WHERE _id NOT IN (SELECT content_id FROM %s)".formatted(TABLE_MESSAGE_SEND_LOG_CONTENT,
TABLE_MESSAGE_SEND_LOG))) {
statement.executeUpdate();
}
}
private <T> Stream<T> executeQueryForStream(
PreparedStatement statement, ResultSetMapper<T> mapper
) throws SQLException {
final var resultSet = statement.executeQuery();
return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.ORDERED) {
@Override
public boolean tryAdvance(final Consumer<? super T> consumer) {
try {
if (!resultSet.next()) {
return false;
}
consumer.accept(mapper.apply(resultSet));
return true;
} catch (SQLException e) {
logger.warn("Failed to read from database result", e);
throw new RuntimeException(e);
}
}
}, false);
}
private interface ResultSetMapper<T> {
T apply(ResultSet resultSet) throws SQLException;
}
private record RecipientDevices(RecipientId recipientId, List<Integer> deviceIds) {}
}

View file

@ -164,6 +164,21 @@ public class SenderKeySharedStore {
}
}
public void deleteSharedWith(
final RecipientId recipientId, final int deviceId, final DistributionId distributionId
) {
synchronized (sharedSenderKeys) {
final var entries = sharedSenderKeys.getOrDefault(distributionId.asUuid(), Set.of());
sharedSenderKeys.put(distributionId.asUuid(), new HashSet<>(entries) {
{
remove(new SenderKeySharedEntry(recipientId, deviceId));
}
});
saveLocked();
}
}
public void deleteAllFor(final DistributionId distributionId) {
synchronized (sharedSenderKeys) {
if (sharedSenderKeys.remove(distributionId.asUuid()) != null) {

View file

@ -71,6 +71,10 @@ public class SenderKeyStore implements SignalServiceSenderKeyStore {
senderKeySharedStore.deleteAllFor(recipientId);
}
public void deleteSharedWith(RecipientId recipientId, int deviceId, DistributionId distributionId) {
senderKeySharedStore.deleteSharedWith(recipientId, deviceId, distributionId);
}
public void deleteOurKey(RecipientId selfRecipientId, DistributionId distributionId) {
senderKeySharedStore.deleteAllFor(distributionId);
senderKeyRecordStore.deleteSenderKey(selfRecipientId, distributionId.asUuid());