diff --git a/src/main/java/com/palnet/comn/model/GPHistoryShareContext.java b/src/main/java/com/palnet/comn/model/CtrHistoryShareContext.java similarity index 88% rename from src/main/java/com/palnet/comn/model/GPHistoryShareContext.java rename to src/main/java/com/palnet/comn/model/CtrHistoryShareContext.java index 892b75c..5fcc3be 100644 --- a/src/main/java/com/palnet/comn/model/GPHistoryShareContext.java +++ b/src/main/java/com/palnet/comn/model/CtrHistoryShareContext.java @@ -2,12 +2,11 @@ package com.palnet.comn.model; import org.springframework.stereotype.Component; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @Component -public class GPHistoryShareContext { +public class CtrHistoryShareContext { /* * GPS Data History ( Websocket <-> Client Data Share ) @@ -18,7 +17,7 @@ public class GPHistoryShareContext { private final Map maps; - public GPHistoryShareContext() { + public CtrHistoryShareContext() { this.maps = new ConcurrentHashMap<>(); } diff --git a/src/main/java/com/palnet/comn/utils/ContextUtils.java b/src/main/java/com/palnet/comn/utils/ContextUtils.java new file mode 100644 index 0000000..34ee00b --- /dev/null +++ b/src/main/java/com/palnet/comn/utils/ContextUtils.java @@ -0,0 +1,25 @@ +package com.palnet.comn.utils; + +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.stereotype.Component; + +@Component +public class ContextUtils implements ApplicationContextAware { + + private static ApplicationContext applicationContext; + + @Override + public void setApplicationContext(ApplicationContext ctx) throws BeansException { + applicationContext = ctx; + } + + public static ApplicationContext getApplicationContext() { + return applicationContext; + } + + public static Object getBean(String beanName) { + return getApplicationContext().getBean(beanName); + } +} diff --git a/src/main/java/com/palnet/process/message/consumer/MessageConsumer.java b/src/main/java/com/palnet/process/message/consumer/MessageConsumer.java index 42933fc..5b7d821 100644 --- a/src/main/java/com/palnet/process/message/consumer/MessageConsumer.java +++ b/src/main/java/com/palnet/process/message/consumer/MessageConsumer.java @@ -3,8 +3,7 @@ package com.palnet.process.message.consumer; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.palnet.comn.model.CtrCntrlModel; -import com.palnet.comn.model.GPHistoryShareContext; -import com.palnet.comn.model.GPDatabaseModel; +import com.palnet.comn.model.CtrHistoryShareContext; import com.palnet.comn.model.GPModel; import com.palnet.comn.utils.JsonUtils; import com.palnet.server.collection.ChannelCollection; @@ -20,9 +19,9 @@ public class MessageConsumer { private ChannelCollection cc = new ChannelCollection(); private final ObjectMapper objectMapper; - private final GPHistoryShareContext gpHistoryShareModel; + private final CtrHistoryShareContext gpHistoryShareModel; - public MessageConsumer(GPHistoryShareContext gpHistoryShareModel) { + public MessageConsumer(CtrHistoryShareContext gpHistoryShareModel) { this.objectMapper = JsonUtils.getObjectMapper(); this.gpHistoryShareModel = gpHistoryShareModel; } @@ -30,7 +29,7 @@ public class MessageConsumer { @RabbitHandler @RabbitListener(queues = {"websocket.drone.queue"}) public void receiveDroneMessage(final String message) throws JsonProcessingException { -// log.info("websocket message : {}", message); + log.info("websocket message : {}", message); GPModel model = objectMapper.readValue(message, GPModel.class); CtrCntrlModel history = this.modelConvert(model); // 웹 표출 model로 변환 @@ -92,10 +91,6 @@ public class MessageConsumer { model.setControlCacheCount(1); -// log.info("==============================="); -// log.info("Warn : {}", model.isControlWarnCd()); -// log.info("Noty : {}", model.isControlWarnNotyCd()); - return model; } } diff --git a/src/main/java/com/palnet/process/scheduler/GpHistoryScheduler.java b/src/main/java/com/palnet/process/scheduler/GpHistoryScheduler.java index f7027c0..832b979 100644 --- a/src/main/java/com/palnet/process/scheduler/GpHistoryScheduler.java +++ b/src/main/java/com/palnet/process/scheduler/GpHistoryScheduler.java @@ -1,13 +1,12 @@ package com.palnet.process.scheduler; import com.palnet.comn.model.CtrCntrlModel; -import com.palnet.comn.model.GPHistoryShareContext; +import com.palnet.comn.model.CtrHistoryShareContext; import com.palnet.comn.utils.DateUtils; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.Date; -import java.util.List; import java.util.Map; import java.util.Objects; @@ -15,9 +14,9 @@ import java.util.Objects; public class GpHistoryScheduler { private final int timeLimit = 60; // -- 화면 노출시간 1분 - private final GPHistoryShareContext gpHistoryShareContext; + private final CtrHistoryShareContext gpHistoryShareContext; - public GpHistoryScheduler(GPHistoryShareContext gpHistoryShareContext) { + public GpHistoryScheduler(CtrHistoryShareContext gpHistoryShareContext) { this.gpHistoryShareContext = gpHistoryShareContext; } diff --git a/src/main/java/com/palnet/server/codec/WebPayLoad.java b/src/main/java/com/palnet/server/codec/WebPayLoad.java new file mode 100644 index 0000000..9d0808a --- /dev/null +++ b/src/main/java/com/palnet/server/codec/WebPayLoad.java @@ -0,0 +1,16 @@ +package com.palnet.server.codec; + +import lombok.Data; + +import java.io.Serializable; + +@Data +public class WebPayLoad implements Serializable { + + private static final long serialVersionUID = 1L; + + private String authKey; + private String teminalId; + private String command; + private String body; +} diff --git a/src/main/java/com/palnet/server/codec/WebPayLoadDecoder.java b/src/main/java/com/palnet/server/codec/WebPayLoadDecoder.java index 983d854..1be7b11 100644 --- a/src/main/java/com/palnet/server/codec/WebPayLoadDecoder.java +++ b/src/main/java/com/palnet/server/codec/WebPayLoadDecoder.java @@ -1,20 +1,35 @@ package com.palnet.server.codec; +import com.palnet.comn.model.GPModel; +import com.palnet.comn.utils.JsonUtils; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; +import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; @Sharable -public class WebPayLoadDecoder extends MessageToMessageDecoder { +@Slf4j +public class WebPayLoadDecoder extends MessageToMessageDecoder { - private Logger logger = LoggerFactory.getLogger(getClass()); @Override - protected void decode(ChannelHandlerContext ctx, String msg, List out) throws Exception { - logger.info("MSG ::" + msg); + protected void decode(ChannelHandlerContext ctx, Object msg, List out) { + GPModel payLoad; + + try { +// payLoad = JsonUtils.fromJson((String) msg, GPModel.class); +// +// if(payLoad != null) { +// out.add(payLoad); +// } + + } catch (Exception e) { + log.error(e.getMessage()); + + } } diff --git a/src/main/java/com/palnet/server/codec/WebPayLoadEncorder.java b/src/main/java/com/palnet/server/codec/WebPayLoadEncorder.java index a0f57fa..c584e2f 100644 --- a/src/main/java/com/palnet/server/codec/WebPayLoadEncorder.java +++ b/src/main/java/com/palnet/server/codec/WebPayLoadEncorder.java @@ -1,22 +1,27 @@ package com.palnet.server.codec; +import com.palnet.comn.utils.JsonUtils; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageEncoder; +import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; @Sharable -public class WebPayLoadEncorder extends MessageToMessageEncoder { - - private Logger logger = LoggerFactory.getLogger(getClass()); - +@Slf4j +public class WebPayLoadEncorder extends MessageToMessageEncoder { @Override - protected void encode(ChannelHandlerContext ctx, String msg, List out) throws Exception { - logger.info("MSG ::" + msg); + protected void encode(ChannelHandlerContext ctx, WebPayLoadResponse msg, List out) { + + try { +// out.add(JsonUtils.toJson(msg)); + } catch (Exception e) { + log.error(e.getMessage()); + } } diff --git a/src/main/java/com/palnet/server/codec/WebPayLoadResponse.java b/src/main/java/com/palnet/server/codec/WebPayLoadResponse.java new file mode 100644 index 0000000..ed34e6e --- /dev/null +++ b/src/main/java/com/palnet/server/codec/WebPayLoadResponse.java @@ -0,0 +1,15 @@ +package com.palnet.server.codec; + +import lombok.Data; + +import java.io.Serializable; + +@Data +public class WebPayLoadResponse implements Serializable { + + private static final long serialVersionUID = 1L; + + private String rspCode; + + private String rspMessage; +} diff --git a/src/main/java/com/palnet/server/handler/WebHandler.java b/src/main/java/com/palnet/server/handler/WebHandler.java index 3afac0b..e260d11 100644 --- a/src/main/java/com/palnet/server/handler/WebHandler.java +++ b/src/main/java/com/palnet/server/handler/WebHandler.java @@ -1,75 +1,125 @@ package com.palnet.server.handler; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.palnet.comn.model.CtrCntrlModel; +import com.palnet.comn.model.CtrHistoryShareContext; +import com.palnet.comn.model.GPModel; +import com.palnet.comn.utils.ContextUtils; +import com.palnet.comn.utils.JsonUtils; +import com.palnet.server.codec.WebPayLoad; +import com.palnet.server.codec.WebPayLoadResponse; import com.palnet.server.collection.ChannelCollection; +import com.palnet.server.task.ctr.service.CtrCntrlTaskService; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; +import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; -public class WebHandler extends SimpleChannelInboundHandler{ +import java.nio.charset.Charset; + + + +@Slf4j +public class WebHandler extends SimpleChannelInboundHandler { private Logger logger = LoggerFactory.getLogger(getClass()); - + + + private CtrCntrlTaskService taskService; + private CtrHistoryShareContext context; + private ObjectMapper objectMapper; + + public WebHandler() { + this.taskService = (CtrCntrlTaskService) ContextUtils.getBean("ctrCntrlTaskService"); + this.context = (CtrHistoryShareContext) ContextUtils.getBean("ctrHistoryShareContext"); + this.objectMapper = JsonUtils.getObjectMapper(); + } + ChannelCollection cc = new ChannelCollection(); - - + @Override protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception { - + logger.info("==================== [ channelRead0 ] ==================== "); - + logger.info("Channel Size [" + cc.getAllChannels().size() + "]"); - + cc.getAllChannels().stream().forEach(c -> { logger.info(":" + c); c.writeAndFlush(new TextWebSocketFrame("test")); -// command.execute(c, data, result); +// command.execute(c, data, result); }); - - - - -// ctx.writeAndFlush(new TextWebSocketFrame("Test")); - } - -// private class HeartBeatTask implements Runnable{ + +// @Override +// public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { +// String json = ((ByteBuf) msg).toString(Charset.forName("UTF8")); +// GPModel model = objectMapper.readValue(json, GPModel.class); // -// private final ChannelHandlerContext ctx; +// CtrCntrlModel history = taskService.modelConvert(model); +// context.putHistory(model.getObjectId(), history); // -// public HeartBeatTask(final ChannelHandlerContext ctx){ -// this.ctx = ctx; -// } +// log.info("message from received : {}", msg); +// } +// +// @Override +// public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { +// super.exceptionCaught(ctx, cause); +// } + + // @Override +// protected void channelRead0(ChannelHandlerContext ctx, GPModel payload) { +// WebPayLoadResponse response = new WebPayLoadResponse(); +// +// logger.info("==================== [ channelRead0 ] ==================== "); +// logger.info("AUTH KEY : {}", payload.getAuthKey()); +// +// if(payload.getObjectId().isEmpty()) { +// logger.info("====== auth key is empty ======"); +// +// response.setRspCode("-2000"); +// response.setRspMessage("auth key is empty !"); +// } else { +// log.info("WEBSOCKET BODY : {}", payload); +// String json = payload.getBody(); +// GPModel model = objectMapper.readValue(json, GPModel.class); +// +// CtrCntrlModel history = taskService.modelConvert(model); +// context.putHistory(model.getObjectId(), history); +// } +// +// cc.getAllChannels().stream().forEach(c -> { +// logger.info(":" + c); +// c.writeAndFlush(new TextWebSocketFrame("test")); +//// command.execute(c, data, result); +// }); +// +// +// +// } // -// public void run() { -// ctx.writeAndFlush(new TextWebSocketFrame("test")); -// -// System.out.println("Client send heart beat message to server : ---> "); -// } -// -// } - @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { -// SocketAddress remoteAddress = ctx.channel().remoteAddress(); cc.setAllChannels(ctx.channel()); -// ctx.executor().scheduleAtFixedRate(new HeartBeatTask(ctx),0,500, -// TimeUnit.MILLISECONDS); + logger.info("==================== [ channelActive ] ==================== "); - - - + } - + @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { logger.info("==================== [ channelInactive ] ==================== "); } - - - + + + @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { //2 Triggered when a connection is established with the selector of a thread in the worker thread group (channel is registered to EventLoop) diff --git a/src/main/java/com/palnet/server/initializer/WebInitializer.java b/src/main/java/com/palnet/server/initializer/WebInitializer.java index 0334c0d..2167daf 100644 --- a/src/main/java/com/palnet/server/initializer/WebInitializer.java +++ b/src/main/java/com/palnet/server/initializer/WebInitializer.java @@ -10,27 +10,24 @@ import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler; +import io.netty.handler.codec.json.JsonObjectDecoder; import io.netty.handler.timeout.IdleStateHandler; +import org.springframework.stereotype.Component; + public class WebInitializer extends ChannelInitializer{ @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast(new HttpServerCodec()) + pipeline + .addLast(new HttpServerCodec()) .addLast(new HttpObjectAggregator(65536)) .addLast(new WebSocketServerCompressionHandler()) .addLast(new WebSocketServerProtocolHandler("/ws", null, true)) .addLast(new IdleStateHandler(0, 0, 180)) // [5] -// .addLast(idleHandler) // [6] -// .addLast(healthEndpointHandler) // [7] .addLast(new WebPayLoadDecoder() , new WebPayLoadEncorder()) -// .addLast(new WebCommandHandler()) .addLast(new WebHandler()); - -// .addLast(authHandler) // [9] -// .addLast(commandHandler); // [10] - } } diff --git a/src/main/java/com/palnet/server/task/ctr/CtrCntrlTask.java b/src/main/java/com/palnet/server/task/ctr/CtrCntrlTask.java index 4ed14bc..4ac99db 100644 --- a/src/main/java/com/palnet/server/task/ctr/CtrCntrlTask.java +++ b/src/main/java/com/palnet/server/task/ctr/CtrCntrlTask.java @@ -31,7 +31,6 @@ public class CtrCntrlTask implements Runnable{ }catch(Exception e) { e.printStackTrace(); } - } diff --git a/src/main/java/com/palnet/server/task/ctr/service/CtrCntrlTaskService.java b/src/main/java/com/palnet/server/task/ctr/service/CtrCntrlTaskService.java index dc08a6e..642a993 100644 --- a/src/main/java/com/palnet/server/task/ctr/service/CtrCntrlTaskService.java +++ b/src/main/java/com/palnet/server/task/ctr/service/CtrCntrlTaskService.java @@ -2,8 +2,9 @@ package com.palnet.server.task.ctr.service; import com.palnet.comn.model.CtrCntrlModel; -import com.palnet.comn.model.GPHistoryShareContext; +import com.palnet.comn.model.CtrHistoryShareContext; +import com.palnet.comn.model.GPModel; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -19,7 +20,65 @@ import java.util.Objects; @RequiredArgsConstructor public class CtrCntrlTaskService { - private final GPHistoryShareContext historyShareContext; + private final CtrHistoryShareContext historyShareContext; + + /** + * + */ + public CtrCntrlModel modelConvert(final GPModel dataInfo) { + CtrCntrlModel model = new CtrCntrlModel(); + CtrCntrlModel prevModel = historyShareContext.getHistory(dataInfo.getObjectId()); + + model.setObjectId(dataInfo.getObjectId()); + model.setControlId(dataInfo.getControlId()); + model.setControlStartDt(dataInfo.getControlStartDt()); + model.setObjectTypeCd(dataInfo.getObjectType()); + model.setLat(dataInfo.getLat()); + model.setLng(dataInfo.getLng()); + model.setElevType(dataInfo.getElevType()); + model.setElev(dataInfo.getElev()); + model.setSpeedType(dataInfo.getSpeedType()); + model.setSpeed(dataInfo.getSpeed()); + model.setBetteryLevel(dataInfo.getBetteryLevel()); + model.setBetteryVoltage(dataInfo.getBetteryVoltage()); + model.setDronStatus(dataInfo.getDronStatus()); + model.setHeading(dataInfo.getHeading()); + model.setMoveDistance(dataInfo.getMoveDistance()); + model.setMoveDistanceType(dataInfo.getMoveDistanceType()); + + model.setServerRcvDt(dataInfo.getServerRcvDt()); + + // 환경 데이터 필드 추가 + model.setSensorCo(dataInfo.getSensorCo()); + model.setSensorSo2(dataInfo.getSensorSo2()); + model.setSensorNo2(dataInfo.getSensorNo2()); + model.setSensorO3(dataInfo.getSensorO3()); + model.setSensorDust(dataInfo.getSensorDust()); + + // 비정상 상황 식별코드 추가 + model.setControlWarnCd(dataInfo.isControlWarnCd()); + + if(prevModel == null) { + if(model.isControlWarnCd()) { + model.setControlWarnNotyCd(true); // 최초 비정상 발생 + } + } else { + if(prevModel.isControlWarnCd() && model.isControlWarnCd()) { + model.setControlWarnNotyCd(false); // 비정상 -> 비정상 + } + if(prevModel.isControlWarnCd() && !model.isControlWarnCd()) { + model.setControlWarnNotyCd(false); // 비정상 -> 정상 + } + if(!prevModel.isControlWarnCd() && model.isControlWarnCd()) { + model.setControlWarnNotyCd(true); // 정상 -> 비정상상 + } + } + + model.setControlCacheCount(1); + + return model; + } + /** * Websocket 을 통해 전달될 위치 정보를 관리