Use Duration for timeout

This commit is contained in:
AsamK 2021-12-20 16:12:37 +01:00
parent fa3c79828a
commit 71e0c3f80c
4 changed files with 16 additions and 14 deletions

View file

@ -33,6 +33,7 @@ import java.io.Closeable;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.time.Duration;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -219,7 +220,7 @@ public interface Manager extends Closeable {
/** /**
* Receive new messages from server, returns if no new message arrive in a timespan of timeout. * Receive new messages from server, returns if no new message arrive in a timespan of timeout.
*/ */
void receiveMessages(long timeout, TimeUnit unit, ReceiveMessageHandler handler) throws IOException; void receiveMessages(Duration timeout, ReceiveMessageHandler handler) throws IOException;
/** /**
* Receive new messages from server, returns only if the thread is interrupted. * Receive new messages from server, returns only if the thread is interrupted.

View file

@ -97,6 +97,7 @@ import java.net.URISyntaxException;
import java.net.URLEncoder; import java.net.URLEncoder;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.security.SignatureException; import java.security.SignatureException;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
@ -1005,7 +1006,7 @@ public class ManagerImpl implements Manager {
logger.debug("Starting receiving messages"); logger.debug("Starting receiving messages");
while (!Thread.interrupted()) { while (!Thread.interrupted()) {
try { try {
receiveMessagesInternal(1L, TimeUnit.HOURS, false, (envelope, e) -> { receiveMessagesInternal(Duration.ofMinutes(1), false, (envelope, e) -> {
synchronized (messageHandlers) { synchronized (messageHandlers) {
Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> { Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> {
try { try {
@ -1072,17 +1073,17 @@ public class ManagerImpl implements Manager {
} }
@Override @Override
public void receiveMessages(long timeout, TimeUnit unit, ReceiveMessageHandler handler) throws IOException { public void receiveMessages(Duration timeout, ReceiveMessageHandler handler) throws IOException {
receiveMessages(timeout, unit, true, handler); receiveMessages(timeout, true, handler);
} }
@Override @Override
public void receiveMessages(ReceiveMessageHandler handler) throws IOException { public void receiveMessages(ReceiveMessageHandler handler) throws IOException {
receiveMessages(1L, TimeUnit.HOURS, false, handler); receiveMessages(Duration.ofMinutes(1), false, handler);
} }
private void receiveMessages( private void receiveMessages(
long timeout, TimeUnit unit, boolean returnOnTimeout, ReceiveMessageHandler handler Duration timeout, boolean returnOnTimeout, ReceiveMessageHandler handler
) throws IOException { ) throws IOException {
if (isReceiving()) { if (isReceiving()) {
throw new IllegalStateException("Already receiving message."); throw new IllegalStateException("Already receiving message.");
@ -1090,7 +1091,7 @@ public class ManagerImpl implements Manager {
isReceivingSynchronous = true; isReceivingSynchronous = true;
receiveThread = Thread.currentThread(); receiveThread = Thread.currentThread();
try { try {
receiveMessagesInternal(timeout, unit, returnOnTimeout, handler); receiveMessagesInternal(timeout, returnOnTimeout, handler);
} finally { } finally {
receiveThread = null; receiveThread = null;
hasCaughtUpWithOldMessages = false; hasCaughtUpWithOldMessages = false;
@ -1099,7 +1100,7 @@ public class ManagerImpl implements Manager {
} }
private void receiveMessagesInternal( private void receiveMessagesInternal(
long timeout, TimeUnit unit, boolean returnOnTimeout, ReceiveMessageHandler handler Duration timeout, boolean returnOnTimeout, ReceiveMessageHandler handler
) throws IOException { ) throws IOException {
retryFailedReceivedMessages(handler); retryFailedReceivedMessages(handler);
@ -1128,7 +1129,7 @@ public class ManagerImpl implements Manager {
} }
logger.debug("Checking for new message from server"); logger.debug("Checking for new message from server");
try { try {
var result = signalWebSocket.readOrEmpty(unit.toMillis(timeout), envelope1 -> { var result = signalWebSocket.readOrEmpty(timeout.toMillis(), envelope1 -> {
final var recipientId = envelope1.hasSourceUuid() final var recipientId = envelope1.hasSourceUuid()
? resolveRecipient(envelope1.getSourceAddress()) ? resolveRecipient(envelope1.getSourceAddress())
: null; : null;

View file

@ -17,8 +17,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.time.Duration;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
public class ReceiveCommand implements LocalCommand { public class ReceiveCommand implements LocalCommand {
@ -59,7 +59,7 @@ public class ReceiveCommand implements LocalCommand {
if (timeout < 0) { if (timeout < 0) {
m.receiveMessages(handler); m.receiveMessages(handler);
} else { } else {
m.receiveMessages((long) (timeout * 1000), TimeUnit.MILLISECONDS, handler); m.receiveMessages(Duration.ofMillis((long) (timeout * 1000)), handler);
} }
} catch (IOException e) { } catch (IOException e) {
throw new IOErrorException("Error while receiving messages: " + e.getMessage(), e); throw new IOErrorException("Error while receiving messages: " + e.getMessage(), e);

View file

@ -42,6 +42,7 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -50,7 +51,6 @@ import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -494,11 +494,11 @@ public class DbusManagerImpl implements Manager {
@Override @Override
public void receiveMessages( public void receiveMessages(
final long timeout, final TimeUnit unit, final ReceiveMessageHandler handler final Duration timeout, final ReceiveMessageHandler handler
) throws IOException { ) throws IOException {
addReceiveHandler(handler); addReceiveHandler(handler);
try { try {
Thread.sleep(unit.toMillis(timeout)); Thread.sleep(timeout.toMillis());
} catch (InterruptedException ignored) { } catch (InterruptedException ignored) {
} }
removeReceiveHandler(handler); removeReceiveHandler(handler);