Quellcode durchsuchen

代码的提交

hbjzws vor 2 Jahren
Ursprung
Commit
4e7bed1b8c

+ 90 - 0
common/src/main/java/com/jpsoft/education/netty/MyClientNetty.java

@@ -0,0 +1,90 @@
+package com.jpsoft.education.netty;
+
+import com.jpsoft.education.netty.hander.MyClientHandler;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.*;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+import java.io.UnsupportedEncodingException;
+import java.util.concurrent.CountDownLatch;
+
+public class MyClientNetty {
+    public static CountDownLatch countDownLatch = new CountDownLatch(1);
+    public static CountDownLatch countDownLatch2 = new CountDownLatch(1);
+    private String ip;
+    private int port;
+    private static ChannelFuture cf;
+    private static EventLoopGroup bossGroup;
+
+    public MyClientNetty(String ip, int port) {
+        this.ip = ip;
+        this.port = port;
+    }
+
+    public String sendRecv(String msg){
+        try {
+            cf.channel().writeAndFlush(Unpooled.copiedBuffer(msg.getBytes()));
+            MyClientNetty.countDownLatch.await();
+            return MyClientHandler.message;
+        } catch (Exception e) {
+            e.printStackTrace();
+            return null;
+        }
+    }
+
+    public void connect() throws UnsupportedEncodingException, InterruptedException {
+        this.action();
+        countDownLatch2.await();
+    }
+
+    public void close() throws InterruptedException {
+        cf.channel().closeFuture().sync();
+        bossGroup.shutdownGracefully();
+    }
+
+    public void action() throws InterruptedException, UnsupportedEncodingException {
+        bossGroup = new NioEventLoopGroup();
+        final Bootstrap bs = new Bootstrap();
+
+        ((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)bs.group(bossGroup)).channel(NioSocketChannel.class)).option(ChannelOption.SO_KEEPALIVE, true)).option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(65535))).handler(new ChannelInitializer<SocketChannel>() {
+            protected void initChannel(SocketChannel socketChannel) throws Exception {
+                socketChannel.pipeline().addLast(new ChannelHandler[]{new MyClientHandler()});
+            }
+        });
+
+        (new Thread(new Runnable() {
+            public void run() {
+                try {
+                    MyClientNetty.cf = bs.connect(MyClientNetty.this.ip, MyClientNetty.this.port).sync();
+                    MyClientNetty.countDownLatch2.countDown();
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+
+            }
+        })).start();
+    }
+    public static void main(String[] args) {
+        try {
+            MyClientNetty myClientNetty = new MyClientNetty("127.0.0.1",9999);
+            myClientNetty.connect();
+            String sendStr ="01 03 01 2c 00 0a";
+            System.out.println("客户端发送数据:"+sendStr);
+            String result = myClientNetty.sendRecv(sendStr);
+            System.out.println("客户端获取返回数据:"+result);
+            if(result.contains(" ")){
+                result=result.replace(" ","");
+                System.out.println("客户端获取返回处理:"+result);
+            }
+            myClientNetty.close();
+        } catch (UnsupportedEncodingException e) {
+            e.printStackTrace();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+    }
+}

+ 42 - 0
common/src/main/java/com/jpsoft/education/netty/MyServerNetty.java

@@ -0,0 +1,42 @@
+package com.jpsoft.education.netty;
+
+import com.jpsoft.education.netty.hander.MyServerHandler;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.*;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+
+public class MyServerNetty {
+    private int port;
+
+    public MyServerNetty(int port) {
+        this.port = port;
+    }
+
+    public void action(){
+        EventLoopGroup bossGroup = new NioEventLoopGroup();
+        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
+
+        try{
+            ServerBootstrap serverBootstrap = new ServerBootstrap();
+            ((ServerBootstrap)((ServerBootstrap)((ServerBootstrap)serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)).option(ChannelOption.SO_BACKLOG, 128)).childOption(ChannelOption.SO_KEEPALIVE, true).handler(new LoggingHandler(LogLevel.INFO))).childHandler(new ChannelInitializer<SocketChannel>() {
+                protected void initChannel(SocketChannel socketChannel) throws Exception {
+                    socketChannel.pipeline().addLast(new ChannelHandler[]{new MyServerHandler()});
+                }
+            });
+            System.err.println("server 开启--------------");
+            ChannelFuture cf = serverBootstrap.bind(this.port).sync();
+            cf.channel().closeFuture().sync();
+        }catch (Exception e){
+            e.printStackTrace();
+        }
+    }
+
+    public static void main(String[] args) throws InterruptedException {
+        (new MyServerNetty(9999)).action();
+        System.out.println("main==============>启动服务端");
+    }
+}

+ 40 - 0
common/src/main/java/com/jpsoft/education/netty/hander/MyClientHandler.java

@@ -0,0 +1,40 @@
+package com.jpsoft.education.netty.hander;
+
+import com.jpsoft.education.netty.MyClientNetty;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.ReferenceCountUtil;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public class MyClientHandler extends ChannelInboundHandlerAdapter {
+    public static String message;
+
+    public MyClientHandler() {
+    }
+
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        try {
+            ByteBuf byteBuf = (ByteBuf)msg;
+            byte[] bytes = new byte[byteBuf.readableBytes()];
+            byteBuf.readBytes(bytes);
+            message = new String(bytes);
+            MyClientNetty.countDownLatch.countDown();
+        } finally {
+            ReferenceCountUtil.release(msg);
+        }
+
+    }
+
+    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+        System.err.println("客户端读取数据完毕");
+        ctx.close();
+    }
+
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        System.err.println("client 读取数据出现异常");
+        ctx.close();
+    }
+}

+ 35 - 0
common/src/main/java/com/jpsoft/education/netty/hander/MyServerHandler.java

@@ -0,0 +1,35 @@
+package com.jpsoft.education.netty.hander;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+public class MyServerHandler extends ChannelInboundHandlerAdapter {
+
+    public MyServerHandler() {
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        ByteBuf byteBuf = (ByteBuf) msg;
+        byte[] bytes = new byte[byteBuf.readableBytes()];
+        byteBuf.readBytes(bytes);
+        String msg_str = new String(bytes,"UTF-8");
+        String retStr ="";
+        if("hello world".equals(msg_str)){
+            retStr ="01 03 14 3F 3C 26 E9 3F C1 00 00 00 00 1E B8 41 D7 CC CD 42 0B 00 00 C3 5F";
+        }else{
+            retStr ="01 03 FF FF FF FF FF";
+        }
+        System.out.println("===========>接收客户端消息:"+msg_str);
+        System.out.println("===========>向客户端发送消息:"+msg_str);
+        ctx.writeAndFlush(Unpooled.copiedBuffer(retStr.getBytes()));
+    }
+
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+        System.err.println("服务端读取数据完毕");
+    }
+
+}