DataTransRepSingleTask.java 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. package com.hb.proj.gather.process;
  2. import java.util.HashMap;
  3. import java.util.List;
  4. import java.util.Map;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import com.hb.proj.gather.model.AlarmLogVO;
  8. import com.hb.proj.gather.model.SingleCombPO;
  9. import com.hb.proj.gather.model.SingleInsertPO;
  10. import com.hb.proj.gather.model.WellParamVO;
  11. import com.hb.proj.gather.rep.GatherDataRepService;
  12. import com.hb.proj.gather.rep.RedisRepComponent;
  13. import com.hb.proj.utils.JacksonUtils;
  14. import com.hb.xframework.util.ApplicationContextUtils;
  15. /**
  16. * 单值采集数据 数据转换任务
  17. * @author cwen
  18. *
  19. */
  20. public class DataTransRepSingleTask implements Runnable{
  21. private final static Logger logger = LoggerFactory.getLogger(DataTransRepSingleTask.class);
  22. private SingleCombPO singleCombPO;
  23. public DataTransRepSingleTask(SingleCombPO singleCombPO) {
  24. this.singleCombPO=singleCombPO;
  25. }
  26. @Override
  27. public void run() {
  28. try {
  29. logger.info("开始单值数据转换处理{}",singleCombPO.getDevSerial());
  30. Map<String,Float> gatherDatas=singleCombPO.getGatherDatas();
  31. WellParamVO paramConfig=null;
  32. SingleInsertPO insPOItm=null;
  33. Map<String,Object> redisDatas=new HashMap<>(gatherDatas.size()); //进入redis的数据
  34. Map<String,SingleInsertPO> mappingDatas=new HashMap<>(gatherDatas.size()); //入库及报警判断用数据
  35. for(String pcode : gatherDatas.keySet()) {
  36. paramConfig=DataTransConfig.get(singleCombPO.getDevSerial()+"_"+pcode);
  37. if(paramConfig==null) {
  38. logger.warn("未找到参数配置{}_{}",singleCombPO.getDevSerial(),pcode);
  39. continue;
  40. }
  41. insPOItm=new SingleInsertPO(pcode,paramConfig.getParamId(),gatherDatas.get(pcode),singleCombPO.getGatherTime());
  42. DataTransUtils.transSingle(insPOItm, paramConfig);
  43. redisDatas.put(insPOItm.getParamCode(), insPOItm.getDataVal());
  44. mappingDatas.put(insPOItm.getWellParam(), insPOItm);
  45. }
  46. if(paramConfig==null) {
  47. logger.warn("设备{}未关联任何井,取消后续操",singleCombPO.getDevSerial());
  48. return;
  49. }
  50. List<AlarmLogVO> almlogs=DataAlarmDetector.detect(paramConfig.getWellId(), mappingDatas);
  51. RedisRepComponent repRedis=ApplicationContextUtils.getBean("redisRepComponent", RedisRepComponent.class);
  52. redisDatas.put("time", singleCombPO.getGatherTime());
  53. repRedis.put(paramConfig.getWellId(), redisDatas);
  54. repRedis.put("alarm_"+paramConfig.getWellId(), buildRedisAlarm(almlogs),true); //覆盖方式,同步于采集数据,不会一直缓存
  55. //有报警时,缓存wellId,便于展示系统通过wellId获取报警,否则移除
  56. repRedis.updateAlarmWell(paramConfig.getWellId(),almlogs!=null&&almlogs.size()>0);
  57. GatherDataRepService repService=ApplicationContextUtils.getBean("gatherDataRepService", GatherDataRepService.class);
  58. repService.save(mappingDatas.values()); //入库
  59. repService.saveAlarm(almlogs);
  60. logger.info("单值数据转换、报警、入库完成:{}",singleCombPO.getDevSerial());
  61. }
  62. catch(Exception e) {
  63. e.printStackTrace();
  64. logger.error("单值转换、入库任务执行出现异常:{}",e.getMessage());
  65. }
  66. }
  67. private Map<String,String> buildRedisAlarm(List<AlarmLogVO> almlogs){
  68. if(almlogs==null||almlogs.size()==0) {
  69. return null;
  70. }
  71. Map<String,String> rtn=new HashMap<String,String>();
  72. for(AlarmLogVO almlog : almlogs) {
  73. rtn.put(almlog.getParamCode(), JacksonUtils.getJSON(almlog));
  74. }
  75. return rtn;
  76. }
  77. }