Netty是一个异步的事件驱动网络框架,使用Netty可以研发高性能的私有协议,将业务逻辑和网络进行解耦,通过Netty我们可以实现一些常用的协议,如HTTP。
基本概念
Channel
Channel是NIO的基础,它代表一个连接,通过这个链接可以进行IO操作,例如读和写。
Future
在Netty的Channel中的每一个IO操作都是非阻塞的。 这就意味着每一个操作都是立刻返回结果的。在Java标准库中有Future接口,但是我们使用Future的时候只能询问这个操作是否执行完成,或者阻塞当前的线程直到结果完成,这不是Netty想要的。
Netty实现了自己的ChannelFuture接口,我们可以传递一个回调到ChannelFuture,当操作完成的时候才会执行回调。
Events 和 Handlers
Netty使用的是事件驱动的应用设计,因此Handler处理的数据流,在管道中是链式的事件。事件和Handler可以被 输入 和 输出的数据流进行关联。
输入(Inbound)事件可以如下:
- Channel激活和灭活
- 读操作事件
- 异常事件
- 用户事件
输出(Outbound)事件比较简单,一般是打开和关闭连接,写入和刷新数据。
Encoder 和 Decoder
因为我们要处理网络协议,需要操作数据的序列化和反序列化。
代码
来个实际的案例:
- 新建项目,添加maven依赖
4.1.6.Final 复制代码 io.netty netty-all ${netty-all.version}
- 创建数据的pojo
public class RequestData { private int intValue; private String stringValue; // getter 和 setter // toString方法}public class ResponseData { private int intValue; // getter 和 setter // toString方法 }复制代码
- 创建Encoder和Decoder
public class RequestDataEncoder extends MessageToByteEncoder{ private final Charset charset = Charset.forName("UTF-8"); @Override protected void encode(ChannelHandlerContext channelHandlerContext, RequestData msg, ByteBuf out) throws Exception { out.writeInt(msg.getIntValue()); out.writeInt(msg.getStringValue().length()); out.writeCharSequence(msg.getStringValue(), charset); }}public class ResponseDataDecoder extends ReplayingDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List
- 创建请求的处理器
public class ProcessingHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { RequestData requestData = (RequestData) msg; ResponseData responseData = new ResponseData(); responseData.setIntValue(requestData.getIntValue() * 2); ChannelFuture future = ctx.writeAndFlush(responseData); future.addListener(ChannelFutureListener.CLOSE); System.out.println(requestData); }}public class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { RequestData msg = new RequestData(); msg.setIntValue(123); msg.setStringValue( "正常工作"); ChannelFuture future = ctx.writeAndFlush(msg); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println((ResponseData)msg); ctx.close(); }}复制代码
- 创建服务端应用
public class NettyServer { private int port; public NettyServer(int port) { this.port = port; } public static void main(String[] args) throws Exception { int port = args.length > 0 ? Integer.parseInt(args[0]) : 9003; new NettyServer(port).run(); } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new RequestDecoder(), new ResponseDataEncoder(), new ProcessingHandler()); } }).option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture f = b.bind(port).sync(); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } }}复制代码
- 创建客户端应用
public class NettyClient { public static void main(String[] args) throws Exception { String host = "127.0.0.1"; int port = 9003; EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(workerGroup) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(), new ClientHandler()); } }); ChannelFuture f = b.connect(host, port).sync(); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } }}复制代码
- 运行服务端和客户端
可见正常工作
最后
这里我们只是对Netty进行简单的介绍,介绍了它一些基本的概念,然后演示了一个例子。后续我们会对Netty进行更深入的研究