Skip to content

Commit 30f1033

Browse files
committed
Add test for recorded binding clean-up after exchange deletion
(cherry picked from commit b451798)
1 parent f9c3018 commit 30f1033

File tree

2 files changed

+25
-12
lines changed

2 files changed

+25
-12
lines changed

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

+6-12
Original file line numberDiff line numberDiff line change
@@ -1163,25 +1163,19 @@ boolean hasMoreConsumersOnQueue(Collection<RecordedConsumer> consumers, String q
11631163
}
11641164

11651165
Set<RecordedBinding> removeBindingsWithDestination(String s) {
1166-
final Set<RecordedBinding> result = new LinkedHashSet<>();
1167-
synchronized (this.recordedBindings) {
1168-
for (Iterator<RecordedBinding> it = this.recordedBindings.iterator(); it.hasNext(); ) {
1169-
RecordedBinding b = it.next();
1170-
if(b.getDestination().equals(s)) {
1171-
it.remove();
1172-
result.add(b);
1173-
}
1174-
}
1175-
}
1176-
return result;
1166+
return this.removeBindingsWithCondition(b -> b.getSource().equals(s));
11771167
}
11781168

11791169
Set<RecordedBinding> removeBindingsWithSource(String s) {
1170+
return this.removeBindingsWithCondition(b -> b.getSource().equals(s));
1171+
}
1172+
1173+
private Set<RecordedBinding> removeBindingsWithCondition(Predicate<RecordedBinding> condition) {
11801174
final Set<RecordedBinding> result = new LinkedHashSet<>();
11811175
synchronized (this.recordedBindings) {
11821176
for (Iterator<RecordedBinding> it = this.recordedBindings.iterator(); it.hasNext(); ) {
11831177
RecordedBinding b = it.next();
1184-
if (b.getSource().equals(s)) {
1178+
if (condition.test(b)) {
11851179
it.remove();
11861180
result.add(b);
11871181
}

src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java

+19
Original file line numberDiff line numberDiff line change
@@ -865,6 +865,21 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
865865
}
866866
}
867867

868+
@Test public void thatBindingFromDeletedExchangeIsDeleted() throws IOException, InterruptedException {
869+
String q = generateQueueName();
870+
channel.queueDeclare(q, false, false, false, null);
871+
try {
872+
String x = generateExchangeName();
873+
channel.exchangeDeclare(x, "fanout");
874+
channel.queueBind(q, x, "");
875+
assertRecordedBinding(connection, 1);
876+
channel.exchangeDelete(x);
877+
assertRecordedBinding(connection, 0);
878+
} finally {
879+
channel.queueDelete(q);
880+
}
881+
}
882+
868883
private void assertConsumerCount(int exp, String q) throws IOException {
869884
assertThat(channel.queueDeclarePassive(q).getConsumerCount()).isEqualTo(exp);
870885
}
@@ -1017,4 +1032,8 @@ private static void assertRecordedQueues(Connection conn, int size) {
10171032
private static void assertRecordedExchanges(Connection conn, int size) {
10181033
assertThat(((AutorecoveringConnection)conn).getRecordedExchanges()).hasSize(size);
10191034
}
1035+
1036+
private static void assertRecordedBinding(Connection conn, int size) {
1037+
assertThat(((AutorecoveringConnection)conn).getRecordedBindings()).hasSize(size);
1038+
}
10201039
}

0 commit comments

Comments
 (0)