35
35
import org .apache .rocketmq .remoting .netty .NettyRequestProcessor ;
36
36
import org .apache .rocketmq .remoting .netty .RequestTask ;
37
37
import org .apache .rocketmq .remoting .protocol .RemotingCommand ;
38
+ import org .apache .rocketmq .remoting .protocol .heartbeat .SubscriptionData ;
39
+ import org .apache .rocketmq .store .ConsumeQueueExt ;
40
+ import org .apache .rocketmq .store .MessageFilter ;
38
41
39
42
import static org .apache .rocketmq .broker .longpolling .PollingResult .NOT_POLLING ;
40
43
import static org .apache .rocketmq .broker .longpolling .PollingResult .POLLING_FULL ;
@@ -147,39 +150,61 @@ public void run() {
147
150
}
148
151
149
152
public void notifyMessageArrivingWithRetryTopic (final String topic , final int queueId ) {
153
+ this .notifyMessageArrivingWithRetryTopic (topic , queueId , null , 0L , null , null );
154
+ }
155
+
156
+ public void notifyMessageArrivingWithRetryTopic (final String topic , final int queueId ,
157
+ Long tagsCode , long msgStoreTime , byte [] filterBitMap , Map <String , String > properties ) {
150
158
String notifyTopic ;
151
159
if (KeyBuilder .isPopRetryTopicV2 (topic )) {
152
160
notifyTopic = KeyBuilder .parseNormalTopic (topic );
153
161
} else {
154
162
notifyTopic = topic ;
155
163
}
156
- notifyMessageArriving (notifyTopic , queueId );
164
+ notifyMessageArriving (notifyTopic , queueId , tagsCode , msgStoreTime , filterBitMap , properties );
157
165
}
158
166
159
- public void notifyMessageArriving (final String topic , final int queueId ) {
167
+ public void notifyMessageArriving (final String topic , final int queueId ,
168
+ Long tagsCode , long msgStoreTime , byte [] filterBitMap , Map <String , String > properties ) {
160
169
ConcurrentHashMap <String , Byte > cids = topicCidMap .get (topic );
161
170
if (cids == null ) {
162
171
return ;
163
172
}
164
173
for (Map .Entry <String , Byte > cid : cids .entrySet ()) {
165
174
if (queueId >= 0 ) {
166
- notifyMessageArriving (topic , cid .getKey (), - 1 );
175
+ notifyMessageArriving (topic , - 1 , cid .getKey (), tagsCode , msgStoreTime , filterBitMap , properties );
167
176
}
168
- notifyMessageArriving (topic , cid .getKey (), queueId );
177
+ notifyMessageArriving (topic , queueId , cid .getKey (), tagsCode , msgStoreTime , filterBitMap , properties );
169
178
}
170
179
}
171
180
172
- public boolean notifyMessageArriving (final String topic , final String cid , final int queueId ) {
181
+ public boolean notifyMessageArriving (final String topic , final int queueId , final String cid ,
182
+ Long tagsCode , long msgStoreTime , byte [] filterBitMap , Map <String , String > properties ) {
173
183
ConcurrentSkipListSet <PopRequest > remotingCommands = pollingMap .get (KeyBuilder .buildPollingKey (topic , cid , queueId ));
174
184
if (remotingCommands == null || remotingCommands .isEmpty ()) {
175
185
return false ;
176
186
}
187
+
177
188
PopRequest popRequest = pollRemotingCommands (remotingCommands );
178
189
if (popRequest == null ) {
179
190
return false ;
180
191
}
192
+
193
+ if (popRequest .getMessageFilter () != null && popRequest .getSubscriptionData () != null ) {
194
+ boolean match = popRequest .getMessageFilter ().isMatchedByConsumeQueue (tagsCode ,
195
+ new ConsumeQueueExt .CqExtUnit (tagsCode , msgStoreTime , filterBitMap ));
196
+ if (match && properties != null ) {
197
+ match = popRequest .getMessageFilter ().isMatchedByCommitLog (null , properties );
198
+ }
199
+ if (!match ) {
200
+ remotingCommands .add (popRequest );
201
+ totalPollingNum .incrementAndGet ();
202
+ return false ;
203
+ }
204
+ }
205
+
181
206
if (brokerController .getBrokerConfig ().isEnablePopLog ()) {
182
- POP_LOGGER .info ("lock release , new msg arrive , wakeUp : {}" , popRequest );
207
+ POP_LOGGER .info ("lock release, new msg arrive, wakeUp: {}" , popRequest );
183
208
}
184
209
return wakeUp (popRequest );
185
210
}
@@ -221,6 +246,11 @@ public boolean wakeUp(final PopRequest request) {
221
246
*/
222
247
public PollingResult polling (final ChannelHandlerContext ctx , RemotingCommand remotingCommand ,
223
248
final PollingHeader requestHeader ) {
249
+ return this .polling (ctx , remotingCommand , requestHeader , null , null );
250
+ }
251
+
252
+ public PollingResult polling (final ChannelHandlerContext ctx , RemotingCommand remotingCommand ,
253
+ final PollingHeader requestHeader , SubscriptionData subscriptionData , MessageFilter messageFilter ) {
224
254
if (requestHeader .getPollTime () <= 0 || this .isStopped ()) {
225
255
return NOT_POLLING ;
226
256
}
@@ -234,7 +264,7 @@ public PollingResult polling(final ChannelHandlerContext ctx, RemotingCommand re
234
264
}
235
265
cids .putIfAbsent (requestHeader .getConsumerGroup (), Byte .MIN_VALUE );
236
266
long expired = requestHeader .getBornTime () + requestHeader .getPollTime ();
237
- final PopRequest request = new PopRequest (remotingCommand , ctx , expired );
267
+ final PopRequest request = new PopRequest (remotingCommand , ctx , expired , subscriptionData , messageFilter );
238
268
boolean isFull = totalPollingNum .get () >= this .brokerController .getBrokerConfig ().getMaxPopPollingSize ();
239
269
if (isFull ) {
240
270
POP_LOGGER .info ("polling {}, result POLLING_FULL, total:{}" , remotingCommand , totalPollingNum .get ());
0 commit comments