Browse Source

定时采集任务处理优化

chenwen 1 year ago
parent
commit
8354ca7635

+ 30 - 23
src/main/java/com/hb/proj/gather/business/DataTransRepSingleTask.java

@@ -33,32 +33,39 @@ public class DataTransRepSingleTask implements Runnable{
 	
 	@Override
 	public void run() {
-		logger.info("开始单值数据转换处理{}",singleCombPO.getDevSerial());
-		Map<String,Float>  gatherDatas=singleCombPO.getGatherDatas();
-		WellParamVO paramConfig=null;
-		List<SingleInsertPO> insPOs=new ArrayList<>(gatherDatas.size());
-		SingleInsertPO insPOItm=null;
-		for(String pcode : gatherDatas.keySet()) {
-			paramConfig=DataTransConfig.get(singleCombPO.getDevSerial()+"_"+pcode);
-			if(paramConfig==null) {
-				logger.info("未找到参数配置{}_{}",singleCombPO.getDevSerial(),pcode);
-				continue;
+		try {
+			logger.info("开始单值数据转换处理{}",singleCombPO.getDevSerial());
+			Map<String,Float>  gatherDatas=singleCombPO.getGatherDatas();
+			WellParamVO paramConfig=null;
+			List<SingleInsertPO> insPOs=new ArrayList<>(gatherDatas.size());
+			SingleInsertPO insPOItm=null;
+			for(String pcode : gatherDatas.keySet()) {
+				paramConfig=DataTransConfig.get(singleCombPO.getDevSerial()+"_"+pcode);
+				if(paramConfig==null) {
+					logger.info("未找到参数配置{}_{}",singleCombPO.getDevSerial(),pcode);
+					continue;
+				}
+				insPOItm=new SingleInsertPO(pcode,paramConfig.getParamId(),gatherDatas.get(pcode),singleCombPO.getGatherTime());
+				DataTransUtils.transSingle(insPOItm, paramConfig);
+				insPOs.add(insPOItm);
+			
 			}
-			insPOItm=new SingleInsertPO(pcode,paramConfig.getParamId(),gatherDatas.get(pcode),singleCombPO.getGatherTime());
-			DataTransUtils.transSingle(insPOItm, paramConfig);
-			insPOs.add(insPOItm);
-		
+			
+			logger.info("单值数据转换完:{}",insPOs.size());
+			
+			RedisRepComponent repRedis=ApplicationContextUtils.getBean("redisRepComponent", RedisRepComponent.class);
+			
+			repRedis.put(paramConfig.getWellId(), buildRedisDatas(insPOs));
+			
+			GatherDataRepService repService=ApplicationContextUtils.getBean("gatherDataRepService", GatherDataRepService.class);
+			
+			repService.save(insPOs);  //入库
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			logger.error("单值转换、入库任务执行出现异常:{}",e.getMessage());
 		}
 		
-		logger.info("单值数据转换完:{}",insPOs.size());
-		
-		RedisRepComponent repRedis=ApplicationContextUtils.getBean("redisRepComponent", RedisRepComponent.class);
-		
-		repRedis.put(paramConfig.getWellId(), buildRedisDatas(insPOs));
-		
-		GatherDataRepService repService=ApplicationContextUtils.getBean("gatherDataRepService", GatherDataRepService.class);
-		
-		repService.save(insPOs);  //入库
 	}
 	
 	

+ 21 - 14
src/main/java/com/hb/proj/gather/business/DataTransRepTask.java

@@ -26,22 +26,29 @@ public class DataTransRepTask implements Runnable{
 	
 	@Override
 	public void run() {
-		logger.info("开始数据转换处理{}",diagramPO.getDevSerial());
-		WellParamVO paramConfig=DataTransConfig.get(diagramPO.getDevSerial()+"_"+diagramPO.getParamCode());
-		if(paramConfig==null) {
-			logger.info("未找到参数配置{}_{}",diagramPO.getDevSerial(),diagramPO.getParamCode());
-			return;
+		try {
+			logger.info("开始数据转换处理{}",diagramPO.getDevSerial());
+			WellParamVO paramConfig=DataTransConfig.get(diagramPO.getDevSerial()+"_"+diagramPO.getParamCode());
+			if(paramConfig==null) {
+				logger.info("未找到参数配置{}_{}",diagramPO.getDevSerial(),diagramPO.getParamCode());
+				return;
+			}
+			
+			diagramPO.setWellParam(paramConfig.getParamId());
+			
+			DataTransUtils.transMulti(diagramPO, paramConfig); //数据转换
+			
+			logger.info("数据转换完:{}",diagramPO.getParamCode());
+			
+			GatherDataRepService repService=ApplicationContextUtils.getBean("gatherDataRepService", GatherDataRepService.class);
+			
+			repService.save(diagramPO);  //入库
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			logger.error("多值转换、入库任务执行出现异常:{}",e.getMessage());
 		}
 		
-		diagramPO.setWellParam(paramConfig.getParamId());
-		
-		DataTransUtils.transMulti(diagramPO, paramConfig); //数据转换
-		
-		logger.info("数据转换完:{}",diagramPO.getParamCode());
-		
-		GatherDataRepService repService=ApplicationContextUtils.getBean("gatherDataRepService", GatherDataRepService.class);
-		
-		repService.save(diagramPO);  //入库
 	}
 
 }

+ 7 - 8
src/main/java/com/hb/proj/gather/business/GatherScheduler.java

@@ -25,11 +25,9 @@ public class GatherScheduler {
 	
 	private final static  Logger logger = LoggerFactory.getLogger(GatherScheduler.class);
 	
-	private static int scheNum=1; //执行次数
+	private static int scheNum=0; //执行次数
+	
 	
-	public static void resetScheNum() {
-		scheNum=1;
-	}
 	
 	public static int getScheNum() {
 		return scheNum;
@@ -37,19 +35,20 @@ public class GatherScheduler {
 	
 
 	/**
-	 * 单值1分钟执行一次采集
+	 * 单值1分钟执行一次采集,首次等待1分钟,等待服务启动,设备连接上
 	 */
-	@Scheduled(fixedRate=60*1000)  //每分钟执行一次
+	@Scheduled(fixedRate=60*1000,initialDelay= 60000)  //每分钟执行一次
 	public void startSingleGather() {
 		logger.info("定时采集启动...");
+		scheNum+=1;
 		Iterator<Channel> iterator=ChannelGroupMgr.iterator();
 		Channel channel=null;
 		while(iterator.hasNext()) {
 			channel=iterator.next();
-			GatherTaskExecutor.execute(new GatherTask(channel));
+			GatherTaskExecutor.execute(new GatherTask(channel));  //复合任务包含单值、多值
 		}
 		
-		scheNum+=1;
+		scheNum=scheNum%5==0?0:scheNum;
 	}
 	
 	

+ 0 - 1
src/main/java/com/hb/proj/gather/business/GatherTask.java

@@ -33,7 +33,6 @@ public class GatherTask implements Runnable{
 			logger.info("多值采集开始...");
 			multiGather(alloc,channel);
 			
-			GatherScheduler.resetScheNum();
 		}
 	}
 	

+ 1 - 1
src/main/java/com/hb/proj/gather/business/GatherTaskExecutor.java

@@ -20,7 +20,7 @@ public class GatherTaskExecutor {
 	private static final ExecutorService  executor=new ThreadPoolExecutor(5,30,30,TimeUnit.MINUTES,new LinkedBlockingQueue<Runnable>(20),new ThreadPoolExecutor.CallerRunsPolicy());
 	
 	public static void execute(Runnable task){
-		executor.execute(task);
+		executor.submit(task);   //execute 只能执行无返回结果的任务,submit 可以返回结果Feture,任务有两种:runnable,callable
 	}
 	
 	public static void shutdown() {