Netty组件

2023/6/5 21:26:10

Netty组件

EventLoop

事件循环对象

EventLoop本质是一个单线程执行器(同时维护了一个Selector,里面有run方法处理Channel上源源不断的io事件

它的继承关系比较复杂

  • 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
  • 另一条线是继承自 netty 自己的 OrderedEventExecutor,
    • 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
    • 提供了 parent 方法来看看自己属于哪个 EventLoopGroup

事件循环组

EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)

  • 继承自 netty 自己的 EventExecutorGroup
    • 实现了 Iterable 接口提供遍历 EventLoop 的能力
    • 另有 next 方法获取集合中下一个 EventLoop

示例

 		// 内部创建了两个 EventLoop, 每个 EventLoop 维护一个线程
        EventLoopGroup eventLoopGroup=new NioEventLoopGroup(2);
        EventLoop event = eventLoopGroup.next();//获取EventLoop对象,当遍历到尾节点后又从头开始
        System.out.println(event);
        EventLoop event2 = eventLoopGroup.next();
        System.out.println(event2);
        EventLoop event3 = eventLoopGroup.next();
        System.out.println(event3);

输出

io.netty.channel.nio.NioEventLoop@282ba1e
io.netty.channel.nio.NioEventLoop@13b6d03
io.netty.channel.nio.NioEventLoop@282ba1e

处理普通与定时任务

普通任务

 EventLoop event = eventLoopGroup.next();//获取EventLoop对象,当遍历到尾节点后又从头开始
        //执行普通任务
        event.submit(()->{
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("run task ...");
        });
        event.shutdownGracefully();

定时任务

        event.scheduleAtFixedRate(()->{
            System.out.println("run schedule task ...");
        },0,1,TimeUnit.SECONDS);

        System.out.println("main ...");

💡关闭 EventLoopGroup

优雅关闭 shutdownGracefully 方法。该方法会首先切换 EventLoopGroup 到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的

处理IO任务

服务器端代码

public class Server {
    public static void main(String[] args) {
        new ServerBootstrap().
                group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                System.out.println(buf.toString(StandardCharsets.UTF_8) + "=>"+Thread.currentThread().getName());
                            }
                        });
                    }
                })
                .bind(8888);
    }
}

客户端代码

public class Client {
    public static void main(String[] args) throws InterruptedException, IOException {
        Channel channel = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                }).connect(new InetSocketAddress("localhost", 8888))
                .sync()
                .channel();
        channel.writeAndFlush("haha");
    }
}

Channel会与EventLoop进行绑定
在这里插入图片描述

分工与增加自定义EventLoopGroup

Bootstrap的group()方法可以传入两个EventLoopGroup参数,参数1为BossEventGroup,参数2为WorkerEventGroup,单个参数的方法会为Boss与Worker创建相同的EventGroup

  public ServerBootstrap group(EventLoopGroup group) {  //单参方法
        return this.group(group, group);
    }

  public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { //双参方法
        super.group(parentGroup);
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        } else {
            this.childGroup = (EventLoopGroup)ObjectUtil.checkNotNull(childGroup, "childGroup");
            return this;
        }
    }

当有的任务需要较长的时间处理时,可以使用非NioEventLoopGroup,避免同一个NioEventLoop中的其他Channel在较长的时间内都无法得到处理

服务器端代码

public class Server {
    public static void main(String[] args) {
        EventLoopGroup group=new DefaultEventLoopGroup();//处理耗时任务,防止阻塞worker thread
        new ServerBootstrap()
                //arg1: BossEventLoopGroup                 arg2:WorkerEventGroup
                .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
                                ctx.fireChannelRead(msg); //传递msg给下一个handler
                            }
                        }).addLast(group,"handlerA",new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
                            }
                        });
                    }
                })
                .bind(8888);
    }
}

客户端代码

public class Client {
    public static void main(String[] args) throws InterruptedException, IOException {
        Channel channel = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                }).connect(new InetSocketAddress("localhost", 8888))
                .sync()
                .channel();
        channel.writeAndFlush("hello");
    }
}

输出

defaultEventLoopGroup-2-3 1
nioEventLoopGroup-4-1 1
defaultEventLoopGroup-2-3 1
nioEventLoopGroup-4-2 2
defaultEventLoopGroup-2-4 2
nioEventLoopGroup-4-2 2
defaultEventLoopGroup-2-4 2
nioEventLoopGroup-4-1 3
defaultEventLoopGroup-2-5 3

可以看到,nio 工人和 非 nio 工人也分别绑定了 channel
在这里插入图片描述

💡 Handler执行中如何换人?

源码:io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    // 获得下一个EventLoop, excutor即为 EventLoopGroup
    EventExecutor executor = next.executor();
    
    // 如果下一个EventLoop 在当前的 EventLoopGroup中
    if (executor.inEventLoop()) {
        // 使用当前 EventLoopGroup 中的 EventLoop 来处理任务
        next.invokeChannelRead(m);
    } else {
        // 否则让另一个 EventLoopGroup 中的 EventLoop 来创建任务并执行
        executor.execute(new Runnable() {
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}
  • 如果两个 handler 绑定的是同一个线程,那么就直接调用
  • 否则,把要调用的代码封装为一个任务对象,由下一个 handler 的线程来调用

Channel

channel的主要作用

  • close():可以关闭channel
  • closeFuture():获取一个用来可以用来处理channel关闭的Future对象
    • sync方法的作用是同步等待channel关闭
    • addListener方法是用于异步等待channel关闭
  • pipline()方法添加处理器
  • wirte()方法将数据写入
  • writeAndFlush()方法将数据写入并刷出

ChannelFuture

public class Client {
    public static void main(String[] args) throws InterruptedException, IOException {
        ChannelFuture future = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                }).connect(new InetSocketAddress("localhost", 8888));
                //.sync()  //不使用sync方法则返回未连接成功的channel对象[id: 0x4218fad1]
                future.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        Channel channel = channelFuture.channel();
                        System.out.println(channel); //返回已连接的channel [id: 0x41bbc250, L:/127.0.0.1:57536 - R:localhost/127.0.0.1:8888]
                        channel.writeAndFlush("async ...");
                    }
                });
    }
}
  • 调用connect方法后可以返回一个 ChannelFuture 对象,它的作用是利用 channel() 方法来获取 Channel 对象
  • connect 方法是异步的,意味着不等连接建立,方法执行就返回了。因此 channelFuture 对象中不能【立刻】获得到正确的 Channel 对象,需要调用sync方法进行同步阻塞或着调用addListener方法添加异步回调

CloseFuture

@Slf4j
public class ClientClose {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        ChannelFuture channelFuture = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8888));
        Channel channel = channelFuture.sync()
                .channel();
        log.info("connect success! " + channel);
        new Thread(() -> {
            Scanner scanner = new Scanner(System.in);
            while (true) {
                String msg = scanner.nextLine();
                if ("q".equals(msg)) {
                    //close channel
                    channel.close(); // async
                    break;
                }
                log.info("send msg:" + msg);
                channel.writeAndFlush(msg);
            }
        }, "myThread").start();
        ChannelFuture closedFuture = channel.closeFuture();
//        closedFuture.sync();
        closedFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                log.info("channel closed!....");
                group.shutdownGracefully();
            }
        });
    }
}
  • 因为channel.close方法为异步方法,所以对于channel关闭后的操作需要针对性处理
  • 调用channel.closeFuture()方法会返回一个ChannelFuture对象
    • 可以通过sync进行阻塞,直到future执行完成
    • 可以通过addListener添加异步回调

http://www.jnnr.cn/a/369274.html

相关文章

R 语言基础

R 语言基础 一门新的语言学习一般是从输出 “Hello, World!” 程序开始&#xff0c;R 语言的 “Hello, World!” 程序代码如下&#xff1a; ## 实例&#xff08;helloworld.R&#xff09;myString <- "Hello, World!"print ( myString )以上实例将字符串 “Hell…

SQL注入进阶练习(二)常见绕过手段、防御的解决方案

常见绕过手段、防御的解决方案1.常用SQL注入绕过手段1.1 注释符绕过1.2 大小写绕过1.3 内联注释绕过1.4 双写关键字绕过1.5 特殊编码绕过1.6 空格过滤绕过1.7 过滤 or and xor (异或) not 绕过1.8 过滤等号绕过1.9 过滤大小于号绕过1.10 过滤引号绕过1.11 过滤逗号绕过1.12 过滤…

★LDO相关

1.型号 TPS79501 TPS79301 2.PSRR值&#xff0c;频率 TPS795_50dB&#xff0c;10kHz TPS793_70dB&#xff0c;10kHz 电源抑制比&#xff1a;供电电压纹波对输出电压影响&#xff0c;值越高越好&#xff08;某个频段的AC从输入到输出的衰减程度&#xff0c;衰减越高&#x…

提升集群吞吐量与稳定性的秘诀: Dubbo 自适应负载均衡与限流策略实现解析

作者&#xff1a;刘泉禄 整体介绍 本文所说的“柔性服务”主要是指 consumer 端的负载均衡和 provider 端的限流两个功能。在之前的 Dubbo 版本中&#xff0c;负载均衡部分更多的考虑的是公平性原则&#xff0c;即 consumer 端尽可能平等的从 provider 中作出选择&#xff0c;…

笔记本电脑自带录屏在哪?一步教您找到

案例&#xff1a;怎么找到笔记本电脑上的自带录屏功能&#xff1f; “从网上了解到笔记本电脑有自带的录屏功能&#xff0c;但我不知道笔记本自带的录屏叫什么名字&#xff0c;也不知道笔记本自带录屏在哪。有没有小伙伴知道&#xff1f;” 随着科技的不断进步&#xff0c;越…

【面试题】简单的说说对原型链的了解

大厂面试题分享 面试题库 前后端面试题库 &#xff08;面试必备&#xff09; 推荐&#xff1a;★★★★★ 地址&#xff1a;前端面试题库 web前端面试题库 VS java后端面试题库大全 前言 作为Javascript的基础之一&#xff0c;原型一直贯穿我们的JS代码并且成为面试的常考…

基于Python长时间序列遥感数据处理及在全球变化、物候提取、植被变绿与固碳分析、生物量估算与趋势分析等领域中的应用

植被是陆地生态系统中最重要的组分之一&#xff0c;也是对气候变化最敏感的组分&#xff0c;其在全球变化过程中起着重要作用&#xff0c;能够指示自然环境中的大气、水、土壤等成分的变化&#xff0c;其年际和季节性变化可以作为地球气候变化的重要指标。此外&#xff0c;由于…

eclipse上的Java静态分析工具

相比动态测试而言。静态分析效率高&#xff0c;成本较低&#xff0c;对于提高产品质量非常重要。 下面介绍几个elcipse上的静态分析插件 1. findugs a) 安装findbugs插件 1&#xff09;点击菜单 Help ->Eclipse Marketplace 在弹出窗口中的搜索条件中输入 ”findbugs“后…

[素数筛][容斥原理]:埃拉托斯特尼筛法

求解问题&#xff1a;不超过一个给定正整数N的素数的个数 方法介绍&#xff1a; 根据合数的性质&#xff1a;一个合数可以被一个不超过它的平方根的素数整除 这里举例N100&#xff1a; 介绍&#xff1a;为了找出不超过100的素数个数&#xff0c;首先根据合数的性质可以知道…

Docker详解,windows上安装与使用

Hi I’m Shendi Docker详解&#xff0c;windows上安装与使用 Docker详解 Docker 容器是一个开源的应用容器引擎&#xff0c;让开发者可以以统一的方式打包他们的应用以及依赖包到一个可移植的容器中&#xff0c;然后发布到任何安装了docker引擎的服务器上&#xff08;包括流行的…

面试时被问:为什么裁员只裁你,不裁别人,该怎么回答?

面试官总有各种奇奇怪怪的问题&#xff0c;比如这个&#xff1a;为什么裁员裁了你&#xff0c;而不是裁别人&#xff1f;这个充满恶意的问题该怎么回答&#xff1f;网友给出了各种各样的答案&#xff0c;有人说&#xff0c;就说行业动荡&#xff0c;不稳定。有人说&#xff0c;…

Golang每日一练(leetDay0023)

目录 67. 二进制求和 Add Binary &#x1f31f; 68. 文本左右对齐 Text Justification &#x1f31f;&#x1f31f;&#x1f31f; 69. x 的平方根 Sqrt x &#x1f31f; &#x1f31f; 每日一练刷题专栏 &#x1f31f; Golang每日一练 专栏 Python每日一练 专栏 C/C…

vscode软件设置头文件路径的方法

一. 设置头文件路径原因 在使用 vscode 软件进行 C 开发过程中&#xff0c;有些 .c 文件引用的头文件&#xff0c;提示会找不到头文件路径。因此&#xff0c;vscode 软件需要设置头文件路径。 二. vscode设置头文件路径 在 vscode 软件打开的情况下&#xff0c;默认打开一…

UNIX环境高级编程——UNIX基础知识

1.1 引言 所有操作系统都为它们所运行的程序提供服务&#xff0c;典型的服务包括&#xff1a; 执行新程序打开文件读文件分配存储区获得当前时间… 1.2 UNIX体系结构 可将操作系统定义为一种软件&#xff0c;它控制计算机硬件资源&#xff0c;提供程序运行环境&#xff0c;…

JAVA打飞机游戏的设计与实现

手机软件现状 在信息社会中&#xff0c;手机及其他无线设备越来越多的走进普通百姓的工作和生活&#xff0c;随着信息网络化的不断进展&#xff0c;手机及其他无线设备上网络势在必行。但是传统手机存在以下弊端&#xff1a; 1. 传统手机出厂时均由硬件厂商固化程序&#xf…

Spring Boot使用GraphQL开发Web API

目录前言Spring Boot中GraphQL的实现方案前言 传统的Restful API 存在诸多的问题&#xff0c;首先它无法控制返回的字段&#xff0c;前端也无法预判后端的返回结果&#xff0c;另外不同的返回结果对应不同的请求地址&#xff0c;这就导致了多次请求的问题。而GraphQL正是基于这…

9.网络爬虫—MySQL基础

网络爬虫—MySQL基础MySQL安装教程MySQL登录Mysql数据库操作显示数据库创建数据库删除数据库查询数据库使用数据库Mysql数据类型Mysql数据表创建Mysql增删查改PyMysql安装Python的MySQL库连接数据库增添字段操作游标PyMysql插入PyMysql查询PyMysql更新PyMysql删除前言&#xff…

生成式 AI 背后的共同框架:Stable Diffusion、DALL-E、Imagen

前言 如果你对这篇文章感兴趣&#xff0c;可以点击「【访客必读 - 指引页】一文囊括主页内所有高质量博客」&#xff0c;查看完整博客分类与对应链接。 框架 这些生成式 AI 的整体功能为&#xff1a;输入「文字」&#xff0c;返回「图像」&#xff0c;即 Text-to-image Gener…

计算机发展史之阿达·洛芙莱斯

你一定想不到世界上最早的程序员竟然是一位女士&#xff0c;而且还有专门的编程语言为了纪念她而命名&#xff0c;她就是阿达洛芙莱斯&#xff08;Ada Lovelace&#xff09; 奥古斯塔阿达拜伦是她的原名&#xff0c;因为嫁给威廉金后晋封为洛芙莱斯伯爵&#xff0c;而后改的名字…

R -- 卡方检验--原理及应用

1.单样本方差同质性检验 2.适合性/拟合优度/吻合性检验 或者公式书写如下&#xff1a; 图片来源&#xff1a;https://www.bilibili.com/opus/730576389651038260?fromsearch&spm_id_from333.337.0.0 例题 3.独立性检验 如何理根据列联表推算论值 E11 sum(Row1) * sum(…