Skip to content

Commit c550929

Browse files
committed
[ISSUE apache#9347] Optimization of Prioritizing Original Sender in Transactional Message Checkbacks
1 parent bbe87f5 commit c550929

File tree

4 files changed

+36
-12
lines changed

4 files changed

+36
-12
lines changed

broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java

+8
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,14 @@ public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
6161
this.lastUpdateTimestamp = lastUpdateTimestamp;
6262
}
6363

64+
public boolean isActive() {
65+
return getChannel().isActive();
66+
}
67+
68+
public boolean isWritable() {
69+
return getChannel().isWritable();
70+
}
71+
6472
@Override
6573
public int hashCode() {
6674
final int prime = 31;

broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java

+25-11
Original file line numberDiff line numberDiff line change
@@ -206,13 +206,17 @@ public synchronized void unregisterProducer(final String group, final ClientChan
206206
}
207207

208208
public Channel getAvailableChannel(String groupId) {
209+
return getAvailableChannel(groupId, null);
210+
}
211+
212+
public Channel getAvailableChannel(String groupId, String clientId) {
209213
if (groupId == null) {
210214
return null;
211215
}
212-
List<Channel> channelList;
216+
List<ClientChannelInfo> channelList;
213217
ConcurrentHashMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
214218
if (channelClientChannelInfoHashMap != null) {
215-
channelList = new ArrayList<>(channelClientChannelInfoHashMap.keySet());
219+
channelList = new ArrayList<>(channelClientChannelInfoHashMap.values());
216220
} else {
217221
log.warn("Check transaction failed, channel table is empty. groupId={}", groupId);
218222
return null;
@@ -224,25 +228,35 @@ public Channel getAvailableChannel(String groupId) {
224228
return null;
225229
}
226230

227-
Channel lastActiveChannel = null;
231+
Channel firstChannel = null;
232+
Channel secondChannel = null;
228233

229234
int index = positiveAtomicCounter.incrementAndGet() % size;
230-
Channel channel = channelList.get(index);
235+
ClientChannelInfo clientChannel = channelList.get(index);
231236
int count = 0;
232-
boolean isOk = channel.isActive() && channel.isWritable();
237+
boolean isOk = clientChannel.isActive() && clientChannel.isWritable();
238+
boolean isSendClient = clientChannel.getClientId().equals(clientId);
239+
233240
while (count++ < GET_AVAILABLE_CHANNEL_RETRY_COUNT) {
241+
if (isOk && isSendClient) {
242+
return clientChannel.getChannel();
243+
}
234244
if (isOk) {
235-
return channel;
245+
firstChannel = clientChannel.getChannel();
246+
}
247+
if (clientChannel.isActive()) {
248+
secondChannel = clientChannel.getChannel();
236249
}
237-
if (channel.isActive()) {
238-
lastActiveChannel = channel;
250+
if (clientChannel.isActive() && isSendClient) {
251+
secondChannel = clientChannel.getChannel();
239252
}
240253
index = (++index) % size;
241-
channel = channelList.get(index);
242-
isOk = channel.isActive() && channel.isWritable();
254+
clientChannel = channelList.get(index);
255+
isOk = clientChannel.isActive() && clientChannel.isWritable();
256+
isSendClient = clientChannel.getClientId().equals(clientId);
243257
}
244258

245-
return lastActiveChannel;
259+
return firstChannel != null ? firstChannel : secondChannel;
246260
}
247261

248262
public Channel findChannel(String clientId) {

broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ public void sendCheckMessage(MessageExt msgExt) throws Exception {
6161
msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
6262
msgExt.setStoreSize(0);
6363
String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
64-
Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
64+
String clientId = msgExt.getProperty(MessageConst.PROPERTY_INSTANCE_ID);
65+
Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId, clientId);
6566
if (channel != null) {
6667
brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
6768
} else {

client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

+1
Original file line numberDiff line numberDiff line change
@@ -1392,6 +1392,7 @@ public TransactionSendResult sendMessageInTransaction(final Message msg,
13921392
SendResult sendResult = null;
13931393
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
13941394
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
1395+
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INSTANCE_ID, this.mQClientFactory.getClientId());
13951396
try {
13961397
sendResult = this.send(msg);
13971398
} catch (Exception e) {

0 commit comments

Comments
 (0)