001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import java.io.DataInputStream; 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.HashSet; 023import java.util.Iterator; 024import java.util.Map; 025import java.util.Map.Entry; 026import java.util.Set; 027 028import org.apache.activemq.broker.BrokerService; 029import org.apache.activemq.broker.BrokerServiceAware; 030import org.apache.activemq.broker.ConnectionContext; 031import org.apache.activemq.broker.scheduler.JobSchedulerStore; 032import org.apache.activemq.command.ActiveMQDestination; 033import org.apache.activemq.command.ActiveMQQueue; 034import org.apache.activemq.command.ActiveMQTempQueue; 035import org.apache.activemq.command.ActiveMQTempTopic; 036import org.apache.activemq.command.ActiveMQTopic; 037import org.apache.activemq.command.Message; 038import org.apache.activemq.command.MessageAck; 039import org.apache.activemq.command.MessageId; 040import org.apache.activemq.command.ProducerId; 041import org.apache.activemq.command.SubscriptionInfo; 042import org.apache.activemq.command.TransactionId; 043import org.apache.activemq.command.XATransactionId; 044import org.apache.activemq.openwire.OpenWireFormat; 045import org.apache.activemq.protobuf.Buffer; 046import org.apache.activemq.store.AbstractMessageStore; 047import org.apache.activemq.store.MessageRecoveryListener; 048import org.apache.activemq.store.MessageStore; 049import org.apache.activemq.store.MessageStoreSubscriptionStatistics; 050import org.apache.activemq.store.PersistenceAdapter; 051import org.apache.activemq.store.TopicMessageStore; 052import org.apache.activemq.store.TransactionRecoveryListener; 053import org.apache.activemq.store.TransactionStore; 054import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; 055import org.apache.activemq.store.kahadb.data.KahaDestination; 056import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; 057import org.apache.activemq.store.kahadb.data.KahaLocation; 058import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; 059import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; 060import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; 061import org.apache.activemq.store.kahadb.disk.journal.Location; 062import org.apache.activemq.store.kahadb.disk.page.Transaction; 063import org.apache.activemq.usage.MemoryUsage; 064import org.apache.activemq.usage.SystemUsage; 065import org.apache.activemq.util.ByteSequence; 066import org.apache.activemq.wireformat.WireFormat; 067 068public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter, BrokerServiceAware { 069 070 private final WireFormat wireFormat = new OpenWireFormat(); 071 private BrokerService brokerService; 072 073 @Override 074 public void setBrokerName(String brokerName) { 075 } 076 @Override 077 public void setUsageManager(SystemUsage usageManager) { 078 } 079 080 @Override 081 public TransactionStore createTransactionStore() throws IOException { 082 return new TransactionStore(){ 083 084 @Override 085 public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException { 086 if (preCommit != null) { 087 preCommit.run(); 088 } 089 processCommit(txid); 090 if (postCommit != null) { 091 postCommit.run(); 092 } 093 } 094 @Override 095 public void prepare(TransactionId txid) throws IOException { 096 processPrepare(txid); 097 } 098 @Override 099 public void rollback(TransactionId txid) throws IOException { 100 processRollback(txid); 101 } 102 @Override 103 public void recover(TransactionRecoveryListener listener) throws IOException { 104 for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) { 105 XATransactionId xid = (XATransactionId)entry.getKey(); 106 ArrayList<Message> messageList = new ArrayList<Message>(); 107 ArrayList<MessageAck> ackList = new ArrayList<MessageAck>(); 108 109 for (Operation op : entry.getValue()) { 110 if( op.getClass() == AddOpperation.class ) { 111 AddOpperation addOp = (AddOpperation)op; 112 Message msg = (Message)wireFormat.unmarshal( new DataInputStream(addOp.getCommand().getMessage().newInput()) ); 113 messageList.add(msg); 114 } else { 115 RemoveOpperation rmOp = (RemoveOpperation)op; 116 MessageAck ack = (MessageAck)wireFormat.unmarshal( new DataInputStream(rmOp.getCommand().getAck().newInput()) ); 117 ackList.add(ack); 118 } 119 } 120 121 Message[] addedMessages = new Message[messageList.size()]; 122 MessageAck[] acks = new MessageAck[ackList.size()]; 123 messageList.toArray(addedMessages); 124 ackList.toArray(acks); 125 listener.recover(xid, addedMessages, acks); 126 } 127 } 128 @Override 129 public void start() throws Exception { 130 } 131 @Override 132 public void stop() throws Exception { 133 } 134 }; 135 } 136 137 public class KahaDBMessageStore extends AbstractMessageStore { 138 protected KahaDestination dest; 139 140 public KahaDBMessageStore(ActiveMQDestination destination) { 141 super(destination); 142 this.dest = convert( destination ); 143 } 144 145 @Override 146 public ActiveMQDestination getDestination() { 147 return destination; 148 } 149 150 @Override 151 public void addMessage(ConnectionContext context, Message message) throws IOException { 152 KahaAddMessageCommand command = new KahaAddMessageCommand(); 153 command.setDestination(dest); 154 command.setMessageId(message.getMessageId().toProducerKey()); 155 processAdd(command, message.getTransactionId(), wireFormat.marshal(message)); 156 } 157 158 @Override 159 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 160 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 161 command.setDestination(dest); 162 command.setMessageId(ack.getLastMessageId().toProducerKey()); 163 processRemove(command, ack.getTransactionId()); 164 } 165 166 @Override 167 public void removeAllMessages(ConnectionContext context) throws IOException { 168 KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand(); 169 command.setDestination(dest); 170 process(command); 171 } 172 173 @Override 174 public Message getMessage(MessageId identity) throws IOException { 175 final String key = identity.toProducerKey(); 176 177 // Hopefully one day the page file supports concurrent read operations... but for now we must 178 // externally synchronize... 179 ByteSequence data; 180 synchronized(indexMutex) { 181 data = pageFile.tx().execute(new Transaction.CallableClosure<ByteSequence, IOException>(){ 182 @Override 183 public ByteSequence execute(Transaction tx) throws IOException { 184 StoredDestination sd = getStoredDestination(dest, tx); 185 Long sequence = sd.messageIdIndex.get(tx, key); 186 if( sequence ==null ) { 187 return null; 188 } 189 return sd.orderIndex.get(tx, sequence).data; 190 } 191 }); 192 } 193 if( data == null ) { 194 return null; 195 } 196 197 Message msg = (Message)wireFormat.unmarshal( data ); 198 return msg; 199 } 200 201 @Override 202 public void recover(final MessageRecoveryListener listener) throws Exception { 203 synchronized(indexMutex) { 204 pageFile.tx().execute(new Transaction.Closure<Exception>(){ 205 @Override 206 public void execute(Transaction tx) throws Exception { 207 StoredDestination sd = getStoredDestination(dest, tx); 208 for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) { 209 Entry<Long, MessageRecord> entry = iterator.next(); 210 listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data) ); 211 } 212 } 213 }); 214 } 215 } 216 217 long cursorPos=0; 218 219 @Override 220 public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { 221 synchronized(indexMutex) { 222 pageFile.tx().execute(new Transaction.Closure<Exception>(){ 223 @Override 224 public void execute(Transaction tx) throws Exception { 225 StoredDestination sd = getStoredDestination(dest, tx); 226 Entry<Long, MessageRecord> entry=null; 227 int counter = 0; 228 for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { 229 entry = iterator.next(); 230 listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) ); 231 counter++; 232 if( counter >= maxReturned ) { 233 break; 234 } 235 } 236 if( entry!=null ) { 237 cursorPos = entry.getKey()+1; 238 } 239 } 240 }); 241 } 242 } 243 244 @Override 245 public void resetBatching() { 246 cursorPos=0; 247 } 248 249 250 @Override 251 public void setBatch(MessageId identity) throws IOException { 252 final String key = identity.toProducerKey(); 253 254 // Hopefully one day the page file supports concurrent read operations... but for now we must 255 // externally synchronize... 256 Long location; 257 synchronized(indexMutex) { 258 location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>(){ 259 @Override 260 public Long execute(Transaction tx) throws IOException { 261 StoredDestination sd = getStoredDestination(dest, tx); 262 return sd.messageIdIndex.get(tx, key); 263 } 264 }); 265 } 266 if( location!=null ) { 267 cursorPos=location+1; 268 } 269 270 } 271 272 @Override 273 public void setMemoryUsage(MemoryUsage memoryUsage) { 274 } 275 @Override 276 public void start() throws Exception { 277 } 278 @Override 279 public void stop() throws Exception { 280 } 281 282 @Override 283 public void recoverMessageStoreStatistics() throws IOException { 284 int count = 0; 285 synchronized(indexMutex) { 286 count = pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){ 287 @Override 288 public Integer execute(Transaction tx) throws IOException { 289 // Iterate through all index entries to get a count of messages in the destination. 290 StoredDestination sd = getStoredDestination(dest, tx); 291 int rc=0; 292 for (Iterator<Entry<String, Long>> iterator = sd.messageIdIndex.iterator(tx); iterator.hasNext();) { 293 iterator.next(); 294 rc++; 295 } 296 return rc; 297 } 298 }); 299 } 300 getMessageStoreStatistics().getMessageCount().setCount(count); 301 } 302 303 } 304 305 class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore { 306 public KahaDBTopicMessageStore(ActiveMQTopic destination) { 307 super(destination); 308 } 309 310 @Override 311 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 312 MessageId messageId, MessageAck ack) throws IOException { 313 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 314 command.setDestination(dest); 315 command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName)); 316 command.setMessageId(messageId.toProducerKey()); 317 // We are not passed a transaction info.. so we can't participate in a transaction. 318 // Looks like a design issue with the TopicMessageStore interface. Also we can't recover the original ack 319 // to pass back to the XA recover method. 320 // command.setTransactionInfo(); 321 processRemove(command, null); 322 } 323 324 @Override 325 public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { 326 String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName()); 327 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 328 command.setDestination(dest); 329 command.setSubscriptionKey(subscriptionKey); 330 command.setRetroactive(retroactive); 331 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo); 332 command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 333 process(command); 334 } 335 336 @Override 337 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 338 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 339 command.setDestination(dest); 340 command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName)); 341 process(command); 342 } 343 344 @Override 345 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 346 347 final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>(); 348 synchronized(indexMutex) { 349 pageFile.tx().execute(new Transaction.Closure<IOException>(){ 350 @Override 351 public void execute(Transaction tx) throws IOException { 352 StoredDestination sd = getStoredDestination(dest, tx); 353 for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator.hasNext();) { 354 Entry<String, KahaSubscriptionCommand> entry = iterator.next(); 355 SubscriptionInfo info = (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(entry.getValue().getSubscriptionInfo().newInput()) ); 356 subscriptions.add(info); 357 358 } 359 } 360 }); 361 } 362 363 SubscriptionInfo[]rc=new SubscriptionInfo[subscriptions.size()]; 364 subscriptions.toArray(rc); 365 return rc; 366 } 367 368 @Override 369 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 370 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 371 synchronized(indexMutex) { 372 return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>(){ 373 @Override 374 public SubscriptionInfo execute(Transaction tx) throws IOException { 375 StoredDestination sd = getStoredDestination(dest, tx); 376 KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey); 377 if( command ==null ) { 378 return null; 379 } 380 return (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(command.getSubscriptionInfo().newInput()) ); 381 } 382 }); 383 } 384 } 385 386 @Override 387 public int getMessageCount(String clientId, String subscriptionName) throws IOException { 388 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 389 synchronized(indexMutex) { 390 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){ 391 @Override 392 public Integer execute(Transaction tx) throws IOException { 393 StoredDestination sd = getStoredDestination(dest, tx); 394 Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); 395 if ( cursorPos==null ) { 396 // The subscription might not exist. 397 return 0; 398 } 399 cursorPos += 1; 400 401 int counter = 0; 402 for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { 403 iterator.next(); 404 counter++; 405 } 406 return counter; 407 } 408 }); 409 } 410 } 411 412 @Override 413 public long getMessageSize(String clientId, String subscriptionName) throws IOException { 414 return 0; 415 } 416 417 private final MessageStoreSubscriptionStatistics stats = new MessageStoreSubscriptionStatistics(false); 418 419 @Override 420 public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() { 421 return stats; 422 } 423 424 @Override 425 public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception { 426 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 427 synchronized(indexMutex) { 428 pageFile.tx().execute(new Transaction.Closure<Exception>(){ 429 @Override 430 public void execute(Transaction tx) throws Exception { 431 StoredDestination sd = getStoredDestination(dest, tx); 432 Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); 433 cursorPos += 1; 434 435 for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { 436 Entry<Long, MessageRecord> entry = iterator.next(); 437 listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) ); 438 } 439 } 440 }); 441 } 442 } 443 444 @Override 445 public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception { 446 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 447 synchronized(indexMutex) { 448 pageFile.tx().execute(new Transaction.Closure<Exception>(){ 449 @Override 450 public void execute(Transaction tx) throws Exception { 451 StoredDestination sd = getStoredDestination(dest, tx); 452 Long cursorPos = sd.subscriptionCursors.get(subscriptionKey); 453 if( cursorPos == null ) { 454 cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); 455 cursorPos += 1; 456 } 457 458 Entry<Long, MessageRecord> entry=null; 459 int counter = 0; 460 for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { 461 entry = iterator.next(); 462 listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) ); 463 counter++; 464 if( counter >= maxReturned ) { 465 break; 466 } 467 } 468 if( entry!=null ) { 469 sd.subscriptionCursors.put(subscriptionKey, entry.getKey() + 1); 470 } 471 } 472 }); 473 } 474 } 475 476 @Override 477 public void resetBatching(String clientId, String subscriptionName) { 478 try { 479 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 480 synchronized(indexMutex) { 481 pageFile.tx().execute(new Transaction.Closure<IOException>(){ 482 @Override 483 public void execute(Transaction tx) throws IOException { 484 StoredDestination sd = getStoredDestination(dest, tx); 485 sd.subscriptionCursors.remove(subscriptionKey); 486 } 487 }); 488 } 489 } catch (IOException e) { 490 throw new RuntimeException(e); 491 } 492 } 493 } 494 495 String subscriptionKey(String clientId, String subscriptionName){ 496 return clientId+":"+subscriptionName; 497 } 498 499 @Override 500 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 501 return new KahaDBMessageStore(destination); 502 } 503 504 @Override 505 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 506 return new KahaDBTopicMessageStore(destination); 507 } 508 509 /** 510 * Cleanup method to remove any state associated with the given destination. 511 * This method does not stop the message store (it might not be cached). 512 * 513 * @param destination Destination to forget 514 */ 515 @Override 516 public void removeQueueMessageStore(ActiveMQQueue destination) { 517 } 518 519 /** 520 * Cleanup method to remove any state associated with the given destination 521 * This method does not stop the message store (it might not be cached). 522 * 523 * @param destination Destination to forget 524 */ 525 @Override 526 public void removeTopicMessageStore(ActiveMQTopic destination) { 527 } 528 529 @Override 530 public void deleteAllMessages() throws IOException { 531 } 532 533 534 @Override 535 public Set<ActiveMQDestination> getDestinations() { 536 try { 537 final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); 538 synchronized(indexMutex) { 539 pageFile.tx().execute(new Transaction.Closure<IOException>(){ 540 @Override 541 public void execute(Transaction tx) throws IOException { 542 for (Iterator<Entry<String, StoredDestination>> iterator = destinations.iterator(tx); iterator.hasNext();) { 543 Entry<String, StoredDestination> entry = iterator.next(); 544 rc.add(convert(entry.getKey())); 545 } 546 } 547 }); 548 } 549 return rc; 550 } catch (IOException e) { 551 throw new RuntimeException(e); 552 } 553 } 554 555 @Override 556 public long getLastMessageBrokerSequenceId() throws IOException { 557 return 0; 558 } 559 560 @Override 561 public long size() { 562 if ( !started.get() ) { 563 return 0; 564 } 565 try { 566 return pageFile.getDiskSize(); 567 } catch (IOException e) { 568 throw new RuntimeException(e); 569 } 570 } 571 572 @Override 573 public void beginTransaction(ConnectionContext context) throws IOException { 574 throw new IOException("Not yet implemented."); 575 } 576 @Override 577 public void commitTransaction(ConnectionContext context) throws IOException { 578 throw new IOException("Not yet implemented."); 579 } 580 @Override 581 public void rollbackTransaction(ConnectionContext context) throws IOException { 582 throw new IOException("Not yet implemented."); 583 } 584 585 @Override 586 public void checkpoint(boolean sync) throws IOException { 587 } 588 589 /////////////////////////////////////////////////////////////////// 590 // Internal conversion methods. 591 /////////////////////////////////////////////////////////////////// 592 593 594 595 KahaLocation convert(Location location) { 596 KahaLocation rc = new KahaLocation(); 597 rc.setLogId(location.getDataFileId()); 598 rc.setOffset(location.getOffset()); 599 return rc; 600 } 601 602 KahaDestination convert(ActiveMQDestination dest) { 603 KahaDestination rc = new KahaDestination(); 604 rc.setName(dest.getPhysicalName()); 605 switch( dest.getDestinationType() ) { 606 case ActiveMQDestination.QUEUE_TYPE: 607 rc.setType(DestinationType.QUEUE); 608 return rc; 609 case ActiveMQDestination.TOPIC_TYPE: 610 rc.setType(DestinationType.TOPIC); 611 return rc; 612 case ActiveMQDestination.TEMP_QUEUE_TYPE: 613 rc.setType(DestinationType.TEMP_QUEUE); 614 return rc; 615 case ActiveMQDestination.TEMP_TOPIC_TYPE: 616 rc.setType(DestinationType.TEMP_TOPIC); 617 return rc; 618 default: 619 return null; 620 } 621 } 622 623 ActiveMQDestination convert(String dest) { 624 int p = dest.indexOf(":"); 625 if( p<0 ) { 626 throw new IllegalArgumentException("Not in the valid destination format"); 627 } 628 int type = Integer.parseInt(dest.substring(0, p)); 629 String name = dest.substring(p+1); 630 631 switch( KahaDestination.DestinationType.valueOf(type) ) { 632 case QUEUE: 633 return new ActiveMQQueue(name); 634 case TOPIC: 635 return new ActiveMQTopic(name); 636 case TEMP_QUEUE: 637 return new ActiveMQTempQueue(name); 638 case TEMP_TOPIC: 639 return new ActiveMQTempTopic(name); 640 default: 641 throw new IllegalArgumentException("Not in the valid destination format"); 642 } 643 } 644 645 @Override 646 public long getLastProducerSequenceId(ProducerId id) { 647 return -1; 648 } 649 650 @Override 651 public void setBrokerService(BrokerService brokerService) { 652 this.brokerService = brokerService; 653 } 654 655 @Override 656 public void load() throws IOException { 657 if( brokerService!=null ) { 658 wireFormat.setVersion(brokerService.getStoreOpenWireVersion()); 659 } 660 super.load(); 661 } 662 @Override 663 public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { 664 throw new UnsupportedOperationException(); 665 } 666}