这篇文章主要讲解了“Hadoop中DataNode的启动过程介绍”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Hadoop中DataNode的启动过程介绍”吧!

成都创新互联是一家集网站建设,昭通企业网站建设,昭通品牌网站建设,网站定制,昭通网站建设报价,网络营销,网络优化,昭通网站推广为一体的创新建站企业,帮助传统企业提升企业形象加强企业竞争力。可充分满足这一群体相比中小企业更为丰富、高端、多元的互联网需求。同时我们时刻保持专业、时尚、前沿,时刻以成就客户成长自我,坚持不断学习、思考、沉淀、净化自己,让我们为更多的企业打造出实用型网站。
DataNode同NameNode都一样,是一个java进程,所以从main方法开始,看代码:
public class DataNode extends Configured
implements InterDatanodeProtocol, ClientDatanodeProtocol,
DataNodeMXBean {
public static void main(String args[]) {
secureMain(args, null);
}
public static void secureMain(String args[], SecureResources resources) {
DataNode datanode = createDataNode(args, null, resources);
}
public static DataNode createDataNode(String args[], Configuration conf,
SecureResources resources) throws IOException {
DataNode dn = instantiateDataNode(args, conf, resources);
if (dn != null) {
dn.runDatanodeDaemon();
}
return dn;
}
public static DataNode instantiateDataNode(String args [], Configuration conf,
SecureResources resources) throws IOException {
return makeInstance(dataLocations, conf, resources);
}
static DataNode makeInstance(Collection dataDirs,
Configuration conf, SecureResources resources) throws IOException {
return new DataNode(conf, locations, resources);
}
DataNode(final Configuration conf,
final List dataDirs,
final SecureResources resources) throws IOException {
startDataNode(conf, dataDirs, resources);
}
public void runDatanodeDaemon() throws IOException {
blockPoolManager.startAll();
// start dataXceiveServer
dataXceiverServer.start();
if (localDataXceiverServer != null) {
localDataXceiverServer.start();
}
ipcServer.start();
startPlugins(conf);
}
} 上面的代码跟踪不难,但是最终需要注意两个方法:startDataNode和runDatanodeDaemon方法,前面一个用于初始化DataNode,后面一个启动DataNode的后台线程,这些线程是会伴随DataNode进程一直跑着的。接着,让我们重点研究下方法startDataNode,看代码:
void startDataNode(Configuration conf, ListdataDirs, // DatanodeProtocol namenode, SecureResources resources ) throws IOException { storage = new DataStorage(); // global DN settings registerMXBean(); initDataXceiver(conf); startInfoServer(conf); pauseMonitor = new JvmPauseMonitor(conf); pauseMonitor.start(); initIpcServer(conf); blockPoolManager = new BlockPoolManager(this); blockPoolManager.refreshNamenodes(conf); }
registerMXBean这个方法可以忽略,用来注册MBean信息;initDataXceiver这个方法应该来说还是比较重要,实例化的dataXceiverServer用来接受客户端或者其他datanode的数据接收或者发送请求;startInfoServer方法用来启动datanode的web服务器;pauseMonitor用来监控jvm是否有停顿;initIpcServer方法比较重要,用来启动datanode上的rpc服务,主要包括两个服务:ClientDatanodeProtocolPB和InterDatanodeProtocolPB。
然后属于DataNode的重点来了,blockPoolManager对象的实例化,注意一点,2.4.1 这个版本的hadoop已经支持了hadoop Federation的特性,而blockPooolManager就是支撑这个特性来的。现在让我们来看看他里面的东西。还是先上代码吧。
class BlockPoolManager {
BlockPoolManager(DataNode dn) {
this.dn = dn;
}
void refreshNamenodes(Configuration conf)
throws IOException {
synchronized (refreshNamenodesLock) {
doRefreshNamenodes(newAddressMap);
}
}
private void doRefreshNamenodes(
Map> addrMap) throws IOException {
synchronized (this) {
startAll();
}
}
synchronized void startAll() throws IOException {
for (BPOfferService bpos : offerServices) {
bpos.start();
}
}
}
class BPOfferService {
void start() {
for (BPServiceActor actor : bpServices) {
actor.start();
}
}
}
class BPServiceActor implements Runnable {
void start() {
if ((bpThread != null) && (bpThread.isAlive())) {
//Thread is started already
return;
}
bpThread = new Thread(this, formatThreadName());
bpThread.setDaemon(true); // needed for JUnit testing
bpThread.start();
}
public void run() {
while (true) {
connectToNNAndHandshake();
break;
}
while (shouldRun()) {
offerService();
}
}
private void offerService() throws Exception {
while (shouldRun()) {
HeartbeatResponse resp = sendHeartBeat();
List cmds = blockReport();
}
}
} 顺着代码往下走,整个思路都会比较清晰了,BPServiceActor这个类做了具体的事情,包括datanode跟namenode的握手,发送心跳和报告块信息,执行namenode发回来的命名。
详细的过程就不啰嗦了。
到这里DataNode的启动过程就搞了一个段落。
感谢各位的阅读,以上就是“Hadoop中DataNode的启动过程介绍”的内容了,经过本文的学习后,相信大家对Hadoop中DataNode的启动过程介绍这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!