001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2014-2015 ForgeRock AS 025 */ 026package org.opends.server.backends.pdb; 027 028import static com.persistit.Transaction.CommitPolicy.*; 029import static java.util.Arrays.*; 030 031import static org.opends.messages.BackendMessages.*; 032import static org.opends.messages.UtilityMessages.*; 033import static org.opends.server.backends.pluggable.spi.StorageUtils.*; 034import static org.opends.server.util.StaticUtils.*; 035 036import java.io.Closeable; 037import java.io.File; 038import java.io.FileFilter; 039import java.io.IOException; 040import java.nio.file.Files; 041import java.nio.file.Path; 042import java.nio.file.Paths; 043import java.rmi.RemoteException; 044import java.util.ArrayList; 045import java.util.HashMap; 046import java.util.HashSet; 047import java.util.List; 048import java.util.ListIterator; 049import java.util.Map; 050import java.util.NoSuchElementException; 051import java.util.Objects; 052import java.util.Queue; 053import java.util.Set; 054import java.util.concurrent.ConcurrentLinkedDeque; 055 056import org.forgerock.i18n.LocalizableMessage; 057import org.forgerock.i18n.slf4j.LocalizedLogger; 058import org.forgerock.opendj.config.server.ConfigChangeResult; 059import org.forgerock.opendj.config.server.ConfigException; 060import org.forgerock.opendj.ldap.ByteSequence; 061import org.forgerock.opendj.ldap.ByteString; 062import org.forgerock.util.Reject; 063import org.opends.server.admin.server.ConfigurationChangeListener; 064import org.opends.server.admin.std.server.PDBBackendCfg; 065import org.opends.server.api.Backupable; 066import org.opends.server.api.DiskSpaceMonitorHandler; 067import org.opends.server.backends.pluggable.spi.AccessMode; 068import org.opends.server.backends.pluggable.spi.Cursor; 069import org.opends.server.backends.pluggable.spi.Importer; 070import org.opends.server.backends.pluggable.spi.ReadOnlyStorageException; 071import org.opends.server.backends.pluggable.spi.ReadOperation; 072import org.opends.server.backends.pluggable.spi.SequentialCursor; 073import org.opends.server.backends.pluggable.spi.Storage; 074import org.opends.server.backends.pluggable.spi.StorageInUseException; 075import org.opends.server.backends.pluggable.spi.StorageRuntimeException; 076import org.opends.server.backends.pluggable.spi.StorageStatus; 077import org.opends.server.backends.pluggable.spi.StorageUtils; 078import org.opends.server.backends.pluggable.spi.TreeName; 079import org.opends.server.backends.pluggable.spi.UpdateFunction; 080import org.opends.server.backends.pluggable.spi.WriteOperation; 081import org.opends.server.backends.pluggable.spi.WriteableTransaction; 082import org.opends.server.core.DirectoryServer; 083import org.opends.server.core.MemoryQuota; 084import org.opends.server.core.ServerContext; 085import org.opends.server.extensions.DiskSpaceMonitor; 086import org.opends.server.types.BackupConfig; 087import org.opends.server.types.BackupDirectory; 088import org.opends.server.types.DirectoryException; 089import org.opends.server.types.RestoreConfig; 090import org.opends.server.util.BackupManager; 091 092import com.persistit.Configuration; 093import com.persistit.Configuration.BufferPoolConfiguration; 094import com.persistit.Exchange; 095import com.persistit.Key; 096import com.persistit.Persistit; 097import com.persistit.Transaction; 098import com.persistit.Value; 099import com.persistit.Volume; 100import com.persistit.VolumeSpecification; 101import com.persistit.exception.InUseException; 102import com.persistit.exception.PersistitException; 103import com.persistit.exception.RollbackException; 104import com.persistit.exception.TreeNotFoundException; 105 106/** PersistIt database implementation of the {@link Storage} engine. */ 107public final class PDBStorage implements Storage, Backupable, ConfigurationChangeListener<PDBBackendCfg>, 108 DiskSpaceMonitorHandler 109{ 110 private static final int IMPORT_DB_CACHE_SIZE = 4 * MB; 111 112 private static final double MAX_SLEEP_ON_RETRY_MS = 50.0; 113 private static final String VOLUME_NAME = "dj"; 114 private static final String JOURNAL_NAME = VOLUME_NAME + "_journal"; 115 /** The buffer / page size used by the PersistIt storage. */ 116 private static final int BUFFER_SIZE = 16 * 1024; 117 118 /** PersistIt implementation of the {@link Cursor} interface. */ 119 private static final class CursorImpl implements Cursor<ByteString, ByteString> 120 { 121 private ByteString currentKey; 122 private ByteString currentValue; 123 private final Exchange exchange; 124 125 private CursorImpl(final Exchange exchange) 126 { 127 this.exchange = exchange; 128 } 129 130 @Override 131 public void close() 132 { 133 // Release immediately because this exchange did not come from the txn cache 134 exchange.getPersistitInstance().releaseExchange(exchange); 135 } 136 137 @Override 138 public boolean isDefined() 139 { 140 return exchange.getValue().isDefined(); 141 } 142 143 @Override 144 public ByteString getKey() 145 { 146 if (currentKey == null) 147 { 148 throwIfUndefined(); 149 currentKey = ByteString.wrap(exchange.getKey().reset().decodeByteArray()); 150 } 151 return currentKey; 152 } 153 154 @Override 155 public ByteString getValue() 156 { 157 if (currentValue == null) 158 { 159 throwIfUndefined(); 160 currentValue = ByteString.wrap(exchange.getValue().getByteArray()); 161 } 162 return currentValue; 163 } 164 165 @Override 166 public boolean next() 167 { 168 clearCurrentKeyAndValue(); 169 try 170 { 171 return exchange.next(); 172 } 173 catch (final PersistitException e) 174 { 175 throw new StorageRuntimeException(e); 176 } 177 } 178 179 @Override 180 public void delete() 181 { 182 throwIfUndefined(); 183 try 184 { 185 exchange.remove(); 186 } 187 catch (final PersistitException | RollbackException e) 188 { 189 throw new StorageRuntimeException(e); 190 } 191 } 192 193 @Override 194 public boolean positionToKey(final ByteSequence key) 195 { 196 clearCurrentKeyAndValue(); 197 bytesToKey(exchange.getKey(), key); 198 try 199 { 200 exchange.fetch(); 201 return exchange.getValue().isDefined(); 202 } 203 catch (final PersistitException e) 204 { 205 throw new StorageRuntimeException(e); 206 } 207 } 208 209 @Override 210 public boolean positionToKeyOrNext(final ByteSequence key) 211 { 212 clearCurrentKeyAndValue(); 213 bytesToKey(exchange.getKey(), key); 214 try 215 { 216 exchange.fetch(); 217 return exchange.getValue().isDefined() || exchange.next(); 218 } 219 catch (final PersistitException e) 220 { 221 throw new StorageRuntimeException(e); 222 } 223 } 224 225 @Override 226 public boolean positionToIndex(int index) 227 { 228 // There doesn't seem to be a way to optimize this using Persistit. 229 clearCurrentKeyAndValue(); 230 exchange.getKey().to(Key.BEFORE); 231 try 232 { 233 for (int i = 0; i <= index; i++) 234 { 235 if (!exchange.next()) 236 { 237 return false; 238 } 239 } 240 return true; 241 } 242 catch (final PersistitException e) 243 { 244 throw new StorageRuntimeException(e); 245 } 246 } 247 248 @Override 249 public boolean positionToLastKey() 250 { 251 clearCurrentKeyAndValue(); 252 exchange.getKey().to(Key.AFTER); 253 try 254 { 255 return exchange.previous(); 256 } 257 catch (final PersistitException e) 258 { 259 throw new StorageRuntimeException(e); 260 } 261 } 262 263 private void clearCurrentKeyAndValue() 264 { 265 currentKey = null; 266 currentValue = null; 267 } 268 269 private void throwIfUndefined() 270 { 271 if (!isDefined()) 272 { 273 throw new NoSuchElementException(); 274 } 275 } 276 } 277 278 /** PersistIt implementation of the {@link Importer} interface. */ 279 private final class ImporterImpl implements Importer 280 { 281 private final Queue<Map<TreeName, Exchange>> allExchanges = new ConcurrentLinkedDeque<>(); 282 private final ThreadLocal<Map<TreeName, Exchange>> exchanges = new ThreadLocal<Map<TreeName, Exchange>>() 283 { 284 @Override 285 protected Map<TreeName, Exchange> initialValue() 286 { 287 final Map<TreeName, Exchange> value = new HashMap<>(); 288 allExchanges.add(value); 289 return value; 290 } 291 }; 292 293 @Override 294 public void close() 295 { 296 for (Map<TreeName, Exchange> map : allExchanges) 297 { 298 for (Exchange exchange : map.values()) 299 { 300 db.releaseExchange(exchange); 301 } 302 map.clear(); 303 } 304 PDBStorage.this.close(); 305 } 306 307 @Override 308 public void clearTree(final TreeName treeName) 309 { 310 final Transaction txn = db.getTransaction(); 311 deleteTree(txn, treeName); 312 createTree(txn, treeName); 313 } 314 315 private void createTree(final Transaction txn, final TreeName treeName) 316 { 317 Exchange ex = null; 318 try 319 { 320 txn.begin(); 321 ex = getNewExchange(treeName, true); 322 txn.commit(); 323 } 324 catch (PersistitException e) 325 { 326 throw new StorageRuntimeException(e); 327 } 328 finally 329 { 330 txn.end(); 331 releaseExchangeSilenty(ex); 332 } 333 } 334 335 private void deleteTree(Transaction txn, final TreeName treeName) 336 { 337 Exchange ex = null; 338 try 339 { 340 txn.begin(); 341 ex = getNewExchange(treeName, true); 342 ex.removeTree(); 343 txn.commit(); 344 } 345 catch (PersistitException e) 346 { 347 throw new StorageRuntimeException(e); 348 } 349 finally 350 { 351 txn.end(); 352 releaseExchangeSilenty(ex); 353 } 354 } 355 356 private void releaseExchangeSilenty(Exchange ex) 357 { 358 if ( ex != null) 359 { 360 db.releaseExchange(ex); 361 } 362 } 363 364 @Override 365 public void put(final TreeName treeName, final ByteSequence key, final ByteSequence value) 366 { 367 try 368 { 369 final Exchange ex = getExchangeFromCache(treeName); 370 bytesToKey(ex.getKey(), key); 371 bytesToValue(ex.getValue(), value); 372 ex.store(); 373 } 374 catch (final Exception e) 375 { 376 throw new StorageRuntimeException(e); 377 } 378 } 379 380 @Override 381 public ByteString read(final TreeName treeName, final ByteSequence key) 382 { 383 try 384 { 385 final Exchange ex = getExchangeFromCache(treeName); 386 bytesToKey(ex.getKey(), key); 387 ex.fetch(); 388 return valueToBytes(ex.getValue()); 389 } 390 catch (final PersistitException e) 391 { 392 throw new StorageRuntimeException(e); 393 } 394 } 395 396 private Exchange getExchangeFromCache(final TreeName treeName) throws PersistitException 397 { 398 Map<TreeName, Exchange> threadExchanges = exchanges.get(); 399 Exchange exchange = threadExchanges.get(treeName); 400 if (exchange == null) 401 { 402 exchange = getNewExchange(treeName, false); 403 threadExchanges.put(treeName, exchange); 404 } 405 return exchange; 406 } 407 408 @Override 409 public SequentialCursor<ByteString, ByteString> openCursor(TreeName treeName) 410 { 411 try 412 { 413 return new CursorImpl(getNewExchange(treeName, false)); 414 } 415 catch (PersistitException e) 416 { 417 throw new StorageRuntimeException(e); 418 } 419 } 420 } 421 422 /** Common interface for internal WriteableTransaction implementations. */ 423 private interface StorageImpl extends WriteableTransaction, Closeable { 424 } 425 426 /** PersistIt implementation of the {@link WriteableTransaction} interface. */ 427 private final class WriteableStorageImpl implements StorageImpl 428 { 429 private static final String DUMMY_RECORD = "_DUMMY_RECORD_"; 430 private final Map<TreeName, Exchange> exchanges = new HashMap<>(); 431 432 @Override 433 public void put(final TreeName treeName, final ByteSequence key, final ByteSequence value) 434 { 435 try 436 { 437 final Exchange ex = getExchangeFromCache(treeName); 438 bytesToKey(ex.getKey(), key); 439 bytesToValue(ex.getValue(), value); 440 ex.store(); 441 } 442 catch (final PersistitException | RollbackException e) 443 { 444 throw new StorageRuntimeException(e); 445 } 446 } 447 448 @Override 449 public boolean delete(final TreeName treeName, final ByteSequence key) 450 { 451 try 452 { 453 final Exchange ex = getExchangeFromCache(treeName); 454 bytesToKey(ex.getKey(), key); 455 return ex.remove(); 456 } 457 catch (final PersistitException | RollbackException e) 458 { 459 throw new StorageRuntimeException(e); 460 } 461 } 462 463 @Override 464 public void deleteTree(final TreeName treeName) 465 { 466 Exchange ex = null; 467 try 468 { 469 ex = getExchangeFromCache(treeName); 470 ex.removeTree(); 471 } 472 catch (final PersistitException | RollbackException e) 473 { 474 throw new StorageRuntimeException(e); 475 } 476 finally 477 { 478 exchanges.values().remove(ex); 479 db.releaseExchange(ex); 480 } 481 } 482 483 @Override 484 public long getRecordCount(TreeName treeName) 485 { 486 // FIXME: is there a better/quicker way to do this? 487 try(final Cursor<?, ?> cursor = openCursor(treeName)) 488 { 489 long count = 0; 490 while (cursor.next()) 491 { 492 count++; 493 } 494 return count; 495 } 496 } 497 498 @Override 499 public Cursor<ByteString, ByteString> openCursor(final TreeName treeName) 500 { 501 try 502 { 503 /* 504 * Acquire a new exchange for the cursor rather than using a cached 505 * exchange in order to avoid reentrant accesses to the same tree 506 * interfering with the cursor position. 507 */ 508 return new CursorImpl(getNewExchange(treeName, false)); 509 } 510 catch (final PersistitException | RollbackException e) 511 { 512 throw new StorageRuntimeException(e); 513 } 514 } 515 516 @Override 517 public void openTree(final TreeName treeName, boolean createOnDemand) 518 { 519 if (createOnDemand) 520 { 521 openCreateTree(treeName); 522 } 523 else 524 { 525 try 526 { 527 getExchangeFromCache(treeName); 528 } 529 catch (final PersistitException | RollbackException e) 530 { 531 throw new StorageRuntimeException(e); 532 } 533 } 534 } 535 536 @Override 537 public ByteString read(final TreeName treeName, final ByteSequence key) 538 { 539 try 540 { 541 final Exchange ex = getExchangeFromCache(treeName); 542 bytesToKey(ex.getKey(), key); 543 ex.fetch(); 544 return valueToBytes(ex.getValue()); 545 } 546 catch (final PersistitException | RollbackException e) 547 { 548 throw new StorageRuntimeException(e); 549 } 550 } 551 552 @Override 553 public boolean update(final TreeName treeName, final ByteSequence key, final UpdateFunction f) 554 { 555 try 556 { 557 final Exchange ex = getExchangeFromCache(treeName); 558 bytesToKey(ex.getKey(), key); 559 ex.fetch(); 560 final ByteSequence oldValue = valueToBytes(ex.getValue()); 561 final ByteSequence newValue = f.computeNewValue(oldValue); 562 if (!Objects.equals(newValue, oldValue)) 563 { 564 if (newValue == null) 565 { 566 ex.remove(); 567 } 568 else 569 { 570 ex.getValue().clear().putByteArray(newValue.toByteArray()); 571 ex.store(); 572 } 573 return true; 574 } 575 return false; 576 } 577 catch (final PersistitException | RollbackException e) 578 { 579 throw new StorageRuntimeException(e); 580 } 581 } 582 583 private void openCreateTree(final TreeName treeName) 584 { 585 Exchange ex = null; 586 try 587 { 588 ex = getNewExchange(treeName, true); 589 // Work around a problem with forced shutdown right after tree creation. 590 // Tree operations are not part of the journal, so force a couple operations to be able to recover. 591 ByteString dummyKey = ByteString.valueOfUtf8(DUMMY_RECORD); 592 put(treeName, dummyKey, ByteString.empty()); 593 delete(treeName, dummyKey); 594 } 595 catch (final PersistitException | RollbackException e) 596 { 597 throw new StorageRuntimeException(e); 598 } 599 finally 600 { 601 db.releaseExchange(ex); 602 } 603 } 604 605 private Exchange getExchangeFromCache(final TreeName treeName) throws PersistitException 606 { 607 Exchange exchange = exchanges.get(treeName); 608 if (exchange == null) 609 { 610 exchange = getNewExchange(treeName, false); 611 exchanges.put(treeName, exchange); 612 } 613 return exchange; 614 } 615 616 @Override 617 public void close() 618 { 619 for (final Exchange ex : exchanges.values()) 620 { 621 db.releaseExchange(ex); 622 } 623 exchanges.clear(); 624 } 625 } 626 627 /** PersistIt read-only implementation of {@link StorageImpl} interface. */ 628 private final class ReadOnlyStorageImpl implements StorageImpl { 629 private final WriteableStorageImpl delegate; 630 631 ReadOnlyStorageImpl(WriteableStorageImpl delegate) 632 { 633 this.delegate = delegate; 634 } 635 636 @Override 637 public ByteString read(TreeName treeName, ByteSequence key) 638 { 639 return delegate.read(treeName, key); 640 } 641 642 @Override 643 public Cursor<ByteString, ByteString> openCursor(TreeName treeName) 644 { 645 return delegate.openCursor(treeName); 646 } 647 648 @Override 649 public long getRecordCount(TreeName treeName) 650 { 651 return delegate.getRecordCount(treeName); 652 } 653 654 @Override 655 public void openTree(TreeName treeName, boolean createOnDemand) 656 { 657 if (createOnDemand) 658 { 659 throw new ReadOnlyStorageException(); 660 } 661 Exchange ex = null; 662 try 663 { 664 ex = getNewExchange(treeName, false); 665 } 666 catch (final TreeNotFoundException e) 667 { 668 // ignore missing trees. 669 } 670 catch (final PersistitException | RollbackException e) 671 { 672 throw new StorageRuntimeException(e); 673 } 674 finally 675 { 676 db.releaseExchange(ex); 677 } 678 } 679 680 @Override 681 public void close() 682 { 683 delegate.close(); 684 } 685 686 @Override 687 public void deleteTree(TreeName name) 688 { 689 throw new ReadOnlyStorageException(); 690 } 691 692 @Override 693 public void put(TreeName treeName, ByteSequence key, ByteSequence value) 694 { 695 throw new ReadOnlyStorageException(); 696 } 697 698 @Override 699 public boolean update(TreeName treeName, ByteSequence key, UpdateFunction f) 700 { 701 throw new ReadOnlyStorageException(); 702 } 703 704 @Override 705 public boolean delete(TreeName treeName, ByteSequence key) 706 { 707 throw new ReadOnlyStorageException(); 708 } 709 } 710 711 private Exchange getNewExchange(final TreeName treeName, final boolean create) throws PersistitException 712 { 713 return db.getExchange(volume, treeName.toString(), create); 714 } 715 716 private StorageImpl newStorageImpl() { 717 final WriteableStorageImpl writeableStorage = new WriteableStorageImpl(); 718 return accessMode.isWriteable() ? writeableStorage : new ReadOnlyStorageImpl(writeableStorage); 719 } 720 721 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 722 private final ServerContext serverContext; 723 private final File backendDirectory; 724 private AccessMode accessMode; 725 private Persistit db; 726 private Volume volume; 727 private PDBBackendCfg config; 728 private DiskSpaceMonitor diskMonitor; 729 private PDBMonitor monitor; 730 private MemoryQuota memQuota; 731 private StorageStatus storageStatus = StorageStatus.working(); 732 733 /** 734 * Creates a new persistit storage with the provided configuration. 735 * 736 * @param cfg 737 * The configuration. 738 * @param serverContext 739 * This server instance context 740 * @throws ConfigException if memory cannot be reserved 741 */ 742 // FIXME: should be package private once importer is decoupled. 743 public PDBStorage(final PDBBackendCfg cfg, ServerContext serverContext) throws ConfigException 744 { 745 this.serverContext = serverContext; 746 backendDirectory = getBackendDirectory(cfg); 747 config = cfg; 748 cfg.addPDBChangeListener(this); 749 } 750 751 private Configuration buildImportConfiguration() 752 { 753 final Configuration dbCfg = buildConfiguration(AccessMode.READ_WRITE); 754 getBufferPoolCfg(dbCfg).setMaximumMemory(IMPORT_DB_CACHE_SIZE); 755 dbCfg.setCommitPolicy(SOFT); 756 return dbCfg; 757 } 758 759 private Configuration buildConfiguration(AccessMode accessMode) 760 { 761 this.accessMode = accessMode; 762 763 final Configuration dbCfg = new Configuration(); 764 dbCfg.setLogFile(new File(backendDirectory, VOLUME_NAME + ".log").getPath()); 765 dbCfg.setJournalPath(new File(backendDirectory, JOURNAL_NAME).getPath()); 766 dbCfg.setCheckpointInterval(config.getDBCheckpointerWakeupInterval()); 767 // Volume is opened read write because recovery will fail if opened read-only 768 dbCfg.setVolumeList(asList(new VolumeSpecification(new File(backendDirectory, VOLUME_NAME).getPath(), null, 769 BUFFER_SIZE, 4096, Long.MAX_VALUE / BUFFER_SIZE, 2048, true, false, false))); 770 final BufferPoolConfiguration bufferPoolCfg = getBufferPoolCfg(dbCfg); 771 bufferPoolCfg.setMaximumCount(Integer.MAX_VALUE); 772 773 diskMonitor = serverContext.getDiskSpaceMonitor(); 774 memQuota = serverContext.getMemoryQuota(); 775 if (config.getDBCacheSize() > 0) 776 { 777 bufferPoolCfg.setMaximumMemory(config.getDBCacheSize()); 778 memQuota.acquireMemory(config.getDBCacheSize()); 779 } 780 else 781 { 782 bufferPoolCfg.setMaximumMemory(memQuota.memPercentToBytes(config.getDBCachePercent())); 783 memQuota.acquireMemory(memQuota.memPercentToBytes(config.getDBCachePercent())); 784 } 785 dbCfg.setCommitPolicy(config.isDBTxnNoSync() ? SOFT : GROUP); 786 dbCfg.setJmxEnabled(false); 787 return dbCfg; 788 } 789 790 @Override 791 public void close() 792 { 793 if (db != null) 794 { 795 DirectoryServer.deregisterMonitorProvider(monitor); 796 monitor = null; 797 try 798 { 799 db.close(); 800 db = null; 801 } 802 catch (final PersistitException e) 803 { 804 throw new IllegalStateException(e); 805 } 806 } 807 if (config.getDBCacheSize() > 0) 808 { 809 memQuota.releaseMemory(config.getDBCacheSize()); 810 } 811 else 812 { 813 memQuota.releaseMemory(memQuota.memPercentToBytes(config.getDBCachePercent())); 814 } 815 config.removePDBChangeListener(this); 816 diskMonitor.deregisterMonitoredDirectory(getDirectory(), this); 817 } 818 819 private static BufferPoolConfiguration getBufferPoolCfg(Configuration dbCfg) 820 { 821 return dbCfg.getBufferPoolMap().get(BUFFER_SIZE); 822 } 823 824 @Override 825 public void open(AccessMode accessMode) throws ConfigException, StorageRuntimeException 826 { 827 Reject.ifNull(accessMode, "accessMode must not be null"); 828 open0(buildConfiguration(accessMode)); 829 } 830 831 private void open0(final Configuration dbCfg) throws ConfigException 832 { 833 setupStorageFiles(backendDirectory, config.getDBDirectoryPermissions(), config.dn()); 834 try 835 { 836 if (db != null) 837 { 838 throw new IllegalStateException( 839 "Database is already open, either the backend is enabled or an import is currently running."); 840 } 841 db = new Persistit(dbCfg); 842 843 final long bufferCount = getBufferPoolCfg(dbCfg).computeBufferCount(db.getAvailableHeap()); 844 final long totalSize = bufferCount * BUFFER_SIZE / 1024; 845 logger.info(NOTE_PDB_MEMORY_CFG, config.getBackendId(), bufferCount, BUFFER_SIZE, totalSize); 846 847 db.initialize(); 848 volume = db.loadVolume(VOLUME_NAME); 849 monitor = new PDBMonitor(config.getBackendId() + " PDB Database", db); 850 DirectoryServer.registerMonitorProvider(monitor); 851 } 852 catch(final InUseException e) { 853 throw new StorageInUseException(e); 854 } 855 catch (final PersistitException | RollbackException e) 856 { 857 throw new StorageRuntimeException(e); 858 } 859 registerMonitoredDirectory(config); 860 } 861 862 @Override 863 public <T> T read(final ReadOperation<T> operation) throws Exception 864 { 865 final Transaction txn = db.getTransaction(); 866 for (;;) 867 { 868 txn.begin(); 869 try 870 { 871 try (final StorageImpl storageImpl = newStorageImpl()) 872 { 873 final T result = operation.run(storageImpl); 874 txn.commit(); 875 return result; 876 } 877 catch (final StorageRuntimeException e) 878 { 879 if (e.getCause() != null) 880 { 881 throw (Exception) e.getCause(); 882 } 883 throw e; 884 } 885 } 886 catch (final RollbackException e) 887 { 888 // retry 889 } 890 catch (final Exception e) 891 { 892 txn.rollback(); 893 throw e; 894 } 895 finally 896 { 897 txn.end(); 898 } 899 } 900 } 901 902 @Override 903 public Importer startImport() throws ConfigException, StorageRuntimeException 904 { 905 open0(buildImportConfiguration()); 906 return new ImporterImpl(); 907 } 908 909 @Override 910 public void write(final WriteOperation operation) throws Exception 911 { 912 final Transaction txn = db.getTransaction(); 913 for (;;) 914 { 915 txn.begin(); 916 try 917 { 918 try (final StorageImpl storageImpl = newStorageImpl()) 919 { 920 operation.run(storageImpl); 921 txn.commit(); 922 return; 923 } 924 catch (final StorageRuntimeException e) 925 { 926 if (e.getCause() != null) 927 { 928 throw (Exception) e.getCause(); 929 } 930 throw e; 931 } 932 } 933 catch (final RollbackException e) 934 { 935 // retry after random sleep (reduces transactions collision. Drawback: increased latency) 936 Thread.sleep((long) (Math.random() * MAX_SLEEP_ON_RETRY_MS)); 937 } 938 catch (final Exception e) 939 { 940 txn.rollback(); 941 throw e; 942 } 943 finally 944 { 945 txn.end(); 946 } 947 } 948 } 949 950 @Override 951 public boolean supportsBackupAndRestore() 952 { 953 return true; 954 } 955 956 @Override 957 public File getDirectory() 958 { 959 return getBackendDirectory(config); 960 } 961 962 private static File getBackendDirectory(PDBBackendCfg cfg) 963 { 964 return getDBDirectory(cfg.getDBDirectory(), cfg.getBackendId()); 965 } 966 967 @Override 968 public ListIterator<Path> getFilesToBackup() throws DirectoryException 969 { 970 try 971 { 972 if (db == null) 973 { 974 return getFilesToBackupWhenOffline(); 975 } 976 977 // FIXME: use full programmatic way of retrieving backup file once available in persistIt 978 // When requesting files to backup, append only mode must also be set (-a) otherwise it will be ended 979 // by PersistIt and performing backup may corrupt the DB. 980 String filesAsString = db.getManagement().execute("backup -a -f"); 981 String[] allFiles = filesAsString.split("[\r\n]+"); 982 final List<Path> files = new ArrayList<>(); 983 for (String file : allFiles) 984 { 985 files.add(Paths.get(file)); 986 } 987 return files.listIterator(); 988 } 989 catch (Exception e) 990 { 991 throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), 992 ERR_BACKEND_LIST_FILES_TO_BACKUP.get(config.getBackendId(), stackTraceToSingleLineString(e))); 993 } 994 } 995 996 /** Filter to retrieve the database files to backup. */ 997 private static final FileFilter BACKUP_FILES_FILTER = new FileFilter() 998 { 999 @Override 1000 public boolean accept(File file) 1001 { 1002 String name = file.getName(); 1003 return VOLUME_NAME.equals(name) || name.matches(JOURNAL_NAME + "\\.\\d+$"); 1004 } 1005 }; 1006 1007 /** 1008 * Returns the list of files to backup when there is no open database. 1009 * <p> 1010 * It is not possible to rely on the database returning the files, so the files must be retrieved 1011 * from a file filter. 1012 */ 1013 private ListIterator<Path> getFilesToBackupWhenOffline() throws DirectoryException 1014 { 1015 return BackupManager.getFiles(getDirectory(), BACKUP_FILES_FILTER, config.getBackendId()).listIterator(); 1016 } 1017 1018 @Override 1019 public Path beforeRestore() throws DirectoryException 1020 { 1021 return null; 1022 } 1023 1024 @Override 1025 public boolean isDirectRestore() 1026 { 1027 // restore is done in an intermediate directory 1028 return false; 1029 } 1030 1031 @Override 1032 public void afterRestore(Path restoreDirectory, Path saveDirectory) throws DirectoryException 1033 { 1034 // intermediate directory content is moved to database directory 1035 File targetDirectory = getDirectory(); 1036 recursiveDelete(targetDirectory); 1037 try 1038 { 1039 Files.move(restoreDirectory, targetDirectory.toPath()); 1040 } 1041 catch(IOException e) 1042 { 1043 LocalizableMessage msg = ERR_CANNOT_RENAME_RESTORE_DIRECTORY.get(restoreDirectory, targetDirectory.getPath()); 1044 throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), msg); 1045 } 1046 } 1047 1048 /** 1049 * Switch the database in append only mode. 1050 * <p> 1051 * This is a mandatory operation before performing a backup. 1052 */ 1053 private void switchToAppendOnlyMode() throws DirectoryException 1054 { 1055 try 1056 { 1057 // FIXME: use full programmatic way of switching to this mode once available in persistIt 1058 db.getManagement().execute("backup -a -c"); 1059 } 1060 catch (RemoteException e) 1061 { 1062 throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), 1063 ERR_BACKEND_SWITCH_TO_APPEND_MODE.get(config.getBackendId(), stackTraceToSingleLineString(e))); 1064 } 1065 } 1066 1067 /** 1068 * Terminate the append only mode of the database. 1069 * <p> 1070 * This should be called only when database was previously switched to append only mode. 1071 */ 1072 private void endAppendOnlyMode() throws DirectoryException 1073 { 1074 try 1075 { 1076 // FIXME: use full programmatic way of ending append mode once available in persistIt 1077 db.getManagement().execute("backup -e"); 1078 } 1079 catch (RemoteException e) 1080 { 1081 throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), 1082 ERR_BACKEND_END_APPEND_MODE.get(config.getBackendId(), stackTraceToSingleLineString(e))); 1083 } 1084 } 1085 1086 @Override 1087 public void createBackup(BackupConfig backupConfig) throws DirectoryException 1088 { 1089 if (db != null) 1090 { 1091 switchToAppendOnlyMode(); 1092 } 1093 try 1094 { 1095 new BackupManager(config.getBackendId()).createBackup(this, backupConfig); 1096 } 1097 finally 1098 { 1099 if (db != null) 1100 { 1101 endAppendOnlyMode(); 1102 } 1103 } 1104 } 1105 1106 @Override 1107 public void removeBackup(BackupDirectory backupDirectory, String backupID) throws DirectoryException 1108 { 1109 new BackupManager(config.getBackendId()).removeBackup(backupDirectory, backupID); 1110 } 1111 1112 @Override 1113 public void restoreBackup(RestoreConfig restoreConfig) throws DirectoryException 1114 { 1115 new BackupManager(config.getBackendId()).restoreBackup(this, restoreConfig); 1116 } 1117 1118 @Override 1119 public Set<TreeName> listTrees() 1120 { 1121 try 1122 { 1123 String[] treeNames = volume.getTreeNames(); 1124 final Set<TreeName> results = new HashSet<>(treeNames.length); 1125 for (String treeName : treeNames) 1126 { 1127 if (!treeName.equals("_classIndex")) 1128 { 1129 results.add(TreeName.valueOf(treeName)); 1130 } 1131 } 1132 return results; 1133 } 1134 catch (PersistitException e) 1135 { 1136 throw new StorageRuntimeException(e); 1137 } 1138 } 1139 1140 /** 1141 * TODO: it would be nice to use the low-level key/value APIs. They seem quite 1142 * inefficient at the moment for simple byte arrays. 1143 */ 1144 private static Key bytesToKey(final Key key, final ByteSequence bytes) 1145 { 1146 final byte[] tmp = bytes.toByteArray(); 1147 return key.clear().appendByteArray(tmp, 0, tmp.length); 1148 } 1149 1150 private static Value bytesToValue(final Value value, final ByteSequence bytes) 1151 { 1152 value.clear().putByteArray(bytes.toByteArray()); 1153 return value; 1154 } 1155 1156 private static ByteString valueToBytes(final Value value) 1157 { 1158 if (value.isDefined()) 1159 { 1160 return ByteString.wrap(value.getByteArray()); 1161 } 1162 return null; 1163 } 1164 1165 @Override 1166 public boolean isConfigurationChangeAcceptable(PDBBackendCfg newCfg, 1167 List<LocalizableMessage> unacceptableReasons) 1168 { 1169 long newSize = computeSize(newCfg); 1170 long oldSize = computeSize(config); 1171 return (newSize <= oldSize || memQuota.isMemoryAvailable(newSize - oldSize)) 1172 && checkConfigurationDirectories(newCfg, unacceptableReasons); 1173 } 1174 1175 private long computeSize(PDBBackendCfg cfg) 1176 { 1177 return cfg.getDBCacheSize() > 0 ? cfg.getDBCacheSize() : memQuota.memPercentToBytes(cfg.getDBCachePercent()); 1178 } 1179 1180 /** 1181 * Checks newly created backend has a valid configuration. 1182 * @param cfg the new configuration 1183 * @param unacceptableReasons the list of accumulated errors and their messages 1184 * @param context the server context 1185 * @return true if newly created backend has a valid configuration 1186 */ 1187 static boolean isConfigurationAcceptable(PDBBackendCfg cfg, List<LocalizableMessage> unacceptableReasons, 1188 ServerContext context) 1189 { 1190 if (context != null) 1191 { 1192 MemoryQuota memQuota = context.getMemoryQuota(); 1193 if (cfg.getDBCacheSize() > 0 && !memQuota.isMemoryAvailable(cfg.getDBCacheSize())) 1194 { 1195 unacceptableReasons.add(ERR_BACKEND_CONFIG_CACHE_SIZE_GREATER_THAN_JVM_HEAP.get( 1196 cfg.getDBCacheSize(), memQuota.getAvailableMemory())); 1197 return false; 1198 } 1199 else if (!memQuota.isMemoryAvailable(memQuota.memPercentToBytes(cfg.getDBCachePercent()))) 1200 { 1201 unacceptableReasons.add(ERR_BACKEND_CONFIG_CACHE_PERCENT_GREATER_THAN_JVM_HEAP.get( 1202 cfg.getDBCachePercent(), memQuota.memBytesToPercent(memQuota.getAvailableMemory()))); 1203 return false; 1204 } 1205 } 1206 return checkConfigurationDirectories(cfg, unacceptableReasons); 1207 } 1208 1209 private static boolean checkConfigurationDirectories(PDBBackendCfg cfg, 1210 List<LocalizableMessage> unacceptableReasons) 1211 { 1212 final ConfigChangeResult ccr = new ConfigChangeResult(); 1213 File newBackendDirectory = getBackendDirectory(cfg); 1214 1215 checkDBDirExistsOrCanCreate(newBackendDirectory, ccr, true); 1216 checkDBDirPermissions(cfg.getDBDirectoryPermissions(), cfg.dn(), ccr); 1217 if (!ccr.getMessages().isEmpty()) 1218 { 1219 unacceptableReasons.addAll(ccr.getMessages()); 1220 return false; 1221 } 1222 return true; 1223 } 1224 1225 @Override 1226 public ConfigChangeResult applyConfigurationChange(PDBBackendCfg cfg) 1227 { 1228 final ConfigChangeResult ccr = new ConfigChangeResult(); 1229 1230 try 1231 { 1232 File newBackendDirectory = getBackendDirectory(cfg); 1233 1234 // Create the directory if it doesn't exist. 1235 if(!cfg.getDBDirectory().equals(config.getDBDirectory())) 1236 { 1237 checkDBDirExistsOrCanCreate(newBackendDirectory, ccr, false); 1238 if (!ccr.getMessages().isEmpty()) 1239 { 1240 return ccr; 1241 } 1242 1243 ccr.setAdminActionRequired(true); 1244 ccr.addMessage(NOTE_CONFIG_DB_DIR_REQUIRES_RESTART.get(config.getDBDirectory(), cfg.getDBDirectory())); 1245 } 1246 1247 if (!cfg.getDBDirectoryPermissions().equalsIgnoreCase(config.getDBDirectoryPermissions()) 1248 || !cfg.getDBDirectory().equals(config.getDBDirectory())) 1249 { 1250 checkDBDirPermissions(cfg.getDBDirectoryPermissions(), cfg.dn(), ccr); 1251 if (!ccr.getMessages().isEmpty()) 1252 { 1253 return ccr; 1254 } 1255 1256 setDBDirPermissions(newBackendDirectory, cfg.getDBDirectoryPermissions(), cfg.dn(), ccr); 1257 if (!ccr.getMessages().isEmpty()) 1258 { 1259 return ccr; 1260 } 1261 } 1262 registerMonitoredDirectory(cfg); 1263 config = cfg; 1264 } 1265 catch (Exception e) 1266 { 1267 addErrorMessage(ccr, LocalizableMessage.raw(stackTraceToSingleLineString(e))); 1268 } 1269 return ccr; 1270 } 1271 1272 private void registerMonitoredDirectory(PDBBackendCfg cfg) 1273 { 1274 diskMonitor.registerMonitoredDirectory( 1275 cfg.getBackendId() + " backend", 1276 getDirectory(), 1277 cfg.getDiskLowThreshold(), 1278 cfg.getDiskFullThreshold(), 1279 this); 1280 } 1281 1282 @Override 1283 public void removeStorageFiles() throws StorageRuntimeException 1284 { 1285 StorageUtils.removeStorageFiles(backendDirectory); 1286 } 1287 1288 @Override 1289 public StorageStatus getStorageStatus() 1290 { 1291 return storageStatus; 1292 } 1293 1294 @Override 1295 public void diskFullThresholdReached(File directory, long thresholdInBytes) { 1296 storageStatus = statusWhenDiskSpaceFull(directory, thresholdInBytes, config.getBackendId()); 1297 } 1298 1299 @Override 1300 public void diskLowThresholdReached(File directory, long thresholdInBytes) { 1301 storageStatus = statusWhenDiskSpaceLow(directory, thresholdInBytes, config.getBackendId()); 1302 } 1303 1304 @Override 1305 public void diskSpaceRestored(File directory, long lowThresholdInBytes, long fullThresholdInBytes) { 1306 storageStatus = StorageStatus.working(); 1307 } 1308}