Browse Source

代码的提交

hbjzws 2 years ago
parent
commit
f5e9428d9b

+ 49 - 0
common/src/main/java/com/jpsoft/education/netty/DtuServer.java

@@ -0,0 +1,49 @@
+package com.jpsoft.education.netty;
+
+import com.jpsoft.education.netty.hander.DtuServiceHandler;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * TODO
+ *
+ * @author linfeng
+ * @date 2022/12/26 15:57
+ */
+public class DtuServer implements Runnable {
+
+    public static void main(String[] args) {
+        new Thread(new DtuServer()).start();
+    }
+
+    @Override
+    public void run() {
+        EventLoopGroup bossGroup = new NioEventLoopGroup(2);
+        EventLoopGroup workGroup = new NioEventLoopGroup(10);
+        ServerBootstrap serverBootstrap = new ServerBootstrap();
+        serverBootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class)
+                .childHandler(new ChannelInitializer<SocketChannel>() {
+
+                    @Override
+                    protected void initChannel(SocketChannel ch) throws Exception {
+                        ch.pipeline().addLast(new IdleStateHandler(15, 0, 0, TimeUnit.MINUTES));
+                        ch.pipeline().addLast(new DtuServiceHandler());
+                    }
+
+                });
+        try {
+            ChannelFuture channelFuture = serverBootstrap.bind(9005).sync();
+            channelFuture.channel().closeFuture().sync();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+}

+ 14 - 0
common/src/main/java/com/jpsoft/education/netty/Main2.java

@@ -0,0 +1,14 @@
+package com.jpsoft.education.netty;
+
+import java.net.InetSocketAddress;
+
+public class Main2 {
+    public static void main(String[] args) throws Exception {
+       // InetSocketAddress address = new InetSocketAddress("1","2");
+        //InetSocketAddress address = new InetSocketAddress(socketProperties.getHost(),socketProperties.getPort());
+        InetSocketAddress address = new InetSocketAddress("127.0.0.1",23456);
+
+        new NettyServer().start(address);
+    }
+
+}

+ 9 - 0
common/src/main/java/com/jpsoft/education/netty/MainClient.java

@@ -0,0 +1,9 @@
+package com.jpsoft.education.netty;
+
+import com.jpsoft.education.netty.util.DtuManage;
+
+public class MainClient {
+    public static void main(String[] args) {
+        new DtuManage().sendMsg();
+    }
+}

+ 55 - 0
common/src/main/java/com/jpsoft/education/netty/ModeBusClient.java

@@ -0,0 +1,55 @@
+package com.jpsoft.education.netty;
+
+import com.jpsoft.education.netty.hander.TimeClientHandler;
+import com.jpsoft.education.netty.util.MyDecoder;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+/**
+ * @author automannn@163.com
+ * @time 2018/11/26 18:38
+ */
+public class ModeBusClient {
+    private int port;
+    private String host;
+    public ModeBusClient(int port, String host){
+        this.port = port;
+        this.host = host;
+    }
+
+    public void execute() throws InterruptedException {
+        EventLoopGroup workerGroup = new NioEventLoopGroup();
+
+        try {
+            Bootstrap b = new Bootstrap();
+            b.group(workerGroup);
+            b.channel(NioSocketChannel.class);
+            b.option(ChannelOption.SO_KEEPALIVE,true);
+
+            b.handler(new ChannelInitializer<SocketChannel>() {
+                protected void initChannel(SocketChannel ch) throws Exception {
+                    System.out.println("正在连接服务端中...");
+                    ch.pipeline().addLast("decoder",new MyDecoder());
+                    ch.pipeline().addLast(new TimeClientHandler());
+                    ch.pipeline().addLast(new TimeClientHandler());
+                }
+            });
+
+            ChannelFuture f = b.connect(host,port).sync();
+
+            f.channel().closeFuture().sync();
+        }finally {
+            workerGroup.shutdownGracefully();
+        }
+    }
+
+    public static void main(String[] args) throws InterruptedException {
+        new ModeBusClient(8379,"127.0.0.1").execute();
+    }
+}

+ 58 - 0
common/src/main/java/com/jpsoft/education/netty/NettyServer.java

@@ -0,0 +1,58 @@
+package com.jpsoft.education.netty;
+
+import com.jpsoft.education.netty.hander.NettyServerChannelInitializer;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.net.InetSocketAddress;
+
+/**
+ * 功能描述: netty服务启动类
+ *
+ * @Author keLe
+ * @Date 2022/8/26
+ */
+@Slf4j
+@Component
+public class NettyServer {
+    public void start(InetSocketAddress address) {
+        //配置服务端的NIO线程组
+        EventLoopGroup bossGroup = new NioEventLoopGroup();
+        EventLoopGroup workerGroup = new NioEventLoopGroup();
+        try {
+            // 绑定线程池,编码解码
+            //服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝
+            ServerBootstrap bootstrap = new ServerBootstrap()
+                    .group(bossGroup, workerGroup)
+                    // 指定Channel
+                    .channel(NioServerSocketChannel.class)
+                    //使用指定的端口设置套接字地址
+                    .localAddress(address)
+                    //使用自定义处理类
+                    .childHandler(new NettyServerChannelInitializer())
+                    //服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数
+                    .option(ChannelOption.SO_BACKLOG, 128)
+                    //保持长连接,2小时无数据激活心跳机制
+                    .childOption(ChannelOption.SO_KEEPALIVE, true)
+                    //将小的数据包包装成更大的帧进行传送,提高网络的负载
+                    .childOption(ChannelOption.TCP_NODELAY, true);
+            // 绑定端口,开始接收进来的连接
+            ChannelFuture future = bootstrap.bind(address).sync();
+            if (future.isSuccess()) {
+                log.info("netty服务器开始监听端口:{}",address.getPort());
+            }
+            //关闭channel和块,直到它被关闭
+            future.channel().closeFuture().sync();
+        } catch (Exception e) {
+            e.printStackTrace();
+            bossGroup.shutdownGracefully();
+            workerGroup.shutdownGracefully();
+        }
+    }
+}

+ 2 - 1
common/src/main/java/com/jpsoft/education/netty/Server.java

@@ -1,6 +1,7 @@
 package com.jpsoft.education.netty;
 
 import com.jpsoft.education.netty.hander.ServerHandler;
+import com.jpsoft.education.netty.hander.ServerModeBusHandler;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
@@ -28,7 +29,7 @@ public class Server {
                     .childHandler(new ChannelInitializer<SocketChannel>() { //配置具体的数据处理方式
                         @Override
                         protected void initChannel(SocketChannel socketChannel) throws Exception {
-                            socketChannel.pipeline().addLast(new ServerHandler());
+                            socketChannel.pipeline().addLast(new ServerModeBusHandler());
                         }
                     })
                     /**

+ 1 - 0
common/src/main/java/com/jpsoft/education/netty/TimeClient.java

@@ -33,6 +33,7 @@ public class TimeClient {
 
             b.handler(new ChannelInitializer<SocketChannel>() {
                 protected void initChannel(SocketChannel ch) throws Exception {
+                    System.out.println("正在连接服务端中...");
                     ch.pipeline().addLast(new TimeClientHandler());
                 }
             });

+ 103 - 0
common/src/main/java/com/jpsoft/education/netty/hander/DtuServiceHandler.java

@@ -0,0 +1,103 @@
+package com.jpsoft.education.netty.hander;
+
+import com.jpsoft.education.netty.util.ModBusUtils;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelId;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * TODO
+ *
+ * @author linfeng
+ * @date 2022/12/26 15:58
+ */
+public class DtuServiceHandler extends ChannelInboundHandlerAdapter {
+
+    // 保存
+    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+    // 保存客户端channelId与注册包信息
+    private static Map<String,String> channelIdMap = new ConcurrentHashMap<>();
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        Channel channel = ctx.channel();
+        ChannelId channelId = channel.id();
+        ByteBuf byteBuf = (ByteBuf) msg;
+        byte[] bytes = new byte[byteBuf.readableBytes()];
+        byteBuf.readBytes(bytes);
+        String str = ModBusUtils.bytes2HexString(bytes);
+        System.out.println(bytes.length);
+        if(bytes.length == 16){
+            // 注册包长度为16(我这边用到4G DTU(ZHC4013)注册包长度为16,如果有变化需要修改)
+            // 注册包为设备编号,设备编号是唯一的。
+            String registerPackage = "";
+            for(int i=0;i<bytes.length;i++) {
+                registerPackage=registerPackage+ModBusUtils.byteToASCLL(bytes[i]);
+            }
+            // channelId.asLongText()获取的是客户端全局唯一ID,根据channelId获取注册包信息,可以判断出数据是哪个节点上传的
+            channelIdMap.put(channelId.asLongText(),registerPackage);
+            System.out.println("注册包:"+registerPackage);
+        }else {
+            // 从channelIdMap获取注册包(设备编号)
+            String registerPackage = channelIdMap.get(channelId.asLongText());
+            // 获取到客户端注册包,可以根据注册包(设备编号)查询数据库唯一的设备信息。
+            // 数据就不做解析了,数据解析相关代码在上面分享的博客地址中。
+            // 后面就是业务逻辑和数据解析了。
+            /** 如果要服务端向客户端发送信息,可以利用客户端心跳机制发送数据(也可以写发送数据的线程),其中具体业务逻辑就需要利用数据库了。
+             具体思想:首先数据库保存发送状态和发送数据,利用channelIdMap查询注册包,可以通过注册包在数据库中查询相关数据,
+             根据发送状态判断是否需要发送数据。
+             Netty发送数据需要转ByteBuf,具体代码:channel.writeAndFlush(Unpooled.copiedBuffer(new byte[]{1, 2, 3})),
+             */
+        }
+
+        System.out.println("收到的数据:"+str);
+        System.out.println("链接数:"+channelGroup.size());
+        System.out.println("注册包数:"+channelIdMap.size());
+        super.channelRead(ctx, msg);
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        super.channelActive(ctx);
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        Channel channel = ctx.channel();
+        ChannelId channelId = channel.id();
+        channelIdMap.remove(channelId.asLongText());
+        System.out.println(channelId.asLongText()+" channelInactive客户端关闭连接");
+        super.channelInactive(ctx);
+    }
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+        super.userEventTriggered(ctx, evt);
+    }
+
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+        // 连接建立
+        Channel channel = ctx.channel();
+        channelGroup.add(channel);
+        super.handlerAdded(ctx);
+    }
+
+    @Override
+    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+        Channel channel = ctx.channel();
+        ChannelId channelId = channel.id();
+        channelIdMap.remove(channelId.asLongText());
+        System.out.println(channelId.asLongText()+" ChannelHandlerContext客户端关闭连接");
+        super.handlerRemoved(ctx);
+    }
+}

+ 35 - 0
common/src/main/java/com/jpsoft/education/netty/hander/ModeBusClientHandler.java

@@ -0,0 +1,35 @@
+package com.jpsoft.education.netty.hander;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+import java.util.Date;
+
+/**
+ * @author automannn@163.com
+ * @time 2018/11/26 18:38
+ */
+public class ModeBusClientHandler extends ChannelInboundHandlerAdapter {
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        System.out.println("------------------");
+        ByteBuf m = (ByteBuf) msg;
+
+        try {
+            System.out.println("3、输出ByteBuf当前可读字节数readableBytes:" + m.readableBytes());
+            long currentTimeMillis = (m.readUnsignedInt()-2208988800L)*1000L;
+            System.out.println(new Date(currentTimeMillis));
+            ctx.close();
+        }finally {
+            m.release();
+        }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        System.out.println("---------22222222222222222---------");
+        cause.printStackTrace();
+        ctx.close();
+    }
+}

+ 34 - 0
common/src/main/java/com/jpsoft/education/netty/hander/NettyServerChannelInitializer.java

@@ -0,0 +1,34 @@
+package com.jpsoft.education.netty.hander;
+
+
+import com.jpsoft.education.netty.util.MyDecoder;
+import com.jpsoft.education.netty.util.MyEncoder;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * 功能描述: 服务端初始化,客户端与服务器端连接一旦创建,这个类中方法就会被回调,设置出站编码器和入站解码器
+ *
+ * @Author keLe
+ * @Date 2022/8/26
+ */
+public class NettyServerChannelInitializer extends ChannelInitializer<SocketChannel> {
+    @Override
+    protected void initChannel(SocketChannel socketChannel) throws Exception {
+        ChannelPipeline pipeline = socketChannel.pipeline();
+        //接收消息格式,使用自定义解析数据格式
+        pipeline.addLast("decoder",new MyDecoder());
+        //发送消息格式,使用自定义解析数据格式
+        pipeline.addLast("encoder",new MyEncoder());
+
+        //针对客户端,如果在1分钟时没有想服务端发送写心跳(ALL),则主动断开
+        //如果是读空闲或者写空闲,不处理,这里根据自己业务考虑使用
+        //pipeline.addLast(new IdleStateHandler(600,0,0, TimeUnit.SECONDS));
+        //自定义的空闲检测
+        pipeline.addLast(new NettyServerHandler());
+    }
+}

+ 158 - 0
common/src/main/java/com/jpsoft/education/netty/hander/NettyServerHandler.java

@@ -0,0 +1,158 @@
+package com.jpsoft.education.netty.hander;
+
+import com.jpsoft.education.netty.util.ChannelMap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelId;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import lombok.extern.slf4j.Slf4j;
+
+import java.net.InetSocketAddress;
+
+/**
+ * 功能描述: netty服务端处理类
+ *
+ * @Author keLe
+ * @Date 2022/8/26
+ */
+@Slf4j
+public class NettyServerHandler extends ChannelInboundHandlerAdapter {
+
+    /**
+     * 功能描述: 有客户端连接服务器会触发此函数
+     * @Author keLe
+     * @Date 2022/8/26
+     * @param  ctx 通道
+     * @return void
+     */
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) {
+        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
+        String clientIp = insocket.getAddress().getHostAddress();
+        int clientPort = insocket.getPort();
+        //获取连接通道唯一标识
+        ChannelId channelId = ctx.channel().id();
+        //如果map中不包含此连接,就保存连接
+        if (ChannelMap.getChannelMap().containsKey(channelId)) {
+            log.info("客户端:{},是连接状态,连接通道数量:{} ",channelId,ChannelMap.getChannelMap().size());
+        } else {
+            //保存连接
+            ChannelMap.addChannel(channelId, ctx.channel());
+            log.info("客户端:{},连接netty服务器[IP:{}-->PORT:{}]",channelId, clientIp,clientPort);
+            log.info("连接通道数量: {}",ChannelMap.getChannelMap().size());
+        }
+    }
+
+    /**
+     * 功能描述: 有客户端终止连接服务器会触发此函数
+     * @Author keLe
+     * @Date 2022/8/26
+     * @param  ctx 通道处理程序上下文
+     * @return void
+     */
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) {
+        InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
+        String clientIp = inSocket.getAddress().getHostAddress();
+        ChannelId channelId = ctx.channel().id();
+        //包含此客户端才去删除
+        if (ChannelMap.getChannelMap().containsKey(channelId)) {
+            //删除连接
+            ChannelMap.getChannelMap().remove(channelId);
+            log.info("客户端:{},连接netty服务器[IP:{}-->PORT:{}]",channelId, clientIp,inSocket.getPort());
+            log.info("连接通道数量: " + ChannelMap.getChannelMap().size());
+        }
+    }
+
+    /**
+     * 功能描述: 有客户端发消息会触发此函数
+     * @Author keLe
+     * @Date 2022/8/26
+     * @param  ctx 通道处理程序上下文
+     * @param  msg 客户端发送的消息
+     * @return void
+     */
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        log.info("加载客户端报文,客户端id:{},客户端消息:{}",ctx.channel().id(), msg);
+        String data = String.valueOf(msg);
+        Integer water = Integer.parseInt(data.substring(6,10),16);
+        log.info("当前水位:{}cm",water);
+        //响应客户端
+        //this.channelWrite(ctx.channel().id(), msg);
+    }
+
+   /* @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+        String bytes = "01 03 00 02 00 01 25 CA";
+        ctx.writeAndFlush(bytes);
+    }*/
+
+    /**
+     * 功能描述: 服务端给客户端发送消息
+     * @Author keLe
+     * @Date 2022/8/26
+     * @param  channelId 连接通道唯一id
+     * @param  msg 需要发送的消息内容
+     * @return void
+     */
+    public void channelWrite(ChannelId channelId, Object msg) throws Exception {
+        Channel channel = ChannelMap.getChannelMap().get(channelId);
+        if (channel == null) {
+            log.info("通道:{},不存在",channelId);
+            return;
+        }
+        if (msg == null || msg == "") {
+            log.info("服务端响应空的消息");
+            return;
+        }
+        //将客户端的信息直接返回写入ctx
+        channel.write(msg);
+        //刷新缓存区
+        channel.flush();
+    }
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+        String socketString = ctx.channel().remoteAddress().toString();
+        if (evt instanceof IdleStateEvent) {
+            IdleStateEvent event = (IdleStateEvent) evt;
+            if (event.state() == IdleState.READER_IDLE) {
+                log.info("Client:{},READER_IDLE 读超时",socketString);
+                ctx.disconnect();
+                Channel channel = ctx.channel();
+                ChannelId id = channel.id();
+                ChannelMap.removeChannelByName(id);
+            } else if (event.state() == IdleState.WRITER_IDLE) {
+                log.info("Client:{}, WRITER_IDLE 写超时",socketString);
+                ctx.disconnect();
+                Channel channel = ctx.channel();
+                ChannelId id = channel.id();
+                ChannelMap.removeChannelByName(id);
+            } else if (event.state() == IdleState.ALL_IDLE) {
+                log.info("Client:{},ALL_IDLE 总超时",socketString);
+                ctx.disconnect();
+                Channel channel = ctx.channel();
+                ChannelId id = channel.id();
+                ChannelMap.removeChannelByName(id);
+            }
+        }
+    }
+
+    /**
+     * 功能描述: 发生异常会触发此函数
+     * @Author keLe
+     * @Date 2022/8/26
+     * @param  ctx 通道处理程序上下文
+     * @param  cause 异常
+     * @return void
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        ctx.close();
+        log.info("{}:发生了错误,此连接被关闭。此时连通数量:{}",ctx.channel().id(),ChannelMap.getChannelMap().size());
+    }
+
+}

+ 36 - 0
common/src/main/java/com/jpsoft/education/netty/hander/ServerModeBusHandler.java

@@ -0,0 +1,36 @@
+package com.jpsoft.education.netty.hander;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerAdapter;
+import io.netty.channel.ChannelHandlerContext;
+
+public class ServerModeBusHandler extends ChannelHandlerAdapter {
+
+
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+
+        //do something msg
+        ByteBuf buf = (ByteBuf)msg;
+         buf.readByte();
+        byte[] data = new byte[buf.readableBytes()];
+        buf.readBytes(data);
+        String request = new String(data, "utf-8");
+        System.out.println("Server: " + request);
+        //写给客户端
+        String response = "我是反馈的信息";
+        ctx.writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));
+        //.addListener(ChannelFutureListener.CLOSE);
+
+
+    }
+    /*@Override
+    public void handlerAdd(){}*/
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        cause.printStackTrace();
+        ctx.close();
+    }
+
+}
+

+ 4 - 0
common/src/main/java/com/jpsoft/education/netty/hander/TimeClientHandler.java

@@ -1,6 +1,7 @@
 package com.jpsoft.education.netty.hander;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 
@@ -13,9 +14,11 @@ import java.util.Date;
 public class TimeClientHandler extends ChannelInboundHandlerAdapter {
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        System.out.println("------------------");
         ByteBuf m = (ByteBuf) msg;
 
         try {
+            System.out.println("3、输出ByteBuf当前可读字节数readableBytes:" + m.readableBytes());
             long currentTimeMillis = (m.readUnsignedInt()-2208988800L)*1000L;
             System.out.println(new Date(currentTimeMillis));
             ctx.close();
@@ -26,6 +29,7 @@ public class TimeClientHandler extends ChannelInboundHandlerAdapter {
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        System.out.println("---------22222222222222222---------");
         cause.printStackTrace();
         ctx.close();
     }

+ 69 - 0
common/src/main/java/com/jpsoft/education/netty/util/ChannelMap.java

@@ -0,0 +1,69 @@
+package com.jpsoft.education.netty.util;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelId;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import org.springframework.util.CollectionUtils;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 功能描述: 管理通道Map类
+ *
+ * @Author keLe
+ * @Date 2022/8/26
+ */
+public class ChannelMap {
+
+    /**
+     * 管理一个全局map,保存连接进服务端的通道数量
+     */
+    private static final ConcurrentHashMap<ChannelId, Channel> CHANNEL_MAP = new ConcurrentHashMap<>(128);
+
+    public static ConcurrentHashMap<ChannelId, Channel> getChannelMap() {
+        return CHANNEL_MAP;
+    }
+
+    /**
+     *  获取指定name的channel
+     */
+    public static Channel getChannelByName(ChannelId channelId){
+        if(CollectionUtils.isEmpty(CHANNEL_MAP)){
+            return null;
+        }
+        return CHANNEL_MAP.get(channelId);
+    }
+
+    /**
+     *  将通道中的消息推送到每一个客户端
+     */
+    public static boolean pushNewsToAllClient(String obj){
+        if(CollectionUtils.isEmpty(CHANNEL_MAP)){
+            return false;
+        }
+        for(ChannelId channelId: CHANNEL_MAP.keySet()) {
+            Channel channel = CHANNEL_MAP.get(channelId);
+            channel.writeAndFlush(new TextWebSocketFrame(obj));
+        }
+        return true;
+    }
+
+    /**
+     *  将channel和对应的name添加到ConcurrentHashMap
+     */
+    public static void addChannel(ChannelId channelId,Channel channel){
+        CHANNEL_MAP.put(channelId,channel);
+    }
+
+    /**
+     *  移除掉name对应的channel
+     */
+    public static boolean removeChannelByName(ChannelId channelId){
+        if(CHANNEL_MAP.containsKey(channelId)){
+            CHANNEL_MAP.remove(channelId);
+            return true;
+        }
+        return false;
+    }
+
+}

+ 76 - 0
common/src/main/java/com/jpsoft/education/netty/util/DtuManage.java

@@ -0,0 +1,76 @@
+package com.jpsoft.education.netty.util;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelId;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import javax.annotation.Resource;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 功能描述: 定时发送Dtu报文
+ *
+ * @Author keLe
+ * @Date 2022/8/29
+ */
+@Slf4j
+@Component
+public class DtuManage {
+
+
+    public void sendMsg(){
+        ConcurrentHashMap<ChannelId, Channel> channelMap = ChannelMap.getChannelMap();
+        if(CollectionUtils.isEmpty(channelMap)){
+            return;
+        }
+        ConcurrentHashMap.KeySetView<ChannelId, Channel> channelIds = channelMap.keySet();
+        byte[] msgBytes = {0x01, 0x03, 0x00, 0x02, 0x00, 0x01, 0x25, (byte) 0xCA};
+        for(ChannelId channelId : channelIds){
+            Channel channel = ChannelMap.getChannelByName(channelId);
+            // 判断是否活跃
+            if(channel==null || !channel.isActive()){
+                ChannelMap.getChannelMap().remove(channelId);
+                log.info("客户端:{},连接已经中断",channelId);
+                return ;
+            }
+            // 指令发送
+            ByteBuf buffer = Unpooled.buffer();
+            log.info("开始发送报文:{}",Arrays.toString(msgBytes));
+            buffer.writeBytes(msgBytes);
+            channel.writeAndFlush(buffer).addListener((ChannelFutureListener) future -> {
+                if (future.isSuccess()) {
+                    log.info("客户端:{},回写成功:{}",channelId,Arrays.toString(msgBytes));
+                } else {
+                    log.info("客户端:{},回写失败:{}",channelId,Arrays.toString(msgBytes));
+                }
+            });
+        }
+    }
+
+    /**
+     * 功能描述: 定时删除不活跃的连接
+     * @Author keLe
+     * @Date 2022/8/26
+     * @return void
+     */
+    public void deleteInactiveConnections(){
+        ConcurrentHashMap<ChannelId, Channel> channelMap = ChannelMap.getChannelMap();
+        if(!CollectionUtils.isEmpty(channelMap)){
+            for (Map.Entry<ChannelId, Channel> next : channelMap.entrySet()) {
+                ChannelId channelId = next.getKey();
+                Channel channel = next.getValue();
+                if (!channel.isActive()) {
+                    channelMap.remove(channelId);
+                    log.info("客户端:{},连接已经中断",channelId);
+                }
+            }
+        }
+    }
+}

+ 34 - 0
common/src/main/java/com/jpsoft/education/netty/util/ModBusUtils.java

@@ -0,0 +1,34 @@
+package com.jpsoft.education.netty.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * TODO
+ *
+ * @author linfeng
+ * @date 2022/12/8 16:04
+ */
+public class ModBusUtils {
+
+    public static char byteToASCLL(byte b) {
+        return (char) b;
+    }
+
+
+    /*
+     * 字节数组转16进制字符串
+     */
+    public static String bytes2HexString(byte[] b) {
+        String r = "";
+        for (int i = 0; i < b.length; i++) {
+            String hex = Integer.toHexString(b[i] & 0xFF);
+            if (hex.length() == 1) {
+                hex = '0' + hex;
+            }
+            r += hex.toUpperCase() + " ";
+        }
+        return r;
+    }
+}

+ 59 - 0
common/src/main/java/com/jpsoft/education/netty/util/MyDecoder.java

@@ -0,0 +1,59 @@
+package com.jpsoft.education.netty.util;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
+import java.util.List;
+
+/**
+ * 功能描述: 自定义接收消息格式
+ *
+ * @Author keLe
+ * @Date 2022/8/26
+ */
+public class MyDecoder extends ByteToMessageDecoder {
+    @Override
+    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
+        //创建字节数组,buffer.readableBytes可读字节长度
+        System.out.println("1111111111111111");
+        byte[] b = new byte[byteBuf.readableBytes()];
+        //复制内容到字节数组b
+        byteBuf.readBytes(b);
+        //字节数组转字符串
+        String str = new String(b);
+
+        list.add(bytesToHexString(b));
+    }
+
+    public String bytesToHexString(byte[] bArray) {
+        StringBuffer sb = new StringBuffer(bArray.length);
+        String sTemp;
+        for (int i = 0; i < bArray.length; i++) {
+            sTemp = Integer.toHexString(0xFF & bArray[i]);
+            if (sTemp.length() < 2) {
+                sb.append(0);
+            }
+            sb.append(sTemp.toUpperCase());
+        }
+        return sb.toString();
+    }
+
+    public static String toHexString1(byte[] b) {
+        StringBuffer buffer = new StringBuffer();
+        for (int i = 0; i < b.length; ++i) {
+            buffer.append(toHexString1(b[i]));
+        }
+        return buffer.toString();
+    }
+
+    public static String toHexString1(byte b) {
+        String s = Integer.toHexString(b & 0xFF);
+        if (s.length() == 1) {
+            return "0" + s;
+        } else {
+            return s;
+        }
+    }
+
+}

+ 38 - 0
common/src/main/java/com/jpsoft/education/netty/util/MyEncoder.java

@@ -0,0 +1,38 @@
+package com.jpsoft.education.netty.util;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.MessageToByteEncoder;
+
+import java.util.List;
+/**
+ * 功能描述: 自定义发送消息格式
+ *
+ * @Author keLe
+ * @Date 2022/8/26
+ */
+public class MyEncoder extends MessageToByteEncoder<String> {
+
+    @Override
+    protected void encode(ChannelHandlerContext channelHandlerContext, String s, ByteBuf byteBuf) throws Exception {
+        //将16进制字符串转为数组
+        System.out.println("222222222222");
+        byteBuf.writeBytes(hexString2Bytes(s));
+    }
+
+    /**
+     * 功能描述: 16进制字符串转字节数组
+     * @Author keLe
+     * @Date 2022/8/26
+     * @param src 16进制字符串
+     * @return byte[]
+     */
+    public static byte[] hexString2Bytes(String src) {
+        int l = src.length() / 2;
+        byte[] ret = new byte[l];
+        for (int i = 0; i < l; i++) {
+            ret[i] = (byte) Integer.valueOf(src.substring(i * 2, i * 2 + 2), 16).byteValue();
+        }
+        return ret;
+    }
+}