hadoop源代码怎么看 (hadoop源码优化)

  • 前言
  • 准备工作
    • 定义接口
    • 实现接口
    • 启动一个server
    • 构建一个client的代理
    • 执行相应的方法。
  • Server底层实现
    • 内部类介绍
      • Call
      • Connection。
      • Handler
      • Listener
      • Reader
      • Responder
    • Server的启动
    • 接收请求
    • Reader线程读取数据
    • Handler线程处理请求
  • 客户端实现
    • 获取代理
    • 发送请求。

    前言

    因为hadoop底层各种通讯都用的是rpc,如client和namenode、client和datanode、namanode和datanode等。所以首先学习了一下hadoop rpc的内部实现,拜读了一下hadoop的源码

    hadoop源代码怎么看,hadoop的主要模板

    准备工作

    首先*载下**hadoop的最新稳定版源码(目前是2.7.3),编译hadoop源码,因为hadoop的底层序列号用的是google的 protobuf,所以需要把这些proto文件编译成java文件,方便debug调试。如果比较懒的话,其实用maven把相关jar和源码包*载下**下来也行。

    Hadoop的rpc并没有采用现成的rpc框架,如thrift等,而是采用jdk自带的库完全自己写了一套,更加轻量级,更加可控。

    用到的主要的技术是java NIO、网络编程、反射和动态代理,如果对这几块不太熟悉的话,建议先找些资料看看相关的东西

    #Hadoop rpc实现流程 Hadoop rpc框架位于hadoop源码的hadoop-commn项目里,就像我们学习任何语言先学习hello world一样,我们先来一个最简单的程序,这个程序是从hadoop源码test目录里找到的,testRPC.java,我们运行其中的main方法。 (我这在main方法简单改动,new了个Configuration()对象,当参数传进来)

    hadoop源代码怎么看,hadoop的主要模板

    定义接口

    首先要定义一个接口协议,所有的接口都要继承VersionedProtocol

    publicinterfaceTestProtocolextendsVersionedProtocol{
    publicstaticfinallongversionID=1L;
    
    Stringecho(Stringvalue)throwsIOException;
    }
    

    实现接口

    要实现这个接口

    publicstaticclassTestImplimplementsTestProtocol{
    @Override
    publiclonggetProtocolVersion(Stringprotocol,longclientVersion){
    returnTestProtocol.versionID;
    }
    
    @Override
    publicProtocolSignaturegetProtocolSignature(Stringprotocol,
    longclientVersion,inthashcode){
    returnnewProtocolSignature(TestProtocol.versionID,null);
    }
    @Override
    publicStringecho(Stringvalue)throwsIOException{
    returnvalue;
    }
    }
    

    启动一个server

    Serverserver=newRPC.Builder(conf).setProtocol(TestProtocol.class)
    .setInstance(newTestImpl()).setBindAddress(ADDRESS).setPort(0)
    .build();
    server.start();
    

    构建一个client的代理

    TestProtocolproxy=RPC.getProxy(TestProtocol.class,TestProtocol.versionID,addr,conf);
    

    执行相应的方法。

    		String stringResult = proxy.echo("hello hadoop rpc");
    		System.out.println(stringResult);
    
    

    重点内容

    Server底层实现

    内部类介绍

    server类是org.apache.hadoop.ipc.Server,里面包含几个重要的内部类

    hadoop源代码怎么看,hadoop的主要模板

    内部类介绍

    Call

    将一个rpc请求需要的东西封装到Call对象里

    privatefinalintcallId;//theclient'scallid客户端id
    privatefinalintretryCount;//theretrycountofthecall重试次数
    privatefinalWritablerpcRequest;//SerializedRpcrequestfromclient序列号的请求
    privatefinalConnectionconnection;//connectiontoclient
    privatelongtimestamp;//timereceivedwhenresponseisnull
    //timeservedwhenresponseisnotnull
    privateByteBufferrpcResponse;//theresponseforthiscall
    privatefinalRPC.RpcKindrpcKind;
    privatefinalbyte[]clientId;
    privatefinalSpantraceSpan;//thetracingspan ontheserverside
    
    

    Connection。

    客户端与服务器通信的一些信息在这个里面

    Handler

    用于处理接受到rpc请求

    Listener

    用于监听rpc请求。

    Reader

    用于读取Listener接受到的请求

    Responder

    用于将rpc请求返回客户端

    hadoop源代码怎么看,hadoop的主要模板

    Server的启动

    服务器的构造是通过静态方法RPC.Builder(conf).build()创建的,通过跟踪代码我们发现他最后调用了Server的构造方法

    protectedServer(StringbindAddress,intport,Class<?extendsWritable>rpcRequestClass,inthandlerCount,intnumReaders,intqueueSizePerHandler,Configurationconf,StringserverName,SecretManager<?extendsTokenIdentifier>secretManager,StringportRangeConfig)throwsIOException{
    …………………………………..
    this.callQueue=newCallQueueManager<Call>(getQueueClass(prefix,conf),maxQueueSize,prefix,conf);
    …………………………………………
    
    //Startthelistenerhereandletitbindtotheport
    listener=newListener();
    this.port=listener.getAddress().getPort();
    ………………………………………………..
    //Createtheresponderhere
    responder=newResponder();
    …………………………………………….
    }
    
    

    我们看到我们上面提到的两个内部类listener和responder都是在这里创建的,之后调用start方法启动服务。

    publicsynchronizedvoidstart(){
    responder.start();
    listener.start();
    handlers=newHandler[handlerCount];
    
    for(inti=0;i<handlerCount;i++){
    handlers[i]=newHandler(i);
    handlers[i].start();
    }
    }
    

    接收请求

    从Listener的构造方法中我们看到服务器监听了SelectionKey.OP_ACCEPT,他只是监听是否有请求过来,而不做处理,这样为了提高并发。 同时启动了一些Reader线程,这些线程是用来从channel读取数据的。

    publicListener()throwsIOException{
    address=newInetSocketAddress(bindAddress,port);
    //Createanewserversocketandsettononblockingmode
    acceptChannel=ServerSocketChannel.open();
    acceptChannel.configureBlocking(false);
    
    //Bindtheserversockettothelocalhostandport
    bind(acceptChannel.socket(),address,backlogLength,conf,portRangeConfig);
    port=acceptChannel.socket().getLocalPort();//Couldbeanephemeralport
    //createaselector;
    selector=Selector.open();
    readers=newReader[readThreads];
    for(inti=0;i<readThreads;i++){
    Readerreader=newReader(
    "SocketReader#"+(i+1)+"forport"+port);
    readers[i]=reader;
    reader.start();
    }
    
    //监听OP_ACCEPT事件
    **acceptChannel.register(selector,SelectionKey.OP_ACCEPT);**
    this.setName("IPCServerlisteneron"+port);
    this.setDaemon(true);
    }
    

    Reader线程读取数据

    通过Listener的run方法我们看到如果一旦接受到请求,然后就让reader去处理

    Connectionc=connectionManager.register(channel);
    //IftheconnectionManagercan'ttakeit,closetheconnection.
    if(c==null){
    if(channel.isOpen()){
    IOUtils.cleanup(null,channel);
    }
    continue;
    }
    key.attach(c);//socloseCurrentConnectioncangettheobject
    **reader.addConnection(c);**
    

    跟踪Reader的run方法,我们看到最后将读取的信息封装成了一个Call对象put到callQueue中

    Callcall=newCall(header.getCallId(),header.getRetryCount(),
    rpcRequest,this,ProtoUtil.convert(header.getRpcKind()),
    header.getClientId().toByteArray(),traceSpan);
    
    callQueue.put(call);//queuethecall;maybeblockedhere
    

    Handler线程处理请求

    finalCallcall=callQueue.take();//popthequeue;maybeblockedhere
    if(LOG.isDebugEnabled()){
    LOG.debug(Thread.currentThread().getName()+":"+call+"forRpcKind"+call.rpcKind);
    }
    if(!call.connection.channel.isOpen()){
    LOG.info(Thread.currentThread().getName()+":skipped"+call);
    continue;
    }
    

    最后调用了RpcInvoker的call方法最终通过反射来执行相应的方法

    Methodmethod=
    protocolImpl.protocolClass.getMethod(call.getMethodName(),
    call.getParameterClasses());
    method.setAccessible(true);
    server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
    Objectvalue=
    method.invoke(protocolImpl.protocolImpl,call.getParameters());
    if(server.verbose)log("Return:"+value);
    returnnewObjectWritable(method.getReturnType(),value);
    

    客户端实现

    获取代理

    通过RPC的静态方法getProxy获取代理

    TestProtocolproxy=RPC.getProxy(TestProtocol.class,TestProtocol.versionID,
    addr,conf);
    

    在这里是通过java的动态代理来获取代理。通过跟踪代码我们找到了这里

    public<T>ProtocolProxy<T>getProxy(Class<T>protocol,longclientVersion,
    InetSocketAddressaddr,UserGroupInformationticket,
    Configurationconf,SocketFactoryfactory,
    intrpcTimeout,RetryPolicyconnectionRetryPolicy,
    AtomicBooleanfallbackToSimpleAuth)
    throwsIOException{
    
    if(connectionRetryPolicy!=null){
    thrownewUnsupportedOperationException(
    "Notsupported:connectionRetryPolicy="+connectionRetryPolicy);
    }
    
    Tproxy=(T)Proxy.newProxyInstance(protocol.getClassLoader(),
    newClass[]{protocol},newInvoker(protocol,addr,ticket,conf,
    factory,rpcTimeout,fallbackToSimpleAuth));
    returnnewProtocolProxy<T>(protocol,proxy,true);
    }
    

    hadoop源代码怎么看,hadoop的主要模板

    发送请求。

    在Invoker的构造方法里,我们看到在这里新建了一个org.apache.hadoop.ipc.Client对象,在invoke方法里调用了client里面的call方法,最终调用connection.sendRpcRequest(call); 来发送rpc请求

    finalCallcall=createCall(rpcKind,rpcRequest);
    Connectionconnection=getConnection(remoteId,call,serviceClass,
    fallbackToSimpleAuth);
    try{
    connection.sendRpcRequest(call);//sendtherpcrequest
    }catch(RejectedExecutionExceptione){
    thrownewIOException("connectionhasbeenclosed",e);
    }catch(InterruptedExceptione){
    Thread.currentThread().interrupt();
    LOG.warn("interruptedwaitingtosendrpcrequesttoserver",e);
    thrownewIOException(e);
    }
    
    

    在sendRpcRequest方法里,可以看到使用了基于tcp的socket通讯,将数据发送了服务器端。

    synchronized(Connection.this.out){
    if(shouldCloseConnection.get()){
    return;
    }
    
    if(LOG.isDebugEnabled())
    LOG.debug(getName()+"sending#"+call.id);
    
    byte[]data=d.getData();
    inttotalLength=d.getLength();
    out.writeInt(totalLength);//TotalLength
    out.write(data,0,totalLength);//RpcRequestHeader+RpcRequest
    out.flush();
    }
    

    欢迎关注我的公众号【大数据技术与应用实战】,获取更多干货资料!