From a8dfd38fe6225a21127ca95b2d35be1b0907e462 Mon Sep 17 00:00:00 2001 From: ruViolence <78062896+ruViolence@users.noreply.github.com> Date: Fri, 5 Feb 2021 01:10:30 +0800 Subject: [PATCH] RabbitMQ messenger support (#2874) --- bukkit/build.gradle | 1 + bukkit/src/main/resources/config.yml | 10 + bungee/build.gradle | 1 + bungee/src/main/resources/config.yml | 10 + common/build.gradle | 1 + .../luckperms/common/config/ConfigKeys.java | 20 ++ .../common/dependencies/Dependency.java | 7 + .../dependencies/DependencyRegistry.java | 4 + .../common/messaging/MessagingFactory.java | 34 ++++ .../messaging/rabbitmq/RabbitMQMessenger.java | 185 ++++++++++++++++++ fabric/build.gradle | 1 + fabric/src/main/resources/luckperms.conf | 11 ++ nukkit/build.gradle | 1 + nukkit/src/main/resources/config.yml | 10 + sponge/build.gradle | 1 + sponge/src/main/resources/luckperms.conf | 11 ++ velocity/build.gradle | 1 + velocity/src/main/resources/config.yml | 10 + 18 files changed, 319 insertions(+) create mode 100644 common/src/main/java/me/lucko/luckperms/common/messaging/rabbitmq/RabbitMQMessenger.java diff --git a/bukkit/build.gradle b/bukkit/build.gradle index 8b54c2d00..e785456d3 100644 --- a/bukkit/build.gradle +++ b/bukkit/build.gradle @@ -48,6 +48,7 @@ shadowJar { relocate 'com.mongodb', 'me.lucko.luckperms.lib.mongodb' relocate 'org.bson', 'me.lucko.luckperms.lib.bson' relocate 'redis.clients.jedis', 'me.lucko.luckperms.lib.jedis' + relocate 'com.rabbitmq', 'me.lucko.luckperms.lib.rabbitmq' relocate 'org.apache.commons.pool2', 'me.lucko.luckperms.lib.commonspool2' relocate 'ninja.leaping.configurate', 'me.lucko.luckperms.lib.configurate' } diff --git a/bukkit/src/main/resources/config.yml b/bukkit/src/main/resources/config.yml index ff1085c00..acbbeaa45 100644 --- a/bukkit/src/main/resources/config.yml +++ b/bukkit/src/main/resources/config.yml @@ -223,6 +223,8 @@ watch-files: true # installed. # => redis Uses Redis pub-sub to push changes. Your server connection info must be configured # below. +# => rabbitmq Uses RabbitMQ pub-sub to push changes. Your server connection info must be +# configured below. # => auto Attempts to automatically setup a messaging service using redis or sql. messaging-service: auto @@ -246,6 +248,14 @@ redis: address: localhost password: '' +# Settings for RabbitMQ. +# Port 5672 is used by default; set address to "host:port" if differs +rabbitmq: + enabled: false + address: localhost + username: 'guest' + password: 'guest' + diff --git a/bungee/build.gradle b/bungee/build.gradle index d601b85a4..29ebf6806 100644 --- a/bungee/build.gradle +++ b/bungee/build.gradle @@ -39,6 +39,7 @@ shadowJar { relocate 'com.mongodb', 'me.lucko.luckperms.lib.mongodb' relocate 'org.bson', 'me.lucko.luckperms.lib.bson' relocate 'redis.clients.jedis', 'me.lucko.luckperms.lib.jedis' + relocate 'com.rabbitmq', 'me.lucko.luckperms.lib.rabbitmq' relocate 'org.apache.commons.pool2', 'me.lucko.luckperms.lib.commonspool2' relocate 'ninja.leaping.configurate', 'me.lucko.luckperms.lib.configurate' } diff --git a/bungee/src/main/resources/config.yml b/bungee/src/main/resources/config.yml index 1c1d4e387..b73d3f966 100644 --- a/bungee/src/main/resources/config.yml +++ b/bungee/src/main/resources/config.yml @@ -221,6 +221,8 @@ watch-files: true # configured below. # => redisbungee Uses Redis pub-sub to push changes, using the RedisBungee API. You need to have # the RedisBungee plugin installed. +# => rabbitmq Uses RabbitMQ pub-sub to push changes. Your server connection info must be +# configured below. # => auto Attempts to automatically setup a messaging service using redis or sql. messaging-service: auto @@ -244,6 +246,14 @@ redis: address: localhost password: '' +# Settings for RabbitMQ. +# Port 5672 is used by default; set address to "host:port" if differs +rabbitmq: + enabled: false + address: localhost + username: 'guest' + password: 'guest' + diff --git a/common/build.gradle b/common/build.gradle index 3f66ed81f..f1edd15e7 100644 --- a/common/build.gradle +++ b/common/build.gradle @@ -51,6 +51,7 @@ dependencies { } compile 'com.zaxxer:HikariCP:3.4.5' compile 'redis.clients:jedis:3.3.0' + compile 'com.rabbitmq:amqp-client:5.10.0' compile 'org.mongodb:mongo-java-driver:3.12.2' compile 'org.yaml:snakeyaml:1.23' } diff --git a/common/src/main/java/me/lucko/luckperms/common/config/ConfigKeys.java b/common/src/main/java/me/lucko/luckperms/common/config/ConfigKeys.java index 915c4d25c..fd7624c30 100644 --- a/common/src/main/java/me/lucko/luckperms/common/config/ConfigKeys.java +++ b/common/src/main/java/me/lucko/luckperms/common/config/ConfigKeys.java @@ -619,6 +619,26 @@ public final class ConfigKeys { */ public static final ConfigKey REDIS_SSL = notReloadable(booleanKey("redis.ssl", false)); + /** + * If rabbitmq messaging is enabled + */ + public static final ConfigKey RABBITMQ_ENABLED = notReloadable(booleanKey("rabbitmq.enabled", false)); + + /** + * The address of the rabbitmq server + */ + public static final ConfigKey RABBITMQ_ADDRESS = notReloadable(stringKey("rabbitmq.address", null)); + + /** + * The username in use by the rabbitmq server + */ + public static final ConfigKey RABBITMQ_USERNAME = notReloadable(stringKey("rabbitmq.username", "guest")); + + /** + * The password in use by the rabbitmq server, or an empty string if there is no password + */ + public static final ConfigKey RABBITMQ_PASSWORD = notReloadable(stringKey("rabbitmq.password", "guest")); + /** * The URL of the bytebin instance used to upload data */ diff --git a/common/src/main/java/me/lucko/luckperms/common/dependencies/Dependency.java b/common/src/main/java/me/lucko/luckperms/common/dependencies/Dependency.java index 359b8e9aa..f784f5762 100644 --- a/common/src/main/java/me/lucko/luckperms/common/dependencies/Dependency.java +++ b/common/src/main/java/me/lucko/luckperms/common/dependencies/Dependency.java @@ -212,6 +212,13 @@ public enum Dependency { Relocation.of("jedis", "redis{}clients{}jedis"), Relocation.of("commonspool2", "org{}apache{}commons{}pool2") ), + RABBITMQ( + "com{}rabbitmq", + "amqp-client", + "5.10.0", + "z9/pvANy1mMAQb+VJtN+COybfxPeg/m9BgcfZRO1jcE=", + Relocation.of("rabbitmq", "com{}rabbitmq") + ), COMMONS_POOL_2( "org.apache.commons", "commons-pool2", diff --git a/common/src/main/java/me/lucko/luckperms/common/dependencies/DependencyRegistry.java b/common/src/main/java/me/lucko/luckperms/common/dependencies/DependencyRegistry.java index a98c49b6a..3f6698b30 100644 --- a/common/src/main/java/me/lucko/luckperms/common/dependencies/DependencyRegistry.java +++ b/common/src/main/java/me/lucko/luckperms/common/dependencies/DependencyRegistry.java @@ -82,6 +82,10 @@ public class DependencyRegistry { dependencies.add(Dependency.SLF4J_SIMPLE); } + if (this.plugin.getConfiguration().get(ConfigKeys.RABBITMQ_ENABLED)) { + dependencies.add(Dependency.RABBITMQ); + } + // don't load slf4j if it's already present if ((dependencies.contains(Dependency.SLF4J_API) || dependencies.contains(Dependency.SLF4J_SIMPLE)) && slf4jPresent()) { dependencies.remove(Dependency.SLF4J_API); diff --git a/common/src/main/java/me/lucko/luckperms/common/messaging/MessagingFactory.java b/common/src/main/java/me/lucko/luckperms/common/messaging/MessagingFactory.java index 1902acbc6..e549661e8 100644 --- a/common/src/main/java/me/lucko/luckperms/common/messaging/MessagingFactory.java +++ b/common/src/main/java/me/lucko/luckperms/common/messaging/MessagingFactory.java @@ -27,6 +27,7 @@ package me.lucko.luckperms.common.messaging; import me.lucko.luckperms.common.config.ConfigKeys; import me.lucko.luckperms.common.config.LuckPermsConfiguration; +import me.lucko.luckperms.common.messaging.rabbitmq.RabbitMQMessenger; import me.lucko.luckperms.common.messaging.redis.RedisMessenger; import me.lucko.luckperms.common.messaging.sql.SqlMessenger; import me.lucko.luckperms.common.plugin.LuckPermsPlugin; @@ -62,6 +63,8 @@ public class MessagingFactory

{ if (messagingType.equals("auto")) { if (this.plugin.getConfiguration().get(ConfigKeys.REDIS_ENABLED)) { messagingType = "redis"; + } else if (this.plugin.getConfiguration().get(ConfigKeys.RABBITMQ_ENABLED)) { + messagingType = "rabbitmq"; } else { for (StorageImplementation implementation : this.plugin.getStorage().getImplementations()) { if (implementation instanceof SqlStorage) { @@ -101,6 +104,16 @@ public class MessagingFactory

{ } else { this.plugin.getLogger().warn("Messaging Service was set to redis, but redis is not enabled!"); } + } else if (messagingType.equals("rabbitmq")) { + if (this.plugin.getConfiguration().get(ConfigKeys.RABBITMQ_ENABLED)) { + try { + return new LuckPermsMessagingService(this.plugin, new RabbitMQMessengerProvider()); + } catch (Exception e) { + getPlugin().getLogger().severe("Exception occurred whilst enabling RabbitMQ messaging service", e); + } + } else { + this.plugin.getLogger().warn("Messaging Service was set to rabbitmq, but rabbitmq is not enabled!"); + } } else if (messagingType.equals("sql")) { try { return new LuckPermsMessagingService(this.plugin, new SqlMessengerProvider()); @@ -136,6 +149,27 @@ public class MessagingFactory

{ } } + private class RabbitMQMessengerProvider implements MessengerProvider { + + @Override + public @NonNull String getName() { + return "RabbitMQ"; + } + + @Override + public @NonNull Messenger obtain(@NonNull IncomingMessageConsumer incomingMessageConsumer) { + RabbitMQMessenger rabbitmq = new RabbitMQMessenger(getPlugin(), incomingMessageConsumer); + + LuckPermsConfiguration config = getPlugin().getConfiguration(); + String address = config.get(ConfigKeys.RABBITMQ_ADDRESS); + String username = config.get(ConfigKeys.RABBITMQ_USERNAME); + String password = config.get(ConfigKeys.RABBITMQ_PASSWORD); + + rabbitmq.init(address, username, password); + return rabbitmq; + } + } + private class SqlMessengerProvider implements MessengerProvider { @Override diff --git a/common/src/main/java/me/lucko/luckperms/common/messaging/rabbitmq/RabbitMQMessenger.java b/common/src/main/java/me/lucko/luckperms/common/messaging/rabbitmq/RabbitMQMessenger.java new file mode 100644 index 000000000..ebdc1e80a --- /dev/null +++ b/common/src/main/java/me/lucko/luckperms/common/messaging/rabbitmq/RabbitMQMessenger.java @@ -0,0 +1,185 @@ +/* + * This file is part of LuckPerms, licensed under the MIT License. + * + * Copyright (c) lucko (Luck) + * Copyright (c) contributors + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package me.lucko.luckperms.common.messaging.rabbitmq; + +import com.google.common.io.ByteArrayDataInput; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DeliverCallback; +import com.rabbitmq.client.Delivery; + +import me.lucko.luckperms.common.plugin.LuckPermsPlugin; + +import net.luckperms.api.messenger.IncomingMessageConsumer; +import net.luckperms.api.messenger.Messenger; +import net.luckperms.api.messenger.message.OutgoingMessage; + +import org.checkerframework.checker.nullness.qual.NonNull; + +/** + * An implementation of {@link Messenger} using RabbitMQ. + */ +public class RabbitMQMessenger implements Messenger { + private static final int DEFAULT_PORT = 5672; + private static final String EXCHANGE = "luckperms"; + private static final String ROUTING_KEY = "luckperms:update"; + + private final LuckPermsPlugin plugin; + private final IncomingMessageConsumer consumer; + + private ConnectionFactory connectionFactory; + private Connection connection; + private Channel channel; + private Subscription sub; + + public RabbitMQMessenger(LuckPermsPlugin plugin, IncomingMessageConsumer consumer) { + this.plugin = plugin; + this.consumer = consumer; + } + + public void init(String address, String username, String password) { + String[] addressSplit = address.split(":"); + String host = addressSplit[0]; + int port = addressSplit.length > 1 ? Integer.parseInt(addressSplit[1]) : DEFAULT_PORT; + + this.connectionFactory = new ConnectionFactory(); + this.connectionFactory.setHost(host); + this.connectionFactory.setPort(port); + this.connectionFactory.setUsername(username); + this.connectionFactory.setPassword(password); + + this.sub = new Subscription(this); + this.plugin.getBootstrap().getScheduler().executeAsync(this.sub); + } + + @Override + public void sendOutgoingMessage(@NonNull OutgoingMessage outgoingMessage) { + try { + ByteArrayDataOutput output = ByteStreams.newDataOutput(); + output.writeUTF(outgoingMessage.asEncodedString()); + this.channel.basicPublish(EXCHANGE, ROUTING_KEY, new AMQP.BasicProperties.Builder().build(), output.toByteArray()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public void close() { + try { + this.channel.close(); + this.connection.close(); + this.sub.isClosed = true; + } catch (Exception e) { + e.printStackTrace(); + } + } + + private static class Subscription implements Runnable { + private final RabbitMQMessenger parent; + private boolean isClosed = false; + private boolean firstStartup = true; + + private Subscription(RabbitMQMessenger parent) { + this.parent = parent; + } + + @Override + public void run() { + while (!Thread.interrupted() && !this.isClosed) { + try { + if (!checkAndReopenConnection()) { + // Sleep for 5 seconds to prevent massive spam in console + Thread.sleep(5000); + continue; + } + + // Check connection life every every 30 seconds + Thread.sleep(30_000); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } finally { + this.firstStartup = false; + } + } + } + + private boolean checkAndReopenConnection() { + boolean channelIsDead = this.parent.channel == null || !this.parent.channel.isOpen(); + if (channelIsDead) { + boolean connectionIsDead = this.parent.connection == null || !this.parent.connection.isOpen(); + if (connectionIsDead) { + if (!this.firstStartup) { + this.parent.plugin.getLogger().warn("RabbitMQ pubsub connection dropped, trying to re-open the connection"); + } + try { + this.parent.connection = this.parent.connectionFactory.newConnection(); + this.parent.channel = this.parent.connection.createChannel(); + + String queue = this.parent.channel.queueDeclare("", false, true, true, null).getQueue(); + this.parent.channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC, false, true, null); + this.parent.channel.queueBind(queue, EXCHANGE, ROUTING_KEY); + this.parent.channel.basicConsume(queue, true, new ChannelListener(), (consumerTag) -> { }); + + if (!this.firstStartup) { + this.parent.plugin.getLogger().info("RabbitMQ pubsub connection re-established"); + } + return true; + } catch (Exception ignored) { + return false; + } + } else { + try { + this.parent.channel = this.parent.connection.createChannel(); + return true; + } catch (Exception ignored) { + return false; + } + } + } + return true; + } + + private class ChannelListener implements DeliverCallback { + @Override + public void handle(String consumerTag, Delivery message) { + try { + byte[] data = message.getBody(); + ByteArrayDataInput input = ByteStreams.newDataInput(data); + String msg = input.readUTF(); + Subscription.this.parent.consumer.consumeIncomingMessageAsString(msg); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + +} diff --git a/fabric/build.gradle b/fabric/build.gradle index fd0f0431d..7a083697a 100644 --- a/fabric/build.gradle +++ b/fabric/build.gradle @@ -75,6 +75,7 @@ shadowJar { relocate 'com.mongodb', 'me.lucko.luckperms.lib.mongodb' relocate 'org.bson', 'me.lucko.luckperms.lib.bson' relocate 'redis.clients.jedis', 'me.lucko.luckperms.lib.jedis' + relocate 'com.rabbitmq', 'me.lucko.luckperms.lib.rabbitmq' relocate 'org.apache.commons.pool2', 'me.lucko.luckperms.lib.commonspool2' relocate 'ninja.leaping.configurate', 'me.lucko.luckperms.lib.configurate' } diff --git a/fabric/src/main/resources/luckperms.conf b/fabric/src/main/resources/luckperms.conf index 19e9ab882..ace81ec33 100644 --- a/fabric/src/main/resources/luckperms.conf +++ b/fabric/src/main/resources/luckperms.conf @@ -226,6 +226,8 @@ watch-files = true # Won't work if you have more than one proxy. # => redis Uses Redis pub-sub to push changes. Your server connection info must be configured # below. +# => rabbitmq Uses RabbitMQ pub-sub to push changes. Your server connection info must be +# configured below. # => auto Attempts to automatically setup a messaging service using redis or sql. messaging-service = "auto" @@ -250,6 +252,15 @@ redis { password = "" } +# Settings for RabbitMQ. +# Port 5672 is used by default; set address to "host:port" if differs +rabbitmq { + enabled = false + address = "localhost" + username = "guest" + password = "guest" +} + diff --git a/nukkit/build.gradle b/nukkit/build.gradle index 24de20678..1397f6599 100644 --- a/nukkit/build.gradle +++ b/nukkit/build.gradle @@ -42,6 +42,7 @@ shadowJar { relocate 'com.mongodb', 'me.lucko.luckperms.lib.mongodb' relocate 'org.bson', 'me.lucko.luckperms.lib.bson' relocate 'redis.clients.jedis', 'me.lucko.luckperms.lib.jedis' + relocate 'com.rabbitmq', 'me.lucko.luckperms.lib.rabbitmq' relocate 'org.apache.commons.pool2', 'me.lucko.luckperms.lib.commonspool2' relocate 'ninja.leaping.configurate', 'me.lucko.luckperms.lib.configurate' } diff --git a/nukkit/src/main/resources/config.yml b/nukkit/src/main/resources/config.yml index bf8ecab40..90204e6dc 100644 --- a/nukkit/src/main/resources/config.yml +++ b/nukkit/src/main/resources/config.yml @@ -218,6 +218,8 @@ watch-files: true # option is set to 'auto' and SQL storage is in use. Set to 'notsql' to disable this. # => redis Uses Redis pub-sub to push changes. Your server connection info must be configured # below. +# => rabbitmq Uses RabbitMQ pub-sub to push changes. Your server connection info must be +# configured below. # => auto Attempts to automatically setup a messaging service using redis or sql. messaging-service: auto @@ -241,6 +243,14 @@ redis: address: localhost password: '' +# Settings for RabbitMQ. +# Port 5672 is used by default; set address to "host:port" if differs +rabbitmq: + enabled: false + address: localhost + username: 'guest' + password: 'guest' + diff --git a/sponge/build.gradle b/sponge/build.gradle index 65b7b2c9d..022455fb1 100644 --- a/sponge/build.gradle +++ b/sponge/build.gradle @@ -55,6 +55,7 @@ shadowJar { relocate 'com.mongodb', 'me.lucko.luckperms.lib.mongodb' relocate 'org.bson', 'me.lucko.luckperms.lib.bson' relocate 'redis.clients.jedis', 'me.lucko.luckperms.lib.jedis' + relocate 'com.rabbitmq', 'me.lucko.luckperms.lib.rabbitmq' relocate 'org.apache.commons.pool2', 'me.lucko.luckperms.lib.commonspool2' relocate 'ninja.leaping.configurate', 'me.lucko.luckperms.lib.configurate' } diff --git a/sponge/src/main/resources/luckperms.conf b/sponge/src/main/resources/luckperms.conf index 7719f5294..a18dad182 100644 --- a/sponge/src/main/resources/luckperms.conf +++ b/sponge/src/main/resources/luckperms.conf @@ -226,6 +226,8 @@ watch-files = true # Won't work if you have more than one proxy. # => redis Uses Redis pub-sub to push changes. Your server connection info must be configured # below. +# => rabbitmq Uses RabbitMQ pub-sub to push changes. Your server connection info must be +# configured below. # => auto Attempts to automatically setup a messaging service using redis or sql. messaging-service = "auto" @@ -250,6 +252,15 @@ redis { password = "" } +# Settings for RabbitMQ. +# Port 5672 is used by default; set address to "host:port" if differs +rabbitmq { + enabled = false + address = "localhost" + username = "guest" + password = "guest" +} + diff --git a/velocity/build.gradle b/velocity/build.gradle index 40f74a454..9ea150c14 100644 --- a/velocity/build.gradle +++ b/velocity/build.gradle @@ -41,6 +41,7 @@ shadowJar { relocate 'com.mongodb', 'me.lucko.luckperms.lib.mongodb' relocate 'org.bson', 'me.lucko.luckperms.lib.bson' relocate 'redis.clients.jedis', 'me.lucko.luckperms.lib.jedis' + relocate 'com.rabbitmq', 'me.lucko.luckperms.lib.rabbitmq' relocate 'org.apache.commons.pool2', 'me.lucko.luckperms.lib.commonspool2' relocate 'ninja.leaping.configurate', 'me.lucko.luckperms.lib.configurate' relocate 'org.yaml.snakeyaml', 'me.lucko.luckperms.lib.yaml' diff --git a/velocity/src/main/resources/config.yml b/velocity/src/main/resources/config.yml index 106307982..6f38c4bf1 100644 --- a/velocity/src/main/resources/config.yml +++ b/velocity/src/main/resources/config.yml @@ -212,6 +212,8 @@ watch-files: true # servers. Won't work if you have more than one Velocity proxy. # => redis Uses Redis pub-sub to push changes. Your server connection info must be # configured below. +# => rabbitmq Uses RabbitMQ pub-sub to push changes. Your server connection info must be +# configured below. # => auto Attempts to automatically setup a messaging service using redis or sql. messaging-service: auto @@ -235,6 +237,14 @@ redis: address: localhost password: '' +# Settings for RabbitMQ. +# Port 5672 is used by default; set address to "host:port" if differs +rabbitmq: + enabled: false + address: localhost + username: 'guest' + password: 'guest' +