四、Netty简单案例实践及服务端启动流程

发布时间:2021-10-18 12:59:30

首先来看一个简单的Netty服务器和客户端通讯的例子


服务器代码如下:


public class SimpleNettyServer {
public static void main(String[] args) {
new SimpleNettyServer(8878).runServer();
}

private final int serverPort;
ServerBootstrap serverBootstrap = new ServerBootstrap();

public SimpleNettyServer(int serverPort) {
this.serverPort = serverPort;
}

public void runServer() {
//创建反应器线程组
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();

try {
//1.设置反应器线程组
serverBootstrap.group(bossGroup, workerGroup);
//2.设置NIO类型的通道
serverBootstrap.channel(NioServerSocketChannel.class);
//3.设置通道的参数
serverBootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
serverBootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
//4.装配子通道流水线
serverBootstrap.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) {
//流水线管理子通道中的Handler处理器
//向子通道流水线中添加一个Handler处理器
ch.pipeline().addLast(new SimpleNettyServerHandler());
}
});
//5.开始绑定通道
//通过调用sync同步方法阻塞直到绑定成功
ChannelFuture channelFuture = serverBootstrap.bind(serverPort).sync();
channelFuture.addListener((ChannelFutureListener) future -> {
if (channelFuture.isSuccess()) {
System.out.println("监听端口" + serverPort + " 成功");
} else {
System.out.println("监听端口" + serverPort + "失败");
}
});
//对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

服务端Handler代码如下:


public class SimpleNettyServerHandler extends ChannelInboundHandlerAdapter {

//收到数据读取
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf= (ByteBuf) msg;
System.out.println("收到消息"+buf.toString(CharsetUtil.UTF_8));
}

//数据读取完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
//writeAndFlush 是 write + flush
//将数据写入到缓存,并刷新
ctx.writeAndFlush(Unpooled.copiedBuffer("客户端消息已收到并处理", CharsetUtil.UTF_8));
}
}

客户端代码:


public class SimpleNettyClient {

public static void main(String[] args) {
new SimpleNettyClient("127.0.0.1",8878).startClient();

}
EventLoopGroup group = new NioEventLoopGroup();
private final int serverPort;
private final String serverAddress;

public SimpleNettyClient(String serverAddress, int serverPort ) {
this.serverAddress=serverAddress;
this.serverPort = serverPort;
}

public void startClient() {
try {
//1.创建客户端启动对象
Bootstrap bootstrap = new Bootstrap();
//2.设置相关参数
bootstrap.group(group) //设置线程组
.channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new SimpleNettyClientHandler()); //加入自己的处理器
}
});
//3.启动客户端去连接服务器端
ChannelFuture channelFuture = bootstrap.connect(serverAddress, serverPort).sync();
channelFuture.addListener((ChannelFutureListener)future->{
if (channelFuture.isSuccess()) {
System.out.println("连接服务器成功");
System.out.println("客户端 ok..");
} else {
System.out.println("连接服务器失败");
}
});
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();

}
}
}

客户端Handler代码:


public class SimpleNettyClientHandler extends ChannelInboundHandlerAdapter {

//当通道就绪就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) {
String msg="hello, server。This is client。";
System.out.println("发往服务端的消息为"+msg);
ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
}

//当通道有读取事件时,会触发
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址: "+ ctx.channel().remoteAddress());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

下面我们结合上面的代码来简单分析下Netty的启动流程:


一、NioEventLoopGroup

? ? ? ? ? 在服务端启动时,我们创建了两个NioEventLoopGroup对象,这两个对象可以看做是传统IO编程模型的两大线程组,bossGroup


表示监听端口,accept 新连接的线程组,workerGroup表示处理每一条连接的数据读写的线程组。


先看一下NioEventLoopGroup初始化的过程:


根据debug进入初始化流程可以看到最终的调用的是其父类MultithreadEventExecutorGroup的初始化方法


protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args)

参数解析:


在父类的构造函数中我们可以看到这些参数是怎么初始化的。


1.?nThreads是指定的,如果没有填的话默认是cup处理器核心数*2(静态代码块中初始化)。


2.?executor此处为null,在方法体中将实例化。


3.?chooserFactory指定为DefaultEventExecutorChooserFactory.INSTANCE,在静态属性中可以看到为? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new DefaultEventExecutorChooserFactory(),主要是用于选择下一个可用的EventExecutor即NioEventLoop。


4.?args为传递给创建NioEventLoop的newChild方法的调用的参数


* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call

? 4.1? ?SelectorProvider.provider();-->SelectorProvider.open()返回一个Selector。


? 4.2? ?DefaultSelectStrategyFactory.INSTANCE 默认选择策略工厂


? 4.3? ?RejectedExecutionHandlers.reject();拒绝策略处理器


?


构造函数解析:


1.初始化executor,创建了一个线程创建器。


if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}

在此方法中有以下代码:


@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}

当有任务传进来的时候,都将创建一个线程实体去处理这个任务。并将传进来的可执行任务包装为FastThreadLocalThread


protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}

2. 初始化children -->创建一个?EventExecutor[nThreads]数组,用来保存NioEventLoop,接下来是循环初始化NioEventLoop,指定的参数为上方的参数args...。


3.线程选择器,主要是为新连接绑定对应的NioEventLoop,选择NioEventLoop的方法是chooser的next(),就是?Netty 需要一个?NioEventLoop 时, 会调用EventExecutorChooser的next() 方法获取一个可用的 EventLoop。


chooser = chooserFactory.newChooser(children);

根据方法isPowerOfTwo来判断对应的是哪个选择器。


此处executors.length为12,根据? ?isPowerOfTwo(executors.length)(是否为2的幂),此处返回的为GenericEventExecutorChooser选择器。


public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTowEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}

下面是DefaultEventExecutorChooserFactory内部的两种事件选择器(主要是做循环选择,二进制的与运算明显效率要高)。代码如下:


private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;

PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}

@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}

private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;

GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}

@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}

完成实例化的BossGroup Debug截图如下:



? ? ? ?可以看到内部存储了一个EventExecutor数组,其中的每个NioEventLoop相当于一个子反应器。Netty的NioEventLoopGroup就相当于一个多线程版的反应器。在之前的Reactor反应器模式中,一般使用两个反应器,一个负责新连接的监听和接受,一个负责IO事件处理。对应到Netty服务器程序中则是设置两个NioEventLoopGroup线程组,一个EventLoopGroup负责监听和接受,一个负责IO的事件处理。


?


二、NIOEventLoop

先看一下Debug看到的其中一个NioEventLoop:



?


NIOEventLoop类图:



NioEventLoop是在Group创建时通过children[i] = newChild(executor, args);方法创建的。


@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

这三个参数在上面已有说明,看一下创建过程:


1.??通过SelectorProvider去创建Selector。SelectorProvider是Java NIO中的抽象类,它的作用是调用Windows或者Linux底层NIO的实现,为JavaNIO提供服务,比如经常用的Selector.open()方法内部就是通过调用SelectorProvider.openSelector()来得到多路复用器selector并包装。一个Selector和一个NioEventLoop绑定。


provider = selectorProvider;
selector = openSelector();

2.??创建tailTasks = newTaskQueue(maxPendingTasks);此处maxPendingTasks(最大待处理任务)值为Integer.MAX_VALUE。


protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));

返回的是一个PlatformDependent.newMpscQueue(maxPendingTasks);


MpscQueue是针对Netty中NIO任务设计的一种队列,允许有多个生产者(外部线程),只有一个消费者(NioEventLoop)的队列。


3.??创建taskQueue = newTaskQueue(this.maxPendingTasks)


? ? ? ? ? ? ?--->PlatformDependent.newMpscQueue(maxPendingTasks)。


4.? ?保存线程执行器。在MultithreadEventExecutorGroup构造函数中执行new ThreadPerTaskExecutor(newDefaultThreadFactory())并传入了newChild方法中,最终也传入该构造函数。


this.executor = ObjectUtil.checkNotNull(executor, "executor");

并且在SingleThreadEventExecutor类中有一个属性private volatile Thread thread,它用来引用支撑该EventExecutor的线程,用来处理I/O事件和执行任务,叫支撑线程或者I/O线程均可,thread所引用的线程即来自executor。


5.指定rejectedExecutionHandler拒绝任务处理策略。


重要属性:


1.?Thread线程类的成员:NioEventLoop 继承于 SingleThreadEventLoop,?而 SingleThreadEventLoop 又继承于 SingleThreadEventExecutor。SingleThreadEventExecutor 是 Netty 中对本地线程的抽象,?它内部有一个 Thread thread 属性, 存储了一个本地 Java 线程。因此我们可以认为,?一个 NioEventLoop 其实和一个特定的线程绑定,并且在其生命周期内, 绑定的线程都不会再改变。


2.NIO选择器:通过前面提供的 provider.openSelector()返回一个Selector


3.TaskQueue任务队列。


?


三、服务端启动流程
服务端的Socket在哪里初始化?在哪里Accept?连接?

?


1.创建Channel


从入口方法blind(port)进入:


-------->bind(new InetSocketAddress(inetPort));


? ? ?--------->doBind(localAddress);


? ? ? ? ?-------------->initAndRegister();初始化和注册Channel,下面看一下源码:


channel = channelFactory.newChannel();
init(channel);

? ?channelFactory.newChannel();方法返回的为?clazz.newInstance();


此处的clazz为serverBootstrap.channel(NioServerSocketChannel.class)绑定的NioServerSocketChannel.class类。


看一下构造方法,


public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

1.1??newSocket方法参数为静态属性SelectorProvider.provider()(Java?NIO包下的),该方法返回的为一个ServerSocketChannel(NIO? ? ? ? ? ?包下)。


1.2??config = new NioServerSocketChannelConfig(this, javaChannel().socket());Tcp参数配置类


1.3??父类AbstractNioChannel构造函数配置非阻塞:ch.configureBlocking(false);


1.4??父类AbstractChannel构造函数:


? ? 创建Channel的唯一标识:newId();? ? newUnsafe():Tcp底层的读写操作类;? ? 新建ChannelPipeLine

protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}

2.初始化服务端Channel


由init()入口方法进入,主要作用就是保存用户自定义的属性,通过这个属性创建一个连接器。


2.1??channel.config().setOptions(options)和channel.attr(key).set(e.getValue())


? ? ? 此处设置的options和attrs都是在设置服务端通道的参数时候设置的。


2.2??设置currentChildOptions和currentChildAttrs属性值,这两个属性值也是在服务端装配子通道流水线时设置的参数。每次在处理新连接? ? ? ? ?时都会将该属性配置上去。


synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}

2.3? 配置服务端的Handler


p.addLast(new ChannelInitializer() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

2.4? 添加连接器,服务端的pipeLine都会默认有一个ServerBootstrapAcceptor,主要目的就是为新连接分配NIO的一个线程。


ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});



由initAndRegister()中的register(channel)进入:?


ChannelFuture regFuture = config().group().register(channel);

3.1? 先调用了MultithreadEventLoopGroup的register方法,利用chooser.next()方法返回EventLoop


3.2? 调用SingleThreadEventLoop的register方法,传入一个DefaultChannelPromise,绑定了EventLoop和channel


3.3? 实际调用的为AbstractChannel的register方法:


? ? ? 1) AbstractChannel.this.eventLoop = eventLoop绑定线程


? ? ? 2) register0()实际注册:


? ? ? ? ? ? 1.? doRegister()NIO底层的事件,并绑定attachment为自身


selectionKey = javaChannel().register(eventLoop().selector, 0, this);

? ? ? ? 2.主要做一些事件的回调,服务端pipeline,触发事件的回调。


// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();

? ? ?3)pipeline.fireChannelRegistered();把channel注册成功的事件传播到用户的代码里去。


?


4.服务端口的绑定


入口方法为AbstractBootstrap的doBind(final SocketAddress localAddress)方法:


4.1?调用?AbstractBootstrap的doBind0(regFuture, channel, localAddress, promise)方法


最终调用的为NioServerSocketChannel的doBind方法,即Java底层的blind方法,此处的javaChannel就是之前创建的Channel。


调用堆栈


doBind:126, NioServerSocketChannel (io.netty.channel.socket.nio)
bind:554, AbstractChannel$AbstractUnsafe (io.netty.channel)
bind:1258, DefaultChannelPipeline$HeadContext (io.netty.channel)
invokeBind:512, AbstractChannelHandlerContext (io.netty.channel)
bind:497, AbstractChannelHandlerContext (io.netty.channel)
bind:980, DefaultChannelPipeline (io.netty.channel)
bind:250, AbstractChannel (io.netty.channel)

源码:


@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}

4.2端口绑定成功之后,会调用pipeline.fireChannelActive()方法。


if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}

调用堆栈为:


doBeginRead:412, AbstractNioChannel (io.netty.channel.nio)
doBeginRead:55, AbstractNioMessageChannel (io.netty.channel.nio)
beginRead:769, AbstractChannel$AbstractUnsafe (io.netty.channel)
read:1286, DefaultChannelPipeline$HeadContext (io.netty.channel)
invokeRead:704, AbstractChannelHandlerContext (io.netty.channel)
read:684, AbstractChannelHandlerContext (io.netty.channel)
read:1011, DefaultChannelPipeline (io.netty.channel)
read:280, AbstractChannel (io.netty.channel)
readIfIsAutoRead:1346, DefaultChannelPipeline$HeadContext (io.netty.channel)
channelActive:1324, DefaultChannelPipeline$HeadContext (io.netty.channel)
invokeChannelActive:224, AbstractChannelHandlerContext (io.netty.channel)
invokeChannelActive:210, AbstractChannelHandlerContext (io.netty.channel)
fireChannelActive:902, DefaultChannelPipeline (io.netty.channel)
run:565, AbstractChannel$AbstractUnsafe$2 (io.netty.channel)

可以看到最终调用的方法为AbstractNioChannel的doBeginRead方法:


@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}

readPending = true;

final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}

? ? 可以看到为通道注册了readInterestOp事件。在NIOServerSocketChannel初始化时可以看到readInterestOp就是SelectionKey.OP_ACCEPT事件。


public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}

至此,服务端程序启动完成。


?


?

相关文档

  • 地震防范措施
  • 微信登录失败代码10003
  • 白骨精简介
  • 江苏南京视觉艺术学院2018年6月英语六级报名时间及报名条件_南京视觉艺术学院 倪妮
  • 同业银行跳槽面试自我介绍
  • 二年级春节手抄报
  • 网站关键词选取技巧:正确简练并具代表性
  • 形容雨大的句子汇总
  • 孩子喜欢听的童话故事
  • 广东政法干警图形推理习题及答案
  • 汽车配件买卖通用合同
  • 加拿大留学签证的三大影响因素
  • 成语鬼哭神愁的意思
  • DRF上传图片,返回 url 自动拼接当前的路径,变为绝对路径,如何改为相对路径
  • 高中数学老师个人工作总结(精选4篇)
  • 记忆力严重减退要咋办
  • 中国改革开放是以什么为指导思想
  • 2013国庆节如何顺利乘飞机出行呢?
  • Java静态代码块和静态方法
  • 龙行虎步的反义词
  • 2017年双拥工作总结 2008年双拥工作总结
  • 棋牌室转让合同范本
  • 我是如何从月薪4500努力到年薪30W-中篇
  • Linux traceroute带源地址路由跟踪
  • 婴儿肥怎样快速瘦脸6种方法瘦脸告别婴儿肥
  • 怎么判断笔记本硬盘坏了
  • 怎么给电脑换壁纸
  • 如何将本地代码使用Git上传至Github
  • 创维电视怎么外接音响
  • 形容潇洒的诗句56句
  • 猜你喜欢

  • 关于描写蜜蜂的初中作文推荐
  • 【学*实践】再塑生命的人导学案
  • 室内装饰设计图集_室内装饰设计色彩资料图集
  • 七夕相亲活动方案4
  • 规范标准版资料目录
  • 深圳市办公出租合同
  • 会计大学生新学期个人计划
  • 【5套打包】南京市小学一年级语文下期末考试测试题(含答案)
  • 街道2019年工作誓师大会发言稿
  • 韶关市二代社保卡数据采集、发行服务和社会保障卡管理系统
  • 微裂纹、微孔镀铬添加剂使用方法
  • 小小螺丝钉 小小螺丝钉毁掉大飞机
  • 新生儿缺氧缺血性脑病120例血清电解质水*观察
  • 黑龙江省为农村劳动力转移“开绿灯”
  • 2015国家公务员考试申论热点:规范退休官员兼职任职
  • 日常个人礼仪知识
  • 《春天来了》走进大自然 优秀PPT课件3
  • 2019届高考英语一轮复*第一部分教材课文要点Module5ALessoninaLab语篇解题微技巧
  • 三年级写人作文_守护甜心之冰雪姐妹(一)_550字
  • 最新XX年新护士工作总结
  • 黄山市新先不锈钢材料制品有限公司(企业信用报告)- 天眼查
  • 18春北语《证券投资与管理》作业4
  • 东莞市高?永丰毛织厂企业信用报告-天眼查
  • 华 南 师 范 大 学 - 中山大学公共卫生学院网站
  • 读杨红樱的《亲爱的笨笨猪》有感500字
  • word文档常见问题及解决方法
  • 关于销售人员辞职报告模板
  • 四川省医药物资总公司惠仁堂大药房企业信用报告-天眼查
  • 推动工会组织作用全面促进人才培养
  • 2016北京市大兴区高三(一模)化 学
  • make ARCH=arm64 CROSS_COMPILE=aarch64-linux-android- savedefconfig
  • 摆脱健忘的有效方法
  • 八年级生物保护生物的多样性
  • 员工请假公出单2
  • (2019-2020)【知识点】七年级语文下学期期末试题 新人教版(1)【必备资料】
  • 关节镜下微创经皮钢板固定治疗胫骨平台Ⅰ~Ⅲ型骨折患者的疗效
  • 林口县金鼎矿业开发有限公司企业信用报告-天眼查
  • 2018年离职原因调查报告-优秀word范文 (2页)
  • 七一建党节手抄报简单小学
  • 平头琴.木吉它的变迁
  • 2017-2018学年山东省泰安市宁阳一中高二上学期期中考试英语试题 Word版无答案
  • 四川长城房地产开发有限公司(企业信用报告)- 天眼查
  • 电脑版