Quellcode durchsuchen

增加结束功图诊断请求(来自采集程序),并批量进行诊断

chenwen vor 4 Monaten
Ursprung
Commit
e22700afbe

+ 57 - 0
src/main/java/com/hb/proj/allconfig/RedisConfig.java

@@ -0,0 +1,57 @@
+package com.hb.proj.allconfig;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.listener.ChannelTopic;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
+import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
+import org.springframework.data.redis.serializer.StringRedisSerializer;
+
+import com.hb.proj.third.RedisMessageSubscriber;
+
+@Configuration
+public class RedisConfig {
+	
+	// 定义监听的频道名称:功图样本(待诊断功图)
+    public static final String CHANNEL_SAMPLE = "diagram_sample";
+    
+    public static final String CHANNEL_DIAGNOSE = "diagnose_rst";
+
+	@Bean
+	public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
+		RedisTemplate<String, Object> template = new RedisTemplate<>();
+		template.setConnectionFactory(factory);
+		template.setKeySerializer(new StringRedisSerializer());
+		template.setHashKeySerializer(new StringRedisSerializer());
+		template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
+		template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
+		template.afterPropertiesSet();
+		return template;
+	}
+	
+	 /**
+     * 创建消息监听容器
+     */
+    @Bean
+    public RedisMessageListenerContainer container(
+            RedisConnectionFactory connectionFactory,
+            MessageListenerAdapter listenerAdapter) {
+
+        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
+        container.setConnectionFactory(connectionFactory);
+        // 订阅指定频道
+        container.addMessageListener(listenerAdapter, new ChannelTopic(CHANNEL_SAMPLE));
+        return container;
+    }
+    
+    /**
+     * 绑定消息监听器
+     */
+    @Bean
+    public MessageListenerAdapter listenerAdapter(RedisMessageSubscriber subscriber) {
+        return new MessageListenerAdapter(subscriber, "onMessage");
+    }
+}

+ 168 - 0
src/main/java/com/hb/proj/gather/model/DiagramPO.java

@@ -0,0 +1,168 @@
+package com.hb.proj.gather.model;
+
+import java.util.Date;
+import java.util.List;
+
+public class DiagramPO {
+	
+	public static final String DIAGRAM_LOAD_CODE="diagram_load";
+	
+	public static final String DIAGRAM_CURR_CODE="diagram_current";
+	
+	public static final String DIAGRAM_POWER_CODE="diagram_power";
+	
+	private String devSerial;  //设备序号
+
+	private String paramCode;
+	
+	private String wellParam;  //对应的井采集参数  wellId_paramCode
+	
+	private List<Float> disps;
+	
+	private List<Float> oths;
+	
+	private Date  gatherTime;
+	
+	private Float upstrokeMax;  //上冲程最大(y值)采集后计算
+	private Float dwnstrokeMax; //下冲程最大(y值)采集后计算
+	private Float stroke; //冲程 采集后计算
+	private Float glbMax; //最大y值
+	private Float glbMin; //最小y值
+	private Double balance; //平衡度,百分数
+	private Boolean startup; //起始是上行
+	private Integer turnIndex; //上下行切换点索引
+	private Double freq; //冲次
+	
+	public DiagramPO() {
+		
+	}
+	
+	public DiagramPO(String devSerial,String paramCode,List<Float> disps,List<Float> oths) {
+		this.gatherTime=new Date();
+		this.devSerial=devSerial;
+		this.paramCode=paramCode;
+		this.disps=disps;
+		this.oths=oths;
+		
+	}
+
+	public String getParamCode() {
+		return paramCode;
+	}
+
+	public void setParamCode(String paramCode) {
+		this.paramCode = paramCode;
+	}
+
+	public List<Float> getDisps() {
+		return disps;
+	}
+
+	public void setDisps(List<Float> disps) {
+		this.disps = disps;
+	}
+
+	public List<Float> getOths() {
+		return oths;
+	}
+
+	public void setOths(List<Float> oths) {
+		this.oths = oths;
+	}
+
+	public Date getGatherTime() {
+		return gatherTime;
+	}
+
+	public void setGatherTime(Date gatherTime) {
+		this.gatherTime = gatherTime;
+	}
+
+	public String getDevSerial() {
+		return devSerial;
+	}
+
+	public void setDevSerial(String devSerial) {
+		this.devSerial = devSerial;
+	}
+
+	public String getWellParam() {
+		return wellParam;
+	}
+
+	public void setWellParam(String wellParam) {
+		this.wellParam = wellParam;
+	}
+
+	public Float getUpstrokeMax() {
+		return upstrokeMax;
+	}
+
+	public void setUpstrokeMax(Float upstrokeMax) {
+		this.upstrokeMax = upstrokeMax;
+	}
+
+	public Float getDwnstrokeMax() {
+		return dwnstrokeMax;
+	}
+
+	public void setDwnstrokeMax(Float dwnstrokeMax) {
+		this.dwnstrokeMax = dwnstrokeMax;
+	}
+
+	public Float getStroke() {
+		return stroke;
+	}
+
+	public void setStroke(Float stroke) {
+		this.stroke = stroke;
+	}
+
+	public Float getGlbMax() {
+		return glbMax;
+	}
+
+	public void setGlbMax(Float glbMax) {
+		this.glbMax = glbMax;
+	}
+
+	public Float getGlbMin() {
+		return glbMin;
+	}
+
+	public void setGlbMin(Float glbMin) {
+		this.glbMin = glbMin;
+	}
+
+	public Double getBalance() {
+		return balance;
+	}
+
+	public void setBalance(Double balance) {
+		this.balance = balance;
+	}
+
+	public Boolean getStartup() {
+		return startup;
+	}
+
+	public void setStartup(Boolean startup) {
+		this.startup = startup;
+	}
+
+	public Integer getTurnIndex() {
+		return turnIndex;
+	}
+
+	public void setTurnIndex(Integer turnIndex) {
+		this.turnIndex = turnIndex;
+	}
+
+	public Double getFreq() {
+		return freq;
+	}
+
+	public void setFreq(Double freq) {
+		this.freq = freq;
+	}
+}

+ 62 - 0
src/main/java/com/hb/proj/third/RedisMessageSubscriber.java

@@ -0,0 +1,62 @@
+package com.hb.proj.third;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+
+import com.hb.proj.gather.model.DiagramPO;
+import com.hb.proj.model.DiagramSample;
+/**
+ * 
+ * @author cwen
+ * redis 消息订阅者
+ */
+@Component
+public class RedisMessageSubscriber implements MessageListener {
+
+	private final static  Logger logger = LoggerFactory.getLogger(RedisMessageSubscriber.class);
+	
+	
+	@Autowired
+    private RedisTemplate<String, Object> redisTemplate;
+	
+	@Autowired
+	private RedisMsgProcessor  msgProcessor;
+	
+	 /**
+     * 处理接收到的消息
+     * @param message 原始消息对象
+     * @param pattern 匹配的频道模式(若使用模式订阅)
+     * Redis 的机制:
+		Redis 客户端的接收缓冲区有大小限制(默认配置通常为 client-output-buffer-limit pubsub),
+		一旦溢出,Redis 会强制断开订阅者连接,导致消息丢失。
+     */
+	@Override
+	public void onMessage(Message message, byte[] pattern) {
+		 String channel = new String(message.getChannel());
+		 //String body = new String(message.getBody());
+		 
+		 DiagramPO diagram=(DiagramPO)redisTemplate.getValueSerializer().deserialize(message.getBody());
+		 
+		 msgProcessor.addMsg(convert(diagram));
+		 
+		 logger.info("Received message from redis,channel:{}",channel);
+
+	}
+	
+	
+	private DiagramSample  convert(DiagramPO diagram) {
+		DiagramSample  sample=new DiagramSample();
+		sample.setDisps(diagram.getDisps());
+		sample.setLoads(diagram.getOths());
+		sample.setWellName(diagram.getDevSerial());
+		return sample;
+	}
+	
+	
+
+}

+ 146 - 0
src/main/java/com/hb/proj/third/RedisMsgProcessor.java

@@ -0,0 +1,146 @@
+package com.hb.proj.third;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import com.hb.proj.diagnose.FeatureExtractor;
+import com.hb.proj.diagnose.XGBoostDataUtil;
+import com.hb.proj.diagnose.XGBoostEngine;
+import com.hb.proj.model.DiagnoseStandard;
+import com.hb.proj.model.DiagramFeatureSample;
+import com.hb.proj.model.DiagramSample;
+import com.hb.proj.rep.DiagramSampleService;
+
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+
+@Component
+public class RedisMsgProcessor {
+	
+	private final static  Logger logger = LoggerFactory.getLogger(RedisMsgProcessor.class);
+
+	private final BlockingQueue<DiagramSample> msgQueue = new ArrayBlockingQueue<>(200);
+	
+	private volatile boolean running = true;
+	
+	private static final int BATCH_SIZE = 20;
+	
+	// 队列为空时的等待时间(毫秒)
+    private static final long POLL_TIMEOUT = 1000;
+    
+    @Autowired
+	private XGBoostEngine   xgbEngine;
+    
+    @Autowired
+	private DiagramSampleService  sampleService;
+    
+    
+    public int getQueueSize() {
+    	return msgQueue.size();
+    }
+	
+	/**
+	 * 将消息加入队列
+	 * @param msg
+	 */
+	public void addMsg(DiagramSample msg) {
+		try {
+			msgQueue.put(msg);
+		} catch (InterruptedException e) {
+			logger.error("接收功图样本出现异常:{}",e.getMessage());
+			Thread.currentThread().interrupt();
+            throw new RuntimeException("Message queue interrupted", e);
+		}
+	}
+	
+	/**
+	 * 启动实际处理线程
+	 */
+	@PostConstruct
+    public void init() {
+		new Thread(this::process).start();
+	}
+	
+	@PreDestroy
+    public void shutdown() {
+        running = false;
+     }
+	
+	public void process() {
+		List<DiagramSample> batch=null;
+		DiagramSample  sample=null;
+		int drained=0;
+		while (running && !Thread.currentThread().isInterrupted()) {
+			try {
+				batch= new ArrayList<>(BATCH_SIZE);
+	            drained = msgQueue.drainTo(batch, BATCH_SIZE);
+	            if (drained == 0) {
+	                // 队列为空时,阻塞等待至少一条消息
+	            	sample = msgQueue.poll(POLL_TIMEOUT, TimeUnit.MILLISECONDS);
+	                if (sample != null) {
+	                    batch.add(sample);
+	                    // 继续非阻塞获取剩余消息
+	                    msgQueue.drainTo(batch, BATCH_SIZE - 1);
+	                }
+	            }
+	            
+	            if (!batch.isEmpty()) {
+	                handleBatch(batch);
+	            }
+			}
+			catch(InterruptedException e) {
+				 Thread.currentThread().interrupt();
+	             break;
+			}
+			catch (Exception e) {
+				logger.error("处理功图样本消息出现异常:{}",e.getMessage());
+			}
+		}
+		
+		//应用关闭时检查是否还有未处理的样本
+		handleRemainingMsg();
+	}
+	
+	/**
+	 * 实际的业务处理
+	 * @param batch
+	 */
+	private void handleBatch(List<DiagramSample> batchSamples) {
+		List<DiagramFeatureSample> feaSamples=new ArrayList<>(batchSamples.size());
+		for(DiagramSample sample : batchSamples) {
+			feaSamples.add(FeatureExtractor.extract(sample));
+		}
+		float[][] rst=xgbEngine.predict(XGBoostDataUtil.build(feaSamples, false));
+		
+		Map<Integer,DiagnoseStandard> labelStdMapping=sampleService.loadLabelStdMapping();
+		
+		DiagnoseStandard  diagnoseStd=null;
+		for(int i=0,len=rst.length;i<len;i++) {
+			diagnoseStd=labelStdMapping.get((int)rst[i][0]);
+			logger.info("诊断结果:{},{}",batchSamples.get(i).getWellName(),diagnoseStd.getConclusion());
+		}
+		
+		logger.info("本次完成批量诊断{}个",batchSamples.size());
+	}
+	
+	/**
+     * 处理队列剩余消息(优雅关闭用)
+     */
+    private void handleRemainingMsg() {
+        List<DiagramSample> remaining = new ArrayList<>();
+        msgQueue.drainTo(remaining);
+        if (!remaining.isEmpty()) {
+        	logger.info("处理剩余功图样本消息: " + remaining.size());
+            handleBatch(remaining);
+        }
+    }
+}

+ 35 - 0
src/main/java/com/hb/proj/third/RedisPublisher.java

@@ -0,0 +1,35 @@
+package com.hb.proj.third;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Service;
+
+import com.hb.proj.allconfig.RedisConfig;
+/**
+ * redis 消息发布者
+ * @author cwen
+ *
+ */
+@Service
+public class RedisPublisher {
+
+	@Autowired
+	private RedisTemplate<String, Object> redisTemplate;
+	
+	/**
+	 * 发送消息到指定频道
+	 * @param channel  频道名称
+	 * @param message  消息内容
+	 */
+	public void sendMessage(String channel, Object message) {
+        redisTemplate.convertAndSend(channel, message);
+    }
+	
+	/**
+	 * 发送诊断结果消息
+	 * @param message
+	 */
+	public void sendDiagnose(Object message) {
+        redisTemplate.convertAndSend(RedisConfig.CHANNEL_DIAGNOSE, message);
+    }
+}

+ 7 - 1
src/main/resources/application-dev.properties

@@ -35,4 +35,10 @@ spring.datasource.hikari.maximum-pool-size: 10
 spring.datasource.hikari.minimum-idle: 5
 spring.datasource.hikari.read-only: false
 
-#xgboost.model.path=D:/workspace
+#xgboost.model.path=D:/workspace
+
+#redis 连接配置
+spring.data.redis.database=0
+spring.data.redis.host=42.56.120.107
+spring.data.redis.port=9608
+spring.data.redis.password=redis7.0

Datei-Diff unterdrückt, da er zu groß ist
+ 0 - 0
src/main/resources/static/model/xgb_model.json


Einige Dateien werden nicht angezeigt, da zu viele Dateien in diesem Diff geändert wurden.