一个RPC逻辑示例代码及优化,涉及socket通信,Serializable序列化,proxy动态代理,反射原理,io处理

作者: admin 分类: 网络协议 发布时间: 2019-06-24 10:27  阅读: 624 views

之前做了一个网关项目,支持dubbo服务泛化调用。在被问及底层是什么协议的时候,假程序猿属性暴露无遗。竟然说出了http, 被同事异样的鄙视了一眼纠正说道’rpc协议’。心里糗糗的,于是决定有空看下RPC协议到底是什么东西。(这就是专注业务代码,没有深入理解代码底层的诟病,带人做事没法给一个正确的理论依据及实现逻辑,常常会有些抵触情绪。然而跟着一个牛人做事,那才叫一个有理有据,让人信服的无法反驳。所以,提升自己的硬实力才能有说服力【不管从生活、还是学习、工作】)

 

RPC流程图如下:

过程翻译为:

1. 建立通信TCP连接 (socket相关技术)socket支持TCP/IP协议的网络通信的基本操作单元
2. 客户端调用服务
  a. 将方法、参数等信息序列化
  b. 找到远程服务地址,发送消息
3. 服务端处理请求
  a. 反序列化消息
  b. 调用本地服务进行处理
  c. 将处理结果序列化后发送给客户端
4. 客户端处理响应
  a. 反序列化信息
  b. 结束

代码如下(8个类):

 

公共通信类:CommonModel.java 

示例接口|实现类:HelloService.java    HelloServiceImpl.java

示例bean类: Person.java

序列化工具类:SerializeTool

代理工厂:ProxyFactory

RPC客户端:RpcClient

RPC服务端:RpcServer

CommonModel.java

package com.chl.rpc.common;

import java.io.Serializable;

/**
 * 公共网络通信类,通过序列化该类,将客户端调用接口、方法、参数、参数类型封装,然后服务端反序列化,再通过反射,调取相应实现类的方法。
 * @author chenhailong
 *
 */
public class CommonModel implements Serializable {

    /**
     * 
     */
    private static final long serialVersionUID = 1L;
    
    /**接口名称*/
    private String className;
    /**方法名称*/
    private String method;
    /**方法参数*/
    private Object[] args;
    /**参数类型*/
    private String[] types;
    
    public String getClassName() {
        return className;
    }
    public void setClassName(String className) {
        this.className = className;
    }
    public String getMethod() {
        return method;
    }
    public void setMethod(String method) {
        this.method = method;
    }
    public Object[] getArgs() {
        return args;
    }
    public void setArgs(Object[] args) {
        this.args = args;
    }
    public String[] getTypes() {
        return types;
    }
    public void setTypes(String[] types) {
        this.types = types;
    }
    

}

HelloService.java | HelloServiceImpl.java

package com.chl.rpc.common;

public interface HelloService {

    String sayHello(String name);
    
    Person getPersion(String name);
}
package com.chl.rpc.common;

public class HelloServiceImpl implements HelloService {

    @Override
    public String sayHello(String name) {
        return "hello"+name;
    }

    @Override
    public Person getPersion(String name) {
        Person p = new Person();
        p.setName(name);
        p.setAge(130);
        return p;
    }

}

Person.java

package com.chl.rpc.common;

import java.io.Serializable;

/**
 * 普通公共bean
 * @author chenhailong
 */
public class Person implements Serializable {

    /**
     * 
     */
    private static final long serialVersionUID = 1L;
    
    private String name;
    private Integer age;
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public Integer getAge() {
        return age;
    }
    public void setAge(Integer age) {
        this.age = age;
    }
    
}

SerializeTool.java

package com.chl.rpc.common;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

/**
 * 序列化工具类
 * @author chenhailong
 */
public class SerializeTool {
    
    /**
     * 序列化操作
     * @param object
     * @return
     * @throws IOException
     */
    public static byte[] serialize(Object object) throws IOException{
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        ObjectOutputStream outputStream = new ObjectOutputStream(os);
        outputStream.writeObject(object);
        outputStream.flush();
        byte[] byteArray = os.toByteArray();
        outputStream.close();
        os.close();
        return byteArray;
    }
    
    /**
     * 反序列化操作
     * @param buf
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     */
    public static Object deSerialize(byte[] buf) throws IOException ,ClassNotFoundException{
        ByteArrayInputStream is = new ByteArrayInputStream(buf);
        ObjectInputStream inputStream = new ObjectInputStream(is);
        Object object = inputStream.readObject();
        inputStream.close();
        is.close();
        return object;
        
    }

}

ProxyFactory.java

package com.chl.rpc.client;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

import com.chl.rpc.common.CommonModel;
import com.chl.rpc.common.SerializeTool;

/**
 * 代理工厂
 * @author chenhailong
 *
 */
public class ProxyFactory {
    
    private static InvocationHandler handler = new InvocationHandler() {
        
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

            /**这个是通用的传输对象*/
            CommonModel cm = new CommonModel();
            
            Class[] classes = proxy.getClass().getInterfaces();
            String className = classes[0].getName();
            
            cm.setClassName(className);
            cm.setArgs(args);
            cm.setMethod(method.getName());
            
            String[] types = null;
            if(args != null) {
                types = new String[args.length];
                for(int i = 0;i < types.length; i++) {
                    types[i] = args[i].getClass().getName();
                }
            }
            
            cm.setTypes(types);
            
            byte[] byteArray = SerializeTool.serialize(cm);
            Object send = RpcClient.send(byteArray);
            
            return send;
        }
    };
    
    @SuppressWarnings("unchecked")
    public static  T getInstance(Class Clazz) {
        return (T) Proxy.newProxyInstance(Clazz.getClassLoader(), new Class[]{Clazz}, handler);
    }
}

RpcClient.java

package com.chl.rpc.client;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

import com.chl.rpc.common.HelloService;
import com.chl.rpc.common.SerializeTool;

/**
 * Rpc客户端
 * @author chenhailong
 */
public class RpcClient {
    
    public static Object send(byte[] bs) {
        try {
            Socket socket = new Socket("127.0.0.1",9999);
            OutputStream outputStream = socket.getOutputStream();
            outputStream.write(bs);
            InputStream in = socket.getInputStream();
            byte[] buf = new byte[1024];
            in.read(buf);
            Object formatDate = SerializeTool.deSerialize(buf);
            socket.close();
            return formatDate;
        }catch(Exception e) {
            e.printStackTrace();
        }
        return null;
    }
    
    public static void main(String[] args) {

        HelloService hs = ProxyFactory.getInstance(HelloService.class);
        
        System.out.println("say------:"+  hs.sayHello("pikaqiu") );
        System.out.println("persion--:" + hs.getPersion("miaowazhognzi "));
        
    }

}

RpcServer.java

package com.chl.rpc.server;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import com.chl.rpc.common.CommonModel;
import com.chl.rpc.common.SerializeTool;

public class RpcServer {

    public static void main(String[] args) {

        try {
            openServer();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    
    
    public static void openServer() throws IOException {
        ServerSocket serverSocket = new ServerSocket(9999);
        try {
            System.out.println("服务端开启");
            while(true) {
                Socket socket = serverSocket.accept();
                System.out.println(socket.getInetAddress());
                InputStream in = socket.getInputStream();
                byte[] buf = new byte[1024];
                in.read(buf);
                byte[] formatDate = formatData(buf);
                OutputStream out = socket.getOutputStream();
                out.write(formatDate);
                socket.close();
            }
        }catch(Exception e) {
            e.printStackTrace();
            serverSocket.close();
        }
    }
    
    public static byte[] formatData(byte[] bs) {
        try {
            /**
             * 将接收的数据反序列化为CommonModel
             */
            CommonModel cm = (CommonModel)SerializeTool.deSerialize(bs);
            String className = cm.getClassName();
            String[] types = cm.getTypes();
            Object[] args  = cm.getArgs();
            
            /**
             * 1.通过映射获取实现类(接口和实现类的关系)
             * 2.通过反射原理调用具体方法并返回
             */
            Class cls = Class.forName(getMapValue(className));
            Class[] typeCls = null; //参数类型类对象
            if(types != null) {
                typeCls = new Class[types.length];
                for(int i = 0;i< typeCls.length;i++) {
                    typeCls[i] = Class.forName(types[i]);
                }
            }
            Method method = cls.getMethod(cm.getMethod(), typeCls);
            Object object = method.invoke(cls.newInstance(), args);
            byte[] byteArray = SerializeTool.serialize(object);
            return byteArray;
        }catch(Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 简单的map映射
     * @param key
     * @return
     */
    public static String getMapValue(String key) {
        Map map = new HashMap();
        map.put("com.chl.rpc.common.HelloService", "com.chl.rpc.common.HelloServiceImpl");
        return map.get(key);
    }
    
    /**
     * 通过配置中心读取
     * @param key
     * @return
     * @throws IOException
     */
    public static String getPropertyValue(String key) throws IOException{
        Properties pro = new Properties();
        FileInputStream in = new FileInputStream("xxx/xx/x.properties");
        pro.load(in);
        in.close();
        return pro.getProperty(key);
    }
}

 

注:运行时,请先调整RpcServer.java中的包名路径。然后先运行服务端,在运行客户端。

以上是一个RPC的简单示例,参考:https://blog.csdn.net/u013034378/article/details/80686233#commentBox

RPC的原理讲解有篇很好的文章:https://www.cnblogs.com/LBSer/p/4853234.html

 

由于这里是简单的示例,所有有很多的改造优化空间。

1. asm,javassist,cglib等字节码技术可以优于jdk代理机制
  字节码技术动态编译需要类并运行效率高于反射机制,但是维护不易,可以找一些开源项目替代。 
2. netty和 mina可以改善io性能问题 
  本示例中的io处理可以用nio实现的框架替代。
3. Hession、Kryo、fastjson和Protobuf(google)可以改善序列化性能 
  序列化替代。
4. zookeeper可以处理服务的注册与发现功能
   本示例中的服务映射可以通过zk进行替换。

一些成熟的RPC框架 阿里的dubbo、HSF,facebook的thrift等。

附:java代理实现示例

 

 

 


   原创文章,转载请标明本文链接: 一个RPC逻辑示例代码及优化,涉及socket通信,Serializable序列化,proxy动态代理,反射原理,io处理

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

发表评论

电子邮件地址不会被公开。 必填项已用*标注

更多阅读