RPC 原理及 RPC 实例分析 - God is a Coder..

2018/5/28 posted in  Microservice

原文地址

在学校期间大家都写过不少程序,比如写个 hello world 服务类,然后本地调用下,如下所示。这些程序的特点是服务消费方和服务提供方是本地调用关系。

public class Test {
     public static void main(String[] args) {
         HelloWorldService helloWorldService = new HelloWorldServiceImpl();
         helloWorldService.sayHello("test");
     }
}

而一旦踏入公司尤其是大型互联网公司就会发现,公司的系统都由成千上万大大小小的服务组成,各服务部署在不同的机器上,由不同的团队负责。

这时就会遇到两个问题:

  1. 要搭建一个新服务,免不了需要依赖他人的服务,而现在他人的服务都在远端,怎么调用?
  2. 其它团队要使用我们的新服务,我们的服务该怎么发布以便他人调用?下文将对这两个问题展开探讨。

1.  如何调用他人的远程服务?

由于各服务部署在不同机器,服务间的调用免不了网络通信过程,服务消费方每调用一个服务都要写一坨网络通信相关的代码,不仅复杂而且极易出错。

如果有一种方式能让我们像调用本地服务一样调用远程服务,而让调用者对网络通信这些细节透明,那么将大大提高生产力,比如服务消费方在执行 helloWorldService.sayHello("test") 时,实质上调用的是远端的服务。这种方式其实就是 RPC(Remote Procedure Call Protocol),在各大互联网公司中被广泛使用,如阿里巴巴的 hsf、dubbo(开源)、Facebook 的 thrift(开源)、Google grpc(开源)、Twitter 的 finagle(开源)等。

要让网络通信细节对使用者透明,我们需要对通信细节进行封装,我们先看下一个 RPC 调用的流程涉及到哪些通信细节:

  1. 服务消费方(client)调用以本地调用方式调用服务;
  2. client stub 接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体;
  3. client stub 找到服务地址,并将消息发送到服务端;
  4. server stub 收到消息后进行解码;
  5. server stub 根据解码结果调用本地的服务;
  6. 本地服务执行并将结果返回给 server stub;
  7. server stub 将返回结果打包成消息并发送至消费方;
  8. client stub 接收到消息,并进行解码;
  9. 服务消费方得到最终结果。

RPC 的目标就是要 2~8 这些步骤都封装起来,让用户对这些细节透明。

1.1 怎么做到透明化远程服务调用?

怎么封装通信细节才能让用户像以本地调用方式调用远程服务呢?对 java 来说就是使用代理!java 代理有两种方式:

  1. jdk 动态代理
  2. 字节码生成

尽管字节码生成方式实现的代理更为强大和高效,但代码维护不易,大部分公司实现 RPC 框架时还是选择动态代理方式。

下面简单介绍下动态代理怎么实现我们的需求。我们需要实现 RPCProxyClient 代理类,代理类的 invoke 方法中封装了与远端服务通信的细节,消费方首先从 RPCProxyClient 获得服务提供方的接口,当执行 helloWorldService.sayHello("test") 方法时就会调用 invoke 方法。

public class RPCProxyClient implements java.lang.reflect.InvocationHandler{
    private Object obj;

    public RPCProxyClient(Object obj){
        this.obj=obj;
    }

    /**
     * 得到被代理对象;
     */
    public static Object getProxy(Object obj){
        return java.lang.reflect.Proxy.newProxyInstance(obj.getClass().getClassLoader(),
                obj.getClass().getInterfaces(), new RPCProxyClient(obj));
    }

    /**
     * 调用此方法执行
     */
    public Object invoke(Object proxy, Method method, Object[] args)
            throws Throwable {
        //结果参数;
        Object result = new Object();
        // ...执行通信相关逻辑
        // ...
        return result;
    }
}
public class Test {
     public static void main(String[] args) {
         HelloWorldService helloWorldService = (HelloWorldService)RPCProxyClient.getProxy(HelloWorldService.class);
         helloWorldService.sayHello("test");
     }
 }

1.2  怎么对消息进行编码和解码?

1.2.1 确定消息数据结构

  上节讲了 invoke 里需要封装通信细节(通信细节再后面几章详细探讨),而通信的第一步就是要确定客户端和服务端相互通信的消息结构。客户端的请求消息结构一般需要包括以下内容:

1)接口名称

  在我们的例子里接口名是 “HelloWorldService”,如果不传,服务端就不知道调用哪个接口了;

2)方法名

  一个接口内可能有很多方法,如果不传方法名服务端也就不知道调用哪个方法;

3)参数类型 & 参数值

  参数类型有很多,比如有 bool、int、long、double、string、map、list,甚至如 struct(class);以及相应的参数值;

4)超时时间

5)requestID,标识唯一请求 id,在下面一节会详细描述 requestID 的用处。

  同理服务端返回的消息结构一般包括以下内容。

1)返回值

2)状态 code

3)requestID 

1.2.2 序列化

一旦确定了消息的数据结构后,下一步就是要考虑序列化与反序列化了。

什么是序列化?序列化就是将数据结构或对象转换成二进制串的过程,也就是编码的过程。

什么是反序列化?将在序列化过程中所生成的二进制串转换成数据结构或者对象的过程。

为什么需要序列化?转换为二进制串后才好进行网络传输嘛!

为什么需要反序列化?将二进制转换为对象才好进行后续处理!

现如今序列化的方案越来越多,每种序列化方案都有优点和缺点,它们在设计之初有自己独特的应用场景,那到底选择哪种呢?从 RPC 的角度上看,主要看三点:

  1. 通用性,比如是否能支持 Map 等复杂的数据结构;
  2. 性能,包括时间复杂度和空间复杂度,由于 RPC 框架将会被公司几乎所有服务使用,如果序列化上能节约一点时间,对整个公司的收益都将非常可观,同理如果序列化上能节约一点内存,网络带宽也能省下不少;
  3. 可扩展性,对互联网公司而言,业务变化飞快,如果序列化协议具有良好的可扩展性,支持自动增加新的业务字段,而不影响老的服务,这将大大提供系统的灵活度。

目前互联网公司广泛使用 Protobuf、Thrift、Avro 等成熟的序列化解决方案来搭建 RPC 框架,这些都是久经考验的解决方案。

1.3  通信

消息数据结构被序列化为二进制串后,下一步就要进行网络通信了。目前有两种常用 IO 通信模型:1)BIO;2)NIO。一般 RPC 框架需要支持这两种 IO 模型。

如何实现 RPC 的 IO 通信框架呢?

  1. 使用 java nio 方式自研,这种方式较为复杂,而且很有可能出现隐藏 bug,但也见过一些互联网公司使用这种方式;
  2. 基于 mina,mina 在早几年比较火热,不过这些年版本更新缓慢;
  3. 基于 netty,现在很多 RPC 框架都直接基于 netty 这一 IO 通信框架,省力又省心,比如阿里巴巴的 HSF、dubbo,Twitter 的 finagle 等。

1.4  消息里为什么要有 requestID?

如果使用 netty 的话,一般会用 channel.writeAndFlush()方法来发送消息二进制串,这个方法调用后对于整个远程调用 (从发出请求到接收到结果) 来说是一个异步的,即对于当前线程来说,将请求发送出来后,线程就可以往后执行了,至于服务端的结果,是服务端处理完成后,再以消息的形式发送给客户端的。于是这里出现以下两个问题:

  1. 怎么让当前线程 “暂停”,等结果回来后,再向后执行?
  2. 如果有多个线程同时进行远程方法调用,这时建立在 client server 之间的 socket 连接上会有很多双方发送的消息传递,前后顺序也可能是随机的,server 处理完结果后,将结果消息发送给 client,client 收到很多消息,怎么知道哪个消息结果是原先哪个线程调用的?

如下图所示,线程 A 和线程 B 同时向 client socket 发送请求 requestA 和 requestB,socket 先后将 requestB 和 requestA 发送至 server,而 server 可能将 responseA 先返回,尽管 requestA 请求到达时间更晚。我们需要一种机制保证 responseA 丢给 ThreadA,responseB 丢给 ThreadB。

怎么解决呢?

  1. client 线程每次通过 socket 调用一次远程接口前,生成一个唯一的 ID,即 requestID(requestID 必需保证在一个 Socket 连接里面是唯一的),一般常常使用 AtomicLong 从 0 开始累计数字生成唯一 ID;
  2. 将处理结果的回调对象 callback,存放到全局 ConcurrentHashMap 里面 put(requestID, callback);
  3. 当线程调用 channel.writeAndFlush() 发送消息后,紧接着执行 callback 的 get() 方法试图获取远程返回的结果。在 get() 内部,则使用 synchronized 获取回调对象 callback 的锁,再先检测是否已经获取到结果,如果没有,然后调用 callback 的 wait() 方法,释放 callback 上的锁,让当前线程处于等待状态。
  4. 服务端接收到请求并处理后,将 response 结果(此结果中包含了前面的 requestID)发送给客户端,客户端 socket 连接上专门监听消息的线程收到消息,分析结果,取到 requestID,再从前面的 ConcurrentHashMap 里面 get(requestID),从而找到 callback 对象,再用 synchronized 获取 callback 上的锁,将方法调用结果设置到 callback 对象里,再调用 callback.notifyAll() 唤醒前面处于等待状态的线程。
public Object get() {
        synchronized (this) { // 旋锁
            while (!isDone) { // 是否有结果了
                wait(); //没结果是释放锁,让当前线程处于等待状态
            }
        }
}
private void setDone(Response res) {
        this.res = res;
        isDone = true;
        synchronized (this) { //获取锁,因为前面wait()已经释放了callback的锁了
            notifyAll(); // 唤醒处于等待的线程
        }
    }

2 如何发布自己的服务?

如何让别人使用我们的服务呢?有同学说很简单嘛,告诉使用者服务的 IP 以及端口就可以了啊。确实是这样,这里问题的关键在于是自动告知还是人肉告知。

人肉告知的方式:如果你发现你的服务一台机器不够,要再添加一台,这个时候就要告诉调用者我现在有两个 ip 了,你们要轮询调用来实现负载均衡;调用者咬咬牙改了,结果某天一台机器挂了,调用者发现服务有一半不可用,他又只能手动修改代码来删除挂掉那台机器的 ip。现实生产环境当然不会使用人肉方式。

有没有一种方法能实现自动告知,即机器的增添、剔除对调用方透明,调用者不再需要写死服务提供方地址?当然可以,现如今 zookeeper 被广泛用于实现服务自动注册与发现功能!

简单来讲,zookeeper 可以充当一个服务注册表(Service Registry),让多个服务提供者形成一个集群,让服务消费者通过服务注册表获取具体的服务访问地址(ip + 端口)去访问具体的服务提供者。如下图所示:

具体来说,zookeeper 就是个分布式文件系统,每当一个服务提供者部署后都要将自己的服务注册到 zookeeper 的某一路径上: /{service}/{version}/{ip:port}, 比如我们的 HelloWorldService 部署到两台机器,那么 zookeeper 上就会创建两条目录:分别为 / HelloWorldService/1.0.0/100.19.20.01:16888  /HelloWorldService/1.0.0/100.19.20.02:16888。

zookeeper 提供了 “心跳检测” 功能,它会定时向各个服务提供者发送一个请求(实际上建立的是一个 Socket 长连接),如果长期没有响应,服务中心就认为该服务提供者已经“挂了”,并将其剔除,比如 100.19.20.02 这台机器如果宕机了,那么 zookeeper 上的路径就会只剩 / HelloWorldService/1.0.0/100.19.20.01:16888。

服务消费者会去监听相应路径(/HelloWorldService/1.0.0),一旦路径上的数据有任务变化(增加或减少),zookeeper 都会通知服务消费方服务提供者地址列表已经发生改变,从而进行更新。

更为重要的是 zookeeper 与生俱来的容错容灾能力(比如 leader 选举),可以确保服务注册表的高可用性。

3.Hadoop 中 RPC 实例分析

ipc.RPC 类中有一些内部类,为了大家对 RPC 类有个初步的印象,就先罗列几个我们感兴趣的分析一下吧:

Invocation :用于封装方法名和参数,作为数据传输层。
ClientCache :用于存储 client 对象,用 socket factory 作为 hash key, 存储结构为 hashMap
Invoker :是动态代理中的调用实现类,继承了 InvocationHandler.
Server :是 ipc.Server 的实现类。

    public Object invoke(Object proxy, Method method, Object[] args)
      throws Throwable {
      •••
      ObjectWritable value = (ObjectWritable)
        client.call(new Invocation(method, args), remoteId);
      •••
      return value.get();
    }

如果你发现这个 invoke() 方法实现的有些奇怪的话,那你就对了。一般我们看到的动态代理的 invoke() 方法中总会有 method.invoke(ac, arg);  这句代码。而上面代码中却没有,这是为什么呢?其实使用 method.invoke(ac, arg); 是在本地 JVM 中调用;而在 hadoop 中,是将数据发送给服务端,服务端将处理的结果再返回给客户端,所以这里的 invoke() 方法必然需要进行网络通信。而网络通信就是下面的这段代码实现的:

ObjectWritable value = (ObjectWritable)
client.call(new Invocation(method, args), remoteId);

Invocation 类在这里封装了方法名和参数。其实这里网络通信只是调用了 Client 类的 call() 方法。那我们接下来分析一下 ipc.Client 源码吧。和第一章一样,同样是 3 个问题

  1. 客户端和服务端的连接是怎样建立的?
  2. 客户端是怎样给服务端发送数据的?
  3. 客户端是怎样获取服务端的返回数据的?

3.1 客户端和服务端的连接是怎样建立的?

public Writable call(Writable param, ConnectionId remoteId)  
                       throws InterruptedException, IOException {
    Call call = new Call(param);       //将传入的数据封装成call对象
    Connection connection = getConnection(remoteId, call);   //获得一个连接
    connection.sendParam(call);     // 向服务端发送call对象
    boolean interrupted = false;
    synchronized (call) {
      while (!call.done) {
        try {
          call.wait(); // 等待结果的返回,在Call类的callComplete()方法里有notify()方法用于唤醒线程
        } catch (InterruptedException ie) {
          // 因中断异常而终止,设置标志interrupted为true
          interrupted = true;
        }
      }
      if (interrupted) {
        Thread.currentThread().interrupt();
      }

      if (call.error != null) {
        if (call.error instanceof RemoteException) {
          call.error.fillInStackTrace();
          throw call.error;
        } else { // 本地异常
          throw wrapException(remoteId.getAddress(), call.error);
        }
      } else {
        return call.value; //返回结果数据
      }
    }
  }

具体代码的作用我已做了注释,所以这里不再赘述。但到目前为止,你依然不知道 RPC 机制底层的网络连接是怎么建立的。分析代码后,我们会发现和网络通信有关的代码只会是下面的两句了:

  Connection connection = getConnection(remoteId, call);   //获得一个连接
  connection.sendParam(call);      // 向服务端发送call对象

先看看是怎么获得一个到服务端的连接吧,下面贴出 ipc.Client 类中的 getConnection() 方法。

private Connection getConnection(ConnectionId remoteId,
                                   Call call)
                                   throws IOException, InterruptedException {
    if (!running.get()) {
      // 如果client关闭了
      throw new IOException("The client is stopped");
    }
    Connection connection;
//如果connections连接池中有对应的连接对象,就不需重新创建了;如果没有就需重新创建一个连接对象。
//但请注意,该//连接对象只是存储了remoteId的信息,其实还并没有和服务端建立连接。
    do {
      synchronized (connections) {
        connection = connections.get(remoteId);
        if (connection == null) {
          connection = new Connection(remoteId);
          connections.put(remoteId, connection);
        }
      }
    } while (!connection.addCall(call)); //将call对象放入对应连接中的calls池,就不贴出源码了
   //这句代码才是真正的完成了和服务端建立连接哦~
    connection.setupIOstreams();
    return connection;
  }

下面贴出 Client.Connection 类中的 setupIOstreams() 方法:

  private synchronized void setupIOstreams() throws InterruptedException {
   •••
      try {
       •••
        while (true) {
          setupConnection();  //建立连接
          InputStream inStream = NetUtils.getInputStream(socket);     //获得输入流
          OutputStream outStream = NetUtils.getOutputStream(socket);  //获得输出流
          writeRpcHeader(outStream);
          •••
          this.in = new DataInputStream(new BufferedInputStream
              (new PingInputStream(inStream)));   //将输入流装饰成DataInputStream
          this.out = new DataOutputStream
          (new BufferedOutputStream(outStream));   //将输出流装饰成DataOutputStream
          writeHeader();
          // 跟新活动时间
          touch();
          //当连接建立时,启动接受线程等待服务端传回数据,注意:Connection继承了Tread
          start();
          return;
        }
      } catch (IOException e) {
        markClosed(e);
        close();
      }
    }

再有一步我们就知道客户端的连接是怎么建立的啦,下面贴出 Client.Connection 类中的 setupConnection() 方法:

  private synchronized void setupConnection() throws IOException {
      short ioFailures = 0;
      short timeoutFailures = 0;
      while (true) {
        try {
          this.socket = socketFactory.createSocket(); //终于看到创建socket的方法了
          this.socket.setTcpNoDelay(tcpNoDelay);
         •••
          // 设置连接超时为20s
          NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
          this.socket.setSoTimeout(pingInterval);
          return;
        } catch (SocketTimeoutException toe) {
          /* 设置最多连接重试为45次。
           * 总共有20s*45 = 15 分钟的重试时间。
           */
          handleConnectionFailure(timeoutFailures++, 45, toe);
        } catch (IOException ie) {
          handleConnectionFailure(ioFailures++, maxRetries, ie);
        }
      }
    }

终于,我们知道了客户端的连接是怎样建立的了,其实就是创建一个普通的 socket 进行通信。

3.2 客户端是怎样给服务端发送数据的? 

下面贴出 Client.Connection 类的 sendParam() 方法吧:

public void sendParam(Call call) {
      if (shouldCloseConnection.get()) {
        return;
      }
      DataOutputBuffer d=null;
      try {
        synchronized (this.out) {
          if (LOG.isDebugEnabled())
            LOG.debug(getName() + " sending #" + call.id);
          //创建一个缓冲区
          d = new DataOutputBuffer();
          d.writeInt(call.id);
          call.param.write(d);
          byte[] data = d.getData();
          int dataLength = d.getLength();
          out.writeInt(dataLength);        //首先写出数据的长度
          out.write(data, 0, dataLength); //向服务端写数据
          out.flush();
        }
      } catch(IOException e) {
        markClosed(e);
      } finally {
        IOUtils.closeStream(d);
      }
    }  

3.3 客户端是怎样获取服务端的返回数据的? 

下面贴出 Client.Connection 类和 Client.Call 类中的相关方法:

方法一:  
  public void run() {
      •••
      while (waitForWork()) {
        receiveResponse();  //具体的处理方法
      }
      close();
     •••
}

方法二:
private void receiveResponse() {
      if (shouldCloseConnection.get()) {
        return;
      }
      touch();
      try {
        int id = in.readInt();                    // 阻塞读取id
        if (LOG.isDebugEnabled())
          LOG.debug(getName() + " got value #" + id);
          Call call = calls.get(id);    //在calls池中找到发送时的那个对象
        int state = in.readInt();     // 阻塞读取call对象的状态
        if (state == Status.SUCCESS.state) {
          Writable value = ReflectionUtils.newInstance(valueClass, conf);
          value.readFields(in);           // 读取数据
        //将读取到的值赋给call对象,同时唤醒Client等待线程,贴出setValue()代码方法三
          call.setValue(value);              
          calls.remove(id);               //删除已处理的call    
        } else if (state == Status.ERROR.state) {
        •••
        } else if (state == Status.FATAL.state) {
        •••
        }
      } catch (IOException e) {
        markClosed(e);
      }
}

方法三:
public synchronized void setValue(Writable value) {
      this.value = value;
      callComplete();   //具体实现
}
protected synchronized void callComplete() {
      this.done = true;
      notify();         // 唤醒client等待线程
    }

完成的功能主要是:启动一个处理线程,读取从服务端传来的 call 对象,将 call 对象读取完毕后,唤醒 client 处理线程。就这么简单,客户端就获取了服务端返回的数据了哦~。客户端的源码分析就到这里了哦,下面我们来分析 Server 端的源码吧。

3.4 ipc.Server 源码分析

为了让大家对 ipc.Server 有个初步的了解,我们先分析一下它的几个内部类吧:

Call :用于存储客户端发来的请求
Listener : 监听类,用于监听客户端发来的请求,同时 Listener 内部还有一个静态类,Listener.Reader,当监听器监听到用户请求,便让 Reader 读取用户请求。
Responder :响应 RPC 请求类,请求处理完毕,由 Responder 发送给请求客户端。
Connection :连接类,真正的客户端请求读取逻辑在这个类中。
Handler :请求处理类,会循环阻塞读取 callQueue 中的 call 对象,并对其进行操作。

private void initialize(Configuration conf) throws IOException {
   •••
    // 创建 rpc server
    InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
    if (dnSocketAddr != null) {
      int serviceHandlerCount =
        conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
                    DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
      //获得serviceRpcServer
      this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(), 
          dnSocketAddr.getPort(), serviceHandlerCount,
          false, conf, namesystem.getDelegationTokenSecretManager());
      this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
      setRpcServiceServerAddress(conf);
}
//获得server
    this.server = RPC.getServer(this, socAddr.getHostName(),
        socAddr.getPort(), handlerCount, false, conf, namesystem
        .getDelegationTokenSecretManager());

   •••
    this.server.start();  //启动 RPC server   Clients只允许连接该server
    if (serviceRpcServer != null) {
      serviceRpcServer.start();  //启动 RPC serviceRpcServer 为HDFS服务的server
    }
    startTrashEmptier(conf);
  }

查看 Namenode 初始化源码得知:RPC 的 server 对象是通过 ipc.RPC 类的 getServer() 方法获得的。下面咱们去看看 ipc.RPC 类中的 getServer() 源码吧:

public static Server getServer(final Object instance, final String bindAddress, final int port,
                                 final int numHandlers,
                                 final boolean verbose, Configuration conf,
                                 SecretManager<? extends TokenIdentifier> secretManager) 
    throws IOException {
    return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);
  }

这时我们发现 getServer()是一个创建 Server 对象的工厂方法,但创建的却是 RPC.Server 类的对象。哈哈,现在你明白了我前面说的 “RPC.Server 是 ipc.Server 的实现类” 了吧。不过 RPC.Server 的构造函数还是调用了 ipc.Server 类的构造函数的,因篇幅所限,就不贴出相关源码了。

初始化 Server 后,Server 端就运行起来了,看看 ipc.Server 的 start() 源码吧:

  /** 启动服务 */
  public synchronized void start() {
    responder.start();  //启动responder
    listener.start();   //启动listener
    handlers = new Handler[handlerCount];

    for (int i = 0; i < handlerCount; i++) {
      handlers[i] = new Handler(i);
      handlers[i].start();   //逐个启动Handler
    }
  }

分析过 ipc.Client 源码后,我们知道 Client 端的底层通信直接采用了阻塞式 IO 编程,当时我们曾做出猜测:Server 端是不是也采用了阻塞式 IO。现在我们仔细地分析一下吧,如果 Server 端也采用阻塞式 IO,当连接进来的 Client 端很多时,势必会影响 Server 端的性能。hadoop 的实现者们考虑到了这点,所以他们采用了 java  NIO 来实现 Server 端,那 Server 端采用 java NIO 是怎么建立连接的呢?分析源码得知,Server 端采用 Listener 监听客户端的连接,下面先分析一下 Listener 的构造函数吧:

    public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port);
      // 创建ServerSocketChannel,并设置成非阻塞式
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);

      // 将server socket绑定到本地端口
      bind(acceptChannel.socket(), address, backlogLength);
      port = acceptChannel.socket().getLocalPort(); 
      // 获得一个selector
      selector= Selector.open();
      readers = new Reader[readThreads];
      readPool = Executors.newFixedThreadPool(readThreads);
      //启动多个reader线程,为了防止请求多时服务端响应延时的问题
      for (int i = 0; i < readThreads; i++) {       
        Selector readSelector = Selector.open();
        Reader reader = new Reader(readSelector);
        readers[i] = reader;
        readPool.execute(reader);
      }
      // 注册连接事件
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("IPC Server listener on " + port);
      this.setDaemon(true);
    }

在启动 Listener 线程时,服务端会一直等待客户端的连接,下面贴出 Server.Listener 类的 run() 方法:

  public void run() {
     •••
      while (running) {
        SelectionKey key = null;
        try {
          selector.select();
          Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            try {
              if (key.isValid()) {
                if (key.isAcceptable())
                  doAccept(key);     //具体的连接方法
              }
            } catch (IOException e) {
            }
            key = null;
          }
        } catch (OutOfMemoryError e) {
       •••         
    }

下面贴出 Server.Listener 类中 doAccept() 方法中的关键源码吧:

    void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
      Connection c = null;
      ServerSocketChannel server = (ServerSocketChannel) key.channel();
      SocketChannel channel;
      while ((channel = server.accept()) != null) { //建立连接
        channel.configureBlocking(false);
        channel.socket().setTcpNoDelay(tcpNoDelay);
        Reader reader = getReader();  //从readers池中获得一个reader
        try {
          reader.startAdd(); // 激活readSelector,设置adding为true
          SelectionKey readKey = reader.registerChannel(channel);//将读事件设置成兴趣事件
          c = new Connection(readKey, channel, System.currentTimeMillis());//创建一个连接对象
          readKey.attach(c);   //将connection对象注入readKey
          synchronized (connectionList) {
            connectionList.add(numConnections, c);
            numConnections++;
          }
        ••• 
        } finally {
//设置adding为false,采用notify()唤醒一个reader,其实代码十三中启动的每个reader都使
//用了wait()方法等待。因篇幅有限,就不贴出源码了。
          reader.finishAdd();
        }
      }
    }

当 reader 被唤醒,reader 接着执行 doRead() 方法。

下面贴出 Server.Listener.Reader 类中的 doRead() 方法和 Server.Connection 类中的 readAndProcess() 方法源码:

方法一:   
 void doRead(SelectionKey key) throws InterruptedException {
      int count = 0;
      Connection c = (Connection)key.attachment();  //获得connection对象
      if (c == null) {
        return;  
      }
      c.setLastContact(System.currentTimeMillis());
      try {
        count = c.readAndProcess();    // 接受并处理请求  
      } catch (InterruptedException ieo) {
       •••
      }
     •••    
}

方法二:
public int readAndProcess() throws IOException, InterruptedException {
      while (true) {
        •••
        if (!rpcHeaderRead) {
          if (rpcHeaderBuffer == null) {
            rpcHeaderBuffer = ByteBuffer.allocate(2);
          }
         //读取请求头
          count = channelRead(channel, rpcHeaderBuffer);
          if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
            return count;
          }
        // 读取请求版本号  
          int version = rpcHeaderBuffer.get(0);
          byte[] method = new byte[] {rpcHeaderBuffer.get(1)};
        •••  

          data = ByteBuffer.allocate(dataLength);
        }
        // 读取请求  
        count = channelRead(channel, data);

        if (data.remaining() == 0) {
         •••
          if (useSasl) {
         •••
          } else {
            processOneRpc(data.array());//处理请求
          }
        •••
          }
        } 
        return count;
      }
    }

下面贴出 Server.Connection 类中的 processOneRpc() 方法和 processData() 方法的源码。

方法一:   
 private void processOneRpc(byte[] buf) throws IOException,
        InterruptedException {
      if (headerRead) {
        processData(buf);
      } else {
        processHeader(buf);
        headerRead = true;
        if (!authorizeConnection()) {
          throw new AccessControlException("Connection from " + this
              + " for protocol " + header.getProtocol()
              + " is unauthorized for user " + user);
        }
      }
}
方法二:
    private void processData(byte[] buf) throws  IOException, InterruptedException {
      DataInputStream dis =
        new DataInputStream(new ByteArrayInputStream(buf));
      int id = dis.readInt();      // 尝试读取id
      Writable param = ReflectionUtils.newInstance(paramClass, conf);//读取参数
      param.readFields(dis);        

      Call call = new Call(id, param, this);  //封装成call
      callQueue.put(call);   // 将call存入callQueue
      incRpcCount();  // 增加rpc请求的计数
    }

4. RPC 与 web service

RPC:

Web service

web service 接口就是 RPC 中的 stub 组件,规定了 server 能够提供的服务(web service),这在 server 和 client 上是一致的,但是也是跨语言跨平台的。同时,由于 web service 规范中的 WSDL 文件的存在,现在各平台的 web service 框架,都可以基于 WSDL 文件,自动生成 web service 接口 。

其实两者差不多,只是传输的协议不同。

Reference:

1. http://www.cnblogs.com/LBSer/p/4853234.html
2. http://weixiaolu.iteye.com/blog/1504898
3. http://kyfxbl.iteye.com/blog/1745550