From c8b89f245e47f1dfd5c6e149319bf6473cc75458 Mon Sep 17 00:00:00 2001 From: Luck Date: Fri, 5 Feb 2021 12:00:16 +0000 Subject: [PATCH] Tidy up RabbitMQMessenger a bit --- .../messaging/rabbitmq/RabbitMQMessenger.java | 127 ++++++++++-------- .../messaging/redis/RedisMessenger.java | 19 +-- 2 files changed, 77 insertions(+), 69 deletions(-) diff --git a/common/src/main/java/me/lucko/luckperms/common/messaging/rabbitmq/RabbitMQMessenger.java b/common/src/main/java/me/lucko/luckperms/common/messaging/rabbitmq/RabbitMQMessenger.java index ac16fac5c..6d84deb2c 100644 --- a/common/src/main/java/me/lucko/luckperms/common/messaging/rabbitmq/RabbitMQMessenger.java +++ b/common/src/main/java/me/lucko/luckperms/common/messaging/rabbitmq/RabbitMQMessenger.java @@ -51,6 +51,9 @@ public class RabbitMQMessenger implements Messenger { private static final int DEFAULT_PORT = 5672; private static final String EXCHANGE = "luckperms"; private static final String ROUTING_KEY = "luckperms:update"; + private static final boolean CHANNEL_PROP_DURABLE = false; + private static final boolean CHANNEL_PROP_EXCLUSIVE = true; + private static final boolean CHANNEL_PROP_AUTO_DELETE = true; private final LuckPermsPlugin plugin; private final IncomingMessageConsumer consumer; @@ -77,7 +80,7 @@ public class RabbitMQMessenger implements Messenger { this.connectionFactory.setUsername(username); this.connectionFactory.setPassword(password); - this.sub = new Subscription(this); + this.sub = new Subscription(); this.plugin.getBootstrap().getScheduler().executeAsync(this.sub); } @@ -103,20 +106,68 @@ public class RabbitMQMessenger implements Messenger { } } - private static class Subscription implements Runnable { - private final RabbitMQMessenger parent; - private boolean isClosed = false; - private boolean firstStartup = true; + /** + * Checks the connection, and re-opens it if necessary. + * + * @return true if the connection is now alive, false otherwise + */ + private boolean checkAndReopenConnection(boolean firstStartup) { + boolean connectionAlive = this.connection != null && this.connection.isOpen(); + boolean channelAlive = this.channel != null && this.channel.isOpen(); - private Subscription(RabbitMQMessenger parent) { - this.parent = parent; + if (connectionAlive && channelAlive) { + return true; } + // cleanup existing + if (this.channel != null && this.channel.isOpen()) { + try { + this.channel.close(); + } catch (Exception e) { + // ignore + } + } + if (this.connection != null && this.connection.isOpen()) { + try { + this.connection.close(); + } catch (Exception e) { + // ignore + } + } + + // (re)create + + if (!firstStartup) { + this.plugin.getLogger().warn("RabbitMQ pubsub connection dropped, trying to re-open the connection"); + } + + try { + this.connection = this.connectionFactory.newConnection(); + this.channel = this.connection.createChannel(); + + String queue = this.channel.queueDeclare("", CHANNEL_PROP_DURABLE, CHANNEL_PROP_EXCLUSIVE, CHANNEL_PROP_AUTO_DELETE, null).getQueue(); + this.channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC, CHANNEL_PROP_DURABLE, CHANNEL_PROP_AUTO_DELETE, null); + this.channel.queueBind(queue, EXCHANGE, ROUTING_KEY); + this.channel.basicConsume(queue, true, this.sub, tag -> {}); + + if (!firstStartup) { + this.plugin.getLogger().info("RabbitMQ pubsub connection re-established"); + } + return true; + } catch (Exception ignored) { + return false; + } + } + + private class Subscription implements Runnable, DeliverCallback { + private boolean isClosed = false; + @Override public void run() { + boolean firstStartup = true; while (!Thread.interrupted() && !this.isClosed) { try { - if (!checkAndReopenConnection()) { + if (!checkAndReopenConnection(firstStartup)) { // Sleep for 5 seconds to prevent massive spam in console Thread.sleep(5000); continue; @@ -124,61 +175,23 @@ public class RabbitMQMessenger implements Messenger { // Check connection life every every 30 seconds Thread.sleep(30_000); - } catch (InterruptedException ie) { + } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { - this.firstStartup = false; + firstStartup = false; } } } - private boolean checkAndReopenConnection() { - boolean channelIsDead = this.parent.channel == null || !this.parent.channel.isOpen(); - if (channelIsDead) { - boolean connectionIsDead = this.parent.connection == null || !this.parent.connection.isOpen(); - if (connectionIsDead) { - if (!this.firstStartup) { - this.parent.plugin.getLogger().warn("RabbitMQ pubsub connection dropped, trying to re-open the connection"); - } - try { - this.parent.connection = this.parent.connectionFactory.newConnection(); - this.parent.channel = this.parent.connection.createChannel(); - - String queue = this.parent.channel.queueDeclare("", false, true, true, null).getQueue(); - this.parent.channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC, false, true, null); - this.parent.channel.queueBind(queue, EXCHANGE, ROUTING_KEY); - this.parent.channel.basicConsume(queue, true, new ChannelListener(), (consumerTag) -> { }); - - if (!this.firstStartup) { - this.parent.plugin.getLogger().info("RabbitMQ pubsub connection re-established"); - } - return true; - } catch (Exception ignored) { - return false; - } - } else { - try { - this.parent.channel = this.parent.connection.createChannel(); - return true; - } catch (Exception ignored) { - return false; - } - } - } - return true; - } - - private class ChannelListener implements DeliverCallback { - @Override - public void handle(String consumerTag, Delivery message) { - try { - byte[] data = message.getBody(); - ByteArrayDataInput input = ByteStreams.newDataInput(data); - String msg = input.readUTF(); - Subscription.this.parent.consumer.consumeIncomingMessageAsString(msg); - } catch (Exception e) { - e.printStackTrace(); - } + @Override + public void handle(String consumerTag, Delivery message) { + try { + byte[] data = message.getBody(); + ByteArrayDataInput input = ByteStreams.newDataInput(data); + String msg = input.readUTF(); + RabbitMQMessenger.this.consumer.consumeIncomingMessageAsString(msg); + } catch (Exception e) { + e.printStackTrace(); } } } diff --git a/common/src/main/java/me/lucko/luckperms/common/messaging/redis/RedisMessenger.java b/common/src/main/java/me/lucko/luckperms/common/messaging/redis/RedisMessenger.java index 8ea272487..859a93ed4 100644 --- a/common/src/main/java/me/lucko/luckperms/common/messaging/redis/RedisMessenger.java +++ b/common/src/main/java/me/lucko/luckperms/common/messaging/redis/RedisMessenger.java @@ -63,7 +63,7 @@ public class RedisMessenger implements Messenger { this.jedisPool = new JedisPool(new JedisPoolConfig(), host, port, Protocol.DEFAULT_TIMEOUT, password, ssl); - this.sub = new Subscription(this); + this.sub = new Subscription(); this.plugin.getBootstrap().getScheduler().executeAsync(this.sub); } @@ -82,26 +82,21 @@ public class RedisMessenger implements Messenger { this.jedisPool.destroy(); } - private static class Subscription extends JedisPubSub implements Runnable { - private final RedisMessenger parent; - - private Subscription(RedisMessenger parent) { - this.parent = parent; - } + private class Subscription extends JedisPubSub implements Runnable { @Override public void run() { boolean wasBroken = false; - while (!Thread.interrupted() && !this.parent.jedisPool.isClosed()) { - try (Jedis jedis = this.parent.jedisPool.getResource()) { + while (!Thread.interrupted() && !RedisMessenger.this.jedisPool.isClosed()) { + try (Jedis jedis = RedisMessenger.this.jedisPool.getResource()) { if (wasBroken) { - this.parent.plugin.getLogger().info("Redis pubsub connection re-established"); + RedisMessenger.this.plugin.getLogger().info("Redis pubsub connection re-established"); wasBroken = false; } jedis.subscribe(this, CHANNEL); } catch (Exception e) { wasBroken = true; - this.parent.plugin.getLogger().warn("Redis pubsub connection dropped, trying to re-open the connection", e); + RedisMessenger.this.plugin.getLogger().warn("Redis pubsub connection dropped, trying to re-open the connection", e); try { unsubscribe(); } catch (Exception ignored) { @@ -123,7 +118,7 @@ public class RedisMessenger implements Messenger { if (!channel.equals(CHANNEL)) { return; } - this.parent.consumer.consumeIncomingMessageAsString(msg); + RedisMessenger.this.consumer.consumeIncomingMessageAsString(msg); } }