001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.util.ArrayList; 020import java.util.HashMap; 021import java.util.Iterator; 022import java.util.List; 023import java.util.Map; 024import java.util.Set; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.locks.ReentrantReadWriteLock; 027 028import javax.jms.IllegalStateException; 029import javax.jms.JMSException; 030 031import org.apache.activemq.DestinationDoesNotExistException; 032import org.apache.activemq.advisory.AdvisorySupport; 033import org.apache.activemq.broker.ConnectionContext; 034import org.apache.activemq.broker.ConsumerBrokerExchange; 035import org.apache.activemq.broker.ProducerBrokerExchange; 036import org.apache.activemq.broker.region.policy.PolicyEntry; 037import org.apache.activemq.broker.region.virtual.CompositeDestinationFilter; 038import org.apache.activemq.command.ActiveMQDestination; 039import org.apache.activemq.command.ConsumerControl; 040import org.apache.activemq.command.ConsumerId; 041import org.apache.activemq.command.ConsumerInfo; 042import org.apache.activemq.command.Message; 043import org.apache.activemq.command.MessageAck; 044import org.apache.activemq.command.MessageDispatchNotification; 045import org.apache.activemq.command.MessagePull; 046import org.apache.activemq.command.ProducerInfo; 047import org.apache.activemq.command.RemoveSubscriptionInfo; 048import org.apache.activemq.command.Response; 049import org.apache.activemq.filter.DestinationFilter; 050import org.apache.activemq.filter.DestinationMap; 051import org.apache.activemq.security.SecurityContext; 052import org.apache.activemq.thread.TaskRunnerFactory; 053import org.apache.activemq.usage.SystemUsage; 054import org.slf4j.Logger; 055import org.slf4j.LoggerFactory; 056 057/** 058 * 059 */ 060public abstract class AbstractRegion implements Region { 061 062 private static final Logger LOG = LoggerFactory.getLogger(AbstractRegion.class); 063 064 protected final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>(); 065 protected final DestinationMap destinationMap = new DestinationMap(); 066 protected final Map<ConsumerId, Subscription> subscriptions = new ConcurrentHashMap<ConsumerId, Subscription>(); 067 protected final SystemUsage usageManager; 068 protected final DestinationFactory destinationFactory; 069 protected final DestinationStatistics destinationStatistics; 070 protected final RegionStatistics regionStatistics = new RegionStatistics(); 071 protected final RegionBroker broker; 072 protected boolean autoCreateDestinations = true; 073 protected final TaskRunnerFactory taskRunnerFactory; 074 protected final ReentrantReadWriteLock destinationsLock = new ReentrantReadWriteLock(); 075 protected final Map<ConsumerId, Object> consumerChangeMutexMap = new HashMap<ConsumerId, Object>(); 076 protected boolean started; 077 078 public AbstractRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, 079 TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 080 if (broker == null) { 081 throw new IllegalArgumentException("null broker"); 082 } 083 this.broker = broker; 084 this.destinationStatistics = destinationStatistics; 085 this.usageManager = memoryManager; 086 this.taskRunnerFactory = taskRunnerFactory; 087 if (destinationFactory == null) { 088 throw new IllegalArgumentException("null destinationFactory"); 089 } 090 this.destinationFactory = destinationFactory; 091 } 092 093 @Override 094 public final void start() throws Exception { 095 started = true; 096 097 Set<ActiveMQDestination> inactiveDests = getInactiveDestinations(); 098 for (Iterator<ActiveMQDestination> iter = inactiveDests.iterator(); iter.hasNext();) { 099 ActiveMQDestination dest = iter.next(); 100 101 ConnectionContext context = new ConnectionContext(); 102 context.setBroker(broker.getBrokerService().getBroker()); 103 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 104 context.getBroker().addDestination(context, dest, false); 105 } 106 destinationsLock.readLock().lock(); 107 try{ 108 for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) { 109 Destination dest = i.next(); 110 dest.start(); 111 } 112 } finally { 113 destinationsLock.readLock().unlock(); 114 } 115 } 116 117 @Override 118 public void stop() throws Exception { 119 started = false; 120 destinationsLock.readLock().lock(); 121 try{ 122 for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) { 123 Destination dest = i.next(); 124 dest.stop(); 125 } 126 } finally { 127 destinationsLock.readLock().unlock(); 128 } 129 130 destinationsLock.writeLock().lock(); 131 try { 132 destinations.clear(); 133 regionStatistics.getAdvisoryDestinations().reset(); 134 regionStatistics.getDestinations().reset(); 135 regionStatistics.getAllDestinations().reset(); 136 } finally { 137 destinationsLock.writeLock().unlock(); 138 } 139 } 140 141 @Override 142 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, 143 boolean createIfTemporary) throws Exception { 144 145 destinationsLock.writeLock().lock(); 146 try { 147 Destination dest = destinations.get(destination); 148 if (dest == null) { 149 if (destination.isTemporary() == false || createIfTemporary) { 150 // Limit the number of destinations that can be created if 151 // maxDestinations has been set on a policy 152 validateMaxDestinations(destination); 153 154 LOG.debug("{} adding destination: {}", broker.getBrokerName(), destination); 155 dest = createDestination(context, destination); 156 // intercept if there is a valid interceptor defined 157 DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor(); 158 if (destinationInterceptor != null) { 159 dest = destinationInterceptor.intercept(dest); 160 } 161 dest.start(); 162 destinations.put(destination, dest); 163 updateRegionDestCounts(destination, 1); 164 destinationMap.put(destination, dest); 165 addSubscriptionsForDestination(context, dest); 166 } 167 if (dest == null) { 168 throw new DestinationDoesNotExistException(destination.getQualifiedName()); 169 } 170 } 171 return dest; 172 } finally { 173 destinationsLock.writeLock().unlock(); 174 } 175 } 176 177 public Map<ConsumerId, Subscription> getSubscriptions() { 178 return subscriptions; 179 } 180 181 182 /** 183 * Updates the counts in RegionStatistics based on whether or not the destination 184 * is an Advisory Destination or not 185 * 186 * @param destination the destination being used to determine which counters to update 187 * @param count the count to add to the counters 188 */ 189 protected void updateRegionDestCounts(ActiveMQDestination destination, int count) { 190 if (destination != null) { 191 if (AdvisorySupport.isAdvisoryTopic(destination)) { 192 regionStatistics.getAdvisoryDestinations().add(count); 193 } else { 194 regionStatistics.getDestinations().add(count); 195 } 196 regionStatistics.getAllDestinations().add(count); 197 } 198 } 199 200 /** 201 * This method checks whether or not the destination can be created based on 202 * {@link PolicyEntry#getMaxDestinations}, if it has been set. Advisory 203 * topics are ignored. 204 * 205 * @param destination 206 * @throws Exception 207 */ 208 protected void validateMaxDestinations(ActiveMQDestination destination) 209 throws Exception { 210 if (broker.getDestinationPolicy() != null) { 211 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); 212 // Make sure the destination is not an advisory topic 213 if (entry != null && entry.getMaxDestinations() >= 0 214 && !AdvisorySupport.isAdvisoryTopic(destination)) { 215 // If there is an entry for this destination, look up the set of 216 // destinations associated with this policy 217 // If a destination isn't specified, then just count up 218 // non-advisory destinations (ie count all destinations) 219 int destinationSize = (int) (entry.getDestination() != null ? 220 destinationMap.get(entry.getDestination()).size() : regionStatistics.getDestinations().getCount()); 221 if (destinationSize >= entry.getMaxDestinations()) { 222 if (entry.getDestination() != null) { 223 throw new IllegalStateException( 224 "The maxmimum number of destinations allowed ("+ entry.getMaxDestinations() + 225 ") for the policy " + entry.getDestination() + " has already been reached."); 226 // No destination has been set (default policy) 227 } else { 228 throw new IllegalStateException("The maxmimum number of destinations allowed (" 229 + entry.getMaxDestinations() + ") has already been reached."); 230 } 231 } 232 } 233 } 234 } 235 236 protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception { 237 List<Subscription> rc = new ArrayList<Subscription>(); 238 // Add all consumers that are interested in the destination. 239 for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) { 240 Subscription sub = iter.next(); 241 if (sub.matches(dest.getActiveMQDestination())) { 242 try { 243 ConnectionContext originalContext = sub.getContext() != null ? sub.getContext() : context; 244 dest.addSubscription(originalContext, sub); 245 rc.add(sub); 246 } catch (SecurityException e) { 247 if (sub.isWildcard()) { 248 LOG.debug("Subscription denied for " + sub + " to destination " + 249 dest.getActiveMQDestination() + ": " + e.getMessage()); 250 } else { 251 throw e; 252 } 253 } 254 } 255 } 256 return rc; 257 258 } 259 260 @Override 261 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) 262 throws Exception { 263 264 // No timeout.. then try to shut down right way, fails if there are 265 // current subscribers. 266 if (timeout == 0) { 267 for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) { 268 Subscription sub = iter.next(); 269 if (sub.matches(destination)) { 270 throw new JMSException("Destination still has an active subscription: " + destination); 271 } 272 } 273 } 274 275 if (timeout > 0) { 276 // TODO: implement a way to notify the subscribers that we want to 277 // take the down 278 // the destination and that they should un-subscribe.. Then wait up 279 // to timeout time before 280 // dropping the subscription. 281 } 282 283 LOG.debug("{} removing destination: {}", broker.getBrokerName(), destination); 284 285 destinationsLock.writeLock().lock(); 286 try { 287 Destination dest = destinations.remove(destination); 288 if (dest != null) { 289 updateRegionDestCounts(destination, -1); 290 291 // timeout<0 or we timed out, we now force any remaining 292 // subscriptions to un-subscribe. 293 for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) { 294 Subscription sub = iter.next(); 295 if (sub.matches(destination)) { 296 dest.removeSubscription(context, sub, 0l); 297 } 298 } 299 destinationMap.remove(destination, dest); 300 dispose(context, dest); 301 DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor(); 302 if (destinationInterceptor != null) { 303 destinationInterceptor.remove(dest); 304 } 305 306 } else { 307 LOG.debug("Cannot remove a destination that doesn't exist: {}", destination); 308 } 309 } finally { 310 destinationsLock.writeLock().unlock(); 311 } 312 } 313 314 /** 315 * Provide an exact or wildcard lookup of destinations in the region 316 * 317 * @return a set of matching destination objects. 318 */ 319 @Override 320 @SuppressWarnings("unchecked") 321 public Set<Destination> getDestinations(ActiveMQDestination destination) { 322 destinationsLock.readLock().lock(); 323 try{ 324 return destinationMap.get(destination); 325 } finally { 326 destinationsLock.readLock().unlock(); 327 } 328 } 329 330 @Override 331 public Map<ActiveMQDestination, Destination> getDestinationMap() { 332 return destinations; 333 } 334 335 @Override 336 @SuppressWarnings("unchecked") 337 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 338 LOG.debug("{} adding consumer: {} for destination: {}", new Object[]{ broker.getBrokerName(), info.getConsumerId(), info.getDestination() }); 339 ActiveMQDestination destination = info.getDestination(); 340 if (destination != null && !destination.isPattern() && !destination.isComposite()) { 341 // lets auto-create the destination 342 lookup(context, destination,true); 343 } 344 345 Object addGuard; 346 synchronized (consumerChangeMutexMap) { 347 addGuard = consumerChangeMutexMap.get(info.getConsumerId()); 348 if (addGuard == null) { 349 addGuard = new Object(); 350 consumerChangeMutexMap.put(info.getConsumerId(), addGuard); 351 } 352 } 353 synchronized (addGuard) { 354 Subscription o = subscriptions.get(info.getConsumerId()); 355 if (o != null) { 356 LOG.warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this."); 357 return o; 358 } 359 360 // We may need to add some destinations that are in persistent store 361 // but not active 362 // in the broker. 363 // 364 // TODO: think about this a little more. This is good cause 365 // destinations are not loaded into 366 // memory until a client needs to use the queue, but a management 367 // agent viewing the 368 // broker will not see a destination that exists in persistent 369 // store. We may want to 370 // eagerly load all destinations into the broker but have an 371 // inactive state for the 372 // destination which has reduced memory usage. 373 // 374 DestinationFilter.parseFilter(info.getDestination()); 375 376 Subscription sub = createSubscription(context, info); 377 378 // At this point we're done directly manipulating subscriptions, 379 // but we need to retain the synchronized block here. Consider 380 // otherwise what would happen if at this point a second 381 // thread added, then removed, as would be allowed with 382 // no mutex held. Remove is only essentially run once 383 // so everything after this point would be leaked. 384 385 // Add the subscription to all the matching queues. 386 // But copy the matches first - to prevent deadlocks 387 List<Destination> addList = new ArrayList<Destination>(); 388 destinationsLock.readLock().lock(); 389 try { 390 for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) { 391 addList.add(dest); 392 } 393 } finally { 394 destinationsLock.readLock().unlock(); 395 } 396 397 List<Destination> removeList = new ArrayList<Destination>(); 398 for (Destination dest : addList) { 399 try { 400 dest.addSubscription(context, sub); 401 removeList.add(dest); 402 } catch (SecurityException e){ 403 if (sub.isWildcard()) { 404 LOG.debug("Subscription denied for " + sub + " to destination " + 405 dest.getActiveMQDestination() + ": " + e.getMessage()); 406 } else { 407 // remove partial subscriptions 408 for (Destination remove : removeList) { 409 try { 410 remove.removeSubscription(context, sub, info.getLastDeliveredSequenceId()); 411 } catch (Exception ex) { 412 LOG.error("Error unsubscribing " + sub + " from " + remove + ": " + ex.getMessage(), ex); 413 } 414 } 415 throw e; 416 } 417 } 418 } 419 removeList.clear(); 420 421 if (info.isBrowser()) { 422 ((QueueBrowserSubscription) sub).destinationsAdded(); 423 } 424 425 subscriptions.put(info.getConsumerId(), sub); 426 427 return sub; 428 } 429 } 430 431 /** 432 * Get all the Destinations that are in storage 433 * 434 * @return Set of all stored destinations 435 */ 436 @SuppressWarnings("rawtypes") 437 public Set getDurableDestinations() { 438 return destinationFactory.getDestinations(); 439 } 440 441 /** 442 * @return all Destinations that don't have active consumers 443 */ 444 protected Set<ActiveMQDestination> getInactiveDestinations() { 445 Set<ActiveMQDestination> inactiveDests = destinationFactory.getDestinations(); 446 destinationsLock.readLock().lock(); 447 try { 448 inactiveDests.removeAll(destinations.keySet()); 449 } finally { 450 destinationsLock.readLock().unlock(); 451 } 452 return inactiveDests; 453 } 454 455 @Override 456 @SuppressWarnings("unchecked") 457 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 458 LOG.debug("{} removing consumer: {} for destination: {}", new Object[]{ broker.getBrokerName(), info.getConsumerId(), info.getDestination() }); 459 460 Subscription sub = subscriptions.remove(info.getConsumerId()); 461 // The sub could be removed elsewhere - see ConnectionSplitBroker 462 if (sub != null) { 463 464 // remove the subscription from all the matching queues. 465 List<Destination> removeList = new ArrayList<Destination>(); 466 destinationsLock.readLock().lock(); 467 try { 468 for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) { 469 removeList.add(dest); 470 } 471 } finally { 472 destinationsLock.readLock().unlock(); 473 } 474 for (Destination dest : removeList) { 475 dest.removeSubscription(context, sub, info.getLastDeliveredSequenceId()); 476 } 477 478 destroySubscription(sub); 479 } 480 synchronized (consumerChangeMutexMap) { 481 consumerChangeMutexMap.remove(info.getConsumerId()); 482 } 483 } 484 485 protected void destroySubscription(Subscription sub) { 486 sub.destroy(); 487 } 488 489 @Override 490 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 491 throw new JMSException("Invalid operation."); 492 } 493 494 @Override 495 public void send(final ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { 496 final ConnectionContext context = producerExchange.getConnectionContext(); 497 498 if (producerExchange.isMutable() || producerExchange.getRegionDestination() == null) { 499 final Destination regionDestination = lookup(context, messageSend.getDestination(),false); 500 producerExchange.setRegionDestination(regionDestination); 501 } 502 503 producerExchange.getRegionDestination().send(producerExchange, messageSend); 504 505 if (producerExchange.getProducerState() != null && producerExchange.getProducerState().getInfo() != null){ 506 producerExchange.getProducerState().getInfo().incrementSentCount(); 507 } 508 } 509 510 @Override 511 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { 512 Subscription sub = consumerExchange.getSubscription(); 513 if (sub == null) { 514 sub = subscriptions.get(ack.getConsumerId()); 515 if (sub == null) { 516 if (!consumerExchange.getConnectionContext().isInRecoveryMode()) { 517 LOG.warn("Ack for non existent subscription, ack: {}", ack); 518 throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId()); 519 } else { 520 LOG.debug("Ack for non existent subscription in recovery, ack: {}", ack); 521 return; 522 } 523 } 524 consumerExchange.setSubscription(sub); 525 } 526 sub.acknowledge(consumerExchange.getConnectionContext(), ack); 527 } 528 529 @Override 530 public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { 531 Subscription sub = subscriptions.get(pull.getConsumerId()); 532 if (sub == null) { 533 throw new IllegalArgumentException("The subscription does not exist: " + pull.getConsumerId()); 534 } 535 return sub.pullMessage(context, pull); 536 } 537 538 protected Destination lookup(ConnectionContext context, ActiveMQDestination destination,boolean createTemporary) throws Exception { 539 Destination dest = null; 540 541 destinationsLock.readLock().lock(); 542 try { 543 dest = destinations.get(destination); 544 } finally { 545 destinationsLock.readLock().unlock(); 546 } 547 548 if (dest == null) { 549 if (isAutoCreateDestinations()) { 550 // Try to auto create the destination... re-invoke broker 551 // from the 552 // top so that the proper security checks are performed. 553 context.getBroker().addDestination(context, destination, createTemporary); 554 dest = addDestination(context, destination, false); 555 // We should now have the dest created. 556 destinationsLock.readLock().lock(); 557 try { 558 dest = destinations.get(destination); 559 } finally { 560 destinationsLock.readLock().unlock(); 561 } 562 } 563 564 if (dest == null) { 565 throw new JMSException("The destination " + destination + " does not exist."); 566 } 567 } 568 return dest; 569 } 570 571 @Override 572 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 573 Subscription sub = subscriptions.get(messageDispatchNotification.getConsumerId()); 574 if (sub != null) { 575 sub.processMessageDispatchNotification(messageDispatchNotification); 576 } else { 577 throw new JMSException("Slave broker out of sync with master - Subscription: " 578 + messageDispatchNotification.getConsumerId() + " on " 579 + messageDispatchNotification.getDestination() + " does not exist for dispatch of message: " 580 + messageDispatchNotification.getMessageId()); 581 } 582 } 583 584 /* 585 * For a Queue/TempQueue, dispatch order is imperative to match acks, so the 586 * dispatch is deferred till the notification to ensure that the 587 * subscription chosen by the master is used. AMQ-2102 588 */ 589 protected void processDispatchNotificationViaDestination(MessageDispatchNotification messageDispatchNotification) 590 throws Exception { 591 Destination dest = null; 592 destinationsLock.readLock().lock(); 593 try { 594 dest = destinations.get(messageDispatchNotification.getDestination()); 595 } finally { 596 destinationsLock.readLock().unlock(); 597 } 598 599 if (dest != null) { 600 dest.processDispatchNotification(messageDispatchNotification); 601 } else { 602 throw new JMSException("Slave broker out of sync with master - Destination: " 603 + messageDispatchNotification.getDestination() + " does not exist for consumer " 604 + messageDispatchNotification.getConsumerId() + " with message: " 605 + messageDispatchNotification.getMessageId()); 606 } 607 } 608 609 @Override 610 public void gc() { 611 for (Subscription sub : subscriptions.values()) { 612 sub.gc(); 613 } 614 615 destinationsLock.readLock().lock(); 616 try { 617 for (Destination dest : destinations.values()) { 618 dest.gc(); 619 } 620 } finally { 621 destinationsLock.readLock().unlock(); 622 } 623 } 624 625 protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Exception; 626 627 protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) 628 throws Exception { 629 return destinationFactory.createDestination(context, destination, destinationStatistics); 630 } 631 632 public boolean isAutoCreateDestinations() { 633 return autoCreateDestinations; 634 } 635 636 public void setAutoCreateDestinations(boolean autoCreateDestinations) { 637 this.autoCreateDestinations = autoCreateDestinations; 638 } 639 640 @Override 641 @SuppressWarnings("unchecked") 642 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 643 destinationsLock.readLock().lock(); 644 try { 645 for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) { 646 dest.addProducer(context, info); 647 } 648 } finally { 649 destinationsLock.readLock().unlock(); 650 } 651 } 652 653 /** 654 * Removes a Producer. 655 * 656 * @param context 657 * the environment the operation is being executed under. 658 * @throws Exception 659 * TODO 660 */ 661 @Override 662 @SuppressWarnings("unchecked") 663 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 664 destinationsLock.readLock().lock(); 665 try { 666 for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) { 667 dest.removeProducer(context, info); 668 } 669 } finally { 670 destinationsLock.readLock().unlock(); 671 } 672 } 673 674 protected void dispose(ConnectionContext context, Destination dest) throws Exception { 675 dest.dispose(context); 676 dest.stop(); 677 destinationFactory.removeDestination(dest); 678 } 679 680 @Override 681 public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) { 682 Subscription sub = subscriptions.get(control.getConsumerId()); 683 if (sub != null && sub instanceof AbstractSubscription) { 684 ((AbstractSubscription) sub).setPrefetchSize(control.getPrefetch()); 685 if (broker.getDestinationPolicy() != null) { 686 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(control.getDestination()); 687 if (entry != null) { 688 entry.configurePrefetch(sub); 689 } 690 } 691 LOG.debug("setting prefetch: {}, on subscription: {}; resulting value: {}", new Object[]{ control.getPrefetch(), control.getConsumerId(), sub.getConsumerInfo().getPrefetchSize()}); 692 try { 693 lookup(consumerExchange.getConnectionContext(), control.getDestination(),false).wakeup(); 694 } catch (Exception e) { 695 LOG.warn("failed to deliver post consumerControl dispatch-wakeup, to destination: {}", control.getDestination(), e); 696 } 697 } 698 } 699 700 @Override 701 public void reapplyInterceptor() { 702 destinationsLock.writeLock().lock(); 703 try { 704 DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor(); 705 Map<ActiveMQDestination, Destination> map = getDestinationMap(); 706 for (ActiveMQDestination key : map.keySet()) { 707 Destination destination = map.get(key); 708 if (destination instanceof CompositeDestinationFilter) { 709 destination = ((CompositeDestinationFilter) destination).next; 710 } 711 if (destinationInterceptor != null) { 712 destination = destinationInterceptor.intercept(destination); 713 } 714 getDestinationMap().put(key, destination); 715 Destination prev = destinations.put(key, destination); 716 if (prev == null) { 717 updateRegionDestCounts(key, 1); 718 } 719 } 720 } finally { 721 destinationsLock.writeLock().unlock(); 722 } 723 } 724}