001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.LinkedList; 022import java.util.List; 023import java.util.Map; 024import java.util.concurrent.CancellationException; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027import java.util.concurrent.CopyOnWriteArrayList; 028import java.util.concurrent.Future; 029import java.util.concurrent.locks.ReentrantReadWriteLock; 030 031import org.apache.activemq.advisory.AdvisorySupport; 032import org.apache.activemq.broker.BrokerService; 033import org.apache.activemq.broker.ConnectionContext; 034import org.apache.activemq.broker.ProducerBrokerExchange; 035import org.apache.activemq.broker.region.policy.DispatchPolicy; 036import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy; 037import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy; 038import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy; 039import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy; 040import org.apache.activemq.broker.util.InsertionCountList; 041import org.apache.activemq.command.ActiveMQDestination; 042import org.apache.activemq.command.ConsumerInfo; 043import org.apache.activemq.command.ExceptionResponse; 044import org.apache.activemq.command.Message; 045import org.apache.activemq.command.MessageAck; 046import org.apache.activemq.command.MessageId; 047import org.apache.activemq.command.ProducerAck; 048import org.apache.activemq.command.ProducerInfo; 049import org.apache.activemq.command.Response; 050import org.apache.activemq.command.SubscriptionInfo; 051import org.apache.activemq.filter.MessageEvaluationContext; 052import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 053import org.apache.activemq.store.MessageRecoveryListener; 054import org.apache.activemq.store.NoLocalSubscriptionAware; 055import org.apache.activemq.store.PersistenceAdapter; 056import org.apache.activemq.store.TopicMessageStore; 057import org.apache.activemq.thread.Task; 058import org.apache.activemq.thread.TaskRunner; 059import org.apache.activemq.thread.TaskRunnerFactory; 060import org.apache.activemq.transaction.Synchronization; 061import org.apache.activemq.util.SubscriptionKey; 062import org.slf4j.Logger; 063import org.slf4j.LoggerFactory; 064 065/** 066 * The Topic is a destination that sends a copy of a message to every active 067 * Subscription registered. 068 */ 069public class Topic extends BaseDestination implements Task { 070 protected static final Logger LOG = LoggerFactory.getLogger(Topic.class); 071 private final TopicMessageStore topicStore; 072 protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>(); 073 private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock(); 074 private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy(); 075 private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; 076 private final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>(); 077 private final TaskRunner taskRunner; 078 private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>(); 079 private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() { 080 @Override 081 public void run() { 082 try { 083 Topic.this.taskRunner.wakeup(); 084 } catch (InterruptedException e) { 085 } 086 }; 087 }; 088 089 public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, 090 DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception { 091 super(brokerService, store, destination, parentStats); 092 this.topicStore = store; 093 subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null); 094 this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName()); 095 } 096 097 @Override 098 public void initialize() throws Exception { 099 super.initialize(); 100 // set non default subscription recovery policy (override policyEntries) 101 if (AdvisorySupport.isMasterBrokerAdvisoryTopic(destination)) { 102 subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy(); 103 setAlwaysRetroactive(true); 104 } 105 if (store != null) { 106 // AMQ-2586: Better to leave this stat at zero than to give the user 107 // misleading metrics. 108 // int messageCount = store.getMessageCount(); 109 // destinationStatistics.getMessages().setCount(messageCount); 110 store.start(); 111 } 112 } 113 114 @Override 115 public List<Subscription> getConsumers() { 116 synchronized (consumers) { 117 return new ArrayList<Subscription>(consumers); 118 } 119 } 120 121 public boolean lock(MessageReference node, LockOwner sub) { 122 return true; 123 } 124 125 @Override 126 public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception { 127 if (!sub.getConsumerInfo().isDurable()) { 128 129 // Do a retroactive recovery if needed. 130 if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) { 131 132 // synchronize with dispatch method so that no new messages are sent 133 // while we are recovering a subscription to avoid out of order messages. 134 dispatchLock.writeLock().lock(); 135 try { 136 boolean applyRecovery = false; 137 synchronized (consumers) { 138 if (!consumers.contains(sub)){ 139 sub.add(context, this); 140 consumers.add(sub); 141 applyRecovery=true; 142 super.addSubscription(context, sub); 143 } 144 } 145 if (applyRecovery){ 146 subscriptionRecoveryPolicy.recover(context, this, sub); 147 } 148 } finally { 149 dispatchLock.writeLock().unlock(); 150 } 151 152 } else { 153 synchronized (consumers) { 154 if (!consumers.contains(sub)){ 155 sub.add(context, this); 156 consumers.add(sub); 157 super.addSubscription(context, sub); 158 } 159 } 160 } 161 } else { 162 DurableTopicSubscription dsub = (DurableTopicSubscription) sub; 163 super.addSubscription(context, sub); 164 sub.add(context, this); 165 if(dsub.isActive()) { 166 synchronized (consumers) { 167 boolean hasSubscription = false; 168 169 if (consumers.size() == 0) { 170 hasSubscription = false; 171 } else { 172 for (Subscription currentSub : consumers) { 173 if (currentSub.getConsumerInfo().isDurable()) { 174 DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub; 175 if (dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) { 176 hasSubscription = true; 177 break; 178 } 179 } 180 } 181 } 182 183 if (!hasSubscription) { 184 consumers.add(sub); 185 } 186 } 187 } 188 durableSubscribers.put(dsub.getSubscriptionKey(), dsub); 189 } 190 } 191 192 @Override 193 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception { 194 if (!sub.getConsumerInfo().isDurable()) { 195 super.removeSubscription(context, sub, lastDeliveredSequenceId); 196 synchronized (consumers) { 197 consumers.remove(sub); 198 } 199 } 200 sub.remove(context, this); 201 } 202 203 public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception { 204 if (topicStore != null) { 205 topicStore.deleteSubscription(key.clientId, key.subscriptionName); 206 DurableTopicSubscription removed = durableSubscribers.remove(key); 207 if (removed != null) { 208 destinationStatistics.getConsumers().decrement(); 209 // deactivate and remove 210 removed.deactivate(false, 0l); 211 consumers.remove(removed); 212 } 213 } 214 } 215 216 private boolean hasDurableSubChanged(SubscriptionInfo info1, ConsumerInfo info2) throws IOException { 217 if (hasSelectorChanged(info1, info2)) { 218 return true; 219 } 220 221 return hasNoLocalChanged(info1, info2); 222 } 223 224 private boolean hasNoLocalChanged(SubscriptionInfo info1, ConsumerInfo info2) throws IOException { 225 //Not all persistence adapters store the noLocal value for a subscription 226 PersistenceAdapter adapter = broker.getBrokerService().getPersistenceAdapter(); 227 if (adapter instanceof NoLocalSubscriptionAware) { 228 if (info1.isNoLocal() ^ info2.isNoLocal()) { 229 return true; 230 } 231 } 232 233 return false; 234 } 235 236 private boolean hasSelectorChanged(SubscriptionInfo info1, ConsumerInfo info2) { 237 if (info1.getSelector() != null ^ info2.getSelector() != null) { 238 return true; 239 } 240 241 if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) { 242 return true; 243 } 244 245 return false; 246 } 247 248 public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception { 249 // synchronize with dispatch method so that no new messages are sent 250 // while we are recovering a subscription to avoid out of order messages. 251 dispatchLock.writeLock().lock(); 252 try { 253 254 if (topicStore == null) { 255 return; 256 } 257 258 // Recover the durable subscription. 259 String clientId = subscription.getSubscriptionKey().getClientId(); 260 String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName(); 261 SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName); 262 if (info != null) { 263 // Check to see if selector changed. 264 if (hasDurableSubChanged(info, subscription.getConsumerInfo())) { 265 // Need to delete the subscription 266 topicStore.deleteSubscription(clientId, subscriptionName); 267 info = null; 268 // Force a rebuild of the selector chain for the subscription otherwise 269 // the stored subscription is updated but the selector expression is not 270 // and the subscription will not behave according to the new configuration. 271 subscription.setSelector(subscription.getConsumerInfo().getSelector()); 272 synchronized (consumers) { 273 consumers.remove(subscription); 274 } 275 } else { 276 synchronized (consumers) { 277 if (!consumers.contains(subscription)) { 278 consumers.add(subscription); 279 } 280 } 281 } 282 } 283 284 // Do we need to create the subscription? 285 if (info == null) { 286 info = new SubscriptionInfo(); 287 info.setClientId(clientId); 288 info.setSelector(subscription.getConsumerInfo().getSelector()); 289 info.setSubscriptionName(subscriptionName); 290 info.setDestination(getActiveMQDestination()); 291 info.setNoLocal(subscription.getConsumerInfo().isNoLocal()); 292 // This destination is an actual destination id. 293 info.setSubscribedDestination(subscription.getConsumerInfo().getDestination()); 294 // This destination might be a pattern 295 synchronized (consumers) { 296 consumers.add(subscription); 297 topicStore.addSubscription(info, subscription.getConsumerInfo().isRetroactive()); 298 } 299 } 300 301 final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); 302 msgContext.setDestination(destination); 303 if (subscription.isRecoveryRequired()) { 304 topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() { 305 @Override 306 public boolean recoverMessage(Message message) throws Exception { 307 message.setRegionDestination(Topic.this); 308 try { 309 msgContext.setMessageReference(message); 310 if (subscription.matches(message, msgContext)) { 311 subscription.add(message); 312 } 313 } catch (IOException e) { 314 LOG.error("Failed to recover this message {}", message, e); 315 } 316 return true; 317 } 318 319 @Override 320 public boolean recoverMessageReference(MessageId messageReference) throws Exception { 321 throw new RuntimeException("Should not be called."); 322 } 323 324 @Override 325 public boolean hasSpace() { 326 return true; 327 } 328 329 @Override 330 public boolean isDuplicate(MessageId id) { 331 return false; 332 } 333 }); 334 } 335 } finally { 336 dispatchLock.writeLock().unlock(); 337 } 338 } 339 340 public void deactivate(ConnectionContext context, DurableTopicSubscription sub, List<MessageReference> dispatched) throws Exception { 341 synchronized (consumers) { 342 consumers.remove(sub); 343 } 344 sub.remove(context, this, dispatched); 345 } 346 347 public void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception { 348 if (subscription.getConsumerInfo().isRetroactive()) { 349 subscriptionRecoveryPolicy.recover(context, this, subscription); 350 } 351 } 352 353 @Override 354 public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception { 355 final ConnectionContext context = producerExchange.getConnectionContext(); 356 357 final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo(); 358 producerExchange.incrementSend(); 359 final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 360 && !context.isInRecoveryMode(); 361 362 message.setRegionDestination(this); 363 364 // There is delay between the client sending it and it arriving at the 365 // destination.. it may have expired. 366 if (message.isExpired()) { 367 broker.messageExpired(context, message, null); 368 getDestinationStatistics().getExpired().increment(); 369 if (sendProducerAck) { 370 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 371 context.getConnection().dispatchAsync(ack); 372 } 373 return; 374 } 375 376 if (memoryUsage.isFull()) { 377 isFull(context, memoryUsage); 378 fastProducer(context, producerInfo); 379 380 if (isProducerFlowControl() && context.isProducerFlowControl()) { 381 382 if (warnOnProducerFlowControl) { 383 warnOnProducerFlowControl = false; 384 LOG.info("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.", 385 getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit()); 386 } 387 388 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { 389 throw new javax.jms.ResourceAllocationException("Usage Manager memory limit (" 390 + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId() 391 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." 392 + " See http://activemq.apache.org/producer-flow-control.html for more info"); 393 } 394 395 // We can avoid blocking due to low usage if the producer is sending a sync message or 396 // if it is using a producer window 397 if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) { 398 synchronized (messagesWaitingForSpace) { 399 messagesWaitingForSpace.add(new Runnable() { 400 @Override 401 public void run() { 402 try { 403 404 // While waiting for space to free up... the 405 // message may have expired. 406 if (message.isExpired()) { 407 broker.messageExpired(context, message, null); 408 getDestinationStatistics().getExpired().increment(); 409 } else { 410 doMessageSend(producerExchange, message); 411 } 412 413 if (sendProducerAck) { 414 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message 415 .getSize()); 416 context.getConnection().dispatchAsync(ack); 417 } else { 418 Response response = new Response(); 419 response.setCorrelationId(message.getCommandId()); 420 context.getConnection().dispatchAsync(response); 421 } 422 423 } catch (Exception e) { 424 if (!sendProducerAck && !context.isInRecoveryMode()) { 425 ExceptionResponse response = new ExceptionResponse(e); 426 response.setCorrelationId(message.getCommandId()); 427 context.getConnection().dispatchAsync(response); 428 } 429 } 430 } 431 }); 432 433 registerCallbackForNotFullNotification(); 434 context.setDontSendReponse(true); 435 return; 436 } 437 438 } else { 439 // Producer flow control cannot be used, so we have do the flow control 440 // at the broker by blocking this thread until there is space available. 441 442 if (memoryUsage.isFull()) { 443 if (context.isInTransaction()) { 444 445 int count = 0; 446 while (!memoryUsage.waitForSpace(1000)) { 447 if (context.getStopping().get()) { 448 throw new IOException("Connection closed, send aborted."); 449 } 450 if (count > 2 && context.isInTransaction()) { 451 count = 0; 452 int size = context.getTransaction().size(); 453 LOG.warn("Waiting for space to send transacted message - transaction elements = {} need more space to commit. Message = {}", size, message); 454 } 455 count++; 456 } 457 } else { 458 waitForSpace( 459 context, 460 producerExchange, 461 memoryUsage, 462 "Usage Manager Memory Usage limit reached. Stopping producer (" 463 + message.getProducerId() 464 + ") to prevent flooding " 465 + getActiveMQDestination().getQualifiedName() 466 + "." 467 + " See http://activemq.apache.org/producer-flow-control.html for more info"); 468 } 469 } 470 471 // The usage manager could have delayed us by the time 472 // we unblock the message could have expired.. 473 if (message.isExpired()) { 474 getDestinationStatistics().getExpired().increment(); 475 LOG.debug("Expired message: {}", message); 476 return; 477 } 478 } 479 } 480 } 481 482 doMessageSend(producerExchange, message); 483 messageDelivered(context, message); 484 if (sendProducerAck) { 485 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 486 context.getConnection().dispatchAsync(ack); 487 } 488 } 489 490 /** 491 * do send the message - this needs to be synchronized to ensure messages 492 * are stored AND dispatched in the right order 493 * 494 * @param producerExchange 495 * @param message 496 * @throws IOException 497 * @throws Exception 498 */ 499 synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) 500 throws IOException, Exception { 501 final ConnectionContext context = producerExchange.getConnectionContext(); 502 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); 503 Future<Object> result = null; 504 505 if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) { 506 if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) { 507 final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of " 508 + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId() 509 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." 510 + " See http://activemq.apache.org/producer-flow-control.html for more info"; 511 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { 512 throw new javax.jms.ResourceAllocationException(logMessage); 513 } 514 515 waitForSpace(context,producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage); 516 } 517 result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage()); 518 519 //Moved the reduceMemoryfootprint clearing to the dispatch method 520 } 521 522 message.incrementReferenceCount(); 523 524 if (context.isInTransaction()) { 525 context.getTransaction().addSynchronization(new Synchronization() { 526 @Override 527 public void afterCommit() throws Exception { 528 // It could take while before we receive the commit 529 // operation.. by that time the message could have 530 // expired.. 531 if (message.isExpired()) { 532 if (broker.isExpired(message)) { 533 getDestinationStatistics().getExpired().increment(); 534 broker.messageExpired(context, message, null); 535 } 536 message.decrementReferenceCount(); 537 return; 538 } 539 try { 540 dispatch(context, message); 541 } finally { 542 message.decrementReferenceCount(); 543 } 544 } 545 546 @Override 547 public void afterRollback() throws Exception { 548 message.decrementReferenceCount(); 549 } 550 }); 551 552 } else { 553 try { 554 dispatch(context, message); 555 } finally { 556 message.decrementReferenceCount(); 557 } 558 } 559 560 if (result != null && !result.isCancelled()) { 561 try { 562 result.get(); 563 } catch (CancellationException e) { 564 // ignore - the task has been cancelled if the message 565 // has already been deleted 566 } 567 } 568 } 569 570 private boolean canOptimizeOutPersistence() { 571 return durableSubscribers.size() == 0; 572 } 573 574 @Override 575 public String toString() { 576 return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size(); 577 } 578 579 @Override 580 public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, 581 final MessageReference node) throws IOException { 582 if (topicStore != null && node.isPersistent()) { 583 DurableTopicSubscription dsub = (DurableTopicSubscription) sub; 584 SubscriptionKey key = dsub.getSubscriptionKey(); 585 topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(), 586 convertToNonRangedAck(ack, node)); 587 } 588 messageConsumed(context, node); 589 } 590 591 @Override 592 public void gc() { 593 } 594 595 public Message loadMessage(MessageId messageId) throws IOException { 596 return topicStore != null ? topicStore.getMessage(messageId) : null; 597 } 598 599 @Override 600 public void start() throws Exception { 601 this.subscriptionRecoveryPolicy.start(); 602 if (memoryUsage != null) { 603 memoryUsage.start(); 604 } 605 606 if (getExpireMessagesPeriod() > 0 && !AdvisorySupport.isAdvisoryTopic(getActiveMQDestination())) { 607 scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod()); 608 } 609 } 610 611 @Override 612 public void stop() throws Exception { 613 if (taskRunner != null) { 614 taskRunner.shutdown(); 615 } 616 this.subscriptionRecoveryPolicy.stop(); 617 if (memoryUsage != null) { 618 memoryUsage.stop(); 619 } 620 if (this.topicStore != null) { 621 this.topicStore.stop(); 622 } 623 624 scheduler.cancel(expireMessagesTask); 625 } 626 627 @Override 628 public Message[] browse() { 629 final List<Message> result = new ArrayList<Message>(); 630 doBrowse(result, getMaxBrowsePageSize()); 631 return result.toArray(new Message[result.size()]); 632 } 633 634 private void doBrowse(final List<Message> browseList, final int max) { 635 try { 636 if (topicStore != null) { 637 final List<Message> toExpire = new ArrayList<Message>(); 638 topicStore.recover(new MessageRecoveryListener() { 639 @Override 640 public boolean recoverMessage(Message message) throws Exception { 641 if (message.isExpired()) { 642 toExpire.add(message); 643 } 644 browseList.add(message); 645 return true; 646 } 647 648 @Override 649 public boolean recoverMessageReference(MessageId messageReference) throws Exception { 650 return true; 651 } 652 653 @Override 654 public boolean hasSpace() { 655 return browseList.size() < max; 656 } 657 658 @Override 659 public boolean isDuplicate(MessageId id) { 660 return false; 661 } 662 }); 663 final ConnectionContext connectionContext = createConnectionContext(); 664 for (Message message : toExpire) { 665 for (DurableTopicSubscription sub : durableSubscribers.values()) { 666 if (!sub.isActive()) { 667 message.setRegionDestination(this); 668 messageExpired(connectionContext, sub, message); 669 } 670 } 671 } 672 Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination()); 673 if (msgs != null) { 674 for (int i = 0; i < msgs.length && browseList.size() < max; i++) { 675 browseList.add(msgs[i]); 676 } 677 } 678 } 679 } catch (Throwable e) { 680 LOG.warn("Failed to browse Topic: {}", getActiveMQDestination().getPhysicalName(), e); 681 } 682 } 683 684 @Override 685 public boolean iterate() { 686 synchronized (messagesWaitingForSpace) { 687 while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) { 688 Runnable op = messagesWaitingForSpace.removeFirst(); 689 op.run(); 690 } 691 692 if (!messagesWaitingForSpace.isEmpty()) { 693 registerCallbackForNotFullNotification(); 694 } 695 } 696 return false; 697 } 698 699 private void registerCallbackForNotFullNotification() { 700 // If the usage manager is not full, then the task will not 701 // get called.. 702 if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) { 703 // so call it directly here. 704 sendMessagesWaitingForSpaceTask.run(); 705 } 706 } 707 708 // Properties 709 // ------------------------------------------------------------------------- 710 711 public DispatchPolicy getDispatchPolicy() { 712 return dispatchPolicy; 713 } 714 715 public void setDispatchPolicy(DispatchPolicy dispatchPolicy) { 716 this.dispatchPolicy = dispatchPolicy; 717 } 718 719 public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() { 720 return subscriptionRecoveryPolicy; 721 } 722 723 public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy recoveryPolicy) { 724 if (this.subscriptionRecoveryPolicy != null && this.subscriptionRecoveryPolicy instanceof RetainedMessageSubscriptionRecoveryPolicy) { 725 // allow users to combine retained message policy with other ActiveMQ policies 726 RetainedMessageSubscriptionRecoveryPolicy policy = (RetainedMessageSubscriptionRecoveryPolicy) this.subscriptionRecoveryPolicy; 727 policy.setWrapped(recoveryPolicy); 728 } else { 729 this.subscriptionRecoveryPolicy = recoveryPolicy; 730 } 731 } 732 733 // Implementation methods 734 // ------------------------------------------------------------------------- 735 736 @Override 737 public final void wakeup() { 738 } 739 740 protected void dispatch(final ConnectionContext context, Message message) throws Exception { 741 // AMQ-2586: Better to leave this stat at zero than to give the user 742 // misleading metrics. 743 // destinationStatistics.getMessages().increment(); 744 destinationStatistics.getEnqueues().increment(); 745 destinationStatistics.getMessageSize().addSize(message.getSize()); 746 MessageEvaluationContext msgContext = null; 747 748 dispatchLock.readLock().lock(); 749 try { 750 if (!subscriptionRecoveryPolicy.add(context, message)) { 751 return; 752 } 753 synchronized (consumers) { 754 if (consumers.isEmpty()) { 755 onMessageWithNoConsumers(context, message); 756 return; 757 } 758 } 759 760 // Clear memory before dispatch - need to clear here because the call to 761 //subscriptionRecoveryPolicy.add() will unmarshall the state 762 if (isReduceMemoryFootprint() && message.isMarshalled()) { 763 message.clearUnMarshalledState(); 764 } 765 766 msgContext = context.getMessageEvaluationContext(); 767 msgContext.setDestination(destination); 768 msgContext.setMessageReference(message); 769 if (!dispatchPolicy.dispatch(message, msgContext, consumers)) { 770 onMessageWithNoConsumers(context, message); 771 } 772 773 } finally { 774 dispatchLock.readLock().unlock(); 775 if (msgContext != null) { 776 msgContext.clear(); 777 } 778 } 779 } 780 781 private final Runnable expireMessagesTask = new Runnable() { 782 @Override 783 public void run() { 784 List<Message> browsedMessages = new InsertionCountList<Message>(); 785 doBrowse(browsedMessages, getMaxExpirePageSize()); 786 } 787 }; 788 789 @Override 790 public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) { 791 broker.messageExpired(context, reference, subs); 792 // AMQ-2586: Better to leave this stat at zero than to give the user 793 // misleading metrics. 794 // destinationStatistics.getMessages().decrement(); 795 destinationStatistics.getExpired().increment(); 796 MessageAck ack = new MessageAck(); 797 ack.setAckType(MessageAck.STANDARD_ACK_TYPE); 798 ack.setDestination(destination); 799 ack.setMessageID(reference.getMessageId()); 800 try { 801 if (subs instanceof DurableTopicSubscription) { 802 ((DurableTopicSubscription)subs).removePending(reference); 803 } 804 acknowledge(context, subs, ack, reference); 805 } catch (Exception e) { 806 LOG.error("Failed to remove expired Message from the store ", e); 807 } 808 } 809 810 @Override 811 protected Logger getLog() { 812 return LOG; 813 } 814 815 protected boolean isOptimizeStorage(){ 816 boolean result = false; 817 818 if (isDoOptimzeMessageStorage() && durableSubscribers.isEmpty()==false){ 819 result = true; 820 for (DurableTopicSubscription s : durableSubscribers.values()) { 821 if (s.isActive()== false){ 822 result = false; 823 break; 824 } 825 if (s.getPrefetchSize()==0){ 826 result = false; 827 break; 828 } 829 if (s.isSlowConsumer()){ 830 result = false; 831 break; 832 } 833 if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){ 834 result = false; 835 break; 836 } 837 } 838 } 839 return result; 840 } 841 842 /** 843 * force a reread of the store - after transaction recovery completion 844 */ 845 @Override 846 public void clearPendingMessages() { 847 dispatchLock.readLock().lock(); 848 try { 849 for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) { 850 clearPendingAndDispatch(durableTopicSubscription); 851 } 852 } finally { 853 dispatchLock.readLock().unlock(); 854 } 855 } 856 857 private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscription) { 858 synchronized (durableTopicSubscription.pendingLock) { 859 durableTopicSubscription.pending.clear(); 860 try { 861 durableTopicSubscription.dispatchPending(); 862 } catch (IOException exception) { 863 LOG.warn("After clear of pending, failed to dispatch to: {}, for: {}, pending: {}", new Object[]{ 864 durableTopicSubscription, 865 destination, 866 durableTopicSubscription.pending }, exception); 867 } 868 } 869 } 870 871 public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() { 872 return durableSubscribers; 873 } 874}