Browse Source

socket server 구조 수정 작업

master
노승철 2 years ago
parent
commit
1515f43332
  1. 5
      src/main/java/com/palnet/comn/model/CtrHistoryShareContext.java
  2. 25
      src/main/java/com/palnet/comn/utils/ContextUtils.java
  3. 13
      src/main/java/com/palnet/process/message/consumer/MessageConsumer.java
  4. 7
      src/main/java/com/palnet/process/scheduler/GpHistoryScheduler.java
  5. 16
      src/main/java/com/palnet/server/codec/WebPayLoad.java
  6. 23
      src/main/java/com/palnet/server/codec/WebPayLoadDecoder.java
  7. 17
      src/main/java/com/palnet/server/codec/WebPayLoadEncorder.java
  8. 15
      src/main/java/com/palnet/server/codec/WebPayLoadResponse.java
  9. 126
      src/main/java/com/palnet/server/handler/WebHandler.java
  10. 13
      src/main/java/com/palnet/server/initializer/WebInitializer.java
  11. 1
      src/main/java/com/palnet/server/task/ctr/CtrCntrlTask.java
  12. 63
      src/main/java/com/palnet/server/task/ctr/service/CtrCntrlTaskService.java

5
src/main/java/com/palnet/comn/model/GPHistoryShareContext.java → src/main/java/com/palnet/comn/model/CtrHistoryShareContext.java

@ -2,12 +2,11 @@ package com.palnet.comn.model;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class GPHistoryShareContext {
public class CtrHistoryShareContext {
/*
* GPS Data History ( Websocket <-> Client Data Share )
@ -18,7 +17,7 @@ public class GPHistoryShareContext {
private final Map<String, CtrCntrlModel> maps;
public GPHistoryShareContext() {
public CtrHistoryShareContext() {
this.maps = new ConcurrentHashMap<>();
}

25
src/main/java/com/palnet/comn/utils/ContextUtils.java

@ -0,0 +1,25 @@
package com.palnet.comn.utils;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public class ContextUtils implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext ctx) throws BeansException {
applicationContext = ctx;
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
public static Object getBean(String beanName) {
return getApplicationContext().getBean(beanName);
}
}

13
src/main/java/com/palnet/process/message/consumer/MessageConsumer.java

@ -3,8 +3,7 @@ package com.palnet.process.message.consumer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.palnet.comn.model.CtrCntrlModel;
import com.palnet.comn.model.GPHistoryShareContext;
import com.palnet.comn.model.GPDatabaseModel;
import com.palnet.comn.model.CtrHistoryShareContext;
import com.palnet.comn.model.GPModel;
import com.palnet.comn.utils.JsonUtils;
import com.palnet.server.collection.ChannelCollection;
@ -20,9 +19,9 @@ public class MessageConsumer {
private ChannelCollection cc = new ChannelCollection();
private final ObjectMapper objectMapper;
private final GPHistoryShareContext gpHistoryShareModel;
private final CtrHistoryShareContext gpHistoryShareModel;
public MessageConsumer(GPHistoryShareContext gpHistoryShareModel) {
public MessageConsumer(CtrHistoryShareContext gpHistoryShareModel) {
this.objectMapper = JsonUtils.getObjectMapper();
this.gpHistoryShareModel = gpHistoryShareModel;
}
@ -30,7 +29,7 @@ public class MessageConsumer {
@RabbitHandler
@RabbitListener(queues = {"websocket.drone.queue"})
public void receiveDroneMessage(final String message) throws JsonProcessingException {
// log.info("websocket message : {}", message);
log.info("websocket message : {}", message);
GPModel model = objectMapper.readValue(message, GPModel.class);
CtrCntrlModel history = this.modelConvert(model); // 웹 표출 model로 변환
@ -92,10 +91,6 @@ public class MessageConsumer {
model.setControlCacheCount(1);
// log.info("===============================");
// log.info("Warn : {}", model.isControlWarnCd());
// log.info("Noty : {}", model.isControlWarnNotyCd());
return model;
}
}

7
src/main/java/com/palnet/process/scheduler/GpHistoryScheduler.java

@ -1,13 +1,12 @@
package com.palnet.process.scheduler;
import com.palnet.comn.model.CtrCntrlModel;
import com.palnet.comn.model.GPHistoryShareContext;
import com.palnet.comn.model.CtrHistoryShareContext;
import com.palnet.comn.utils.DateUtils;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -15,9 +14,9 @@ import java.util.Objects;
public class GpHistoryScheduler {
private final int timeLimit = 60; // -- 화면 노출시간 1분
private final GPHistoryShareContext gpHistoryShareContext;
private final CtrHistoryShareContext gpHistoryShareContext;
public GpHistoryScheduler(GPHistoryShareContext gpHistoryShareContext) {
public GpHistoryScheduler(CtrHistoryShareContext gpHistoryShareContext) {
this.gpHistoryShareContext = gpHistoryShareContext;
}

16
src/main/java/com/palnet/server/codec/WebPayLoad.java

@ -0,0 +1,16 @@
package com.palnet.server.codec;
import lombok.Data;
import java.io.Serializable;
@Data
public class WebPayLoad implements Serializable {
private static final long serialVersionUID = 1L;
private String authKey;
private String teminalId;
private String command;
private String body;
}

23
src/main/java/com/palnet/server/codec/WebPayLoadDecoder.java

@ -1,20 +1,35 @@
package com.palnet.server.codec;
import com.palnet.comn.model.GPModel;
import com.palnet.comn.utils.JsonUtils;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
@Sharable
public class WebPayLoadDecoder extends MessageToMessageDecoder<String> {
@Slf4j
public class WebPayLoadDecoder extends MessageToMessageDecoder<Object> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
protected void decode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {
logger.info("MSG ::" + msg);
protected void decode(ChannelHandlerContext ctx, Object msg, List<Object> out) {
GPModel payLoad;
try {
// payLoad = JsonUtils.fromJson((String) msg, GPModel.class);
//
// if(payLoad != null) {
// out.add(payLoad);
// }
} catch (Exception e) {
log.error(e.getMessage());
}
}

17
src/main/java/com/palnet/server/codec/WebPayLoadEncorder.java

@ -1,22 +1,27 @@
package com.palnet.server.codec;
import com.palnet.comn.utils.JsonUtils;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
@Sharable
public class WebPayLoadEncorder extends MessageToMessageEncoder<String> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Slf4j
public class WebPayLoadEncorder extends MessageToMessageEncoder<WebPayLoadResponse> {
@Override
protected void encode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {
logger.info("MSG ::" + msg);
protected void encode(ChannelHandlerContext ctx, WebPayLoadResponse msg, List<Object> out) {
try {
// out.add(JsonUtils.toJson(msg));
} catch (Exception e) {
log.error(e.getMessage());
}
}

15
src/main/java/com/palnet/server/codec/WebPayLoadResponse.java

@ -0,0 +1,15 @@
package com.palnet.server.codec;
import lombok.Data;
import java.io.Serializable;
@Data
public class WebPayLoadResponse implements Serializable {
private static final long serialVersionUID = 1L;
private String rspCode;
private String rspMessage;
}

126
src/main/java/com/palnet/server/handler/WebHandler.java

@ -1,75 +1,125 @@
package com.palnet.server.handler;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.palnet.comn.model.CtrCntrlModel;
import com.palnet.comn.model.CtrHistoryShareContext;
import com.palnet.comn.model.GPModel;
import com.palnet.comn.utils.ContextUtils;
import com.palnet.comn.utils.JsonUtils;
import com.palnet.server.codec.WebPayLoad;
import com.palnet.server.codec.WebPayLoadResponse;
import com.palnet.server.collection.ChannelCollection;
import com.palnet.server.task.ctr.service.CtrCntrlTaskService;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
public class WebHandler extends SimpleChannelInboundHandler<WebSocketFrame>{
import java.nio.charset.Charset;
@Slf4j
public class WebHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
private Logger logger = LoggerFactory.getLogger(getClass());
private CtrCntrlTaskService taskService;
private CtrHistoryShareContext context;
private ObjectMapper objectMapper;
public WebHandler() {
this.taskService = (CtrCntrlTaskService) ContextUtils.getBean("ctrCntrlTaskService");
this.context = (CtrHistoryShareContext) ContextUtils.getBean("ctrHistoryShareContext");
this.objectMapper = JsonUtils.getObjectMapper();
}
ChannelCollection cc = new ChannelCollection();
@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
logger.info("==================== [ channelRead0 ] ==================== ");
logger.info("Channel Size [" + cc.getAllChannels().size() + "]");
cc.getAllChannels().stream().forEach(c -> {
logger.info(":" + c);
c.writeAndFlush(new TextWebSocketFrame("test"));
// command.execute(c, data, result);
// command.execute(c, data, result);
});
// ctx.writeAndFlush(new TextWebSocketFrame("Test"));
}
// private class HeartBeatTask implements Runnable{
// @Override
// public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// String json = ((ByteBuf) msg).toString(Charset.forName("UTF8"));
// GPModel model = objectMapper.readValue(json, GPModel.class);
//
// private final ChannelHandlerContext ctx;
// CtrCntrlModel history = taskService.modelConvert(model);
// context.putHistory(model.getObjectId(), history);
//
// public HeartBeatTask(final ChannelHandlerContext ctx){
// this.ctx = ctx;
// }
// log.info("message from received : {}", msg);
// }
//
// @Override
// public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// super.exceptionCaught(ctx, cause);
// }
// @Override
// protected void channelRead0(ChannelHandlerContext ctx, GPModel payload) {
// WebPayLoadResponse response = new WebPayLoadResponse();
//
// logger.info("==================== [ channelRead0 ] ==================== ");
// logger.info("AUTH KEY : {}", payload.getAuthKey());
//
// if(payload.getObjectId().isEmpty()) {
// logger.info("====== auth key is empty ======");
//
// response.setRspCode("-2000");
// response.setRspMessage("auth key is empty !");
// } else {
// log.info("WEBSOCKET BODY : {}", payload);
// String json = payload.getBody();
// GPModel model = objectMapper.readValue(json, GPModel.class);
//
// CtrCntrlModel history = taskService.modelConvert(model);
// context.putHistory(model.getObjectId(), history);
// }
//
// cc.getAllChannels().stream().forEach(c -> {
// logger.info(":" + c);
// c.writeAndFlush(new TextWebSocketFrame("test"));
//// command.execute(c, data, result);
// });
//
//
//
// }
//
// public void run() {
// ctx.writeAndFlush(new TextWebSocketFrame("test"));
//
// System.out.println("Client send heart beat message to server : ---> ");
// }
//
// }
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// SocketAddress remoteAddress = ctx.channel().remoteAddress();
cc.setAllChannels(ctx.channel());
// ctx.executor().scheduleAtFixedRate(new HeartBeatTask(ctx),0,500,
// TimeUnit.MILLISECONDS);
logger.info("==================== [ channelActive ] ==================== ");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.info("==================== [ channelInactive ] ==================== ");
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
//2 Triggered when a connection is established with the selector of a thread in the worker thread group (channel is registered to EventLoop)

13
src/main/java/com/palnet/server/initializer/WebInitializer.java

@ -10,27 +10,24 @@ import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.codec.json.JsonObjectDecoder;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.stereotype.Component;
public class WebInitializer extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec())
pipeline
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(65536))
.addLast(new WebSocketServerCompressionHandler())
.addLast(new WebSocketServerProtocolHandler("/ws", null, true))
.addLast(new IdleStateHandler(0, 0, 180)) // [5]
// .addLast(idleHandler) // [6]
// .addLast(healthEndpointHandler) // [7]
.addLast(new WebPayLoadDecoder() , new WebPayLoadEncorder())
// .addLast(new WebCommandHandler())
.addLast(new WebHandler());
// .addLast(authHandler) // [9]
// .addLast(commandHandler); // [10]
}
}

1
src/main/java/com/palnet/server/task/ctr/CtrCntrlTask.java

@ -31,7 +31,6 @@ public class CtrCntrlTask implements Runnable{
}catch(Exception e) {
e.printStackTrace();
}
}

63
src/main/java/com/palnet/server/task/ctr/service/CtrCntrlTaskService.java

@ -2,8 +2,9 @@ package com.palnet.server.task.ctr.service;
import com.palnet.comn.model.CtrCntrlModel;
import com.palnet.comn.model.GPHistoryShareContext;
import com.palnet.comn.model.CtrHistoryShareContext;
import com.palnet.comn.model.GPModel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -19,7 +20,65 @@ import java.util.Objects;
@RequiredArgsConstructor
public class CtrCntrlTaskService {
private final GPHistoryShareContext historyShareContext;
private final CtrHistoryShareContext historyShareContext;
/**
*
*/
public CtrCntrlModel modelConvert(final GPModel dataInfo) {
CtrCntrlModel model = new CtrCntrlModel();
CtrCntrlModel prevModel = historyShareContext.getHistory(dataInfo.getObjectId());
model.setObjectId(dataInfo.getObjectId());
model.setControlId(dataInfo.getControlId());
model.setControlStartDt(dataInfo.getControlStartDt());
model.setObjectTypeCd(dataInfo.getObjectType());
model.setLat(dataInfo.getLat());
model.setLng(dataInfo.getLng());
model.setElevType(dataInfo.getElevType());
model.setElev(dataInfo.getElev());
model.setSpeedType(dataInfo.getSpeedType());
model.setSpeed(dataInfo.getSpeed());
model.setBetteryLevel(dataInfo.getBetteryLevel());
model.setBetteryVoltage(dataInfo.getBetteryVoltage());
model.setDronStatus(dataInfo.getDronStatus());
model.setHeading(dataInfo.getHeading());
model.setMoveDistance(dataInfo.getMoveDistance());
model.setMoveDistanceType(dataInfo.getMoveDistanceType());
model.setServerRcvDt(dataInfo.getServerRcvDt());
// 환경 데이터 필드 추가
model.setSensorCo(dataInfo.getSensorCo());
model.setSensorSo2(dataInfo.getSensorSo2());
model.setSensorNo2(dataInfo.getSensorNo2());
model.setSensorO3(dataInfo.getSensorO3());
model.setSensorDust(dataInfo.getSensorDust());
// 비정상 상황 식별코드 추가
model.setControlWarnCd(dataInfo.isControlWarnCd());
if(prevModel == null) {
if(model.isControlWarnCd()) {
model.setControlWarnNotyCd(true); // 최초 비정상 발생
}
} else {
if(prevModel.isControlWarnCd() && model.isControlWarnCd()) {
model.setControlWarnNotyCd(false); // 비정상 -> 비정상
}
if(prevModel.isControlWarnCd() && !model.isControlWarnCd()) {
model.setControlWarnNotyCd(false); // 비정상 -> 정상
}
if(!prevModel.isControlWarnCd() && model.isControlWarnCd()) {
model.setControlWarnNotyCd(true); // 정상 -> 비정상상
}
}
model.setControlCacheCount(1);
return model;
}
/**
* Websocket 통해 전달될 위치 정보를 관리

Loading…
Cancel
Save