1
0
mirror of https://github.com/lucko/LuckPerms.git synced 2025-08-28 08:39:48 +02:00

RabbitMQ messenger support (#2874)

This commit is contained in:
ruViolence
2021-02-05 01:10:30 +08:00
committed by GitHub
parent b7ac05a0a5
commit a8dfd38fe6
18 changed files with 319 additions and 0 deletions

View File

@@ -48,6 +48,7 @@ shadowJar {
relocate 'com.mongodb', 'me.lucko.luckperms.lib.mongodb'
relocate 'org.bson', 'me.lucko.luckperms.lib.bson'
relocate 'redis.clients.jedis', 'me.lucko.luckperms.lib.jedis'
relocate 'com.rabbitmq', 'me.lucko.luckperms.lib.rabbitmq'
relocate 'org.apache.commons.pool2', 'me.lucko.luckperms.lib.commonspool2'
relocate 'ninja.leaping.configurate', 'me.lucko.luckperms.lib.configurate'
}

View File

@@ -223,6 +223,8 @@ watch-files: true
# installed.
# => redis Uses Redis pub-sub to push changes. Your server connection info must be configured
# below.
# => rabbitmq Uses RabbitMQ pub-sub to push changes. Your server connection info must be
# configured below.
# => auto Attempts to automatically setup a messaging service using redis or sql.
messaging-service: auto
@@ -246,6 +248,14 @@ redis:
address: localhost
password: ''
# Settings for RabbitMQ.
# Port 5672 is used by default; set address to "host:port" if differs
rabbitmq:
enabled: false
address: localhost
username: 'guest'
password: 'guest'

View File

@@ -39,6 +39,7 @@ shadowJar {
relocate 'com.mongodb', 'me.lucko.luckperms.lib.mongodb'
relocate 'org.bson', 'me.lucko.luckperms.lib.bson'
relocate 'redis.clients.jedis', 'me.lucko.luckperms.lib.jedis'
relocate 'com.rabbitmq', 'me.lucko.luckperms.lib.rabbitmq'
relocate 'org.apache.commons.pool2', 'me.lucko.luckperms.lib.commonspool2'
relocate 'ninja.leaping.configurate', 'me.lucko.luckperms.lib.configurate'
}

View File

@@ -221,6 +221,8 @@ watch-files: true
# configured below.
# => redisbungee Uses Redis pub-sub to push changes, using the RedisBungee API. You need to have
# the RedisBungee plugin installed.
# => rabbitmq Uses RabbitMQ pub-sub to push changes. Your server connection info must be
# configured below.
# => auto Attempts to automatically setup a messaging service using redis or sql.
messaging-service: auto
@@ -244,6 +246,14 @@ redis:
address: localhost
password: ''
# Settings for RabbitMQ.
# Port 5672 is used by default; set address to "host:port" if differs
rabbitmq:
enabled: false
address: localhost
username: 'guest'
password: 'guest'

View File

@@ -51,6 +51,7 @@ dependencies {
}
compile 'com.zaxxer:HikariCP:3.4.5'
compile 'redis.clients:jedis:3.3.0'
compile 'com.rabbitmq:amqp-client:5.10.0'
compile 'org.mongodb:mongo-java-driver:3.12.2'
compile 'org.yaml:snakeyaml:1.23'
}

View File

@@ -619,6 +619,26 @@ public final class ConfigKeys {
*/
public static final ConfigKey<Boolean> REDIS_SSL = notReloadable(booleanKey("redis.ssl", false));
/**
* If rabbitmq messaging is enabled
*/
public static final ConfigKey<Boolean> RABBITMQ_ENABLED = notReloadable(booleanKey("rabbitmq.enabled", false));
/**
* The address of the rabbitmq server
*/
public static final ConfigKey<String> RABBITMQ_ADDRESS = notReloadable(stringKey("rabbitmq.address", null));
/**
* The username in use by the rabbitmq server
*/
public static final ConfigKey<String> RABBITMQ_USERNAME = notReloadable(stringKey("rabbitmq.username", "guest"));
/**
* The password in use by the rabbitmq server, or an empty string if there is no password
*/
public static final ConfigKey<String> RABBITMQ_PASSWORD = notReloadable(stringKey("rabbitmq.password", "guest"));
/**
* The URL of the bytebin instance used to upload data
*/

View File

@@ -212,6 +212,13 @@ public enum Dependency {
Relocation.of("jedis", "redis{}clients{}jedis"),
Relocation.of("commonspool2", "org{}apache{}commons{}pool2")
),
RABBITMQ(
"com{}rabbitmq",
"amqp-client",
"5.10.0",
"z9/pvANy1mMAQb+VJtN+COybfxPeg/m9BgcfZRO1jcE=",
Relocation.of("rabbitmq", "com{}rabbitmq")
),
COMMONS_POOL_2(
"org.apache.commons",
"commons-pool2",

View File

@@ -82,6 +82,10 @@ public class DependencyRegistry {
dependencies.add(Dependency.SLF4J_SIMPLE);
}
if (this.plugin.getConfiguration().get(ConfigKeys.RABBITMQ_ENABLED)) {
dependencies.add(Dependency.RABBITMQ);
}
// don't load slf4j if it's already present
if ((dependencies.contains(Dependency.SLF4J_API) || dependencies.contains(Dependency.SLF4J_SIMPLE)) && slf4jPresent()) {
dependencies.remove(Dependency.SLF4J_API);

View File

@@ -27,6 +27,7 @@ package me.lucko.luckperms.common.messaging;
import me.lucko.luckperms.common.config.ConfigKeys;
import me.lucko.luckperms.common.config.LuckPermsConfiguration;
import me.lucko.luckperms.common.messaging.rabbitmq.RabbitMQMessenger;
import me.lucko.luckperms.common.messaging.redis.RedisMessenger;
import me.lucko.luckperms.common.messaging.sql.SqlMessenger;
import me.lucko.luckperms.common.plugin.LuckPermsPlugin;
@@ -62,6 +63,8 @@ public class MessagingFactory<P extends LuckPermsPlugin> {
if (messagingType.equals("auto")) {
if (this.plugin.getConfiguration().get(ConfigKeys.REDIS_ENABLED)) {
messagingType = "redis";
} else if (this.plugin.getConfiguration().get(ConfigKeys.RABBITMQ_ENABLED)) {
messagingType = "rabbitmq";
} else {
for (StorageImplementation implementation : this.plugin.getStorage().getImplementations()) {
if (implementation instanceof SqlStorage) {
@@ -101,6 +104,16 @@ public class MessagingFactory<P extends LuckPermsPlugin> {
} else {
this.plugin.getLogger().warn("Messaging Service was set to redis, but redis is not enabled!");
}
} else if (messagingType.equals("rabbitmq")) {
if (this.plugin.getConfiguration().get(ConfigKeys.RABBITMQ_ENABLED)) {
try {
return new LuckPermsMessagingService(this.plugin, new RabbitMQMessengerProvider());
} catch (Exception e) {
getPlugin().getLogger().severe("Exception occurred whilst enabling RabbitMQ messaging service", e);
}
} else {
this.plugin.getLogger().warn("Messaging Service was set to rabbitmq, but rabbitmq is not enabled!");
}
} else if (messagingType.equals("sql")) {
try {
return new LuckPermsMessagingService(this.plugin, new SqlMessengerProvider());
@@ -136,6 +149,27 @@ public class MessagingFactory<P extends LuckPermsPlugin> {
}
}
private class RabbitMQMessengerProvider implements MessengerProvider {
@Override
public @NonNull String getName() {
return "RabbitMQ";
}
@Override
public @NonNull Messenger obtain(@NonNull IncomingMessageConsumer incomingMessageConsumer) {
RabbitMQMessenger rabbitmq = new RabbitMQMessenger(getPlugin(), incomingMessageConsumer);
LuckPermsConfiguration config = getPlugin().getConfiguration();
String address = config.get(ConfigKeys.RABBITMQ_ADDRESS);
String username = config.get(ConfigKeys.RABBITMQ_USERNAME);
String password = config.get(ConfigKeys.RABBITMQ_PASSWORD);
rabbitmq.init(address, username, password);
return rabbitmq;
}
}
private class SqlMessengerProvider implements MessengerProvider {
@Override

View File

@@ -0,0 +1,185 @@
/*
* This file is part of LuckPerms, licensed under the MIT License.
*
* Copyright (c) lucko (Luck) <luck@lucko.me>
* Copyright (c) contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package me.lucko.luckperms.common.messaging.rabbitmq;
import com.google.common.io.ByteArrayDataInput;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import me.lucko.luckperms.common.plugin.LuckPermsPlugin;
import net.luckperms.api.messenger.IncomingMessageConsumer;
import net.luckperms.api.messenger.Messenger;
import net.luckperms.api.messenger.message.OutgoingMessage;
import org.checkerframework.checker.nullness.qual.NonNull;
/**
* An implementation of {@link Messenger} using RabbitMQ.
*/
public class RabbitMQMessenger implements Messenger {
private static final int DEFAULT_PORT = 5672;
private static final String EXCHANGE = "luckperms";
private static final String ROUTING_KEY = "luckperms:update";
private final LuckPermsPlugin plugin;
private final IncomingMessageConsumer consumer;
private ConnectionFactory connectionFactory;
private Connection connection;
private Channel channel;
private Subscription sub;
public RabbitMQMessenger(LuckPermsPlugin plugin, IncomingMessageConsumer consumer) {
this.plugin = plugin;
this.consumer = consumer;
}
public void init(String address, String username, String password) {
String[] addressSplit = address.split(":");
String host = addressSplit[0];
int port = addressSplit.length > 1 ? Integer.parseInt(addressSplit[1]) : DEFAULT_PORT;
this.connectionFactory = new ConnectionFactory();
this.connectionFactory.setHost(host);
this.connectionFactory.setPort(port);
this.connectionFactory.setUsername(username);
this.connectionFactory.setPassword(password);
this.sub = new Subscription(this);
this.plugin.getBootstrap().getScheduler().executeAsync(this.sub);
}
@Override
public void sendOutgoingMessage(@NonNull OutgoingMessage outgoingMessage) {
try {
ByteArrayDataOutput output = ByteStreams.newDataOutput();
output.writeUTF(outgoingMessage.asEncodedString());
this.channel.basicPublish(EXCHANGE, ROUTING_KEY, new AMQP.BasicProperties.Builder().build(), output.toByteArray());
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void close() {
try {
this.channel.close();
this.connection.close();
this.sub.isClosed = true;
} catch (Exception e) {
e.printStackTrace();
}
}
private static class Subscription implements Runnable {
private final RabbitMQMessenger parent;
private boolean isClosed = false;
private boolean firstStartup = true;
private Subscription(RabbitMQMessenger parent) {
this.parent = parent;
}
@Override
public void run() {
while (!Thread.interrupted() && !this.isClosed) {
try {
if (!checkAndReopenConnection()) {
// Sleep for 5 seconds to prevent massive spam in console
Thread.sleep(5000);
continue;
}
// Check connection life every every 30 seconds
Thread.sleep(30_000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
} finally {
this.firstStartup = false;
}
}
}
private boolean checkAndReopenConnection() {
boolean channelIsDead = this.parent.channel == null || !this.parent.channel.isOpen();
if (channelIsDead) {
boolean connectionIsDead = this.parent.connection == null || !this.parent.connection.isOpen();
if (connectionIsDead) {
if (!this.firstStartup) {
this.parent.plugin.getLogger().warn("RabbitMQ pubsub connection dropped, trying to re-open the connection");
}
try {
this.parent.connection = this.parent.connectionFactory.newConnection();
this.parent.channel = this.parent.connection.createChannel();
String queue = this.parent.channel.queueDeclare("", false, true, true, null).getQueue();
this.parent.channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC, false, true, null);
this.parent.channel.queueBind(queue, EXCHANGE, ROUTING_KEY);
this.parent.channel.basicConsume(queue, true, new ChannelListener(), (consumerTag) -> { });
if (!this.firstStartup) {
this.parent.plugin.getLogger().info("RabbitMQ pubsub connection re-established");
}
return true;
} catch (Exception ignored) {
return false;
}
} else {
try {
this.parent.channel = this.parent.connection.createChannel();
return true;
} catch (Exception ignored) {
return false;
}
}
}
return true;
}
private class ChannelListener implements DeliverCallback {
@Override
public void handle(String consumerTag, Delivery message) {
try {
byte[] data = message.getBody();
ByteArrayDataInput input = ByteStreams.newDataInput(data);
String msg = input.readUTF();
Subscription.this.parent.consumer.consumeIncomingMessageAsString(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}

View File

@@ -75,6 +75,7 @@ shadowJar {
relocate 'com.mongodb', 'me.lucko.luckperms.lib.mongodb'
relocate 'org.bson', 'me.lucko.luckperms.lib.bson'
relocate 'redis.clients.jedis', 'me.lucko.luckperms.lib.jedis'
relocate 'com.rabbitmq', 'me.lucko.luckperms.lib.rabbitmq'
relocate 'org.apache.commons.pool2', 'me.lucko.luckperms.lib.commonspool2'
relocate 'ninja.leaping.configurate', 'me.lucko.luckperms.lib.configurate'
}

View File

@@ -226,6 +226,8 @@ watch-files = true
# Won't work if you have more than one proxy.
# => redis Uses Redis pub-sub to push changes. Your server connection info must be configured
# below.
# => rabbitmq Uses RabbitMQ pub-sub to push changes. Your server connection info must be
# configured below.
# => auto Attempts to automatically setup a messaging service using redis or sql.
messaging-service = "auto"
@@ -250,6 +252,15 @@ redis {
password = ""
}
# Settings for RabbitMQ.
# Port 5672 is used by default; set address to "host:port" if differs
rabbitmq {
enabled = false
address = "localhost"
username = "guest"
password = "guest"
}

View File

@@ -42,6 +42,7 @@ shadowJar {
relocate 'com.mongodb', 'me.lucko.luckperms.lib.mongodb'
relocate 'org.bson', 'me.lucko.luckperms.lib.bson'
relocate 'redis.clients.jedis', 'me.lucko.luckperms.lib.jedis'
relocate 'com.rabbitmq', 'me.lucko.luckperms.lib.rabbitmq'
relocate 'org.apache.commons.pool2', 'me.lucko.luckperms.lib.commonspool2'
relocate 'ninja.leaping.configurate', 'me.lucko.luckperms.lib.configurate'
}

View File

@@ -218,6 +218,8 @@ watch-files: true
# option is set to 'auto' and SQL storage is in use. Set to 'notsql' to disable this.
# => redis Uses Redis pub-sub to push changes. Your server connection info must be configured
# below.
# => rabbitmq Uses RabbitMQ pub-sub to push changes. Your server connection info must be
# configured below.
# => auto Attempts to automatically setup a messaging service using redis or sql.
messaging-service: auto
@@ -241,6 +243,14 @@ redis:
address: localhost
password: ''
# Settings for RabbitMQ.
# Port 5672 is used by default; set address to "host:port" if differs
rabbitmq:
enabled: false
address: localhost
username: 'guest'
password: 'guest'

View File

@@ -55,6 +55,7 @@ shadowJar {
relocate 'com.mongodb', 'me.lucko.luckperms.lib.mongodb'
relocate 'org.bson', 'me.lucko.luckperms.lib.bson'
relocate 'redis.clients.jedis', 'me.lucko.luckperms.lib.jedis'
relocate 'com.rabbitmq', 'me.lucko.luckperms.lib.rabbitmq'
relocate 'org.apache.commons.pool2', 'me.lucko.luckperms.lib.commonspool2'
relocate 'ninja.leaping.configurate', 'me.lucko.luckperms.lib.configurate'
}

View File

@@ -226,6 +226,8 @@ watch-files = true
# Won't work if you have more than one proxy.
# => redis Uses Redis pub-sub to push changes. Your server connection info must be configured
# below.
# => rabbitmq Uses RabbitMQ pub-sub to push changes. Your server connection info must be
# configured below.
# => auto Attempts to automatically setup a messaging service using redis or sql.
messaging-service = "auto"
@@ -250,6 +252,15 @@ redis {
password = ""
}
# Settings for RabbitMQ.
# Port 5672 is used by default; set address to "host:port" if differs
rabbitmq {
enabled = false
address = "localhost"
username = "guest"
password = "guest"
}

View File

@@ -41,6 +41,7 @@ shadowJar {
relocate 'com.mongodb', 'me.lucko.luckperms.lib.mongodb'
relocate 'org.bson', 'me.lucko.luckperms.lib.bson'
relocate 'redis.clients.jedis', 'me.lucko.luckperms.lib.jedis'
relocate 'com.rabbitmq', 'me.lucko.luckperms.lib.rabbitmq'
relocate 'org.apache.commons.pool2', 'me.lucko.luckperms.lib.commonspool2'
relocate 'ninja.leaping.configurate', 'me.lucko.luckperms.lib.configurate'
relocate 'org.yaml.snakeyaml', 'me.lucko.luckperms.lib.yaml'

View File

@@ -212,6 +212,8 @@ watch-files: true
# servers. Won't work if you have more than one Velocity proxy.
# => redis Uses Redis pub-sub to push changes. Your server connection info must be
# configured below.
# => rabbitmq Uses RabbitMQ pub-sub to push changes. Your server connection info must be
# configured below.
# => auto Attempts to automatically setup a messaging service using redis or sql.
messaging-service: auto
@@ -235,6 +237,14 @@ redis:
address: localhost
password: ''
# Settings for RabbitMQ.
# Port 5672 is used by default; set address to "host:port" if differs
rabbitmq:
enabled: false
address: localhost
username: 'guest'
password: 'guest'