Prechádzať zdrojové kódy

调整车场接收数据为mongo实现的队列

Levi 3 rokov pred
rodič
commit
20019b9f0f

+ 20 - 0
src/main/java/com/ebei/screen/common/task/ScreenTask.java

@@ -4,6 +4,7 @@ import cn.hutool.core.date.DateUtil;
 import com.ebei.screen.common.util.ParkUtils;
 import com.ebei.screen.service.EbaSystemService;
 import com.ebei.screen.service.EnergyPlatformService;
+import com.ebei.screen.service.ParkSystemService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Scheduled;
@@ -25,6 +26,9 @@ public class ScreenTask {
     @Autowired
     private EnergyPlatformService energyPlatformService;
 
+    @Autowired
+    private ParkSystemService parkSystemService;
+
     /**
      * 每小时执行一次数据拉取
      */
@@ -40,4 +44,20 @@ public class ScreenTask {
             log.info("定时任务执行出现错误:{}", e);
         }
     }
+
+    /**
+     * 每5秒执行一次数据处理
+     */
+    @Scheduled(cron = "*/3 * * * * ?")
+    public void exec() {
+        try {
+            log.info("开始处理数据 开始时间:{}", DateUtil.now());
+            // 获取10条进行处理
+            boolean result = parkSystemService.taskExec(10);
+            log.info("数据处理结束 结束时间:{} 处理结果:{}", DateUtil.now(), result);
+        } catch (Exception e) {
+            log.info("定时任务执行出现错误:{}", e);
+        }
+    }
+
 }

+ 3 - 3
src/main/java/com/ebei/screen/common/util/MyRunner.java

@@ -26,9 +26,9 @@ public class MyRunner implements ApplicationRunner {
 
     @Override
     public void run(ApplicationArguments args) {
-        energyPlatformService.initData();
-        ebaSystemService.initData();
-        ParkUtils.getLoginToken();
+//        energyPlatformService.initData();
+//        ebaSystemService.initData();
+//        ParkUtils.getLoginToken();
         log.info("=============全部初始化完毕=============");
     }
 }

+ 47 - 0
src/main/java/com/ebei/screen/common/util/RunnableUtils.java

@@ -0,0 +1,47 @@
+package com.ebei.screen.common.util;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.*;
+import java.util.function.Supplier;
+
+@Slf4j
+public class RunnableUtils {
+    private static final int IN_USE_SIZE = 40;
+    private static final ExecutorService service = createPoolExecutor();
+
+    public static void start(Running run) {
+        try {
+            service.execute(() -> run.run());
+        } catch (Exception e) {
+            log.error("RunnableUtils==>start:", e);
+        }
+    }
+
+    public static <T> T submit(Supplier<T> call) {
+        try {
+            Future<T> future = service.submit(() -> call.get());
+            return future.get();
+        } catch (Exception e) {
+            log.error("RunnableUtils==>submit:", e);
+        }
+        return null;
+    }
+
+    public static void stop() {
+        try {
+            service.shutdownNow();
+        } catch (Exception e) {
+            log.error("RunnableUtils==>stop:", e);
+        }
+    }
+
+    private static ExecutorService createPoolExecutor() {
+        return new ThreadPoolExecutor(IN_USE_SIZE, IN_USE_SIZE, 60L, TimeUnit.SECONDS, new SynchronousQueue());
+    }
+
+    public static Integer getActiveNum() {
+        return IN_USE_SIZE - ((ThreadPoolExecutor) service).getActiveCount();
+    }
+
+}

+ 6 - 0
src/main/java/com/ebei/screen/common/util/Running.java

@@ -0,0 +1,6 @@
+package com.ebei.screen.common.util;
+
+@FunctionalInterface
+public interface Running {
+    void run();
+}

+ 8 - 19
src/main/java/com/ebei/screen/controller/park/ParkSystemController.java

@@ -1,5 +1,6 @@
 package com.ebei.screen.controller.park;
 
+import cn.hutool.core.lang.UUID;
 import com.ebei.screen.common.constants.CommonConstants;
 import com.ebei.screen.common.response.ResponseBean;
 import com.ebei.screen.common.util.Levi;
@@ -33,55 +34,43 @@ public class ParkSystemController {
     @ApiOperation("车场信息数据推送")
     @PostMapping("/parkInfo")
     public Levi parkInfo(@RequestBody Map params) {
-        parkSystemService.parkInfo(params);
-        return Levi.by("resultCode", 0).set("message", "处理成功");
+        return parkSystemService.exec("parkInfo" + UUID.fastUUID(), params, true);
     }
 
     @ApiOperation("车位信息数据推送")
     @PostMapping("/parkSpace")
     public Levi parkSpace(@RequestBody Map params) {
-        parkSystemService.parkSpace(params);
-        return Levi.by("resultCode", 0).set("message", "处理成功");
+        return parkSystemService.exec("parkSpace" + UUID.fastUUID(), params, true);
     }
 
     @ApiOperation("入场信息数据推送")
     @PostMapping("/parkIn")
     public Levi parkIn(@RequestBody Map params) {
-        parkSystemService.parkIn(params);
-        return Levi.by("resultCode", 0).set("message", "处理成功");
+        return parkSystemService.exec("parkIn" + UUID.fastUUID(), params, true);
     }
 
     @ApiOperation("出场信息数据推送")
     @PostMapping("/parkOut")
     public Levi parkOut(@RequestBody Map params) {
-        parkSystemService.parkOut(params);
-        return Levi.by("resultCode", 0).set("message", "处理成功");
+        return parkSystemService.exec("parkOut" + UUID.fastUUID(), params, true);
     }
 
     @ApiOperation("订单信息数据推送")
     @PostMapping("/parkOrder")
     public Levi parkOrder(@RequestBody Map params) {
-        parkSystemService.parkOrder(params);
-        return Levi.by("resultCode", 0).set("message", "处理成功");
+        return parkSystemService.exec("parkOrder" + UUID.fastUUID(), params, true);
     }
 
     @ApiOperation("收费记录数据推送")
     @PostMapping("/parkCharge")
     public Levi parkCharge(@RequestBody Map params) {
-        try {
-            parkSystemService.parkCharge(params);
-            return Levi.by("code", 1).set("msg", "成功");
-        } catch (Exception e) {
-            log.info("收费记录推送===>", e);
-            return Levi.by("code", 0).set("msg", "失败");
-        }
+        return parkSystemService.exec("parkCharge" + UUID.fastUUID(), params, false);
     }
 
     @ApiOperation("服务信息数据推送")
     @PostMapping("/parkServiceInfo")
     public Levi parkServiceInfo(@RequestBody Map params) {
-        parkSystemService.parkServiceInfo(params);
-        return Levi.by("resultCode", 0).set("message", "处理成功");
+        return parkSystemService.exec("parkServiceInfo" + UUID.fastUUID(), params, true);
     }
 
     @ApiOperation("今日车场流量分析查询 ✔")

+ 18 - 0
src/main/java/com/ebei/screen/service/ParkSystemService.java

@@ -1,6 +1,7 @@
 package com.ebei.screen.service;
 
 import com.ebei.screen.common.response.ResponseBean;
+import com.ebei.screen.common.util.Levi;
 import com.ebei.screen.modules.req.*;
 
 import java.util.Map;
@@ -13,6 +14,23 @@ import java.util.Map;
  */
 public interface ParkSystemService {
 
+    /**
+     * 统一存放到mongo中后续处理
+     *
+     * @param id     业务id
+     * @param params 参数
+     * @return
+     */
+    Levi exec(String id, Map params, boolean flag);
+
+    /**
+     * 处理任务
+     *
+     * @param size 处理数量
+     * @return
+     */
+    boolean taskExec(int size) throws InterruptedException;
+
     /**
      * 车场信息数据推送
      *

+ 95 - 4
src/main/java/com/ebei/screen/service/impl/ParkSystemServiceImpl.java

@@ -14,10 +14,7 @@ import com.ebei.screen.common.constants.PayTypeCode;
 import com.ebei.screen.common.response.PageBean;
 import com.ebei.screen.common.response.ResponseBean;
 import com.ebei.screen.common.response.ResponseBuilder;
-import com.ebei.screen.common.util.EbaUtils;
-import com.ebei.screen.common.util.EnergyUtils;
-import com.ebei.screen.common.util.Levi;
-import com.ebei.screen.common.util.ParkUtils;
+import com.ebei.screen.common.util.*;
 import com.ebei.screen.modules.req.*;
 import com.ebei.screen.service.ParkSystemService;
 import lombok.extern.slf4j.Slf4j;
@@ -30,9 +27,12 @@ import org.springframework.data.mongodb.core.query.Update;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
+import java.time.Duration;
+import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.temporal.ChronoUnit;
 import java.util.*;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -52,6 +52,97 @@ public class ParkSystemServiceImpl implements ParkSystemService {
     @Autowired
     private MongoTemplate mongoTemplate;
 
+    /**
+     * 统一存放到mongo中后续处理
+     *
+     * @param id     业务id
+     * @param params 参数
+     * @param flag   false:jlite
+     * @return
+     */
+    @Override
+    public synchronized Levi exec(String id, Map params, boolean flag) {
+        try {
+            // 插入
+            mongoTemplate.insert(Levi.by("_id", id).set("data", params), "parkQueue");
+            // 更新字典值
+            JSONObject obj = mongoTemplate.findById("idDict", JSONObject.class, "parkQueue");
+            Query query = new Query();
+            query.addCriteria(Criteria.where("_id").is("idDict"));
+            Update update = new Update();
+            if (obj != null) {
+                update.set("data", obj.getJSONArray("data").fluentAdd(id));
+                update.set("count", obj.getIntValue("count") + 1);
+            } else {
+                update.set("data", new JSONArray().fluentAdd(id));
+                update.set("count", 1);
+            }
+            mongoTemplate.upsert(query, update, "parkQueue");
+        } catch (Exception e) {
+            log.info("存储队列出现异常:", e);
+            return Levi.by(flag ? "resultCode" : "code", flag ? 1 : 0).set(flag ? "message" : "msg", flag ? "处理失败" : "失败");
+        }
+        return Levi.by(flag ? "resultCode" : "code", flag ? 0 : 1).set(flag ? "message" : "msg", flag ? "处理成功" : "成功");
+    }
+
+    /**
+     * 处理任务
+     *
+     * @param size 处理数量
+     * @return
+     */
+    @Override
+    public synchronized boolean taskExec(int size) throws InterruptedException {
+        boolean result = false;
+        JSONObject obj = mongoTemplate.findById("idDict", JSONObject.class, "parkQueue");
+        JSONArray data = obj != null ? obj.getJSONArray("data") : new JSONArray();
+        if (data != null && data.size() > 0) {
+            List<String> ids = (List<String>) (List) data.subList(0, Integer.min(data.size(), size));
+            CountDownLatch countDownLatch = new CountDownLatch(Integer.min(data.size(), size));
+            ids.forEach(x -> RunnableUtils.start(() -> {
+                JSONObject xx = mongoTemplate.findById(x, JSONObject.class, "parkQueue");
+                if (xx == null) {
+                    countDownLatch.countDown();
+                    return;
+                }
+                Instant start = Instant.now();
+                JSONObject params = xx.getJSONObject("data");
+                if (x.startsWith("parkInfo")) {
+                    this.parkInfo(params);
+                } else if (x.startsWith("parkSpace")) {
+                    this.parkSpace(params);
+                } else if (x.startsWith("parkIn")) {
+                    this.parkIn(params);
+                } else if (x.startsWith("parkOut")) {
+                    this.parkOut(params);
+                } else if (x.startsWith("parkOrder")) {
+                    this.parkOrder(params);
+                } else if (x.startsWith("parkCharge")) {
+                    this.parkCharge(params);
+                } else if (x.startsWith("parkServiceInfo")) {
+                    this.parkServiceInfo(params);
+                }
+                // 处理完成删除当前document
+                mongoTemplate.remove(xx, "parkQueue");
+                countDownLatch.countDown();
+                Instant end = Instant.now();
+                log.info("当前执行:" + x + " 执行时间:" + Duration.between(start, end).toMillis() + "毫秒 当前可用线程数:" + RunnableUtils.getActiveNum());
+            }));
+            countDownLatch.await();
+            // 删除本批次处理的id
+            data.removeAll(ids);
+            Query query = new Query();
+            query.addCriteria(Criteria.where("_id").is("idDict"));
+            Update update = new Update();
+            update.set("data", data);
+            update.set("count", data.size());
+            mongoTemplate.upsert(query, update, "parkQueue");
+            result = true;
+        }
+        return result;
+    }
+
+
     /**
      * 车场信息数据推送
      *