فهرست منبع

单值数据采集,转换、入库

chenwen 1 سال پیش
والد
کامیت
ea73904ac7

+ 22 - 3
src/main/java/com/hb/proj/gather/business/DataAssembler.java

@@ -8,6 +8,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.hb.proj.gather.model.DiagramPO;
+import com.hb.proj.gather.model.SingleCombPO;
 
 /**
  * 采集数据组装器(将分批采集的数据组装成完整数据,特别是功图数据)
@@ -20,16 +21,22 @@ public class DataAssembler extends Thread{
 	
 	private static boolean stopTag=false;
 			
-	private static Map<String,DiagramPieceBox> diagramPkgs=new HashMap<String,DiagramPieceBox>(100);
+	private static Map<String,DataPieceDiagramBox> diagramPkgs=new HashMap<String,DataPieceDiagramBox>(100);
+	
+	private static Map<String,DataPieceSingleBox> singlePkgs=new HashMap<String,DataPieceSingleBox>(100);
 	
 	public  static void putPieceData(String channelSerial,Map<String,Float> pieceData) {
 		logger.info("接收单值片段数据:{},{}",channelSerial,pieceData);
+		if(!singlePkgs.containsKey(channelSerial)) {
+			singlePkgs.put(channelSerial, new DataPieceSingleBox(channelSerial));
+		}
+		singlePkgs.get(channelSerial).putAll(pieceData);
 	}
 	
 	public  static void putPieceData(String channelSerial,String pieceName,List<Float> pieceDatas) {
 		logger.info("接收功图片段数据:{},{}",channelSerial,pieceName);
 		if(!diagramPkgs.containsKey(channelSerial)) {
-			diagramPkgs.put(channelSerial, new DiagramPieceBox(channelSerial));
+			diagramPkgs.put(channelSerial, new DataPieceDiagramBox(channelSerial));
 		}
 		diagramPkgs.get(channelSerial).put(pieceName,pieceDatas);
 	}
@@ -37,11 +44,23 @@ public class DataAssembler extends Thread{
 	@Override
 	public void run() {
 		logger.info("开始数据组装检测");
-		DiagramPieceBox  diaPiece=null;
+		DataPieceDiagramBox  diaPiece=null;
 		DiagramPO  diagramPO=null;
 		
+		DataPieceSingleBox  singPiece=null;
+		SingleCombPO  singlePO=null;
+		
 		try {
 			while(!stopTag) {
+				
+				for(String channelSerial : singlePkgs.keySet()) {
+					singPiece=singlePkgs.get(channelSerial);
+					singlePO=singPiece.assemble();
+					if(singlePO!=null) {
+						GatherTaskExecutor.execute(new DataTransRepSingleTask(singlePO)); 
+					}
+				}
+				
 				for(String channelSerial : diagramPkgs.keySet()) {
 					/*
 					 * if(ChannelGroupMgr.get(channelSerial)== null) { //通道已不存在,释放片段数据缓存

+ 24 - 10
src/main/java/com/hb/proj/gather/business/DiagramPieceBox.java → src/main/java/com/hb/proj/gather/business/DataPieceDiagramBox.java

@@ -5,20 +5,21 @@ import java.util.List;
 import java.util.Map;
 
 import com.hb.proj.gather.model.DiagramPO;
+import com.hb.proj.gather.protocol.ZLOpdProtCMDEnum;
 
 /**
  * 功图片段数据包
  * @author cwen
  *
  */
-public class DiagramPieceBox {
+public class DataPieceDiagramBox {
 	
 	private int gatherNum;  //采集批次号
 	
 	private String channelSerial; //通道编号
 	
 	
-	public DiagramPieceBox(String channelSerial) {
+	public DataPieceDiagramBox(String channelSerial) {
 		this.channelSerial=channelSerial;
 	}
 	
@@ -27,32 +28,45 @@ public class DiagramPieceBox {
 	
 	public void put(String pieceName,List<Float> pieceDatas) {
 		pieceMap.put(pieceName, pieceDatas);
-		//this.canBeAssemble();
 	}
 	
 	public DiagramPO  assembleDiagramLoad() {
-		return assembleDiagram(DiagramPO.DIAGRAM_LOAD_CODE,"chartload_1","chartload_2","chartload_3"); //与指令ZLOpdProtCMDEnum中配置的参数名一致
+		return assembleDiagram(DiagramPO.DIAGRAM_LOAD_CODE,
+				(ZLOpdProtCMDEnum.DIAGRAM_LOAD_1.getParamCodes())[0],
+				(ZLOpdProtCMDEnum.DIAGRAM_LOAD_2.getParamCodes())[0],
+				(ZLOpdProtCMDEnum.DIAGRAM_LOAD_3.getParamCodes())[0]); //与指令ZLOpdProtCMDEnum中配置的参数名一致
 	}
 	
 	public DiagramPO  assembleDiagramCurr() {
-		return assembleDiagram(DiagramPO.DIAGRAM_CURR_CODE,"chartcurr_1","chartcurr_2","chartcurr_3");
+		return assembleDiagram(DiagramPO.DIAGRAM_CURR_CODE,
+				(ZLOpdProtCMDEnum.DIAGRAM_CURR_1.getParamCodes())[0],
+				(ZLOpdProtCMDEnum.DIAGRAM_CURR_2.getParamCodes())[0],
+				(ZLOpdProtCMDEnum.DIAGRAM_CURR_3.getParamCodes())[0]);
 	}
 	
 	public DiagramPO  assembleDiagramPower() {
-		return assembleDiagram(DiagramPO.DIAGRAM_POWER_CODE,"chartpower_1","chartpower_2","chartpower_3");
+		return assembleDiagram(DiagramPO.DIAGRAM_POWER_CODE,
+				(ZLOpdProtCMDEnum.DIAGRAM_POWER_1.getParamCodes())[0],
+				(ZLOpdProtCMDEnum.DIAGRAM_POWER_2.getParamCodes())[0],
+				(ZLOpdProtCMDEnum.DIAGRAM_POWER_3.getParamCodes())[0]);
 	}
 	
 	public DiagramPO  assembleDiagram(String diagramCode,String othkey1,String othkey2,String othkey3) {
-		boolean dispOk=pieceMap.get("disp_1")!=null&&pieceMap.get("disp_2")!=null&&pieceMap.get("disp_3")!=null;
+		
+		String dispCode1=(ZLOpdProtCMDEnum.DIAGRAM_DISP_1.getParamCodes())[0],
+			   dispCode2=(ZLOpdProtCMDEnum.DIAGRAM_DISP_2.getParamCodes())[0],
+			   dispCode3=(ZLOpdProtCMDEnum.DIAGRAM_DISP_3.getParamCodes())[0];
+		
+		boolean dispOk=pieceMap.get(dispCode1)!=null&&pieceMap.get(dispCode2)!=null&&pieceMap.get(dispCode3)!=null;
 		boolean othOk=pieceMap.get(othkey1)!=null&&pieceMap.get(othkey2)!=null&&pieceMap.get(othkey3)!=null;
 		
 		if(!dispOk||!othOk) {
 			return null;
 		}
 		
-		List<Float> disps=pieceMap.get("disp_1");
-		disps.addAll(pieceMap.get("disp_2"));
-		disps.addAll(pieceMap.get("disp_3"));
+		List<Float> disps=pieceMap.get(dispCode1);
+		disps.addAll(pieceMap.get(dispCode2));
+		disps.addAll(pieceMap.get(dispCode3));
 		
 		List<Float> oths=pieceMap.get(othkey1);
 		oths.addAll(pieceMap.get(othkey2));

+ 76 - 0
src/main/java/com/hb/proj/gather/business/DataPieceSingleBox.java

@@ -0,0 +1,76 @@
+package com.hb.proj.gather.business;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.hb.proj.gather.model.SingleCombPO;
+import com.hb.proj.gather.protocol.ZLOpdProtCMDEnum;
+
+/**
+ * 单值片段数据包
+ * @author cwen
+ *
+ */
+public class DataPieceSingleBox {
+	
+	
+
+	private int gatherNum;  //采集批次号
+	
+	private String channelSerial; //通道编号
+	
+	private Map<String,Float>  pieceMap=new HashMap<String,Float>(20);
+	
+	public DataPieceSingleBox(String channelSerial) {
+		this.channelSerial=channelSerial;
+	}
+	
+	public void putAll(Map<String,Float> gatherPiece) {
+		pieceMap.putAll(gatherPiece);
+	}
+	
+	public void put(String pieceName,Float val) {
+		pieceMap.put(pieceName, val);
+	}
+	
+	
+	public SingleCombPO  assemble() {
+		boolean b1=checkReady(ZLOpdProtCMDEnum.PRESS_TEMP_LOAD.getParamCodes());
+		boolean b2=checkReady(ZLOpdProtCMDEnum.CURR_VOL_LOS_PW.getParamCodes());
+		boolean b3=checkReady(ZLOpdProtCMDEnum.FREQ_STROKE.getParamCodes());
+		
+		if(!b1 || !b2 || !b3) {
+			return null;
+		}
+		Map<String,Float> copyData=new HashMap<>(pieceMap.size());
+		copyData.putAll(pieceMap);
+		pieceMap.clear();
+		return new SingleCombPO(channelSerial,copyData);
+	}
+	
+	public boolean  checkReady(String[] paramCodes) {
+		for(String pcode : paramCodes) {
+			if(!pieceMap.containsKey(pcode)) {
+				return false;
+			}
+		}
+		return true;
+	}
+	
+
+	public int getGatherNum() {
+		return gatherNum;
+	}
+
+	public void setGatherNum(int gatherNum) {
+		this.gatherNum = gatherNum;
+	}
+
+	public String getChannelSerial() {
+		return channelSerial;
+	}
+
+	public void setChannelSerial(String channelSerial) {
+		this.channelSerial = channelSerial;
+	}
+}

+ 58 - 0
src/main/java/com/hb/proj/gather/business/DataTransRepSingleTask.java

@@ -0,0 +1,58 @@
+package com.hb.proj.gather.business;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hb.proj.gather.model.SingleCombPO;
+import com.hb.proj.gather.model.SingleInsertPO;
+import com.hb.proj.gather.model.WellParamVO;
+import com.hb.proj.gather.rep.GatherDataRepService;
+import com.hb.xframework.util.ApplicationContextUtils;
+
+/**
+ * 单值采集数据 数据转换任务
+ * @author cwen
+ *
+ */
+public class DataTransRepSingleTask implements Runnable{
+	
+	private final static  Logger logger = LoggerFactory.getLogger(DataTransRepSingleTask.class);
+
+	private SingleCombPO  singleCombPO;
+	
+	public DataTransRepSingleTask(SingleCombPO  singleCombPO) {
+		this.singleCombPO=singleCombPO;
+	}
+	
+	
+	@Override
+	public void run() {
+		logger.info("开始单值数据转换处理{}",singleCombPO.getDevSerial());
+		Map<String,Float>  gatherDatas=singleCombPO.getGatherDatas();
+		WellParamVO paramConfig=null;
+		List<SingleInsertPO> insPOs=new ArrayList<>(gatherDatas.size());
+		SingleInsertPO insPOItm=null;
+		for(String pcode : gatherDatas.keySet()) {
+			paramConfig=DataTransConfig.get(singleCombPO.getDevSerial()+"_"+pcode);
+			if(paramConfig==null) {
+				logger.info("未找到参数配置{}_{}",singleCombPO.getDevSerial(),pcode);
+				continue;
+			}
+			insPOItm=new SingleInsertPO(paramConfig.getParamId(),gatherDatas.get(pcode),singleCombPO.getGatherTime());
+			DataTransUtils.transSingle(insPOItm, paramConfig);
+			insPOs.add(insPOItm);
+		
+		}
+		
+		logger.info("单值数据转换完:{}",insPOs.size());
+		
+		GatherDataRepService repService=ApplicationContextUtils.getBean("gatherDataRepService", GatherDataRepService.class);
+		
+		repService.save(insPOs);  //入库
+	}
+
+}

+ 6 - 1
src/main/java/com/hb/proj/gather/business/DataTransRepTask.java

@@ -8,6 +8,11 @@ import com.hb.proj.gather.model.WellParamVO;
 import com.hb.proj.gather.rep.GatherDataRepService;
 import com.hb.xframework.util.ApplicationContextUtils;
 
+/**
+ * 多值数据  数据转换任务
+ * @author cwen
+ *
+ */
 public class DataTransRepTask implements Runnable{
 	
 	private final static  Logger logger = LoggerFactory.getLogger(DataTransRepTask.class);
@@ -28,7 +33,7 @@ public class DataTransRepTask implements Runnable{
 			return;
 		}
 		
-		
+		diagramPO.setWellParam(paramConfig.getParamId());
 		
 		DataTransUtils.transMulti(diagramPO, paramConfig); //数据转换
 		

+ 28 - 5
src/main/java/com/hb/proj/gather/business/DataTransUtils.java

@@ -3,6 +3,7 @@ package com.hb.proj.gather.business;
 import java.util.List;
 
 import com.hb.proj.gather.model.DiagramPO;
+import com.hb.proj.gather.model.SingleInsertPO;
 import com.hb.proj.gather.model.WellParamVO;
 
 /**
@@ -11,14 +12,36 @@ import com.hb.proj.gather.model.WellParamVO;
  *
  */
 public class DataTransUtils {
-
-	public static void transMulti(DiagramPO diaramData,WellParamVO  paramConfig) {
+	
+	public static void transSingle(SingleInsertPO  singleInsPO,WellParamVO  wp) {
+		transSingle(singleInsPO.getDataVal(),wp.getCalibrateA(),wp.getCalibrateB(),wp.getCalibrateC(),wp.getGatInsScale());
+	}
+	
+	
+	
+	public static void transSingle(Float data,Double calA,Double calB,Double calC,Double unitScale) {
+		if(calA==null) {
+			calA=0.0;
+		}
+		if(calB==null) {
+			calB=1.0;
+		}
+		if(calC==null) {
+			calC=0.0;
+		}
 		
-		diaramData.setWellParam(paramConfig.getParamId());
+		unitScale=unitScale!=null?unitScale:1;
+		
+		double d= ((calA*data*data+calB*data+calC)*unitScale);
+		
+		data=Float.parseFloat(String.format("%.3f", d));
+	}
+	
+	public static void transMulti(DiagramPO diaramData,WellParamVO  wp) {
 		
-		transMulti(diaramData.getOths(),paramConfig.getCalibrateA(),paramConfig.getCalibrateB(),paramConfig.getCalibrateC(),paramConfig.getGatInsScale());
+		transMulti(diaramData.getOths(),wp.getCalibrateA(),wp.getCalibrateB(),wp.getCalibrateC(),wp.getGatInsScale());
 		
-		transMulti(diaramData.getDisps(),paramConfig.getCalibrateA2(),paramConfig.getCalibrateB2(),paramConfig.getCalibrateC2(),paramConfig.getGatInsScale2());
+		transMulti(diaramData.getDisps(),wp.getCalibrateA2(),wp.getCalibrateB2(),wp.getCalibrateC2(),wp.getGatInsScale2());
 	}
 	
 	public static void transMulti(List<Float> datas,Double calA,Double calB,Double calC,Double unitScale) {

+ 0 - 17
src/main/java/com/hb/proj/gather/business/SinglePieceBox.java

@@ -1,17 +0,0 @@
-package com.hb.proj.gather.business;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class SinglePieceBox {
-
-	private int gatherNum;  //采集批次号
-	
-	private String channelSerial; //通道编号
-	
-	private Map<String,Float>  pieceMap=new HashMap<String,Float>(20);
-	
-	public void put(String pieceName,Float val) {
-		pieceMap.put(pieceName, val);
-	}
-}

+ 52 - 0
src/main/java/com/hb/proj/gather/model/SingleCombPO.java

@@ -0,0 +1,52 @@
+package com.hb.proj.gather.model;
+
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * 单值采集数据 组合实体
+ * @author cwen
+ *
+ */
+public class SingleCombPO {
+
+	private String devSerial;  //设备序号
+	
+	private Date  gatherTime;
+	
+	private Map<String,Float> gatherDatas;
+	
+	public SingleCombPO() {
+		
+	}
+	
+	public SingleCombPO(String devSerial,Map<String,Float> gatherDatas) {
+		this.gatherTime=new Date();
+		this.devSerial=devSerial;
+		this.gatherDatas=gatherDatas;
+	}
+
+	public String getDevSerial() {
+		return devSerial;
+	}
+
+	public void setDevSerial(String devSerial) {
+		this.devSerial = devSerial;
+	}
+
+	public Date getGatherTime() {
+		return gatherTime;
+	}
+
+	public void setGatherTime(Date gatherTime) {
+		this.gatherTime = gatherTime;
+	}
+
+	public Map<String, Float> getGatherDatas() {
+		return gatherDatas;
+	}
+
+	public void setGatherDatas(Map<String, Float> gatherDatas) {
+		this.gatherDatas = gatherDatas;
+	}
+}

+ 52 - 0
src/main/java/com/hb/proj/gather/model/SingleInsertPO.java

@@ -0,0 +1,52 @@
+package com.hb.proj.gather.model;
+
+import java.util.Date;
+
+/**
+ * 单值采集入库实体对象
+ * @author cwen
+ *
+ */
+public class SingleInsertPO {
+
+	private String wellParam;
+	
+	private Float  dataVal;
+	
+	private Date gatherTime;
+	
+	public SingleInsertPO() {
+		
+	}
+	
+	public SingleInsertPO(String wellParam,Float  dataVal,Date gatherTime) {
+		this.wellParam=wellParam;
+		this.dataVal=dataVal;
+		this.gatherTime=gatherTime;
+	}
+	
+
+	public String getWellParam() {
+		return wellParam;
+	}
+
+	public void setWellParam(String wellParam) {
+		this.wellParam = wellParam;
+	}
+
+	public Float getDataVal() {
+		return dataVal;
+	}
+
+	public void setDataVal(Float dataVal) {
+		this.dataVal = dataVal;
+	}
+
+	public Date getGatherTime() {
+		return gatherTime;
+	}
+
+	public void setGatherTime(Date gatherTime) {
+		this.gatherTime = gatherTime;
+	}
+}

+ 1 - 0
src/main/java/com/hb/proj/gather/protocol/GatherRespParser.java

@@ -28,6 +28,7 @@ public class GatherRespParser {
 		ZLOpdProtCMDEnum  cmdEum=ZLOpdProtCMDEnum.valueOf(cmd);
 		if(cmdEum.getItemBytCount()==4) {
 			 Map<String,Float> dataMap=parseFloat(byteBuf,startIndex,dataLen,cmdEum.getParamCodes());
+			 DataAssembler.putPieceData(serial, dataMap);
 		}
 		else if(cmdEum.getItemBytCount()==2) { //默认为功图数据解析
 			 List<Float> datas=parseShort2Float(byteBuf,startIndex,dataLen);

+ 37 - 0
src/main/java/com/hb/proj/gather/rep/GatherDataRepService.java

@@ -1,11 +1,13 @@
 package com.hb.proj.gather.rep;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import com.hb.proj.gather.model.DiagramPO;
+import com.hb.proj.gather.model.SingleInsertPO;
 import com.hb.xframework.dao.core.SpringJdbcDAO;
 
 /**
@@ -19,6 +21,24 @@ public class GatherDataRepService {
 	@Autowired
 	private SpringJdbcDAO  dao;
 	
+	/**
+	 * 单值采集数据入库
+	 * @param singleInPOs
+	 */
+	public void save(List<SingleInsertPO>  singleInPOs) {
+		if(singleInPOs==null||singleInPOs.size()==0) {
+			return;
+		}
+		String sql="""
+				insert into tzl_gather_data(well_param,gather_time,data_val) values(?,?,?)
+				""";
+		dao.getJdbcTemplate().batchUpdate(sql, buildBatchParams(singleInPOs));
+	}
+	
+	/**
+	 * 多值采集数据 入库
+	 * @param diagramPO
+	 */
 	public void save(DiagramPO diagramPO) {
 		String sql="""
 				insert into tzl_gather_data_multi(well_param,gather_time,data_val1,data_val2) values(?,?,?,?)
@@ -33,4 +53,21 @@ public class GatherDataRepService {
 		}
 		return strb.substring(1);
 	}
+	
+	/**
+	 * 构建单值批量入库参数
+	 * @param singleInPOs
+	 * @return
+	 */
+	private List<Object[]>  buildBatchParams(List<SingleInsertPO>  singleInPOs){
+		
+		List<Object[]>  params=new ArrayList<>(singleInPOs.size());
+		
+		for(SingleInsertPO po : singleInPOs) {
+			
+			params.add(new Object[] {po.getWellParam(),po.getGatherTime(),po.getDataVal()});
+		}
+		
+		return params;
+	}
 }