浏览代码

代码的提交

hbjzws 2 年之前
父节点
当前提交
729ad91c83

+ 65 - 0
common/src/main/java/com/jpsoft/education/netty/DiscardServer.java

@@ -0,0 +1,65 @@
+package com.jpsoft.education.netty;
+import com.jpsoft.education.netty.hander.TimeServerHandler;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+
+/**
+ * @author automannn@163.com
+ * @time 2018/11/26 14:50
+ */
+public class DiscardServer {
+    private int port;
+
+    public DiscardServer(int port){
+        this.port = port;
+    }
+
+    public void run() throws Exception{
+        //Nio事件循环组 是一个 能够处理I/O操作的多线程的事件循环
+        //netty提供了不同的事件循环组,是西安了不同种类的传输,我们正在实现的是服务端应用,因此有两个循环组
+        //第一个叫做boss的组,用于接收进入的连接。
+        //第二个通常叫做worker,处理接受的连接的交通情况,一旦boss接受了连接并且注册这个连接给worker后。
+        //有多少个线程被使用,以及他们如何被映射到创建的信道,取决于事件循环组的实现和构造器配置
+        EventLoopGroup bossGroup = new NioEventLoopGroup();
+        EventLoopGroup workerGroup = new NioEventLoopGroup();
+
+        try {
+            //serverBootstrap是一个用于设置服务器的辅助类。  你可以直接设置信道。
+            //但是请注意,这是一个枯燥的过程,我们通常不必这样做
+            //这里,我们指定了用NioServerSocketChannel这个类,他可以用于实例新的信道去接受进入的请求
+            //ChannelInitializer是一个特殊的处理器,它的目的是帮助用户去配置一个新的信道。 它更像你想要去配置新信道的
+            //  的管道,通过加入一些处理器是实现你的联网应用。
+            //你也可以指定参数去定制这个信道实现。 我们正在写一个TCP/IP服务器,所以我们可以设置sokcet选项。
+            ServerBootstrap b = new ServerBootstrap();
+            b.group(bossGroup,workerGroup)
+                    .channel(NioServerSocketChannel.class)
+                    .childHandler(new ChannelInitializer<SocketChannel>() {
+                        protected void initChannel(SocketChannel ch) throws Exception {
+                            ch.pipeline().addLast(new TimeServerHandler());
+                        }
+                    }).option(ChannelOption.SO_BACKLOG,128)
+                    .childOption(ChannelOption.SO_KEEPALIVE,true);
+
+            ChannelFuture f = b.bind(port).sync();
+
+            f.channel().closeFuture().sync();
+        }finally {
+            workerGroup.shutdownGracefully();
+            bossGroup.shutdownGracefully();
+        }
+    }
+
+    public static void main(String[] args) {
+        try {
+            new DiscardServer(8379).run();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}

+ 13 - 0
common/src/main/java/com/jpsoft/education/netty/Main.java

@@ -0,0 +1,13 @@
+package com.jpsoft.education.netty;
+
+public class Main {
+    public static void main(String[] args) throws Exception {
+        int port = 8379;
+        if (args.length>0){
+            port = Integer.parseInt(args[0]);
+        }
+
+        new DiscardServer(port).run();
+    }
+
+}

+ 78 - 0
common/src/main/java/com/jpsoft/education/netty/Server.java

@@ -0,0 +1,78 @@
+package com.jpsoft.education.netty;
+
+import com.jpsoft.education.netty.hander.ServerHandler;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+
+public class Server {
+
+    private int port;
+
+    public Server(int port) {
+        this.port = port;
+    }
+
+    public void run() {
+        EventLoopGroup bossGroup = new NioEventLoopGroup(); //用于处理服务器端接收客户端连接
+        EventLoopGroup workerGroup = new NioEventLoopGroup(); //进行网络通信(读写)
+        try {
+            ServerBootstrap bootstrap = new ServerBootstrap(); //辅助工具类,用于服务器通道的一系列配置
+            bootstrap.group(bossGroup, workerGroup) //绑定两个线程组
+                    .channel(NioServerSocketChannel.class) //指定NIO的模式
+                    .childHandler(new ChannelInitializer<SocketChannel>() { //配置具体的数据处理方式
+                        @Override
+                        protected void initChannel(SocketChannel socketChannel) throws Exception {
+                            socketChannel.pipeline().addLast(new ServerHandler());
+                        }
+                    })
+                    /**
+                     * 对于ChannelOption.SO_BACKLOG的解释:
+                     * 服务器端TCP内核维护有两个队列,我们称之为A、B队列。客户端向服务器端connect时,会发送带有SYN标志的包(第一次握手),服务器端
+                     * 接收到客户端发送的SYN时,向客户端发送SYN ACK确认(第二次握手),此时TCP内核模块把客户端连接加入到A队列中,然后服务器接收到
+                     * 客户端发送的ACK时(第三次握手),TCP内核模块把客户端连接从A队列移动到B队列,连接完成,应用程序的accept会返回。也就是说accept
+                     * 从B队列中取出完成了三次握手的连接。
+                     * A队列和B队列的长度之和就是backlog。当A、B队列的长度之和大于ChannelOption.SO_BACKLOG时,新的连接将会被TCP内核拒绝。
+                     * 所以,如果backlog过小,可能会出现accept速度跟不上,A、B队列满了,导致新的客户端无法连接。要注意的是,backlog对程序支持的
+                     * 连接数并无影响,backlog影响的只是还没有被accept取出的连接
+                     */
+                    .option(ChannelOption.SO_BACKLOG, 128) //设置TCP缓冲区
+                    .option(ChannelOption.SO_SNDBUF, 32 * 1024) //设置发送数据缓冲大小
+                    .option(ChannelOption.SO_RCVBUF, 32 * 1024) //设置接受数据缓冲大小
+                    .childOption(ChannelOption.SO_KEEPALIVE, true); //保持连接
+            ChannelFuture future = bootstrap.bind(port).sync();
+            future.channel().closeFuture().sync();
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            workerGroup.shutdownGracefully();
+            bossGroup.shutdownGracefully();
+        }
+    }
+  /*  private void start() {
+        EventLoopGroup workerGroup = new NioEventLoopGroup();
+
+        Bootstrap bootstrap = new Bootstrap();
+        bootstrap.group(workerGroup).channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true)
+    // 给父Channel配置参数
+        // .handler(new ChannelInitializer<SocketChannel>() {
+        // 配置父Channel与Handler之间的关系
+        // @Override
+        // protected void initChannel(SocketChannel socketChannel) throws Exception {
+        // socketChannel.pipeline().addLast(new TimeClientHandler());
+        // }                });
+        // try {
+        // bootstrap.connect(new InetSocketAddress(8888)).sync(); // 连接服务器
+        // } catch (InterruptedException e) {
+        // workerGroup.shutdownGracefully();
+        // }    }*/
+
+    public static void main(String[] args) {
+        new Server(8379).run();
+    }
+}

+ 51 - 0
common/src/main/java/com/jpsoft/education/netty/TimeClient.java

@@ -0,0 +1,51 @@
+package com.jpsoft.education.netty;
+
+import com.jpsoft.education.netty.hander.TimeClientHandler;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+/**
+ * @author automannn@163.com
+ * @time 2018/11/26 18:38
+ */
+public class TimeClient {
+    private int port;
+    private String host;
+    public TimeClient(int port,String host){
+        this.port = port;
+        this.host = host;
+    }
+
+    public void execute() throws InterruptedException {
+        EventLoopGroup workerGroup = new NioEventLoopGroup();
+
+        try {
+            Bootstrap b = new Bootstrap();
+            b.group(workerGroup);
+            b.channel(NioSocketChannel.class);
+            b.option(ChannelOption.SO_KEEPALIVE,true);
+
+            b.handler(new ChannelInitializer<SocketChannel>() {
+                protected void initChannel(SocketChannel ch) throws Exception {
+                    ch.pipeline().addLast(new TimeClientHandler());
+                }
+            });
+
+            ChannelFuture f = b.connect(host,port).sync();
+
+            f.channel().closeFuture().sync();
+        }finally {
+            workerGroup.shutdownGracefully();
+        }
+    }
+
+    public static void main(String[] args) throws InterruptedException {
+        new TimeClient(8379,"127.0.0.1").execute();
+    }
+}

+ 52 - 0
common/src/main/java/com/jpsoft/education/netty/hander/DiscardServerHandler.java

@@ -0,0 +1,52 @@
+package com.jpsoft.education.netty.hander;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.ReferenceCountUtil;
+
+/**
+ * @author automannn@163.com
+ * @time 2018/11/26 14:01
+ */
+
+/*
+ *  该类继承自 信道绑定处理适配器,它是信道保定处理器的一个实现类。
+ *  信道处理器提供了多种我们能够覆盖的事件处理方法
+ *  这里只继承信道绑定处理适配器而不是实现处理器就足够了
+ * */
+public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
+
+    /*
+     *  我们覆盖了这个 channelRead()方法。
+     *  这个方法在接收到消息时调用,无论合适从客户端接收到消息。
+     *  在这个例子中,接收到的消息是 字节缓冲(字节数组)
+     *  为了实现discard协议,处理器必须忽略接收到的消息。
+     *  bytebuf是一个引用计数对象,它必须明确地被释放,通过release()方法。
+     *  请记住 释放任何经由这个处理器的引用计数对象并不是处理器的责任。通常通过 ReferenceCountUtil.release()完成
+     * */
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        ByteBuf in = (ByteBuf) msg;
+
+        try{
+            while (in.isReadable()){
+                System.out.print((char) in.readByte());
+                System.out.println("我是反馈的信息!");
+                System.out.flush();
+            }
+        }finally {
+            ReferenceCountUtil.release(msg);
+        }
+    }
+
+    /*
+     *  当一个异常被netty产生便会调用以下方法。 通常可用于响应消息。
+     * */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        cause.printStackTrace();
+        ctx.close();
+    }
+}
+

+ 23 - 0
common/src/main/java/com/jpsoft/education/netty/hander/EchoServerHandler.java

@@ -0,0 +1,23 @@
+package com.jpsoft.education.netty.hander;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+/**
+ * @author automannn@163.com
+ * @time 2018/11/26 16:20
+ */
+public class EchoServerHandler extends ChannelInboundHandlerAdapter {
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        ctx.write("->");
+        ctx.write(msg);
+        ctx.flush();
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        cause.printStackTrace();
+        ctx.close();
+    }
+}

+ 35 - 0
common/src/main/java/com/jpsoft/education/netty/hander/ServerHandler.java

@@ -0,0 +1,35 @@
+package com.jpsoft.education.netty.hander;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerAdapter;
+import io.netty.channel.ChannelHandlerContext;
+
+public class ServerHandler  extends ChannelHandlerAdapter {
+
+
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+
+        //do something msg
+        ByteBuf buf = (ByteBuf)msg;
+        byte[] data = new byte[buf.readableBytes()];
+        buf.readBytes(data);
+        String request = new String(data, "utf-8");
+        System.out.println("Server: " + request);
+        //写给客户端
+        String response = "我是反馈的信息";
+        ctx.writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));
+        //.addListener(ChannelFutureListener.CLOSE);
+
+
+    }
+    /*@Override
+    public void handlerAdd(){}*/
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        cause.printStackTrace();
+        ctx.close();
+    }
+
+}
+

+ 32 - 0
common/src/main/java/com/jpsoft/education/netty/hander/TimeClientHandler.java

@@ -0,0 +1,32 @@
+package com.jpsoft.education.netty.hander;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+import java.util.Date;
+
+/**
+ * @author automannn@163.com
+ * @time 2018/11/26 18:38
+ */
+public class TimeClientHandler extends ChannelInboundHandlerAdapter {
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        ByteBuf m = (ByteBuf) msg;
+
+        try {
+            long currentTimeMillis = (m.readUnsignedInt()-2208988800L)*1000L;
+            System.out.println(new Date(currentTimeMillis));
+            ctx.close();
+        }finally {
+            m.release();
+        }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        cause.printStackTrace();
+        ctx.close();
+    }
+}

+ 46 - 0
common/src/main/java/com/jpsoft/education/netty/hander/TimeServerHandler.java

@@ -0,0 +1,46 @@
+package com.jpsoft.education.netty.hander;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+/**
+ * @author automannn@163.com
+ * @time 2018/11/26 16:51
+ */
+public class TimeServerHandler extends ChannelInboundHandlerAdapter {
+
+    /*
+     *  channelActive()方法将一个链接建立并且生成通道的时候被执行。 在当前方法中我们写入了一个32位整型数据代表当前时间
+     *
+     * */
+    @Override
+    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
+        //为了发送消息,我们需要分配一个新的缓冲,它将装载这个信息。  我们将要发送写入一个32位的整型数据,因此,我们需要
+        //  至少4字节容量的ByteBuf。  获得当前的ByteBuf分配器通过alloc()方法,然偶分配
+        final ByteBuf time = ctx.alloc().buffer(4);
+
+        //ByteBuf 具有两个指针,一个用于读,一个用于写。 这样对我们来说就方便了很多。因为没有了flip操作
+        time.writeInt((int)(System.currentTimeMillis()/1000L + 2208988800L));
+
+        //一个ChannelFuture 代表了一个还没有发发生的I/O操作。 这意味着,任何请求操作都可能还没有形成。因为在netty
+        //   中,所有的操作都是异步的。
+        final ChannelFuture f = ctx.writeAndFlush(time);
+
+        f.addListener(new ChannelFutureListener() {
+            public void operationComplete(ChannelFuture future) throws Exception {
+                assert f == future;
+                ctx.close();
+            }
+        });
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        cause.printStackTrace();
+        ctx.close();
+    }
+}
+