001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2008-2010 Sun Microsystems, Inc.
025 *      Portions Copyright 2011-2015 ForgeRock AS
026 */
027package org.opends.admin.ads;
028
029import java.util.Collection;
030import java.util.Date;
031import java.util.HashMap;
032import java.util.HashSet;
033import java.util.Iterator;
034import java.util.LinkedHashSet;
035import java.util.Map;
036import java.util.Set;
037
038import javax.naming.NameNotFoundException;
039import javax.naming.NamingEnumeration;
040import javax.naming.NamingException;
041import javax.naming.directory.SearchControls;
042import javax.naming.directory.SearchResult;
043import javax.naming.ldap.InitialLdapContext;
044import javax.naming.ldap.LdapName;
045
046import org.forgerock.i18n.LocalizableMessage;
047import org.forgerock.i18n.slf4j.LocalizedLogger;
048import org.opends.admin.ads.ADSContext.ServerProperty;
049import org.opends.admin.ads.util.ApplicationTrustManager;
050import org.opends.admin.ads.util.ConnectionUtils;
051import org.opends.admin.ads.util.PreferredConnection;
052import org.opends.admin.ads.util.ServerLoader;
053import org.opends.quicksetup.util.Utils;
054
055import static com.forgerock.opendj.cli.Utils.*;
056
057import static org.opends.messages.QuickSetupMessages.*;
058
059/**
060 * This class allows to read the configuration of the different servers that are
061 * registered in a given ADS server. It provides a read only view of the
062 * configuration of the servers and of the replication topologies that might be
063 * configured between them.
064 */
065public class TopologyCache
066{
067
068  private final ADSContext adsContext;
069  private final ApplicationTrustManager trustManager;
070  private final int timeout;
071  private final String bindDN;
072  private final String bindPwd;
073  private final Set<ServerDescriptor> servers = new HashSet<>();
074  private final Set<SuffixDescriptor> suffixes = new HashSet<>();
075  private final Set<PreferredConnection> preferredConnections = new LinkedHashSet<>();
076  private final TopologyCacheFilter filter = new TopologyCacheFilter();
077  private static final int MULTITHREAD_TIMEOUT = 90 * 1000;
078  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
079
080  /**
081   * Constructor of the TopologyCache.
082   *
083   * @param adsContext the adsContext to the ADS registry.
084   * @param trustManager the ApplicationTrustManager that must be used to trust
085   * certificates when we create connections to the registered servers to read
086   * their configuration.
087   * @param timeout the timeout to establish the connection in milliseconds.
088   * Use {@code 0} to express no timeout.
089   */
090  public TopologyCache(ADSContext adsContext,
091                       ApplicationTrustManager trustManager,
092                       int timeout)
093  {
094    this.adsContext = adsContext;
095    this.trustManager = trustManager;
096    this.timeout = timeout;
097    bindDN = ConnectionUtils.getBindDN(adsContext.getDirContext());
098    bindPwd = ConnectionUtils.getBindPassword(adsContext.getDirContext());
099  }
100
101  /**
102   * Reads the configuration of the registered servers.
103   *
104   * @throws TopologyCacheException if there is an issue reading the
105   * configuration of the registered servers.
106   */
107  public void reloadTopology() throws TopologyCacheException
108  {
109    suffixes.clear();
110    servers.clear();
111    try
112    {
113      Set<Map<ServerProperty, Object>> adsServers =
114          adsContext.readServerRegistry();
115
116      Set<ServerLoader> threadSet = new HashSet<>();
117      for (Map<ServerProperty, Object> serverProperties : adsServers)
118      {
119        ServerLoader t = getServerLoader(serverProperties);
120        t.start();
121        threadSet.add(t);
122      }
123      joinThreadSet(threadSet);
124      /*
125       * Try to consolidate things (even if the data is not complete).
126       */
127
128      HashMap<LdapName, Set<SuffixDescriptor>> hmSuffixes = new HashMap<>();
129      for (ServerLoader loader : threadSet)
130      {
131        ServerDescriptor descriptor = loader.getServerDescriptor();
132        for (ReplicaDescriptor replica : descriptor.getReplicas())
133        {
134          logger.info(LocalizableMessage.raw("Handling replica with dn: "
135              + replica.getSuffix().getDN()));
136
137          boolean suffixFound = false;
138          LdapName dn = new LdapName(replica.getSuffix().getDN());
139          Set<SuffixDescriptor> sufs = hmSuffixes.get(dn);
140          if (sufs != null)
141          {
142            Iterator<SuffixDescriptor> it = sufs.iterator();
143            while (it.hasNext() && !suffixFound)
144            {
145              SuffixDescriptor suffix = it.next();
146              Iterator<String> it2 = suffix.getReplicationServers().iterator();
147              while (it2.hasNext() && !suffixFound)
148              {
149                if (replica.getReplicationServers().contains(it2.next()))
150                {
151                  suffixFound = true;
152                  Set<ReplicaDescriptor> replicas = suffix.getReplicas();
153                  replicas.add(replica);
154                  suffix.setReplicas(replicas);
155                  replica.setSuffix(suffix);
156                }
157              }
158            }
159          }
160          if (!suffixFound)
161          {
162            if (sufs == null)
163            {
164              sufs = new HashSet<>();
165              hmSuffixes.put(dn, sufs);
166            }
167            sufs.add(replica.getSuffix());
168            suffixes.add(replica.getSuffix());
169          }
170        }
171        servers.add(descriptor);
172      }
173
174      // Figure out the replication monitoring if it is required.
175      if (getFilter().searchMonitoringInformation())
176      {
177        readReplicationMonitoring();
178      }
179    }
180    catch (ADSContextException ade)
181    {
182      throw new TopologyCacheException(ade);
183    }
184    catch (Throwable t)
185    {
186      throw new TopologyCacheException(TopologyCacheException.Type.BUG, t);
187    }
188  }
189
190  /**
191   * Returns the trust manager used by this class.
192   *
193   * @return the trust manager used by this class.
194   */
195  public ApplicationTrustManager getTrustManager()
196  {
197    return trustManager;
198  }
199
200  /**
201   * Returns the timeout to establish the connection in milliseconds.
202   *
203   * @return the timeout to establish the connection in milliseconds. Returns
204   * {@code 0} to express no timeout.
205   */
206  public int getConnectTimeout()
207  {
208    return timeout;
209  }
210
211  /**
212   * Reads the replication monitoring.
213   */
214  private void readReplicationMonitoring()
215  {
216    Set<ReplicaDescriptor> replicasToUpdate = getReplicasToUpdate();
217    for (ServerDescriptor server : getServers())
218    {
219      if (server.isReplicationServer())
220      {
221        // If is replication server, then at least we were able to read the
222        // configuration, so assume that we might be able to read monitoring
223        // (even if an exception occurred before).
224        Set<ReplicaDescriptor> candidateReplicas = getCandidateReplicas(server);
225        if (!candidateReplicas.isEmpty())
226        {
227          Set<ReplicaDescriptor> updatedReplicas = new HashSet<>();
228          try
229          {
230            updateReplicas(server, candidateReplicas, updatedReplicas);
231          }
232          catch (NamingException ne)
233          {
234            server.setLastException(new TopologyCacheException(
235                TopologyCacheException.Type.GENERIC_READING_SERVER, ne));
236          }
237          replicasToUpdate.removeAll(updatedReplicas);
238        }
239      }
240
241      if (replicasToUpdate.isEmpty())
242      {
243        break;
244      }
245    }
246  }
247
248  private Set<ReplicaDescriptor> getReplicasToUpdate()
249  {
250    Set<ReplicaDescriptor> replicasToUpdate = new HashSet<>();
251    for (ServerDescriptor server : getServers())
252    {
253      for (ReplicaDescriptor replica : server.getReplicas())
254      {
255        if (replica.isReplicated())
256        {
257          replicasToUpdate.add(replica);
258        }
259      }
260    }
261    return replicasToUpdate;
262  }
263
264  private Set<ReplicaDescriptor> getCandidateReplicas(ServerDescriptor server)
265  {
266    Set<ReplicaDescriptor> candidateReplicas = new HashSet<>();
267    // It contains replication information: analyze it.
268    String repServer = server.getReplicationServerHostPort();
269    for (SuffixDescriptor suffix : getSuffixes())
270    {
271      if (containsIgnoreCase(suffix.getReplicationServers(), repServer))
272      {
273        candidateReplicas.addAll(suffix.getReplicas());
274      }
275    }
276    return candidateReplicas;
277  }
278
279  private boolean containsIgnoreCase(Set<String> col, String toFind)
280  {
281    for (String s : col)
282    {
283      if (s.equalsIgnoreCase(toFind))
284      {
285        return true;
286      }
287    }
288    return false;
289  }
290
291  /**
292   * Sets the list of LDAP URLs and connection type that are preferred to be
293   * used to connect to the servers. When we have a server to which we can
294   * connect using a URL on the list we will try to use it.
295   *
296   * @param cnx the list of preferred connections.
297   */
298  public void setPreferredConnections(Set<PreferredConnection> cnx)
299  {
300    preferredConnections.clear();
301    preferredConnections.addAll(cnx);
302  }
303
304  /**
305   * Returns the list of LDAP URLs and connection type that are preferred to be
306   * used to connect to the servers. If a URL is on this list, when we have a
307   * server to which we can connect using that URL and the associated connection
308   * type we will try to use it.
309   *
310   * @return the list of preferred connections.
311   */
312  public LinkedHashSet<PreferredConnection> getPreferredConnections()
313  {
314    return new LinkedHashSet<>(preferredConnections);
315  }
316
317  /**
318   * Returns a Set containing all the servers that are registered in the ADS.
319   *
320   * @return a Set containing all the servers that are registered in the ADS.
321   */
322  public Set<ServerDescriptor> getServers()
323  {
324    return new HashSet<>(servers);
325  }
326
327  /**
328   * Returns a Set containing the suffixes (replication topologies) that could
329   * be retrieved after the last call to reloadTopology.
330   *
331   * @return a Set containing the suffixes (replication topologies) that could
332   * be retrieved after the last call to reloadTopology.
333   */
334  public Set<SuffixDescriptor> getSuffixes()
335  {
336    return new HashSet<>(suffixes);
337  }
338
339  /**
340   * Returns the filter to be used when retrieving information.
341   *
342   * @return the filter to be used when retrieving information.
343   */
344  public TopologyCacheFilter getFilter()
345  {
346    return filter;
347  }
348
349  /**
350   * Method used to wait at most a certain time (MULTITHREAD_TIMEOUT) for the
351   * different threads to finish.
352   *
353   * @param threadSet the list of threads (we assume that they are started) that
354   * we must wait for.
355   */
356  private void joinThreadSet(Set<ServerLoader> threadSet)
357  {
358    Date startDate = new Date();
359    for (ServerLoader t : threadSet)
360    {
361      long timeToJoin = MULTITHREAD_TIMEOUT - System.currentTimeMillis()
362          + startDate.getTime();
363      try
364      {
365        if (timeToJoin > 0)
366        {
367          t.join(MULTITHREAD_TIMEOUT);
368        }
369      }
370      catch (InterruptedException ie)
371      {
372        logger.info(LocalizableMessage.raw(ie + " caught and ignored", ie));
373      }
374      if (t.isAlive())
375      {
376        t.interrupt();
377      }
378    }
379    Date endDate = new Date();
380    long workingTime = endDate.getTime() - startDate.getTime();
381    logger.info(LocalizableMessage.raw("Loading ended at " + workingTime + " ms"));
382  }
383
384  /**
385   * Creates a ServerLoader object based on the provided server properties.
386   *
387   * @param serverProperties the server properties to be used to generate the
388   * ServerLoader.
389   * @return a ServerLoader object based on the provided server properties.
390   */
391  private ServerLoader getServerLoader(
392      Map<ServerProperty, Object> serverProperties)
393  {
394    return new ServerLoader(serverProperties, bindDN, bindPwd,
395        trustManager == null ? null : trustManager.createCopy(),
396        timeout,
397        getPreferredConnections(), getFilter());
398  }
399
400  /**
401   * Returns the adsContext used by this TopologyCache.
402   *
403   * @return the adsContext used by this TopologyCache.
404   */
405  public ADSContext getAdsContext()
406  {
407    return adsContext;
408  }
409
410  /**
411   * Returns a set of error messages encountered in the TopologyCache.
412   *
413   * @return a set of error messages encountered in the TopologyCache.
414   */
415  public Set<LocalizableMessage> getErrorMessages()
416  {
417    Set<TopologyCacheException> exceptions = new HashSet<>();
418    Set<ServerDescriptor> theServers = getServers();
419    Set<LocalizableMessage> exceptionMsgs = new LinkedHashSet<>();
420    for (ServerDescriptor server : theServers)
421    {
422      TopologyCacheException e = server.getLastException();
423      if (e != null)
424      {
425        exceptions.add(e);
426      }
427    }
428    /*
429     * Check the exceptions and see if we throw them or not.
430     */
431    for (TopologyCacheException e : exceptions)
432    {
433      switch (e.getType())
434      {
435        case NOT_GLOBAL_ADMINISTRATOR:
436          exceptionMsgs.add(INFO_NOT_GLOBAL_ADMINISTRATOR_PROVIDED.get());
437
438          break;
439        case GENERIC_CREATING_CONNECTION:
440          if (isCertificateException(e.getCause()))
441          {
442            exceptionMsgs.add(
443                INFO_ERROR_READING_CONFIG_LDAP_CERTIFICATE_SERVER.get(
444                e.getHostPort(), e.getCause().getMessage()));
445          }
446          else
447          {
448            exceptionMsgs.add(Utils.getMessage(e));
449          }
450          break;
451        default:
452          exceptionMsgs.add(Utils.getMessage(e));
453      }
454    }
455    return exceptionMsgs;
456  }
457
458  /**
459   * Updates the monitoring information of the provided replicas using the
460   * information located in cn=monitor of a given replication server.
461   *
462   * @param replicationServer the replication server.
463   * @param candidateReplicas the collection of replicas that must be updated.
464   * @param updatedReplicas the collection of replicas that are actually
465   * updated. This list is updated by the method.
466   */
467  private void updateReplicas(ServerDescriptor replicationServer,
468                              Collection<ReplicaDescriptor> candidateReplicas,
469                              Collection<ReplicaDescriptor> updatedReplicas)
470      throws NamingException
471  {
472    SearchControls ctls = new SearchControls();
473    ctls.setSearchScope(SearchControls.SUBTREE_SCOPE);
474    ctls.setReturningAttributes(
475        new String[]
476        {
477          "approx-older-change-not-synchronized-millis", "missing-changes",
478          "domain-name", "server-id"
479        });
480
481    InitialLdapContext ctx = null;
482    NamingEnumeration<SearchResult> monitorEntries = null;
483    try
484    {
485      ServerLoader loader =
486          getServerLoader(replicationServer.getAdsProperties());
487      ctx = loader.createContext();
488
489      monitorEntries = ctx.search(
490          new LdapName("cn=monitor"), "(missing-changes=*)", ctls);
491
492      while (monitorEntries.hasMore())
493      {
494        SearchResult sr = monitorEntries.next();
495
496        String dn = ConnectionUtils.getFirstValue(sr, "domain-name");
497        int replicaId = -1;
498        try
499        {
500          String sid = ConnectionUtils.getFirstValue(sr, "server-id");
501          if (sid == null)
502          {
503            // This is not a replica, but a replication server. Skip it
504            continue;
505          }
506          replicaId = Integer.valueOf(sid);
507        }
508        catch (Throwable t)
509        {
510          logger.warn(LocalizableMessage.raw("Unexpected error reading replica ID: " + t,
511              t));
512        }
513
514        for (ReplicaDescriptor replica : candidateReplicas)
515        {
516          if (Utils.areDnsEqual(dn, replica.getSuffix().getDN())
517              && replica.isReplicated()
518              && replica.getReplicationId() == replicaId)
519          {
520            // This statistic is optional.
521            setAgeOfOldestMissingChange(replica, sr);
522            setMissingChanges(replica, sr);
523            updatedReplicas.add(replica);
524          }
525        }
526      }
527    }
528    catch (NameNotFoundException nse)
529    {
530    }
531    finally
532    {
533      if (monitorEntries != null)
534      {
535        try
536        {
537          monitorEntries.close();
538        }
539        catch (Throwable t)
540        {
541          logger.warn(LocalizableMessage.raw(
542              "Unexpected error closing enumeration on monitor entries" + t, t));
543        }
544      }
545      if (ctx != null)
546      {
547        ctx.close();
548      }
549    }
550  }
551
552  private void setMissingChanges(ReplicaDescriptor replica, SearchResult sr) throws NamingException
553  {
554    String s = ConnectionUtils.getFirstValue(sr, "missing-changes");
555    if (s != null)
556    {
557      try
558      {
559        replica.setMissingChanges(Integer.valueOf(s));
560      }
561      catch (Throwable t)
562      {
563        logger.warn(LocalizableMessage.raw(
564            "Unexpected error reading missing changes: " + t, t));
565      }
566    }
567  }
568
569  private void setAgeOfOldestMissingChange(ReplicaDescriptor replica, SearchResult sr) throws NamingException
570  {
571    String s = ConnectionUtils.getFirstValue(sr, "approx-older-change-not-synchronized-millis");
572    if (s != null)
573    {
574      try
575      {
576        replica.setAgeOfOldestMissingChange(Long.valueOf(s));
577      }
578      catch (Throwable t)
579      {
580        logger.warn(LocalizableMessage.raw(
581            "Unexpected error reading age of oldest change: " + t, t));
582      }
583    }
584  }
585}