RPC
RPC stands for Remote Procedure Call. An example makes the idea clear.
Suppose the client has an RPC protocol interface, Protocol:
interface Protocol {
int add(int a, int b);
}
The client, however, has no concrete class implementing it; the implementation lives on the server:
class ProtocolImpl implements Protocol{
public int add(int a, int b){
return a + b;
}
}
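For the client to call ProtocolImpl's add method, it must send the method to call and its arguments to the server. The server parses this information, calls ProtocolImpl.add, and sends the result back to the client. The goal of RPC is to make this look, from the client's side, exactly like calling one of its own local methods.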
Here, Protocol is the RPC protocol. A protocol is simply an interface, and the methods declared in it are the methods exposed for remote invocation.
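The idea can be made concrete with a minimal in-process sketch that uses only the JDK. A java.lang.reflect.Proxy plays the client stub and turns each call on Protocol into a method name, parameter types and arguments. A reflective "server" then finds the matching method on ProtocolImpl and invokes it. There is no network or serialization here, and the helper class `LocalRpcDemo` is hypothetical. It only illustrates the "call a remote method as if it were local" effect.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

interface Protocol {
    int add(int a, int b);
}

// Server-side implementation; the client code below never references it by type.
class ProtocolImpl implements Protocol {
    public int add(int a, int b) {
        return a + b;
    }
}

class LocalRpcDemo {
    // Hypothetical "server": find the method by name and parameter types, then invoke it reflectively.
    static Object serve(Object impl, String methodName, Class<?>[] types, Object[] args) {
        try {
            Method m = impl.getClass().getMethod(methodName, types);
            return m.invoke(impl, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    // Hypothetical "client stub": every call on the proxy becomes a (name, types, args) message.
    static <T> T stub(Class<T> protocol, Object serverImpl) {
        Object p = Proxy.newProxyInstance(
                protocol.getClassLoader(),
                new Class<?>[] { protocol },
                (proxy, method, args) ->
                        serve(serverImpl, method.getName(), method.getParameterTypes(), args));
        return protocol.cast(p);
    }
}
```

Calling `LocalRpcDemo.stub(Protocol.class, new ProtocolImpl()).add(2, 3)` returns 5, although the caller only ever sees the Protocol interface.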
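An RPC system consists of four modules:
1. Communication module.
Two cooperating communication modules implement the request-reply protocol. They carry request and reply messages between client and server and generally do not process the packets in any way. The request-reply protocol can be implemented synchronously or asynchronously. In synchronous mode the client blocks until the server's reply arrives. In asynchronous mode the client does not wait after sending the request and can do other work; once the server has processed the request, it notifies the client. High-concurrency applications usually use the asynchronous mode to reduce latency and improve bandwidth utilization.
2. Stub (proxy).
Both the client and the server contain a stub, which can be thought of as a proxy. The stub makes a remote call behave like a local one, completely transparently to the user program. On the client, it looks like a local procedure. Instead of executing the call locally, however, it sends the request to the server through the communication module, and when the server replies it decodes the result. On the server, the stub decodes the parameters in the request message, calls the corresponding service procedure, and encodes the return value into the reply.
3. Dispatcher.
The dispatcher receives request messages from the communication module and uses an identifier in each message to select the stub that will handle it. When clients send a large number of concurrent requests, a thread pool is usually used to improve throughput.
4. Client program / service procedure.
The issuer of a request and the handler of that request, respectively.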
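The two modes of the communication module, and the dispatcher's job of routing by identifier onto a thread pool, can be sketched in plain Java. The `Dispatcher` class, the string service identifiers, and the pool size of 4 are illustrative assumptions, not part of any particular RPC framework.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical dispatcher: routes a request to a registered stub by identifier
// and runs it on a thread pool.
class Dispatcher implements AutoCloseable {
    private final Map<String, Function<Object[], Object>> stubs = new ConcurrentHashMap<>();
    private final ExecutorService pool = Executors.newFixedThreadPool(4);

    void register(String id, Function<Object[], Object> stub) {
        stubs.put(id, stub);
    }

    // Asynchronous mode: return at once; the caller is notified when the future completes.
    CompletableFuture<Object> callAsync(String id, Object... args) {
        Function<Object[], Object> stub = stubs.get(id);
        if (stub == null) {
            return CompletableFuture.failedFuture(new IllegalArgumentException("no stub for " + id));
        }
        return CompletableFuture.supplyAsync(() -> stub.apply(args), pool);
    }

    // Synchronous mode: block the caller until the reply is available.
    Object call(String id, Object... args) {
        return callAsync(id, args).join();
    }

    @Override
    public void close() {
        pool.shutdown();
    }
}
```

The synchronous call is just the asynchronous one followed by a blocking `join()`. This mirrors the point above: the asynchronous mode lets the caller decide when, or whether, to wait.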
Hadoop RPC
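Hadoop RPC consists of four parts: the serialization layer, the function-call layer, the network transport layer, and the server-side processing framework. They are implemented as follows: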
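Serialization layer. Serialization converts structured objects into byte streams so that they can be sent over the network or written to persistent storage. In an RPC framework it converts the parameters of a request, or the reply, into bytes for transfer between machines. Since Hadoop 2.0, Protocol Buffers and Apache Avro have been the main formats. Hadoop also provides its own serialization framework, in which any class that implements the Writable interface supports serialization and deserialization.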
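Function-call layer. This layer locates the function to be called and executes it. Hadoop RPC implements function calls with Java reflection (on the server) and dynamic proxies (on the client).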
Network transport layer. This layer defines how messages travel between Client and Server; Hadoop RPC uses TCP/IP sockets.
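Server-side processing framework. This framework can be abstracted as a network I/O model, which describes how client and server exchange information. Its design directly determines how much concurrency the server can handle. Common network I/O models include blocking I/O, non-blocking I/O, and event-driven I/O. Hadoop RPC uses an event-driven I/O model (NIO) based on the Reactor design pattern.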
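The Writable contract mentioned above amounts to two methods, write(DataOutput) and readFields(DataInput). The sketch below mirrors that contract with a locally declared interface and round-trips an object through a byte array using only java.io. The interface is named `SimpleWritable` so that it does not pose as Hadoop's `org.apache.hadoop.io.Writable`, and `IntPairWritable` is a hypothetical value type.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Local stand-in with the same two methods as Hadoop's Writable.
interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical value type: two ints, serialized as two 4-byte big-endian integers.
class IntPairWritable implements SimpleWritable {
    int a;
    int b;

    IntPairWritable() {}                       // "create empty, then readFields" needs a no-arg constructor
    IntPairWritable(int a, int b) { this.a = a; this.b = b; }

    public void write(DataOutput out) throws IOException {
        out.writeInt(a);
        out.writeInt(b);
    }

    public void readFields(DataInput in) throws IOException {
        a = in.readInt();
        b = in.readInt();
    }

    static byte[] toBytes(SimpleWritable w) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            w.write(new DataOutputStream(bytes));
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static IntPairWritable fromBytes(byte[] data) {
        try {
            IntPairWritable w = new IntPairWritable();
            w.readFields(new DataInputStream(new ByteArrayInputStream(data)));
            return w;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The format carries no type tags. The reader must already know which class to instantiate, which is why RPC frameworks send class or method information alongside the values.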
Basic usage of Hadoop RPC
First, define an RPC protocol. The interface must extend the VersionedProtocol interface:
import org.apache.hadoop.ipc.VersionedProtocol;
public interface IProxyProtocol extends VersionedProtocol{
long versionID = 1234L;
int add(int a, int b);
}
Next, write a class that implements the RPC interface; the server uses it to serve calls:
import java.io.IOException;
import org.apache.hadoop.ipc.ProtocolSignature;
public class MyProxyProtocol implements IProxyProtocol {
public int add(int a, int b) {
System.out.println("I am adding");
return a+b;
}
public long getProtocolVersion(String s, long l) throws IOException {
System.out.println("MyProxy.ProtocolVersion = " + IProxyProtocol.versionID );
return IProxyProtocol.versionID ;
}
public ProtocolSignature getProtocolSignature(String s, long l, int i) throws IOException {
return new ProtocolSignature(IProxyProtocol.versionID , null);
}
}
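Finally, implement the client and the server:
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
public class MyRPCClient {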
public final static int PORT = 8888;
public final static String ADDRESS = "localhost";
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
IProxyProtocol proxy;
proxy = RPC.getProxy(IProxyProtocol.class,
IProxyProtocol.versionID,
new InetSocketAddress(ADDRESS, PORT), conf);
int result = proxy.add(2, 3);
System.out.println(result);
RPC.stopProxy(proxy);
}
}
public class MyRPCServer {
...
}
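In the example above, the client calls RPC.getProxy, which generates a dynamic proxy for IProxyProtocol. When a protocol method is called, the proxy sends the method and its arguments to the server. The server executes the method on the concrete implementation class and returns the result to the client. Note that an RPC protocol must either declare a versionID field or carry a ProtocolInfo annotation (which holds the protocol name and version).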
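Reading a protocol version from either of those two places is a small reflection exercise. The annotation below, `ProtoInfo`, is a hypothetical stand-in rather than Hadoop's `ProtocolInfo`. The lookup order (annotation first, then the static `versionID` field) is an assumption made for this sketch.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical annotation standing in for a protocol-info annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface ProtoInfo {
    String name();
    long version();
}

interface FieldVersioned {
    long versionID = 1234L;          // interface fields are implicitly public static final
    int add(int a, int b);
}

@ProtoInfo(name = "demo.AnnotatedProtocol", version = 7L)
interface AnnotatedProtocol {
    int add(int a, int b);
}

class ProtocolVersions {
    // Prefer the annotation; otherwise fall back to a public static field named versionID.
    static long versionOf(Class<?> protocol) {
        ProtoInfo info = protocol.getAnnotation(ProtoInfo.class);
        if (info != null) {
            return info.version();
        }
        try {
            return protocol.getField("versionID").getLong(null);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            throw new IllegalArgumentException(protocol.getName() + " declares no version", e);
        }
    }
}
```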
Client implementation
Let's start with the client code. The client calls RPC.getProxy; tracing into that method shows its implementation:
public static <T> T getProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, Configuration conf)
throws IOException {
return getProtocolProxy(protocol, clientVersion, addr, conf).getProxy();
}
Internally it calls getProtocolProxy, which passes through several overloads before reaching the final implementation:
/**
* Get a protocol proxy that contains a proxy connection to a remote server
* and a set of methods that are supported by the server
* @param protocol the RPC protocol
* @param clientVersion client's version
* @param addr server address (ip:port)
* @param ticket security ticket
* @param conf configuration
* @param factory socket factory
* @param rpcTimeout max time for each rpc; 0 means no timeout
* @param connectionRetryPolicy retry policy
* @param fallbackToSimpleAuth set to true or false during calls to indicate if
* a secure client falls back to simple auth
* @return the proxy
* @throws IOException if any error occurs
*/
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
long clientVersion,
InetSocketAddress addr,
UserGroupInformation ticket,
Configuration conf,
SocketFactory factory,
int rpcTimeout,
RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth)
throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
SaslRpcServer.init(conf);
}
return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion,
addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy,
fallbackToSimpleAuth);
}
ProtocolProxy wraps the dynamic proxy together with the RPC protocol, and ProtocolProxy.getProxy returns the generated proxy. The ProtocolProxy itself is created by the RpcEngine that getProtocolEngine returns:
/**
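* If an RpcEngine for this protocol is already cached, use it directly;
* otherwise read the RpcEngine class configured under "rpc.engine." + protocol.getName()
* from the Configuration (WritableRpcEngine by default)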
*/
static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
Configuration conf) {
RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
if (engine == null) {
Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),
WritableRpcEngine.class);
engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
PROTOCOL_ENGINES.put(protocol, engine);
}
return engine;
}
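The method above implements a per-protocol cache with a configurable implementation class and a built-in default. Below is a JDK-only sketch of the same pattern. `EngineRegistry`, the `Engine` interface and its two implementations, and the plain `Map<String, String>` used in place of Hadoop's Configuration are all illustrative assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical engine abstraction; in Hadoop this role is played by RpcEngine.
interface Engine {
    String name();
}

class WritableEngine implements Engine {
    public String name() { return "writable"; }
}

class ProtobufEngine implements Engine {
    public String name() { return "protobuf"; }
}

class EngineRegistry {
    static final String ENGINE_PROP = "rpc.engine";
    private final Map<Class<?>, Engine> cache = new HashMap<>();
    private final Map<String, String> conf;      // stand-in for Configuration: key -> class name

    EngineRegistry(Map<String, String> conf) {
        this.conf = conf;
    }

    // synchronized so that concurrent callers create at most one engine per protocol.
    synchronized Engine engineFor(Class<?> protocol) {
        Engine engine = cache.get(protocol);
        if (engine == null) {
            String className = conf.getOrDefault(
                    ENGINE_PROP + "." + protocol.getName(), WritableEngine.class.getName());
            try {
                engine = (Engine) Class.forName(className).getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot create engine " + className, e);
            }
            cache.put(protocol, engine);
        }
        return engine;
    }
}
```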
So the RpcEngine class is read from the Configuration and defaults to WritableRpcEngine. This is how Hadoop RPC supports multiple serialization formats. It currently provides Writable (WritableRpcEngine) and Protocol Buffers (ProtobufRpcEngine), and users can choose an engine for a protocol with RPC.setProtocolEngine(). Here we only analyze WritableRpcEngine. Its getProxy method is:
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory,
int rpcTimeout, RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth)
throws IOException {
// null by default
if (connectionRetryPolicy != null) {
throw new UnsupportedOperationException(
"Not supported: connectionRetryPolicy=" + connectionRetryPolicy);
}
T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout, fallbackToSimpleAuth));
return new ProtocolProxy<T>(protocol, proxy, true);
}
Here a dynamic proxy is generated and wrapped in a ProtocolProxy, and RPC.getProxy returns the result of ProtocolProxy.getProxy. Calling any method of the RPC protocol therefore triggers the proxy's invoke method. It is inside invoke that the client sends the method name and arguments and receives the result. So let's look at Invoker, an inner class of WritableRpcEngine:
private static class Invoker implements RpcInvocationHandler {
// ConnectionId holds the server address, the protocol class, the security ticket, etc.; it uniquely identifies a Client.Connection
private Client.ConnectionId remoteId;
private Client client;
private boolean isClosed = false;
private final AtomicBoolean fallbackToSimpleAuth;
public Invoker(Class<?> protocol,
InetSocketAddress address, UserGroupInformation ticket,
Configuration conf, SocketFactory factory,
int rpcTimeout, AtomicBoolean fallbackToSimpleAuth)
throws IOException {
this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
ticket, rpcTimeout, conf);
this.client = CLIENTS.getClient(conf, factory);
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
}
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
long startTime = 0;
if (LOG.isDebugEnabled()) {
startTime = Time.now();
}
TraceScope traceScope = null;
if (Trace.isTracing()) {
traceScope = Trace.startSpan(RpcClientUtil.methodToTraceString(method));
}
ObjectWritable value;
try {
value = (ObjectWritable)
client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args),
remoteId, fallbackToSimpleAuth);
} finally {
if (traceScope != null) traceScope.close();
}
if (LOG.isDebugEnabled()) {
long callTime = Time.now() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
}
return value.get();
}
...
}
invoke calls Client.call to perform the remote call. The Client is first looked up in the CLIENTS cache (internally just a HashMap<SocketFactory, Client>). If it is not there, a new Client is created for that SocketFactory and serialization type and put into the cache. One argument of call, new Invocation(method, args), serializes the method and the arguments passed to it. Invocation implements Writable, Hadoop's serialization interface:
private static class Invocation implements Writable, Configurable {
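private String methodName; // method name
private Class<?>[] parameterClasses; // parameter types
private Object[] parameters; // parameter values
private Configuration conf;
private long clientVersion; // client version, taken from the protocolVersion of the ProtocolInfo annotation on the method's declaring class, or from its versionID
private int clientMethodsHash; // hash computed over all methods of the method's declaring class
private String declaringClassProtocolName; // protocol name, mainly taken from the ProtocolInfo annotation on the method's declaring class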
...
public void readFields(DataInput in) throws IOException {
rpcVersion = in.readLong();
declaringClassProtocolName = UTF8.readString(in);
methodName = UTF8.readString(in);
clientVersion = in.readLong();
clientMethodsHash = in.readInt();
parameters = new Object[in.readInt()];
parameterClasses = new Class[parameters.length];
ObjectWritable objectWritable = new ObjectWritable();
for (int i = 0; i < parameters.length; i++) {
parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf);
parameterClasses[i] = objectWritable.getDeclaredClass();
}
}
@SuppressWarnings("deprecation")
public void write(DataOutput out) throws IOException {
out.writeLong(rpcVersion);
UTF8.writeString(out, declaringClassProtocolName);
UTF8.writeString(out, methodName);
out.writeLong(clientVersion);
out.writeInt(clientMethodsHash);
out.writeInt(parameterClasses.length);
for (int i = 0; i < parameterClasses.length; i++) {
ObjectWritable.writeObject(out, parameters[i], parameterClasses[i], conf, true);
}
}
...
}
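The record produced by write has a fixed field order. It starts with a long RPC version, the protocol name, the method name, a long client version, and an int method hash, followed by the parameter count and then each parameter. Below is a simplified sketch of that layout. It is restricted to int parameters and uses DataOutput.writeUTF in place of Hadoop's deprecated UTF8 class and ObjectWritable, so its bytes are not wire-compatible with Hadoop. `MiniInvocation` is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical, simplified Invocation with the same field order as write() above.
class MiniInvocation {
    long rpcVersion;
    String protocolName;
    String methodName;
    long clientVersion;
    int clientMethodsHash;
    int[] parameters = new int[0];

    void write(DataOutput out) throws IOException {
        out.writeLong(rpcVersion);
        out.writeUTF(protocolName);          // 2-byte length + modified UTF-8
        out.writeUTF(methodName);
        out.writeLong(clientVersion);
        out.writeInt(clientMethodsHash);
        out.writeInt(parameters.length);     // parameter count, then each parameter
        for (int p : parameters) {
            out.writeInt(p);
        }
    }

    void readFields(DataInput in) throws IOException {
        rpcVersion = in.readLong();
        protocolName = in.readUTF();
        methodName = in.readUTF();
        clientVersion = in.readLong();
        clientMethodsHash = in.readInt();
        parameters = new int[in.readInt()];
        for (int i = 0; i < parameters.length; i++) {
            parameters[i] = in.readInt();
        }
    }

    byte[] toBytes() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            write(new DataOutputStream(bytes));
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static MiniInvocation fromBytes(byte[] data) {
        try {
            MiniInvocation inv = new MiniInvocation();
            inv.readFields(new DataInputStream(new ByteArrayInputStream(data)));
            return inv;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For protocol "P", method "add" and two int arguments, the record is 40 bytes: 8 + 3 + 5 + 8 + 4 + 4 + 8.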
Following Client.call, we arrive in the Client class (the core client class). After a few overloads, the call ends up in:
/**
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
* <code>remoteId</code>, returning the rpc response.
* @param rpcKind
* @param rpcRequest - contains serialized method and method parameters
* @param remoteId - the target rpc server
* @param serviceClass - service class for RPC
* @param fallbackToSimpleAuth - set to true or false during this method to
* indicate if a secure client falls back to simple auth
* @returns the rpc response
*/
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId, int serviceClass,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
final Call call = createCall(rpcKind, rpcRequest); // wrap the serialized request in a Call
Connection connection = getConnection(remoteId, call, serviceClass,
fallbackToSimpleAuth); // get a Connection, which wraps the socket connected to the server
try {
connection.sendRpcRequest(call); // send the rpc
} catch (RejectedExecutionException e) {
throw new IOException("connection has been closed", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("interrupted waiting to send rpc request to server", e);
throw new IOException(e);
}
synchronized (call) {
while (!call.done) {
try {
call.wait(); // wait for the result
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new InterruptedIOException("Call interrupted");
}
}
if (call.error != null) {
if (call.error instanceof RemoteException) {
call.error.fillInStackTrace();
throw call.error;
} else { // local exception
InetSocketAddress address = connection.getRemoteAddress();
throw NetUtils.wrapException(address.getHostName(), address.getPort(), NetUtils.getHostname(), 0, call.error);
}
} else {
return call.getRpcResponse();
}
}
}
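First a Call object is created to hold what will be sent. The Connection connects to the server, receives the reply, and stores it in the matching Call. It then wakes up the thread waiting on that Call, which reads the returned data. One Connection handles sending and receiving for every Call that shares the same server address, RPC protocol, and security ticket. The connection keeps a collection of Calls and, as shown later, uses the call id to tell which Call a request or a reply belongs to. The three key steps here are createCall, which creates the Call; getConnection, which connects to the server; and connection.sendRpcRequest, which sends the request. The reply is received by the connection's own thread. Let's look at each in turn.
Call createCall(RPC.RpcKind rpcKind, Writable rpcRequest) {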
return new Call(rpcKind, rpcRequest);
}
static class Call {
final int id; // call id
final int retry; // retry count
final Writable rpcRequest; // the serialized rpc request
Writable rpcResponse; // null if rpc has error
IOException error; // exception, null if success
final RPC.RpcKind rpcKind; // Rpc EngineKind
boolean done; // true when call is done
private Call(RPC.RpcKind rpcKind, Writable param) {
this.rpcKind = rpcKind;
this.rpcRequest = param;
// Generate the callId: every Call gets a unique id so that the connection can put a received reply into the Call with the matching id
final Integer id = callId.get();
if (id == null) {
this.id = nextCallId();
} else {
callId.set(null);
this.id = id;
}
final Integer rc = retryCount.get();
if (rc == null) {
this.retry = 0;
} else {
this.retry = rc;
}
}
...
}
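Two ideas in Call are worth isolating. First, each call gets an id from a shared counter. In this sketch that is an AtomicInteger whose value is masked to stay non-negative after overflow; that masking is an assumption of the sketch. Second, the caller blocks in wait() until another thread marks the call done and notifies it. The `PendingCall` and `PendingCalls` classes below are hypothetical. They sketch the pattern used by Client.call and the Connection receiver thread.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical client-side call: filled in by a receiver thread, awaited by the caller.
class PendingCall {
    final int id;
    private Object response;
    private boolean done;

    PendingCall(int id) {
        this.id = id;
    }

    // Receiver thread: store the reply and wake the waiting caller.
    synchronized void complete(Object value) {
        response = value;
        done = true;
        notifyAll();
    }

    // Caller thread: block until complete() has run; the loop guards against spurious wakeups.
    synchronized Object await() {
        while (!done) {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("call interrupted", e);
            }
        }
        return response;
    }
}

class PendingCalls {
    private final AtomicInteger counter = new AtomicInteger();
    private final Map<Integer, PendingCall> calls = new ConcurrentHashMap<>();

    // Ids come from one shared counter; the mask keeps them non-negative after overflow.
    PendingCall newCall() {
        PendingCall call = new PendingCall(counter.getAndIncrement() & 0x7FFFFFFF);
        calls.put(call.id, call);
        return call;
    }

    // Receiver side: route a reply to the call with the same id.
    void onReply(int id, Object value) {
        PendingCall call = calls.remove(id);
        if (call != null) {
            call.complete(value);
        }
    }
}
```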
/** Get a connection from the pool, or create a new one and add it to the ... */
...
getConnection maintains a connection pool keyed by remoteId; the pool is in fact a Hashtable, which is thread-safe. If the pool has no connection for that key, a new Connection is created and added to the pool. Constructing a Connection is mostly field assignment. The Connection holds a Socket, and it is this Socket that actually establishes the TCP connection. Connection also extends Thread: once the connection has been set up, the thread is started and keeps waiting for responses, as we will see below. The connection is not established inside the "synchronized (connections)" block, because connecting while holding that lock would block other callers and reduce the efficiency of the whole system. The connection is actually established in connection.setupIOstreams:
/** Connect to the server and set up the I/O streams. It then sends
* a header to the server and starts
* the connection thread that waits for responses.
* (i.e. set up the channel's input and output streams, then start the connection thread that receives replies)
*/
private synchronized void setupIOstreams(
AtomicBoolean fallbackToSimpleAuth) {
if (socket != null || shouldCloseConnection.get()) {
return;
}
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to "+server);
}
if (Trace.isTracing()) {
Trace.addTimelineAnnotation("IPC client connecting to " + server);
}
short numRetries = 0;
Random rand = null;
while (true) {
setupConnection(); // establish the socket connection
InputStream inStream = NetUtils.getInputStream(socket); // get the input stream
OutputStream outStream = NetUtils.getOutputStream(socket); // get the output stream
writeConnectionHeader(outStream); // send the connection header
...
if (doPing) {
inStream = new PingInputStream(inStream);
}
this.in = new DataInputStream(new BufferedInputStream(inStream));
// SASL may have already buffered the stream
if (!(outStream instanceof BufferedOutputStream)) {
outStream = new BufferedOutputStream(outStream);
}
this.out = new DataOutputStream(outStream);
writeConnectionContext(remoteId, authMethod); // send the connection context
// update last activity time
touch();
if (Trace.isTracing()) {
Trace.addTimelineAnnotation("IPC client connected to " + server);
}
// start the receiver thread after the socket connection has been set
// up
start();
return;
}
} catch (Throwable t) {
if (t instanceof IOException) {
markClosed((IOException)t);
} else {
markClosed(new IOException("Couldn't set up IO streams", t));
}
close();
}
}
/**
* Write the connection header - this is sent when connection is established
* +----------------------------------+
* | "hrpc" 4 bytes |
* +----------------------------------+
* | Version (1 byte) |
* +----------------------------------+
* | Service Class (1 byte) |
* +----------------------------------+
* | AuthProtocol (1 byte) |
* +----------------------------------+
*/
private void writeConnectionHeader(OutputStream outStream)
throws IOException {
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
// Write out the header, version and authentication method
out.write(RpcConstants.HEADER.array());
out.write(RpcConstants.CURRENT_VERSION);
out.write(serviceClass);
out.write(authProtocol.callId);
out.flush();
}
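The seven-byte header layout in the comment above can be reproduced with plain java.io. The magic bytes follow the comment. The version, service-class and auth-protocol values used in the test are arbitrary placeholders, not Hadoop's actual constants. `ConnectionPreamble` is a hypothetical helper, and its `decode` side previews the check a server has to make on the first bytes it reads from a new connection.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class ConnectionPreamble {
    static final byte[] MAGIC = "hrpc".getBytes(StandardCharsets.US_ASCII);

    // Layout: "hrpc" (4 bytes) | version (1) | service class (1) | auth protocol (1)
    static byte[] encode(int version, int serviceClass, int authProtocol) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.write(MAGIC);
            out.write(version);          // write(int) emits only the low 8 bits
            out.write(serviceClass);
            out.write(authProtocol);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns {version, serviceClass, authProtocol} as unsigned values; rejects a wrong magic.
    static int[] decode(byte[] preamble) {
        if (preamble.length != 7) {
            throw new IllegalArgumentException("expected 7 bytes, got " + preamble.length);
        }
        ByteBuffer buf = ByteBuffer.wrap(preamble);
        byte[] magic = new byte[4];
        buf.get(magic);
        if (!Arrays.equals(magic, MAGIC)) {
            throw new IllegalArgumentException("not an hrpc connection");
        }
        return new int[] { buf.get() & 0xFF, buf.get() & 0xFF, buf.get() & 0xFF };
    }
}
```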
In short, setupConnection establishes the connection and the input and output streams are obtained. Then the Hadoop RPC connection header and the connection context (which identifies the protocol and the user) are sent. The header layout is given in the comment above, and later we will see the server receive and parse it. Finally the thread is started to keep waiting for responses. The core connection logic is in setupConnection:
private synchronized void setupConnection() throws IOException {
short ioFailures = 0;
short timeoutFailures = 0;
while (true) {
try {
this.socket = socketFactory.createSocket();
this.socket.setTcpNoDelay(tcpNoDelay);
this.socket.setKeepAlive(true);
/*
* Bind the socket to the host specified in the principal name of the
* client, to ensure Server matching address of the client connection
* to host name in principal passed.
*/
UserGroupInformation ticket = remoteId.getTicket();
if (ticket != null && ticket.hasKerberosCredentials()) {
KerberosInfo krbInfo =
remoteId.getProtocol().getAnnotation(KerberosInfo.class);
if (krbInfo != null && krbInfo.clientPrincipal() != null) {
String host =
SecurityUtil.getHostFromPrincipal(remoteId.getTicket().getUserName());
// If host name is a valid local address then bind socket to it
InetAddress localAddr = NetUtils.getLocalInetAddress(host);
if (localAddr != null) {
this.socket.bind(new InetSocketAddress(localAddr, 0));
}
}
}
NetUtils.connect(this.socket, server, connectionTimeout);
if (rpcTimeout > 0) {
pingInterval = rpcTimeout; // rpcTimeout overwrites pingInterval
}
this.socket.setSoTimeout(pingInterval);
return;
} catch (ConnectTimeoutException toe) {
/* Check for an address change and update the local reference.
* Reset the failure counter if the address was changed
*/
if (updateAddress()) {
timeoutFailures = ioFailures = 0;
}
handleConnectionTimeout(timeoutFailures++,
maxRetriesOnSocketTimeouts, toe);
} catch (IOException ie) {
if (updateAddress()) {
timeoutFailures = ioFailures = 0;
}
handleConnectionFailure(ioFailures++, ie);
}
}
}
public class StandardSocketFactory extends SocketFactory {
...
}
setupConnection calls socketFactory.createSocket(), so the Socket is created by whichever SocketFactory implementation is in use. The default, StandardSocketFactory, obtains the socket from a SocketChannel. setupConnection then configures the socket: it enables TCP keep-alive for a long-lived connection and sets the read timeout with socket.setSoTimeout. That timeout is pingInterval, one minute by default, or rpcTimeout when one is set. Finally NetUtils.connect establishes the connection:
public static void connect(Socket socket,
SocketAddress address,
int timeout) throws IOException {
connect(socket, address, null, timeout);
}
public static void connect(Socket socket,
SocketAddress endpoint,
SocketAddress localAddr,
int timeout) throws IOException {
if (socket == null || endpoint == null || timeout < 0) {
throw new IllegalArgumentException("Illegal argument for connect()");
}
SocketChannel ch = socket.getChannel();
if (localAddr != null) {
Class localClass = localAddr.getClass();
Class remoteClass = endpoint.getClass();
Preconditions.checkArgument(localClass.equals(remoteClass),
"Local address %s must be of same family as remote address %s.",
localAddr, endpoint);
socket.bind(localAddr);
}
try {
if (ch == null) {
// let the default implementation handle it.
socket.connect(endpoint, timeout);
} else {
SocketIOWithTimeout.connect(ch, endpoint, timeout);
}
} catch (SocketTimeoutException ste) {
throw new ConnectTimeoutException(ste.getMessage());
}
// There is a very rare case allowed by the TCP specification, such that
// if we are trying to connect to an endpoint on the local machine,
// and we end up choosing an ephemeral port equal to the destination port,
// we will actually end up getting connected to ourself (ie any data we
// send just comes right back). This is only possible if the target
// daemon is down, so we'll treat it like connection refused.
if (socket.getLocalPort() == socket.getPort() &&
socket.getLocalAddress().equals(socket.getInetAddress())) {
LOG.info("Detected a loopback TCP socket, disconnecting it");
socket.close();
throw new ConnectException(
"Localhost targeted connection resulted in a loopback. " +
"No daemon is listening on the target port.");
}
}
/**
* SocketIOWithTimeout.connect
*/
static void connect(SocketChannel channel,
SocketAddress endpoint, int timeout) throws IOException {
boolean blockingOn = channel.isBlocking();
if (blockingOn) {
channel.configureBlocking(false);
}
try {
if (channel.connect(endpoint)) {
return;
}
long timeoutLeft = timeout;
long endTime = (timeout > 0) ? (Time.now() + timeout): 0;
while (true) {
// we might have to call finishConnect() more than once
// for some channels (with user level protocols)
int ret = selector.select((SelectableChannel)channel,
SelectionKey.OP_CONNECT, timeoutLeft);
if (ret > 0 && channel.finishConnect()) {
return;
}
if (ret == 0 ||
(timeout > 0 &&
(timeoutLeft = (endTime - Time.now())) <= 0)) {
throw new SocketTimeoutException(
timeoutExceptionString(channel, timeout,
SelectionKey.OP_CONNECT));
}
}
} catch (IOException e) {
// javadoc for SocketChannel.connect() says channel should be closed.
try {
channel.close();
} catch (IOException ignored) {}
throw e;
} finally {
if (blockingOn && channel.isOpen()) {
channel.configureBlocking(true);
}
}
}
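The core of SocketIOWithTimeout.connect is a non-blocking connect followed by a bounded wait for OP_CONNECT, and it can be reproduced with the JDK alone. The code above goes through a shared `selector` helper, whereas this sketch simply opens a Selector for each call. `TimedConnect` is a hypothetical name.

```java
import java.io.IOException;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

class TimedConnect {
    // Connect in non-blocking mode, waiting at most timeoutMs for the connection to complete.
    static void connect(SocketChannel ch, SocketAddress endpoint, long timeoutMs) throws IOException {
        boolean wasBlocking = ch.isBlocking();
        ch.configureBlocking(false);
        try (Selector selector = Selector.open()) {
            if (ch.connect(endpoint)) {
                return;                               // connected immediately
            }
            ch.register(selector, SelectionKey.OP_CONNECT);
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (true) {
                long left = deadline - System.currentTimeMillis();
                if (left <= 0) {
                    throw new SocketTimeoutException("connect timed out after " + timeoutMs + " ms");
                }
                if (selector.select(left) > 0 && ch.finishConnect()) {
                    return;
                }
                selector.selectedKeys().clear();
            }
        } catch (IOException e) {
            ch.close();                               // a failed connect leaves the channel unusable
            throw e;
        } finally {
            // The selector is already closed here, so the channel is deregistered
            // and may be switched back to blocking mode.
            if (wasBlocking && ch.isOpen()) {
                ch.configureBlocking(true);
            }
        }
    }
}
```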
So the connection is ultimately made either by the plain socket or by its SocketChannel switched to non-blocking mode. Once the connection to the server is established, the contents of the Call are sent to the server and the response is received. Back in Client.call, Connection.sendRpcRequest sends the method information and the actual arguments. Meanwhile, the Connection thread started at the end of setupIOstreams waits for the server's response messages:
public void sendRpcRequest(final Call call)
throws InterruptedException, IOException {
if (shouldCloseConnection.get()) {
return;
}
// Serialize the call to be sent. This is done from the actual
// caller thread, rather than the sendParamsExecutor thread,
// so that if the serialization throws an error, it is reported
// properly. This also parallelizes the serialization.
//
// Format of a call on the wire:
// 0) Length of rest below (1 + 2)
// 1) RpcRequestHeader - is serialized Delimited hence contains length
// 2) RpcRequest
//
// Items '1' and '2' are prepared here.
final DataOutputBuffer d = new DataOutputBuffer();
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
clientId);
header.writeDelimitedTo(d);
call.rpcRequest.write(d);
synchronized (sendRpcRequestLock) {
Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
public void run() {
try {
synchronized (Connection.this.out) {
if (shouldCloseConnection.get()) {
return;
}
if (LOG.isDebugEnabled())
LOG.debug(getName() + " sending #" + call.id);
byte[] data = d.getData();
int totalLength = d.getLength();
out.writeInt(totalLength); // Total Length
out.write(data, 0, totalLength);// RpcRequestHeader + RpcRequest
out.flush();
}
} catch (IOException e) {
// exception at this point would leave the connection in an
// unrecoverable state (eg half a call left on the wire).
// So, close the connection, killing any outstanding calls
markClosed(e);
} finally {
//the buffer is just an in-memory buffer, but it is still polite to
// close early
IOUtils.closeStream(d);
}
}
});
try {
senderFuture.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
// cause should only be a RuntimeException as the Runnable above
// catches IOException
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else {
throw new RuntimeException("unexpected checked exception", cause);
}
}
}
}
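The framing described in the comment above has three parts: a 4-byte total length, a length-delimited header, and the request body. A length-delimited protobuf message is prefixed with its size as a base-128 varint. That is also why, on the response side, the header length is incremented by computeRawVarint32Size(headerLen) before it is compared with the total. In this sketch the header and body are opaque byte arrays rather than real RpcRequestHeaderProto or request messages, and `Frames` is a hypothetical helper.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

class Frames {
    // Protobuf-style unsigned varint: 7 bits per byte, high bit set on every byte but the last.
    static void writeVarint(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    static int readVarint(InputStream in) throws IOException {
        int result = 0;
        for (int shift = 0; shift < 35; shift += 7) {
            int b = in.read();
            if (b < 0) throw new IOException("truncated varint");
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) return result;
        }
        throw new IOException("malformed varint");
    }

    // Wire layout: [int totalLength][varint headerLength][header][body]
    static byte[] frame(byte[] header, byte[] body) throws IOException {
        ByteArrayOutputStream payload = new ByteArrayOutputStream();
        writeVarint(payload, header.length);
        payload.write(header);
        payload.write(body);
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(wire);
        out.writeInt(payload.size());                 // the total excludes these 4 bytes
        payload.writeTo(out);
        out.flush();
        return wire.toByteArray();
    }

    // Read one frame and return {header, body}, validating the declared lengths.
    static byte[][] read(InputStream raw) throws IOException {
        DataInputStream in = new DataInputStream(raw);
        int total = in.readInt();
        if (total < 0) throw new IOException("negative frame length");
        byte[] payload = new byte[total];
        in.readFully(payload);
        ByteArrayInputStream rest = new ByteArrayInputStream(payload);
        int headerLen = readVarint(rest);
        int offset = total - rest.available();        // bytes taken by the varint prefix
        if (headerLen < 0 || offset + headerLen > total) {
            throw new IOException("header length " + headerLen + " does not fit in frame");
        }
        byte[] header = Arrays.copyOfRange(payload, offset, offset + headerLen);
        byte[] body = Arrays.copyOfRange(payload, offset + headerLen, total);
        return new byte[][] { header, body };
    }
}
```

A 3-byte header with a 200-byte body produces 4 + 1 + 3 + 200 = 208 bytes on the wire.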
sendRpcRequest writes the call's message to the output stream in the format described in the comment: first the total length, then the request header, and finally the request body. The message is first serialized into a temporary DataOutputBuffer on the caller's thread. The actual write is then submitted to the sendParamsExecutor thread pool, and the caller waits for it to finish through senderFuture.get(). As mentioned earlier, once the connection has been set up, the Connection thread keeps receiving response messages from the server. Next we look at how the function's result is received, in Connection's run method:
public void run() {
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": starting, having connections " + connections.size());
try {
while (waitForWork()) {//wait here for work - read or close connection
receiveRpcResponse();
}
} catch (Throwable t) {
...
}
...
}
/* Receive a response.
* Because only one receiver, so no synchronization on in.
*/
private void receiveRpcResponse() {
if (shouldCloseConnection.get()) {
return;
}
touch();
try {
int totalLen = in.readInt();
RpcResponseHeaderProto header =
RpcResponseHeaderProto.parseDelimitedFrom(in);
checkResponse(header);
int headerLen = header.getSerializedSize();
headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);
int callId = header.getCallId();
if (LOG.isDebugEnabled())
LOG.debug(getName() + " got value #" + callId);
Call call = calls.get(callId);
RpcStatusProto status = header.getStatus();
if (status == RpcStatusProto.SUCCESS) {
Writable value = ReflectionUtils.newInstance(valueClass, conf);
value.readFields(in); // read value
calls.remove(callId);
call.setRpcResponse(value);
// verify that length was correct
// only for ProtobufEngine where len can be verified easily
if (call.getRpcResponse() instanceof ProtobufRpcEngine.RpcWrapper) {
ProtobufRpcEngine.RpcWrapper resWrapper =
(ProtobufRpcEngine.RpcWrapper) call.getRpcResponse();
if (totalLen != headerLen + resWrapper.getLength()) {
throw new RpcClientException(
"RPC response length mismatch on rpc success");
}
}
} else { // Rpc Request failed
// Verify that length was correct
if (totalLen != headerLen) {
throw new RpcClientException(
"RPC response length mismatch on rpc error");
}
final String exceptionClassName = header.hasExceptionClassName() ?
header.getExceptionClassName() :
"ServerDidNotSetExceptionClassName";
final String errorMsg = header.hasErrorMsg() ?
header.getErrorMsg() : "ServerDidNotSetErrorMsg" ;
final RpcErrorCodeProto erCode =
(header.hasErrorDetail() ? header.getErrorDetail() : null);
if (erCode == null) {
LOG.warn("Detailed error code not set by server on rpc error");
}
RemoteException re =
( (erCode == null) ?
new RemoteException(exceptionClassName, errorMsg) :
new RemoteException(exceptionClassName, errorMsg, erCode));
if (status == RpcStatusProto.ERROR) {
calls.remove(callId);
call.setException(re);
} else if (status == RpcStatusProto.FATAL) {
// Close the connection
markClosed(re);
}
}
} catch (IOException e) {
markClosed(e);
}
}
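The receiver first reads the message length, then parses the response header. It uses the callId to put the response into the corresponding Call; setting the response also sets call.done to true and wakes the thread waiting on that Call. Control then returns to Client.call, which returns call.getRpcResponse(). This completes the analysis of the Hadoop client code.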
Server implementation
The server first creates an RPC.Server object using the builder pattern:
public Server build() throws IOException, HadoopIllegalArgumentException {
...
}
As on the client, getProtocolEngine returns the RpcEngine for the protocol, and that engine creates the corresponding Server object. Again we look at the default engine, WritableRpcEngine, whose getServer method is called:
public RPC.Server getServer(Class<?> protocolClass,
...
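WritableRpcEngine.Server extends RPC.Server, which in turn extends org.apache.hadoop.ipc.Server. getServer directly constructs a WritableRpcEngine.Server. The constructor first calls the superclass constructor and then registers the protocol's implementation class in a cache keyed by RPC_WRITABLE and the protocol interface name. Next, the superclass constructors: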
public abstract static class Server extends org.apache.hadoop.ipc.Server {
...
}
public abstract class Server {
...
}
The superclass constructor mainly assigns fields. Most importantly, it initializes the listener and responder objects.
/** Listens on the socket. Creates jobs for the handler threads*/
...
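Listener initializes a ServerSocketChannel and binds it to the configured port (or to one port from a configured range). It then creates a Selector and registers interest in the "accept ready" event. It also creates and starts several reader threads (the Reader class). A Reader reads the data sent by clients, deserializes it, and builds the corresponding Call objects. Each Reader has its own Selector for "read ready" events. When the Listener's selector reports a new connection, the SocketChannel is wrapped in a Connection and its read events are registered with a Reader's selector, and that Reader then reads the connection's data (details below). First, a quick look at the Responder class.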
// Sends responses of RPC back to clients.
...
Responder is also a thread class. It sends RPC responses back to clients, and it too has its own Selector, with which write events are registered.
After Listener and Responder have been initialized and the Server instance created, Server.start() is called to start the server:
public synchronized void start() {
...
}
It starts the listener, responder, and handler threads. Next, the Listener's run method:
public void run() {
...
}
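The listener thread waits for the registered accept events to fire and then calls doAccept. doAccept obtains the connected socket, builds a Connection object, and puts it into connectionManager's cache (which internally maintains a Set ...)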
public void addConnection(Connection conn) throws InterruptedException {
...
}
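The hand-off from the Listener to the Readers can be modeled without the NIO parts. Each reader owns a queue of newly accepted connections, and the listener picks a reader for each new connection, assumed here to be round-robin. The `ListenerModel` and `ReaderModel` classes are hypothetical, connections are represented by plain strings, and the selector registration is only indicated in a comment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of one reader thread's inbox.
class ReaderModel {
    final int index;
    // Connections accepted by the listener but not yet registered with this reader's selector.
    final BlockingQueue<String> pendingConnections = new LinkedBlockingQueue<>();

    ReaderModel(int index) {
        this.index = index;
    }

    void addConnection(String conn) {
        pendingConnections.add(conn);
        // A real reader would call selector.wakeup() here, so that a thread blocked in
        // select() notices the new connection and registers it for OP_READ.
    }
}

// Hypothetical model of the listener's accept path.
class ListenerModel {
    private final List<ReaderModel> readers = new ArrayList<>();
    private int currentReader = 0;

    ListenerModel(int readerCount) {
        for (int i = 0; i < readerCount; i++) {
            readers.add(new ReaderModel(i));
        }
    }

    // Round-robin choice of the reader that will own a newly accepted connection.
    ReaderModel nextReader() {
        ReaderModel r = readers.get(currentReader);
        currentReader = (currentReader + 1) % readers.size();
        return r;
    }

    ReaderModel accept(String conn) {
        ReaderModel r = nextReader();
        r.addConnection(conn);
        return r;
    }
}
```

Keeping the accept loop and the read loops on different selectors means that a slow read on one connection does not delay accepting new ones.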
Next, the Reader's run method:
public void run() {
...
}
The Reader takes a connection from its blocking queue and registers a read event with readSelector. When the selector fires, it calls doRead:
void doRead(SelectionKey key) throws InterruptedException {
...
}
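doRead calls Connection.readAndProcess. dataLengthBuffer may contain the connection header, in which case its content is "hrpc"; otherwise it holds the length of the data that follows. That data is either the connection context or an actual request, and processOneRpc handles both. It tells them apart by checking whether the callId is negative (the context's callId is -3). For the context, connectionContextRead is set to true. Otherwise processRpcRequest reads the actual data: it creates the concrete Writable subclass via reflection and deserializes the content with readFields. It then wraps the callId, the Writable, and so on in a Call (the server-side counterpart of the client's Call) and puts it into the callQueue. The Calls in callQueue are processed by Handler threads. Here is Handler's run method: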
private class Handler extends Thread {
...
}
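The body of the Handler thread is not shown above, but the arrangement it belongs to is a classic producer-consumer pattern. Reader threads put Calls into a blocking queue, and a fixed set of handler threads take them out, execute them, and pass them on for the response to be sent. A JDK-only sketch follows. `ServerCall`, `HandlerPool`, the fixed "add" computation and the responder callback are hypothetical stand-ins for Hadoop's Call, Handler, method invocation and Responder.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical server-side call: the decoded request plus a slot for the result.
class ServerCall {
    final int id;
    final int a;
    final int b;
    volatile Integer result;

    ServerCall(int id, int a, int b) {
        this.id = id;
        this.a = a;
        this.b = b;
    }
}

class HandlerPool {
    private final BlockingQueue<ServerCall> callQueue = new LinkedBlockingQueue<>();
    private final Thread[] handlers;

    HandlerPool(int n, Consumer<ServerCall> responder) {
        handlers = new Thread[n];
        for (int i = 0; i < n; i++) {
            handlers[i] = new Thread(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        ServerCall call = callQueue.take();   // blocks until a reader enqueues a call
                        call.result = call.a + call.b;        // stands in for invoking the protocol method
                        responder.accept(call);               // hand the finished call to the responder
                    }
                } catch (InterruptedException e) {
                    // interrupted by stop(): leave the loop and let the thread end
                }
            }, "handler-" + i);
            handlers[i].setDaemon(true);
            handlers[i].start();
        }
    }

    // Reader side: enqueue a decoded call.
    void enqueue(ServerCall call) {
        callQueue.add(call);
    }

    void stop() {
        for (Thread t : handlers) {
            t.interrupt();
        }
    }
}
```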
Handler.run takes a Call from the callQueue blocking queue and then calls the call method to execute the requested method. Let's look at call:
/** Called for each call. ... */
...
In the end it calls the call method of a concrete RpcInvoker implementation, which is looked up in rpcKindMap by rpcKind. When is that RpcInvoker registered, and which class is it? Following the uses of rpcKindMap shows:
public class WritableRpcEngine implements RpcEngine {
...
}
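So when the WritableRpcEngine class is loaded, it already registers an instance of Server.WritableRpcInvoker in rpcKindMap. With WritableRpcEngine, therefore, the invoker taken from rpcKindMap is Server.WritableRpcInvoker. Let's see how its call method uses reflection to invoke the procedure and obtain the result: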
static class WritableRpcInvoker implements RpcInvoker {
...
}
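WritableRpcInvoker.call casts rpcRequest to an Invocation and takes the protocol name and version from it. It uses them to look up the protocol implementation object through server.getProtocolImplMap, invokes the method by name through reflection, and wraps the return value in an ObjectWritable. How the implementation object is found can be seen from the following method: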
It takes the Map<ProtoNameVer, ProtoClassProtoImpl> for the given rpcKind from protocolImplMapArray and then looks up the implementation object by ProtoNameVer.
public Server(Class<?> protocolClass, Object protocolImpl,
...
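This shows that when the Server object is initialized, the protocol implementation is already registered in a Map under its protocol and all of the protocol's interfaces. That map is stored in the list at index rpcKind.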
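The registry idea can be sketched with the JDK: register an implementation under each interface it implements, keyed by protocol name and version, then find the method by name and invoke it reflectively. `NameVersion`, `ProtocolRegistry`, `Adder` and `AdderImpl` are hypothetical. For brevity the sketch registers only directly implemented interfaces and takes the version as an explicit argument.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical key: protocol name plus version, like the (name, version) lookup described above.
final class NameVersion {
    final String name;
    final long version;

    NameVersion(String name, long version) {
        this.name = name;
        this.version = version;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof NameVersion)) return false;
        NameVersion k = (NameVersion) o;
        return name.equals(k.name) && version == k.version;
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, version);
    }
}

interface Adder {
    long versionID = 1L;
    int add(int a, int b);
}

class AdderImpl implements Adder {
    public int add(int a, int b) {
        return a + b;
    }
}

class ProtocolRegistry {
    private final Map<NameVersion, Object> impls = new HashMap<>();

    // Register the implementation under every interface it directly implements.
    void register(Object impl, long version) {
        for (Class<?> iface : impl.getClass().getInterfaces()) {
            impls.put(new NameVersion(iface.getName(), version), impl);
        }
    }

    // Look up the implementation by (protocol, version) and invoke the protocol method reflectively.
    Object invoke(String protocol, long version, String method, Class<?>[] types, Object[] args) {
        Object impl = impls.get(new NameVersion(protocol, version));
        if (impl == null) {
            throw new IllegalArgumentException("unknown protocol " + protocol + " version " + version);
        }
        try {
            Method m = Class.forName(protocol).getMethod(method, types);
            return m.invoke(impl, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Resolving the Method through the protocol interface, rather than through the implementation class, means that only methods declared by the protocol can be called remotely.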
Back in Handler.run: after call returns the result, it calls setupResponse and then call.sendResponse(). Let's look at these two methods:
/** ... */
...
setupResponse mainly serializes the response object and stores it in the Call's rpcResponse field. The Call then has to be handed to the Responder, as Call.sendResponse() shows:
//Call.sendResponse()
...
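In the end the Call is added to responseQueue, which is a plain list rather than a blocking queue. Each Call holds the object deserialized from the client, the data to send back, and its Connection. Each client keeps one Connection to the server, so call.connection.responseQueue is the send queue for responses to that client; Calls destined for the same client share one responseQueue. When responseQueue contains exactly one element, processResponse is called immediately. In some cases, such as a very large result or a slow network, the response cannot be sent in one go. A write event is then registered with the Responder's selector, and the Responder sends the remaining data asynchronously. Here is the Responder's run method: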
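The Responder asynchronously continues sending the response data for the write events registered with its selector (again via processResponse). It also enforces a timeout: responses that still have not been sent when it expires are closed.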
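The "try to write now, finish later" behavior can be sketched with java.nio buffers. When a response is queued and nothing else is pending for that connection, it is written immediately. Any bytes the channel does not accept stay queued for an asynchronous writer to drain when the channel becomes writable. `ResponseQueue` and `ThrottledChannel` are hypothetical: the latter is a demo channel that accepts only a few bytes per write, imitating a full socket send buffer. Timeouts and purging are left out.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical per-connection response queue.
class ResponseQueue {
    private final Deque<ByteBuffer> pending = new ArrayDeque<>();

    // Queue a response; write it inline only if nothing else is already waiting.
    boolean enqueue(WritableByteChannel ch, ByteBuffer response) throws IOException {
        pending.addLast(response);
        if (pending.size() == 1) {
            return drain(ch);
        }
        return false;                 // an earlier response is still being sent asynchronously
    }

    // Write as much as the channel accepts; true once everything queued has been sent.
    boolean drain(WritableByteChannel ch) throws IOException {
        while (!pending.isEmpty()) {
            ByteBuffer head = pending.peekFirst();
            ch.write(head);
            if (head.hasRemaining()) {
                return false;         // channel full: a real server would register OP_WRITE here
            }
            pending.removeFirst();
        }
        return true;
    }

    int size() {
        return pending.size();
    }
}

// Demo channel that accepts at most `limit` bytes per write call.
class ThrottledChannel implements WritableByteChannel {
    final ByteArrayOutputStream sink = new ByteArrayOutputStream();
    private final int limit;
    private boolean open = true;

    ThrottledChannel(int limit) {
        this.limit = limit;
    }

    public int write(ByteBuffer src) {
        int n = Math.min(limit, src.remaining());
        for (int i = 0; i < n; i++) {
            sink.write(src.get());
        }
        return n;
    }

    public boolean isOpen() {
        return open;
    }

    public void close() {
        open = false;
    }
}
```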
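Overall flow
Client
The client uses a dynamic proxy: the Invoker's invoke method sends the remote procedure call request to the server and waits for the result.
Server
The server follows the event-driven Reactor pattern and is built on the JDK's socket (NIO) facilities and thread pools.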
References
http://bigdatadecode.club/Hadoop%20RPC%20%E8%A7%A3%E6%9E%90.html
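《Hadoop技术内幕:深入解析YARN架构设计与实现原理》 (Hadoop Internals: An In-Depth Analysis of YARN Architecture Design and Implementation Principles); the figures of the original article also come from this book.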