转载

DataNode启动分析

启动工作线程

1. 通信:BPServiceActor,IpcServer,DataXceiverServer,localDataXceiverServer

2. 监控:JVMPauseMonitor,DU(dfsUsage)

3. 其他:InfoServer

入口

# org.apache.hadoop.hdfs.server.datanode.DataNode
public static void main(String args[]) {
	if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
	  System.exit(0);
	}
 
	secureMain(args, null);
}
 
public static void secureMain(String args[], SecureResources resources) {
	int errorCode = 0;
	try {
	  StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
	  // 创建DataNode
	  DataNode datanode = createDataNode(args, null, resources);
	  if (datanode != null) {
	    // 注:只需要等待BPServiceActor线程结束
	    datanode.join();
	  } else {
	    errorCode = 1;
	  }
	} catch (Throwable e) {
	  LOG.fatal("Exception in secureMain", e);
	  terminate(1, e);
	} finally {
	  // We need to terminate the process here because either shutdown was called
	  // or some disk related conditions like volumes tolerated or volumes required
	  // condition was not met. Also, In secure mode, control will go to Jsvc
	  // and Datanode process hangs if it does not exit.
	  LOG.warn("Exiting Datanode");
	  terminate(errorCode);
	}
}
 
  public static DataNode createDataNode(String args[], Configuration conf,
      SecureResources resources) throws IOException {
    // 初始化DataNode,并完成部分工作线程的启动
    DataNode dn = instantiateDataNode(args, conf, resources);
    if (dn != null) {
      // 启动剩余工作线程,dataXceiverServer,localDataXceiverServer,ipcServer等
      dn.runDatanodeDaemon();
    }
    return dn;
  }
 
   public static DataNode instantiateDataNode(String args [], Configuration conf,
      SecureResources resources) throws IOException {
    if (conf == null)
      conf = new HdfsConfiguration();
 
    if (args != null) {
      // parse generic hadoop options
      GenericOptionsParser hParser = new GenericOptionsParser(conf, args);
      args = hParser.getRemainingArgs();
    }
 
    if (!parseArguments(args, conf)) {
      printUsage(System.err);
      return null;
    }
    // dfs.datanode.data.dir 可以[,]号分隔,如:[Disk]/storages/storage1/
    Collection<StorageLocation> dataLocations = getStorageLocations(conf);
    UserGroupInformation.setConfiguration(conf);
    SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
        DFS_DATANODE_KERBEROS_PRINCIPAL_KEY);
    return makeInstance(dataLocations, conf, resources);
  }
 
  /**
   * 确认所给的数据目录至少有一个可以创建(如果父目录不存在也要可以创建)然后实例化DataNode
   */
  static DataNode makeInstance(Collection<StorageLocation> dataDirs,
      Configuration conf, SecureResources resources) throws IOException {
    LocalFileSystem localFS = FileSystem.getLocal(conf);
    FsPermission permission = new FsPermission(
        conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
                 DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
    // 磁盘检查类,需要时创建本地目录,检查权限并且保证可以对目录进行读写
    DataNodeDiskChecker dataNodeDiskChecker =
        new DataNodeDiskChecker(permission);
    // 过滤出可以有rwx的目录,TODO: 注意不能rwx的目录需要检查
    List<StorageLocation> locations =
        checkStorageLocations(dataDirs, localFS, dataNodeDiskChecker);
    DefaultMetricsSystem.initialize("DataNode");
 
    assert locations.size() > 0 : "number of data directories should be > 0";
    return new DataNode(conf, locations, resources);
  }
 
  /**
   * Create the DataNode given a configuration, an array of dataDirs,
   * and a namenode proxy
   */
  DataNode(final Configuration conf,
           final List<StorageLocation> dataDirs,
           final SecureResources resources) throws IOException {
    ......
    try {
      hostName = getHostName(conf);
      LOG.info("Configured hostname is " + hostName);
      startDataNode(conf, dataDirs, resources);
    } catch (IOException ie) {
      shutdown();
      throw ie;
    }
    ......
  }
 
  void startDataNode(Configuration conf,
                     List<StorageLocation> dataDirs,
                     SecureResources resources
                     ) throws IOException {
 
    // settings global for all BPs in the Data Node
    this.secureResources = resources;
    synchronized (this) {
      this.dataDirs = dataDirs;
    }
    this.conf = conf;
    this.dnConf = new DNConf(conf);
    checkSecureConfig(dnConf, conf, resources);
 
    this.spanReceiverHost =
      SpanReceiverHost.get(conf, DFSConfigKeys.DFS_SERVER_HTRACE_PREFIX);
 
    if (dnConf.maxLockedMemory > 0) {
      if (!NativeIO.POSIX.getCacheManipulator().verifyCanMlock()) {
        throw new RuntimeException(String.format(
            "Cannot start datanode because the configured max locked memory" +
            " size (%s) is greater than zero and native code is not available.",
            DFS_DATANODE_MAX_LOCKED_MEMORY_KEY));
      }
      if (Path.WINDOWS) {
        NativeIO.Windows.extendWorkingSetSize(dnConf.maxLockedMemory);
      } else {
        long ulimit = NativeIO.POSIX.getCacheManipulator().getMemlockLimit();
        if (dnConf.maxLockedMemory > ulimit) {
          throw new RuntimeException(String.format(
            "Cannot start datanode because the configured max locked memory" +
            " size (%s) of %d bytes is more than the datanode's available" +
            " RLIMIT_MEMLOCK ulimit of %d bytes.",
            DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
            dnConf.maxLockedMemory,
            ulimit));
        }
      }
    }
    LOG.info("Starting DataNode with maxLockedMemory = " +
        dnConf.maxLockedMemory);
 
    // 初始化DataStorage
    storage = new DataStorage();
 
    // global DN settings
    // 注册JMX
    registerMXBean();
    // https://blog.csdn.net/lipeng_bigdata/article/details/50828066
    // 初始化:DataXceiverServer,这个服务是用来接收、发送数据块,它从客户端或其它DataNode接收请求,流式通信,在DataNode#runDatanodeDaemon()中启动
    initDataXceiver(conf);
    // 启动InfoServer(WEB UI)
    startInfoServer(conf);
    // 启动JvmPauseMonitor,反射监控JVM,可以通过JMS查询
    pauseMonitor = new JvmPauseMonitor(conf);
    pauseMonitor.start();
 
    // BlockPoolTokenSecretManager is required to create ipc server.
    this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();
 
    // Login is done by now. Set the DN user name.
    dnUserName = UserGroupInformation.getCurrentUser().getShortUserName();
    LOG.info("dnUserName = " + dnUserName);
    LOG.info("supergroup = " + supergroup);
    // 初始化IPC服务,在DataNode#runDatanodeDaemon()中启动
    initIpcServer(conf);
 
    metrics = DataNodeMetrics.create(conf, getDisplayName());
    metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
 
    // 初始化BlockPoolManager
    blockPoolManager = new BlockPoolManager(this);
    blockPoolManager.refreshNamenodes(conf);
 
    // Create the ReadaheadPool from the DataNode context so we can
    // exit without having to explicitly shutdown its thread pool.
    readaheadPool = ReadaheadPool.getInstance();
    saslClient = new SaslDataTransferClient(dnConf.conf,
        dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
    saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
  }

BlockPoolManager & BPOfferService

1. BlockPoolManager: 管理DataNode上所有的块池,对象的创建、删除、启动、停止、关闭必须通过这个类的API完成。每一个DataNode都有一个BlockManager的实例。

2. BPOfferService: 在DataNode上每个块池/名称空间一个实例,它处理名称空间的活动和备用NN的心跳。 此类为每个NN管理一个 BPServiceActor 实例,并将调用委托给这两个NN。 它还维护关于哪些NN被视为活动的状态。

private void doRefreshNamenodes(
      Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {
    assert Thread.holdsLock(refreshNamenodesLock);
 
    Set<String> toRefresh = Sets.newLinkedHashSet();
    Set<String> toAdd = Sets.newLinkedHashSet();
    Set<String> toRemove;
 
    synchronized (this) {
      // Step 1. For each of the new nameservices, figure out whether
      // it's an update of the set of NNs for an existing NS,
      // or an entirely new nameservice.
      // 启动时bpByNaeserviceId是空集合
      for (String nameserviceId : addrMap.keySet()) {
        if (bpByNameserviceId.containsKey(nameserviceId)) {
          toRefresh.add(nameserviceId);
        } else {
          toAdd.add(nameserviceId);
        }
      }
 
      // Step 2. Any nameservices we currently have but are no longer present
      // need to be removed.
      toRemove = Sets.newHashSet(Sets.difference(
          bpByNameserviceId.keySet(), addrMap.keySet()));
 
      assert toRefresh.size() + toAdd.size() ==
        addrMap.size() :
          "toAdd: " + Joiner.on(",").useForNull("<default>").join(toAdd) +
          "  toRemove: " + Joiner.on(",").useForNull("<default>").join(toRemove) +
          "  toRefresh: " + Joiner.on(",").useForNull("<default>").join(toRefresh);
 
 
      // Step 3. Start new nameservices
      if (!toAdd.isEmpty()) {
        LOG.info("Starting BPOfferServices for nameservices: " +
            Joiner.on(",").useForNull("<default>").join(toAdd));
 
        for (String nsToAdd : toAdd) {
          ArrayList<InetSocketAddress> addrs =
            Lists.newArrayList(addrMap.get(nsToAdd).values());
          // 创建BPOfferService
          BPOfferService bpos = createBPOS(addrs);
          // 加入bpByNameserviceId集合,下次就会出现在toRefresh集合
          bpByNameserviceId.put(nsToAdd, bpos);
          offerServices.add(bpos);
        }
      }
      // 启动所有的BPOfferService,实际是启动下面的BPServiceActor
      startAll();
    }
 
    // Step 4. Shut down old nameservices. This happens outside
    // of the synchronized(this) lock since they need to call
    // back to .remove() from another thread
    if (!toRemove.isEmpty()) {
      LOG.info("Stopping BPOfferServices for nameservices: " +
          Joiner.on(",").useForNull("<default>").join(toRemove));
 
      for (String nsToRemove : toRemove) {
        BPOfferService bpos = bpByNameserviceId.get(nsToRemove);
        bpos.stop();
        bpos.join();
        // they will call remove on their own
      }
    }
 
    // Step 5. Update nameservices whose NN list has changed
    if (!toRefresh.isEmpty()) {
      LOG.info("Refreshing list of NNs for nameservices: " +
          Joiner.on(",").useForNull("<default>").join(toRefresh));
 
      for (String nsToRefresh : toRefresh) {
        BPOfferService bpos = bpByNameserviceId.get(nsToRefresh);
        ArrayList<InetSocketAddress> addrs =
          Lists.newArrayList(addrMap.get(nsToRefresh).values());
        bpos.refreshNNList(addrs);
      }
    }
  }
 
  BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
    Preconditions.checkArgument(!nnAddrs.isEmpty(),
        "Must pass at least one NN.");
    this.dn = dn;
 
    for (InetSocketAddress addr : nnAddrs) {
      this.bpServices.add(new BPServiceActor(addr, this));
    }
  }

BPServiceActor

startAll(); # 启动所有的BPOfferService,BPOfferService又会启动BPServiceActor的bpThread线程,bpThread线程的run执行逻辑是:

1. 与NameNode进行第一次握手,获取命名空间的信息

2. 向NameNode注册当前DataNode

3. 周期性发送心跳到namenode,增量块汇报,全量块汇报,缓存块汇报等

4. 处理来自namenode的命令

# BPServiceActor.run()
  public void run() {
    LOG.info(this + " starting to offer service");
 
    try {
      while (true) {
        // init stuff
        try {
          // setup storage
          // 和namenode注册
          // TODO: 优化
          connectToNNAndHandshake();
          break;
        } catch (IOException ioe) {
          // Initial handshake, storage recovery or registration failed
          runningState = RunningState.INIT_FAILED;
          if (shouldRetryInit()) {
            // Retry until all namenode's of BPOS failed initialization
            LOG.error("Initialization failed for " + this + " "
                + ioe.getLocalizedMessage());
            sleepAndLogInterrupts(5000, "initializing");
          } else {
            runningState = RunningState.FAILED;
            LOG.fatal("Initialization failed for " + this + ". Exiting. ", ioe);
            return;
          }
        }
      }
 
      runningState = RunningState.RUNNING;
 
      while (shouldRun()) {
        try {
          offerService();
        } catch (Exception ex) {
          LOG.error("Exception in BPOfferService for " + this, ex);
          sleepAndLogInterrupts(5000, "offering service");
        }
      }
      runningState = RunningState.EXITED;
    } catch (Throwable ex) {
      LOG.warn("Unexpected exception in block pool " + this, ex);
      runningState = RunningState.FAILED;
    } finally {
      LOG.warn("Ending block pool service for: " + this);
      cleanUp();
    }
  }
 
  private void connectToNNAndHandshake() throws IOException {
    // get NN proxy
    bpNamenode = dn.connectToNN(nnAddr);
 
    // First phase of the handshake with NN - get the namespace
    // info.
    NamespaceInfo nsInfo = retrieveNamespaceInfo();
 
    // Verify that this matches the other NN in this HA pair.
    // This also initializes our block pool in the DN if we are
    // the first NN connection for this BP.
    bpos.verifyAndSetNamespaceInfo(nsInfo);
 
    // Second phase of the handshake with the NN.
    register(nsInfo);
  }
 
  # BPOfferService.verifyAndSetNamespaceInfo(NamespaceInfo nsInfo)
  /**
   * Called by the BPServiceActors when they handshake to a NN.
   * If this is the first NN connection, this sets the namespace info
   * for this BPOfferService. If it's a connection to a new NN, it
   * verifies that this namespace matches (eg to prevent a misconfiguration
   * where a StandbyNode from a different cluster is specified)
   */
  void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
    writeLock();
    try {
      if (this.bpNSInfo == null) {
        this.bpNSInfo = nsInfo;
        boolean success = false;
 
        // Now that we know the namespace ID, etc, we can pass this to the DN.
        // The DN can now initialize its local storage if we are the
        // first BP to handshake, etc.
        try {
          dn.initBlockPool(this);
          success = true;
        } finally {
          if (!success) {
            // The datanode failed to initialize the BP. We need to reset
            // the namespace info so that other BPService actors still have
            // a chance to set it, and re-initialize the datanode.
            this.bpNSInfo = null;
          }
        }
      } else {
        checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
            "Blockpool ID");
        checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(),
            "Namespace ID");
        checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
            "Cluster ID");
      }
    } finally {
      writeUnlock();
    }
  }

dn.initBlockPool(this);# DataNode.initBlockPool

# DataNode.initBlockPool(BPOfferService bpos)
/**
   * One of the Block Pools has successfully connected to its NN.
   * This initializes the local storage for that block pool,
   * checks consistency of the NN's cluster ID, etc.
   *
   * If this is the first block pool to register, this also initializes
   * the datanode-scoped storage.
   *
   * @param bpos Block pool offer service
   * @throws IOException if the NN is inconsistent with the local storage.
   */
  void initBlockPool(BPOfferService bpos) throws IOException {
    NamespaceInfo nsInfo = bpos.getNamespaceInfo();
    if (nsInfo == null) {
      throw new IOException("NamespaceInfo not found: Block pool " + bpos
          + " should have retrieved namespace info before initBlockPool.");
    }
 
    setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID());
 
    // Register the new block pool with the BP manager.
    blockPoolManager.addBlockPool(bpos);
 
    // In the case that this is the first block pool to connect, initialize
    // the dataset, block scanners, etc.
    initStorage(nsInfo);
 
    // Exclude failed disks before initializing the block pools to avoid startup
    // failures.
    checkDiskError();
 
    data.addBlockPool(nsInfo.getBlockPoolID(), conf);
    blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
    initDirectoryScanner(conf);
  }

initStorage(nsInfo);

# DataNode.initStorage(final NamespaceInfo nsInfo)
  // 8c0638471f8f1dd47667b2d6727d4d2d54e4b48c Tue Aug 09 03:02:53 CST 2016    Arpit Agarwal   HADOOP-10682. Replace FsDatasetImpl object lock with a separate lock object. (Chen Liang)
  // TODO: 查看initStorage(nsInfo)方法的实现
  /**
   * Initializes the {@link #data}. The initialization is done only once, when
   * handshake with the the first namenode is completed.
   */
  private void initStorage(final NamespaceInfo nsInfo) throws IOException {
    final FsDatasetSpi.Factory<? extends FsDatasetSpi<?>> factory
        = FsDatasetSpi.Factory.getFactory(conf);
 
    if (!factory.isSimulated()) {
      final StartupOption startOpt = getStartupOption(conf);
      if (startOpt == null) {
        throw new IOException("Startup option not set.");
      }
      final String bpid = nsInfo.getBlockPoolID();
      //read storage info, lock data dirs and transition fs state if necessary
      synchronized (this) {
        storage.recoverTransitionRead(this, nsInfo, dataDirs, startOpt);
      }
      final StorageInfo bpStorage = storage.getBPStorage(bpid);
      LOG.info("Setting up storage: nsid=" + bpStorage.getNamespaceID()
          + ";bpid=" + bpid + ";lv=" + storage.getLayoutVersion()
          + ";nsInfo=" + nsInfo + ";dnuuid=" + storage.getDatanodeUuid());
    }
 
    // If this is a newly formatted DataNode then assign a new DatanodeUuid.
    checkDatanodeUuid();
 
    synchronized(this)  {
      if (data == null) {
        data = factory.newInstance(this, storage, conf);
      }
    }
  }
 
  // factory使用的初始化语句为
  /**
 * A factory for creating {@link FsDatasetImpl} objects.
 */
public class FsDatasetFactory extends FsDatasetSpi.Factory<FsDatasetImpl> {
  @Override
  public FsDatasetImpl newInstance(DataNode datanode,
      DataStorage storage, Configuration conf) throws IOException {
    return new FsDatasetImpl(datanode, storage, conf);
  }
}
 
// 查看FsDatasetImpl的构造函数
/**
   * An FSDataset has a directory where it loads its data files.
   */
  FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf
      ) throws IOException {
    this.fsRunning = true;
    this.datanode = datanode;
    this.dataStorage = storage;
    this.conf = conf;
    // The number of volumes required for operation is the total number
    // of volumes minus the number of failed volumes we can tolerate.
    final int volFailuresTolerated =
      conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
                  DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);
 
    String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
    Collection<StorageLocation> dataLocations = DataNode.getStorageLocations(conf);
    List<VolumeFailureInfo> volumeFailureInfos = getInitialVolumeFailureInfos(
        dataLocations, storage);
 
    int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length;
    int volsFailed = volumeFailureInfos.size();
    this.validVolsRequired = volsConfigured - volFailuresTolerated;
 
    if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) {
      throw new DiskErrorException("Invalid volume failure "
          + " config value: " + volFailuresTolerated);
    }
    if (volsFailed > volFailuresTolerated) {
      throw new DiskErrorException("Too many failed volumes - "
          + "current valid volumes: " + storage.getNumStorageDirs()
          + ", volumes configured: " + volsConfigured
          + ", volumes failed: " + volsFailed
          + ", volume failures tolerated: " + volFailuresTolerated);
    }
 
    storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
    volumeMap = new ReplicaMap(this);
    ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
 
    @SuppressWarnings("unchecked")
    final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
        ReflectionUtils.newInstance(conf.getClass(
            DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
            RoundRobinVolumeChoosingPolicy.class,
            VolumeChoosingPolicy.class), conf);
    // volumes对象生成
    volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(),
        blockChooserImpl);
    asyncDiskService = new FsDatasetAsyncDiskService(datanode, this);
    asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode);
    deletingBlock = new HashMap<String, Set<Long>>();
    // 添加volume到volumes对象
    for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
      addVolume(dataLocations, storage.getStorageDir(idx));
    }
    setupAsyncLazyPersistThreads();
 
    cacheManager = new FsDatasetCache(this);
 
    // Start the lazy writer once we have built the replica maps.
    lazyWriter = new Daemon(new LazyWriter(conf));
    lazyWriter.start();
    registerMBean(datanode.getDatanodeUuid());
    localFS = FileSystem.getLocal(conf);
    blockPinningEnabled = conf.getBoolean(
      DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED,
      DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT);
  }
  // addVolume的实现
  private void addVolume(Collection<StorageLocation> dataLocations,
      Storage.StorageDirectory sd) throws IOException {
    final File dir = sd.getCurrentDir();
    final StorageType storageType =
        getStorageTypeFromLocations(dataLocations, sd.getRoot());
 
    // If IOException raises from FsVolumeImpl() or getVolumeMap(), there is
    // nothing needed to be rolled back to make various data structures, e.g.,
    // storageMap and asyncDiskService, consistent.
    FsVolumeImpl fsVolume = new FsVolumeImpl(
        this, sd.getStorageUuid(), dir, this.conf, storageType);
    FsVolumeReference ref = fsVolume.obtainReference();
    ReplicaMap tempVolumeMap = new ReplicaMap(this);
    fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
 
    // TODO: 锁优化,拆分锁,调整锁定范围
    synchronized (this) {
      volumeMap.addAll(tempVolumeMap);
      storageMap.put(sd.getStorageUuid(),
          new DatanodeStorage(sd.getStorageUuid(),
              DatanodeStorage.State.NORMAL,
              storageType));
      asyncDiskService.addVolume(sd.getCurrentDir());
      volumes.addVolume(ref);
    }
 
    LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
  }

checkDiskError();删除坏掉的磁盘

# DataNode.checkDiskError()
// TODO: 优化点,可并行检查,并且不需要锁定
// f678080dbd25a218e0406463a3c3a1fc03680702	Wed Dec 21 05:53:32 CST 2016	Xiaoyu Yao	HDFS-11182. Update DataNode to use DatasetVolumeChecker. Contributed by Arpit Agarwal.
// eaaa32950cbae42a74e28e3db3f0cdb1ff158119	Wed Nov 30 12:31:02 CST 2016	Arpit Agarwal	HDFS-11149. Support for parallel checking of FsVolumes.
/**
   * Check the disk error
   */
  private void checkDiskError() {
    Set<File> unhealthyDataDirs = data.checkDataDir();
    if (unhealthyDataDirs != null && !unhealthyDataDirs.isEmpty()) {
      try {
        // Remove all unhealthy volumes from DataNode.
        removeVolumes(unhealthyDataDirs, false);
      } catch (IOException e) {
        LOG.warn("Error occurred when removing unhealthy storage dirs: "
            + e.getMessage(), e);
      }
      StringBuilder sb = new StringBuilder("DataNode failed volumes:");
      for (File dataDir : unhealthyDataDirs) {
        sb.append(dataDir.getAbsolutePath() + ";");
      }
      handleDiskError(sb.toString());
    }
  }
 
  # FsDatasetImpl.checkDirs()
  Set<File> checkDirs() {
    synchronized(checkDirsMutex) {
      Set<File> failedVols = null;
 
      // Make a copy of volumes for performing modification
      final List<FsVolumeImpl> volumeList = getVolumes();
 
      for(Iterator<FsVolumeImpl> i = volumeList.iterator(); i.hasNext(); ) {
        final FsVolumeImpl fsv = i.next();
        try (FsVolumeReference ref = fsv.obtainReference()) {
          fsv.checkDirs();
        } catch (DiskErrorException e) {
          FsDatasetImpl.LOG.warn("Removing failed volume " + fsv + ": ", e);
          if (failedVols == null) {
            failedVols = new HashSet<>(1);
          }
          failedVols.add(new File(fsv.getBasePath()).getAbsoluteFile());
          addVolumeFailureInfo(fsv);
          removeVolume(fsv);
        } catch (ClosedChannelException e) {
          FsDatasetImpl.LOG.debug("Caught exception when obtaining " +
            "reference count on closed volume", e);
        } catch (IOException e) {
          FsDatasetImpl.LOG.error("Unexpected IOException", e);
        }
      }
 
      if (failedVols != null && failedVols.size() > 0) {
        FsDatasetImpl.LOG.warn("Completed checkDirs. Found " + failedVols.size()
            + " failure volumes.");
      }
 
      return failedVols;
    }
  }

data.addBlockPool(nsInfo.getBlockPoolID(), conf);

@Override
  public void addBlockPool(String bpid, Configuration conf)
      throws IOException {
    LOG.info("Adding block pool " + bpid);
    synchronized(this) {
      volumes.addBlockPool(bpid, conf);
      volumeMap.initBlockPool(bpid);
    }
    volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker);
  }

BlockPoolSlice

关于du的作用及优化:

在linux系统上,该线程将定期通过du -sk命令统计各blockpool目录的占用情况,随着心跳汇报给namenode。

执行linux命令需要从JVM继承fork出子进程,成本较高(尽管linux使用COW策略避免了对内存空间的完全copy)。为了加快datanode启动速度,此处允许使用之前缓存的dfsUsage值,该值保存在current目录下的dfsUsed文件中;缓存的dfsUsage会定期(fs.du.interval,默认600秒)持久化到磁盘中;在虚拟机关闭时,也会将当前的dfsUsage值持久化。

void addBlockPool(final String bpid, final Configuration conf) throws IOException {
    long totalStartTime = Time.monotonicNow();
 
    final List<IOException> exceptions = Collections.synchronizedList(
        new ArrayList<IOException>());
    List<Thread> blockPoolAddingThreads = new ArrayList<Thread>();
    for (final FsVolumeImpl v : volumes.get()) {
      Thread t = new Thread() {
        public void run() {
          try (FsVolumeReference ref = v.obtainReference()) {
            FsDatasetImpl.LOG.info("Scanning block pool " + bpid +
                " on volume " + v + "...");
            long startTime = Time.monotonicNow();
            // 关注
            v.addBlockPool(bpid, conf);
            long timeTaken = Time.monotonicNow() - startTime;
            FsDatasetImpl.LOG.info("Time taken to scan block pool " + bpid +
                " on " + v + ": " + timeTaken + "ms");
          } catch (ClosedChannelException e) {
            // ignore.
          } catch (IOException ioe) {
            FsDatasetImpl.LOG.info("Caught exception while scanning " + v +
                ". Will throw later.", ioe);
            exceptions.add(ioe);
          }
        }
      };
      blockPoolAddingThreads.add(t);
      t.start();
    }
    for (Thread t : blockPoolAddingThreads) {
      try {
        t.join();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
    if (!exceptions.isEmpty()) {
      throw exceptions.get(0);
    }
 
    long totalTimeTaken = Time.monotonicNow() - totalStartTime;
    FsDatasetImpl.LOG.info("Total time to scan all replicas for block pool " +
        bpid + ": " + totalTimeTaken + "ms");
  }
 
  void addBlockPool(String bpid, Configuration conf) throws IOException {
    File bpdir = new File(currentDir, bpid);
    BlockPoolSlice bp = new BlockPoolSlice(bpid, this, bpdir, conf);
    bpSlices.put(bpid, bp);
  }

volumeMap.initBlockPool(bpid);

优化性能和修复锁使用的Bug

// 8ae4729107d33c6001cf1fdc8837afb71ea6c0d3	Wed Sep 28 01:02:15 CST 2016	Arpit Agarwal	HDFS-10828. Fix usage of FsDatasetImpl object lock in ReplicaMap. (Arpit Agarwal) HDFS-10682 replaced the FsDatasetImpl object lock with a separate reentrant lock but missed updating an instance ReplicaMap still uses the FsDatasetImpl.
// dd9ebf6eedfd4ff8b3486eae2a446de6b0c7fa8a	Wed Feb 03 03:23:00 CST 2016	Colin Patrick Mccabe	HDFS-9260. Improve the performance and GC friendliness of NameNode startup and full block reports (Staffan Friberg via cmccabe)
// d6fa34e014b0e2a61b24f05dd08ebe12354267fd	Tue Sep 29 16:20:35 CST 2015	yliu	HDFS-8859. Improve DataNode ReplicaMap memory footprint to save about 45%. (yliu)
 
# ReplicaMap.java
void initBlockPool(String bpid) {
    checkBlockPool(bpid);
    synchronized(mutex) {
      Map<Long, ReplicaInfo> m = map.get(bpid);
      if (m == null) {
        // Add an entry for block pool if it does not exist already
        m = new HashMap<Long, ReplicaInfo>();
        map.put(bpid, m);
      }
    }
  }
  // FsDatasetImpl
  static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
      Block block, long recoveryId, long xceiverStopTimeout) throws IOException

volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker);

增加缓存

// fc1031af749435dc95efea6745b1b2300ce29446	Thu Mar 26 03:42:59 CST 2015	Kihwal Lee	HDFS-7928. Scanning blocks from disk during rolling upgrade startup takes a lot of time if disks are busy. Contributed by Rushabh Shah.
void getVolumeMap(ReplicaMap volumeMap,
                    final RamDiskReplicaTracker lazyWriteReplicaMap)
      throws IOException {
    // Recover lazy persist replicas, they will be added to the volumeMap
    // when we scan the finalized directory.
    if (lazypersistDir.exists()) {
      int numRecovered = moveLazyPersistReplicasToFinalized(lazypersistDir);
      FsDatasetImpl.LOG.info(
          "Recovered " + numRecovered + " replicas from " + lazypersistDir);
    }
 
    // add finalized replicas
    addToReplicasMap(volumeMap, finalizedDir, lazyWriteReplicaMap, true);
    // add rbw replicas
    addToReplicasMap(volumeMap, rbwDir, lazyWriteReplicaMap, false);
  }
原文  http://www.tianjiaguo.com/2018/04/namenode-restart-optimization/
正文到此结束
Loading...