diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTAuthenticationService.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTAuthenticationService.java index e78d0f2d..8221718b 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTAuthenticationService.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTAuthenticationService.java @@ -88,7 +88,7 @@ private Map getAuthenticationProviders(List writeToBroker(pulsarTopicName, adapter)) .whenComplete((unused, ex) -> { @@ -295,8 +300,10 @@ public void processSubscribe(final MqttAdapterMessage adapter) { log.debug("[Proxy Subscribe] [{}] msg: {}", clientId, msg); } registerTopicListener(adapter); - MqttSubscribeMessage mqttMessage = createMqttSubscribeMessage(msg, connection.getUserRole()); - adapter.setMqttMessage(mqttMessage); + if (proxyConfig.isMqttAuthorizationEnabled()) { + MqttSubscribeMessage mqttMessage = createMqttSubscribeMessage(msg, connection.getUserRole()); + adapter.setMqttMessage(mqttMessage); + } doSubscribe(adapter, false) .exceptionally(ex -> { Throwable realCause = FutureUtil.unwrapCompletionException(ex); diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java index 53d24aa7..90efa693 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java @@ -15,6 +15,7 @@ import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.createWillMessage; import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.getAuthenticationRole; +import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.getPacketId; import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.pingResp; import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.topicSubscriptions; import io.netty.channel.ChannelHandlerContext; @@ -221,8 +222,7 @@ private CompletableFuture doUnauthorized(MqttAdapterMessage adapter) { log.error("[Publish] not authorized to topic={}, userRole={}, CId= {}", msg.variableHeader().topicName(), connection.getUserRole(), connection.getClientId()); - int packetId = msg.variableHeader().packetId(); - packetId = packetId == -1 ? 1 : packetId; + int packetId = getPacketId(msg.variableHeader().packetId()); MqttPubAck.MqttPubErrorAckBuilder pubAckBuilder = MqttPubAck .errorBuilder(connection.getProtocolVersion()) .packetId(packetId) diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java index a0b328eb..fb25aaf2 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java @@ -189,16 +189,12 @@ public static MqttPublishMessage createMqttWillMessage(WillMessage willMessage) return builder.build(); } - public static MqttConnectMessage createMqttConnectMessage(MqttConnectMessage connectMessage, - String authData) { + public static MqttConnectMessage createMqtt5ConnectMessage(MqttConnectMessage connectMessage) { final MqttConnectVariableHeader header = connectMessage.variableHeader(); - MqttProperties properties = new MqttProperties(); - properties.add(new MqttProperties.UserProperty(AUTHENTICATE_ROLE_KEY, authData)); MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader( MqttVersion.MQTT_5.protocolName(), MqttVersion.MQTT_5.protocolLevel(), header.hasUserName(), header.hasPassword(), header.isWillRetain(), header.willQos(), header.isWillFlag(), - header.isCleanSession(), header.keepAliveTimeSeconds(), properties - ); + header.isCleanSession(), header.keepAliveTimeSeconds(), connectMessage.variableHeader().properties()); MqttConnectMessage newConnectMessage = new MqttConnectMessage(connectMessage.fixedHeader(), variableHeader, connectMessage.payload()); return newConnectMessage; @@ -287,4 +283,11 @@ public static byte[] getAuthData(MqttConnectMessage connectMessage) { .getProperty(MqttProperties.MqttPropertyType.AUTHENTICATION_DATA.value()); return authDataProperty != null ? authDataProperty.value() : null; } + + public static int getPacketId(int packetId) { + if (packetId < 1) { + return 1; + } + return packetId; + } }