/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.scheduler;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;

import org.apache.activemq.broker.scheduler.JobScheduler;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.kahadb.AbstractKahaDBStore;
import org.apache.activemq.store.kahadb.JournalCommand;
import org.apache.activemq.store.kahadb.KahaDBMetaData;
import org.apache.activemq.store.kahadb.Visitor;
import org.apache.activemq.store.kahadb.data.KahaAddScheduledJobCommand;
import org.apache.activemq.store.kahadb.data.KahaDestroySchedulerCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobsCommand;
import org.apache.activemq.store.kahadb.data.KahaRescheduleJobCommand;
import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor;
import org.apache.activemq.store.kahadb.disk.journal.DataFile;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Page;
import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
import org.apache.activemq.store.kahadb.scheduler.legacy.LegacyStoreReplayer;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSchedulerStore {

    private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreImpl.class);

    private JobSchedulerKahaDBMetaData metaData = new JobSchedulerKahaDBMetaData(this);
    private final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
    private final Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
    private File legacyStoreArchiveDirectory;

    /**
     * The Scheduler Token is used to identify base revisions of the Scheduler store. A store
     * based on the initial scheduler design will not have this tag in its meta-data and will
     * indicate an update is needed. Later versions of the scheduler can also change this value
     * to indicate incompatible store bases which require complete meta-data and journal rewrites
     * instead of simpler meta-data updates.
     */
    static final UUID SCHEDULER_STORE_TOKEN = UUID.fromString("57ed642b-1ee3-47b3-be6d-b7297d500409");

    /**
     * The default scheduler store version. All new store instances will be given this version and
     * earlier versions will be updated to this version.
     */
    static final int CURRENT_VERSION = 1;

    @Override
    public JobScheduler getJobScheduler(final String name) throws Exception {
        this.indexLock.writeLock().lock();
        try {
            JobSchedulerImpl result = this.schedulers.get(name);
            if (result == null) {
                final JobSchedulerImpl js = new JobSchedulerImpl(this);
                js.setName(name);
                getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                    @Override
                    public void execute(Transaction tx) throws IOException {
                        js.createIndexes(tx);
                        js.load(tx);
                        metaData.getJobSchedulers().put(tx, name, js);
                    }
                });
                result = js;
                this.schedulers.put(name, js);
                if (isStarted()) {
                    result.start();
                }
                this.pageFile.flush();
            }
            return result;
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }

    @Override
    public boolean removeJobScheduler(final String name) throws Exception {
        boolean result = false;

        this.indexLock.writeLock().lock();
        try {
            final JobSchedulerImpl js = this.schedulers.remove(name);
            result = js != null;
            if (result) {
                js.stop();
                getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                    @Override
                    public void execute(Transaction tx) throws IOException {
                        metaData.getJobSchedulers().remove(tx, name);
                        js.removeAll(tx);
                    }
                });
            }
        } finally {
            this.indexLock.writeLock().unlock();
        }
        return result;
    }
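
    /*
     * Usage sketch (illustrative only; the scheduler name below is hypothetical).
     * Schedulers are created lazily on first lookup, persisted in the meta-data
     * index, and purged again on removal:
     *
     *   JobSchedulerStoreImpl store = ...;                              // configured and loaded elsewhere
     *   JobScheduler scheduler = store.getJobScheduler("myScheduler");  // created if absent
     *   ...
     *   store.removeJobScheduler("myScheduler");                        // stops it and removes its index entries
     */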

    /**
     * Sets the directory where the legacy scheduler store files are archived before an
     * update attempt is made. Both the legacy index files and the journal files are moved
     * to this folder prior to an upgrade attempt.
     *
     * @param directory
     *      The directory to move the legacy Scheduler Store files to.
     */
    public void setLegacyStoreArchiveDirectory(File directory) {
        this.legacyStoreArchiveDirectory = directory;
    }

    /**
     * Gets the directory where the legacy Scheduler Store files will be archived if the
     * broker is started and an existing Job Scheduler Store from an old version is detected.
     *
     * @return the directory where scheduler store legacy files are archived on upgrade.
     */
    public File getLegacyStoreArchiveDirectory() {
        if (this.legacyStoreArchiveDirectory == null) {
            this.legacyStoreArchiveDirectory = new File(getDirectory(), "legacySchedulerStore");
        }

        return this.legacyStoreArchiveDirectory.getAbsoluteFile();
    }

    @Override
    public void load() throws IOException {
        if (opened.compareAndSet(false, true)) {
            getJournal().start();
            try {
                loadPageFile();
            } catch (UnknownStoreVersionException ex) {
                LOG.info("Can't start until store update is performed.");
                upgradeFromLegacy();
                // Restart with the updated store
                getJournal().start();
                loadPageFile();
                LOG.info("Update from legacy Scheduler store completed successfully.");
            } catch (Throwable t) {
                LOG.warn("Index corrupted. Recovering the index through journal replay. Cause: {}", t.toString());
                LOG.debug("Index load failure", t);

                // try to recover index
                try {
                    pageFile.unload();
                } catch (Exception ignore) {
                }
                if (isArchiveCorruptedIndex()) {
                    pageFile.archive();
                } else {
                    pageFile.delete();
                }
                metaData = new JobSchedulerKahaDBMetaData(this);
                pageFile = null;
                loadPageFile();
            }
            startCheckpoint();
            recover();
        }
        LOG.info("{} started.", this);
    }

    @Override
    public void unload() throws IOException {
        if (opened.compareAndSet(true, false)) {
            for (JobSchedulerImpl js : this.schedulers.values()) {
                try {
                    js.stop();
                } catch (Exception e) {
                    throw new IOException(e);
                }
            }
            this.indexLock.writeLock().lock();
            try {
                if (pageFile != null && pageFile.isLoaded()) {
                    metaData.setState(KahaDBMetaData.CLOSED_STATE);

                    if (metaData.getPage() != null) {
                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
                            @Override
                            public void execute(Transaction tx) throws IOException {
                                tx.store(metaData.getPage(), metaDataMarshaller, true);
                            }
                        });
                    }
                }
            } finally {
                this.indexLock.writeLock().unlock();
            }

            checkpointLock.writeLock().lock();
            try {
                if (metaData.getPage() != null) {
                    checkpointUpdate(true);
                }
            } finally {
                checkpointLock.writeLock().unlock();
            }
            synchronized (checkpointThreadLock) {
                if (checkpointThread != null) {
                    try {
                        checkpointThread.join();
                        checkpointThread = null;
                    } catch (InterruptedException e) {
                    }
                }
            }

            if (pageFile != null) {
                pageFile.unload();
                pageFile = null;
            }
            if (this.journal != null) {
                journal.close();
                journal = null;
            }

            metaData = new JobSchedulerKahaDBMetaData(this);
        }
        LOG.info("{} stopped.", this);
    }
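
    /*
     * Note (summary of the method below, not an on-disk spec): page 0 of the page file
     * always holds the JobSchedulerKahaDBMetaData. On a brand new store loadPageFile()
     * allocates and initializes that page; on restart it loads the existing page and then
     * starts every scheduler recorded in the meta-data index. Roughly:
     *
     *   page 0 -> JobSchedulerKahaDBMetaData (state, scheduler index roots)
     *   other pages -> index pages for the per-scheduler job data
     */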

    private void loadPageFile() throws IOException {
        this.indexLock.writeLock().lock();
        try {
            final PageFile pageFile = getPageFile();
            pageFile.load();
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    if (pageFile.getPageCount() == 0) {
                        Page<JobSchedulerKahaDBMetaData> page = tx.allocate();
                        assert page.getPageId() == 0;
                        page.set(metaData);
                        metaData.setPage(page);
                        metaData.setState(KahaDBMetaData.CLOSED_STATE);
                        metaData.initialize(tx);
                        tx.store(metaData.getPage(), metaDataMarshaller, true);
                    } else {
                        Page<JobSchedulerKahaDBMetaData> page = tx.load(0, metaDataMarshaller);
                        metaData = page.get();
                        metaData.setPage(page);
                    }
                    metaData.load(tx);
                    metaData.loadScheduler(tx, schedulers);
                    for (JobSchedulerImpl js : schedulers.values()) {
                        try {
                            js.start();
                        } catch (Exception e) {
                            JobSchedulerStoreImpl.LOG.error("Failed to load " + js.getName(), e);
                        }
                    }
                }
            });

            pageFile.flush();
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }

    private void upgradeFromLegacy() throws IOException {

        journal.close();
        journal = null;
        try {
            pageFile.unload();
            pageFile = null;
        } catch (Exception ignore) {}

        File storeDir = getDirectory().getAbsoluteFile();
        File storeArchiveDir = getLegacyStoreArchiveDirectory();

        LOG.info("Attempting to move old store files from {} to {}", storeDir, storeArchiveDir);

        // Move only the known store files; locks and other items are left in place.
        IOHelper.moveFiles(storeDir, storeArchiveDir, new FilenameFilter() {

            @Override
            public boolean accept(File dir, String name) {
                return name.endsWith(".data") || name.endsWith(".redo") || name.endsWith(".log");
            }
        });

        // We reset everything to a clean state, then we can read from the old
        // scheduler store and replay the scheduled jobs into this one as adds.
        getJournal().start();
        metaData = new JobSchedulerKahaDBMetaData(this);
        pageFile = null;
        loadPageFile();

        LegacyStoreReplayer replayer = new LegacyStoreReplayer(getLegacyStoreArchiveDirectory());
        replayer.load();
        replayer.startReplay(this);

        // Clean up after replay and store what we've done.
        pageFile.tx().execute(new Transaction.Closure<IOException>() {
            @Override
            public void execute(Transaction tx) throws IOException {
                tx.store(metaData.getPage(), metaDataMarshaller, true);
            }
        });

        checkpointUpdate(true);
        getJournal().close();
        getPageFile().unload();
    }
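
    /*
     * Journal GC sketch (illustrative trace, not actual store output). A checkpoint starts
     * by treating every journal data file as a GC candidate and then rules files out:
     *
     *   candidates = {1, 2, 3, 4}
     *   candidates -= { file of the last update location }           -> {1, 2, 3}
     *   candidates -= { files referenced in the journal RC index }   -> {1, 3}
     *   candidates -= { files holding a remove command whose add     -> {3}
     *                   command still lives in a retained file }
     *
     * Whatever survives is handed to journal.removeDataFiles(...) at the end of the
     * checkpoint below.
     */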

    @Override
    protected void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
        LOG.debug("Job Scheduler Store Checkpoint started.");

        // reflect last update exclusive of current checkpoint
        Location lastUpdate = metaData.getLastUpdateLocation();
        metaData.setState(KahaDBMetaData.OPEN_STATE);
        tx.store(metaData.getPage(), metaDataMarshaller, true);
        pageFile.flush();

        if (cleanup) {
            final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
            final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);

            LOG.trace("Last update: {}, full gc candidates set: {}", lastUpdate, gcCandidateSet);

            if (lastUpdate != null) {
                gcCandidateSet.remove(lastUpdate.getDataFileId());
            }

            this.metaData.getJournalRC().visit(tx, new BTreeVisitor<Integer, Integer>() {

                @Override
                public void visit(List<Integer> keys, List<Integer> values) {
                    for (Integer key : keys) {
                        if (gcCandidateSet.remove(key)) {
                            LOG.trace("Removed referenced file: {} from GC set", key);
                        }
                    }
                }

                @Override
                public boolean isInterestedInKeysBetween(Integer first, Integer second) {
                    return true;
                }
            });

            LOG.trace("gc candidates after reference check: {}", gcCandidateSet);

            // If there are GC candidates then check the remove command locations to see
            // if any of them can go or if they must stay in order to ensure proper recovery.
            //
            // A log containing any remove commands must be kept until all the logs with the
            // add commands for all the removed jobs have been dropped.
            if (!gcCandidateSet.isEmpty()) {
                Iterator<Entry<Integer, List<Integer>>> removals = metaData.getRemoveLocationTracker().iterator(tx);
                List<Integer> orphans = new ArrayList<Integer>();
                while (removals.hasNext()) {
                    boolean orphanedRemove = true;
                    Entry<Integer, List<Integer>> entry = removals.next();

                    // If this log is not a GC candidate then there's no need to do a check to rule it out
                    if (gcCandidateSet.contains(entry.getKey())) {
                        for (Integer addLocation : entry.getValue()) {
                            if (completeFileSet.contains(addLocation)) {
                                LOG.trace("A remove in log {} has an add still in existence in {}.", entry.getKey(), addLocation);
                                orphanedRemove = false;
                                break;
                            }
                        }

                        // If it's not orphaned then we can't GC its log file; otherwise we
                        // stop tracking it and allow its log to be removed.
                        if (!orphanedRemove) {
                            gcCandidateSet.remove(entry.getKey());
                        } else {
                            LOG.trace("All removes in log {} are orphaned, file can be GC'd", entry.getKey());
                            orphans.add(entry.getKey());
                        }
                    }
                }

                // Drop all orphaned removes from the tracker.
                for (Integer orphan : orphans) {
                    metaData.getRemoveLocationTracker().remove(tx, orphan);
                }
            }

            LOG.trace("gc candidates after removals check: {}", gcCandidateSet);
            if (!gcCandidateSet.isEmpty()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Cleanup removing the data files: " + gcCandidateSet);
                }
                journal.removeDataFiles(gcCandidateSet);
            }
        }

        LOG.debug("Job Scheduler Store Checkpoint complete.");
    }
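
    /*
     * Reference counting sketch (illustrative only). Callers bump the count for the journal
     * file that holds a job's command and drop it again when that command is superseded or
     * the job is removed; a data file is only GC eligible once no counts remain:
     *
     *   incrementJournalCount(tx, loc);   // count for loc's data file becomes 1
     *   incrementJournalCount(tx, loc);   // count becomes 2
     *   decrementJournalCount(tx, loc);   // count becomes 1, file still retained
     *   decrementJournalCount(tx, loc);   // entry removed, file may be GC'd at the next checkpoint
     */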

    /**
     * Adds a reference for the journal log file pointed to by the given Location value.
     *
     * Log files in the journal that still contain valid data needed for recovery must carry
     * active references so that they are not garbage collected. Each Job scheduler should
     * ensure that the logs are accurately referenced.
     *
     * @param tx
     *      The TX under which the update is to be performed.
     * @param location
     *      The location value to update the reference count of.
     *
     * @throws IOException if an error occurs while updating the journal references table.
     */
    protected void incrementJournalCount(Transaction tx, Location location) throws IOException {
        int logId = location.getDataFileId();
        Integer val = metaData.getJournalRC().get(tx, logId);
        int refCount = val != null ? val.intValue() + 1 : 1;
        metaData.getJournalRC().put(tx, logId, refCount);
    }

    /**
     * Removes one reference for the Journal log file indicated in the given Location value.
     *
     * The references are used to track which log files cannot be GC'd. When the reference count
     * on a log file reaches zero the file id is removed from the tracker and the log will be
     * removed on the next checkpoint update.
     *
     * @param tx
     *      The TX under which the update is to be performed.
     * @param location
     *      The location value to update the reference count of.
     *
     * @throws IOException if an error occurs while updating the journal references table.
     */
    protected void decrementJournalCount(Transaction tx, Location location) throws IOException {
        int logId = location.getDataFileId();
        Integer refCount = metaData.getJournalRC().get(tx, logId);
        if (refCount != null) {
            int refCountValue = refCount;
            refCountValue--;
            if (refCountValue <= 0) {
                metaData.getJournalRC().remove(tx, logId);
            } else {
                metaData.getJournalRC().put(tx, logId, refCountValue);
            }
        }
    }

    /**
     * Updates the Job removal tracking index with the location of a remove command and the
     * original JobLocation entry.
     *
     * The JobLocation holds the locations in the logs where the add and update commands for
     * a job are stored. The log file containing the remove command can only be discarded after
     * both the add and latest update log files have also been discarded.
     *
     * @param tx
     *      The TX under which the update is to be performed.
     * @param location
     *      The location value to reference a remove command.
     * @param removedJob
     *      The original JobLocation instance that holds the add and update locations.
     *
     * @throws IOException if an error occurs while updating the remove location tracker.
     */
    protected void referenceRemovedLocation(Transaction tx, Location location, JobLocation removedJob) throws IOException {
        int logId = location.getDataFileId();
        List<Integer> removed = this.metaData.getRemoveLocationTracker().get(tx, logId);
        if (removed == null) {
            removed = new ArrayList<Integer>();
        }
        removed.add(removedJob.getLocation().getDataFileId());
        this.metaData.getRemoveLocationTracker().put(tx, logId, removed);
    }
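
    /*
     * Remove tracking sketch (illustrative only). The remove location tracker maps the data
     * file that holds a remove command to the data files that held the removed jobs' add
     * commands, so a checkpoint can tell when the remove command itself is safe to drop:
     *
     *   job added in file 3, later removed by a command written to file 7
     *     -> removeLocationTracker[7] += {3}
     *   file 3 is eventually dropped from the journal
     *     -> the entry for file 7 becomes orphaned and file 7 may be GC'd as well
     */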

    /**
     * Retrieve the scheduled Job's byte blob from the journal.
     *
     * @param location
     *      The location of the KahaAddScheduledJobCommand that originated the Job.
     *
     * @return a ByteSequence containing the payload of the scheduled Job.
     *
     * @throws IOException if an error occurs while reading the payload value.
     */
    protected ByteSequence getPayload(Location location) throws IOException {
        KahaAddScheduledJobCommand job = (KahaAddScheduledJobCommand) this.load(location);
        Buffer payload = job.getPayload();
        return new ByteSequence(payload.getData(), payload.getOffset(), payload.getLength());
    }

    public void readLockIndex() {
        this.indexLock.readLock().lock();
    }

    public void readUnlockIndex() {
        this.indexLock.readLock().unlock();
    }

    public void writeLockIndex() {
        this.indexLock.writeLock().lock();
    }

    public void writeUnlockIndex() {
        this.indexLock.writeLock().unlock();
    }

    @Override
    public String toString() {
        return "JobSchedulerStore: " + getDirectory();
    }

    @Override
    protected String getPageFileName() {
        return "scheduleDB";
    }

    @Override
    protected File getDefaultDataDirectory() {
        return new File(IOHelper.getDefaultDataDirectory(), "delayedDB");
    }

    private class MetaDataMarshaller extends VariableMarshaller<JobSchedulerKahaDBMetaData> {

        private final JobSchedulerStoreImpl store;

        MetaDataMarshaller(JobSchedulerStoreImpl store) {
            this.store = store;
        }

        @Override
        public JobSchedulerKahaDBMetaData readPayload(DataInput dataIn) throws IOException {
            JobSchedulerKahaDBMetaData rc = new JobSchedulerKahaDBMetaData(store);
            rc.read(dataIn);
            return rc;
        }

        @Override
        public void writePayload(JobSchedulerKahaDBMetaData object, DataOutput dataOut) throws IOException {
            object.write(dataOut);
        }
    }

    /**
     * Called during index recovery to rebuild the index from the last known good location. For
     * entries that occur before the last known good position we just ignore them and move on.
     *
     * @param data
     *      the command read from the Journal which should be used to update the index.
     * @param location
     *      the location in the journal where the command was read.
     * @param inDoubtlocation
     *      the first location that is not known to be reflected in the index; commands at or
     *      after this location are replayed.
     *
     * @throws IOException if an error occurs while recovering the index.
     */
    protected void doRecover(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
        if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
            process(data, location);
        }
    }

    /**
     * Called during recovery to allow the store to rebuild from scratch.
     *
     * @param data
     *      The command to process, which was read from the Journal.
     * @param location
     *      The location of the command in the Journal.
     *
     * @throws IOException if an error occurs during command processing.
     */
    @Override
    protected void process(JournalCommand<?> data, final Location location) throws IOException {
        data.visit(new Visitor() {
            @Override
            public void visit(final KahaAddScheduledJobCommand command) throws IOException {
                final JobSchedulerImpl scheduler;

                indexLock.writeLock().lock();
                try {
                    try {
                        scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler());
                    } catch (Exception e) {
                        throw new IOException(e);
                    }
                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                        @Override
                        public void execute(Transaction tx) throws IOException {
                            scheduler.process(tx, command, location);
                        }
                    });

                    processLocation(location);
                } finally {
                    indexLock.writeLock().unlock();
                }
            }

            @Override
            public void visit(final KahaRemoveScheduledJobCommand command) throws IOException {
                final JobSchedulerImpl scheduler;

                indexLock.writeLock().lock();
                try {
                    try {
                        scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler());
                    } catch (Exception e) {
                        throw new IOException(e);
                    }
                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                        @Override
                        public void execute(Transaction tx) throws IOException {
                            scheduler.process(tx, command, location);
                        }
                    });

                    processLocation(location);
                } finally {
                    indexLock.writeLock().unlock();
                }
            }

            @Override
            public void visit(final KahaRemoveScheduledJobsCommand command) throws IOException {
                final JobSchedulerImpl scheduler;

                indexLock.writeLock().lock();
                try {
                    try {
                        scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler());
                    } catch (Exception e) {
                        throw new IOException(e);
                    }
                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                        @Override
                        public void execute(Transaction tx) throws IOException {
                            scheduler.process(tx, command, location);
                        }
                    });

                    processLocation(location);
                } finally {
                    indexLock.writeLock().unlock();
                }
            }

            @Override
            public void visit(final KahaRescheduleJobCommand command) throws IOException {
                final JobSchedulerImpl scheduler;

                indexLock.writeLock().lock();
                try {
                    try {
                        scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler());
                    } catch (Exception e) {
                        throw new IOException(e);
                    }
                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                        @Override
                        public void execute(Transaction tx) throws IOException {
                            scheduler.process(tx, command, location);
                        }
                    });

                    processLocation(location);
                } finally {
                    indexLock.writeLock().unlock();
                }
            }

            @Override
            public void visit(final KahaDestroySchedulerCommand command) {
                try {
                    removeJobScheduler(command.getScheduler());
                } catch (Exception e) {
                    LOG.warn("Failed to remove scheduler: {}", command.getScheduler());
                }

                processLocation(location);
            }

            @Override
            public void visit(KahaTraceCommand command) {
                processLocation(location);
            }
        });
    }

    protected void processLocation(final Location location) {
        indexLock.writeLock().lock();
        try {
            this.metaData.setLastUpdateLocation(location);
        } finally {
            indexLock.writeLock().unlock();
        }
    }
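
    /*
     * Recovery flow sketch (summary of the methods below, not a normative spec):
     *
     *   1. getRecoveryPosition() yields the journal location just after the last
     *      checkpointed update (or the first location when a full replay is forced).
     *   2. recover() walks the journal from there, re-applying each command to the
     *      index via doRecover()/process().
     *   3. recoverIndex() then drops any index entries that point past the journal's
     *      last append location or into missing/corrupt data files.
     */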

    /**
     * We recover from the Journal logs as needed to restore the index.
     *
     * @throws IllegalStateException
     * @throws IOException
     */
    private void recover() throws IllegalStateException, IOException {
        this.indexLock.writeLock().lock();
        try {
            long start = System.currentTimeMillis();
            Location lastIndoubtPosition = getRecoveryPosition();
            Location recoveryPosition = lastIndoubtPosition;

            if (recoveryPosition != null) {
                int redoCounter = 0;
                LOG.info("Recovering from the scheduled job journal @" + recoveryPosition);
                while (recoveryPosition != null) {
                    try {
                        JournalCommand<?> message = load(recoveryPosition);
                        metaData.setLastUpdateLocation(recoveryPosition);
                        doRecover(message, recoveryPosition, lastIndoubtPosition);
                        redoCounter++;
                    } catch (IOException failedRecovery) {
                        if (isIgnoreMissingJournalfiles()) {
                            LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery);
                            // track this dud location
                            journal.corruptRecoveryLocation(recoveryPosition);
                        } else {
                            throw new IOException("Failed to recover data at position:" + recoveryPosition, failedRecovery);
                        }
                    }
                    recoveryPosition = journal.getNextLocation(recoveryPosition);
                    if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
                        LOG.info("@ {}, {} entries recovered ..", recoveryPosition, redoCounter);
                    }
                }
                long end = System.currentTimeMillis();
                LOG.info("Recovery replayed {} operations from the journal in {} seconds.",
                    redoCounter, ((end - start) / 1000.0f));
            }

            // We may have to undo some index updates.
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    recoverIndex(tx);
                }
            });

        } finally {
            this.indexLock.writeLock().unlock();
        }
    }

    private Location getRecoveryPosition() throws IOException {
        // This loads the first position and we completely rebuild the index if we
        // do not override it with some known recovery start location.
        Location result = null;

        if (!isForceRecoverIndex()) {
            if (metaData.getLastUpdateLocation() != null) {
                result = metaData.getLastUpdateLocation();
            }
        }

        return journal.getNextLocation(result);
    }

    private void recoverIndex(Transaction tx) throws IOException {
        long start = System.currentTimeMillis();

        // It is possible index updates got applied before the journal updates..
        // in that case we need to remove references to Jobs that are not in the journal
        final Location lastAppendLocation = journal.getLastAppendLocation();
        long undoCounter = 0;

        // Go through all the jobs in each scheduler and check if any are added after
        // the last appended location and remove those. For now we ignore the update
        // location since the scheduled job will update itself after the next fire and
        // a new update will replace any existing update.
        for (Iterator<Map.Entry<String, JobSchedulerImpl>> i = metaData.getJobSchedulers().iterator(tx); i.hasNext();) {
            Map.Entry<String, JobSchedulerImpl> entry = i.next();
            JobSchedulerImpl scheduler = entry.getValue();

            List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
            for (JobLocation job : jobs) {
                if (job.getLocation().compareTo(lastAppendLocation) >= 0) {
                    if (scheduler.removeJobAtTime(tx, job.getJobId(), job.getNextTime())) {
                        LOG.trace("Removed Job past last append in the journal: {}", job.getJobId());
                        undoCounter++;
                    }
                }
            }
        }

        if (undoCounter > 0) {
            // The rolled back operations are basically in flight journal writes. To avoid getting
            // these, the end user should do sync writes to the journal.
            long end = System.currentTimeMillis();
            LOG.info("Rolled back {} messages from the index in {} seconds.", undoCounter, ((end - start) / 1000.0f));
            undoCounter = 0;
        }

        // Now we check for missing and corrupt journal files.

        // 1. Collect the set of all referenced journal files based on the Location of
        //    the scheduled jobs and the marked last update field.
        HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
        for (Iterator<Map.Entry<String, JobSchedulerImpl>> i = metaData.getJobSchedulers().iterator(tx); i.hasNext();) {
            Map.Entry<String, JobSchedulerImpl> entry = i.next();
            JobSchedulerImpl scheduler = entry.getValue();

            List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
            for (JobLocation job : jobs) {
                missingJournalFiles.add(job.getLocation().getDataFileId());
                if (job.getLastUpdate() != null) {
                    missingJournalFiles.add(job.getLastUpdate().getDataFileId());
                }
            }
        }

        // 2. Remove from that set all known data file IDs in the journal and what's left
        //    is the missing set, which will soon also contain the corrupted set.
        missingJournalFiles.removeAll(journal.getFileMap().keySet());
        if (!missingJournalFiles.isEmpty()) {
            LOG.info("Some journal files are missing: {}", missingJournalFiles);
        }

        // 3. Now check all references in the journal logs for corruption and add any
        //    corrupt journal files to the missing set.
        HashSet<Location> corruptedLocations = new HashSet<Location>();

        if (isCheckForCorruptJournalFiles()) {
            Collection<DataFile> dataFiles = journal.getFileMap().values();
            for (DataFile dataFile : dataFiles) {
                int id = dataFile.getDataFileId();
                for (long offset : dataFile.getCorruptedBlocks()) {
                    corruptedLocations.add(new Location(id, (int) offset));
                }
            }

            if (!corruptedLocations.isEmpty()) {
                LOG.debug("Found some corrupted data blocks in the journal: {}", corruptedLocations.size());
            }
        }

        // 4. Now we either fail or we remove all references to missing or corrupt journal
        //    files from the various JobSchedulerImpl instances. We only remove the Job if
        //    the initial Add operation is missing when the ignore option is set; the updates
        //    could be lost, but that's the price you pay for ignoring the missing logs.
        if (!missingJournalFiles.isEmpty() || !corruptedLocations.isEmpty()) {
            if (!isIgnoreMissingJournalfiles()) {
                throw new IOException("Detected missing/corrupt journal files.");
            }

            // Remove all Jobs that reference a Location that is either missing or corrupt.
            undoCounter = removeJobsInMissingOrCorruptJounralFiles(tx, missingJournalFiles, corruptedLocations);

            // Clean up the Journal Reference count Map.
            removeJournalRCForMissingFiles(tx, missingJournalFiles);
        }

        if (undoCounter > 0) {
            long end = System.currentTimeMillis();
            LOG.info("Detected missing/corrupt journal files. Dropped {} jobs from the " +
                "index in {} seconds.", undoCounter, ((end - start) / 1000.0f));
        }
    }

    private void removeJournalRCForMissingFiles(Transaction tx, Set<Integer> missing) throws IOException {
        List<Integer> matches = new ArrayList<Integer>();

        Iterator<Entry<Integer, Integer>> references = metaData.getJournalRC().iterator(tx);
        while (references.hasNext()) {
            int dataFileId = references.next().getKey();
            if (missing.contains(dataFileId)) {
                matches.add(dataFileId);
            }
        }

        for (Integer match : matches) {
            metaData.getJournalRC().remove(tx, match);
        }
    }

    private int removeJobsInMissingOrCorruptJounralFiles(Transaction tx, Set<Integer> missing, Set<Location> corrupted) throws IOException {
        int removed = 0;

        // Remove Jobs that reference missing or corrupt files.
        // Remove reference counts to missing or corrupt files.
        // Remove remove-command markers that point to missing or corrupt files.
        for (Iterator<Map.Entry<String, JobSchedulerImpl>> i = metaData.getJobSchedulers().iterator(tx); i.hasNext();) {
            Map.Entry<String, JobSchedulerImpl> entry = i.next();
            JobSchedulerImpl scheduler = entry.getValue();

            List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
            for (JobLocation job : jobs) {

                // Remove all jobs in missing log files.
                if (missing.contains(job.getLocation().getDataFileId())) {
                    scheduler.removeJobAtTime(tx, job.getJobId(), job.getNextTime());
                    removed++;
                    continue;
                }

                // Remove all jobs in corrupted parts of log files.
                if (corrupted.contains(job.getLocation())) {
                    scheduler.removeJobAtTime(tx, job.getJobId(), job.getNextTime());
                    removed++;
                }
            }
        }

        return removed;
    }
}