1. Communication: BPServiceActor, IpcServer, DataXceiverServer, localDataXceiverServer
2. Monitoring: JvmPauseMonitor, DU (dfsUsage)
3. Other: InfoServer
# org.apache.hadoop.hdfs.server.datanode.DataNode
```java
public static void main(String args[]) {
  if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
    System.exit(0);
  }
  secureMain(args, null);
}

public static void secureMain(String args[], SecureResources resources) {
  int errorCode = 0;
  try {
    StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
    // Create the DataNode
    DataNode datanode = createDataNode(args, null, resources);
    if (datanode != null) {
      // Note: this only needs to wait for the BPServiceActor threads to finish
      datanode.join();
    } else {
      errorCode = 1;
    }
  } catch (Throwable e) {
    LOG.fatal("Exception in secureMain", e);
    terminate(1, e);
  } finally {
    // We need to terminate the process here because either shutdown was called
    // or some disk related conditions like volumes tolerated or volumes required
    // condition was not met. Also, In secure mode, control will go to Jsvc
    // and Datanode process hangs if it does not exit.
    LOG.warn("Exiting Datanode");
    terminate(errorCode);
  }
}

public static DataNode createDataNode(String args[], Configuration conf,
    SecureResources resources) throws IOException {
  // Initialize the DataNode and start some of its worker threads
  DataNode dn = instantiateDataNode(args, conf, resources);
  if (dn != null) {
    // Start the remaining worker threads: dataXceiverServer,
    // localDataXceiverServer, ipcServer, etc.
    dn.runDatanodeDaemon();
  }
  return dn;
}

public static DataNode instantiateDataNode(String args [], Configuration conf,
    SecureResources resources) throws IOException {
  if (conf == null)
    conf = new HdfsConfiguration();

  if (args != null) {
    // parse generic hadoop options
    GenericOptionsParser hParser = new GenericOptionsParser(conf, args);
    args = hParser.getRemainingArgs();
  }

  if (!parseArguments(args, conf)) {
    printUsage(System.err);
    return null;
  }
  // dfs.datanode.data.dir may be a comma-separated list, e.g. [DISK]/storages/storage1/
  Collection<StorageLocation> dataLocations = getStorageLocations(conf);
  UserGroupInformation.setConfiguration(conf);
  SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
      DFS_DATANODE_KERBEROS_PRINCIPAL_KEY);
  return makeInstance(dataLocations, conf, resources);
}

/**
 * Make sure at least one of the given data directories can be created
 * (including its parents if necessary), then instantiate the DataNode.
 */
static DataNode makeInstance(Collection<StorageLocation> dataDirs,
    Configuration conf, SecureResources resources) throws IOException {
  LocalFileSystem localFS = FileSystem.getLocal(conf);
  FsPermission permission = new FsPermission(
      conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
               DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
  // Disk checker: creates local directories when needed, checks permissions,
  // and makes sure the directories are readable and writable
  DataNodeDiskChecker dataNodeDiskChecker = new DataNodeDiskChecker(permission);
  // Keep only the directories with rwx permission
  // TODO: directories without rwx still need to be checked
  List<StorageLocation> locations =
      checkStorageLocations(dataDirs, localFS, dataNodeDiskChecker);
  DefaultMetricsSystem.initialize("DataNode");

  assert locations.size() > 0 : "number of data directories should be > 0";
  return new DataNode(conf, locations, resources);
}

/**
 * Create the DataNode given a configuration, an array of dataDirs,
 * and a namenode proxy
 */
DataNode(final Configuration conf,
         final List<StorageLocation> dataDirs,
         final SecureResources resources) throws IOException {
  ......
  try {
    hostName = getHostName(conf);
    LOG.info("Configured hostname is " + hostName);
    startDataNode(conf, dataDirs, resources);
  } catch (IOException ie) {
    shutdown();
    throw ie;
  }
  ......
}

void startDataNode(Configuration conf,
                   List<StorageLocation> dataDirs,
                   SecureResources resources
                   ) throws IOException {
  // settings global for all BPs in the Data Node
  this.secureResources = resources;
  synchronized (this) {
    this.dataDirs = dataDirs;
  }
  this.conf = conf;
  this.dnConf = new DNConf(conf);
  checkSecureConfig(dnConf, conf, resources);

  this.spanReceiverHost =
      SpanReceiverHost.get(conf, DFSConfigKeys.DFS_SERVER_HTRACE_PREFIX);

  if (dnConf.maxLockedMemory > 0) {
    if (!NativeIO.POSIX.getCacheManipulator().verifyCanMlock()) {
      throw new RuntimeException(String.format(
          "Cannot start datanode because the configured max locked memory" +
          " size (%s) is greater than zero and native code is not available.",
          DFS_DATANODE_MAX_LOCKED_MEMORY_KEY));
    }
    if (Path.WINDOWS) {
      NativeIO.Windows.extendWorkingSetSize(dnConf.maxLockedMemory);
    } else {
      long ulimit = NativeIO.POSIX.getCacheManipulator().getMemlockLimit();
      if (dnConf.maxLockedMemory > ulimit) {
        throw new RuntimeException(String.format(
            "Cannot start datanode because the configured max locked memory" +
            " size (%s) of %d bytes is more than the datanode's available" +
            " RLIMIT_MEMLOCK ulimit of %d bytes.",
            DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
            dnConf.maxLockedMemory,
            ulimit));
      }
    }
  }
  LOG.info("Starting DataNode with maxLockedMemory = " + dnConf.maxLockedMemory);

  // Initialize DataStorage
  storage = new DataStorage();

  // global DN settings
  // Register the JMX MXBean
  registerMXBean();
  // https://blog.csdn.net/lipeng_bigdata/article/details/50828066
  // Initialize DataXceiverServer: it receives and sends data blocks over
  // streaming connections from clients or other DataNodes; it is started
  // later in DataNode#runDatanodeDaemon()
  initDataXceiver(conf);
  // Start the InfoServer (web UI)
  startInfoServer(conf);
  // Start JvmPauseMonitor, which detects JVM pauses and can be queried via JMX
  pauseMonitor = new JvmPauseMonitor(conf);
  pauseMonitor.start();

  // BlockPoolTokenSecretManager is required to create ipc server.
  this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();

  // Login is done by now. Set the DN user name.
  dnUserName = UserGroupInformation.getCurrentUser().getShortUserName();
  LOG.info("dnUserName = " + dnUserName);
  LOG.info("supergroup = " + supergroup);
  // Initialize the IPC server; it is started in DataNode#runDatanodeDaemon()
  initIpcServer(conf);

  metrics = DataNodeMetrics.create(conf, getDisplayName());
  metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

  // Initialize the BlockPoolManager
  blockPoolManager = new BlockPoolManager(this);
  blockPoolManager.refreshNamenodes(conf);

  // Create the ReadaheadPool from the DataNode context so we can
  // exit without having to explicitly shutdown its thread pool.
  readaheadPool = ReadaheadPool.getInstance();
  saslClient = new SaslDataTransferClient(dnConf.conf,
      dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
  saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
}
```
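For reference, the getStorageLocations call above parses the comma-separated dfs.datanode.data.dir value, where each entry may carry an optional storage-type prefix. An illustrative hdfs-site.xml fragment (paths are placeholders):

```xml
<property>
  <name>dfs.datanode.data.dir</name>
  <!-- [DISK] is the default when no prefix is given; [SSD], [ARCHIVE]
       and [RAM_DISK] select other storage types -->
  <value>[DISK]/data/1/dn,[DISK]/data/2/dn,[SSD]/data/ssd1/dn</value>
</property>
```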
1. BlockPoolManager: manages all of the block pools on a DataNode. Creating, removing, starting, stopping, and shutting down block pool objects must all go through this class's APIs. Each DataNode has exactly one BlockPoolManager instance.
2. BPOfferService: one instance per block pool/namespace on the DataNode. It handles heartbeats with the active and standby NameNodes of that namespace. This class manages one BPServiceActor instance per NameNode, delegates calls to both of them, and also maintains the state of which NameNode is considered active.
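So the ownership chain is DataNode → BlockPoolManager → BPOfferService (one per nameservice) → BPServiceActor (one per NameNode). A minimal structural sketch; the class and field names here are illustrative, not the actual Hadoop declarations:

```java
import java.net.InetSocketAddress;
import java.util.*;

// Structural sketch only, to show the fan-out; names are made up.
class SketchBPServiceActor {
  InetSocketAddress nnAddr; // the single NameNode this actor talks to
  Thread bpThread;          // handshake, register, heartbeat, block reports
}

class SketchBPOfferService {
  // one actor per NameNode of this nameservice (active + standby under HA)
  List<SketchBPServiceActor> bpServices = new ArrayList<>();
}

class SketchBlockPoolManager {
  // nameserviceId -> its offer service, e.g. "ns1" -> bpos for ns1
  Map<String, SketchBPOfferService> bpByNameserviceId = new HashMap<>();
}

class SketchDataNode {
  SketchBlockPoolManager blockPoolManager = new SketchBlockPoolManager();
}
```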
```java
private void doRefreshNamenodes(
    Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {
  assert Thread.holdsLock(refreshNamenodesLock);

  Set<String> toRefresh = Sets.newLinkedHashSet();
  Set<String> toAdd = Sets.newLinkedHashSet();
  Set<String> toRemove;

  synchronized (this) {
    // Step 1. For each of the new nameservices, figure out whether
    // it's an update of the set of NNs for an existing NS,
    // or an entirely new nameservice.
    // At startup, bpByNameserviceId is an empty map
    for (String nameserviceId : addrMap.keySet()) {
      if (bpByNameserviceId.containsKey(nameserviceId)) {
        toRefresh.add(nameserviceId);
      } else {
        toAdd.add(nameserviceId);
      }
    }

    // Step 2. Any nameservices we currently have but are no longer present
    // need to be removed.
    toRemove = Sets.newHashSet(Sets.difference(
        bpByNameserviceId.keySet(), addrMap.keySet()));

    assert toRefresh.size() + toAdd.size() == addrMap.size() :
        "toAdd: " + Joiner.on(",").useForNull("<default>").join(toAdd) +
        " toRemove: " + Joiner.on(",").useForNull("<default>").join(toRemove) +
        " toRefresh: " + Joiner.on(",").useForNull("<default>").join(toRefresh);

    // Step 3. Start new nameservices
    if (!toAdd.isEmpty()) {
      LOG.info("Starting BPOfferServices for nameservices: " +
          Joiner.on(",").useForNull("<default>").join(toAdd));

      for (String nsToAdd : toAdd) {
        ArrayList<InetSocketAddress> addrs =
            Lists.newArrayList(addrMap.get(nsToAdd).values());
        // Create the BPOfferService
        BPOfferService bpos = createBPOS(addrs);
        // Put it into bpByNameserviceId, so next time it lands in toRefresh
        bpByNameserviceId.put(nsToAdd, bpos);
        offerServices.add(bpos);
      }
    }
    // Start all BPOfferServices; this actually starts their BPServiceActors
    startAll();
  }

  // Step 4. Shut down old nameservices. This happens outside
  // of the synchronized(this) lock since they need to call
  // back to .remove() from another thread
  if (!toRemove.isEmpty()) {
    LOG.info("Stopping BPOfferServices for nameservices: " +
        Joiner.on(",").useForNull("<default>").join(toRemove));

    for (String nsToRemove : toRemove) {
      BPOfferService bpos = bpByNameserviceId.get(nsToRemove);
      bpos.stop();
      bpos.join();
      // they will call remove on their own
    }
  }

  // Step 5. Update nameservices whose NN list has changed
  if (!toRefresh.isEmpty()) {
    LOG.info("Refreshing list of NNs for nameservices: " +
        Joiner.on(",").useForNull("<default>").join(toRefresh));

    for (String nsToRefresh : toRefresh) {
      BPOfferService bpos = bpByNameserviceId.get(nsToRefresh);
      ArrayList<InetSocketAddress> addrs =
          Lists.newArrayList(addrMap.get(nsToRefresh).values());
      bpos.refreshNNList(addrs);
    }
  }
}

BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
  Preconditions.checkArgument(!nnAddrs.isEmpty(),
      "Must pass at least one NN.");
  this.dn = dn;

  for (InetSocketAddress addr : nnAddrs) {
    this.bpServices.add(new BPServiceActor(addr, this));
  }
}
```
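The addrMap that drives this refresh is built from the nameservice settings in hdfs-site.xml. A hedged example for a federated cluster with one HA nameservice (nameservice, NN ids, and host names are placeholders):

```xml
<!-- Illustrative config only; ns/nn/host names are placeholders. -->
<property>
  <name>dfs.nameservices</name>
  <value>ns1,ns2</value>
</property>
<property>
  <name>dfs.ha.namenodes.ns1</name>
  <value>nn1,nn2</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.ns1.nn1</name>
  <value>nn1.example.com:8020</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.ns1.nn2</name>
  <value>nn2.example.com:8020</value>
</property>
```

With this config, ns1 would get one BPOfferService with two BPServiceActors, one per NameNode.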
startAll(); # Starts all BPOfferServices; each BPOfferService in turn starts the bpThread of each of its BPServiceActors. A bpThread's run() logic is (see the sketch after this list):
1. Perform the first handshake with the NameNode to fetch the namespace info
2. Register this DataNode with the NameNode
3. Periodically send heartbeats to the NameNode, along with incremental block reports, full block reports, cache block reports, etc.
4. Process the commands returned by the NameNode
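Steps 3 and 4 live in BPServiceActor.offerService(), whose body is not shown below. A simplified, self-contained sketch of that loop's structure; the NamenodeStub interface and all names here are invented for illustration, and the real loop handles many more cases:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of the heartbeat/report/command loop in offerService().
public class ServiceLoopSketch {
  interface NamenodeStub {          // stand-in for the real NN RPC proxy
    List<String> sendHeartbeat();   // NN piggybacks commands on the reply
    void blockReport();             // full block report
  }

  private final NamenodeStub nn;
  private final long heartbeatIntervalMs;
  private final long blockReportIntervalMs;
  private long lastBlockReport;     // 0 => report on the first iteration

  ServiceLoopSketch(NamenodeStub nn, long hbMs, long brMs) {
    this.nn = nn;
    this.heartbeatIntervalMs = hbMs;
    this.blockReportIntervalMs = brMs;
  }

  void offerService() throws InterruptedException {
    while (!Thread.currentThread().isInterrupted()) {
      // heartbeat on every iteration, then execute the returned commands
      for (String cmd : nn.sendHeartbeat()) {
        processCommand(cmd);
      }
      // full block report only when its (much longer) interval has elapsed
      long now = System.currentTimeMillis();
      if (now - lastBlockReport >= blockReportIntervalMs) {
        nn.blockReport();
        lastBlockReport = now;
      }
      TimeUnit.MILLISECONDS.sleep(heartbeatIntervalMs);
    }
  }

  private void processCommand(String cmd) {
    // e.g. DNA_TRANSFER / DNA_INVALIDATE in the real DatanodeProtocol
    System.out.println("executing NN command: " + cmd);
  }

  public static void main(String[] args) throws InterruptedException {
    NamenodeStub stub = new NamenodeStub() {
      public List<String> sendHeartbeat() { return Arrays.asList("DNA_NOOP"); }
      public void blockReport() { System.out.println("full block report"); }
    };
    // 3s heartbeats, 10s reports for the demo (real defaults: 3s and 6h)
    new ServiceLoopSketch(stub, 3000, 10_000).offerService();
  }
}
```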
# BPServiceActor.run()
```java
public void run() {
  LOG.info(this + " starting to offer service");

  try {
    while (true) {
      // init stuff
      try {
        // setup storage, then register with the NameNode
        // TODO: optimize
        connectToNNAndHandshake();
        break;
      } catch (IOException ioe) {
        // Initial handshake, storage recovery or registration failed
        runningState = RunningState.INIT_FAILED;
        if (shouldRetryInit()) {
          // Retry until all namenode's of BPOS failed initialization
          LOG.error("Initialization failed for " + this + " "
              + ioe.getLocalizedMessage());
          sleepAndLogInterrupts(5000, "initializing");
        } else {
          runningState = RunningState.FAILED;
          LOG.fatal("Initialization failed for " + this + ". Exiting. ", ioe);
          return;
        }
      }
    }

    runningState = RunningState.RUNNING;

    while (shouldRun()) {
      try {
        offerService();
      } catch (Exception ex) {
        LOG.error("Exception in BPOfferService for " + this, ex);
        sleepAndLogInterrupts(5000, "offering service");
      }
    }
    runningState = RunningState.EXITED;
  } catch (Throwable ex) {
    LOG.warn("Unexpected exception in block pool " + this, ex);
    runningState = RunningState.FAILED;
  } finally {
    LOG.warn("Ending block pool service for: " + this);
    cleanUp();
  }
}

private void connectToNNAndHandshake() throws IOException {
  // get NN proxy
  bpNamenode = dn.connectToNN(nnAddr);

  // First phase of the handshake with NN - get the namespace
  // info.
  NamespaceInfo nsInfo = retrieveNamespaceInfo();

  // Verify that this matches the other NN in this HA pair.
  // This also initializes our block pool in the DN if we are
  // the first NN connection for this BP.
  bpos.verifyAndSetNamespaceInfo(nsInfo);

  // Second phase of the handshake with the NN.
  register(nsInfo);
}
```

# BPOfferService.verifyAndSetNamespaceInfo(NamespaceInfo nsInfo)
```java
/**
 * Called by the BPServiceActors when they handshake to a NN.
 * If this is the first NN connection, this sets the namespace info
 * for this BPOfferService. If it's a connection to a new NN, it
 * verifies that this namespace matches (eg to prevent a misconfiguration
 * where a StandbyNode from a different cluster is specified)
 */
void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
  writeLock();
  try {
    if (this.bpNSInfo == null) {
      this.bpNSInfo = nsInfo;
      boolean success = false;

      // Now that we know the namespace ID, etc, we can pass this to the DN.
      // The DN can now initialize its local storage if we are the
      // first BP to handshake, etc.
      try {
        dn.initBlockPool(this);
        success = true;
      } finally {
        if (!success) {
          // The datanode failed to initialize the BP. We need to reset
          // the namespace info so that other BPService actors still have
          // a chance to set it, and re-initialize the datanode.
          this.bpNSInfo = null;
        }
      }
    } else {
      checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
          "Blockpool ID");
      checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(),
          "Namespace ID");
      checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
          "Cluster ID");
    }
  } finally {
    writeUnlock();
  }
}
```
dn.initBlockPool(this);
# DataNode.initBlockPool(BPOfferService bpos)
```java
/**
 * One of the Block Pools has successfully connected to its NN.
 * This initializes the local storage for that block pool,
 * checks consistency of the NN's cluster ID, etc.
 *
 * If this is the first block pool to register, this also initializes
 * the datanode-scoped storage.
 *
 * @param bpos Block pool offer service
 * @throws IOException if the NN is inconsistent with the local storage.
 */
void initBlockPool(BPOfferService bpos) throws IOException {
  NamespaceInfo nsInfo = bpos.getNamespaceInfo();
  if (nsInfo == null) {
    throw new IOException("NamespaceInfo not found: Block pool " + bpos
        + " should have retrieved namespace info before initBlockPool.");
  }

  setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID());

  // Register the new block pool with the BP manager.
  blockPoolManager.addBlockPool(bpos);

  // In the case that this is the first block pool to connect, initialize
  // the dataset, block scanners, etc.
  initStorage(nsInfo);

  // Exclude failed disks before initializing the block pools to avoid startup
  // failures.
  checkDiskError();

  data.addBlockPool(nsInfo.getBlockPoolID(), conf);
  blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
  initDirectoryScanner(conf);
}
```
initStorage(nsInfo);
# DataNode.initStorage(final NamespaceInfo nsInfo)
```java
// 8c0638471f8f1dd47667b2d6727d4d2d54e4b48c Tue Aug 09 03:02:53 CST 2016 Arpit Agarwal
//   HADOOP-10682. Replace FsDatasetImpl object lock with a separate lock object. (Chen Liang)
// TODO: walk through the implementation of initStorage(nsInfo)
/**
 * Initializes the {@link #data}. The initialization is done only once, when
 * handshake with the the first namenode is completed.
 */
private void initStorage(final NamespaceInfo nsInfo) throws IOException {
  final FsDatasetSpi.Factory<? extends FsDatasetSpi<?>> factory =
      FsDatasetSpi.Factory.getFactory(conf);

  if (!factory.isSimulated()) {
    final StartupOption startOpt = getStartupOption(conf);
    if (startOpt == null) {
      throw new IOException("Startup option not set.");
    }
    final String bpid = nsInfo.getBlockPoolID();
    // read storage info, lock data dirs and transition fs state if necessary
    synchronized (this) {
      storage.recoverTransitionRead(this, nsInfo, dataDirs, startOpt);
    }
    final StorageInfo bpStorage = storage.getBPStorage(bpid);
    LOG.info("Setting up storage: nsid=" + bpStorage.getNamespaceID()
        + ";bpid=" + bpid + ";lv=" + storage.getLayoutVersion()
        + ";nsInfo=" + nsInfo + ";dnuuid=" + storage.getDatanodeUuid());
  }

  // If this is a newly formatted DataNode then assign a new DatanodeUuid.
  checkDatanodeUuid();

  synchronized (this) {
    if (data == null) {
      data = factory.newInstance(this, storage, conf);
    }
  }
}

// The factory is initialized as follows:
/**
 * A factory for creating {@link FsDatasetImpl} objects.
 */
public class FsDatasetFactory extends FsDatasetSpi.Factory<FsDatasetImpl> {
  @Override
  public FsDatasetImpl newInstance(DataNode datanode,
      DataStorage storage, Configuration conf) throws IOException {
    return new FsDatasetImpl(datanode, storage, conf);
  }
}

// The FsDatasetImpl constructor:
/**
 * An FSDataset has a directory where it loads its data files.
 */
FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf
    ) throws IOException {
  this.fsRunning = true;
  this.datanode = datanode;
  this.dataStorage = storage;
  this.conf = conf;
  // The number of volumes required for operation is the total number
  // of volumes minus the number of failed volumes we can tolerate.
  final int volFailuresTolerated =
      conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
                  DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);

  String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
  Collection<StorageLocation> dataLocations = DataNode.getStorageLocations(conf);
  List<VolumeFailureInfo> volumeFailureInfos = getInitialVolumeFailureInfos(
      dataLocations, storage);

  int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length;
  int volsFailed = volumeFailureInfos.size();
  this.validVolsRequired = volsConfigured - volFailuresTolerated;

  if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) {
    throw new DiskErrorException("Invalid volume failure "
        + " config value: " + volFailuresTolerated);
  }
  if (volsFailed > volFailuresTolerated) {
    throw new DiskErrorException("Too many failed volumes - "
        + "current valid volumes: " + storage.getNumStorageDirs()
        + ", volumes configured: " + volsConfigured
        + ", volumes failed: " + volsFailed
        + ", volume failures tolerated: " + volFailuresTolerated);
  }

  storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
  volumeMap = new ReplicaMap(this);
  ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);

  @SuppressWarnings("unchecked")
  final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
      ReflectionUtils.newInstance(conf.getClass(
          DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
          RoundRobinVolumeChoosingPolicy.class,
          VolumeChoosingPolicy.class), conf);
  // Create the volumes object
  volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(),
      blockChooserImpl);
  asyncDiskService = new FsDatasetAsyncDiskService(datanode, this);
  asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode);
  deletingBlock = new HashMap<String, Set<Long>>();

  // Add each volume to the volumes object
  for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
    addVolume(dataLocations, storage.getStorageDir(idx));
  }
  setupAsyncLazyPersistThreads();

  cacheManager = new FsDatasetCache(this);

  // Start the lazy writer once we have built the replica maps.
  lazyWriter = new Daemon(new LazyWriter(conf));
  lazyWriter.start();
  registerMBean(datanode.getDatanodeUuid());
  localFS = FileSystem.getLocal(conf);
  blockPinningEnabled = conf.getBoolean(
    DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED,
    DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT);
}

// The implementation of addVolume:
private void addVolume(Collection<StorageLocation> dataLocations,
    Storage.StorageDirectory sd) throws IOException {
  final File dir = sd.getCurrentDir();
  final StorageType storageType =
      getStorageTypeFromLocations(dataLocations, sd.getRoot());

  // If IOException raises from FsVolumeImpl() or getVolumeMap(), there is
  // nothing needed to be rolled back to make various data structures, e.g.,
  // storageMap and asyncDiskService, consistent.
  FsVolumeImpl fsVolume = new FsVolumeImpl(
      this, sd.getStorageUuid(), dir, this.conf, storageType);
  FsVolumeReference ref = fsVolume.obtainReference();
  ReplicaMap tempVolumeMap = new ReplicaMap(this);
  fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);

  // TODO: lock optimization - split the lock and narrow the locked scope
  synchronized (this) {
    volumeMap.addAll(tempVolumeMap);
    storageMap.put(sd.getStorageUuid(),
        new DatanodeStorage(sd.getStorageUuid(),
            DatanodeStorage.State.NORMAL,
            storageType));
    asyncDiskService.addVolume(sd.getCurrentDir());
    volumes.addVolume(ref);
  }

  LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
}
```
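Two of the keys read by this constructor are commonly tuned. A hedged hdfs-site.xml fragment (the values are examples, not recommendations):

```xml
<!-- Illustrative values only. -->
<property>
  <!-- how many data dirs may fail before the DataNode refuses to start -->
  <name>dfs.datanode.failed.volumes.tolerated</name>
  <value>1</value>
</property>
<property>
  <!-- default is RoundRobinVolumeChoosingPolicy; this alternative prefers
       volumes with more free space when placing new blocks -->
  <name>dfs.datanode.fsdataset.volume.choosing.policy</name>
  <value>org.apache.hadoop.hdfs.server.datanode.fsdataset.AvailableSpaceVolumeChoosingPolicy</value>
</property>
```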
checkDiskError(); removes disks that have gone bad
# DataNode.checkDiskError()
```java
// TODO: optimization - the checks could run in parallel, without global locking
// f678080dbd25a218e0406463a3c3a1fc03680702 Wed Dec 21 05:53:32 CST 2016 Xiaoyu Yao
//   HDFS-11182. Update DataNode to use DatasetVolumeChecker. Contributed by Arpit Agarwal.
// eaaa32950cbae42a74e28e3db3f0cdb1ff158119 Wed Nov 30 12:31:02 CST 2016 Arpit Agarwal
//   HDFS-11149. Support for parallel checking of FsVolumes.
/**
 * Check the disk error
 */
private void checkDiskError() {
  Set<File> unhealthyDataDirs = data.checkDataDir();
  if (unhealthyDataDirs != null && !unhealthyDataDirs.isEmpty()) {
    try {
      // Remove all unhealthy volumes from DataNode.
      removeVolumes(unhealthyDataDirs, false);
    } catch (IOException e) {
      LOG.warn("Error occurred when removing unhealthy storage dirs: "
          + e.getMessage(), e);
    }
    StringBuilder sb = new StringBuilder("DataNode failed volumes:");
    for (File dataDir : unhealthyDataDirs) {
      sb.append(dataDir.getAbsolutePath() + ";");
    }
    handleDiskError(sb.toString());
  }
}
```

# FsDatasetImpl.checkDirs()
```java
Set<File> checkDirs() {
  synchronized(checkDirsMutex) {
    Set<File> failedVols = null;

    // Make a copy of volumes for performing modification
    final List<FsVolumeImpl> volumeList = getVolumes();

    for (Iterator<FsVolumeImpl> i = volumeList.iterator(); i.hasNext(); ) {
      final FsVolumeImpl fsv = i.next();
      try (FsVolumeReference ref = fsv.obtainReference()) {
        fsv.checkDirs();
      } catch (DiskErrorException e) {
        FsDatasetImpl.LOG.warn("Removing failed volume " + fsv + ": ", e);
        if (failedVols == null) {
          failedVols = new HashSet<>(1);
        }
        failedVols.add(new File(fsv.getBasePath()).getAbsoluteFile());
        addVolumeFailureInfo(fsv);
        removeVolume(fsv);
      } catch (ClosedChannelException e) {
        FsDatasetImpl.LOG.debug("Caught exception when obtaining "
            + "reference count on closed volume", e);
      } catch (IOException e) {
        FsDatasetImpl.LOG.error("Unexpected IOException", e);
      }
    }

    if (failedVols != null && failedVols.size() > 0) {
      FsDatasetImpl.LOG.warn("Completed checkDirs. Found " + failedVols.size()
          + " failure volumes.");
    }

    return failedVols;
  }
}
```
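To make the TODO above concrete, here is a minimal sketch of checking volumes in parallel with a per-check timeout, in the spirit of HDFS-11149's DatasetVolumeChecker. Everything here (class name, the trivial canRead/canWrite check) is invented for illustration; the real checker is far more careful:

```java
import java.io.File;
import java.util.*;
import java.util.concurrent.*;

// Illustrative sketch only: run one health check per volume concurrently,
// treating a timeout or exception as a failed volume.
public class ParallelVolumeCheckSketch {
  public static Set<File> checkAll(List<File> volumes, long timeoutMs)
      throws InterruptedException {
    ExecutorService pool =
        Executors.newFixedThreadPool(Math.max(1, volumes.size()));
    Map<File, Future<Boolean>> results = new HashMap<>();
    for (File vol : volumes) {
      // the check itself: can we read and write the volume root?
      results.put(vol, pool.submit(() ->
          vol.isDirectory() && vol.canRead() && vol.canWrite()));
    }
    Set<File> failed = new HashSet<>();
    for (Map.Entry<File, Future<Boolean>> e : results.entrySet()) {
      try {
        if (!e.getValue().get(timeoutMs, TimeUnit.MILLISECONDS)) {
          failed.add(e.getKey());
        }
      } catch (ExecutionException | TimeoutException ex) {
        failed.add(e.getKey());  // a hung or throwing check counts as a failure
      }
    }
    pool.shutdownNow();
    return failed;
  }
}
```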
data.addBlockPool(nsInfo.getBlockPoolID(), conf);
```java
@Override
public void addBlockPool(String bpid, Configuration conf) throws IOException {
  LOG.info("Adding block pool " + bpid);
  synchronized (this) {
    volumes.addBlockPool(bpid, conf);
    volumeMap.initBlockPool(bpid);
  }
  volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker);
}
```
On the role of du and its optimization:
On Linux, the DU thread periodically runs the `du -sk` command to measure the disk usage of each block pool directory, which is then reported to the NameNode along with heartbeats.
Executing a Linux command requires forking a child process off the JVM, which is expensive (even though Linux's copy-on-write strategy avoids fully copying the address space). To speed up DataNode startup, a previously cached dfsUsage value may be used here; it is stored in the dfsUsed file under the current directory. The cached dfsUsage is persisted to disk periodically (every fs.du.interval, 600 seconds by default), and the current value is also persisted when the JVM shuts down.
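A minimal sketch of the load side of this cache, modeled loosely on BlockPoolSlice.loadDfsUsed. The single-line file format "usedBytes writeTimeMillis" and the staleness rule are assumptions based on the description above, not a verified wire format:

```java
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

// Hedged sketch of reading a cached dfsUsed value. Returns -1 to force a
// fresh "du" when the cache is missing, unreadable, or too old.
public class DfsUsedCacheSketch {
  static long loadCachedDfsUsed(File currentDir, long maxAgeMs) {
    File cacheFile = new File(currentDir, "dfsUsed");
    if (!cacheFile.exists()) {
      return -1;
    }
    try {
      String line = new String(Files.readAllBytes(cacheFile.toPath()),
          StandardCharsets.UTF_8).trim();
      String[] parts = line.split("\\s+");
      long used = Long.parseLong(parts[0]);
      long writeTime = Long.parseLong(parts[1]);
      // Only trust the cache if it was written recently enough
      if (used < 0 || System.currentTimeMillis() - writeTime > maxAgeMs) {
        return -1;
      }
      return used;
    } catch (Exception e) {  // malformed or unreadable cache: recompute
      return -1;
    }
  }

  public static void main(String[] args) {
    long cached = loadCachedDfsUsed(new File("/data/1/dn/current"), 600_000L);
    System.out.println(cached >= 0 ? "use cached dfsUsed: " + cached
                                   : "cache stale/missing: run du");
  }
}
```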
```java
void addBlockPool(final String bpid, final Configuration conf) throws IOException {
  long totalStartTime = Time.monotonicNow();

  final List<IOException> exceptions = Collections.synchronizedList(
      new ArrayList<IOException>());
  List<Thread> blockPoolAddingThreads = new ArrayList<Thread>();
  for (final FsVolumeImpl v : volumes.get()) {
    Thread t = new Thread() {
      public void run() {
        try (FsVolumeReference ref = v.obtainReference()) {
          FsDatasetImpl.LOG.info("Scanning block pool " + bpid +
              " on volume " + v + "...");
          long startTime = Time.monotonicNow();
          // note this call
          v.addBlockPool(bpid, conf);
          long timeTaken = Time.monotonicNow() - startTime;
          FsDatasetImpl.LOG.info("Time taken to scan block pool " + bpid +
              " on " + v + ": " + timeTaken + "ms");
        } catch (ClosedChannelException e) {
          // ignore.
        } catch (IOException ioe) {
          FsDatasetImpl.LOG.info("Caught exception while scanning " + v +
              ". Will throw later.", ioe);
          exceptions.add(ioe);
        }
      }
    };
    blockPoolAddingThreads.add(t);
    t.start();
  }
  for (Thread t : blockPoolAddingThreads) {
    try {
      t.join();
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
  if (!exceptions.isEmpty()) {
    throw exceptions.get(0);
  }

  long totalTimeTaken = Time.monotonicNow() - totalStartTime;
  FsDatasetImpl.LOG.info("Total time to scan all replicas for block pool " +
      bpid + ": " + totalTimeTaken + "ms");
}

void addBlockPool(String bpid, Configuration conf) throws IOException {
  File bpdir = new File(currentDir, bpid);
  BlockPoolSlice bp = new BlockPoolSlice(bpid, this, bpdir, conf);
  bpSlices.put(bpid, bp);
}
```
volumeMap.initBlockPool(bpid);
Performance improvements and fixes to lock usage:
# ReplicaMap.java
```java
// 8ae4729107d33c6001cf1fdc8837afb71ea6c0d3 Wed Sep 28 01:02:15 CST 2016 Arpit Agarwal
//   HDFS-10828. Fix usage of FsDatasetImpl object lock in ReplicaMap. (Arpit Agarwal)
//   HDFS-10682 replaced the FsDatasetImpl object lock with a separate reentrant lock
//   but missed updating an instance; ReplicaMap still uses the FsDatasetImpl.
// dd9ebf6eedfd4ff8b3486eae2a446de6b0c7fa8a Wed Feb 03 03:23:00 CST 2016 Colin Patrick Mccabe
//   HDFS-9260. Improve the performance and GC friendliness of NameNode startup
//   and full block reports (Staffan Friberg via cmccabe)
// d6fa34e014b0e2a61b24f05dd08ebe12354267fd Tue Sep 29 16:20:35 CST 2015 yliu
//   HDFS-8859. Improve DataNode ReplicaMap memory footprint to save about 45%. (yliu)
void initBlockPool(String bpid) {
  checkBlockPool(bpid);
  synchronized(mutex) {
    Map<Long, ReplicaInfo> m = map.get(bpid);
    if (m == null) {
      // Add an entry for block pool if it does not exist already
      m = new HashMap<Long, ReplicaInfo>();
      map.put(bpid, m);
    }
  }
}

// Related: FsDatasetImpl
//   static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
//       Block block, long recoveryId, long xceiverStopTimeout) throws IOException
```
volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker);
Caching was added here:
```java
// fc1031af749435dc95efea6745b1b2300ce29446 Thu Mar 26 03:42:59 CST 2015 Kihwal Lee
//   HDFS-7928. Scanning blocks from disk during rolling upgrade startup takes
//   a lot of time if disks are busy. Contributed by Rushabh Shah.
void getVolumeMap(ReplicaMap volumeMap,
                  final RamDiskReplicaTracker lazyWriteReplicaMap)
    throws IOException {
  // Recover lazy persist replicas, they will be added to the volumeMap
  // when we scan the finalized directory.
  if (lazypersistDir.exists()) {
    int numRecovered = moveLazyPersistReplicasToFinalized(lazypersistDir);
    FsDatasetImpl.LOG.info(
        "Recovered " + numRecovered + " replicas from " + lazypersistDir);
  }

  // add finalized replicas
  addToReplicasMap(volumeMap, finalizedDir, lazyWriteReplicaMap, true);
  // add rbw replicas
  addToReplicasMap(volumeMap, rbwDir, lazyWriteReplicaMap, false);
}
```
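The HDFS-7928 change referenced above works by persisting the replica list on shutdown and reading it back at startup instead of walking every block file on disk. A rough sketch of the idea; the file name, line format, and freshness handling here are assumptions for illustration, not the actual BlockPoolSlice format:

```java
import java.io.*;
import java.util.*;

// Rough sketch of the HDFS-7928 idea: persist "blockId genStamp numBytes"
// lines on clean shutdown; on startup, load them back if the cache file
// exists, otherwise fall back to a full directory scan.
public class ReplicaCacheSketch {
  static class Replica {
    final long blockId, genStamp, numBytes;
    Replica(long blockId, long genStamp, long numBytes) {
      this.blockId = blockId; this.genStamp = genStamp; this.numBytes = numBytes;
    }
  }

  static void saveReplicas(File cacheFile, List<Replica> replicas)
      throws IOException {
    try (PrintWriter out = new PrintWriter(new FileWriter(cacheFile))) {
      for (Replica r : replicas) {
        out.println(r.blockId + " " + r.genStamp + " " + r.numBytes);
      }
    }
  }

  static List<Replica> loadReplicas(File cacheFile) throws IOException {
    List<Replica> replicas = new ArrayList<>();
    try (BufferedReader in = new BufferedReader(new FileReader(cacheFile))) {
      String line;
      while ((line = in.readLine()) != null) {
        String[] f = line.split(" ");
        replicas.add(new Replica(Long.parseLong(f[0]),
            Long.parseLong(f[1]), Long.parseLong(f[2])));
      }
    }
    // Delete the cache after a successful load so a crashed DataNode
    // can never reuse a stale cache on its next restart.
    cacheFile.delete();
    return replicas;
  }
}
```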