An Analysis of Hadoop RPC

RPC

RPC stands for remote procedure call. A small example shows what that means.
Suppose the client has an RPC protocol interface, Protocol:

interface Protocol {
    int add(int a, int b);
}

The client has no concrete implementation of this interface; the implementing class lives on the server:

class ProtocolImpl implements Protocol {
    // Interface methods are public, so the implementation must be public too.
    public int add(int a, int b) {
        return a + b;
    }
}

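When the client wants to call ProtocolImpl's add method, it has to send the method to be called, together with its arguments, to the server. The server parses that information, invokes ProtocolImpl.add, and sends the result back to the client. The goal of RPC is to make this look to the client as if it were calling one of its own local methods.
Here Protocol is the RPC protocol. A protocol is simply an interface, and the methods it declares are the operations exposed for remote invocation.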
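The idea can be tried out without any network at all. The following is a minimal sketch, not Hadoop code: the class name LocalRpcSketch and the serve helper are invented for illustration, and the "server" is just a method in the same process. It only shows how a dynamic proxy lets the caller use the Protocol interface while the call is actually described by a method name, parameter types, and arguments.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

class LocalRpcSketch {
    /** The RPC protocol: an interface whose methods are the remotely callable operations. */
    interface Protocol {
        int add(int a, int b);
    }

    /** The implementation, which in a real deployment exists only on the server. */
    static class ProtocolImpl implements Protocol {
        @Override
        public int add(int a, int b) {
            return a + b;
        }
    }

    /** Hypothetical stand-in for the server: looks the method up by name and runs it. */
    static Object serve(Object impl, String methodName, Class<?>[] types, Object[] args)
            throws Exception {
        Method m = impl.getClass().getMethod(methodName, types);
        return m.invoke(impl, args);
    }

    /** Builds a client-side proxy that forwards every interface call to serve(). */
    static Protocol newClient(Object serverImpl) {
        // A real client would serialize these three values and send them over a socket.
        InvocationHandler handler = (proxy, method, args) ->
                serve(serverImpl, method.getName(), method.getParameterTypes(), args);
        return (Protocol) Proxy.newProxyInstance(
                Protocol.class.getClassLoader(), new Class<?>[] {Protocol.class}, handler);
    }

    public static void main(String[] args) {
        Protocol client = newClient(new ProtocolImpl());
        System.out.println(client.add(2, 3)); // prints 5
    }
}
```

The caller only ever sees the Protocol interface; everything between the proxy and ProtocolImpl could be replaced by a network hop without changing the calling code.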

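An RPC system consists of four modules:
1. Communication module.
Two cooperating communication modules implement a request-reply protocol. They carry request and reply messages between client and server and generally do not process the packets themselves. The request-reply protocol can be synchronous or asynchronous. In synchronous mode the client blocks until the server's reply arrives. In asynchronous mode the client sends the request and carries on with other work without waiting; once the server has processed the request, it notifies the client. High-concurrency applications usually choose asynchronous mode to reduce access latency and make better use of bandwidth.
2. Stub program (proxy).
Both client and server contain a stub, which can be thought of as a proxy. It makes a remote call look like a local one and is completely transparent to user code. On the client, the stub behaves like a local procedure, but instead of executing the call locally it sends the request to the server through the communication module, and it decodes the result when the server replies. On the server, the stub decodes the arguments in the request message, invokes the corresponding service procedure, and encodes the return value into the reply.
3. Dispatcher.
The dispatcher receives request messages from the communication module and selects a stub to handle each one based on an identifier in the message. When clients issue many concurrent requests, a thread pool is typically used to improve throughput.
4. Client program / service procedure.
The originator of a request and the handler of a request, respectively.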
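The difference between the synchronous and asynchronous request-reply styles described for the communication module can be sketched with plain JDK classes. This is an illustration only: remoteAdd stands in for a network round trip, and the names SyncAsyncSketch, callSync, and callAsync are made up for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SyncAsyncSketch {
    /** Hypothetical "server side" work; stands in for a network round trip. */
    static int remoteAdd(int a, int b) {
        return a + b;
    }

    /** Synchronous style: the caller blocks until the reply is available. */
    static int callSync(int a, int b) {
        return remoteAdd(a, b);
    }

    /** Asynchronous style: the caller gets a future at once and is notified on completion. */
    static CompletableFuture<Integer> callAsync(ExecutorService pool, int a, int b) {
        return CompletableFuture.supplyAsync(() -> remoteAdd(a, b), pool);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println("sync: " + callSync(2, 3));
            CompletableFuture<Integer> reply = callAsync(pool, 2, 3);
            // The caller is free to do other work here before collecting the reply.
            reply.thenAccept(v -> System.out.println("async: " + v)).join();
        } finally {
            pool.shutdown();
        }
    }
}
```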

Hadoop RPC

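Hadoop RPC has four main parts: a serialization layer, a function-call layer, a network transport layer, and a server-side processing framework. They work as follows:

Serialization layer. Serialization converts structured objects into byte streams so they can be sent over the network or written to persistent storage. In an RPC framework it turns the arguments of a request, or the reply, into bytes that can cross machine boundaries. From Hadoop 2.0 onward, Protocol Buffers and Apache Avro are the main choices; Hadoop also ships its own serialization framework, in which any class that implements the Writable interface can be serialized and deserialized.

Function-call layer. This layer locates the function to call and executes it. Hadoop RPC implements it with Java reflection on the server side and dynamic proxies on the client side.

Network transport layer. This layer describes how messages travel between Client and Server. Hadoop RPC uses sockets over TCP/IP.

Server-side processing framework. This can be abstracted as a network I/O model. It describes how client and server exchange information, and its design directly determines how much concurrency the server can handle. Common network I/O models include blocking I/O, non-blocking I/O, and event-driven I/O. Hadoop RPC uses an event-driven I/O model based on the Reactor pattern, built on Java NIO.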
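To make the event-driven model concrete, here is a deliberately tiny, single-threaded take on the Reactor idea using java.nio. It is a sketch under simplifying assumptions, not how Hadoop's server is structured: the class ReactorEchoSketch and its methods are invented for this example, the loop handles a single one-byte message and then stops, and it binds to an ephemeral port on 127.0.0.1.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

class ReactorEchoSketch {
    /** Event loop: one selector multiplexes accept and read events; echoes one byte, then stops. */
    static void runReactor(ServerSocketChannel server) throws IOException {
        try (Selector selector = Selector.open()) {
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
            ByteBuffer buf = ByteBuffer.allocate(1);
            while (true) {
                selector.select(); // block until at least one channel is ready
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove();
                    if (key.isAcceptable()) {
                        SocketChannel client = server.accept();
                        if (client != null) {
                            client.configureBlocking(false);
                            client.register(selector, SelectionKey.OP_READ);
                        }
                    } else if (key.isReadable()) {
                        SocketChannel client = (SocketChannel) key.channel();
                        if (client.read(buf) == 0) {
                            continue; // nothing to read yet, keep waiting
                        }
                        buf.flip();
                        while (buf.hasRemaining()) {
                            client.write(buf); // echo the byte back
                        }
                        client.close();
                        return;
                    }
                }
            }
        }
    }

    /** Starts the reactor on a free local port, sends one byte, and returns the echoed byte. */
    static int echoOnce(int value) {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: any free port
            int port = server.socket().getLocalPort();
            Thread reactor = new Thread(() -> {
                try {
                    runReactor(server);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            reactor.setDaemon(true);
            reactor.start();
            try (Socket socket = new Socket("127.0.0.1", port)) {
                socket.getOutputStream().write(value);
                return socket.getInputStream().read();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(echoOnce(42)); // prints 42
    }
}
```

A production reactor would keep looping, keep per-connection buffers, and hand decoded requests to worker threads; the point here is only that one thread waits on a selector and reacts to readiness events.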

Basic Usage of Hadoop RPC

First define an RPC protocol. The interface must extend VersionedProtocol:

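import org.apache.hadoop.ipc.VersionedProtocol;

public interface IProxyProtocol extends VersionedProtocol {
    // Protocol version; interface fields are implicitly public static final.
    long versionID = 1234L;
    int add(int a, int b);
}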

Next, write a class that implements the RPC interface; the server will invoke it:

import java.io.IOException;

import org.apache.hadoop.ipc.ProtocolSignature;

public class MyProxyProtocol implements IProxyProtocol {
    @Override
    public int add(int a, int b) {
        System.out.println("I am adding");
        return a + b;
    }

    @Override
    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
        System.out.println("MyProxy.ProtocolVersion = " + IProxyProtocol.versionID);
        return IProxyProtocol.versionID;
    }

    @Override
    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
            int clientMethodsHash) throws IOException {
        return new ProtocolSignature(IProxyProtocol.versionID, null);
    }
}

Finally, implement the client and the server:

import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

public class MyRPCClient {
    public static final int PORT = 8888;
    public static final String ADDRESS = "localhost";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The client version should match the versionID declared by the protocol.
        IProxyProtocol proxy = RPC.getProxy(IProxyProtocol.class,
                IProxyProtocol.versionID,
                new InetSocketAddress(ADDRESS, PORT), conf);
        int result = proxy.add(2, 3);
        System.out.println(result);
        RPC.stopProxy(proxy);
    }
}

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

public class MyRPCServer {
    public static final int PORT = 8888;
    public static final String ADDRESS = "localhost";

    public static void main(String[] args) throws IOException {
        // Register the protocol interface and its implementation, then start listening.
        RPC.Server server = new RPC.Builder(new Configuration())
                .setProtocol(IProxyProtocol.class)
                .setInstance(new MyProxyProtocol())
                .setBindAddress(ADDRESS)
                .setPort(PORT)
                .setNumHandlers(10)
                .build();
        server.start();
    }
}

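In the example above, the client calls RPC.getProxy, which generates a dynamic proxy for IProxyProtocol. When a protocol method is called, the proxy sends the method and its arguments to the server; the server runs the method on the concrete implementation class and returns the result to the client. Note that an RPC protocol must either declare the versionID field or carry the ProtocolInfo annotation (which holds the protocol's name and version).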
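How a framework can discover a protocol's version from either an annotation or a versionID field can be sketched with plain reflection. This is an illustration, not Hadoop's code: the ProtoInfo annotation below is a hypothetical stand-in for ProtocolInfo, and the lookup order (annotation first, then the field) is an assumption made for the example.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

class ProtocolVersionSketch {
    /** Hypothetical annotation, loosely modelled on the idea behind ProtocolInfo. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface ProtoInfo {
        String name();
        long version() default -1;
    }

    /** Declares its version through a constant field. */
    interface FieldVersioned {
        long versionID = 1234L;
    }

    /** Declares its version through the annotation. */
    @ProtoInfo(name = "demo.Annotated", version = 7L)
    interface AnnotationVersioned {
    }

    /** Prefers the annotation; otherwise falls back to the static versionID field. */
    static long versionOf(Class<?> protocol) {
        ProtoInfo info = protocol.getAnnotation(ProtoInfo.class);
        if (info != null && info.version() != -1) {
            return info.version();
        }
        try {
            Field field = protocol.getField("versionID");
            return field.getLong(null); // static field, so no instance is needed
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException(protocol + " declares no version", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(versionOf(FieldVersioned.class));      // prints 1234
        System.out.println(versionOf(AnnotationVersioned.class)); // prints 7
    }
}
```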

Client-Side Implementation

Start with the Hadoop RPC client code. It calls RPC.getProxy; tracing into that method shows its implementation:

public static <T> T getProxy(Class<T> protocol,long clientVersion,InetSocketAddress addr, Configuration conf)
throws IOException {
return getProtocolProxy(protocol, clientVersion, addr, conf).getProxy();
}

It calls getProtocolProxy, which passes through several overloads before reaching the final implementation:

/**
* Get a protocol proxy that contains a proxy connection to a remote server
* and a set of methods that are supported by the server
* @param protocol the RPC protocol
* @param clientVersion client's version
* @param addr server address (ip:port)
* @param ticket security ticket
* @param conf configuration
* @param factory socket factory
* @param rpcTimeout max time for each rpc; 0 means no timeout
* @param connectionRetryPolicy retry policy
* @param fallbackToSimpleAuth set to true or false during calls to indicate if
* a secure client falls back to simple auth
* @return the proxy
* @throws IOException if any error occurs
*/
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
long clientVersion,
InetSocketAddress addr,
UserGroupInformation ticket,
Configuration conf,
SocketFactory factory,
int rpcTimeout,
RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth)
throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
SaslRpcServer.init(conf);
}
return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion,
addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy,
fallbackToSimpleAuth);
}

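ProtocolProxy wraps the dynamic proxy object together with the RPC protocol, and ProtocolProxy.getProxy returns the generated proxy. The ProtocolProxy itself is produced by the getProxy method of the RpcEngine that getProtocolEngine returns.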

/**
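* If an RpcEngine for this protocol is already cached, use it.
* Otherwise read the RpcEngine class configured under
* ENGINE_PROP + "." + protocol.getName() from the Configuration;
* the default is WritableRpcEngine.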
*/
static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
Configuration conf) {
RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
if (engine == null) {
Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),
WritableRpcEngine.class);
engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
PROTOCOL_ENGINES.put(protocol, engine);
}
return engine;
}

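As shown, the RpcEngine is read from the Configuration, and WritableRpcEngine is used when nothing is configured. This is how Hadoop RPC supports multiple serialization schemes: it currently provides Writable (WritableRpcEngine) and Protocol Buffers (ProtobufRpcEngine), and users can choose one with RPC.setProtocolEngine(). Only WritableRpcEngine is analyzed here, so we move on to WritableRpcEngine.getProxy: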

public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory,
int rpcTimeout, RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth)
throws IOException {
// connectionRetryPolicy is null by default
if (connectionRetryPolicy != null) {
throw new UnsupportedOperationException(
"Not supported: connectionRetryPolicy=" + connectionRetryPolicy);
}
T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout, fallbackToSimpleAuth));
return new ProtocolProxy<T>(protocol, proxy, true);
}

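The method creates a proxy object through a dynamic proxy and wraps it in a ProtocolProxy; RPC.getProxy returns the result of ProtocolProxy.getProxy. Calling any method of the RPC protocol therefore triggers the invoke method of the proxy's handler, and it is in invoke that the client sends the method name and arguments and receives the result. So next we look at Invoker, an inner class of WritableRpcEngine: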

private static class Invoker implements RpcInvocationHandler {
// ConnectionId holds the server address, protocol class, security ticket, and so on; it uniquely identifies a Client.Connection
private Client.ConnectionId remoteId;
private Client client;
private boolean isClosed = false;
private final AtomicBoolean fallbackToSimpleAuth;

public Invoker(Class<?> protocol,
InetSocketAddress address, UserGroupInformation ticket,
Configuration conf, SocketFactory factory,
int rpcTimeout, AtomicBoolean fallbackToSimpleAuth)
throws IOException {
this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
ticket, rpcTimeout, conf);
this.client = CLIENTS.getClient(conf, factory);
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
long startTime = 0;
if (LOG.isDebugEnabled()) {
startTime = Time.now();
}
TraceScope traceScope = null;
if (Trace.isTracing()) {
traceScope = Trace.startSpan(RpcClientUtil.methodToTraceString(method));
}
ObjectWritable value;
try {
value = (ObjectWritable)
client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args),
remoteId, fallbackToSimpleAuth);
} finally {
if (traceScope != null) traceScope.close();
}
if (LOG.isDebugEnabled()) {
long callTime = Time.now() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
}
return value.get();
}
...
}

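invoke performs the remote call through Client.call. The Client is first looked up in the CLIENTS cache (internally a HashMap<SocketFactory, Client>); if none exists, one is instantiated from the SocketFactory and the serialization type and put into the cache. One argument to call, new Invocation(method, args), is what serializes the method and the arguments passed to it. Invocation implements Writable, the interface Hadoop uses for serialization.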

private static class Invocation implements Writable, Configurable {
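private String methodName; // method name
private Class<?>[] parameterClasses; // parameter types
private Object[] parameters; // parameter values
private Configuration conf;
private long clientVersion; // client version, taken from the version in the declaring class's ProtocolInfo annotation or from its versionID field
private int clientMethodsHash; // hash computed over all methods of the declaring class
private String declaringClassProtocolName; // protocol name, taken from the name in the declaring class's ProtocolInfo annotation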
...

public void readFields(DataInput in) throws IOException {
rpcVersion = in.readLong();
declaringClassProtocolName = UTF8.readString(in);
methodName = UTF8.readString(in);
clientVersion = in.readLong();
clientMethodsHash = in.readInt();
parameters = new Object[in.readInt()];
parameterClasses = new Class[parameters.length];
ObjectWritable objectWritable = new ObjectWritable();
for (int i = 0; i < parameters.length; i++) {
parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf);
parameterClasses[i] = objectWritable.getDeclaredClass();
}
}

@Override
@SuppressWarnings("deprecation")
public void write(DataOutput out) throws IOException {
out.writeLong(rpcVersion);
UTF8.writeString(out, declaringClassProtocolName);
UTF8.writeString(out, methodName);
out.writeLong(clientVersion);
out.writeInt(clientMethodsHash);
out.writeInt(parameterClasses.length);
for (int i = 0; i < parameterClasses.length; i++) {
ObjectWritable.writeObject(out, parameters[i], parameterClasses[i], conf, true);
}
}
...
}
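The write/readFields pair above follows a simple pattern: write the fields in a fixed order, then read them back in the same order. A minimal sketch of that pattern using only java.io follows. It is not the Hadoop Writable API: SimpleInvocation is a made-up class that supports int parameters only, so no ObjectWritable-style type tagging is needed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class InvocationCodecSketch {
    /** Simplified call record: a method name plus int-only parameters. */
    static class SimpleInvocation {
        String methodName;
        int[] parameters;

        SimpleInvocation() {
        }

        SimpleInvocation(String methodName, int... parameters) {
            this.methodName = methodName;
            this.parameters = parameters;
        }

        /** Writes the fields in a fixed order. */
        void write(DataOutput out) throws IOException {
            out.writeUTF(methodName);
            out.writeInt(parameters.length);
            for (int p : parameters) {
                out.writeInt(p);
            }
        }

        /** Reads the fields back in exactly the order write() produced them. */
        void readFields(DataInput in) throws IOException {
            methodName = in.readUTF();
            parameters = new int[in.readInt()];
            for (int i = 0; i < parameters.length; i++) {
                parameters[i] = in.readInt();
            }
        }
    }

    static byte[] encode(SimpleInvocation invocation) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            invocation.write(new DataOutputStream(bytes));
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static SimpleInvocation decode(byte[] data) {
        try {
            SimpleInvocation invocation = new SimpleInvocation();
            invocation.readFields(new DataInputStream(new ByteArrayInputStream(data)));
            return invocation;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        SimpleInvocation copy = decode(encode(new SimpleInvocation("add", 2, 3)));
        System.out.println(copy.methodName + "(" + copy.parameters[0] + ", " + copy.parameters[1] + ")");
    }
}
```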

Continuing to trace Client.call brings us to the Client class, the core class on the client side. After a chain of overloads, the call ends up in:

/**
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
* <code>remoteId</code>, returning the rpc response.
* @param rpcKind
* @param rpcRequest - contains serialized method and method parameters
* @param remoteId - the target rpc server
* @param serviceClass - service class for RPC
* @param fallbackToSimpleAuth - set to true or false during this method to
* indicate if a secure client falls back to simple auth
* @returns the rpc response
*/
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId, int serviceClass,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
final Call call = createCall(rpcKind, rpcRequest); // wrap the serialized request in a Call
Connection connection = getConnection(remoteId, call, serviceClass,
fallbackToSimpleAuth); // get a Connection, which wraps the socket connected to the server
try {
connection.sendRpcRequest(call); // send the rpc
} catch (RejectedExecutionException e) {
throw new IOException("connection has been closed", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("interrupted waiting to send rpc request to server", e);
throw new IOException(e);
}

synchronized (call) {
while (!call.done) {
try {
call.wait(); // wait for the result
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new InterruptedIOException("Call interrupted");
}
}

if (call.error != null) {
if (call.error instanceof RemoteException) {
call.error.fillInStackTrace();
throw call.error;
} else { // local exception
InetSocketAddress address = connection.getRemoteAddress();
throw NetUtils.wrapException(address.getHostName(), address.getPort(), NetUtils.getHostname(), 0, call.error);
}
} else {
return call.getRpcResponse();
}
}
}

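The method first creates a Call object that wraps the content to send. The Connection is responsible for connecting to the server and receiving replies; it puts each reply into the matching Call and wakes that Call up so the result can be read. A single Connection carries the sending and receiving for every Call that shares the same server address, RPC protocol, and security ticket. It maintains a collection of Calls and uses the call id to tell which Call a given reply belongs to, as later sections show. The three key calls here are createCall, which creates the Call; getConnection, which connects to the server; and connection.sendRpcRequest, which sends the request (the reply is received by the Connection's own thread). We now look at each of them.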

  Call createCall(RPC.RpcKind rpcKind, Writable rpcRequest) {
return new Call(rpcKind, rpcRequest);
}

static class Call {
final int id; // call id
final int retry; // retry count
final Writable rpcRequest; // the serialized rpc request
Writable rpcResponse; // null if rpc has error
IOException error; // exception, null if success
final RPC.RpcKind rpcKind; // Rpc EngineKind
boolean done; // true when call is done

private Call(RPC.RpcKind rpcKind, Writable param) {
this.rpcKind = rpcKind;
this.rpcRequest = param;

// Generate the call id. Every Call has a unique id so that the Connection can place an incoming reply into the Call with the matching id.
final Integer id = callId.get();
if (id == null) {
this.id = nextCallId();
} else {
callId.set(null);
this.id = id;
}

final Integer rc = retryCount.get();
if (rc == null) {
this.retry = 0;
} else {
this.retry = rc;
}
}
...
}
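The bookkeeping around call ids (register a Call, block on it, and let a receiver thread complete it by id) can be sketched on its own. The following is an illustration with invented names (PendingCallsSketch, register, deliver), not the Client class itself; replies are delivered out of order on purpose to show that the id, not arrival order, decides which caller is woken.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class PendingCallsSketch {
    /** One outstanding request; the caller waits on it until a reply is delivered. */
    static class Call {
        final int id;
        private Object response;
        private boolean done;

        Call(int id) {
            this.id = id;
        }

        synchronized void complete(Object value) {
            response = value;
            done = true;
            notify(); // wake the caller blocked in await()
        }

        synchronized Object await() throws InterruptedException {
            while (!done) {
                wait();
            }
            return response;
        }
    }

    private final AtomicInteger nextId = new AtomicInteger();
    private final Map<Integer, Call> calls = new ConcurrentHashMap<>();

    /** Creates a Call with a fresh id and remembers it until its reply arrives. */
    Call register() {
        Call call = new Call(nextId.getAndIncrement());
        calls.put(call.id, call);
        return call;
    }

    /** Invoked by the receiver thread: routes a reply to the Call with the matching id. */
    boolean deliver(int callId, Object value) {
        Call call = calls.remove(callId);
        if (call == null) {
            return false; // unknown or already completed call
        }
        call.complete(value);
        return true;
    }

    /** Two calls whose replies arrive in reverse order; each caller still gets its own reply. */
    static String demo() {
        PendingCallsSketch pending = new PendingCallsSketch();
        Call first = pending.register();
        Call second = pending.register();
        Thread receiver = new Thread(() -> {
            pending.deliver(second.id, "reply-to-second");
            pending.deliver(first.id, "reply-to-first");
        });
        receiver.start();
        try {
            return first.await() + "," + second.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints reply-to-first,reply-to-second
    }
}
```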

/** Get a connection from the pool, or create a new one and add it to the
* pool. Connections to a given ConnectionId are reused. */
private Connection getConnection(ConnectionId remoteId,
Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)
throws IOException {
if (!running.get()) {
// the client is stopped
throw new IOException("The client is stopped");
}
Connection connection;
/* we could avoid this allocation for each RPC by having a
* connectionsId object and with set() method. We need to manage the
* refs for keys in HashMap properly. For now its ok.
*/
do {
synchronized (connections) {
connection = connections.get(remoteId);
if (connection == null) {
connection = new Connection(remoteId, serviceClass);
connections.put(remoteId, connection);
}
}
} while (!connection.addCall(call));

//we don't invoke the method below inside "synchronized (connections)"
//block above. The reason for that is if the server happens to be slow,
//it will take longer to establish a connection and that will slow the
//entire system down.
connection.setupIOstreams(fallbackToSimpleAuth);
return connection;
}
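The shape of getConnection (look up or create the entry while holding the lock, but do the slow connect after releasing it) can be shown in isolation. This is a simplified sketch with hypothetical names: the key is a plain String instead of a ConnectionId, and setup() merely counts how often it really ran instead of opening a socket.

```java
import java.util.HashMap;
import java.util.Map;

class ConnectionPoolSketch {
    static class Connection {
        final String remoteId;
        private boolean ready;
        private int setupCount;

        Connection(String remoteId) {
            this.remoteId = remoteId;
        }

        /** Idempotent: only the first caller performs the (potentially slow) setup. */
        synchronized void setup() {
            if (ready) {
                return;
            }
            setupCount++; // stands in for connecting the socket and opening streams
            ready = true;
        }

        synchronized int setupCount() {
            return setupCount;
        }
    }

    private final Map<String, Connection> connections = new HashMap<>();

    Connection getConnection(String remoteId) {
        Connection connection;
        synchronized (connections) {
            // Only the cheap map lookup and insert happen under the pool lock.
            connection = connections.get(remoteId);
            if (connection == null) {
                connection = new Connection(remoteId);
                connections.put(remoteId, connection);
            }
        }
        // The slow part runs outside the pool lock, so other keys are not held up.
        connection.setup();
        return connection;
    }

    public static void main(String[] args) {
        ConnectionPoolSketch pool = new ConnectionPoolSketch();
        Connection a = pool.getConnection("host-a:8888");
        Connection b = pool.getConnection("host-a:8888");
        System.out.println((a == b) + " " + a.setupCount()); // prints true 1
    }
}
```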

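The client maintains a connection pool (a Hashtable, which is thread-safe) keyed by remoteId. When the pool has no connection for a key, a new Connection is created and added. Constructing a Connection is mostly field assignment. Connection holds a Socket, which is what actually establishes the TCP connection, and Connection extends Thread: once the connection is up, the thread is started and keeps waiting for responses, as shown below. The connection is deliberately not established inside the "synchronized (connections)" block, because connecting there could block and slow the whole system down. The connection is actually established in connection.setupIOstreams: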

/** Connect to the server and set up the I/O streams. It then sends
* a header to the server and starts
* the connection thread that waits for responses.
* Sets up the channel's input and output streams and starts the connection thread that receives replies.
*/
private synchronized void setupIOstreams(
AtomicBoolean fallbackToSimpleAuth) {
if (socket != null || shouldCloseConnection.get()) {
return;
}
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to "+server);
}
if (Trace.isTracing()) {
Trace.addTimelineAnnotation("IPC client connecting to " + server);
}
short numRetries = 0;
Random rand = null;
while (true) {
setupConnection(); // establish the socket connection
InputStream inStream = NetUtils.getInputStream(socket); // get the input stream
OutputStream outStream = NetUtils.getOutputStream(socket); // get the output stream
writeConnectionHeader(outStream); // send the connection header
...

if (doPing) {
inStream = new PingInputStream(inStream);
}
this.in = new DataInputStream(new BufferedInputStream(inStream));

// SASL may have already buffered the stream
if (!(outStream instanceof BufferedOutputStream)) {
outStream = new BufferedOutputStream(outStream);
}
this.out = new DataOutputStream(outStream);

writeConnectionContext(remoteId, authMethod); // send the connection context

// update last activity time
touch();

if (Trace.isTracing()) {
Trace.addTimelineAnnotation("IPC client connected to " + server);
}

// start the receiver thread after the socket connection has been set
// up
start();
return;
}
} catch (Throwable t) {
if (t instanceof IOException) {
markClosed((IOException)t);
} else {
markClosed(new IOException("Couldn't set up IO streams", t));
}
close();
}
}


/**
* Write the connection header - this is sent when connection is established
* +----------------------------------+
* | "hrpc" 4 bytes |
* +----------------------------------+
* | Version (1 byte) |
* +----------------------------------+
* | Service Class (1 byte) |
* +----------------------------------+
* | AuthProtocol (1 byte) |
* +----------------------------------+
*/
private void writeConnectionHeader(OutputStream outStream)
throws IOException {
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
// Write out the header, version and authentication method
out.write(RpcConstants.HEADER.array());
out.write(RpcConstants.CURRENT_VERSION);
out.write(serviceClass);
out.write(authProtocol.callId);
out.flush();
}
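The seven-byte layout in the comment above is easy to reproduce with a DataOutputStream. The sketch below is not taken from Hadoop: the class and method names are invented, and the version, service class, and auth protocol values passed in main are placeholders chosen for illustration rather than the real constants.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class ConnectionHeaderSketch {
    private static final byte[] MAGIC = "hrpc".getBytes(StandardCharsets.US_ASCII);

    /** Encodes the header: 4 magic bytes, then one byte each for version, service class, auth. */
    static byte[] encode(int version, int serviceClass, int authProtocol) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.write(MAGIC);
            out.write(version);      // write(int) emits only the low 8 bits
            out.write(serviceClass);
            out.write(authProtocol);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Checks the length and the leading magic bytes, as a receiver would before parsing. */
    static boolean looksValid(byte[] header) {
        if (header.length != MAGIC.length + 3) {
            return false;
        }
        for (int i = 0; i < MAGIC.length; i++) {
            if (header[i] != MAGIC[i]) {
                return false;
            }
        }
        return true;
    }

    static int versionOf(byte[] header) {
        return header[MAGIC.length] & 0xFF;
    }

    public static void main(String[] args) {
        byte[] header = encode(9, 0, 0); // placeholder values
        System.out.println(header.length + " bytes, valid=" + looksValid(header)
                + ", version=" + versionOf(header));
    }
}
```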

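The overall flow is: setupConnection establishes the connection, the input and output streams are obtained, and then the Hadoop RPC connection header and the connection context (built from remoteId and the authentication method, as the arguments to writeConnectionContext show) are sent. The header layout is given in the comment above; the server side, covered later, receives and parses it. Finally the thread is started and keeps waiting for responses. The connection itself is made in setupConnection: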

private synchronized void setupConnection() throws IOException {
short ioFailures = 0;
short timeoutFailures = 0;
while (true) {
try {
this.socket = socketFactory.createSocket();
this.socket.setTcpNoDelay(tcpNoDelay);
this.socket.setKeepAlive(true);

/*
* Bind the socket to the host specified in the principal name of the
* client, to ensure Server matching address of the client connection
* to host name in principal passed.
*/
UserGroupInformation ticket = remoteId.getTicket();
if (ticket != null && ticket.hasKerberosCredentials()) {
KerberosInfo krbInfo =
remoteId.getProtocol().getAnnotation(KerberosInfo.class);
if (krbInfo != null && krbInfo.clientPrincipal() != null) {
String host =
SecurityUtil.getHostFromPrincipal(remoteId.getTicket().getUserName());

// If host name is a valid local address then bind socket to it
InetAddress localAddr = NetUtils.getLocalInetAddress(host);
if (localAddr != null) {
this.socket.bind(new InetSocketAddress(localAddr, 0));
}
}
}

NetUtils.connect(this.socket, server, connectionTimeout);
if (rpcTimeout > 0) {
pingInterval = rpcTimeout; // rpcTimeout overwrites pingInterval
}
this.socket.setSoTimeout(pingInterval);
return;
} catch (ConnectTimeoutException toe) {
/* Check for an address change and update the local reference.
* Reset the failure counter if the address was changed
*/
if (updateAddress()) {
timeoutFailures = ioFailures = 0;
}
handleConnectionTimeout(timeoutFailures++,
maxRetriesOnSocketTimeouts, toe);
} catch (IOException ie) {
if (updateAddress()) {
timeoutFailures = ioFailures = 0;
}
handleConnectionFailure(ioFailures++, ie);
}
}
}

public class StandardSocketFactory extends SocketFactory {
public Socket createSocket() throws IOException {
return SocketChannel.open().socket();
}
}

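setupConnection calls socketFactory.createSocket(), so the Socket is created by whichever SocketFactory implementation is in use; the default is StandardSocketFactory, which obtains the socket from a SocketChannel. setupConnection then configures the socket with keep-alive and a read timeout (set through socket.setSoTimeout; one minute by default). Finally NetUtils.connect establishes the connection: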

public static void connect(Socket socket,
SocketAddress address,
int timeout) throws IOException {
connect(socket, address, null, timeout);
}

public static void connect(Socket socket,
SocketAddress endpoint,
SocketAddress localAddr,
int timeout) throws IOException {
if (socket == null || endpoint == null || timeout < 0) {
throw new IllegalArgumentException("Illegal argument for connect()");
}

SocketChannel ch = socket.getChannel();

if (localAddr != null) {
Class localClass = localAddr.getClass();
Class remoteClass = endpoint.getClass();
Preconditions.checkArgument(localClass.equals(remoteClass),
"Local address %s must be of same family as remote address %s.",
localAddr, endpoint);
socket.bind(localAddr);
}

try {
if (ch == null) {
// let the default implementation handle it.
socket.connect(endpoint, timeout);
} else {
SocketIOWithTimeout.connect(ch, endpoint, timeout);
}
} catch (SocketTimeoutException ste) {
throw new ConnectTimeoutException(ste.getMessage());
}

// There is a very rare case allowed by the TCP specification, such that
// if we are trying to connect to an endpoint on the local machine,
// and we end up choosing an ephemeral port equal to the destination port,
// we will actually end up getting connected to ourself (ie any data we
// send just comes right back). This is only possible if the target
// daemon is down, so we'll treat it like connection refused.
if (socket.getLocalPort() == socket.getPort() &&
socket.getLocalAddress().equals(socket.getInetAddress())) {
LOG.info("Detected a loopback TCP socket, disconnecting it");
socket.close();
throw new ConnectException(
"Localhost targeted connection resulted in a loopback. " +
"No daemon is listening on the target port.");
}
}


/**
* The SocketIOWithTimeout.connect method
*/
static void connect(SocketChannel channel,
SocketAddress endpoint, int timeout) throws IOException {

boolean blockingOn = channel.isBlocking();
if (blockingOn) {
channel.configureBlocking(false);
}

try {
if (channel.connect(endpoint)) {
return;
}

long timeoutLeft = timeout;
long endTime = (timeout > 0) ? (Time.now() + timeout): 0;

while (true) {
// we might have to call finishConnect() more than once
// for some channels (with user level protocols)

int ret = selector.select((SelectableChannel)channel,
SelectionKey.OP_CONNECT, timeoutLeft);

if (ret > 0 && channel.finishConnect()) {
return;
}

if (ret == 0 ||
(timeout > 0 &&
(timeoutLeft = (endTime - Time.now())) <= 0)) {
throw new SocketTimeoutException(
timeoutExceptionString(channel, timeout,
SelectionKey.OP_CONNECT));
}
}
} catch (IOException e) {
// javadoc for SocketChannel.connect() says channel should be closed.
try {
channel.close();
} catch (IOException ignored) {}
throw e;
} finally {
if (blockingOn && channel.isOpen()) {
channel.configureBlocking(true);
}
}
}

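In the end this is just a connect on a Socket or on a SocketChannel switched to non-blocking mode. Once the connection to the server is established, the content of the Call is sent and the response is received. Back in Client.call, Connection.sendRpcRequest sends the method information and the actual arguments; the Connection thread that was started in setupIOstreams waits for the server's response.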

public void sendRpcRequest(final Call call)
throws InterruptedException, IOException {
if (shouldCloseConnection.get()) {
return;
}

// Serialize the call to be sent. This is done from the actual
// caller thread, rather than the sendParamsExecutor thread,

// so that if the serialization throws an error, it is reported
// properly. This also parallelizes the serialization.
//
// Format of a call on the wire:
// 0) Length of rest below (1 + 2)
// 1) RpcRequestHeader - is serialized Delimited hence contains length
// 2) RpcRequest
//
// Items '1' and '2' are prepared here.
final DataOutputBuffer d = new DataOutputBuffer();
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
clientId);
header.writeDelimitedTo(d);
call.rpcRequest.write(d);

synchronized (sendRpcRequestLock) {
Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
@Override
public void run() {
try {
synchronized (Connection.this.out) {
if (shouldCloseConnection.get()) {
return;
}

if (LOG.isDebugEnabled())
LOG.debug(getName() + " sending #" + call.id);

byte[] data = d.getData();
int totalLength = d.getLength();
out.writeInt(totalLength); // Total Length
out.write(data, 0, totalLength);// RpcRequestHeader + RpcRequest
out.flush();
}
} catch (IOException e) {
// exception at this point would leave the connection in an
// unrecoverable state (eg half a call left on the wire).
// So, close the connection, killing any outstanding calls
markClosed(e);
} finally {
//the buffer is just an in-memory buffer, but it is still polite to
// close early
IOUtils.closeStream(d);
}
}
});

try {
senderFuture.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();

// cause should only be a RuntimeException as the Runnable above
// catches IOException
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else {
throw new RuntimeException("unexpected checked exception", cause);
}
}
}
}

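The call's message is written to the output stream in the format described in the comment: first the length of the message, then the header, then the request body. The message is first serialized into a temporary DataOutputBuffer on the caller's thread, and the actual socket write is handed to a thread pool (sendParamsExecutor); note that the caller still waits for that write to finish through senderFuture.get(). As mentioned earlier, the Connection thread is started once the connection is established and keeps receiving responses from the server. Next we look at how the result returned by the server is received, that is, Connection's run method: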

public void run() {
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": starting, having connections " + connections.size());
try {
while (waitForWork()) {//wait here for work - read or close connection
receiveRpcResponse();
}
} catch (Throwable t) {
...
}


/* Receive a response.
* Because only one receiver, so no synchronization on in.
*/
private void receiveRpcResponse() {
if (shouldCloseConnection.get()) {
return;
}
touch();

try {
int totalLen = in.readInt();
RpcResponseHeaderProto header =
RpcResponseHeaderProto.parseDelimitedFrom(in);
checkResponse(header);

int headerLen = header.getSerializedSize();
headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);

int callId = header.getCallId();
if (LOG.isDebugEnabled())
LOG.debug(getName() + " got value #" + callId);

Call call = calls.get(callId);
RpcStatusProto status = header.getStatus();
if (status == RpcStatusProto.SUCCESS) {
Writable value = ReflectionUtils.newInstance(valueClass, conf);
value.readFields(in); // read value
calls.remove(callId);
call.setRpcResponse(value);

// verify that length was correct
// only for ProtobufEngine where len can be verified easily
if (call.getRpcResponse() instanceof ProtobufRpcEngine.RpcWrapper) {
ProtobufRpcEngine.RpcWrapper resWrapper =
(ProtobufRpcEngine.RpcWrapper) call.getRpcResponse();
if (totalLen != headerLen + resWrapper.getLength()) {
throw new RpcClientException(
"RPC response length mismatch on rpc success");
}
}
} else { // Rpc Request failed
// Verify that length was correct
if (totalLen != headerLen) {
throw new RpcClientException(
"RPC response length mismatch on rpc error");
}

final String exceptionClassName = header.hasExceptionClassName() ?
header.getExceptionClassName() :
"ServerDidNotSetExceptionClassName";
final String errorMsg = header.hasErrorMsg() ?
header.getErrorMsg() : "ServerDidNotSetErrorMsg" ;
final RpcErrorCodeProto erCode =
(header.hasErrorDetail() ? header.getErrorDetail() : null);
if (erCode == null) {
LOG.warn("Detailed error code not set by server on rpc error");
}
RemoteException re =
( (erCode == null) ?
new RemoteException(exceptionClassName, errorMsg) :
new RemoteException(exceptionClassName, errorMsg, erCode));
if (status == RpcStatusProto.ERROR) {
calls.remove(callId);
call.setException(re);
} else if (status == RpcStatusProto.FATAL) {
// Close the connection
markClosed(re);
}
}
} catch (IOException e) {
markClosed(e);
}
}

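The method first reads the message length and then parses the response header. Using the callId, it puts the response into the matching Call, sets call.done to true, and wakes up the thread waiting on that Call. Control then returns to Client.call, which returns call.getRpcResponse(). This completes the analysis of the Hadoop RPC client code.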
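Both the request and the response path rely on the same framing idea: a total length, then a header that carries the call id, then the body. A stripped-down sketch of that framing is shown here. It is an assumption-laden simplification: the header is reduced to a single 4-byte call id instead of a length-delimited protobuf message, and FrameCodecSketch and its methods are names made up for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class FrameCodecSketch {
    /** A decoded frame: which call it belongs to, plus the raw body bytes. */
    static final class Frame {
        final int callId;
        final byte[] payload;

        Frame(int callId, byte[] payload) {
            this.callId = callId;
            this.payload = payload;
        }
    }

    /** Layout: [int length of the rest][int callId][payload bytes]. */
    static byte[] encode(int callId, byte[] payload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(4 + payload.length); // everything that follows this field
            out.writeInt(callId);             // simplified stand-in for the header
            out.write(payload);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the length first, then the call id, then exactly the remaining bytes. */
    static Frame decode(byte[] frame) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame));
            int totalLen = in.readInt();
            int callId = in.readInt();
            byte[] payload = new byte[totalLen - 4];
            in.readFully(payload);
            return new Frame(callId, payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] wire = encode(7, "ok".getBytes(StandardCharsets.UTF_8));
        Frame frame = decode(wire);
        System.out.println("call #" + frame.callId + " -> "
                + new String(frame.payload, StandardCharsets.UTF_8));
    }
}
```

Because the length comes first, a receiver always knows how many bytes belong to the current message, which is what allows many calls to share one connection.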

Server-Side Implementation

The server first creates an RPC.Server object using the builder pattern:

public Server build() throws IOException, HadoopIllegalArgumentException {
if (this.conf == null) {
throw new HadoopIllegalArgumentException("conf is not set");
}
if (this.protocol == null) {
throw new HadoopIllegalArgumentException("protocol is not set");
}
if (this.instance == null) {
throw new HadoopIllegalArgumentException("instance is not set");
}

return getProtocolEngine(this.protocol, this.conf).getServer(
this.protocol, this.instance, this.bindAddress, this.port,
this.numHandlers, this.numReaders, this.queueSizePerHandler,
this.verbose, this.conf, this.secretManager, this.portRangeConfig);
}

Like the client, the server uses getProtocolEngine to obtain an RpcEngine, and different engines create different Server objects. Again we look at the default engine, WritableRpcEngine, whose getServer method is called:

public RPC.Server getServer(Class<?> protocolClass,
Object protocolImpl, String bindAddress, int port,
int numHandlers, int numReaders, int queueSizePerHandler,
boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig)
throws IOException {
return new Server(protocolClass, protocolImpl, conf, bindAddress, port,
numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
portRangeConfig);
}



public static class Server extends RPC.Server {
...
/**
* Construct an RPC server.
* @param protocolClass - the protocol being registered
* can be null for compatibility with old usage (see below for details)
* @param protocolImpl the protocol impl that will be called
* @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection
* @param port the port to listen for connections on
* @param numHandlers the number of method handler threads to run
* @param verbose whether each call should be logged
*/
public Server(Class<?> protocolClass, Object protocolImpl,
Configuration conf, String bindAddress, int port,
int numHandlers, int numReaders, int queueSizePerHandler,
boolean verbose, SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig)
throws IOException {
super(bindAddress, port, null, numHandlers, numReaders,
queueSizePerHandler, conf,
classNameBase(protocolImpl.getClass().getName()), secretManager,
portRangeConfig);

this.verbose = verbose;

Class<?>[] protocols;
if (protocolClass == null) { // derive protocol from impl
/*
* In order to remain compatible with the old usage where a single
* target protocolImpl is suppled for all protocol interfaces, and
* the protocolImpl is derived from the protocolClass(es)
* we register all interfaces extended by the protocolImpl
*/
protocols = RPC.getProtocolInterfaces(protocolImpl.getClass());

} else {
if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) {
throw new IOException("protocolClass "+ protocolClass +
" is not implemented by protocolImpl which is of class " +
protocolImpl.getClass());
}
// register protocol class and its super interfaces
registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, protocolClass, protocolImpl);
protocols = RPC.getProtocolInterfaces(protocolClass);
}
for (Class<?> p : protocols) {
if (!p.equals(VersionedProtocol.class)) {
registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, p, protocolImpl);
}
}

}
}


// Register protocol and its impl for rpc calls
void registerProtocolAndImpl(RpcKind rpcKind, Class<?> protocolClass,
Object protocolImpl) {
String protocolName = RPC.getProtocolName(protocolClass);
long version;


try {
version = RPC.getProtocolVersion(protocolClass);
} catch (Exception ex) {
LOG.warn("Protocol " + protocolClass +
" NOT registered as cannot get protocol version ");
return;
}


getProtocolImplMap(rpcKind).put(new ProtoNameVer(protocolName, version),
new ProtoClassProtoImpl(protocolClass, protocolImpl));
LOG.debug("RpcKind = " + rpcKind + " Protocol Name = " + protocolName + " version=" + version +
" ProtocolImpl=" + protocolImpl.getClass().getName() +
" protocolClass=" + protocolClass.getName());
}


Map<ProtoNameVer, ProtoClassProtoImpl> getProtocolImplMap(RPC.RpcKind rpcKind) {
if (protocolImplMapArray.size() == 0) {// initialize for all rpc kinds
for (int i=0; i <= RpcKind.MAX_INDEX; ++i) {
protocolImplMapArray.add(
new HashMap<ProtoNameVer, ProtoClassProtoImpl>(10));
}
}
return protocolImplMapArray.get(rpcKind.ordinal());
}

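WritableRpcEngine.Server extends RPC.Server, which in turn extends ipc.Server. getServer simply constructs a WritableRpcEngine.Server object. As the constructor shows, it first calls the superclass constructor and then caches the protocol implementation under the RPC_WRITABLE kind, keyed by the protocol interface name and version. We then turn to the superclass constructors.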
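The registration step described above can be sketched in a few lines. This is a simplified stand-in rather than Hadoop's code: the class `ProtocolRegistrySketch`, the `Kind` enum, and the use of `SimpleImmutableEntry` as the (name, version) key are assumptions made for illustration, whereas Hadoop uses its own `ProtoNameVer` and `ProtoClassProtoImpl` classes.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a per-RPC-kind registry keyed by (protocol name, version).
final class ProtocolRegistrySketch {
    // Stand-in for RPC.RpcKind; only the ordinal matters here.
    enum Kind { BUILTIN, WRITABLE, PROTOCOL_BUFFER }

    // One map per kind, stored at the index given by the kind's ordinal.
    private final List<Map<SimpleImmutableEntry<String, Long>, Object>> mapsByKind =
            new ArrayList<>();

    ProtocolRegistrySketch() {
        for (int i = 0; i < Kind.values().length; i++) {
            mapsByKind.add(new HashMap<>());
        }
    }

    void register(Kind kind, String protocolName, long version, Object impl) {
        mapsByKind.get(kind.ordinal())
                  .put(new SimpleImmutableEntry<>(protocolName, Long.valueOf(version)), impl);
    }

    // Returns the registered implementation, or null if name or version do not match.
    Object lookup(Kind kind, String protocolName, long version) {
        return mapsByKind.get(kind.ordinal())
                         .get(new SimpleImmutableEntry<>(protocolName, Long.valueOf(version)));
    }

    public static void main(String[] args) {
        ProtocolRegistrySketch registry = new ProtocolRegistrySketch();
        Object impl = new Object();
        registry.register(Kind.WRITABLE, "IProxyProtocol", 1234L, impl);
        System.out.println(registry.lookup(Kind.WRITABLE, "IProxyProtocol", 1234L) == impl);
        System.out.println(registry.lookup(Kind.WRITABLE, "IProxyProtocol", 1L));
    }
}
```

Keying on both name and version is what lets a lookup fail when the client asks for a version the server never registered.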

public abstract static class Server extends org.apache.hadoop.ipc.Server {
...
protected Server(String bindAddress, int port,
Class<? extends Writable> paramClass, int handlerCount,
int numReaders, int queueSizePerHandler,
Configuration conf, String serverName,
SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig) throws IOException {
super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler,
conf, serverName, secretManager, portRangeConfig);
initProtocolMetaInfo(conf);
}

private void initProtocolMetaInfo(Configuration conf) {
RPC.setProtocolEngine(conf, ProtocolMetaInfoPB.class,
ProtobufRpcEngine.class);
ProtocolMetaInfoServerSideTranslatorPB xlator =
new ProtocolMetaInfoServerSideTranslatorPB(this);
BlockingService protocolInfoBlockingService = ProtocolInfoService
.newReflectiveBlockingService(xlator);
addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, ProtocolMetaInfoPB.class,
protocolInfoBlockingService);
}
...
}
public abstract class Server {
protected Server(String bindAddress, int port,
Class<? extends Writable> rpcRequestClass, int handlerCount,
int numReaders, int queueSizePerHandler, Configuration conf,
String serverName, SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig)
throws IOException {
this.bindAddress = bindAddress;
this.conf = conf;
this.portRangeConfig = portRangeConfig;
this.port = port;
this.rpcRequestClass = rpcRequestClass;
this.handlerCount = handlerCount;
this.socketSendBufferSize = 0;
...
listener = new Listener();
this.port = listener.getAddress().getPort();
connectionManager = new ConnectionManager();
this.rpcMetrics = RpcMetrics.create(this, conf);
this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port);
this.tcpNoDelay = conf.getBoolean(
CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_DEFAULT);

this.setLogSlowRPC(conf.getBoolean(
CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC,
CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_DEFAULT));

// Create the responder here
responder = new Responder();
...
}
}

As we can see, the superclass constructor mostly assigns fields; the most important step is creating the listener and responder objects.

/** Listens on the socket. Creates jobs for the handler threads*/
private class Listener extends Thread {

private ServerSocketChannel acceptChannel = null; //the accept channel
private Selector selector = null; //the selector that we use for the server
private Reader[] readers = null;
private int currentReader = 0;
private InetSocketAddress address; //the address we bind at
private int backlogLength = conf.getInt(
CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);

public Listener() throws IOException {
address = new InetSocketAddress(bindAddress, port);
// Create a new server socket and set to non blocking mode
acceptChannel = ServerSocketChannel.open();
acceptChannel.configureBlocking(false);

// Bind the server socket to the local host and port
bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
// create a selector;
selector= Selector.open();
// readThreads defaults to 1 when no value is configured
readers = new Reader[readThreads];
for (int i = 0; i < readThreads; i++) {
Reader reader = new Reader(
"Socket Reader #" + (i + 1) + " for port " + port);
readers[i] = reader;
reader.start();
}

// Register accepts on the server socket with the selector.
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
this.setName("IPC Server listener on " + port);
this.setDaemon(true);
}
...
}

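The Listener opens a ServerSocketChannel, binds it to the given port (or to one port out of a configured range), opens a Selector, and registers the accept-ready event (OP_ACCEPT) with it. It also creates and starts several read threads (the Reader class). A Reader reads the data sent by clients, deserializes it, and builds the corresponding Call objects. Each Reader has its own Selector that listens for read-ready events. When the Listener's selector receives an accept-ready event, the SocketChannel is wrapped in a Connection and a read event is registered with a Reader's selector, so that this reader takes over reading the data (the details follow later). For now, let's take a quick look at the Responder class.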
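The accept path described above can be tried out in isolation with plain java.nio. The following is a minimal sketch, not Hadoop's Listener: the class name `AcceptSketch`, the loopback address, and the 5-second timeout are assumptions for illustration, and the client is created in the same method only so the example is self-contained.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// Hypothetical sketch: accept one connection through a Selector, as a listener thread would.
final class AcceptSketch {
    // Returns true if a client connection was accepted via an OP_ACCEPT event.
    static boolean acceptOnce() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            server.configureBlocking(false);
            // Port 0 lets the OS pick a free (ephemeral) port.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.register(selector, SelectionKey.OP_ACCEPT);
            int port = server.socket().getLocalPort();

            try (SocketChannel client = SocketChannel.open(
                    new InetSocketAddress(InetAddress.getLoopbackAddress(), port))) {
                selector.select(5000); // wait up to 5 s for the accept-ready event
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isValid() && key.isAcceptable()) {
                        SocketChannel accepted =
                                ((ServerSocketChannel) key.channel()).accept();
                        if (accepted != null) {
                            // A real server would now hand the channel to a reader.
                            accepted.configureBlocking(false);
                            accepted.close();
                            return true;
                        }
                    }
                }
                return false;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("accepted: " + acceptOnce());
    }
}
```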

 // Sends responses of RPC back to clients.
private class Responder extends Thread {
private final Selector writeSelector;
private int pending; // connections waiting to register

final static int PURGE_INTERVAL = 900000; // 15mins

Responder() throws IOException {
this.setName("IPC Server Responder");
this.setDaemon(true);
writeSelector = Selector.open(); // create a selector
pending = 0;
}

@Override
public void run() {
LOG.info(Thread.currentThread().getName() + ": starting");
SERVER.set(Server.this);
try {
doRunLoop();
} finally {
LOG.info("Stopping " + Thread.currentThread().getName());
try {
writeSelector.close();
} catch (IOException ioe) {
LOG.error("Couldn't close write selector in " + Thread.currentThread().getName(), ioe);
}
}
}
...
}

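As we can see, Responder is also a thread class. Its main job is to send RPC responses back to clients. It has its own selector as well, which is mainly where write events are registered.
Once the Listener and Responder objects have been created and the Server instance has been built, Server.start() must be called to start the server.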

public synchronized void start() {
responder.start();
listener.start();
handlers = new Handler[handlerCount];

for (int i = 0; i < handlerCount; i++) {
handlers[i] = new Handler(i);
handlers[i].start();
}
}

This starts the responder, listener, and handler threads. Next, let's look at the listener's run method.

public void run() {
LOG.info(Thread.currentThread().getName() + ": starting");
SERVER.set(Server.this);
connectionManager.startIdleScan();
while (running) {
SelectionKey key = null;
try {
getSelector().select(); // blocks until a registered socket channel has an event ready
Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
try {
if (key.isValid()) {
if (key.isAcceptable())
doAccept(key);
}
} catch (IOException e) {
}
key = null;
}
} catch (OutOfMemoryError e) {
// we can run out of memory if we have too many threads
// log the event and sleep for a minute and give
// some thread(s) a chance to finish
LOG.warn("Out of Memory in server select", e);
closeCurrentConnection(key, e);
connectionManager.closeIdle(true);
try { Thread.sleep(60000); } catch (Exception ie) {}
} catch (Exception e) {
closeCurrentConnection(key, e);
}
}
LOG.info("Stopping " + Thread.currentThread().getName());

synchronized (this) {
try {
acceptChannel.close();
selector.close();
} catch (IOException e) { }

selector= null;
acceptChannel= null;

// close all connections
connectionManager.stopIdleScan();
connectionManager.closeAll();
}
}

void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
while ((channel = server.accept()) != null) {

channel.configureBlocking(false);
channel.socket().setTcpNoDelay(tcpNoDelay);
channel.socket().setKeepAlive(true);

Reader reader = getReader(); // advance through the array to pick the next reader
Connection c = connectionManager.register(channel); // builds a Connection and hands it to connectionManager; returns null once the connection limit is reached
// If the connectionManager can't take it, close the connection.
if (c == null) {
if (channel.isOpen()) {
IOUtils.cleanup(null, channel);
}
connectionManager.droppedConnections.getAndIncrement();
continue;
}
// attach it to the SelectionKey so that, on error, the Connection can be retrieved from the key and closed
key.attach(c); // so closeCurrentConnection can get the object
reader.addConnection(c);
}
}

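The listener thread waits for a registered accept event and then calls doAccept. doAccept obtains the connected socket, builds a Connection object, and stores it in the connectionManager (which keeps a Set named connections internally). This Connection class is an inner class of ipc.Server and is different from the Client's inner class of the same name described earlier. The listener then picks the next reader in round-robin order and puts the connection into that reader's blocking queue.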
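The round-robin hand-off described above is easy to model without any sockets. In this sketch the class `RoundRobinSketch`, the use of strings as stand-ins for connections, and the helper `pendingFor` are all hypothetical; the real code puts Connection objects on each Reader's `pendingConnections` queue with `put()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of handing accepted connections to readers in round-robin order.
final class RoundRobinSketch {
    private final List<BlockingQueue<String>> readerQueues = new ArrayList<>();
    private int currentReader = 0;

    RoundRobinSketch(int readerCount) {
        for (int i = 0; i < readerCount; i++) {
            readerQueues.add(new LinkedBlockingQueue<>());
        }
    }

    // Advance the index and wrap around at the end of the list.
    private BlockingQueue<String> nextReaderQueue() {
        currentReader = (currentReader + 1) % readerQueues.size();
        return readerQueues.get(currentReader);
    }

    // Called by the single listener thread for every accepted connection.
    // The queues are unbounded, so add() never blocks in this sketch.
    void dispatch(String connection) {
        nextReaderQueue().add(connection);
    }

    int pendingFor(int readerIndex) {
        return readerQueues.get(readerIndex).size();
    }

    public static void main(String[] args) {
        RoundRobinSketch listener = new RoundRobinSketch(3);
        for (int i = 0; i < 5; i++) {
            listener.dispatch("conn-" + i);
        }
        for (int r = 0; r < 3; r++) {
            System.out.println("reader " + r + " has " + listener.pendingFor(r) + " pending");
        }
    }
}
```

Because only the listener thread advances the index, no synchronization is needed around `currentReader` in this model.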

public void addConnection(Connection conn) throws InterruptedException {
// pendingConnections is a blocking queue: BlockingQueue<Connection>
pendingConnections.put(conn);
// wake up readSelector
readSelector.wakeup();
}

Next, let's look at the Reader's run method.

public void run() {
LOG.info("Starting " + Thread.currentThread().getName());
try {
doRunLoop();
} finally {
try {
readSelector.close();
} catch (IOException ioe) {
LOG.error("Error closing read selector in " + Thread.currentThread().getName(), ioe);
}
}
}

private synchronized void doRunLoop() {
while (running) {
SelectionKey key = null;
try {
// consume as many connections as currently queued to avoid
// unbridled acceptance of connections that starves the select
int size = pendingConnections.size();
for (int i=size; i>0; i--) {
Connection conn = pendingConnections.take();
conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
}
readSelector.select(); // no check needed: blocks until a registered event is ready or wakeup() is called

Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
try {
if (key.isReadable()) {
doRead(key);
}
} catch (CancelledKeyException cke) {
// something else closed the connection, ex. responder or
// the listener doing an idle scan. ignore it and let them
// clean up.
LOG.info(Thread.currentThread().getName() +
": connection aborted from " + key.attachment());
}
key = null;
}
} catch (InterruptedException e) {
if (running) { // unexpected -- log it
LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
}
} catch (IOException ex) {
LOG.error("Error in Reader", ex);
} catch (Throwable re) {
LOG.fatal("Bug in read selector!", re);
ExitUtil.terminate(1, "Bug in read selector!");
}
}
}

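The Reader takes connections off the blocking queue and registers a read event for each with readSelector; when the selector reports a readable key, doRead is called.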
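The interplay between the pending queue and `wakeup()` is the subtle part of this loop, so here is a small model of it. It is a sketch under simplifying assumptions: `ReaderWakeupSketch` and `drainUntil` are hypothetical names, strings stand in for connections, and no channel is ever registered, so `select()` can only return because of `wakeup()`.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the "queue + wakeup" hand-off between a listener and a reader.
final class ReaderWakeupSketch {
    // Runs a reader-style loop until `expected` items have been drained from the queue.
    static List<String> drainUntil(int expected) throws IOException, InterruptedException {
        BlockingQueue<String> pending = new LinkedBlockingQueue<>();
        List<String> registered = new ArrayList<>();
        try (Selector selector = Selector.open()) {
            // Plays the listener: enqueue first, then wake the selector.
            Thread listener = new Thread(() -> {
                for (int i = 0; i < expected; i++) {
                    pending.add("conn-" + i);
                    selector.wakeup();
                }
            });
            listener.start();

            while (registered.size() < expected) {
                // Drain only what is queued right now, so take() never blocks.
                int size = pending.size();
                for (int i = size; i > 0; i--) {
                    registered.add(pending.take()); // a real reader registers OP_READ here
                }
                if (registered.size() < expected) {
                    selector.select(); // nothing registered: returns only after wakeup()
                }
            }
            listener.join();
        }
        return registered;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(drainUntil(3));
    }
}
```

The order "enqueue, then wakeup" matters: a wakeup issued before the reader reaches `select()` is remembered by the selector, so the reader cannot sleep through a queued connection.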

void doRead(SelectionKey key) throws InterruptedException {
int count = 0;
Connection c = (Connection)key.attachment();
if (c == null) {
return;
}
c.setLastContact(Time.now());

try {
count = c.readAndProcess();
} catch (InterruptedException ieo) {
...
}
}

public int readAndProcess()
throws WrappedRpcServerException, IOException, InterruptedException {
while (true) {
/* Read at most one RPC. If the header is not read completely yet
* then iterate until we read first RPC or until there is no data left.
*/
int count = -1;
// dataLengthBuffer is four bytes; it holds either the start of the connection header or the data length
if (dataLengthBuffer.remaining() > 0) {
count = channelRead(channel, dataLengthBuffer);
if (count < 0 || dataLengthBuffer.remaining() > 0)
return count;
}
/**
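* If connectionHeaderRead is true, the connection header has already been
* read, so there is no header to read and dataLengthBuffer holds the data length.
* If connectionHeaderRead is false, dataLengthBuffer holds "hrpc",
* the first four bytes of the connection header laid out below: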
* +----------------------------------+
* | "hrpc" 4 bytes |
* +----------------------------------+
* | Version (1 byte) |
* +----------------------------------+
* | Service Class (1 byte) |
* +----------------------------------+
* | AuthProtocol (1 byte) |
* +----------------------------------+
*/
if (!connectionHeaderRead) {
//Every connection is expected to send the header.
// connectionHeaderBuf holds the last three bytes of the header
if (connectionHeaderBuf == null) {
connectionHeaderBuf = ByteBuffer.allocate(3);
}
count = channelRead(channel, connectionHeaderBuf);
if (count < 0 || connectionHeaderBuf.remaining() > 0) {
return count;
}
int version = connectionHeaderBuf.get(0);
// TODO we should add handler for service class later
this.setServiceClass(connectionHeaderBuf.get(1));
dataLengthBuffer.flip();

// Check if it looks like the user is hitting an IPC port
// with an HTTP GET - this is a common error, so we can
// send back a simple string indicating as much.
if (HTTP_GET_BYTES.equals(dataLengthBuffer)) {
setupHttpRequestOnIpcPortResponse();
return -1;
}
// RpcConstants.HEADER is "hrpc"
if (!RpcConstants.HEADER.equals(dataLengthBuffer)
|| version != CURRENT_VERSION) {
//Warning is ok since this is not supposed to happen.
LOG.warn("Incorrect header or version mismatch from " +
hostAddress + ":" + remotePort +
" got version " + version +
" expected version " + CURRENT_VERSION);
setupBadVersionResponse(version);
return -1;
}

// this may switch us into SIMPLE
authProtocol = initializeAuthContext(connectionHeaderBuf.get(2));

dataLengthBuffer.clear();
connectionHeaderBuf = null;
connectionHeaderRead = true;
continue;
}

// allocate a buffer of dataLength bytes
if (data == null) {
dataLengthBuffer.flip();
dataLength = dataLengthBuffer.getInt();
checkDataLength(dataLength);
data = ByteBuffer.allocate(dataLength);
}

// read the data
count = channelRead(channel, data);

// the first payload is the RPC connection context: connectionContextRead is false until the context has been read, then true
if (data.remaining() == 0) {
dataLengthBuffer.clear();
data.flip();
boolean isHeaderRead = connectionContextRead;
// processes the context if this payload is the context, otherwise the request data
processOneRpc(data.array());
data = null;
if (!isHeaderRead) {
continue;
}
}
return count;
}
}


/**
* Process an RPC Request - handle connection setup and decoding of
* request into a Call
* @param buf - contains the RPC request header and the rpc request
* @throws IOException - internal error that should not be returned to
* client, typically failure to respond to client
* @throws WrappedRpcServerException - an exception to be sent back to
* the client that does not require verbose logging by the
* Listener thread
* @throws InterruptedException
*/
private void processOneRpc(byte[] buf)
throws IOException, WrappedRpcServerException, InterruptedException {
int callId = -1;
int retry = RpcConstants.INVALID_RETRY_COUNT;
try {
final DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(buf));
final RpcRequestHeaderProto header =
decodeProtobufFromStream(RpcRequestHeaderProto.newBuilder(), dis);
callId = header.getCallId();
retry = header.getRetryCount();
if (LOG.isDebugEnabled()) {
LOG.debug(" got #" + callId);
}
checkRpcHeaders(header);

// callId tells the context apart from real requests: the context uses CONNECTION_CONTEXT_CALL_ID = -3, while requests use a non-negative callId
if (callId < 0) { // callIds typically used during connection setup
// connectionContextRead is set to true inside this method
processRpcOutOfBandRequest(header, dis);
} else if (!connectionContextRead) {
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
"Connection context not established");
} else {
// read the request data
processRpcRequest(header, dis);
}
} catch (WrappedRpcServerException wrse) { // inform client of error
Throwable ioe = wrse.getCause();
final Call call = new Call(callId, retry, null, this);
setupResponse(authFailedResponse, call,
RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null,
ioe.getClass().getName(), ioe.getMessage());
call.sendResponse();
throw wrse;
}
}


/**
* Process an RPC Request - the connection headers and context must
* have been already read
* @param header - RPC request header
* @param dis - stream to request payload
* @throws WrappedRpcServerException - due to fatal rpc layer issues such
* as invalid header or deserialization error. In this case a RPC fatal
* status response will later be sent back to client.
* @throws InterruptedException
*/
private void processRpcRequest(RpcRequestHeaderProto header,
DataInputStream dis) throws WrappedRpcServerException,
InterruptedException {
Class<? extends Writable> rpcRequestClass =
getRpcRequestWrapper(header.getRpcKind());
if (rpcRequestClass == null) {
LOG.warn("Unknown rpc kind " + header.getRpcKind() +
" from client " + getHostAddress());
final String err = "Unknown rpc kind in rpc header" +
header.getRpcKind();
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
}
Writable rpcRequest;
try { //Read the rpc request
// instantiate the request object via reflection and read its fields to get the method and parameter data; for WritableRpcEngine this is an Invocation
rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);
rpcRequest.readFields(dis);
} catch (Throwable t) { // includes runtime exception from newInstance
LOG.warn("Unable to read call parameters for client " +
getHostAddress() + "on connection protocol " +
this.protocolName + " for rpcKind " + header.getRpcKind(), t);
String err = "IPC server unable to read call parameters: "+ t.getMessage();
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err);
}

Span traceSpan = null;
if (header.hasTraceInfo()) {
// If the incoming RPC included tracing info, always continue the trace
TraceInfo parentSpan = new TraceInfo(header.getTraceInfo().getTraceId(),
header.getTraceInfo().getParentId());
traceSpan = Trace.startSpan(rpcRequest.toString(), parentSpan).detach();
}

// wrap the information (callId, request object, etc.) in a Call, the counterpart of the Client's Call
Call call = new Call(header.getCallId(), header.getRetryCount(),
rpcRequest, this, ProtoUtil.convert(header.getRpcKind()),
header.getClientId().toByteArray(), traceSpan);

if (callQueue.isClientBackoffEnabled()) {
// if RPC queue is full, we will ask the RPC client to back off by
// throwing RetriableException. Whether RPC client will honor
// RetriableException and retry depends on client ipc retry policy.
// For example, FailoverOnNetworkExceptionRetry handles
// RetriableException.
queueRequestOrAskClientToBackOff(call);
} else {
callQueue.put(call); // queue the call; maybe blocked here
}
incRpcCount(); // Increment the rpc count
}

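doRead calls Connection.readAndProcess. dataLengthBuffer may hold the start of the connection header, in which case its content is "hrpc"; otherwise it holds the length of the data that follows. That data is either the connection context or an actual request, and both are handled by processOneRpc, which checks whether callId is negative to tell them apart (the context uses callId -3). For the context, connectionContextRead is set to true; for a request, processRpcRequest reads the actual data. It creates the concrete Writable subclass through reflection, deserializes the content with readFields, and finally wraps the callId, the Writable, and so on in a Call (the counterpart of the Client's Call) and puts it on the callQueue. Calls on the callQueue are processed by Handlers, so let's look at Handler's run method.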
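To make the wire layout described above concrete, here is a sketch that parses it from an in-memory buffer. It assumes all bytes are already available, whereas the real server reads incrementally from a non-blocking channel. `FramingSketch` and `parse` are hypothetical names, and the version value 9 used in `main` is only a placeholder for the demo.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the framing: a 7-byte connection header
// ("hrpc", version, service class, auth protocol) followed by length-prefixed payloads.
final class FramingSketch {
    private static final byte[] MAGIC = "hrpc".getBytes(StandardCharsets.US_ASCII);

    // Parses a fully buffered stream and returns the payload of each frame.
    static List<byte[]> parse(ByteBuffer in, int expectedVersion) {
        byte[] magic = new byte[4];
        in.get(magic);
        int version = in.get();
        in.get(); // service class (ignored in this sketch)
        in.get(); // auth protocol (ignored in this sketch)
        if (!Arrays.equals(magic, MAGIC) || version != expectedVersion) {
            throw new IllegalArgumentException("bad header or version " + version);
        }
        List<byte[]> frames = new ArrayList<>();
        while (in.remaining() >= 4) {
            int length = in.getInt(); // 4-byte big-endian length prefix
            if (length < 0 || length > in.remaining()) {
                throw new IllegalArgumentException("bad frame length " + length);
            }
            byte[] data = new byte[length];
            in.get(data);
            frames.add(data);
        }
        return frames;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.put(MAGIC).put((byte) 9).put((byte) 0).put((byte) 0); // placeholder header values
        buf.putInt(3).put(new byte[] {1, 2, 3}); // first frame, e.g. the connection context
        buf.putInt(2).put(new byte[] {4, 5});    // second frame, e.g. a request
        buf.flip();
        System.out.println("frames: " + parse(buf, 9).size());
    }
}
```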

private class Handler extends Thread {
public Handler(int instanceNumber) {
this.setDaemon(true);
this.setName("IPC Server handler "+ instanceNumber + " on " + port);
}

@Override
public void run() {
LOG.debug(Thread.currentThread().getName() + ": starting");
SERVER.set(Server.this);
ByteArrayOutputStream buf =
new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
while (running) {
TraceScope traceScope = null;
try {
// take a Call object off the blocking queue
final Call call = callQueue.take(); // pop the queue; maybe blocked here
if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName() + ": " + call + " for RpcKind " + call.rpcKind);
}
if (!call.connection.channel.isOpen()) {
LOG.info(Thread.currentThread().getName() + ": skipped " + call);
continue;
}
String errorClass = null;
String error = null;
RpcStatusProto returnStatus = RpcStatusProto.SUCCESS;
RpcErrorCodeProto detailedErr = null;
Writable value = null;

// store it in a ThreadLocal so the call currently being handled can be looked up
CurCall.set(call);
if (call.traceSpan != null) {
traceScope = Trace.continueSpan(call.traceSpan);
}

try {
// Make the call as the user via Subject.doAs, thus associating
// the call with the Subject
// invoke call(), which runs the requested method
if (call.connection.user == null) {
value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest,
call.timestamp);
} else {
value =
call.connection.user.doAs
(new PrivilegedExceptionAction<Writable>() {
@Override
public Writable run() throws Exception {
// make the call
return call(call.rpcKind, call.connection.protocolName,
call.rpcRequest, call.timestamp);

}
}
);
}
} catch (Throwable e) {
...
}
CurCall.set(null);
synchronized (call.connection.responseQueue) {
setupResponse(buf, call, returnStatus, detailedErr,
value, errorClass, error);

// Discard the large buf and reset it back to smaller size
// to free up heap.
if (buf.size() > maxRespSize) {
LOG.warn("Large response size " + buf.size() + " for call "
+ call.toString());
buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
}
call.sendResponse();
}
} catch (InterruptedException e) {
...
} finally {
...
}
}
LOG.debug(Thread.currentThread().getName() + ": exiting");
}

}

Handler's run method takes a Call object off the callQueue blocking queue and then invokes the call method to execute the requested method. Let's look at call next.
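The producer/consumer relationship between the readers and the handlers can be modeled with a plain BlockingQueue. This is a sketch, not the Hadoop classes: `HandlerSketch`, the `int[]` encoding of a call as {callId, a, b}, and the response map are assumptions chosen to keep the example short.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the reader -> callQueue -> handler hand-off.
final class HandlerSketch {
    // Each row of `operands` is one add(a, b) request; the result maps callId -> a + b.
    static Map<Integer, Integer> serve(int handlerCount, int[][] operands)
            throws InterruptedException {
        BlockingQueue<int[]> callQueue = new LinkedBlockingQueue<>();
        Map<Integer, Integer> responses = new ConcurrentHashMap<>();
        CountDownLatch done = new CountDownLatch(operands.length);

        Thread[] handlers = new Thread[handlerCount];
        for (int i = 0; i < handlerCount; i++) {
            handlers[i] = new Thread(() -> {
                try {
                    while (true) {
                        int[] call = callQueue.take(); // blocks until a call is queued
                        responses.put(call[0], call[1] + call[2]);
                        done.countDown();
                    }
                } catch (InterruptedException e) {
                    // interrupted on shutdown: leave the loop
                }
            }, "handler-" + i);
            handlers[i].setDaemon(true);
            handlers[i].start();
        }

        // Plays the reader: queue one call per request.
        for (int id = 0; id < operands.length; id++) {
            callQueue.put(new int[] {id, operands[id][0], operands[id][1]});
        }
        done.await();
        for (Thread h : handlers) {
            h.interrupt();
        }
        return responses;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(serve(2, new int[][] {{1, 2}, {3, 4}}));
    }
}
```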

/** Called for each call. 
* The abstract declaration of call.
*/
public abstract Writable call(RPC.RpcKind rpcKind, String protocol,
Writable param, long receiveTime) throws Exception;

// concrete implementation
@Override
public Writable call(RPC.RpcKind rpcKind, String protocol,
Writable rpcRequest, long receiveTime) throws Exception {
return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
receiveTime);
}

public static RpcInvoker getRpcInvoker(RPC.RpcKind rpcKind) {
RpcKindMapValue val = rpcKindMap.get(rpcKind);
return (val == null) ? null : val.rpcInvoker;
}

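In the end, the call method of a concrete RpcInvoker implementation is invoked, and that invoker is looked up in rpcKindMap by rpcKind. So when is the concrete RpcInvoker put there, and which class is it? Tracing the uses of rpcKindMap gives the answer.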

public class WritableRpcEngine implements RpcEngine {
/**
* Register the rpcRequest deserializer for WritableRpcEngine
*/
private static synchronized void initialize() {
org.apache.hadoop.ipc.Server.registerProtocolEngine(RPC.RpcKind.RPC_WRITABLE,
Invocation.class, new Server.WritableRpcInvoker());
isInitialized = true;
}
}

public static void registerProtocolEngine(RPC.RpcKind rpcKind,
Class<? extends Writable> rpcRequestWrapperClass,
RpcInvoker rpcInvoker) {
RpcKindMapValue old =
rpcKindMap.put(rpcKind, new RpcKindMapValue(rpcRequestWrapperClass, rpcInvoker));
if (old != null) {
rpcKindMap.put(rpcKind, old);
throw new IllegalArgumentException("ReRegistration of rpcKind: " +
rpcKind);
}
LOG.debug("rpcKind=" + rpcKind +
", rpcRequestWrapperClass=" + rpcRequestWrapperClass +
", rpcInvoker=" + rpcInvoker);
}

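So when the WritableRpcEngine class is loaded, a Server.WritableRpcInvoker instance is already put into rpcKindMap. If WritableRpcEngine is the engine in use, what comes out of rpcKindMap here is therefore a Server.WritableRpcInvoker. Let's see how its call method uses reflection to invoke the target method and obtain the result.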
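The "register once, reject re-registration" behaviour of registerProtocolEngine can be reduced to a few lines. The sketch below uses hypothetical names (`EngineRegistrySketch`, string keys instead of RpcKind, plain objects instead of RpcInvoker) and only mirrors the put, restore, and throw sequence shown above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a registry that refuses a second registration for the same key.
final class EngineRegistrySketch {
    private final Map<String, Object> invokers = new HashMap<>();

    synchronized void register(String rpcKind, Object invoker) {
        Object old = invokers.put(rpcKind, invoker);
        if (old != null) {
            invokers.put(rpcKind, old); // restore the first registration
            throw new IllegalArgumentException("ReRegistration of rpcKind: " + rpcKind);
        }
    }

    // Returns the invoker for the kind, or null if none was registered.
    synchronized Object invokerFor(String rpcKind) {
        return invokers.get(rpcKind);
    }

    public static void main(String[] args) {
        EngineRegistrySketch registry = new EngineRegistrySketch();
        registry.register("RPC_WRITABLE", "first invoker");
        try {
            registry.register("RPC_WRITABLE", "second invoker");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
        System.out.println(registry.invokerFor("RPC_WRITABLE"));
    }
}
```

Restoring the old value before throwing keeps the registry usable even if a caller catches the exception.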

static class WritableRpcInvoker implements RpcInvoker {
@Override
public Writable call(org.apache.hadoop.ipc.RPC.Server server,
String protocolName, Writable rpcRequest, long receivedTime)
throws IOException, RPC.VersionMismatch {

// get the Invocation object that the client serialized
Invocation call = (Invocation)rpcRequest;
if (server.verbose) log("Call: " + call);

// Verify writable rpc version
if (call.getRpcVersion() != writableRpcVersion) {
// Client is using a different version of WritableRpc
throw new RpcServerException(
"WritableRpc version mismatch, client side version="
+ call.getRpcVersion() + ", server side version="
+ writableRpcVersion);
}

long clientVersion = call.getProtocolVersion();
final String protoName;
ProtoClassProtoImpl protocolImpl;
if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
// VersionProtocol methods are often used by client to figure out
// which version of protocol to use.
//
// Versioned protocol methods should go the protocolName protocol
// rather than the declaring class of the method since the
// the declaring class is VersionedProtocol which is not
// registered directly.
// Send the call to the highest protocol version
VerProtocolImpl highest = server.getHighestSupportedProtocol(
RPC.RpcKind.RPC_WRITABLE, protocolName);
if (highest == null) {
throw new RpcServerException("Unknown protocol: " + protocolName);
}
protocolImpl = highest.protocolTarget;
} else {
protoName = call.declaringClassProtocolName;

// Find the right impl for the protocol based on client version.
// look up the implementation by protocol name, client version, and RpcKind
ProtoNameVer pv =
new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
protocolImpl =
server.getProtocolImplMap(RPC.RpcKind.RPC_WRITABLE).get(pv);
if (protocolImpl == null) { // no match for Protocol AND Version
VerProtocolImpl highest =
server.getHighestSupportedProtocol(RPC.RpcKind.RPC_WRITABLE,
protoName);
if (highest == null) {
throw new RpcServerException("Unknown protocol: " + protoName);
} else { // protocol supported but not the version that client wants
throw new RPC.VersionMismatch(protoName, clientVersion,
highest.version);
}
}
}


// Invoke the protocol method
long startTime = Time.now();
int qTime = (int) (startTime-receivedTime);
Exception exception = null;
try {
// find the method by name and parameter types, then invoke it via reflection to get the result
Method method =
protocolImpl.protocolClass.getMethod(call.getMethodName(),
call.getParameterClasses());
method.setAccessible(true);
server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
Object value =
method.invoke(protocolImpl.protocolImpl, call.getParameters());
if (server.verbose) log("Return: "+value);
// wrap the result in an ObjectWritable and return it
return new ObjectWritable(method.getReturnType(), value);

} catch (InvocationTargetException e) {
Throwable target = e.getTargetException();
if (target instanceof IOException) {
exception = (IOException)target;
throw (IOException)target;
} else {
IOException ioe = new IOException(target.toString());
ioe.setStackTrace(target.getStackTrace());
exception = ioe;
throw ioe;
}
} catch (Throwable e) {
if (!(e instanceof IOException)) {
LOG.error("Unexpected throwable object ", e);
}
IOException ioe = new IOException(e.toString());
ioe.setStackTrace(e.getStackTrace());
exception = ioe;
throw ioe;
} finally {
int processingTime = (int) (Time.now() - startTime);
if (LOG.isDebugEnabled()) {
String msg = "Served: " + call.getMethodName() +
" queueTime= " + qTime +
" procesingTime= " + processingTime;
if (exception != null) {
msg += " exception= " + exception.getClass().getSimpleName();
}
LOG.debug(msg);
}
String detailedMetricsName = (exception == null) ?
call.getMethodName() :
exception.getClass().getSimpleName();
server.rpcMetrics.addRpcQueueTime(qTime);
server.rpcMetrics.addRpcProcessingTime(processingTime);
server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName,
processingTime);
if (server.isLogSlowRPC()) {
server.logSlowRpcCalls(call.getMethodName(), processingTime);
}
}
}
}

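WritableRpcInvoker's call method casts rpcRequest to an Invocation, takes the protocol name and version from it, and uses them to get the protocol implementation object from server.getProtocolImplMap. It then looks up the method by name and parameter types, invokes it through reflection, and wraps the result in an ObjectWritable. How the protocol implementation object is found can be seen in the following method.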
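The reflective dispatch at the heart of this method can be shown with the add(int, int) example from the beginning of the article. This is a minimal sketch: `ReflectiveDispatchSketch`, `Adder`, and `AdderImpl` are hypothetical names, and the metrics, version checks, and ObjectWritable wrapping of the real code are left out.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical sketch of looking up a protocol method by name and invoking it reflectively.
final class ReflectiveDispatchSketch {
    // Stand-ins for a protocol interface and its server-side implementation.
    public interface Adder {
        int add(int a, int b);
    }

    public static final class AdderImpl implements Adder {
        @Override
        public int add(int a, int b) {
            return a + b;
        }
    }

    static Object invoke(Class<?> protocolClass, Object impl, String methodName,
                         Class<?>[] parameterClasses, Object[] parameters) throws Exception {
        // Look the method up on the protocol interface, not on the implementation class.
        Method method = protocolClass.getMethod(methodName, parameterClasses);
        try {
            return method.invoke(impl, parameters);
        } catch (InvocationTargetException e) {
            // Unwrap so the caller sees the exception thrown by the target method.
            Throwable target = e.getTargetException();
            if (target instanceof Exception) {
                throw (Exception) target;
            }
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        Object result = invoke(Adder.class, new AdderImpl(), "add",
                new Class<?>[] {int.class, int.class}, new Object[] {1, 2});
        System.out.println(result);
    }
}
```

The method name and parameter classes are exactly the pieces of information the client has to serialize for the server to be able to perform this lookup.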

   
Map<ProtoNameVer, ProtoClassProtoImpl> getProtocolImplMap(RPC.RpcKind rpcKind) {
if (protocolImplMapArray.size() == 0) {// initialize for all rpc kinds
for (int i=0; i <= RpcKind.MAX_INDEX; ++i) {
protocolImplMapArray.add(
new HashMap<ProtoNameVer, ProtoClassProtoImpl>(10));
}
}
return protocolImplMapArray.get(rpcKind.ordinal());
}

It takes the Map<ProtoNameVer, ProtoClassProtoImpl> for the given rpcKind from protocolImplMapArray, and the concrete instance is then looked up in that map by ProtoNameVer.

public Server(Class<?> protocolClass, Object protocolImpl,
Configuration conf, String bindAddress, int port,
int numHandlers, int numReaders, int queueSizePerHandler,
boolean verbose, SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig)
throws IOException {
super(bindAddress, port, null, numHandlers, numReaders,
queueSizePerHandler, conf,
classNameBase(protocolImpl.getClass().getName()), secretManager,
portRangeConfig);

this.verbose = verbose;


Class<?>[] protocols;
if (protocolClass == null) { // derive protocol from impl
/*
* In order to remain compatible with the old usage where a single
* target protocolImpl is suppled for all protocol interfaces, and
* the protocolImpl is derived from the protocolClass(es)
* we register all interfaces extended by the protocolImpl
*/
protocols = RPC.getProtocolInterfaces(protocolImpl.getClass());

} else {
if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) {
throw new IOException("protocolClass "+ protocolClass +
" is not implemented by protocolImpl which is of class " +
protocolImpl.getClass());
}
// register protocol class and its super interfaces
// put protocolClass and protocolImpl into the map
registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, protocolClass, protocolImpl);
// get all interfaces of protocolClass
protocols = RPC.getProtocolInterfaces(protocolClass);
}
// register every interface of protocolClass with protocolImpl in the map
for (Class<?> p : protocols) {
if (!p.equals(VersionedProtocol.class)) {
registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, p, protocolImpl);
}
}

}

void registerProtocolAndImpl(RpcKind rpcKind, Class<?> protocolClass,
Object protocolImpl) {
String protocolName = RPC.getProtocolName(protocolClass);
long version;


try {
version = RPC.getProtocolVersion(protocolClass);
} catch (Exception ex) {
LOG.warn("Protocol " + protocolClass +
" NOT registered as cannot get protocol version ");
return;
}

// put into the map
getProtocolImplMap(rpcKind).put(new ProtoNameVer(protocolName, version),
new ProtoClassProtoImpl(protocolClass, protocolImpl));
LOG.debug("RpcKind = " + rpcKind + " Protocol Name = " + protocolName + " version=" + version +
" ProtocolImpl=" + protocolImpl.getClass().getName() +
" protocolClass=" + protocolClass.getName());
}

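As shown above, when the Server object is constructed, the protocol implementation is already registered in a Map under the protocol class and all of its interfaces, and that map is stored in the list at the index given by rpcKind.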

Now back to Handler's run method. After call returns the result, setupResponse and call.sendResponse() are invoked. Let's look at these two methods in detail.

/**
* Setup response for the IPC Call.
*
* @param responseBuf buffer to serialize the response into
* @param call {@link Call} to which we are setting up the response
* @param status of the IPC call
* @param rv return value for the IPC Call, if the call was successful
* @param errorClass error class, if the the call failed
* @param error error message, if the call failed
* @throws IOException
*/
private static void setupResponse(ByteArrayOutputStream responseBuf,
Call call, RpcStatusProto status, RpcErrorCodeProto erCode,
Writable rv, String errorClass, String error)
throws IOException {
responseBuf.reset();
DataOutputStream out = new DataOutputStream(responseBuf);
// build the response header
RpcResponseHeaderProto.Builder headerBuilder = RpcResponseHeaderProto.newBuilder();
headerBuilder.setClientId(ByteString.copyFrom(call.clientId));
headerBuilder.setCallId(call.callId);
headerBuilder.setRetryCount(call.retryCount);
headerBuilder.setStatus(status);
headerBuilder.setServerIpcVersionNum(CURRENT_VERSION);

if (status == RpcStatusProto.SUCCESS) {
RpcResponseHeaderProto header = headerBuilder.build();
final int headerLen = header.getSerializedSize();
int fullLength = CodedOutputStream.computeRawVarint32Size(headerLen) +
headerLen;
try {
if (rv instanceof ProtobufRpcEngine.RpcWrapper) {
ProtobufRpcEngine.RpcWrapper resWrapper =
(ProtobufRpcEngine.RpcWrapper) rv;
fullLength += resWrapper.getLength();
out.writeInt(fullLength);
header.writeDelimitedTo(out);
rv.write(out);
} else { // Have to serialize to buffer to get len
// Write the total length, the header, then the body (the serialized return value)
final DataOutputBuffer buf = new DataOutputBuffer();
rv.write(buf);
byte[] data = buf.getData();
fullLength += buf.getLength();
out.writeInt(fullLength);
header.writeDelimitedTo(out);
out.write(data, 0, buf.getLength());
}
} catch (Throwable t) {
LOG.warn("Error serializing call response for call " + call, t);
// Call back to same function - this is OK since the
// buffer is reset at the top, and since status is changed
// to ERROR it won't infinite loop.
setupResponse(responseBuf, call, RpcStatusProto.ERROR,
RpcErrorCodeProto.ERROR_SERIALIZING_RESPONSE,
null, t.getClass().getName(),
StringUtils.stringifyException(t));
return;
}
} else { // Rpc Failure
headerBuilder.setExceptionClassName(errorClass);
headerBuilder.setErrorMsg(error);
headerBuilder.setErrorDetail(erCode);
RpcResponseHeaderProto header = headerBuilder.build();
int headerLen = header.getSerializedSize();
final int fullLength =
CodedOutputStream.computeRawVarint32Size(headerLen) + headerLen;
out.writeInt(fullLength);
header.writeDelimitedTo(out);
}
// Store the serialized response in the Call object
call.setResponse(ByteBuffer.wrap(responseBuf.toByteArray()));
}
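The wire layout that setupResponse produces on success (a 4-byte total length, a length-delimited header, then the body) can be reproduced without Protocol Buffers. The following is a rough sketch under that assumption: `ResponseFraming` and its methods are hypothetical names, the header and body are treated as opaque byte arrays, and the varint helpers reimplement the standard base-128 encoding by hand.

```java
import java.io.ByteArrayOutputStream;

final class ResponseFraming {
    // Number of bytes a base-128 varint needs for a non-negative int.
    static int varintSize(int value) {
        int size = 1;
        while ((value >>>= 7) != 0) {
            size++;
        }
        return size;
    }

    // Writes value as a base-128 varint: 7 payload bits per byte, high bit = "more follows".
    static void writeVarint(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // Frame layout: [4-byte big-endian fullLength][varint headerLen][header][body]
    static byte[] frame(byte[] header, byte[] body) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int fullLength = varintSize(header.length) + header.length + body.length;
        // Same byte order that DataOutputStream.writeInt uses.
        out.write(fullLength >>> 24);
        out.write(fullLength >>> 16);
        out.write(fullLength >>> 8);
        out.write(fullLength);
        writeVarint(out, header.length);
        out.write(header, 0, header.length);
        out.write(body, 0, body.length);
        return out.toByteArray();
    }
}
```

Note that fullLength counts everything after the 4-byte prefix, so a reader can fetch the length first and then read exactly that many bytes before decoding the header.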

setupResponse mainly serializes the response object and stores it in the Call object's rpcResponse field. The Call then has to be handed to the Responder for processing, as Call.sendResponse() shows.

//Call.sendResponse()
@InterfaceStability.Unstable
@InterfaceAudience.LimitedPrivate({"HDFS"})
public void sendResponse() throws IOException {
int count = responseWaitCount.decrementAndGet();
assert count >= 0 : "response has already been sent";
if (count == 0) {
connection.sendResponse(this);
}
}

//Connection.sendResponse(Call call)
private void sendResponse(Call call) throws IOException {
responder.doRespond(call);
}


//
// Enqueue a response from the application.
//
void doRespond(Call call) throws IOException {
synchronized (call.connection.responseQueue) {
// must only wrap before adding to the responseQueue to prevent
// postponed responses from being encrypted and sent out of order.
if (call.connection.useWrap) {
ByteArrayOutputStream response = new ByteArrayOutputStream();
wrapWithSasl(response, call);
call.setResponse(ByteBuffer.wrap(response.toByteArray()));
}
// Append the call to responseQueue
call.connection.responseQueue.addLast(call);
if (call.connection.responseQueue.size() == 1) {
processResponse(call.connection.responseQueue, true);
}
}
}



// Processes one response. Returns true if there are no more pending
// data for this channel.
//
private boolean processResponse(LinkedList<Call> responseQueue,
boolean inHandler) throws IOException {
boolean error = true;
boolean done = false; // there is more data for this channel.
int numElements = 0;
Call call = null;
try {
synchronized (responseQueue) {
//
// If there are no items for this channel, then we are done
//
numElements = responseQueue.size();
if (numElements == 0) {
error = false;
return true; // no more data for this channel.
}
//
// Extract the first call
// Take the first element off the list
//
call = responseQueue.removeFirst();
SocketChannel channel = call.connection.channel;
if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName() + ": responding to " + call);
}
//
// Send as much data as we can in the non-blocking fashion
// Try to send the whole result in one go
//
int numBytes = channelWrite(channel, call.rpcResponse);
if (numBytes < 0) {
return true;
}
// Nothing remaining means the response was sent completely
if (!call.rpcResponse.hasRemaining()) {
//Clear out the response buffer so it can be collected
call.rpcResponse = null;
call.connection.decRpcCount();
if (numElements == 1) { // last call fully processes.
done = true; // no more data for this channel.
} else {
done = false; // more calls pending to be sent.
}
if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName() + ": responding to " + call
+ " Wrote " + numBytes + " bytes.");
}
} else {
//
// If we were unable to write the entire response out, then
// insert in Selector queue.
// Otherwise put the call back in the list and register a write event with the Responder's writeSelector
//
call.connection.responseQueue.addFirst(call);

if (inHandler) {
// set the serve time when the response has to be sent later
call.timestamp = Time.now();

incPending();
try {
// Wakeup the thread blocked on select, only then can the call
// to channel.register() complete.
writeSelector.wakeup();
// Register with the selector
channel.register(writeSelector, SelectionKey.OP_WRITE, call);
} catch (ClosedChannelException e) {
//Its ok. channel might be closed else where.
done = true;
} finally {
decPending();
}
}
if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName() + ": responding to " + call
+ " Wrote partial " + numBytes + " bytes.");
}
}
error = false; // everything went off well
}
} finally {
if (error && call != null) {
LOG.warn(Thread.currentThread().getName()+", call " + call + ": output error");
done = true; // error. no more data for this channel.
closeConnection(call.connection);
}
}
return done;
}
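The core decision in processResponse above (write what the channel accepts, and if bytes remain, put the response back at the head of the queue for a later retry) can be isolated in a small sketch. The names `ThrottledChannel` and `ResponseSender` are hypothetical, the throttled channel merely imitates a non-blocking socket with a nearly full send buffer, and selector registration is left out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.Deque;

// Hypothetical channel that accepts at most `budget` bytes per write call.
final class ThrottledChannel implements WritableByteChannel {
    private final int budget;
    int totalWritten = 0;

    ThrottledChannel(int budget) {
        this.budget = budget;
    }

    @Override
    public int write(ByteBuffer src) {
        int n = Math.min(budget, src.remaining());
        src.position(src.position() + n); // consume n bytes from the buffer
        totalWritten += n;
        return n;
    }

    @Override
    public boolean isOpen() {
        return true;
    }

    @Override
    public void close() {
    }
}

final class ResponseSender {
    // Returns true when no response is left to send on this queue, false when
    // the head response was only partly written and must be retried later.
    static boolean sendOne(Deque<ByteBuffer> queue, WritableByteChannel channel) {
        ByteBuffer response = queue.pollFirst();
        if (response == null) {
            return true; // nothing pending
        }
        try {
            channel.write(response); // writes as much as the channel accepts
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (response.hasRemaining()) {
            // Keep ordering: the unfinished response stays at the head.
            queue.addFirst(response);
            return false; // a real server would register OP_WRITE here
        }
        return queue.isEmpty();
    }
}
```

Because the ByteBuffer remembers its position, a retry continues exactly where the previous write stopped, so no bytes are sent twice.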

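As shown above, the call finally ends up in responseQueue. responseQueue is a plain list (a LinkedList), not a blocking queue. Each Call encapsulates the object information deserialized from the client request, the data to be sent back to the client, and the Connection. (Each client keeps one Connection to the server, so call.connection.responseQueue is the send queue for responses going to the same client; calls destined for the same client are stored in the same responseQueue.) processResponse is invoked when the length of responseQueue is 1. In special cases (the result is too large or the network is slow) the result cannot be sent in one go; a write event is then registered with the Responder's selector, and the Responder asynchronously sends the data that is still outstanding. Let's look at the Responder's run method.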

@Override
public void run() {
LOG.info(Thread.currentThread().getName() + ": starting");
SERVER.set(Server.this);
try {
doRunLoop();
} finally {
LOG.info("Stopping " + Thread.currentThread().getName());
try {
writeSelector.close();
} catch (IOException ioe) {
LOG.error("Couldn't close write selector in " + Thread.currentThread().getName(), ioe);
}
}
}


private void doRunLoop() {
long lastPurgeTime = 0; // last check for old calls.

while (running) {
try {
waitPending(); // If a channel is being registered, wait.
writeSelector.select(PURGE_INTERVAL);
Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
try {
if (key.isWritable()) {
doAsyncWrite(key);
}
} catch (CancelledKeyException cke) {
// something else closed the connection, ex. reader or the
// listener doing an idle scan. ignore it and let them clean
// up
Call call = (Call)key.attachment();
if (call != null) {
LOG.info(Thread.currentThread().getName() +
": connection aborted from " + call.connection);
}
} catch (IOException e) {
LOG.info(Thread.currentThread().getName() + ": doAsyncWrite threw exception " + e);
}
}
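// Purge interval: the scan below runs at most once per PURGE_INTERVAL; responses that are still unsent after that long are purged and their channel is closed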
long now = Time.now();
if (now < lastPurgeTime + PURGE_INTERVAL) {
continue;
}
lastPurgeTime = now;
//
// If there were some calls that have not been sent out for a
// long time, discard them.
//
if(LOG.isDebugEnabled()) {
LOG.debug("Checking for old call responses.");
}
ArrayList<Call> calls;

// get the list of channels from list of keys.
// Collect the calls still registered with the selector; doPurge then checks which ones have timed out
synchronized (writeSelector.keys()) {
calls = new ArrayList<Call>(writeSelector.keys().size());
iter = writeSelector.keys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
Call call = (Call)key.attachment();
if (call != null && key.channel() == call.connection.channel) {
calls.add(call);
}
}
}

// Purge them one by one
for(Call call : calls) {
doPurge(call, now);
}
} catch (OutOfMemoryError e) {
//
// we can run out of memory if we have too many threads
// log the event and sleep for a minute and give
// some thread(s) a chance to finish
//
LOG.warn("Out of Memory in server select", e);
try { Thread.sleep(60000); } catch (Exception ie) {}
} catch (Exception e) {
LOG.warn("Exception in Responder", e);
}
}
}


private void doAsyncWrite(SelectionKey key) throws IOException {
Call call = (Call)key.attachment();
if (call == null) {
return;
}
if (key.channel() != call.connection.channel) {
throw new IOException("doAsyncWrite: bad channel");
}

synchronized(call.connection.responseQueue) {
if (processResponse(call.connection.responseQueue, false)) {
try {
key.interestOps(0);
} catch (CancelledKeyException e) {
/* The Listener/reader might have closed the socket.
* We don't explicitly cancel the key, so not sure if this will
* ever fire.
* This warning could be removed.
*/
LOG.warn("Exception while changing ops : " + e);
}
}
}
}

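The Responder asynchronously continues sending the responses whose write events were registered with the selector (it still ends up calling processResponse). It also enforces a timeout: a response that has still not been sent after that time is closed.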
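The timeout rule can be illustrated with a small sketch. doPurge itself is not shown in this article, so the following only assumes the rule stated above: a deferred call whose timestamp is older than the purge interval gets dropped. `PendingCall` and `StaleCallPurger` are hypothetical names, and the real server closes the connection rather than just removing an entry from a list.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical record of a response that could not be sent in one go.
final class PendingCall {
    final String id;
    final long timestampMillis; // when the response was first deferred

    PendingCall(String id, long timestampMillis) {
        this.id = id;
        this.timestampMillis = timestampMillis;
    }
}

final class StaleCallPurger {
    // Removes every call that has been pending for longer than intervalMillis
    // and returns the removed calls so the caller can close their connections.
    static List<PendingCall> purge(List<PendingCall> pending, long nowMillis, long intervalMillis) {
        List<PendingCall> purged = new ArrayList<>();
        Iterator<PendingCall> it = pending.iterator();
        while (it.hasNext()) {
            PendingCall call = it.next();
            if (nowMillis > call.timestampMillis + intervalMillis) {
                it.remove(); // safe removal while iterating
                purged.add(call);
            }
        }
        return purged;
    }
}
```

Passing the current time in as a parameter keeps the check deterministic, which also makes it easy to test without waiting for a real interval to elapse.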

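Overall flow

Client

The client actually uses a dynamic proxy: inside the Invoker's invoke method it sends the remote procedure call request to the server and then waits to receive the result.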

[Figure: client-side call flow]
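The dynamic-proxy idea on the client side can be shown with the JDK's own java.lang.reflect.Proxy. This is a minimal sketch, not Hadoop's Invoker: `AddProtocol`, `AddProtocolImpl`, `LoopbackInvoker` and `ProxyDemo` are hypothetical names, and the network round trip is replaced by a direct reflective call on an in-process implementation.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical protocol interface, analogous to the add example at the start of the article.
interface AddProtocol {
    int add(int a, int b);
}

// Hypothetical server-side implementation.
final class AddProtocolImpl implements AddProtocol {
    @Override
    public int add(int a, int b) {
        return a + b;
    }
}

// Every call made on the proxy lands in invoke(). A real client would
// serialize the method name and arguments and send them over the network;
// here the "server" is just an object in the same JVM.
final class LoopbackInvoker implements InvocationHandler {
    private final Object serverImpl;

    LoopbackInvoker(Object serverImpl) {
        this.serverImpl = serverImpl;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // The "request": method name plus parameter types plus arguments.
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        // The "server side": locate the method by reflection and call it.
        Method target = serverImpl.getClass().getMethod(methodName, parameterTypes);
        return target.invoke(serverImpl, args);
    }
}

final class ProxyDemo {
    static AddProtocol newProxy(Object serverImpl) {
        return (AddProtocol) Proxy.newProxyInstance(
                AddProtocol.class.getClassLoader(),
                new Class<?>[] {AddProtocol.class},
                new LoopbackInvoker(serverImpl));
    }
}
```

The caller only sees the AddProtocol interface, for example `ProxyDemo.newProxy(new AddProtocolImpl()).add(2, 3)`, which is the sense in which a remote call looks like a local one.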

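Server

The server follows the event-driven Reactor design pattern and is built on the JDK's own socket communication mechanism together with thread pools.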

[Figure: server-side processing flow]
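The thread-pool half of that design (requests are queued by a reader role and picked up by a fixed pool of handlers) can be sketched without any sockets. This is a deliberately simplified illustration, not a Reactor implementation: `SketchCall` and `MiniServer` are hypothetical names, the selector and network I/O are omitted entirely, and the response is delivered through a CompletableFuture instead of a Responder thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical decoded request carrying its own response slot.
final class SketchCall {
    final int a;
    final int b;
    final CompletableFuture<Integer> response = new CompletableFuture<>();

    SketchCall(int a, int b) {
        this.a = a;
        this.b = b;
    }
}

final class MiniServer {
    private final BlockingQueue<SketchCall> callQueue = new LinkedBlockingQueue<>();
    private final ExecutorService handlers;

    MiniServer(int handlerCount) {
        // Daemon threads so a forgotten stop() cannot keep the JVM alive.
        handlers = Executors.newFixedThreadPool(handlerCount, runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            return thread;
        });
        for (int i = 0; i < handlerCount; i++) {
            handlers.execute(this::handlerLoop);
        }
    }

    // Reader role: enqueue the decoded request and return immediately.
    CompletableFuture<Integer> submit(int a, int b) {
        SketchCall call = new SketchCall(a, b);
        callQueue.add(call);
        return call.response;
    }

    // Handler role: block on the queue, run the call, publish the result.
    private void handlerLoop() {
        try {
            while (true) {
                SketchCall call = callQueue.take();
                call.response.complete(call.a + call.b);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop() ends the loop
        }
    }

    void stop() {
        handlers.shutdownNow();
    }
}
```

Decoupling the two roles through a queue means a slow call only occupies one handler thread, while new requests can still be accepted and queued.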

References

http://bigdatadecode.club/Hadoop%20RPC%20%E8%A7%A3%E6%9E%90.html

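《Hadoop技术内幕:深入解析YARN架构设计与实现原理》 (Hadoop Internals: In-Depth Analysis of YARN Architecture Design and Implementation Principles). The figures are also taken from this book.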
