/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.TreeMap;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.util.ByteSequence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;

/**
 * Index of messages, topic subscriptions and pending transactional operations
 * kept in a {@link PageFile} named {@code "temp-db"} inside
 * {@link #getDirectory()}.
 * <p>
 * {@link #start()} calls {@link #load()}, which deletes the page file and
 * loads it again, so the store is rebuilt on every start. The page file is
 * created with disk syncs and the recovery file disabled.
 * <p>
 * Every public {@code process*} method performs its index work while holding
 * {@link #indexMutex}.
 */
public class TempMessageDatabase {

    private static final Logger LOG = LoggerFactory.getLogger(TempMessageDatabase.class);

    // State constants; not referenced anywhere else in this class.
    public static final int CLOSED_STATE = 1;
    public static final int OPEN_STATE = 2;

    /** Root index mapping a destination key (see {@link #key(KahaDestination)}) to its indexes. */
    protected BTreeIndex<String, StoredDestination> destinations;
    /** Lazily created by {@link #getPageFile()}. */
    protected PageFile pageFile;

    protected File directory;

    boolean enableIndexWriteAsync = true;
    int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;

    protected AtomicBoolean started = new AtomicBoolean();
    protected AtomicBoolean opened = new AtomicBoolean();

    public TempMessageDatabase() {
    }

    /**
     * Loads the database if it is not already started; repeated calls are no-ops
     * until {@link #stop()} is called.
     *
     * @throws Exception if loading the page file fails
     */
    public void start() throws Exception {
        if (started.compareAndSet(false, true)) {
            load();
        }
    }

    /**
     * Unloads the database if it is currently started; otherwise does nothing.
     *
     * @throws Exception if unloading the page file fails
     */
    public void stop() throws Exception {
        if (started.compareAndSet(true, false)) {
            unload();
        }
    }

    /**
     * Loads the page file, allocates and loads a fresh {@link #destinations}
     * root index in it, flushes, and clears the in-memory destination cache.
     *
     * @throws IOException if the page file cannot be loaded or written
     */
    private void loadPageFile() throws IOException {
        synchronized (indexMutex) {
            // Must be final: it is captured by the anonymous closure below.
            final PageFile file = getPageFile();
            file.load();
            file.tx().execute(new Transaction.Closure<IOException>() {
                public void execute(Transaction tx) throws IOException {
                    destinations = new BTreeIndex<String, StoredDestination>(file, tx.allocate().getPageId());
                    destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
                    destinations.setValueMarshaller(new StoredDestinationMarshaller());
                    destinations.load(tx);
                }
            });
            file.flush();
            storedDestinations.clear();
        }
    }

    /**
     * Loads the page file unless this database is already open.
     *
     * @throws IOException if the page file cannot be loaded
     */
    public void open() throws IOException {
        if (opened.compareAndSet(false, true)) {
            loadPageFile();
        }
    }

    /**
     * Opens the database if needed, then unloads and deletes the page file and
     * loads it again so that the store starts from a newly allocated root index.
     *
     * @throws IOException if any page file operation fails
     */
    public void load() throws IOException {
        synchronized (indexMutex) {
            open();
            pageFile.unload();
            pageFile.delete();
            loadPageFile();
        }
    }

    /**
     * Unloads the page file if this database is currently open; otherwise does
     * nothing.
     *
     * @throws IOException if unloading the page file fails
     * @throws InterruptedException declared for callers; not thrown by the code visible here
     */
    public void close() throws IOException, InterruptedException {
        if (opened.compareAndSet(true, false)) {
            synchronized (indexMutex) {
                pageFile.unload();
            }
        }
    }

    /**
     * Closes the database if its page file exists and is loaded. Safe to call
     * on an instance whose page file was never created.
     *
     * @throws IOException if unloading the page file fails
     * @throws InterruptedException propagated from {@link #close()}
     */
    public void unload() throws IOException, InterruptedException {
        synchronized (indexMutex) {
            // pageFile is created lazily; guard against unload() before any open()/load().
            if (pageFile != null && pageFile.isLoaded()) {
                close();
            }
        }
    }

    /**
     * Records a message add. With a transaction id the operation is queued
     * until {@link #processCommit(TransactionId)}; without one the index is
     * updated immediately in its own page-file transaction.
     *
     * @param command the add command identifying destination and message id
     * @param txid the owning transaction, or {@code null} for none
     * @param data the message payload stored in the order index
     * @throws IOException if the index update fails
     */
    public void processAdd(final KahaAddMessageCommand command, TransactionId txid, final ByteSequence data) throws IOException {
        synchronized (indexMutex) {
            if (txid != null) {
                getInflightTx(txid).add(new AddOpperation(command, data));
                return;
            }
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                public void execute(Transaction tx) throws IOException {
                    upadateIndex(tx, command, data);
                }
            });
        }
    }

    /**
     * Records a message removal/ack. With a transaction id the operation is
     * queued until {@link #processCommit(TransactionId)}; without one the index
     * is updated immediately in its own page-file transaction.
     *
     * @param command the remove command
     * @param txid the owning transaction, or {@code null} for none
     * @throws IOException if the index update fails
     */
    public void processRemove(final KahaRemoveMessageCommand command, TransactionId txid) throws IOException {
        synchronized (indexMutex) {
            if (txid != null) {
                getInflightTx(txid).add(new RemoveOpperation(command));
                return;
            }
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                public void execute(Transaction tx) throws IOException {
                    updateIndex(tx, command);
                }
            });
        }
    }

    /**
     * Removes a destination and frees all of its indexes.
     *
     * @param command identifies the destination to remove
     * @throws IOException if the index update fails
     */
    public void process(final KahaRemoveDestinationCommand command) throws IOException {
        synchronized (indexMutex) {
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                public void execute(Transaction tx) throws IOException {
                    updateIndex(tx, command);
                }
            });
        }
    }

    /**
     * Creates or deletes a subscription, depending on whether the command
     * carries subscription info.
     *
     * @param command the subscription command
     * @throws IOException if the index update fails
     */
    public void process(final KahaSubscriptionCommand command) throws IOException {
        synchronized (indexMutex) {
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                public void execute(Transaction tx) throws IOException {
                    updateIndex(tx, command);
                }
            });
        }
    }

    /**
     * Applies, in one page-file transaction, every operation queued for the
     * given transaction (looked up first among in-flight, then among prepared
     * transactions). Unknown transaction ids are ignored.
     *
     * @param key the transaction to commit
     * @throws IOException if applying an operation fails
     */
    public void processCommit(TransactionId key) throws IOException {
        synchronized (indexMutex) {
            ArrayList<Operation> inflightTx = inflightTransactions.remove(key);
            if (inflightTx == null) {
                inflightTx = preparedTransactions.remove(key);
            }
            if (inflightTx == null) {
                return;
            }

            final ArrayList<Operation> messagingTx = inflightTx;
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                public void execute(Transaction tx) throws IOException {
                    for (Operation op : messagingTx) {
                        op.execute(tx);
                    }
                }
            });
        }
    }

    /**
     * Moves the queued operations of an in-flight transaction to the prepared
     * set. Does nothing if the transaction is not in flight.
     *
     * @param key the transaction to prepare
     */
    public void processPrepare(TransactionId key) {
        synchronized (indexMutex) {
            ArrayList<Operation> tx = inflightTransactions.remove(key);
            if (tx != null) {
                preparedTransactions.put(key, tx);
            }
        }
    }

    /**
     * Discards the queued operations of a transaction, whether it is in flight
     * or prepared.
     *
     * @param key the transaction to roll back
     */
    public void processRollback(TransactionId key) {
        synchronized (indexMutex) {
            ArrayList<Operation> tx = inflightTransactions.remove(key);
            if (tx == null) {
                preparedTransactions.remove(key);
            }
        }
    }

    // /////////////////////////////////////////////////////////////////
    // These methods do the actual index updates.
    // /////////////////////////////////////////////////////////////////

    /** Guards the page file, the indexes and the transaction maps. */
    protected final Object indexMutex = new Object();
    private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();

    /**
     * Adds a message to the destination's indexes, assigning it the next
     * sequence id. (The misspelled name is kept because other members call it.)
     *
     * @param tx the page-file transaction to work in
     * @param command the add command identifying destination and message id
     * @param data the message payload
     * @throws IOException if an index operation fails
     */
    private void upadateIndex(Transaction tx, KahaAddMessageCommand command, ByteSequence data) throws IOException {
        StoredDestination sd = getStoredDestination(command.getDestination(), tx);

        // Skip adding the message to the index if this is a topic and there are
        // no subscriptions.
        if (sd.subscriptions != null && sd.ackPositions.isEmpty()) {
            return;
        }

        // Add the message.
        long id = sd.nextMessageId++;
        Long previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
        if (previous == null) {
            sd.orderIndex.put(tx, id, new MessageRecord(command.getMessageId(), data));
        } else {
            // Restore the previous value. Looks like this was a redo of a previously
            // added message. We don't want to assign it a new id as the other indexes
            // would be wrong.
            sd.messageIdIndex.put(tx, command.getMessageId(), previous);
        }
    }

    /**
     * Handles a message removal. Without a subscription key (queue case) the
     * message is removed from both indexes. With one (topic case) the
     * subscription's ack position is advanced to the message's sequence id,
     * which may in turn delete messages no subscription still needs.
     *
     * @param tx the page-file transaction to work in
     * @param command the remove command
     * @throws IOException if an index operation fails
     */
    private void updateIndex(Transaction tx, KahaRemoveMessageCommand command) throws IOException {
        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
        if (!command.hasSubscriptionKey()) {

            // In the queue case we just remove the message from the index.
            Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
            if (sequenceId != null) {
                sd.orderIndex.remove(tx, sequenceId);
            }
        } else {
            // In the topic case we need to remove the message once it's been acked
            // by all the subs.
            Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());

            // Make sure it's a valid message id...
            if (sequence != null) {
                String subscriptionKey = command.getSubscriptionKey();
                Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, sequence);

                // The following method handles deleting un-referenced messages.
                removeAckByteSequence(tx, sd, subscriptionKey, prev);

                // Add it to the new location set.
                addAckByteSequence(sd, sequence, subscriptionKey);
            }

        }
    }

    /**
     * Clears, unloads and frees every index of the destination, then removes
     * the destination from the cache and from the root index.
     *
     * @param tx the page-file transaction to work in
     * @param command identifies the destination to remove
     * @throws IOException if an index operation fails
     */
    private void updateIndex(Transaction tx, KahaRemoveDestinationCommand command) throws IOException {
        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
        sd.orderIndex.clear(tx);
        sd.orderIndex.unload(tx);
        tx.free(sd.orderIndex.getPageId());

        sd.messageIdIndex.clear(tx);
        sd.messageIdIndex.unload(tx);
        tx.free(sd.messageIdIndex.getPageId());

        // The subscription indexes only exist for topics.
        if (sd.subscriptions != null) {
            sd.subscriptions.clear(tx);
            sd.subscriptions.unload(tx);
            tx.free(sd.subscriptions.getPageId());

            sd.subscriptionAcks.clear(tx);
            sd.subscriptionAcks.unload(tx);
            tx.free(sd.subscriptionAcks.getPageId());
        }

        String key = key(command.getDestination());
        storedDestinations.remove(key);
        destinations.remove(tx, key);
    }

    /**
     * Creates the subscription when the command carries subscription info,
     * otherwise deletes it. A new non-retroactive subscription starts acked up
     * to the last assigned sequence id; a retroactive one starts at -1.
     *
     * @param tx the page-file transaction to work in
     * @param command the subscription command
     * @throws IOException if an index operation fails
     */
    private void updateIndex(Transaction tx, KahaSubscriptionCommand command) throws IOException {
        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
        String subscriptionKey = command.getSubscriptionKey();

        // If set then we are creating it.. otherwise we are destroying the sub
        if (command.hasSubscriptionInfo()) {
            sd.subscriptions.put(tx, subscriptionKey, command);
            long ackByteSequence = -1;
            if (!command.getRetroactive()) {
                ackByteSequence = sd.nextMessageId - 1;
            }

            // NOTE(review): the previous ack position returned by put() is ignored, so
            // re-creating an existing subscription appears to leave its old entry in
            // ackPositions - confirm whether that is intended.
            sd.subscriptionAcks.put(tx, subscriptionKey, ackByteSequence);
            addAckByteSequence(sd, ackByteSequence, subscriptionKey);
        } else {
            // delete the sub...
            sd.subscriptions.remove(tx, subscriptionKey);
            Long prev = sd.subscriptionAcks.remove(tx, subscriptionKey);
            if (prev != null) {
                removeAckByteSequence(tx, sd, subscriptionKey, prev);
            }
        }

    }

    /**
     * @return the live, mutable set held by this instance (not a copy)
     */
    public HashSet<Integer> getJournalFilesBeingReplicated() {
        return journalFilesBeingReplicated;
    }

    // /////////////////////////////////////////////////////////////////
    // StoredDestination related implementation methods.
    // /////////////////////////////////////////////////////////////////


    /** In-memory cache of loaded destinations, keyed like {@link #destinations}. */
    private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();

    /** Plain holder for subscription state; not used by the code in this file. */
    class StoredSubscription {
        SubscriptionInfo subscriptionInfo;
        String lastAckId;
        ByteSequence lastAckByteSequence;
        ByteSequence cursor;
    }

    /** Value stored in the order index: a message id together with its payload. */
    static class MessageRecord {
        final String messageId;
        final ByteSequence data;

        public MessageRecord(String messageId, ByteSequence location) {
            this.messageId = messageId;
            this.data = location;
        }

        @Override
        public String toString() {
            return "[" + messageId + "," + data + "]";
        }
    }

    /** Marshals a {@link MessageRecord} as a UTF message id followed by the payload bytes. */
    static protected class MessageKeysMarshaller extends VariableMarshaller<MessageRecord> {
        static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();

        public MessageRecord readPayload(DataInput dataIn) throws IOException {
            return new MessageRecord(dataIn.readUTF(), ByteSequenceMarshaller.INSTANCE.readPayload(dataIn));
        }

        public void writePayload(MessageRecord object, DataOutput dataOut) throws IOException {
            dataOut.writeUTF(object.messageId);
            ByteSequenceMarshaller.INSTANCE.writePayload(object.data, dataOut);
        }
    }

    /** The set of indexes, plus derived in-memory state, kept for one destination. */
    static class StoredDestination {
        /** Sequence id that will be assigned to the next added message. */
        long nextMessageId;
        /** Sequence id to message record. */
        BTreeIndex<Long, MessageRecord> orderIndex;
        /** Message id to sequence id. */
        BTreeIndex<String, Long> messageIdIndex;

        // These bits are only set for Topics
        BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
        /** Subscription key to the last acked sequence id. */
        BTreeIndex<String, Long> subscriptionAcks;
        HashMap<String, Long> subscriptionCursors;
        /** Acked sequence id to the subscription keys currently positioned there. */
        TreeMap<Long, HashSet<String>> ackPositions;
    }

    /**
     * Marshals a {@link StoredDestination} as the page ids of its indexes. The
     * two subscription index page ids are preceded by a boolean and written
     * only when the destination has subscription indexes.
     */
    protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
        public Class<StoredDestination> getType() {
            return StoredDestination.class;
        }

        public StoredDestination readPayload(DataInput dataIn) throws IOException {
            StoredDestination value = new StoredDestination();
            value.orderIndex = new BTreeIndex<Long, MessageRecord>(pageFile, dataIn.readLong());
            value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());

            if (dataIn.readBoolean()) {
                value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
                value.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
            }
            return value;
        }

        public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
            dataOut.writeLong(value.orderIndex.getPageId());
            dataOut.writeLong(value.messageIdIndex.getPageId());
            if (value.subscriptions != null) {
                dataOut.writeBoolean(true);
                dataOut.writeLong(value.subscriptions.getPageId());
                dataOut.writeLong(value.subscriptionAcks.getPageId());
            } else {
                dataOut.writeBoolean(false);
            }
        }
    }

    /** Marshals a {@link ByteSequence} as an int length followed by that many bytes. */
    static class ByteSequenceMarshaller extends VariableMarshaller<ByteSequence> {
        final static ByteSequenceMarshaller INSTANCE = new ByteSequenceMarshaller();

        public ByteSequence readPayload(DataInput dataIn) throws IOException {
            byte data[] = new byte[dataIn.readInt()];
            dataIn.readFully(data);
            return new ByteSequence(data);
        }

        public void writePayload(ByteSequence object, DataOutput dataOut) throws IOException {
            dataOut.writeInt(object.getLength());
            dataOut.write(object.getData(), object.getOffset(), object.getLength());
        }
    }

    /**
     * Marshals a {@link KahaSubscriptionCommand} using its framed encoding.
     * Both methods cast the supplied {@link DataInput}/{@link DataOutput} to a
     * stream type, so they fail with a {@link ClassCastException} if the
     * argument is not also an {@link InputStream}/{@link OutputStream}.
     */
    static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
        final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();

        public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
            KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
            rc.mergeFramed((InputStream) dataIn);
            return rc;
        }

        public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
            object.writeFramed((OutputStream) dataOut);
        }
    }

    /**
     * Returns the cached {@link StoredDestination} for the destination, loading
     * (and, if necessary, creating) it on first use.
     *
     * @param destination the destination to look up
     * @param tx the page-file transaction to work in
     * @return the stored destination, never {@code null}
     * @throws IOException if loading the indexes fails
     */
    protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
        String key = key(destination);
        StoredDestination rc = storedDestinations.get(key);
        if (rc == null) {
            boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC
                    || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
            rc = loadStoredDestination(tx, key, topic);
            // Cache it. We may want to remove/unload destinations from the
            // cache that are not used for a while
            // to reduce memory usage.
            storedDestinations.put(key, rc);
        }
        return rc;
    }

    /**
     * Loads the indexes of a destination from the root index, allocating new
     * ones if the destination is not stored yet, and rebuilds the derived
     * in-memory state (next sequence id and, for topics, ack positions).
     *
     * @param tx the page-file transaction to work in
     * @param key the destination key as produced by {@link #key(KahaDestination)}
     * @param topic whether the destination is a topic and so has subscription indexes
     * @return the loaded destination
     * @throws IOException if an index operation fails
     */
    private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
        // Try to load the existing indexes..
        StoredDestination rc = destinations.get(tx, key);
        if (rc == null) {
            // Brand new destination.. allocate indexes for it.
            rc = new StoredDestination();
            rc.orderIndex = new BTreeIndex<Long, MessageRecord>(pageFile, tx.allocate());
            rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());

            if (topic) {
                rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
                rc.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, tx.allocate());
            }
            destinations.put(tx, key, rc);
        }

        // Configure the marshalers and load.
        rc.orderIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
        rc.orderIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
        rc.orderIndex.load(tx);

        // Figure out the next key using the last entry in the destination.
        Entry<Long, MessageRecord> lastEntry = rc.orderIndex.getLast(tx);
        if (lastEntry != null) {
            rc.nextMessageId = lastEntry.getKey() + 1;
        }

        rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
        rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
        rc.messageIdIndex.load(tx);

        // If it was a topic...
        if (topic) {

            rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
            rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
            rc.subscriptions.load(tx);

            rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
            rc.subscriptionAcks.setValueMarshaller(LongMarshaller.INSTANCE);
            rc.subscriptionAcks.load(tx);

            rc.ackPositions = new TreeMap<Long, HashSet<String>>();
            rc.subscriptionCursors = new HashMap<String, Long>();

            // Rebuild the in-memory ack positions from the persisted per-subscription acks.
            for (Iterator<Entry<String, Long>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
                Entry<String, Long> entry = iterator.next();
                addAckByteSequence(rc, entry.getValue(), entry.getKey());
            }

        }
        return rc;
    }

    /**
     * Records that a subscription is positioned at the given sequence id.
     *
     * @param sd the destination whose ack positions are updated
     * @param messageSequence the acked sequence id
     * @param subscriptionKey the subscription positioned there
     */
    private void addAckByteSequence(StoredDestination sd, Long messageSequence, String subscriptionKey) {
        HashSet<String> hs = sd.ackPositions.get(messageSequence);
        if (hs == null) {
            hs = new HashSet<String>();
            sd.ackPositions.put(messageSequence, hs);
        }
        hs.add(subscriptionKey);
    }

    /**
     * Removes a subscription from the ack position it previously held. If that
     * leaves the position without subscriptions it is dropped, and if it was
     * the lowest position, every message with a sequence id up to and including
     * it is deleted from the indexes.
     *
     * @param tx the page-file transaction to work in
     * @param sd the destination whose ack positions are updated
     * @param subscriptionKey the subscription leaving the position
     * @param sequenceId the position being left; {@code null} is ignored
     * @throws IOException if an index operation fails
     */
    private void removeAckByteSequence(Transaction tx, StoredDestination sd, String subscriptionKey, Long sequenceId) throws IOException {
        if (sequenceId == null) {
            return;
        }

        // Remove the sub from the previous location set..
        HashSet<String> hs = sd.ackPositions.get(sequenceId);
        if (hs == null) {
            return;
        }
        hs.remove(subscriptionKey);
        if (!hs.isEmpty()) {
            return;
        }

        // Capture the lowest position's set before removing ours so the two can be compared.
        HashSet<String> firstSet = sd.ackPositions.values().iterator().next();
        sd.ackPositions.remove(sequenceId);

        // Did we just empty out the first set in the ordered list of ack
        // locations? Then it's time to delete some messages.
        if (hs != firstSet) {
            return;
        }

        // Find all the entries that need to get deleted. We don't do the actual
        // delete while we are iterating the BTree since iterating would fail.
        ArrayList<Entry<Long, MessageRecord>> deletes = new ArrayList<Entry<Long, MessageRecord>>();
        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
            Entry<Long, MessageRecord> entry = iterator.next();
            if (entry.getKey().compareTo(sequenceId) > 0) {
                // The order index iterates in ascending key order, so nothing
                // further along can qualify; stop instead of scanning the rest.
                break;
            }
            deletes.add(entry);
        }

        // Do the actual deletes.
        for (Entry<Long, MessageRecord> entry : deletes) {
            sd.messageIdIndex.remove(tx, entry.getValue().messageId);
            sd.orderIndex.remove(tx, entry.getKey());
        }
    }

    /**
     * Builds the root-index key for a destination: its type number and name
     * joined by a colon.
     */
    private String key(KahaDestination destination) {
        return destination.getType().getNumber() + ":" + destination.getName();
    }

    // /////////////////////////////////////////////////////////////////
    // Transaction related implementation methods.
    // /////////////////////////////////////////////////////////////////
    protected final LinkedHashMap<TransactionId, ArrayList<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
    protected final LinkedHashMap<TransactionId, ArrayList<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();

    /**
     * Returns the operation list of an in-flight transaction, registering an
     * empty one if the transaction is not known yet.
     */
    private ArrayList<Operation> getInflightTx(TransactionId key) {
        ArrayList<Operation> tx = inflightTransactions.get(key);
        if (tx == null) {
            tx = new ArrayList<Operation>();
            inflightTransactions.put(key, tx);
        }
        return tx;
    }

    /** An index update deferred until its transaction commits. */
    abstract class Operation {
        abstract public void execute(Transaction tx) throws IOException;
    }

    /** Deferred message add. */
    class AddOpperation extends Operation {
        final KahaAddMessageCommand command;
        private final ByteSequence data;

        public AddOpperation(KahaAddMessageCommand command, ByteSequence location) {
            this.command = command;
            this.data = location;
        }

        @Override
        public void execute(Transaction tx) throws IOException {
            upadateIndex(tx, command, data);
        }

        public KahaAddMessageCommand getCommand() {
            return command;
        }
    }

    /** Deferred message removal/ack. */
    class RemoveOpperation extends Operation {
        final KahaRemoveMessageCommand command;

        public RemoveOpperation(KahaRemoveMessageCommand command) {
            this.command = command;
        }

        @Override
        public void execute(Transaction tx) throws IOException {
            updateIndex(tx, command);
        }

        public KahaRemoveMessageCommand getCommand() {
            return command;
        }
    }

    // /////////////////////////////////////////////////////////////////
    // Initialization related implementation methods.
    // /////////////////////////////////////////////////////////////////

    /**
     * Creates the "temp-db" page file in {@link #directory} using the
     * configured write-thread and batch-size settings, with disk syncs and the
     * recovery file disabled.
     */
    private PageFile createPageFile() {
        PageFile index = new PageFile(directory, "temp-db");
        index.setEnableWriteThread(isEnableIndexWriteAsync());
        index.setWriteBatchSize(getIndexWriteBatchSize());
        index.setEnableDiskSyncs(false);
        index.setEnableRecoveryFile(false);
        return index;
    }

    public File getDirectory() {
        return directory;
    }

    public void setDirectory(File directory) {
        this.directory = directory;
    }

    public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
        this.setIndexWriteBatchSize = setIndexWriteBatchSize;
    }

    public int getIndexWriteBatchSize() {
        return setIndexWriteBatchSize;
    }

    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
        this.enableIndexWriteAsync = enableIndexWriteAsync;
    }

    boolean isEnableIndexWriteAsync() {
        return enableIndexWriteAsync;
    }

    /**
     * @return the page file, creating it on first access from the settings
     *         configured at that moment
     */
    public PageFile getPageFile() {
        if (pageFile == null) {
            pageFile = createPageFile();
        }
        return pageFile;
    }

}