Java RMI source code walkthrough

Welcome to Eetal's twenty-first blog post: a walkthrough of the Java RMI source code.

Core classes involved

sun.rmi.server.UnicastServerRef
sun.rmi.server.UnicastRef
sun.rmi.server.Util
sun.rmi.transport.tcp.TCPEndpoint
sun.rmi.transport.LiveRef
java.rmi.Naming
sun.rmi.registry.RegistryImpl

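RMI

RMI is a set of Java APIs that supports the development of distributed applications.
RMI uses Java interfaces to define remote objects, and it combines Java serialization with the Java Remote Method Protocol (JRMP).
Put simply, it turns what used to be method calls between programs on the same operating system into method calls between programs on different operating systems. J2EE is a distributed application platform, and it relies on RMI to let program components communicate across operating systems.
For example, an EJB can use RMI to call a remote method of an EJB on another machine on the network.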

Basic RMI usage

The service interface to publish

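import java.rmi.Remote;
import java.rmi.RemoteException;

public interface HelloService extends Remote {
    // Every method of a published remote interface must declare RemoteException
    // (or one of its superclasses) in its throws clause.
    // When the proxy object is created, Util calls checkMethod on each method and
    // throws IllegalArgumentException if one does not.
    String sayHello() throws RemoteException;
}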

checkMethod in Util

public static Remote createProxy(Class<?> implClass,
                                 RemoteRef clientRef,
                                 boolean forceStubUse)
    throws StubNotFoundException
{
    .....
    final Class<?>[] interfaces = getRemoteInterfaces(implClass);
    .....
}

private static Class<?>[] getRemoteInterfaces(Class<?> remoteClass) {
    ArrayList<Class<?>> list = new ArrayList<>();
    getRemoteInterfaces(list, remoteClass);
    return list.toArray(new Class<?>[list.size()]);
}

private static void getRemoteInterfaces(ArrayList<Class<?>> list, Class<?> cl) {
    .....
    for (int i = 0; i < interfaces.length; i++) {
        Class<?> intf = interfaces[i];
        // Only interfaces that extend Remote are collected and checked.
        if (Remote.class.isAssignableFrom(intf)) {
            if (!(list.contains(intf))) {
                Method[] methods = intf.getMethods();
                for (int j = 0; j < methods.length; j++) {
                    checkMethod(methods[j]);
                }
                list.add(intf);
            }
        }
    }
}

private static void checkMethod(Method m) {
    Class<?>[] ex = m.getExceptionTypes();
    for (int i = 0; i < ex.length; i++) {
        // Passes if a declared exception is RemoteException or a superclass of it.
        if (ex[i].isAssignableFrom(RemoteException.class))
            return;
    }
    throw new IllegalArgumentException(
        "illegal remote method encountered: " + m);
}
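
To make the rule concrete, here is a minimal sketch that applies the same test as checkMethod through plain reflection. The interfaces Declared, Broader and Missing and the helper names are made up for this illustration; only the isAssignableFrom test is taken from the listing above. Since that test asks whether a declared type is RemoteException or one of its superclasses, a method that declares IOException passes as well.

```java
import java.io.IOException;
import java.lang.reflect.Method;
import java.rmi.Remote;
import java.rmi.RemoteException;

public class CheckMethodDemo {
    // Hypothetical interfaces, used only to exercise the rule.
    public interface Declared extends Remote {
        String call() throws RemoteException;
    }

    public interface Broader extends Remote {
        String call() throws IOException; // IOException is a superclass of RemoteException
    }

    public interface Missing extends Remote {
        String call(); // no throws clause at all
    }

    // Same test as checkMethod: does the method declare RemoteException
    // or one of its superclasses?
    public static boolean isLegalRemoteMethod(Method m) {
        for (Class<?> ex : m.getExceptionTypes()) {
            if (ex.isAssignableFrom(RemoteException.class)) {
                return true;
            }
        }
        return false;
    }

    // An interface is acceptable only if every one of its methods passes.
    public static boolean isLegalRemoteInterface(Class<?> intf) {
        for (Method m : intf.getMethods()) {
            if (!isLegalRemoteMethod(m)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isLegalRemoteInterface(Declared.class)); // true
        System.out.println(isLegalRemoteInterface(Broader.class));  // true
        System.out.println(isLegalRemoteInterface(Missing.class));  // false
    }
}
```

In the real export path the failing case surfaces as an IllegalArgumentException from Util rather than as a boolean.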

The implementation class of the service interface

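import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;

public class HelloServiceImpl extends UnicastRemoteObject implements HelloService {
    // Remote is only a marker interface. Extending UnicastRemoteObject is the
    // simplest way to get an exportable object, because its constructor exports
    // the instance (the alternative is to call UnicastRemoteObject.exportObject yourself).

    protected HelloServiceImpl() throws RemoteException {
        super();
    }

    @Override
    public String sayHello() throws RemoteException {
        return "hello";
    }

}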
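
For comparison, here is a sketch of the other route: a plain class that does not extend UnicastRemoteObject and is exported explicitly through the public static UnicastRemoteObject.exportObject(Remote, int). Greeter, PlainGreeter and exportYieldsProxyStub are names invented for this example. It assumes a JDK that builds stubs as dynamic proxies (the usual case when no pre-generated stub class exists) and that the process may open a listening socket on an anonymous port.

```java
import java.lang.reflect.Proxy;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;

public class ExplicitExportDemo {
    // Hypothetical remote interface for this sketch.
    public interface Greeter extends Remote {
        String greet() throws RemoteException;
    }

    // A plain class: it does NOT extend UnicastRemoteObject.
    public static class PlainGreeter implements Greeter {
        @Override
        public String greet() {
            return "hello";
        }
    }

    // Exports the object on an anonymous port (0), inspects the returned stub,
    // then unexports the object so the JVM can exit normally.
    public static boolean exportYieldsProxyStub() {
        PlainGreeter impl = new PlainGreeter();
        try {
            Remote stub = UnicastRemoteObject.exportObject(impl, 0);
            try {
                return stub instanceof Greeter && Proxy.isProxyClass(stub.getClass());
            } finally {
                UnicastRemoteObject.unexportObject(impl, true);
            }
        } catch (RemoteException e) {
            throw new IllegalStateException("export failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("stub is a dynamic proxy: " + exportYieldsProxyStub());
    }
}
```

The trade-off is mostly about inheritance: explicit export leaves the superclass slot free, at the cost of having to remember the exportObject call.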

The service publisher (server) and the consumer (client)

import java.net.MalformedURLException;
import java.rmi.Naming;
import java.rmi.NotBoundException;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;

// Server.java
public class Server {
    static final String host = "127.0.0.1";
    static final int port = 8080;

    public static void main(String[] args) throws RemoteException, MalformedURLException {
        HelloService helloService = new HelloServiceImpl();
        LocateRegistry.createRegistry(port);
        // If the port is omitted from the URL, it defaults to 1099.
        Naming.rebind("rmi://" + host + ":" + port + "/hello", helloService);
        System.out.println("Service started...");
    }

}

// Client.java
public class Client {

    public static void main(String[] args) throws MalformedURLException, RemoteException, NotBoundException {
        // If the port is omitted from the URL, it defaults to 1099.
        HelloService helloService = (HelloService) Naming.lookup("rmi://" + Server.host + ":" + Server.port + "/hello");
        System.out.println(helloService.sayHello());
    }

}
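
If you want to try the whole flow without starting two processes, the following sketch runs the registry, the exported object and the caller inside one JVM. It is an illustration under assumptions, not the setup used above: Hello, HelloImpl and roundTrip are invented names, the registry is created on an anonymous port (0) and used through the Registry object that createRegistry returns instead of a Naming URL, and java.rmi.server.hostname is forced to 127.0.0.1 so the stub does not depend on host name resolution.

```java
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;

public class RoundTripDemo {
    // Hypothetical remote interface and implementation for this sketch.
    public interface Hello extends Remote {
        String sayHello() throws RemoteException;
    }

    public static class HelloImpl implements Hello {
        @Override
        public String sayHello() {
            return "hello";
        }
    }

    public static String roundTrip() {
        // Assumption: advertise the loopback address in the stub's endpoint.
        System.setProperty("java.rmi.server.hostname", "127.0.0.1");
        HelloImpl impl = new HelloImpl();
        try {
            Registry registry = LocateRegistry.createRegistry(0); // 0 = anonymous port
            try {
                Hello stub = (Hello) UnicastRemoteObject.exportObject(impl, 0);
                try {
                    registry.rebind("hello", stub);
                    Hello found = (Hello) registry.lookup("hello");
                    // The call travels through the stub to the exported object.
                    return found.sayHello();
                } finally {
                    UnicastRemoteObject.unexportObject(impl, true);
                }
            } finally {
                UnicastRemoteObject.unexportObject(registry, true);
            }
        } catch (RemoteException | NotBoundException e) {
            throw new IllegalStateException("RMI round trip failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip());
    }
}
```

Because the registry lives in the same JVM and is called directly, lookup here simply hands back the bound stub; only the sayHello call goes through the RMI transport.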

Tracing the source

Before the service starts, creating the service object invokes the superclass's no-argument constructor.
HelloServiceImpl

protected HelloServiceImpl() throws RemoteException {
    super();
}

The UnicastRemoteObject constructors

protected UnicastRemoteObject() throws RemoteException
{
    this(0);
}

protected UnicastRemoteObject(int port) throws RemoteException
{
    this.port = port;
    exportObject((Remote) this, port);
}

The constructor calls exportObject, which exports the service so it can be reached remotely.

public static Remote exportObject(Remote obj, int port)
    throws RemoteException
{
    return exportObject(obj, new UnicastServerRef(port));
}

private static Remote exportObject(Remote obj, UnicastServerRef sref)
    throws RemoteException
{
    if (obj instanceof UnicastRemoteObject) {
        ((UnicastRemoteObject) obj).ref = sref;
    }
    return sref.exportObject(obj, null, false);
}
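
One way to observe that the constructor really exports the object is to ask for its stub right after new. The sketch below uses the public RemoteObject.toStub and UnicastRemoteObject.unexportObject methods; Ping, PingImpl and the two helper methods are hypothetical names, and the code assumes the process may listen on an anonymous port.

```java
import java.rmi.NoSuchObjectException;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.server.RemoteObject;
import java.rmi.server.UnicastRemoteObject;

public class ConstructorExportDemo {
    // Hypothetical remote interface and implementation for this sketch.
    public interface Ping extends Remote {
        String ping() throws RemoteException;
    }

    public static class PingImpl extends UnicastRemoteObject implements Ping {
        public PingImpl() throws RemoteException {
            super(); // exports this object on an anonymous port
        }

        @Override
        public String ping() {
            return "pong";
        }
    }

    // A stub is available immediately after construction, with no explicit export call.
    public static boolean exportedByConstructor() {
        try {
            PingImpl impl = new PingImpl();
            try {
                Remote stub = RemoteObject.toStub(impl);
                return stub != impl && stub instanceof Ping;
            } finally {
                UnicastRemoteObject.unexportObject(impl, true);
            }
        } catch (RemoteException e) {
            throw new IllegalStateException("export failed", e);
        }
    }

    // Once the object is unexported, toStub no longer finds it.
    public static boolean stubGoneAfterUnexport() {
        try {
            PingImpl impl = new PingImpl();
            UnicastRemoteObject.unexportObject(impl, true);
            try {
                RemoteObject.toStub(impl);
                return false;
            } catch (NoSuchObjectException expected) {
                return true;
            }
        } catch (RemoteException e) {
            throw new IllegalStateException("export failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("exported by constructor: " + exportedByConstructor());
        System.out.println("stub gone after unexport: " + stubGoneAfterUnexport());
    }
}
```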

A UnicastServerRef, the server-side reference object, is constructed to export the service.

public UnicastServerRef(int port) {
    super(new LiveRef(port));
    this.filter = null;
}

UnicastServerRef calls its superclass constructor, which takes the LiveRef.

public UnicastRef(LiveRef liveRef) {
    ref = liveRef;
}

How the sref object exports the service

public Remote exportObject(Remote impl, Object data,
                           boolean permanent)
    throws RemoteException
{
    Class<?> implClass = impl.getClass();
    Remote stub;

    try {
        stub = Util.createProxy(implClass, getClientRef(), forceStubUse);
    } catch (IllegalArgumentException e) {
        throw new ExportException(
            "remote object implements illegal remote interface", e);
    }
    if (stub instanceof RemoteStub) {
        setSkeleton(impl);
    }

    Target target =
        new Target(impl, this, stub, ref.getObjID(), permanent);
    ref.exportObject(target);
    hashToMethod_Map = hashToMethod_Maps.get(implClass);
    return stub;
}

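This creates the proxy object (the stub), wraps the implementation in a Target that can actually be exported, and then exports it through ref, the LiveRef that was built in the previous step and passed to the superclass constructor.
LiveRef details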

public LiveRef(int port) {
    this((new ObjID()), port);
}

public LiveRef(ObjID objID, int port) {
    this(objID, TCPEndpoint.getLocalEndpoint(port), true);
}

public LiveRef(ObjID objID, Endpoint endpoint, boolean isLocal) {
    ep = endpoint;
    id = objID;
    this.isLocal = isLocal;
}

public void exportObject(Target target) throws RemoteException {
    ep.exportObject(target);
}

TCPEndpoint.getLocalEndpoint

public static TCPEndpoint getLocalEndpoint(int port) {
    return getLocalEndpoint(port, null, null);
}

public static TCPEndpoint getLocalEndpoint(int port,
                                           RMIClientSocketFactory csf,
                                           RMIServerSocketFactory ssf)
{
    /*
     * Find mapping for an endpoint key to the list of local unique
     * endpoints for this client/server socket factory pair (perhaps
     * null) for the specific port.
     */
    TCPEndpoint ep = null;

    synchronized (localEndpoints) {
        TCPEndpoint endpointKey = new TCPEndpoint(null, port, csf, ssf);
        LinkedList<TCPEndpoint> epList = localEndpoints.get(endpointKey);
        String localHost = resampleLocalHost();

        if (epList == null) {
            /*
             * Create new endpoint list.
             */
            ep = new TCPEndpoint(localHost, port, csf, ssf);
            epList = new LinkedList<TCPEndpoint>();
            epList.add(ep);
            ep.listenPort = port;
            ep.transport = new TCPTransport(epList);
            localEndpoints.put(endpointKey, epList);

            if (TCPTransport.tcpLog.isLoggable(Log.BRIEF)) {
                TCPTransport.tcpLog.log(Log.BRIEF,
                    "created local endpoint for socket factory " + ssf +
                    " on port " + port);
            }
        } else {
            synchronized (epList) {
                ep = epList.getLast();
                String lastHost = ep.host;
                int lastPort = ep.port;
                TCPTransport lastTransport = ep.transport;
                // assert (localHost == null ^ lastHost != null)
                if (localHost != null && !localHost.equals(lastHost)) {
                    /*
                     * Hostname has been updated; add updated endpoint
                     * to list.
                     */
                    if (lastPort != 0) {
                        /*
                         * Remove outdated endpoints only if the
                         * port has already been set on those endpoints.
                         */
                        epList.clear();
                    }
                    ep = new TCPEndpoint(localHost, lastPort, csf, ssf);
                    ep.listenPort = port;
                    ep.transport = lastTransport;
                    epList.add(ep);
                }
            }
        }
    }

    return ep;
}

The result is a TCPEndpoint object that can export services, and the service is exported by calling exportObject on it.
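
The caching idea in getLocalEndpoint can be reduced to a few lines. The sketch below is a deliberately simplified stand-in, not the JDK implementation: EndpointCacheSketch and its Endpoint class are hypothetical, the key is the port alone (the real key also includes the client and server socket factories), and a host name change replaces the cached entry instead of appending to a list that shares one transport.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class EndpointCacheSketch {
    // Hypothetical stand-in for TCPEndpoint: just a host and a port.
    public static final class Endpoint {
        public final String host;
        public final int port;

        Endpoint(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    // Simplified key: the port only.
    private final Map<Integer, Endpoint> localEndpoints = new HashMap<>();

    // Returns the cached endpoint for the port, creating or refreshing it when
    // there is none yet or the local host name has changed.
    public synchronized Endpoint getLocalEndpoint(int port, String localHost) {
        Endpoint ep = localEndpoints.get(port);
        if (ep == null || !Objects.equals(ep.host, localHost)) {
            ep = new Endpoint(localHost, port);
            localEndpoints.put(port, ep);
        }
        return ep;
    }

    public static void main(String[] args) {
        EndpointCacheSketch cache = new EndpointCacheSketch();
        Endpoint first = cache.getLocalEndpoint(8080, "host-a");
        Endpoint again = cache.getLocalEndpoint(8080, "host-a");
        System.out.println(first == again); // true: same port, same endpoint object
    }
}
```

The point of the cache is that objects exported on the same port end up sharing one endpoint rather than each opening their own.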

Source analysis of service registration and lookup

Registering a service
LocateRegistry.createRegistry(port);
Creating the registry creates a RegistryImpl object.

public static Registry createRegistry(int port) throws RemoteException {
    return new RegistryImpl(port);
}

RegistryImpl represents the registry.
It keeps a Hashtable that is used both to register (bind) services and to find (lookup) them.

private Hashtable<String, Remote> bindings = new Hashtable<>(101);

public RegistryImpl(int port)
    throws RemoteException
{
    if (port == Registry.REGISTRY_PORT && System.getSecurityManager() != null) {
        try {
            AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {
                public Void run() throws RemoteException {
                    LiveRef lref = new LiveRef(id, port);
                    setup(new UnicastServerRef(lref, RegistryImpl::registryFilter));
                    return null;
                }
            }, null, new SocketPermission("localhost:"+port, "listen,accept"));
        } catch (PrivilegedActionException pae) {
            throw (RemoteException)pae.getException();
        }
    } else {
        LiveRef lref = new LiveRef(id, port);
        setup(new UnicastServerRef(lref, RegistryImpl::registryFilter));
    }
}

private void setup(UnicastServerRef uref)
    throws RemoteException
{
    ref = uref;
    uref.exportObject(this, null, true);
}

public Remote lookup(String name)
    throws RemoteException, NotBoundException
{
    synchronized (bindings) {
        Remote obj = bindings.get(name);
        if (obj == null)
            throw new NotBoundException(name);
        return obj;
    }
}

public void bind(String name, Remote obj)
    throws RemoteException, AlreadyBoundException, AccessException
{
    synchronized (bindings) {
        Remote curr = bindings.get(name);
        if (curr != null)
            throw new AlreadyBoundException(name);
        bindings.put(name, obj);
    }
}

public void unbind(String name)
    throws RemoteException, NotBoundException, AccessException
{
    synchronized (bindings) {
        Remote obj = bindings.get(name);
        if (obj == null)
            throw new NotBoundException(name);
        bindings.remove(name);
    }
}

public void rebind(String name, Remote obj)
    throws RemoteException, AccessException
{
    bindings.put(name, obj);
}
.....
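
The difference between bind and rebind is easy to check against a live registry. The sketch below creates a registry on an anonymous port inside the current JVM and calls it directly; RegistrySemanticsDemo and run are names made up for this example, the bound objects are bare Remote instances that are never invoked, and the code assumes the process may open a listening socket.

```java
import java.rmi.AlreadyBoundException;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;

public class RegistrySemanticsDemo {
    // Runs bind, bind again, rebind, unbind and lookup, and records what happened.
    public static String run() {
        StringBuilder log = new StringBuilder();
        try {
            Registry registry = LocateRegistry.createRegistry(0); // 0 = anonymous port
            Remote first = new Remote() { };
            Remote second = new Remote() { };
            try {
                registry.bind("svc", first);
                try {
                    registry.bind("svc", second);
                    log.append("bind-ok ");
                } catch (AlreadyBoundException e) {
                    log.append("bind-rejected "); // the name is already taken
                }
                registry.rebind("svc", second);   // overwrites unconditionally
                log.append(registry.lookup("svc") == second
                        ? "rebind-replaced " : "rebind-ignored ");
                registry.unbind("svc");
                try {
                    registry.lookup("svc");
                    log.append("still-bound");
                } catch (NotBoundException e) {
                    log.append("not-bound");
                }
            } finally {
                UnicastRemoteObject.unexportObject(registry, true);
            }
        } catch (RemoteException | AlreadyBoundException | NotBoundException e) {
            throw new IllegalStateException("registry demo failed", e);
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```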

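In the end, much like the server exporting its service, the RegistryImpl object itself is exported.

When the service provider calls Naming.rebind("rmi://"+host+":"+port+"/hello", helloService); to register the service,
it first uses the host and port to obtain a reference to the exported registry object (RegistryImpl), then calls rebind on it.
The client obtains the service in much the same way with (HelloService) Naming.lookup("rmi://"+Server.host+":"+Server.port+"/hello");
it first uses the host and port to obtain a reference to the exported registry object (RegistryImpl), then calls lookup on it.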

public static void rebind(String name, Remote obj)
    throws RemoteException, java.net.MalformedURLException
{
    ParsedNamingURL parsed = parseURL(name);
    Registry registry = getRegistry(parsed);

    if (obj == null)
        throw new NullPointerException("cannot bind to null");

    registry.rebind(parsed.name, obj);
}

public static Remote lookup(String name)
    throws NotBoundException,
        java.net.MalformedURLException,
        RemoteException
{
    ParsedNamingURL parsed = parseURL(name);
    Registry registry = getRegistry(parsed);

    if (parsed.name == null)
        return registry;
    return registry.lookup(parsed.name);
}
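
The listing does not show parseURL itself, so the following is only an approximation of what it has to extract from a URL such as rmi://127.0.0.1:8080/hello: a host, a port that falls back to Registry.REGISTRY_PORT (1099) when omitted, and a name that is null when the URL has no path. NamingUrlSketch, Parsed and parse are hypothetical names, and java.net.URI is used here purely for convenience.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.rmi.registry.Registry;

public class NamingUrlSketch {
    // Hypothetical counterpart of ParsedNamingURL.
    public static final class Parsed {
        public final String host;
        public final int port;
        public final String name;

        Parsed(String host, int port, String name) {
            this.host = host;
            this.port = port;
            this.name = name;
        }
    }

    public static Parsed parse(String url) {
        URI uri;
        try {
            uri = new URI(url);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("not a valid URL: " + url, e);
        }
        // URI.getPort() returns -1 when the URL carries no port.
        int port = uri.getPort() == -1 ? Registry.REGISTRY_PORT : uri.getPort();
        String path = uri.getPath();
        String name = (path != null && path.startsWith("/")) ? path.substring(1) : path;
        if (name != null && name.isEmpty()) {
            name = null; // no name: Naming.lookup would return the registry itself
        }
        return new Parsed(uri.getHost(), port, name);
    }

    public static void main(String[] args) {
        Parsed p = parse("rmi://127.0.0.1:8080/hello");
        System.out.println(p.host + " " + p.port + " " + p.name); // 127.0.0.1 8080 hello
        System.out.println(parse("rmi://localhost/hello").port);  // 1099
    }
}
```

With those three pieces in hand, a Naming call amounts to LocateRegistry.getRegistry(host, port) followed by rebind or lookup with the name.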

More posts

For more, visit the author's homepage: yangyitao.top