Kaynağa Gözat

1)优化实时上报数据格式
2)修改接收设备读数据数据存储部分(由mysql直接存储修改为redis缓存再存储到mysql)

hbjzws 2 yıl önce
ebeveyn
işleme
960f5292ec

+ 13 - 1
pom.xml

@@ -48,7 +48,19 @@
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-data-redis</artifactId>
+           <!-- <exclusions>
+                &lt;!&ndash; 过滤lettuce,使用jedis作为redis客户端 &ndash;&gt;
+                <exclusion>
+                    <groupId>io.lettuce</groupId>
+                    <artifactId>lettuce-core</artifactId>
+                </exclusion>
+            </exclusions>-->
         </dependency>
+        <!-- jedis-->
+       <!-- <dependency>
+            <groupId>redis.clients</groupId>
+            <artifactId>jedis</artifactId>
+        </dependency>-->
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-pool2</artifactId>
@@ -162,7 +174,7 @@
         <dependency>
             <groupId>com.alibaba</groupId>
             <artifactId>druid-spring-boot-starter</artifactId>
-            <version>1.1.10</version>
+            <version>1.2.8</version>
         </dependency>
         <dependency>
             <groupId>net.sf.json-lib</groupId>

+ 155 - 0
src/main/java/com/jpsoft/zlopd/netty/comm/MultiFutureThread.java

@@ -0,0 +1,155 @@
+package com.jpsoft.zlopd.netty.comm;
+
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.compress.utils.Lists;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.*;
+
+/**
+ * 多线程执行结果汇总工具
+ *
+ * @author 张帅
+ * @date 2021年05月20日 11时28分02秒
+ * @version V1.0
+ *
+ * @param <T>
+ */
+@Slf4j
+public class MultiFutureThread<T> {
+
+    // 总线程数量
+    private int threadSize;
+    // 单次执行多少次线程
+    private int singleSize;
+
+    private List<Callable<T>> callableList = Lists.newArrayList();
+
+    /**
+     * 多线程执行结果汇总工具 构造方法
+     *
+     * @param threadSize 总线程数量
+     * @param singleSize 单次执行多少次线程
+     */
+    public MultiFutureThread(int threadSize, int singleSize) {
+        super();
+        this.threadSize = threadSize;
+        this.singleSize = singleSize < 1 ? threadSize : singleSize;
+    }
+
+    /**
+     * 设计要执行的程序段
+     *
+     * @param @param  callable
+     * @param @return 参数
+     * @return MultiFutureThread<T> 返回类型
+     * @throws
+     * @Title: setCallable
+     */
+    public MultiFutureThread<T> setCallable(Callable<T> callable) {
+        if (callable != null) {
+            callableList.add(callable);
+        }
+        return this;
+    }
+
+    /**
+     * 运行线程
+     *
+     * @param @return 参数
+     * @return List<T> 返回类型
+     * @throws
+     * @Title: exec
+     */
+    public List<T> exec() {
+
+        // 如果开启的线程数量为1时,刚不开启线程
+        List<T> list = Lists.newArrayList();
+        if (singleSize <= 1) {
+            callableList.forEach(e -> {
+                try {
+                    T dataList = e.call();
+                    list.add(dataList);
+                } catch (Exception e1) {
+                }
+            });
+            return list;
+        }
+
+//        ExecutorService executor = Executors.newFixedThreadPool(singleSize);
+        ExecutorService executor = Executors.newCachedThreadPool();
+        List<Future<T>> pointTaskFutureList = new ArrayList<>(singleSize);
+        int total = threadSize; // 总计算结果
+        int done = 0; //完成任务的数量
+        try {
+            int count = (total / singleSize) + 1;
+            for (int j = 0; j < count; j++) {
+                int index = j * singleSize;
+                int endIndex = index + singleSize;
+                int runSize = callableList.size() > endIndex ? endIndex : callableList.size();
+                for (int i = index; i < runSize; i++) {
+                    // 提交任务,任务的执行由线程池去调用执行并管理。
+                    // 这里获取结果任务的Future,并放到list中,供所有任务提交完后,通过每个任务的Future判断执行状态和结果。
+                    Future<T> future = executor.submit(callableList.get(i));
+                    pointTaskFutureList.add(future);
+                }
+
+                while (!pointTaskFutureList.isEmpty()) {
+                    Iterator<Future<T>> iter = pointTaskFutureList.iterator();
+                    while (iter.hasNext()) {
+                        Future<T> next = iter.next();
+                        if (next.isDone()) {
+                            done++;
+                            T dataList = next.get();
+                            list.add(dataList);
+                            iter.remove();
+                        }
+                    }
+                    log.info("总任务量:{},已完成任务量:{}", total, done);
+                    // 停留一会,避免一直循环。
+                    Thread.sleep(10);
+                }
+            }
+            log.info("总任务量:{},完成任务量:{}", total, done);
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        } finally {
+            executor.shutdown();
+            try {
+                executor.awaitTermination(1, TimeUnit.MINUTES);
+            } catch (InterruptedException e) {
+                log.error("线程超时,中断异常{}", e);
+            }
+        }
+
+        return list;
+    }
+
+    /**
+     * 测试
+     *
+     * @param @param  args
+     * @param @throws Exception 参数
+     * @return void 返回类型
+     * @throws
+     * @Title: main
+     */
+    public static void main(String[] args) throws Exception {
+        MultiFutureThread<Integer> thread = new MultiFutureThread<Integer>(10, 10);
+        for (int i = 0; i < 10; i++) {
+            thread.setCallable(() -> {
+                for (int j = 0; j < 10000; j++) {
+                    System.out.println(new Random().nextInt() + "----------");
+                }
+                return new Random().nextInt();
+            });
+        }
+        List<Integer> list = thread.exec();
+        list.forEach(System.out::println);
+    }
+
+}

+ 61 - 0
src/main/java/com/jpsoft/zlopd/netty/comm/TaskCallable.java

@@ -0,0 +1,61 @@
+package com.jpsoft.zlopd.netty.comm;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+
+public class TaskCallable implements Callable<String> {
+
+    private static Logger logger = LoggerFactory.getLogger(TaskCallable.class);
+    String name;
+    int sleep;
+    public TaskCallable(String name, int sleep) {
+        this.name = name;
+        this.sleep = sleep;
+    }
+
+    @Override
+    public String call() throws Exception {
+        logger.info("[{}]任务开始执行",name);
+        int random = new Random().nextInt(100);
+        random = random + 1;
+        logger.info("[{}]任务开始睡眠[{}]秒",name,random);
+        Thread.sleep(random*1000);
+        logger.info("[{}]任务执行结束",name);
+        return name;
+    }
+
+    public static void main(String[] args) throws Exception {
+
+        ExecutorService pool = Executors.newFixedThreadPool(2);
+        List<Future<String>> results = new ArrayList<>();
+        for (int i = 1; i < 3; i++) {
+            results.add(pool.submit(new TaskCallable("the "+i,i)));
+        }
+//        pool.shutdown();
+        Future<String> done = null;
+        int i = 0;
+        while (null==done&&i<100) {
+            Thread.sleep(1000);
+            logger.info("开始等待处理结果");
+            for (Future f: results) {
+                if (f.isDone()) {
+                    done=f;
+                    logger.info("处理已经有了结果");
+                    break;
+                }
+            }
+            i++;
+        }
+        pool.shutdownNow();
+        System.out.println(done.get());
+
+    }
+}

+ 0 - 3
src/main/java/com/jpsoft/zlopd/netty/hander/OileValDataAgainHandler.java

@@ -49,8 +49,5 @@ public class OileValDataAgainHandler extends SimpleChannelInboundHandler<OileVal
         String ooSerNoKeyStr ="";
         if(oSerNoKey!=null){ooSerNoKeyStr = (String)oSerNoKey;}
         oileManager.send(ooSerNoKeyStr,ModBusUtils.hexStringToBytes(oileValDataAgainVo.getData()),"",3);
-
-
-
     }
 }

+ 137 - 147
src/main/java/com/jpsoft/zlopd/netty/hander/OileValDataHandler.java

@@ -1,6 +1,7 @@
 package com.jpsoft.zlopd.netty.hander;
 
 import cn.hutool.core.date.DateUtil;
+import com.alibaba.fastjson.JSONObject;
 import com.jpsoft.zlopd.modules.base.service.GatherDataMultiService;
 import com.jpsoft.zlopd.modules.base.service.GatherDataService;
 import com.jpsoft.zlopd.modules.base.service.TsysUserService;
@@ -18,9 +19,9 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import jakarta.annotation.Resource;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
 import org.springframework.stereotype.Component;
 import java.util.Set;
 
@@ -33,20 +34,9 @@ import java.util.Set;
 @ChannelHandler.Sharable
 public class OileValDataHandler extends SimpleChannelInboundHandler<OileValDataVo> {
 
-    @Autowired
-    private OileManager oileManager;
-    @Autowired
-    private DtuManage dtuManage;
-    @Resource
-    private SocketProperties socketProperties;
-    @Autowired
-    private TsysUserService sysUserService;
     @Autowired
     private RedisTemplate<String, Object> redisTemplate;
-    @Autowired
-    private GatherDataMultiService gatherDataMultiService;
-    @Autowired
-    private GatherDataService gatherDataService;
+
     @Override
     protected void channelRead0(ChannelHandlerContext ctx, OileValDataVo oileValDataVo) throws Exception {
         log.info(ctx.channel().remoteAddress().toString() + "单数据采集获取有效数据为:" + oileValDataVo.getData());
@@ -71,149 +61,149 @@ public class OileValDataHandler extends SimpleChannelInboundHandler<OileValDataV
         String owellParamStr ="";
         if(owellParam!=null){owellParamStr = (String)owellParam;}
         log.info("----handler-----获取通道属性--1.数据传输状态{}---2.指令原文{}-----3.设备号{}--4.数据类型{}----5.数据重发标志{}-- 6.井参数ID{}---",oDataStatusStr,oDataOrgStr,ooSerNoKeyStr,oDataTypeStr,oDataAgainStr,owellParamStr);
-
-        String data = String.valueOf(oileValDataVo.getData());
-        if(data.contains(" ")){data =data.replace(" ","");}
-        log.info("---寄存器返回报文数据原文--"+data);
-        String data1=  ModBusUtils.effectiveData(data);
-        log.info("---寄存器返回报文数据有效数据--"+data1);
-        if(data1.startsWith("14")){
-            //01 03 14 3F 3C 26 E9 3F C1 00 00 00 00 1E B8 41 D7 CC CD 42 0B 00 00 C3 5F
-            data1 = data1.substring(2,data1.length());
-            String str1= data1.substring(0, 8);
-            String str2= data1.substring(8, 16);
-            String str3= data1.substring(16, 24);
-            String str4= data1.substring(24, 32);
-            String str5= data1.substring(32, 40);
-            log.info("---解析真实数据------油压:{},套压:{},回压:{},井口温度:{},载荷:{}",ModBusUtils.tranTsOilParam(str1),
-                    ModBusUtils.tranTsOilParam(str2),ModBusUtils.tranTsOilParam(str3)
-                    ,ModBusUtils.tranTsOilParam(str4),ModBusUtils.tranTsOilParam(str5));
-            StringBuffer strOrg = new StringBuffer();
-            strOrg.append(ModBusUtils.tranTsOilParam(str1)).append(","+ModBusUtils.tranTsOilParam(str2)).append(","+ModBusUtils.tranTsOilParam(str3))
-                    .append(","+ModBusUtils.tranTsOilParam(str4)).append(","+ModBusUtils.tranTsOilParam(str5));
-            String keyCurr ="pak_message_single_oile"+ooSerNoKeyStr;
-            redisTemplate.opsForValue().set(keyCurr,strOrg.toString());
-            Set<String> keysOile = redisTemplate.keys("gather:"+ooSerNoKeyStr+":oile*time_*");
-            boolean sendOileBl =false;
-            for (String key : keysOile) {
-                key = key.substring(key.indexOf("time_")+5,key.length());
-                key =key.substring(0,key.length()-2);
-                String currentTimeMin =DateTimeUtil.getCurrentTimeMin();
-                if(currentTimeMin.equals(key)){
-                    sendOileBl = true;
+        if(StringUtils.isNotEmpty(ooSerNoKeyStr)){
+            String data = String.valueOf(oileValDataVo.getData());
+            if(data.contains(" ")){data =data.replace(" ","");}
+            log.info("---寄存器返回报文数据原文--"+data);
+            String data1=  ModBusUtils.effectiveData(data);
+            log.info("---寄存器返回报文数据有效数据--"+data1);
+            if(data1.startsWith("14")){
+                //01 03 14 3F 3C 26 E9 3F C1 00 00 00 00 1E B8 41 D7 CC CD 42 0B 00 00 C3 5F
+                data1 = data1.substring(2,data1.length());
+                String str1= data1.substring(0, 8);
+                String str2= data1.substring(8, 16);
+                String str3= data1.substring(16, 24);
+                String str4= data1.substring(24, 32);
+                String str5= data1.substring(32, 40);
+                log.info("---解析真实数据------油压:{},套压:{},回压:{},井口温度:{},载荷:{}",ModBusUtils.tranTsOilParam(str1),
+                        ModBusUtils.tranTsOilParam(str2),ModBusUtils.tranTsOilParam(str3)
+                        ,ModBusUtils.tranTsOilParam(str4),ModBusUtils.tranTsOilParam(str5));
+                StringBuffer strOrg = new StringBuffer();
+                strOrg.append(ModBusUtils.tranTsOilParam(str1)).append(","+ModBusUtils.tranTsOilParam(str2)).append(","+ModBusUtils.tranTsOilParam(str3))
+                        .append(","+ModBusUtils.tranTsOilParam(str4)).append(","+ModBusUtils.tranTsOilParam(str5));
+                String keyCurr ="pak_message_single_oile"+ooSerNoKeyStr;
+                redisTemplate.opsForValue().set(keyCurr,strOrg.toString());
+                Set<String> keysOile = redisTemplate.keys("gather:"+ooSerNoKeyStr+":oile*time_*");
+                boolean sendOileBl =false;
+                for (String key : keysOile) {
+                    key = key.substring(key.indexOf("time_")+5,key.length());
+                    key =key.substring(0,key.length()-2);
+                    String currentTimeMin =DateTimeUtil.getCurrentTimeMin();
+                    if(currentTimeMin.equals(key)){
+                        sendOileBl = true;
+                    }
+                }
+                if(!sendOileBl){
+                    GatherSingleVo gatherSingleVo = new  GatherSingleVo();
+                    gatherSingleVo.setId(IdGenerator.getId());
+                    gatherSingleVo.setVal(strOrg.toString());
+                    gatherSingleVo.setGatherDate(DateUtil.now());
+                    gatherSingleVo.setDeviceNo(ooSerNoKeyStr);
+                    gatherSingleVo.setType("oile");
+                    String str = JSONObject.toJSONString(gatherSingleVo);
+                    String key ="gather:"+ooSerNoKeyStr+":oile:"+gatherSingleVo.getId()+"time_"+DateTimeUtil.getDealTime(gatherSingleVo.getGatherDate());
+                    redisTemplate.opsForValue().set(key,str);
                 }
             }
-            if(!sendOileBl){
-                GatherSingleVo gatherSingleVo = new  GatherSingleVo();
-                gatherSingleVo.setId(IdGenerator.getId());
-                gatherSingleVo.setVal(strOrg.toString());
-                gatherSingleVo.setGatherDate(DateUtil.now());
-                gatherSingleVo.setDeviceNo(ooSerNoKeyStr);
-                gatherSingleVo.setType("oile");
-                redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(GatherSingleVo.class));
-                String key ="gather:"+ooSerNoKeyStr+":oile:"+gatherSingleVo.getId()+"time_"+DateTimeUtil.getDealTime(gatherSingleVo.getGatherDate());
-                redisTemplate.opsForValue().set(key,gatherSingleVo);
-            }
-        }
 
-
-        if(data1.startsWith("30")){
-            //01 03 30 41 E9 C0 00 41 E7 00 00 3D 00 28 00 43 65 08 00 43 66 70 00 43 65 35 88 48 35 8F 85 48 EE A8 88 40 83 CD F5 41 45 00 00 00 00 85 37 3E 99 00 00 C6 3A
-            data1 = data1.substring(2,data1.length());
-            String str1= data1.substring(0, 8);
-            String str2= data1.substring(8, 16);
-            String str3= data1.substring(16, 24);
-            String str4= data1.substring(24, 32);
-            String str5= data1.substring(32, 40);
-            String str6= data1.substring(40, 48);
-            String str7= data1.substring(48, 56);
-            String str8= data1.substring(56, 64);
-            String str9= data1.substring(64, 72);
-            String str10= data1.substring(72, 80);
-            String str11= data1.substring(80, 88);
-            String str12= data1.substring(88, 96);
-            log.info("----解析真实数据-----电机工作电流A相:{},电机工作电流B相:{},电机工作电流C相:{},电机工作电压A相:{},电机工作电压B相:{},电机工作电压C相:{}",ModBusUtils.tranTsOilParam(str1),
-                    ModBusUtils.tranTsOilParam(str2),ModBusUtils.tranTsOilParam(str3)
-                    ,ModBusUtils.tranTsOilParam(str4),ModBusUtils.tranTsOilParam(str5),ModBusUtils.tranTsOilParam(str6));
-            log.info("----解析真实数据-----电机有功功耗:{},电机无功功耗:{},电机有功功率:{},电机无功功率:{},电机反向功率:{},电机功率因数:{}",ModBusUtils.tranTsOilParam(str7),
-                    ModBusUtils.tranTsOilParam(str8),ModBusUtils.tranTsOilParam(str9)
-                    ,ModBusUtils.tranTsOilParam(str10),ModBusUtils.tranTsOilParam(str11),ModBusUtils.tranTsOilParam(str12));
-            StringBuffer strOrg = new StringBuffer();
-            strOrg.append(ModBusUtils.tranTsOilParam(str1)).append(","+ModBusUtils.tranTsOilParam(str2)).append(","+ModBusUtils.tranTsOilParam(str3))
-                    .append(","+ModBusUtils.tranTsOilParam(str4)).append(","+ModBusUtils.tranTsOilParam(str5)).append(","+ModBusUtils.tranTsOilParam(str6));
-            strOrg.append(","+ModBusUtils.tranTsOilParam(str7)).append(","+ModBusUtils.tranTsOilParam(str8)).append(","+ModBusUtils.tranTsOilParam(str9))
-                    .append(","+ModBusUtils.tranTsOilParam(str10)).append(","+ModBusUtils.tranTsOilParam(str11)).append(","+ModBusUtils.tranTsOilParam(str12));
-            String keyCurr ="pak_message_single_currentvoltage"+ooSerNoKeyStr;
-            redisTemplate.opsForValue().set(keyCurr,strOrg.toString());
-            Set<String> keysCurrentvoltage = redisTemplate.keys("gather:"+ooSerNoKeyStr+":currentvoltage*time_*");
-            boolean sendCurrentvoltageBl =false;
-            for (String key : keysCurrentvoltage) {
-                key = key.substring(key.indexOf("time_")+5,key.length());
-                key =key.substring(0,key.length()-2);
-                String currentTimeMin =DateTimeUtil.getCurrentTimeMin();
-                if(currentTimeMin.equals(key)){
-                    sendCurrentvoltageBl = true;
+            if(data1.startsWith("30")){
+                //01 03 30 41 E9 C0 00 41 E7 00 00 3D 00 28 00 43 65 08 00 43 66 70 00 43 65 35 88 48 35 8F 85 48 EE A8 88 40 83 CD F5 41 45 00 00 00 00 85 37 3E 99 00 00 C6 3A
+                data1 = data1.substring(2,data1.length());
+                String str1= data1.substring(0, 8);
+                String str2= data1.substring(8, 16);
+                String str3= data1.substring(16, 24);
+                String str4= data1.substring(24, 32);
+                String str5= data1.substring(32, 40);
+                String str6= data1.substring(40, 48);
+                String str7= data1.substring(48, 56);
+                String str8= data1.substring(56, 64);
+                String str9= data1.substring(64, 72);
+                String str10= data1.substring(72, 80);
+                String str11= data1.substring(80, 88);
+                String str12= data1.substring(88, 96);
+                log.info("----解析真实数据-----电机工作电流A相:{},电机工作电流B相:{},电机工作电流C相:{},电机工作电压A相:{},电机工作电压B相:{},电机工作电压C相:{}",ModBusUtils.tranTsOilParam(str1),
+                        ModBusUtils.tranTsOilParam(str2),ModBusUtils.tranTsOilParam(str3)
+                        ,ModBusUtils.tranTsOilParam(str4),ModBusUtils.tranTsOilParam(str5),ModBusUtils.tranTsOilParam(str6));
+                log.info("----解析真实数据-----电机有功功耗:{},电机无功功耗:{},电机有功功率:{},电机无功功率:{},电机反向功率:{},电机功率因数:{}",ModBusUtils.tranTsOilParam(str7),
+                        ModBusUtils.tranTsOilParam(str8),ModBusUtils.tranTsOilParam(str9)
+                        ,ModBusUtils.tranTsOilParam(str10),ModBusUtils.tranTsOilParam(str11),ModBusUtils.tranTsOilParam(str12));
+                StringBuffer strOrg = new StringBuffer();
+                strOrg.append(ModBusUtils.tranTsOilParam(str1)).append(","+ModBusUtils.tranTsOilParam(str2)).append(","+ModBusUtils.tranTsOilParam(str3))
+                        .append(","+ModBusUtils.tranTsOilParam(str4)).append(","+ModBusUtils.tranTsOilParam(str5)).append(","+ModBusUtils.tranTsOilParam(str6));
+                strOrg.append(","+ModBusUtils.tranTsOilParam(str7)).append(","+ModBusUtils.tranTsOilParam(str8)).append(","+ModBusUtils.tranTsOilParam(str9))
+                        .append(","+ModBusUtils.tranTsOilParam(str10)).append(","+ModBusUtils.tranTsOilParam(str11)).append(","+ModBusUtils.tranTsOilParam(str12));
+                String keyCurr ="pak_message_single_currentvoltage"+ooSerNoKeyStr;
+                redisTemplate.opsForValue().set(keyCurr,strOrg.toString());
+                Set<String> keysCurrentvoltage = redisTemplate.keys("gather:"+ooSerNoKeyStr+":currentvoltage*time_*");
+                boolean sendCurrentvoltageBl =false;
+                for (String key : keysCurrentvoltage) {
+                    key = key.substring(key.indexOf("time_")+5,key.length());
+                    key =key.substring(0,key.length()-2);
+                    String currentTimeMin =DateTimeUtil.getCurrentTimeMin();
+                    if(currentTimeMin.equals(key)){
+                        sendCurrentvoltageBl = true;
+                    }
+                }
+                if(!sendCurrentvoltageBl){
+                    GatherSingleVo gatherSingleVo = new  GatherSingleVo();
+                    gatherSingleVo.setId(IdGenerator.getId());
+                    gatherSingleVo.setVal(strOrg.toString());
+                    gatherSingleVo.setGatherDate(DateUtil.now());
+                    gatherSingleVo.setDeviceNo(ooSerNoKeyStr);
+                    gatherSingleVo.setType("currentvoltage");
+                    String str = JSONObject.toJSONString(gatherSingleVo);
+                    String key ="gather:"+ooSerNoKeyStr+":currentvoltage:"+gatherSingleVo.getId()+"time_"+DateTimeUtil.getDealTime(gatherSingleVo.getGatherDate());
+                    redisTemplate.opsForValue().set(key,str);
                 }
             }
-            if(!sendCurrentvoltageBl){
-                GatherSingleVo gatherSingleVo = new  GatherSingleVo();
-                gatherSingleVo.setId(IdGenerator.getId());
-                gatherSingleVo.setVal(strOrg.toString());
-                gatherSingleVo.setGatherDate(DateUtil.now());
-                gatherSingleVo.setDeviceNo(ooSerNoKeyStr);
-                gatherSingleVo.setType("currentvoltage");
-                redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(GatherSingleVo.class));
-                String key ="gather:"+ooSerNoKeyStr+":currentvoltage:"+gatherSingleVo.getId()+"time_"+DateTimeUtil.getDealTime(gatherSingleVo.getGatherDate());
-                redisTemplate.opsForValue().set(key,gatherSingleVo);
-            }
-        }
 
-        if(data1.startsWith("08")){
-            //01 03 08 40 9B 85 1F 40 3B 00 00 55 74
-            data1 = data1.substring(2,data1.length());
-            String str1= data1.substring(0, 8);
-            String str2= data1.substring(8, 16);
-            log.info("-----解析真实数据----冲次:{},冲程:{}",ModBusUtils.tranTsOilParam(str1),
-                    ModBusUtils.tranTsOilParam(str2));
-            StringBuffer strOrg = new StringBuffer();
-            strOrg.append(ModBusUtils.tranTsOilParam(str1)).append(","+ModBusUtils.tranTsOilParam(str2));
-            String keyCurr ="pak_message_single_frequencytravel"+ooSerNoKeyStr;
-            redisTemplate.opsForValue().set(keyCurr,strOrg.toString());
-            Set<String> keysFrequencytravel = redisTemplate.keys("gather:"+ooSerNoKeyStr+":frequencytravel*time_*");
-            boolean sendFrequencytravelBl =false;
-            for (String key : keysFrequencytravel) {
-                key = key.substring(key.indexOf("time_")+5,key.length());
-                key =key.substring(0,key.length()-2);
-                String currentTimeMin =DateTimeUtil.getCurrentTimeMin();
-                if(currentTimeMin.equals(key)){
-                    sendFrequencytravelBl = true;
+            if(data1.startsWith("08")){
+                //01 03 08 40 9B 85 1F 40 3B 00 00 55 74
+                data1 = data1.substring(2,data1.length());
+                String str1= data1.substring(0, 8);
+                String str2= data1.substring(8, 16);
+                log.info("-----解析真实数据----冲次:{},冲程:{}",ModBusUtils.tranTsOilParam(str1),
+                        ModBusUtils.tranTsOilParam(str2));
+                StringBuffer strOrg = new StringBuffer();
+                strOrg.append(ModBusUtils.tranTsOilParam(str1)).append(","+ModBusUtils.tranTsOilParam(str2));
+                String keyCurr ="pak_message_single_frequencytravel"+ooSerNoKeyStr;
+                redisTemplate.opsForValue().set(keyCurr,strOrg.toString());
+                Set<String> keysFrequencytravel = redisTemplate.keys("gather:"+ooSerNoKeyStr+":frequencytravel*time_*");
+                boolean sendFrequencytravelBl =false;
+                for (String key : keysFrequencytravel) {
+                    key = key.substring(key.indexOf("time_")+5,key.length());
+                    key =key.substring(0,key.length()-2);
+                    String currentTimeMin =DateTimeUtil.getCurrentTimeMin();
+                    if(currentTimeMin.equals(key)){
+                        sendFrequencytravelBl = true;
+                    }
+                }
+                if(!sendFrequencytravelBl){
+                    GatherSingleVo gatherSingleVo = new  GatherSingleVo();
+                    gatherSingleVo.setId(IdGenerator.getId());
+                    gatherSingleVo.setVal(strOrg.toString());
+                    gatherSingleVo.setGatherDate(DateUtil.now());
+                    gatherSingleVo.setDeviceNo(ooSerNoKeyStr);
+                    gatherSingleVo.setType("frequencytravel");
+                    String str = JSONObject.toJSONString(gatherSingleVo);
+                    //redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(GatherSingleVo.class));
+                    String key ="gather:"+ooSerNoKeyStr+":frequencytravel:"+gatherSingleVo.getId()+"time_"+DateTimeUtil.getDealTime(gatherSingleVo.getGatherDate());
+                   // redisTemplate.opsForValue().set(key,gatherSingleVo);
+                    redisTemplate.opsForValue().set(key,str);
                 }
             }
-            if(!sendFrequencytravelBl){
-                GatherSingleVo gatherSingleVo = new  GatherSingleVo();
-                gatherSingleVo.setId(IdGenerator.getId());
-                gatherSingleVo.setVal(strOrg.toString());
-                gatherSingleVo.setGatherDate(DateUtil.now());
-                gatherSingleVo.setDeviceNo(ooSerNoKeyStr);
-                gatherSingleVo.setType("frequencytravel");
-                redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(GatherSingleVo.class));
-                String key ="gather:"+ooSerNoKeyStr+":frequencytravel:"+gatherSingleVo.getId()+"time_"+DateTimeUtil.getDealTime(gatherSingleVo.getGatherDate());
-                redisTemplate.opsForValue().set(key,gatherSingleVo);
+            if(data.length()==14&&data.startsWith("0103")&&data.endsWith("3807")&&data.indexOf("FA")>-1){
+                //示功图数据
+                log.info("-----------示功图计数数据开始采集-----------------------");
+                String key ="pak_message_mutile_"+ooSerNoKeyStr+"_status";
+                redisTemplate.opsForValue().set(key,"1");
+            }
+            if(data.length()==14&&data.startsWith("0103")&&data.indexOf("FA")==-1){
+                //示功图数据
+                log.info("-----------示功图计数数据未开始采集-----------------------");
+                String key ="pak_message_mutile_"+ooSerNoKeyStr+"_status";
+                redisTemplate.opsForValue().set(key,"0");
             }
-
-        }
-
-        if(data.length()==14&&data.startsWith("0103")&&data.endsWith("3807")&&data.indexOf("FA")>-1){
-            //示功图数据
-            log.info("-----------示功图数据开始采集-----------------------");
-            String key ="pak_message_mutile_"+ooSerNoKeyStr+"_status";
-            redisTemplate.opsForValue().set(key,"1");
-        }
-        if(data.length()==14&&data.startsWith("0103")&&data.indexOf("FA")==-1){
-            //示功图数据
-            log.info("-----------示功图数据未开始采集-----------------------");
-            String key ="pak_message_mutile_"+ooSerNoKeyStr+"_status";
-            redisTemplate.opsForValue().set(key,"0");
         }
     }
 }

+ 138 - 24
src/main/java/com/jpsoft/zlopd/netty/hander/OileValDataSgtHandler.java

@@ -1,13 +1,17 @@
 package com.jpsoft.zlopd.netty.hander;
 
+import cn.hutool.core.date.DateUtil;
 import com.jpsoft.zlopd.modules.base.service.GatherDataMultiService;
 import com.jpsoft.zlopd.modules.base.service.GatherDataService;
 import com.jpsoft.zlopd.modules.base.service.TsysUserService;
 import com.jpsoft.zlopd.netty.comm.DtuManage;
 import com.jpsoft.zlopd.netty.comm.OilAttrKeys;
 import com.jpsoft.zlopd.netty.comm.OileManager;
+import com.jpsoft.zlopd.netty.vo.GatherSingleVo;
 import com.jpsoft.zlopd.netty.vo.OileValDataSgtVo;
 import com.jpsoft.zlopd.netty.vo.OileValDataVo;
+import com.jpsoft.zlopd.util.DateTimeUtil;
+import com.jpsoft.zlopd.util.IdGenerator;
 import com.jpsoft.zlopd.util.ModBusUtils;
 import com.jpsoft.zlopd.util.SocketProperties;
 import io.netty.channel.ChannelHandler;
@@ -17,10 +21,12 @@ import jakarta.annotation.Resource;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
 import org.springframework.stereotype.Component;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 /**
  * 功能描述: netty服务端处理类
@@ -31,20 +37,8 @@ import java.util.List;
 @ChannelHandler.Sharable
 public class OileValDataSgtHandler extends SimpleChannelInboundHandler<OileValDataSgtVo> {
 
-    @Autowired
-    private OileManager oileManager;
-    @Autowired
-    private DtuManage dtuManage;
     @Resource
-    private SocketProperties socketProperties;
-    @Autowired
-    private TsysUserService sysUserService;
-    @Autowired
     private RedisTemplate<String, Object> redisTemplate;
-    @Autowired
-    private GatherDataMultiService gatherDataMultiService;
-    @Autowired
-    private GatherDataService gatherDataService;
 
     @Override
     protected void channelRead0(ChannelHandlerContext ctx, OileValDataSgtVo oileValDataVo) throws Exception {
@@ -105,8 +99,33 @@ public class OileValDataSgtHandler extends SimpleChannelInboundHandler<OileValDa
                 }
             }
             log.info("示功图-位移值250点第1段数据为:{}",strOrg.toString());
-            String key ="pak_message_diagram_shift"+ooSerNoKeyStr+"_250_1-100";
-            redisTemplate.opsForValue().set(key,strOrg.toString());
+            String keyCurr ="pak_message_diagram_shift"+ooSerNoKeyStr+"_250_1-100";
+            redisTemplate.opsForValue().set(keyCurr,strOrg.toString());
+            Set<String> keysDiagramShift1 = redisTemplate.keys("gather:"+ooSerNoKeyStr+":diagram_shift_250_1-100*time_*");
+            boolean sendDiagramShift1Bl =false;
+            for (String key : keysDiagramShift1) {
+                key = key.substring(key.indexOf("time_")+5,key.length());
+                /*key =key.substring(0,key.length()-2);
+                String currentTimeMin = DateTimeUtil.getCurrentTimeMin();
+                Long timeCom= Long.valueOf(key)+10;
+                if(timeCom>Long.valueOf(currentTimeMin)){
+                    sendDiagramShift1Bl = true;
+                }*/
+                if(DateTimeUtil.getCurrTimeCom(key)){
+                    sendDiagramShift1Bl = true;
+                }
+            }
+            if(!sendDiagramShift1Bl){
+                GatherSingleVo gatherSingleVo = new  GatherSingleVo();
+                gatherSingleVo.setId(IdGenerator.getId());
+                gatherSingleVo.setVal(strOrg.toString());
+                gatherSingleVo.setGatherDate(DateUtil.now());
+                gatherSingleVo.setDeviceNo(ooSerNoKeyStr);
+                gatherSingleVo.setType("diagramShift1");
+                redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(GatherSingleVo.class));
+                String key ="gather:"+ooSerNoKeyStr+":diagram_shift_250_1-100:"+gatherSingleVo.getId()+"time_"+DateTimeUtil.getDealTime(gatherSingleVo.getGatherDate());
+                redisTemplate.opsForValue().set(key,gatherSingleVo);
+            }
         }
         if(data1.startsWith("C0")){
             //当前示功图数据-位移值250点的1-100个数据
@@ -138,8 +157,27 @@ public class OileValDataSgtHandler extends SimpleChannelInboundHandler<OileValDa
                 }
             }
             log.info("示功图-位移值250点第2段数据为:{}",strOrg.toString());
-            String key ="pak_message_diagram_shift"+ooSerNoKeyStr+"_250_101-200";
-            redisTemplate.opsForValue().set(key,strOrg.toString());
+            String keyCurr ="pak_message_diagram_shift"+ooSerNoKeyStr+"_250_101-200";
+            redisTemplate.opsForValue().set(keyCurr,strOrg.toString());
+            Set<String> keysDiagramShift2 = redisTemplate.keys("gather:"+ooSerNoKeyStr+":diagram_shift_250_101-200*time_*");
+            boolean sendDiagramShift2Bl =false;
+            for (String key : keysDiagramShift2) {
+                key = key.substring(key.indexOf("time_")+5,key.length());
+                if(DateTimeUtil.getCurrTimeCom(key)){
+                    sendDiagramShift2Bl = true;
+                }
+            }
+            if(!sendDiagramShift2Bl){
+                GatherSingleVo gatherSingleVo = new  GatherSingleVo();
+                gatherSingleVo.setId(IdGenerator.getId());
+                gatherSingleVo.setVal(strOrg.toString());
+                gatherSingleVo.setGatherDate(DateUtil.now());
+                gatherSingleVo.setDeviceNo(ooSerNoKeyStr);
+                gatherSingleVo.setType("diagramShift2");
+                redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(GatherSingleVo.class));
+                String key ="gather:"+ooSerNoKeyStr+":diagram_shift_250_101-200:"+gatherSingleVo.getId()+"time_"+DateTimeUtil.getDealTime(gatherSingleVo.getGatherDate());
+                redisTemplate.opsForValue().set(key,gatherSingleVo);
+            }
         }
         if(data1.startsWith("70")){
             //当前示功图数据-位移值250点的200-250个数据(50个数据)
@@ -171,8 +209,27 @@ public class OileValDataSgtHandler extends SimpleChannelInboundHandler<OileValDa
                 }
             }
             log.info("示功图-位移值250点第3段数据为:{}",strOrg.toString());
-            String key ="pak_message_diagram_shift"+ooSerNoKeyStr+"_250_201-250";
-            redisTemplate.opsForValue().set(key,strOrg.toString());
+            String keyCurr ="pak_message_diagram_shift"+ooSerNoKeyStr+"_250_201-250";
+            redisTemplate.opsForValue().set(keyCurr,strOrg.toString());
+            Set<String> keysDiagramShift3 = redisTemplate.keys("gather:"+ooSerNoKeyStr+":diagram_shift_250_201-250*time_*");
+            boolean sendDiagramShift3Bl =false;
+            for (String key : keysDiagramShift3) {
+                key = key.substring(key.indexOf("time_")+5,key.length());
+                if(DateTimeUtil.getCurrTimeCom(key)){
+                    sendDiagramShift3Bl = true;
+                }
+            }
+            if(!sendDiagramShift3Bl){
+                GatherSingleVo gatherSingleVo = new  GatherSingleVo();
+                gatherSingleVo.setId(IdGenerator.getId());
+                gatherSingleVo.setVal(strOrg.toString());
+                gatherSingleVo.setGatherDate(DateUtil.now());
+                gatherSingleVo.setDeviceNo(ooSerNoKeyStr);
+                gatherSingleVo.setType("diagramShift3");
+                redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(GatherSingleVo.class));
+                String key ="gather:"+ooSerNoKeyStr+":diagram_shift_250_201-250:"+gatherSingleVo.getId()+"time_"+DateTimeUtil.getDealTime(gatherSingleVo.getGatherDate());
+                redisTemplate.opsForValue().set(key,gatherSingleVo);
+            }
         }
         if(data1.startsWith("BC")){
             //当前示功图数据-位移值250点的1-100个数据
@@ -204,8 +261,27 @@ public class OileValDataSgtHandler extends SimpleChannelInboundHandler<OileValDa
                 }
             }
             log.info("示功图-载荷值250点第1段数据为:{}",strOrg.toString());
-            String key ="pak_message_diagram_load"+ooSerNoKeyStr+"_250_1-100";
-            redisTemplate.opsForValue().set(key,strOrg.toString());
+            String keyCurr ="pak_message_diagram_load"+ooSerNoKeyStr+"_250_1-100";
+            redisTemplate.opsForValue().set(keyCurr,strOrg.toString());
+            Set<String> keysDiagramLoad1 = redisTemplate.keys("gather:"+ooSerNoKeyStr+":diagram_load_250_1-100*time_*");
+            boolean sendDiagramLoad1Bl =false;
+            for (String key : keysDiagramLoad1) {
+                key = key.substring(key.indexOf("time_")+5,key.length());
+                if(DateTimeUtil.getCurrTimeCom(key)){
+                    sendDiagramLoad1Bl = true;
+                }
+            }
+            if(!sendDiagramLoad1Bl){
+                GatherSingleVo gatherSingleVo = new  GatherSingleVo();
+                gatherSingleVo.setId(IdGenerator.getId());
+                gatherSingleVo.setVal(strOrg.toString());
+                gatherSingleVo.setGatherDate(DateUtil.now());
+                gatherSingleVo.setDeviceNo(ooSerNoKeyStr);
+                gatherSingleVo.setType("diagramLoad1");
+                redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(GatherSingleVo.class));
+                String key ="gather:"+ooSerNoKeyStr+":diagram_load_250_1-100:"+gatherSingleVo.getId()+"time_"+DateTimeUtil.getDealTime(gatherSingleVo.getGatherDate());
+                redisTemplate.opsForValue().set(key,gatherSingleVo);
+            }
         }
         if(data1.startsWith("B8")){
             //当前示功图数据-位移值250点的1-100个数据
@@ -237,8 +313,27 @@ public class OileValDataSgtHandler extends SimpleChannelInboundHandler<OileValDa
                 }
             }
             log.info("示功图-载荷值250点第2段数据为:{}",strOrg.toString());
-            String key ="pak_message_diagram_load"+ooSerNoKeyStr+"_250_101-200";
-            redisTemplate.opsForValue().set(key,strOrg.toString());
+            String keyCurr ="pak_message_diagram_load"+ooSerNoKeyStr+"_250_101-200";
+            redisTemplate.opsForValue().set(keyCurr,strOrg.toString());
+            Set<String> keysDiagramLoad2 = redisTemplate.keys("gather:"+ooSerNoKeyStr+":diagram_load_250_101-200*time_*");
+            boolean sendDiagramLoad2Bl =false;
+            for (String key : keysDiagramLoad2) {
+                key = key.substring(key.indexOf("time_")+5,key.length());
+                if(DateTimeUtil.getCurrTimeCom(key)){
+                    sendDiagramLoad2Bl = true;
+                }
+            }
+            if(!sendDiagramLoad2Bl){
+                GatherSingleVo gatherSingleVo = new  GatherSingleVo();
+                gatherSingleVo.setId(IdGenerator.getId());
+                gatherSingleVo.setVal(strOrg.toString());
+                gatherSingleVo.setGatherDate(DateUtil.now());
+                gatherSingleVo.setDeviceNo(ooSerNoKeyStr);
+                gatherSingleVo.setType("diagramLoad2");
+                redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(GatherSingleVo.class));
+                String key ="gather:"+ooSerNoKeyStr+":diagram_load_250_101-200:"+gatherSingleVo.getId()+"time_"+DateTimeUtil.getDealTime(gatherSingleVo.getGatherDate());
+                redisTemplate.opsForValue().set(key,gatherSingleVo);
+            }
         }
         if(data1.startsWith("80")){
             //当前示功图数据-位移值250点的200-250个数据(50个数据)
@@ -270,8 +365,27 @@ public class OileValDataSgtHandler extends SimpleChannelInboundHandler<OileValDa
                 }
             }
             log.info("示功图-载荷值250点第3段数据为:{}",strOrg.toString());
-            String key ="pak_message_diagram_load"+ooSerNoKeyStr+"_250_201-250";
-            redisTemplate.opsForValue().set(key,strOrg.toString());
+            String keyCurr ="pak_message_diagram_load"+ooSerNoKeyStr+"_250_201-250";
+            redisTemplate.opsForValue().set(keyCurr,strOrg.toString());
+            Set<String> keysDiagramLoad3 = redisTemplate.keys("gather:"+ooSerNoKeyStr+":diagram_load_250_201-250*time_*");
+            boolean sendDiagramLoad3Bl =false;
+            for (String key : keysDiagramLoad3) {
+                key = key.substring(key.indexOf("time_")+5,key.length());
+                if(DateTimeUtil.getCurrTimeCom(key)){
+                    sendDiagramLoad3Bl = true;
+                }
+            }
+            if(!sendDiagramLoad3Bl){
+                GatherSingleVo gatherSingleVo = new  GatherSingleVo();
+                gatherSingleVo.setId(IdGenerator.getId());
+                gatherSingleVo.setVal(strOrg.toString());
+                gatherSingleVo.setGatherDate(DateUtil.now());
+                gatherSingleVo.setDeviceNo(ooSerNoKeyStr);
+                gatherSingleVo.setType("diagramLoad3");
+                redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(GatherSingleVo.class));
+                String key ="gather:"+ooSerNoKeyStr+":diagram_load_250_201-250:"+gatherSingleVo.getId()+"time_"+DateTimeUtil.getDealTime(gatherSingleVo.getGatherDate());
+                redisTemplate.opsForValue().set(key,gatherSingleVo);
+            }
         }
     }
 }

+ 44 - 48
src/main/java/com/jpsoft/zlopd/netty/task/BatchRegisterGatherTask.java

@@ -16,6 +16,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.core.annotation.Order;
 import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.serializer.StringRedisSerializer;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
@@ -42,24 +43,10 @@ public class BatchRegisterGatherTask {
     private WellService wellService;
     @Autowired
     private WellParamService wellParamService;
-
-    @Autowired
-    private GatherDataMultiService gatherDataMultiService;
-    @Autowired
-    private GatherDataService gatherDataService;
     @Autowired
     private GatherDeviceService gatherDeviceService;
-    @Autowired
-    private AlarmLogService alarmLogService;
-    @Autowired
-    private AlarmService alarmService;
-
     @Autowired
     private RedisTemplate<String, Object> redisTemplate;
-
-    @Resource
-    private SocketProperties socketProperties;
-
     @Autowired
     private OileManager oileManager;
 
@@ -78,7 +65,7 @@ public class BatchRegisterGatherTask {
         oileManager.send(deviceCode,msgBytes6, owellParamStr,36);
     }
 
-    //@Scheduled(cron ="0 */1 * * * ?")
+   // @Scheduled(cron ="0 */1 * * * ?")
     public void taskDiagram1(){
         log.info("------示功图采集指令计算----定时任务开始----");
         List<Well> list = wellService.list();
@@ -101,36 +88,47 @@ public class BatchRegisterGatherTask {
                     oileManager.send(deviceCode,msgBytes,wp.getParamId(),3);
                 }
             }
-
         }
     }
-   // @Scheduled(cron ="0 */3 * * * ?")
+   //@Scheduled(cron ="0 */3 * * * ?")
+   // @Scheduled(cron ="*/5 * * * * ?")
     public void taskDiagram2(){
+        log.info("-----示功图采集任务开始--------"+ DateUtil.now());
         List<Well> list = wellService.list();
+        ExecutorService executor = ThreadUtil.newExecutor(list.size());
         for(Well well:list){
             String wellId=  well.getWellId();
             List<GatherDevice>  listG=gatherDeviceService.list(wellId);
-            for(GatherDevice gatherDevice:listG){
-                String ooSerNoKeyStr=  gatherDevice.getDeviceCode();
-                List<WellParam> listWP= wellParamService.list(wellId);
-                List<WellParam> listWPMuti = new ArrayList<WellParam>();
-                for (WellParam wp:listWP){
-                    if(StringUtils.isNotEmpty(wp.getParamCode())&&wp.getParamCode().contains("diagram")){
-                        listWPMuti.add(wp);
+            executor.execute(() -> {
+                for(GatherDevice gatherDevice:listG){
+                    String ooSerNoKeyStr=  gatherDevice.getDeviceCode();
+                    List<WellParam> listWP= wellParamService.list(wellId);
+                    List<WellParam> listWPMuti = new ArrayList<WellParam>();
+                    for (WellParam wp:listWP){
+                        if(StringUtils.isNotEmpty(wp.getParamCode())&&wp.getParamCode().contains("diagram")){
+                            listWPMuti.add(wp);
+                        }
                     }
-                }
-                if(listWPMuti!=null&&listWPMuti.size()>0){
-                    WellParam wp =listWPMuti.get(0);
-                    String key ="pak_message_mutile_"+ooSerNoKeyStr+"_status";
-                    String status =(String)redisTemplate.opsForValue().get(key);
-                    if("1".equals(status)){
-                        log.info("------示功图采集指令发出--------");
-                        sendMsgSGT(ooSerNoKeyStr,wp.getParamId());
+                    if(listWPMuti!=null&&listWPMuti.size()>0){
+                        WellParam wp =listWPMuti.get(0);
+                        String key ="pak_message_mutile_"+ooSerNoKeyStr+"_status";
+                        redisTemplate.setValueSerializer(new StringRedisSerializer());
+                        String status =(String)redisTemplate.opsForValue().get(key);
+                      if(StringUtils.isNotEmpty(status)){
+                          status=status.replace("\"","");
+                          if("1".equals(status)){
+                              log.info("------示功图采集指令发出--------");
+                              sendMsgSGT(ooSerNoKeyStr,wp.getParamId());
+                          }
+                      }
+
                     }
                 }
-            }
+            });
+
 
         }
+        log.info("-----示功图采集任务结束--------"+ DateUtil.now());
     }
 
     //@Scheduled(cron ="*/29 * * * * ?")
@@ -144,22 +142,20 @@ public class BatchRegisterGatherTask {
             log.info("-----wellId--------"+ wellId);
             List<GatherDevice>  listG=gatherDeviceService.list(wellId);
             executor.execute(() -> {
-            for(GatherDevice gatherDevice:listG){
-                    log.info("当前执行线程:" + Thread.currentThread().getName());
-                    String deviceCode=  gatherDevice.getDeviceCode();
-                    List<WellParam> listWP= wellParamService.list(wellId);
-                    List<WellParam> listWPSinge = new ArrayList<WellParam>();
-                    for (WellParam wp:listWP){
-                        if(StringUtils.isNotEmpty(wp.getParamCode())&&!wp.getParamCode().contains("diagram")){
-                            listWPSinge.add(wp);
+                for(GatherDevice gatherDevice:listG){
+                        log.info("当前执行线程:" + Thread.currentThread().getName());
+                        String deviceCode=  gatherDevice.getDeviceCode();
+                        List<WellParam> listWP= wellParamService.list(wellId);
+                        List<WellParam> listWPSinge = new ArrayList<WellParam>();
+                        for (WellParam wp:listWP){
+                            if(StringUtils.isNotEmpty(wp.getParamCode())&&!wp.getParamCode().contains("diagram")){
+                                listWPSinge.add(wp);
+                            }
                         }
-                    }
-                    if(listWPSinge!=null&&listWPSinge.size()>0){
-                        sendMessage(deviceCode, listWPSinge);
-                    }
-
-
-            }
+                        if(listWPSinge!=null&&listWPSinge.size()>0){
+                            sendMessage(deviceCode, listWPSinge);
+                        }
+                }
             });
         }
 

Dosya farkı çok büyük olduğundan ihmal edildi
+ 978 - 345
src/main/java/com/jpsoft/zlopd/netty/task/BatchRegisterStorageTask.java


+ 193 - 34
src/main/java/com/jpsoft/zlopd/netty/task/BatchRegisterTask.java

@@ -1,18 +1,19 @@
 package com.jpsoft.zlopd.netty.task;
 
 import cn.hutool.core.date.DateUtil;
+import cn.hutool.core.thread.ThreadUtil;
 import cn.hutool.core.util.IdUtil;
+import com.alibaba.fastjson.JSONObject;
 import com.jpsoft.zlopd.modules.base.entity.*;
 import com.jpsoft.zlopd.modules.base.service.*;
 import com.jpsoft.zlopd.netty.comm.DtuManage;
 import com.jpsoft.zlopd.netty.comm.ModbusDataInUtils;
+import com.jpsoft.zlopd.netty.vo.GatherSingleMidVo;
 import com.jpsoft.zlopd.netty.vo.GatherSingleVo;
 import com.jpsoft.zlopd.util.DateTimeUtil;
 import com.jpsoft.zlopd.util.IdGenerator;
 import com.jpsoft.zlopd.util.ModBusUtils;
 import com.jpsoft.zlopd.util.SocketProperties;
-import io.lettuce.core.RedisClient;
-import io.lettuce.core.api.StatefulRedisConnection;
 import jakarta.annotation.Resource;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -25,6 +26,7 @@ import org.springframework.stereotype.Component;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.*;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 /**
@@ -64,6 +66,163 @@ public class BatchRegisterTask {
 
     //@Scheduled(cron ="*/5 * * * * ?")
     public void taskTest() throws IOException {
+        System.out.println("111:"+DateUtil.now());
+        String wellId="2304274406447761";
+        List<WellParam> list = wellParamService.list(wellId);
+        String ret ="0.73,1.51,0.00,26.98,34.75,29.22,28.88,0.03,229.03,230.44,229.21,185918.08,488772.25," +
+                "4.12,12.31,0.00,0.30,4.86,2.92~-~2023-04-27 14:41:48~-~2023-04-27 14:41:48~-~2023-04-27 14:41:48";
+        String orgStr=ret.split("~-~")[0];
+        String gatharDateOile =ret.split("~-~")[1];
+        String gatharDateCt =ret.split("~-~")[2];
+        String gatharDateFt =ret.split("~-~")[3];
+        String[] orgStrSZ = orgStr.split(",");
+        int i = 0;
+        List<GatherSingleMidVo> listMid =new ArrayList<>();
+        for (String orgS : orgStrSZ) {
+            i++;
+            GatherSingleMidVo gatherSingleMidVo = new GatherSingleMidVo();
+            if (i<6){
+                gatherSingleMidVo.setGatherDate(gatharDateOile);
+                gatherSingleMidVo.setVal(orgS);
+                if(i==1){
+                    gatherSingleMidVo.setParamCode("oil_press");
+                }
+                if(i==2){
+                    gatherSingleMidVo.setParamCode("casing_press");
+                }
+                if(i==3){
+                    gatherSingleMidVo.setParamCode("back_press");
+                }
+                if(i==4){
+                    gatherSingleMidVo.setParamCode("well_head_temp");
+                }
+                if(i==5){
+                    gatherSingleMidVo.setParamCode("load");
+                }
+                listMid.add(gatherSingleMidVo);
+            }
+            if(i>=6&&i<18){
+                gatherSingleMidVo.setGatherDate(gatharDateCt);
+                gatherSingleMidVo.setVal(orgS);
+                if(i==6){
+                    gatherSingleMidVo.setParamCode("current_a");
+                }
+                if(i==7){
+                    gatherSingleMidVo.setParamCode("current_b");
+                }
+                if(i==8){
+                    gatherSingleMidVo.setParamCode("current_c");
+                }
+                if(i==9){
+                    gatherSingleMidVo.setParamCode("voltage_a");
+                }
+                if(i==10){
+                    gatherSingleMidVo.setParamCode("voltage_b");
+                }
+                if(i==11){
+                    gatherSingleMidVo.setParamCode("voltage_c");
+                }
+                if(i==12){
+                    gatherSingleMidVo.setParamCode("useful_power_loss");
+                }
+                if(i==13){
+                    gatherSingleMidVo.setParamCode("unuseful_power_loss");
+                }
+                if(i==14){
+                    gatherSingleMidVo.setParamCode("useful_power");
+                }
+                if(i==15){
+                    gatherSingleMidVo.setParamCode("unuseful_power");
+                }
+                if(i==16){
+                    gatherSingleMidVo.setParamCode("reverse_power");
+                }
+                if(i==17){
+                    gatherSingleMidVo.setParamCode("power_factor");
+                }
+                listMid.add(gatherSingleMidVo);
+            }
+            if(i>=18){
+                gatherSingleMidVo.setGatherDate(gatharDateFt);
+                gatherSingleMidVo.setVal(orgS);
+                if(i==18){
+                    gatherSingleMidVo.setParamCode("freq");
+                }
+                if(i==19){
+                    gatherSingleMidVo.setParamCode("stroke");
+                }
+                listMid.add(gatherSingleMidVo);
+            }
+        }
+
+        List<GatherData> listData = new ArrayList<>();
+        for (WellParam wellParam : list) {
+            GatherData gatherData = new GatherData();
+            if(wellParam.getParamCode().indexOf("diagram_")==-1){
+                List<GatherSingleMidVo> listGsm= listMid.stream().
+                        filter((GatherSingleMidVo gatherSingleMidVo)->wellParam.getParamCode().equals(gatherSingleMidVo.getParamCode()))
+                        .collect(Collectors.toList());
+                if(listGsm!=null&&listGsm.size()>0){
+                    GatherSingleMidVo vo =listGsm.get(0);
+                    vo.getGatherDate();
+                    vo.getVal();
+                    gatherData.setWellParam(wellParam.getParamId());
+                    Date date=  DateUtil.parse(vo.getGatherDate(),"yyyy-MM-dd HH:mm:ss");
+                    gatherData.setGatherTime(date);
+                    String str1 = vo.getVal();
+                    if("null".equals(str1)){
+                        str1="0.00";
+                    }
+                    str1=str1.replace("\"","");
+                    gatherData.setDataId(IdGenerator.getId());
+                    gatherData.setDataVal(new BigDecimal(str1));
+                    listData.add(gatherData);
+                }
+            }
+
+        }
+        System.out.println(listData.size());
+        System.out.println("222:"+DateUtil.now());
+//        ExecutorService executor = ThreadUtil.newExecutor(5);
+//        for(int i=0;i<5;i++){
+//            int finalI = i;
+//            System.out.println("111111:"+DateUtil.now());
+//            executor.execute(()->{
+//                try {
+//                    Thread.sleep(6000);
+//                    log.info("---i:---"+ finalI);
+//                } catch (InterruptedException e) {
+//                    e.printStackTrace();
+//                }
+//            });
+//            System.out.println("222222:"+DateUtil.now());
+//        }
+
+//        GatherSingleVo gatherSingleVo = new  GatherSingleVo();
+//        gatherSingleVo.setId(IdGenerator.getId());
+//        gatherSingleVo.setVal("3.56");
+//        gatherSingleVo.setGatherDate(DateUtil.now());
+//        gatherSingleVo.setDeviceNo("50188");
+//        gatherSingleVo.setType("frequencytravel");
+//        String str = JSONObject.toJSONString(gatherSingleVo);
+//        redisTemplate.opsForValue().set("888888",str);
+//        String ret =  (String)redisTemplate.opsForValue().get("888888");
+//        JSONObject jsonObject = JSONObject.parseObject(ret);
+//        GatherSingleVo gatherSingleVo = JSONObject.toJavaObject(jsonObject, GatherSingleVo.class);
+//        System.out.println(gatherSingleVo.getGatherDate()+"--"+gatherSingleVo.getVal());
+
+
+        //        System.out.println(DateUtil.now());
+//        String key ="88888888";
+//        Map<String, String> map = new HashMap<>();
+//        map.put("param1","2.56");
+//        map.put("param2","30.67");
+//        map.put("param3","230.56");
+//        map.put("param4","2222238.95");
+//        map.put("param5","0.01");
+//        map.put("param6",DateUtil.now());
+//        redisTemplate.opsForHash().putAll(key,map);
+//        System.out.println(DateUtil.now());
 //        Map<String, String> map = new HashMap<>();
 //        map.put("key1","1");
 //        map.put("key2","2");
@@ -73,38 +232,38 @@ public class BatchRegisterTask {
 //
 //        redisTemplate.opsForValue().multiSet(map);
 
-        List<String> listKey = new ArrayList<>();
-        listKey.add("key1");
-        listKey.add("key2");
-        listKey.add("key3");
-        listKey.add("key4");
-        listKey.add("key5");
-        redisTemplate.delete(listKey);
-        //测试批量插入和循环单个插入的时间区别
-        long start = System.currentTimeMillis();
-        for (int i = 0; i < 1000; i++) {
-            GatherData gatherData = new GatherData();
-            gatherData.setDataId(IdGenerator.getId());
-            //gatherData.setGatherTime(new Date());
-            gatherData.setWellParam("owellParamStr");
-            gatherData.setDataVal(new BigDecimal(i));
-            gatherDataService.insert(gatherData);
-        }
-        long end = System.currentTimeMillis();
-        log.info("-----111----------" + (start - end) + "---------------");
-        long start2 = System.currentTimeMillis();
-        List<GatherData> list = new ArrayList<>();
-        for (int i = 0; i < 1000; i++) {
-            GatherData gatherData = new GatherData();
-            gatherData.setDataId(IdGenerator.getId());
-            //gatherData.setGatherTime(new Date());
-            gatherData.setWellParam("owellParamStr");
-            gatherData.setDataVal(new BigDecimal(i));
-            list.add(gatherData);
-        }
-        gatherDataService.insertBatch(list);
-        long end2 = System.currentTimeMillis();
-        log.info("-----222----------" + (start2 - end2) + "---------------");
+//        List<String> listKey = new ArrayList<>();
+//        listKey.add("key1");
+//        listKey.add("key2");
+//        listKey.add("key3");
+//        listKey.add("key4");
+//        listKey.add("key5");
+//        redisTemplate.delete(listKey);
+//        //测试批量插入和循环单个插入的时间区别
+//        long start = System.currentTimeMillis();
+//        for (int i = 0; i < 1000; i++) {
+//            GatherData gatherData = new GatherData();
+//            gatherData.setDataId(IdGenerator.getId());
+//            //gatherData.setGatherTime(new Date());
+//            gatherData.setWellParam("owellParamStr");
+//            gatherData.setDataVal(new BigDecimal(i));
+//            gatherDataService.insert(gatherData);
+//        }
+//        long end = System.currentTimeMillis();
+//        log.info("-----111----------" + (start - end) + "---------------");
+//        long start2 = System.currentTimeMillis();
+//        List<GatherData> list = new ArrayList<>();
+//        for (int i = 0; i < 1000; i++) {
+//            GatherData gatherData = new GatherData();
+//            gatherData.setDataId(IdGenerator.getId());
+//            //gatherData.setGatherTime(new Date());
+//            gatherData.setWellParam("owellParamStr");
+//            gatherData.setDataVal(new BigDecimal(i));
+//            list.add(gatherData);
+//        }
+//        gatherDataService.insertBatch(list);
+//        long end2 = System.currentTimeMillis();
+//        log.info("-----222----------" + (start2 - end2) + "---------------");
 //        System.out.println(DateUtil.now());
 //        String keyOile ="gather:50188:oile:*";
 //        Set<String> keys = redisTemplate.keys(keyOile);

+ 13 - 0
src/main/java/com/jpsoft/zlopd/netty/vo/GatherSingleMidVo.java

@@ -0,0 +1,13 @@
+package com.jpsoft.zlopd.netty.vo;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+public class GatherSingleMidVo implements Serializable {
+    private String gatherDate;
+    private String val;
+    private String paramCode;
+    private String paramId;
+}

+ 88 - 0
src/main/java/com/jpsoft/zlopd/util/DateTimeUtil.java

@@ -1,7 +1,10 @@
 package com.jpsoft.zlopd.util;
 
+import cn.hutool.core.date.DatePattern;
 import cn.hutool.core.date.DateUtil;
 
+import java.text.DecimalFormat;
+
 public class DateTimeUtil {
     public static String getCurrentTime(){
         String ret =DateUtil.now().replace("-","").replace(" ","").replace(":","");
@@ -18,4 +21,89 @@ public class DateTimeUtil {
         String ret =time.replace("-","").replace(" ","").replace(":","");
         return ret;
     }
+    public static String getDealTimeMinZero(String orgTime){
+        String orgTime2 =orgTime.substring(orgTime.indexOf(":")+1,orgTime.lastIndexOf(":"));
+        String orgTime3 ="";
+        orgTime3 =orgTime2.substring(0,1)+"0";
+        String orgTime4 ="";
+        orgTime4 =orgTime.substring(0,orgTime.indexOf(":")+1)+orgTime3+orgTime.substring(orgTime.lastIndexOf(":"),orgTime.length());
+        return orgTime4;
+    }
+    public static boolean getCurrTimeCom(String orgTime){
+        orgTime =orgTime.substring(0,11)+"000";
+        String nowTime=  DateUtil.now();
+        nowTime=nowTime.replace("-","").replace(" ","").replace(":","");
+        nowTime =nowTime.substring(0,11)+"000";
+        if(orgTime.equals(nowTime)){
+            return true;
+        }else {
+            return false;
+        }
+
+    }
+    public static void main(String[] args) {
+        System.out.println(getCurrTimeCom("20230424090908"));
+        String orgTime= "2023-04-23 10:46:08";
+
+        System.out.println(DateUtil.parse(orgTime, DatePattern.NORM_DATETIME_PATTERN));
+        /*String orgTime= "20230423104608";
+//        System.out.println(orgTime);
+//        System.out.println(orgTime.substring(0,10));
+//        System.out.println(orgTime.substring(10,12));
+//        System.out.println(orgTime.substring(12,14));
+//        System.out.println(orgTime.substring(0,11)+"000");
+        orgTime =orgTime.substring(0,11)+"000";
+        System.out.println(orgTime);
+        String nowTime=  DateUtil.now();
+        nowTime=nowTime.replace("-","").replace(" ","").replace(":","");
+       // System.out.println(nowTime);
+        nowTime =nowTime.substring(0,11)+"000";
+        System.out.println(nowTime);
+        if(orgTime.equals(nowTime)){
+
+        }*/
+
+//        String orgTime1= getDealTimeMinZero(orgTime);
+//        System.out.println(orgTime1);
+//        String nowTime=  DateUtil.now();
+//        System.out.println(nowTime);
+//        String nowTime1= getDealTimeMinZero(nowTime);
+//        System.out.println(nowTime1);
+
+//        String orgTime= "2023-04-23 10:46:08";
+//        System.out.println(orgTime);
+//        String orgTime1= getDealTimeMinZero(orgTime);
+//        System.out.println(orgTime1);
+//        String nowTime=  DateUtil.now();
+//        System.out.println(nowTime);
+//        String nowTime1= getDealTimeMinZero(nowTime);
+//        System.out.println(nowTime1);
+
+//        String orgTime= "2023-04-23 10:46:08";
+//        System.out.println(orgTime);
+//        String orgTime2 =orgTime.substring(orgTime.indexOf(":")+1,orgTime.lastIndexOf(":"));
+//        System.out.println(orgTime2);
+//        String orgTime3 ="";
+//        orgTime3 =orgTime2.substring(0,1)+"0";
+//        System.out.println(orgTime3);
+//        String orgTime4 ="";
+//        orgTime4 =orgTime.substring(0,orgTime.indexOf(":")+1)+orgTime3+orgTime.substring(orgTime.lastIndexOf(":"),orgTime.length());
+//        System.out.println(orgTime4);
+
+
+
+//        String key="20230423101608";
+//        key =key.substring(0,key.length()-2);
+//        String currTime=getCurrentTimeMin();
+//        System.out.println(key);
+//        System.out.println(currTime);
+//
+//        Long timeCom= Long.valueOf(key)+10;
+//        if(timeCom>Long.valueOf(currTime)){
+//            System.out.println("区间内");
+//        }else{
+//            System.out.println("区间外");
+//        }
+
+    }
 }

+ 5 - 2
src/main/resources/application-dev.yml

@@ -5,8 +5,11 @@ server:
 
 spring:
   datasource:
-    url: jdbc:log4jdbc:mysql://127.0.0.1:3306/zl_opd?autoReconnect=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false&rewriteBatchedStatements=true
-    #url: jdbc:log4jdbc:mysql://42.56.120.92:9601/zl_opd?autoReconnect=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false&rewriteBatchedStatements=true
+    #driver-class-name: net.sf.log4jdbc.DriverSpy
+    driver-class-name: com.mysql.cj.jdbc.Driver
+    type: com.alibaba.druid.pool.DruidDataSource
+    url: jdbc:mysql://127.0.0.1:3306/zl_opd?autoReconnect=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false&rewriteBatchedStatements=true
+    #url: jdbc:mysql://42.56.120.92:9601/zl_opd?autoReconnect=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false&rewriteBatchedStatements=true
     username: root
     #password: zlmysql
     password: hbjzws

+ 6 - 1
src/main/resources/application-pro.yml

@@ -5,9 +5,14 @@ server:
 
 spring:
   datasource:
-    url: jdbc:log4jdbc:mysql://42.56.120.92:9601/zl_opd?autoReconnect=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false
+    #driver-class-name: net.sf.log4jdbc.DriverSpy
+    driver-class-name: com.mysql.cj.jdbc.Driver
+    type: com.alibaba.druid.pool.DruidDataSource
+    #url: jdbc:mysql://127.0.0.1:3306/zl_opd?autoReconnect=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false&rewriteBatchedStatements=true
+    url: jdbc:mysql://42.56.120.92:9601/zl_opd?autoReconnect=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false&rewriteBatchedStatements=true
     username: root
     password: zlmysql
+    #password: hbjzws
   devtools:
     add-properties: false
     restart:

+ 12 - 6
src/main/resources/application.yml

@@ -13,11 +13,11 @@ spring:
       max-request-size: 20MB
       max-file-size: 20MB
   datasource:
-    driver-class-name: net.sf.log4jdbc.DriverSpy
-    type: com.alibaba.druid.pool.DruidDataSource
     druid:
       # 连接池的配置信息
       # 初始化大小,最小,最大
+      mysql:
+        usePingMethod: false
       initial-size: 5
       min-idle: 5
       maxActive: 20
@@ -31,6 +31,8 @@ spring:
       testWhileIdle: true
       testOnBorrow: false
       testOnReturn: false
+      removeAbandoned: false
+      removeAbandonedTimeout: 1800
       # 打开PSCache,并且指定每个连接上PSCache的大小
       poolPreparedStatements: true
       maxPoolPreparedStatementPerConnectionSize: 20
@@ -71,13 +73,17 @@ spring:
       content-type: text/html; charset=utf-8
   data:
     redis:
-      host: 42.56.120.92
-      #host: 127.0.0.1
+      #host: 42.56.120.92
+      host: 127.0.0.1
       port: 9608
       username: zl_gather
       password: redis_gather
-      timeout: 5000
-
+      timeout: 500000
+      #jedis:
+        #pool:
+          #max-active: 8
+          #min-idle: 0
+          #max-idle: 8
 
 
 

+ 1 - 0
src/test/java/com/jpsoft/zlopd/ZlOpdTests.java

@@ -71,6 +71,7 @@ class ZlOpdTests {
             redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(TsysUser.class));
             //tsysUserService.insert(user);
             redisTemplate.opsForValue().set("user:"+user.getUserId(),user);
+
         }
 
     }

Bu fark içinde çok fazla dosya değişikliği olduğu için bazı dosyalar gösterilmiyor