springBoot + netty搭建高性能 websocket 服务 & 性能测试(包含python 测试脚本)

一、前言

1、如果我们的app类似于股票这种,数据很多很快,之前用的tomcat自带的websocket 又或者 spring-boot-starter-websocke集成,但是性能在数据并发很大时就会存在问题。

2、我前面写的一篇关于 springBoot+webosket的,没有使用***ty的文章 springBoot使用webSocket的几种方式以及在高并发出现的问题及解决 ,其中就包含了 以下者两种方式,都有说明,大家如果量不大,下面这两种方式也是可以的。

  • tomcat自带的websocket
  • spring-boot-starter-websocke集成

二、使用***ty 完成 webSocket

1、如何使用 ,可以参考 ***ty + webSocket + SpringBott 是参考文章 SpringBoot整合***ty处理WebSocket(支持url参数) 这篇文章是,说的已经很ok了

2、但是上面那篇文章还是有所不足,因为我需要加上token认证,只有认证了,才可以建立链接,上面那篇的文章,只是获取参数,在认证方面,还是有所不足,满足不了这个条件。后续我可以把我的方式,写一篇文章放出来

2.1、RequestUriUtils 的 getBasePath 方法

2、比如你的链接是 ws://192.168.172.139:1234/ws/id=1,使用它文章中的获取后得到 /ws/,建议改成如下,获取之后是 /ws

    /**
     * 获取URI中参数以外部分路径
     *
     * @param uri
     * @return
     */
    public static String getBasePath(String uriStr) {
        String pathWithSlash ="";
        try {
            // 使用URI解析URL字符串
            URI uri = new URI(uriStr);
            // 获取路径部分
            pathWithSlash = uri.getPath();
            // 去掉末尾的斜杠
            return pathWithSlash.replaceAll("/$", "");
        } catch (URISyntaxException e) {
            log.error("解析path错误", e);
        }
        return pathWithSlash;
    }

2.2、WebSocketChannelInitializer 中的 ChannelPipeline 说明

在WebSocket服务器的构建中添加.addLast(new HttpServerCodec())的主要原因是WebSocket握手是基于HTTP协议的,WebSocket连接的建立需要经过以下步骤:

  1. 客户端向服务器发送一个HTTP请求,请求升级到WebSocket协议。
  2. 服务器收到这个请求后,需要进行协议升级处理,将HTTP协议切换到WebSocket协议。
  3. 一旦升级成功,WebSocket连接建立,客户端和服务器之间可以通过WebSocket协议进行双向通信。

因此,WebSocket握手的开始阶段仍然是HTTP请求和响应。为了处理这个初始的HTTP请求,需要在***ty的ChannelPipeline中添加.addLast(new HttpServerCodec()),以确保能够解析和处理这个HTTP请求,并在需要时将其升级为WebSocket连接。简而言之,.addLast(new HttpServerCodec())的作用是为了使WebSocket服务器能够正确地处理WebSocket握手之前的HTTP请求和响应,确保WebSocket连接能够成功建立。一旦WebSocket连接建立,就可以通过WebSocket协议进行实时双向通信。

这是WebSocket服务器构建中的一个标准操作。websocket协议本身是基于http协议的,所以这边也要使用http解编码器

2.3、addLast(new ChunkedWriteHandler())

.addLast(new ChunkedWriteHandler()) 是 ***ty 中的一个 ChannelHandler,它的主要作用是支持异步写大数据流(例如文件传输)。

在某些情况下,你可能需要向客户端发送大量的数据,例如文件的内容,而不是一次性将整个数据写入缓冲区,因为这可能会导致内存占用过高。相反,你可以将数据分成小块(chunk)并逐块写入客户端,以避免内存问题。

ChunkedWriteHandler 的作用如下:

  1. 支持大数据流的异步写入: 它允许你将数据切割成小块并异步地将这些块写入客户端。这对于传输大型文件或大量数据非常有用,因为它可以避免将整个数据加载到内存中。
  2. 维护写入顺序: 它确保数据块按照它们添加到 Channel 的顺序进行写入。这有助于保持数据的有序性。
  3. 提高性能: 通过异步写入数据块,ChunkedWriteHandler 可以提高网络性能,因为它不会阻塞线程等待数据传输完成。

这个处理器通常与其他处理器一起使用,以完成完整的数据传输过程。例如,如果你要实现文件传输,通常会使用 ChunkedWriteHandler 将文件数据切割成小块,然后使用其他处理器来处理文件的传输,例如文件块的编码和解码。
总之,.addLast(new ChunkedWriteHandler()) 的作用是支持异步写大数据流,以提高性能并降低内存使用,尤其在需要传输大量数据时非常有用。

2.4、addLast(new HttpObjectAggregator(1024 * 64))

将HttpMessage和HttpContents聚合到一个完成的 FullHttpRequest或FullHttpResponse中,具体是FullHttpRequest对象还是FullHttpResponse对象取决于是请求还是响应

.addLast(new HttpObjectAggregator(1024 * 64)) 是 ***ty 中的一个 ChannelHandler,主要用于将HTTP请求或响应的多个部分聚合成一个完整的HTTP消息。这对于处理HTTP消息非常有用,特别是当你需要处理大量的HTTP数据时。

以下是.addLast(new HttpObjectAggregator(1024 * 64))的主要作用:

  1. 消息聚合: 在HTTP通信中,请求或响应可能会分成多个部分(例如,HTTP请求头和HTTP请求体)。HttpObjectAggregator 负责将这些部分聚合成一个完整的FullHttpRequestFullHttpResponse,以便更容易处理和操作。
  2. 内存管理: 这个处理器还具有内存管理功能。你可以在构造函数中指定一个最大的聚合字节数(在示例中是64 KB)。如果接收到的HTTP数据超过了这个大小,HttpObjectAggregator 将抛出异常以防止内存泄漏。
  3. 简化HTTP消息处理: 聚合HTTP消息使得你可以更容易地处理完整的HTTP请求和响应,而不必手动处理每个部分。这对于构建Web服务器或HTTP代理非常有用。

示例使用:

pipeline.addLast(new HttpServerCodec()); // 添加HTTP编解码器
pipeline.addLast(new HttpObjectAggregator(1024 * 64)); // 聚合HTTP消息,最大64KB
pipeline.addLast(new MyHttpRequestHandler()); // 自定义的HTTP请求处理器

在上面的示例中,首先使用 HttpServerCodec 添加了HTTP编解码器,然后使用 HttpObjectAggregator 聚合HTTP消息,最后添加了一个自定义的HTTP请求处理器。
总之,.addLast(new HttpObjectAggregator(1024 * 64)) 的作用是将HTTP请求或响应的多个部分聚合成一个完整的HTTP消息,以简化和改善处理HTTP消息的流程,并提供内存管理功能。这在构建支持HTTP的应用程序中非常有用。

2.5、addLast(new WebSocketServer***pressionHandler())

webSocket 数据压缩扩展,当添加这个的时候WebSocketServerProtocolHandler的第三个参数需要设置成true
.addLast(new WebSocketServer***pressionHandler()) 是 ***ty 中的一个 ChannelHandler,用于支持 WebSocket 消息的压缩和解压缩。WebSocket 消息压缩可以减小消息的大小,提高网络传输效率,尤其在低带宽环境下非常有用。

以下是 .addLast(new WebSocketServer***pressionHandler()) 的主要作用:

  1. WebSocket 消息压缩: 当客户端和服务器之间通过 WebSocket 协议传输大量数据时,可以使用压缩技术将消息压缩为更小的尺寸,以减少网络带宽的使用。WebSocketServer***pressionHandler 负责处理消息的压缩。
  2. WebSocket 消息解压缩: 对于接收到的已压缩的 WebSocket 消息,服务器需要将其解压缩以获取原始消息。WebSocketServer***pressionHandler 也负责解压缩已压缩的消息。
  3. 支持多种压缩算法: WebSocketServer***pressionHandler 支持多种压缩算法,包括通常的 DEFLATE 和 GZIP 压缩算法,以及自定义的压缩算法。

在WebSocket应用程序中,通常需要在WebSocket连接建立时协商是否启用压缩,以及使用哪种压缩算法。如果客户端和服务器都支持压缩,那么它们可以在消息传输过程中启用压缩。

要使用 .addLast(new WebSocketServer***pressionHandler()),你需要在 WebSocket 服务器的处理管道中添加该处理器。例如:

pipeline.addLast(new HttpServerCodec()); // 添加HTTP编解码器
pipeline.addLast(new HttpObjectAggregator(1024 * 64)); // 聚合HTTP消息,最大64KB
pipeline.addLast(new WebSocketServer***pressionHandler()); // 添加WebSocket消息压缩处理器
pipeline.addLast(new MyWebSocketHandler()); // 自定义的WebSocket处理器

在上面的示例中,首先使用 HttpServerCodec 添加了HTTP编解码器,然后使用 HttpObjectAggregator 聚合HTTP消息,接下来添加了 WebSocketServer***pressionHandler 以支持WebSocket消息压缩,最后添加了一个自定义的WebSocket处理器。

总之,.addLast(new WebSocketServer***pressionHandler()) 的作用是为WebSocket服务器添加消息压缩和解压缩的功能,以减小消息大小并提高网络传输效率。这在需要传输大量数据的WebSocket应用中非常有用。

2.6、.addLast(new MyWebSocketHandler())

自定义处理器 - 处理 web socket 消息(消息的父类是WebSocketFrame,旗下有很多子类,比如BinaryWebSocketFrame TextWebSocketFrame 等等)

如果你使用的是 父类是WebSocketFrame,则需要在其内部,判断是什么类型的数据,如果你使用的具体的子类,那么只有具体的消息类型会到哪里

2.7、 .addLast(new WebSocketServerProtocolHandler(WebSocketProperties.path, null, true, 10485760));

服务器端向外暴露的 web socket 端点,当客户端传递比较大的对象时,maxFrameSize参数的值需要调大

WebSocketServerProtocolHandler 是 ***ty 中的一个关键组件,用于处理 WebSocket 握手和协议升级,以及管理 WebSocket 连接的生命周期。它的主要作用如下:

  1. WebSocket 握手处理: 当客户端通过 HTTP 请求发起 WebSocket 握手时,WebSocketServerProtocolHandler 负责识别并处理这些握手请求。它可以检查HTTP请求中的升级标头和协议头,以确定是否需要升级到 WebSocket 协议。
  2. WebSocket 握手协议升级: 如果客户端发送了符合 WebSocket 握手规范的请求,WebSocketServerProtocolHandler 会处理协议升级,将连接从 HTTP 协议切换到 WebSocket 协议。这个过程包括升级响应的构建和升级握手的处理。
  3. WebSocket 生命周期管理: 一旦 WebSocket 握手成功,WebSocketServerProtocolHandler 管理 WebSocket 连接的生命周期。它会处理连接的打开、关闭、异常和消息传递等事件。
  4. Ping/Pong 处理: WebSocket 协议支持 Ping 和 Pong 消息,用于保持连接的活动状态。WebSocketServerProtocolHandler 会自动处理这些心跳消息,以确保连接保持活动状态。

以下是一个示例,展示了如何在 ***ty 中使用 WebSocketServerProtocolHandler

pipeline.addLast(new HttpServerCodec()); // 添加HTTP编解码器
pipeline.addLast(new HttpObjectAggregator(1024 * 64)); // 聚合HTTP消息,最大64KB
pipeline.addLast(new WebSocketServerProtocolHandler("/websocket")); // 添加WebSocket握手处理器
pipeline.addLast(new MyWebSocketHandler()); // 自定义的WebSocket处理器

在上面的示例中,WebSocketServerProtocolHandler 被添加到处理管道中,并指定了 WebSocket 的路径(在示例中是"/websocket")。一旦握手成功,连接将切换到 WebSocket 协议,并且可以在 MyWebSocketHandler 中处理 WebSocket 消息。

总之,WebSocketServerProtocolHandler 是用于处理 WebSocket 握手和协议升级的关键组件,它使得在 ***ty 中创建 WebSocket 服务器变得更加容易。

三、Web Socket 性能对比——Spring Boot vs Tomcat vs ***ty

参考文章 Web Socket 性能对比——Spring Boot vs Tomcat vs ***ty 说的很ok了。

四、使用四种框架分别实现百万websocket常连接的服务器(写的很好,必看)

1、文章包含了一些线上的参数调整,都是干活
原文地址: https://colobu.***/2015/05/22/implement-C1000K-servers-by-spray-***ty-undertow-and-node-js/

五、七种WebSocket框架的性能比较

原文地址: https://colobu.***/2015/07/14/performance-***parison-of-7-websocket-frameworks/

六、使用python 脚本测试

1、主要测试两部分

  • 大量客户端同时在线,查看性能,内存消耗问题
  • 大量客户端同时在线,数据发送效率

2、python 安装这里就不再说了

3、本文的 第四节 和第五节 请务必了解,需要修改对应的 服务器 tcp链接数等等参数。

4、我的webSocket 链接格式是 ws://192.168.172.226:7081/ws/token 最后的那个token,用于线上的认证,只有认证了的用户,才会建立通道,这里为了方便测试,直接用数值代替,如下,这样,就代表用户id好了,毕竟后续我要是测试50w个客户端,总不能先生成50w个token吧。

  • ws://192.168.172.226:7081/ws/1
  • ws://192.168.172.226:7081/ws/2

6.1、python 脚本

1、脚本内容

import threading
import time
import websocket

# 定义带有顺序编号的 WebSocket URL
url_base = "ws://192.168.172.226:7081/ws/"
num_connections = 10000  # 要模拟的连接数
running_connections = 0  # 跟踪当前正在运行的连接数

# 创建线程本地存储对象来存储每个线程的文件名
local = threading.local()


# 建立 WebSocket 连接的函数
def connect_websocket():
    global running_connections
    try:
        # 使用顺序编号生成 URL
        url = url_base + str(running_connections)

        # 为当前线程创建文件名
        local.filename = f"{running_connections}.txt"

        while True:
            # 创建 WebSocket 连接
            ws = websocket.create_connection(url)

            while True:
                # 接收来自服务端的消息
                message = ws.recv()

                # 保存消息到文件
                with open(local.filename, "a") as file:
                    file.write(message + "\n")

    except Exception as e:
        print(f"WebSocket 连接失败: {e}")

    running_connections -= 1


# 开始模拟 WebSocket 连接
while running_connections < num_connections:
    t = threading.Thread(target=connect_websocket)
    t.start()
    running_connections += 1

# 等待所有连接完成
while running_connections > 0:
    time.sleep(1)

print("所有 WebSocket 连接完成。")

2、运行

# 安装  websocket-client
pip install websocket-client

# 运行test.py 文件
python test.py

3、说明
脚本作用是生成指定 num_connections 的webSocket 连接数,并一直监听服务端返回的消息,如果服务端有消息就会保存到对应链接的文件夹下面,包含其服务端返回的内容。

6.2、***ty 服务端

1、具体的链接的代码我这里就不说了

2、主要需要写两个接口,一个接口是向所有在线的客户端发送一条消息,另一个接口是向所有在线的客户端发送指定数量mockCount的消息

package ***.jt.thermalapi.***mon.controller;

import ***.jt.thermalapi.response.Response;
import ***.jt.thermalapi.websocket.session.SessionFactory;
import io.swagger.annotations.Api;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import springfox.documentation.annotations.ApiIgnore;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author GXM
 * @version 1.0.0
 * @Description TODO
 * @createTime 2023年10月13日
 */
@ApiIgnore
@Api(tags = "测试api")
@RestController
@RequestMapping("/test")
@Slf4j
public class TestController {

    @GetMapping("mockOne")
    public Response<String> mockOne() {
        AtomicInteger count = new AtomicInteger(0);
        SessionFactory.getSession().broadcast(count.getAndIncrement() + "");
        return Response.buildSu***ess();
    }

    @GetMapping("mockMany/{mockCount}")
    public Response<String> mockMany(@PathVariable("mockCount") int mockCount) {
        AtomicInteger count = new AtomicInteger(0);

        while (count.getAndIncrement() <= mockCount) {
            SessionFactory.getSession().broadcast(count.getAndIncrement() + "");
        }

        return Response.buildSu***ess();
    }
}


6.3、演示

1、启动你的***ty 服务端

2、启动测试脚本

python test.py

3、服务端日志输出,我测试出来,1w 链接大约30s左右,看自己机器吧,我这还是在idea里面跑的。

4、请求test/mockOne接口,大于1s

5、在 test.py文件下,生成了对应的1w客户端的文件,其内容就是服务端发送的。

5、请求test/mockMany/100接口,这个大家可以自己测试下,或者等我后续在服务器测试结束后,再把这篇文章整理,一下,因为本次测试都是在我本机上测试的,只是初步了解。但是脚本已经可以使用,后续大家测试服务器上面,步骤是一样的

转载请说明出处内容投诉
AJ教程_站长资源网 » springBoot + netty搭建高性能 websocket 服务 & 性能测试(包含python 测试脚本)

发表评论

欢迎 访客 发表评论

一个令你着迷的主题!

查看演示 官网购买