编写Netty应用程序
从 netty-example
工程下抄了一份 EchoServer 过来,并删减了部分代码,归纳出来一份编写 Netty 服务端程序的模板,我把它称为 “Netty 编码十步曲”。
1. 声明线程池(必须)
一般来说,我们会声明两个 Group,一个是 bossGroup,用于处理 Accept 事件,一个是 workerGroup,用于处理消息的读写事件。其中,bossGroup 一般声明为一个线程。当然,如果声明一个 Group 也是可以的,只是不建议。
1 2
| EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();
|
这就像是大型餐厅一般有接待生和服务员两种职位一样,接待生一般形象良好,专门负责接待客人,服务员形象稍差,专门负责上菜收碟,你要说不区分两种职位,混用行不行呢,当然也可以,只是不建议,专人干专事。
2. 创建服务端引导类(必须)
引导类,是用来集成所有配置,引导程序加载的,分成两种,一种是客户端引导类 Bootstrap(个人觉得叫 ClientBootstrap 可能更贴切),另一种是服务端引导类 ServerBootstrap,我们这里编写的是服务端程序,创建的当然是服务端引导类。
注意,Bootstrap 和 ServerBootstrap 之间并不是继承关系,他们是平等的,都继承了 AbstractBootstrap 抽象类。
1
| ServerBootstrap serverBootstrap = new ServerBootstrap();
|
Bootstrap/ServerBootstrap 就像是店长,它负责统筹整个 Netty 程序的正常运行。
3. 设置线程池(必须)
把第一步声明的线程池设置到 ServerBootstrap 中,它说明了 Netty 应用程序以什么样的线程模型运行,正如前面所说 bossGroup 负责接受(Accept)连接,workerGroup 负责读写数据。
1
| serverBootstrap.group(bossGroup, workerGroup);
|
4. 设置 ServerSocketChannel 类型(必须)
设置 Netty 程序以什么样的 IO 模型运行,我们这里介绍的是 NIO 编程,选择的当然是 NioServerSocketChannel。
1
| serverBootstrap.channel(NioServerSocketChannel.class);
|
如果您需要使用阻塞型 IO 模型,直接把 Nio 改成 Oio 就可以了,即 OioServerSocketChannel,不过它已经废弃了,所以不建议。
另外,如果您的程序运行在 Linux 系统上,还可以使用一种更高效的方式,即 EpollServerSocketChannel,它使用的是 Linux 系统上的 epoll 模型,比 select 模型更高效,可见 Netty 把性能优化做到了极致。
5. 设置参数(可选)
设置 Netty 中可以使用的任何参数,这些参数都在 ChannelOption 及其子类中,后面我们会详细介绍各个参数的含义,不过,很遗憾地告诉你,大多数情况下并不需要修改 Netty 的默认参数,这就是 Netty 比较牛的地方。
1
| serverBootstrap.option(ChannelOption.SO_BACKLOG, 100);
|
我们这里设置了一个 SO_BACKLOG 系统参数,它表示的是最大等待连接数量。
6. 设置 Handler(可选)
设置 ServerSocketChannel 对应的 Handler,注意只能设置一个,它会在 SocketChannel 建立起来之前执行,等我们看源码的时候会详细介绍它的执行时机。
1
| serverBootstrap.handler(new LoggingHandler(LogLevel.INFO))
|
我们这里简单地设置一个打印日志的 Handler。
7. 编写并设置子 Handler(必须)
Netty 中的 Handler 分成两种,一种叫做 Inbound,一种叫做 Outbound。我们这里简单地写一个 Inbound 类型的 Handler,它接收到数据后立即写回客户端。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.write(msg); }
@Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
|
设置子 Handler 设置的是 SocketChannel 对应的 Handler,注意也是只能设置一个,它用于处理 SocketChannel 的事件。
1 2 3 4 5 6 7 8 9
| serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new EchoServerHandler()); } });
|
虽然只能设置一个,但是 Netty 的提供了一种可以设置多个 Handler 的途径,即使用 ChannelInitializer 方式,当然,第六步的设置 Handler 也可以使用这种方式设置多个 Handler。
这里,我们设置了一个打印的 Handler 和一个自定义的 EchoServerHandler。
8. 绑定端口(必须)
绑定端口,并启动服务端程序,sync () 会阻塞直到启动完成才执行后面的代码。
ChannelFuture f = serverBootstrap.bind(PORT).sync();
9. 等待服务端端口关闭(必须)
等待服务端监听端口关闭,sync () 会阻塞主线程,内部调用的是 Object 的 wait () 方法。
f.channel().closeFuture().sync();
10. 优雅地关闭线程池(建议)
最后,是在 finally 中调用 shutdownGracefully () 方法优雅地关闭线程池,优雅停机。
1 2
| bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully();
|
为什么需要设置 ServerSocketChannel 的类型,而不需要设置 SocketChannel 的类型呢?
那是因为 SocketChannel 是 ServerSocketChannel 在接受连接之后创建出来的,所以,并不需要单独再设置它的类型,比如,NioServerSocketChannel 创建出来的肯定是 NioSocketChannel,而 EpollServerSocketChannel 创建出来的肯定是 EpollSocketChannel。
如何调试
其实学 Netty 的 99% 都是服务端的同学,所以,我们的课程并不会刻意介绍如何编写客户端的 Netty 程序,但是我们怎么调试呢?
这里,我教给大家一个技巧,通过 XSHELL 这个工具调试,这个工具几乎是后端同学必备的一个工具,所以调试起来也是比较容易的。
比如,我上面启动了一个 Netty 服务端,它的端口是 8007,只要打开 Terminal,不要连接任何服务器,输入以下代码即可连接到我们的 Netty 服务端:
然后,输入任何你想输入的内容,它都会照样返回,比如下面这样:
后记
本节,我们学习了 Netty 编码的十步曲,其中,有些步骤是必须的,有些步骤可选的,有些步骤是建议保留的,相信通过本节的学习,你一定可以写出十分健壮且优雅的 Netty 服务端程序了。
别急哦,本节还没有结束,在附录部分有一份简单的群聊系统,您也可以尝试按照本节介绍的十步曲自己尝试写一个简单的示例练练手。
思维导图
附录 —— 使用 Netty 实现简单群聊系统
代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
| public final class NettyChatServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new ChatNettyHandler()); } });
ChannelFuture f = serverBootstrap.bind(PORT).sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
private static class ChatNettyHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("one conn active: " + ctx.channel()); ChatHolder.join((SocketChannel) ctx.channel()); }
@Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception { byte[] bytes = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(bytes); String content = new String(bytes, StandardCharsets.UTF_8); System.out.println(content);
if (content.equals("quit\r\n")) { ctx.channel().close(); } else { ChatHolder.propagate((SocketChannel) ctx.channel(), content); } }
@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("one conn inactive: " + ctx.channel()); ChatHolder.quit((SocketChannel) ctx.channel()); } }
private static class ChatHolder { static final Map<SocketChannel, String> USER_MAP = new ConcurrentHashMap<>();
static void join(SocketChannel socketChannel) { String userId = "用户" + ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE); send(socketChannel, "您的id为:" + userId + "\n\r");
for (SocketChannel channel : USER_MAP.keySet()) { send(channel, userId + " 加入了群聊" + "\n\r"); }
USER_MAP.put(socketChannel, userId); }
static void quit(SocketChannel socketChannel) { String userId = USER_MAP.get(socketChannel); send(socketChannel, "您退出了群聊" + "\n\r"); USER_MAP.remove(socketChannel);
for (SocketChannel channel : USER_MAP.keySet()) { if (channel != socketChannel) { send(channel, userId + " 退出了群聊" + "\n\r"); } } }
public static void propagate(SocketChannel socketChannel, String content) { String userId = USER_MAP.get(socketChannel); for (SocketChannel channel : USER_MAP.keySet()) { if (channel != socketChannel) { send(channel, userId + ": " + content); } } }
static void send(SocketChannel socketChannel, String msg) { try { ByteBufAllocator allocator = ByteBufAllocator.DEFAULT; ByteBuf writeBuffer = allocator.buffer(msg.getBytes().length); writeBuffer.writeCharSequence(msg, Charset.defaultCharset()); socketChannel.writeAndFlush(writeBuffer); } catch (Exception e) { e.printStackTrace(); } } } }
|
可以发现,只需要改动设置子 Handler 里面的那一个地方就可以了,其它地方完全不需要修改,非常便捷。
模拟群聊