博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
netty 的 Google protobuf 开发
阅读量:5077 次
发布时间:2019-06-12

本文共 8462 字,大约阅读时间需要 28 分钟。

根据上一篇博文  netty 集成 protobuf 的方法非常简单.代码如下:

server

package protobuf.server.impl;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.protobuf.ProtobufDecoder;import io.netty.handler.codec.protobuf.ProtobufEncoder;import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;import object.server.impl.SubReqServer;import object.server.impl.SubScriptReqProto;public class SubReqProtobufServer {    public void start(int port) {        NioEventLoopGroup workGroup = new NioEventLoopGroup();        NioEventLoopGroup bossGroup = new NioEventLoopGroup();        ServerBootstrap bootstrap = new ServerBootstrap();        bootstrap.group(bossGroup, workGroup);        bootstrap.channel(NioServerSocketChannel.class);        // 配置 NioServerSocketChannel 的 tcp 参数, BACKLOG 的大小        bootstrap.option(ChannelOption.SO_BACKLOG, 100);        bootstrap.handler(new LoggingHandler(LogLevel.INFO));        bootstrap.childHandler(new ChannelInitializer
() { protected void initChannel(SocketChannel ch) throws Exception { /* * 首先添加 ProtobufVarint32FrameDecoder 处理器 , 它主要用于半包处理 */ ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); /* * 然后添加 ProtobufDecoder 解码器 , * 他的参数com.google.protobuf.MessageLite 实际上就是要告诉 ProtobufDecoder * 需要解码的目标类是什么,否则仅仅从字节数组中是无法判断出需要解码的目标类型信息 */ ProtobufDecoder protobufDecoder = new ProtobufDecoder( SubScriptReqProto.SubScriptReq.getDefaultInstance()); ch.pipeline().addLast(protobufDecoder); ch.pipeline().addLast( new ProtobufVarint32LengthFieldPrepender()); ch.pipeline().addLast(new ProtobufEncoder()); ch.pipeline().addLast(new SubReqProtobufHandler()); } }); // 绑定端口,随后调用它的同步阻塞方法 sync 等等绑定操作成功,完成之后 Netty 会返回一个 ChannelFuture // 它的功能类似于的 Future,主要用于异步操作的通知回调. ChannelFuture channelFuture; try { channelFuture = bootstrap.bind(port).sync(); // 等待服务端监听端口关闭,调用 sync 方法进行阻塞,等待服务端链路关闭之后 main 函数才退出. channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } public static void main(String[] args) { SubReqProtobufServer server = new SubReqProtobufServer(); server.start(9091); }}

serverHandler

package protobuf.server.impl;import object.server.impl.SubScriptReqProto;import object.server.impl.SubScriptRespProto;import io.netty.channel.ChannelHandlerAdapter;import io.netty.channel.ChannelHandlerContext;public class SubReqProtobufHandler extends ChannelHandlerAdapter {    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)            throws Exception {        cause.printStackTrace();        ctx.close();    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg)            throws Exception {        try {                        SubScriptReqProto.SubScriptReq req = (SubScriptReqProto.SubScriptReq) msg;            System.out.println("SubReqProtobufHandler : " + req);            ctx.writeAndFlush(resp(req.getSubReqID()));        } catch (Exception e) {            e.printStackTrace();            throw e;        }    }    private Object resp(int subReqID) {        SubScriptRespProto.SubScriptResp.Builder builder = SubScriptRespProto.SubScriptResp                .newBuilder();        builder.setSubReqID(subReqID);        builder.setDesc("desc");        builder.setRespCode(2);        return builder.build();    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        ctx.flush();    }}

 

client 

package protobuf.client.impl;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.protobuf.ProtobufDecoder;import io.netty.handler.codec.protobuf.ProtobufEncoder;import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;import io.netty.handler.codec.serialization.ObjectEncoder;import object.client.impl.SubReqClient;import object.server.impl.SubScriptReqProto;import object.server.impl.SubScriptRespProto;public class SubReqProtobufClient {    public void connect(String host, int port) {        NioEventLoopGroup workGroup = new NioEventLoopGroup();        Bootstrap bootstrap = new Bootstrap();        bootstrap.group(workGroup);        bootstrap.channel(NioSocketChannel.class);        bootstrap.option(ChannelOption.TCP_NODELAY, true);        bootstrap.handler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel ch) throws Exception { /* * 禁止堆类加载器进行缓存,他在基于 OSGI 的动态模块化编程中经常使用,由于 OSGI 可以进行热部署和热升级,当某个 * bundle * 升级后,它对应的类加载器也将一起升级,因此在动态模块化的编程过程中,很少对类加载器进行缓存,因为他随时可能会发生变化. */ ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); ch.pipeline().addLast( new ProtobufDecoder(SubScriptRespProto.SubScriptResp .getDefaultInstance())); ch.pipeline().addLast( new ProtobufVarint32LengthFieldPrepender()); ch.pipeline().addLast(new ProtobufEncoder()); ch.pipeline().addLast(new SubReqClientHandler()); } }); // 发起异步链接操作 ChannelFuture future; try { future = bootstrap.connect(host, port).sync(); // 等待客户端链路关闭 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { workGroup.shutdownGracefully(); } } public static void main(String[] args) { new SubReqProtobufClient().connect("localhost", 9091); }}

clientHandler

package protobuf.client.impl;import io.netty.channel.ChannelHandlerAdapter;import io.netty.channel.ChannelHandlerContext;import object.server.impl.SubScriptReqProto;public class SubReqClientHandler extends ChannelHandlerAdapter {    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)            throws Exception {        cause.printStackTrace();        ctx.close();    }    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        try {            SubScriptReqProto.SubScriptReq.Builder builder = SubScriptReqProto.SubScriptReq                    .newBuilder();            for (int i = 0; i < 100; i++) {                builder.setSubReqID(999 + i);                builder.setAddress("address" + i);                builder.setProductName("productvalue" + i);                builder.setUserName("userName" + i);                ctx.writeAndFlush(builder.build());            }        } catch (Exception e) {            e.printStackTrace();        }    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg)            throws Exception {        System.out.println("SubReqClientHandler : " + msg);    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        ctx.flush();    }}

 

使用 ProtoBuf 的注意事项:

  ProtobufDecode 只负责解码,它不支持半包读写,因此,在 protoBuf 前一定要一个能处理半包读写的处理器.他有三种方法可以选择:

  1. 使用 Netty 提供的 ProtobufVarint32FrameDecoder;
  2. 继承 Netty 提供的通用半包解码器 LengthFieldBasedFrameDecoder;
  3. 继承 ByteToMessageDecoder 类,自己处理半包消息

   

以上内容出自 : <Netty 权威指南

>

转载于:https://www.cnblogs.com/mjorcen/p/4545185.html

你可能感兴趣的文章
POJ 1042 Gone Fishing
查看>>
虚拟机linux下使用cuteftp
查看>>
.NET短距离领域通信-32feet.NET
查看>>
流的概念(来自MSDN)
查看>>
学习opencv-------函数使用二(图像变换)
查看>>
第二次作业及总结——数据类型和运算符
查看>>
靠自己开创这里所没有的未来~
查看>>
c#获取文件图标
查看>>
微信二维码支付native原生支付开发模式二
查看>>
数学图形(1.24)巴斯加线与蚶线
查看>>
线程操作之创建线程
查看>>
第六章学习小结
查看>>
CSS竖向步骤条
查看>>
Struts中ActionContext和ServletActionContext的比较
查看>>
Win7系统32位和64位的区别
查看>>
POJ 2352 Stars
查看>>
CentOS网络设置 couldn't resolve host 'mirrorlist.centos.org问题解决
查看>>
《Python》网络编程基础
查看>>
消息队列实现分布式事务
查看>>
SAP CO11; CO11N; CO15的区别
查看>>