一个RPC逻辑示例代码及优化,涉及socket通信,Serializable序列化,proxy动态搭理,反射原理,io处理

作者: admin 分类: JAVA 发布时间: 2019-06-24 10:27  阅读: 66 views

之前做了一个网关项目,支持dubbo服务泛化调用。在被问及底层是什么协议的时候,假程序猿属性暴露无遗。竟然说出了http, 被同事异样的鄙视了一眼纠正说道’rpc协议’。心里糗糗的,于是决定有空看下RPC协议到底是什么东西。(这就是专注业务代码,没有深入理解代码底层的诟病,带人做事没法给一个正确的理论依据及实现逻辑,常常会有些抵触情绪。然而跟着一个牛人做事,那才叫一个有理有据,让人信服的无法反驳。所以,提升自己的硬实力才能有说服力【不管从生活、还是学习、工作】)

 

RPC流程图如下:

过程翻译为:

1. 建立通信TCP连接 (socket相关技术)socket支持TCP/IP协议的网络通信的基本操作单元
2. 客户端调用服务
  a. 将方法、参数等信息序列化
  b. 找到远程服务地址,发送消息
3. 服务端处理请求
  a. 反序列化消息
  b. 调用本地服务进行处理
  c. 将处理结果序列化后发送给客户端
4. 客户端处理响应
  a. 反序列化信息
  b. 结束

代码如下(8个类):

 

公共通信类:CommonModel.java 

示例接口|实现类:HelloService.java    HelloServiceImpl.java

示例bean类: Person.java

序列化工具类:SerializeTool

代理工厂:ProxyFactory

RPC客户端:RpcClient

RPC服务端:RpcServer

CommonModel.java

package com.chl.rpc.common;

import java.io.Serializable;

/**
 * 公共网络通信类,通过序列化该类,将客户端调用接口、方法、参数、参数类型封装,然后服务端反序列化,再通过反射,调取相应实现类的方法。
 * @author chenhailong
 *
 */
public class CommonModel implements Serializable {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;
	
	/**接口名称*/
	private String className;
	/**方法名称*/
	private String method;
	/**方法参数*/
	private Object[] args;
	/**参数类型*/
	private String[] types;
	
	public String getClassName() {
		return className;
	}
	public void setClassName(String className) {
		this.className = className;
	}
	public String getMethod() {
		return method;
	}
	public void setMethod(String method) {
		this.method = method;
	}
	public Object[] getArgs() {
		return args;
	}
	public void setArgs(Object[] args) {
		this.args = args;
	}
	public String[] getTypes() {
		return types;
	}
	public void setTypes(String[] types) {
		this.types = types;
	}
	

}

HelloService.java | HelloServiceImpl.java

package com.chl.rpc.common;

public interface HelloService {

	String sayHello(String name);
	
	Person getPersion(String name);
}
package com.chl.rpc.common;

public class HelloServiceImpl implements HelloService {

	@Override
	public String sayHello(String name) {
		return "hello"+name;
	}

	@Override
	public Person getPersion(String name) {
		Person p = new Person();
		p.setName(name);
		p.setAge(130);
		return p;
	}

}

Person.java

package com.chl.rpc.common;

import java.io.Serializable;

/**
 * 普通公共bean
 * @author chenhailong
 */
public class Person implements Serializable {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;
	
	private String name;
	private Integer age;
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public Integer getAge() {
		return age;
	}
	public void setAge(Integer age) {
		this.age = age;
	}
	
}

SerializeTool.java

package com.chl.rpc.common;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

/**
 * 序列化工具类
 * @author chenhailong
 */
public class SerializeTool {
	
	/**
	 * 序列化操作
	 * @param object
	 * @return
	 * @throws IOException
	 */
	public static byte[] serialize(Object object) throws IOException{
		ByteArrayOutputStream os = new ByteArrayOutputStream();
		ObjectOutputStream outputStream = new ObjectOutputStream(os);
		outputStream.writeObject(object);
		outputStream.flush();
		byte[] byteArray = os.toByteArray();
		outputStream.close();
		os.close();
		return byteArray;
	}
	
	/**
	 * 反序列化操作
	 * @param buf
	 * @return
	 * @throws IOException
	 * @throws ClassNotFoundException
	 */
	public static Object deSerialize(byte[] buf) throws IOException ,ClassNotFoundException{
		ByteArrayInputStream is = new ByteArrayInputStream(buf);
		ObjectInputStream inputStream = new ObjectInputStream(is);
		Object object = inputStream.readObject();
		inputStream.close();
		is.close();
		return object;
		
	}

}

ProxyFactory.java

package com.chl.rpc.client;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

import com.chl.rpc.common.CommonModel;
import com.chl.rpc.common.SerializeTool;

/**
 * 代理工厂
 * @author chenhailong
 *
 */
public class ProxyFactory {
	
	private static InvocationHandler handler = new InvocationHandler() {
		
		@Override
		public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

			/**这个是通用的传输对象*/
			CommonModel cm = new CommonModel();
			
			Class<?>[] classes = proxy.getClass().getInterfaces();
			String className = classes[0].getName();
			
			cm.setClassName(className);
			cm.setArgs(args);
			cm.setMethod(method.getName());
			
			String[] types = null;
			if(args != null) {
				types = new String[args.length];
				for(int i = 0;i < types.length; i++) {
					types[i] = args[i].getClass().getName();
				}
			}
			
			cm.setTypes(types);
			
			byte[] byteArray = SerializeTool.serialize(cm);
			Object send = RpcClient.send(byteArray);
			
			return send;
		}
	};
	
	@SuppressWarnings("unchecked")
	public static <T> T getInstance(Class<T> Clazz) {
		return (T) Proxy.newProxyInstance(Clazz.getClassLoader(), new Class[]{Clazz}, handler);
	}
}

RpcClient.java

package com.chl.rpc.client;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

import com.chl.rpc.common.HelloService;
import com.chl.rpc.common.SerializeTool;

/**
 * Rpc客户端
 * @author chenhailong
 */
public class RpcClient {
	
	public static Object send(byte[] bs) {
		try {
			Socket socket = new Socket("127.0.0.1",9999);
			OutputStream outputStream = socket.getOutputStream();
			outputStream.write(bs);
			InputStream in = socket.getInputStream();
			byte[] buf = new byte[1024];
			in.read(buf);
			Object formatDate = SerializeTool.deSerialize(buf);
			socket.close();
			return formatDate;
		}catch(Exception e) {
			e.printStackTrace();
		}
		return null;
	}
	
	public static void main(String[] args) {

		HelloService hs = ProxyFactory.getInstance(HelloService.class);
		
		System.out.println("say------:"+  hs.sayHello("pikaqiu") );
		System.out.println("persion--:" + hs.getPersion("miaowazhognzi "));
		
	}

}

RpcServer.java

package com.chl.rpc.server;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import com.chl.rpc.common.CommonModel;
import com.chl.rpc.common.SerializeTool;

public class RpcServer {

	public static void main(String[] args) {

		try {
			openServer();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	
	
	public static void openServer() throws IOException {
		ServerSocket serverSocket = new ServerSocket(9999);
		try {
			System.out.println("服务端开启");
			while(true) {
				Socket socket = serverSocket.accept();
				System.out.println(socket.getInetAddress());
				InputStream in = socket.getInputStream();
				byte[] buf = new byte[1024];
				in.read(buf);
				byte[] formatDate = formatData(buf);
				OutputStream out = socket.getOutputStream();
				out.write(formatDate);
				socket.close();
			}
		}catch(Exception e) {
			e.printStackTrace();
			serverSocket.close();
		}
	}
	
	public static byte[] formatData(byte[] bs) {
		try {
			/**
			 * 将接收的数据反序列化为CommonModel
			 */
			CommonModel cm = (CommonModel)SerializeTool.deSerialize(bs);
			String className = cm.getClassName();
			String[] types = cm.getTypes();
			Object[] args  = cm.getArgs();
			
			/**
			 * 1.通过映射获取实现类(接口和实现类的关系)
			 * 2.通过反射原理调用具体方法并返回
			 */
			Class<?> cls = Class.forName(getMapValue(className));
			Class<?>[] typeCls = null; //参数类型类对象
			if(types != null) {
				typeCls = new Class[types.length];
				for(int i = 0;i< typeCls.length;i++) {
					typeCls[i] = Class.forName(types[i]);
				}
			}
			Method method = cls.getMethod(cm.getMethod(), typeCls);
			Object object = method.invoke(cls.newInstance(), args);
			byte[] byteArray = SerializeTool.serialize(object);
			return byteArray;
		}catch(Exception e) {
			e.printStackTrace();
		}
		return null;
	}

	/**
	 * 简单的map映射
	 * @param key
	 * @return
	 */
	public static String getMapValue(String key) {
		Map<String,String> map = new HashMap<String,String>();
		map.put("com.chl.rpc.common.HelloService", "com.chl.rpc.common.HelloServiceImpl");
		return map.get(key);
	}
	
	/**
	 * 通过配置中心读取
	 * @param key
	 * @return
	 * @throws IOException
	 */
	public static String getPropertyValue(String key) throws IOException{
		Properties pro = new Properties();
		FileInputStream in = new FileInputStream("xxx/xx/x.properties");
		pro.load(in);
		in.close();
		return pro.getProperty(key);
	}
}

 

注:运行时,请先调整RpcServer.java中的包名路径。然后先运行服务端,在运行客户端。

以上是一个RPC的简单示例,参考:https://blog.csdn.net/u013034378/article/details/80686233#commentBox

RPC的原理讲解有篇很好的文章:https://www.cnblogs.com/LBSer/p/4853234.html

 

由于这里是简单的示例,所有有很多的改造优化空间。

1. asm,javassist,cglib等字节码技术可以优于jdk代理机制
  字节码技术动态编译需要类并运行效率高于反射机制,但是维护不易,可以找一些开源项目替代。 
2. netty和 mina可以改善io性能问题 
  本示例中的io处理可以用nio实现的框架替代。
3. Hession、Kryo、fastjson和Protobuf(google)可以改善序列化性能 
  序列化替代。
4. zookeeper可以处理服务的注册与发现功能
   本示例中的服务映射可以通过zk进行替换。

一些成熟的RPC框架 阿里的dubbo、HSF,facebook的thrift等。

附:java代理实现示例

 

 

 


   原创文章,转载请标明本文链接: 一个RPC逻辑示例代码及优化,涉及socket通信,Serializable序列化,proxy动态搭理,反射原理,io处理

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

发表评论

电子邮件地址不会被公开。 必填项已用*标注

更多阅读