浏览代码

修改批量保存tablestore util

tanzhongran 3 年之前
父节点
当前提交
dc672b1f06

+ 40 - 1
abi-cloud-qr-platform-common/src/main/java/com/abi/task/common/tablestore/TableStorePlusUtils.java

@@ -15,11 +15,14 @@ import com.alicloud.openservices.tablestore.model.PrimaryKeyColumn;
 import com.alicloud.openservices.tablestore.model.Row;
 import com.alicloud.openservices.tablestore.model.search.SearchResponse;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.ListUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.poi.ss.formula.functions.T;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.PostConstruct;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.time.LocalDateTime;
@@ -40,6 +43,11 @@ public class TableStorePlusUtils {
     @Autowired
     private TableStoreUtils tableStoreUtils;
 
+    /**
+     * method缓存,减少过多反射开销
+     */
+    private final Map<Class,Map<String,Method>> methodCache = new HashMap<>();
+
     /**
      * 存储单条 重构方法
      *
@@ -88,7 +96,7 @@ public class TableStorePlusUtils {
                     String fieldName = field.getName();
                     String underlineFieldName = IStringUtil.camelToUnderline(fieldName);
                     //获取method
-                    Method method = entity.getClass().getMethod("get" + fieldName.substring(0, 1).toUpperCase() + fieldName.substring(1));
+                    Method method = getMethod(entity.getClass(),fieldName);
                     if (method == null) {
                         throw new BusinessException("get方法不存在");
                     }
@@ -142,6 +150,37 @@ public class TableStorePlusUtils {
         }
     }
 
+    /**
+     * 反射拿到method(带缓存)
+     * @param clz
+     * @param fieldName
+     * @return
+     */
+    private Method getMethod(Class clz,String fieldName){
+        //拿到这个class得map,第一次用到这个类,则缓存空map
+        if(!methodCache.containsKey(clz)){
+            Map<String,Method> methodMap = new HashMap<>();
+            methodCache.put(clz,methodMap);
+        }
+        Map<String, Method> methodMap = methodCache.get(clz);
+
+        //拿到method,没有则反射放入
+        if(!methodMap.containsKey(fieldName)){
+            synchronized (methodMap){
+                //两次验证防止极限并发
+                if(!methodMap.containsKey(fieldName)){
+                    try{
+                        Method method = clz.getMethod("get" + fieldName.substring(0, 1).toUpperCase() + fieldName.substring(1));
+                        methodMap.put(fieldName,method);
+                    }catch (Exception e){
+                        log.error("table store getMethod error",e);
+                    }
+                }
+            }
+        }
+        return methodMap.get(fieldName);
+    }
+
     /**
      * 读取一行数据
      *

+ 57 - 10
abi-cloud-qr-platform-common/src/main/java/com/abi/task/common/tablestore/TableStoreUtils.java

@@ -4,6 +4,7 @@ import cn.hutool.core.collection.CollectionUtil;
 import cn.hutool.core.date.DateUtil;
 import com.abi.task.common.api.exception.BusinessException;
 import com.abi.task.common.api.exception.ErrorCodeEnum;
+import com.abi.task.common.tablestore.common.TableStoreEntity;
 import com.abi.task.common.tablestore.common.TableStoreReq;
 import com.abi.task.common.tablestore.common.TableStoreRes;
 import com.alicloud.openservices.tablestore.SyncClient;
@@ -18,10 +19,13 @@ import com.alicloud.openservices.tablestore.model.search.sort.Sort;
 import com.alicloud.openservices.tablestore.model.search.sort.SortOrder;
 import com.google.common.collect.Maps;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.ListUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.PostConstruct;
 import java.time.LocalDateTime;
 import java.util.*;
 import java.util.stream.Collectors;
@@ -47,6 +51,26 @@ public class TableStoreUtils {
     @Value("${tableStore.instanceName}")
     private String instanceName;
 
+    /**
+     * 单词交互条数 200
+     */
+    private static final Integer MAX_COUNT_ONCE = 200;
+
+    /**
+     * 批量新增时用到得线程池
+     */
+    private ThreadPoolTaskExecutor taskExecutor;
+
+    @PostConstruct
+    public void initTaskExecutor(){
+        taskExecutor = new ThreadPoolTaskExecutor();
+        //设置线程池参数
+        taskExecutor.setCorePoolSize(1);
+        taskExecutor.setMaxPoolSize(20);
+        taskExecutor.setQueueCapacity(10000);
+        taskExecutor.initialize();
+    }
+
     /**
      * 创建SyncClient
      *
@@ -100,14 +124,6 @@ public class TableStoreUtils {
         client().putRow(new PutRowRequest(rowPutChange));
     }
 
-    /**
-     * 批量插入
-     *
-     * @param primaryKeyName
-     * @param pkValueList
-     * @param tableName
-     * @param columnsList
-     */
     public void batchPutRow(String primaryKeyName, List<String> pkValueList, String tableName, List<List<Column>> columnsList) {
         if (CollectionUtil.isEmpty(pkValueList)) {
             throw new BusinessException(ErrorCodeEnum.ERROR_PARAM.getCode(), "主键内容为空");
@@ -119,6 +135,36 @@ public class TableStoreUtils {
             throw new BusinessException(ErrorCodeEnum.ERROR_PARAM.getCode(), "主键与字段数量不匹配");
         }
 
+        //<=最大传输数量直接执行
+        if(CollectionUtil.isEmpty(pkValueList) && pkValueList.size()<=MAX_COUNT_ONCE){
+            doBatchPutRow(primaryKeyName,pkValueList,tableName,columnsList);
+            return;
+        }
+
+        //拆成多个list,按核数去拆(200一个list)
+        List<List<String>> pkValuePartitionList = ListUtils.partition(pkValueList, MAX_COUNT_ONCE);
+        List<List<List<Column>>> columnsPartitionList = ListUtils.partition(columnsList, MAX_COUNT_ONCE);
+
+        //丢给执行器去多线程执行
+        for(int i=0;i<pkValuePartitionList.size();i++){
+            List<String> tempPkList = pkValuePartitionList.get(i);
+            List<List<Column>> tempColumnsList = columnsPartitionList.get(i);
+            taskExecutor.execute(()->{
+                doBatchPutRow(primaryKeyName,tempPkList,tableName,tempColumnsList);
+            });
+        }
+    }
+
+    /**
+     * 真正批量插入
+     * @param primaryKeyName
+     * @param pkValueList
+     * @param tableName
+     * @param columnsList
+     */
+    public void doBatchPutRow(String primaryKeyName, List<String> pkValueList, String tableName, List<List<Column>> columnsList) {
+        long begin = System.currentTimeMillis();
+
         BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest();
         //构造rowPutChange
         for (int i = 0; i < pkValueList.size(); i++) {
@@ -126,10 +172,11 @@ public class TableStoreUtils {
             //添加到batch操作中。
             batchWriteRowRequest.addRowChange(rowPutChange);
         }
-
         BatchWriteRowResponse response = client().batchWriteRow(batchWriteRowRequest);
 
-        log.info("是否全部成功:{}", response.isAllSucceed());
+        long end = System.currentTimeMillis();
+
+        log.info("是否全部成功:{} ,耗时{}毫秒", response.isAllSucceed(),end-begin);
         if (!response.isAllSucceed()) {
             for (BatchWriteRowResponse.RowResult rowResult : response.getFailedRows()) {
                 log.info("失败的行:{}", batchWriteRowRequest.getRowChange(rowResult.getTableName(), rowResult.getIndex()).getPrimaryKey());

+ 5 - 4
abi-cloud-qr-platform-server/pom.xml

@@ -33,6 +33,11 @@
         <!-- 其他微服务api -->
 
         <!-- 外部jar包 -->
+        <dependency>
+            <groupId>org.springframework.cloud</groupId>
+            <artifactId>spring-cloud-starter-kubernetes-all</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-data-redis</artifactId>
@@ -104,10 +109,6 @@
             <groupId>io.springfox</groupId>
             <artifactId>springfox-boot-starter</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.springframework.cloud</groupId>
-            <artifactId>spring-cloud-starter-kubernetes-all</artifactId>
-        </dependency>
         <dependency>
             <groupId>com.baomidou</groupId>
             <artifactId>mybatis-plus-boot-starter</artifactId>

+ 32 - 0
abi-cloud-qr-platform-server/src/main/java/com/abi/qms/platform/controller/console/QrPackageController.java

@@ -1,10 +1,14 @@
 package com.abi.qms.platform.controller.console;
 
+import com.abi.qms.platform.annotation.PassToken;
+import com.abi.qms.platform.dao.tablestore.entity.QrCode;
 import com.abi.qms.platform.dto.req.*;
 import com.abi.qms.platform.dto.res.*;
 import com.abi.qms.platform.service.MaterialService;
 import com.abi.qms.platform.service.QrPackageService;
 import com.abi.task.common.api.base.BaseResponse;
+import com.abi.task.common.tablestore.TableStorePlusUtils;
+import com.abi.task.common.tablestore.common.TableStoreEntity;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import lombok.extern.slf4j.Slf4j;
@@ -13,6 +17,9 @@ import org.springframework.validation.annotation.Validated;
 import org.springframework.web.bind.annotation.*;
 
 import javax.servlet.http.HttpServletResponse;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * 码包 Controller
@@ -111,4 +118,29 @@ public class QrPackageController {
         return BaseResponse.create(result);
     }
 
+
+    @Autowired
+    private TableStorePlusUtils tableStorePlusUtils;
+
+    @ApiOperation("测试批量")
+    @GetMapping("/testBatch")
+    @PassToken
+    public void testBatch() {
+        List<TableStoreEntity> qrCodeList = new ArrayList<>();
+        for(int i=0;i<20000;i++){
+            QrCode qrCode = new QrCode();
+            qrCode.setCode(System.currentTimeMillis()+"-"+i);
+            qrCode.setPackageId(0L);
+            qrCode.setBatchNumberId(0L);
+            qrCode.setCreateTime(LocalDateTime.now());
+            qrCode.setCodeIndex(0L);
+            qrCode.setQrRepertoryColumnId(0L);
+            qrCode.setInvalid(0);
+            qrCodeList.add(qrCode);
+        }
+
+        tableStorePlusUtils.putRow(qrCodeList);
+
+    }
+
 }

+ 24 - 22
abi-cloud-qr-platform-server/src/main/java/com/abi/qms/platform/infrastructure/mq/GenerateCodeConsumer.java

@@ -256,8 +256,11 @@ public class GenerateCodeConsumer {
                     qrRepertorySerialNumberMapper.updateById(updateSerialNumber);
                 }
             }
+            //转化为tablestore的对象列表
+			List<TableStoreEntity> tableStoreEntityList = convertToTableStoreEntity(codeList, qrPackageBatch);
+
 			// table数据库  批量插入
-			saveTableStore(qrPackageBatch, codeList);
+			saveTableStore(tableStoreEntityList);
 
 			// 创建码文件
 			createCodeFile(qrRepertory,batchNumber,boxCodeFormat);
@@ -285,8 +288,8 @@ public class GenerateCodeConsumer {
 			}
 			//放入oss地址和密码
 			qrPackage.setZipPassword(pas);
-			//删除生成文件
-			FileUtil.del(file);
+			//删除生成文件 TODO
+			//FileUtil.del(file);
 		}catch (Exception e){
 			throw new BusinessException("上传OSS失败");
 		}
@@ -342,15 +345,14 @@ public class GenerateCodeConsumer {
 		return multipartFile;
 	}
 
-	/**
-	 * 批量新增tableStore,码表
-	 */
-	private void saveTableStore(QrPackageBatch qrPackageBatch, List<QrData> codeList) {
+	private List<TableStoreEntity> convertToTableStoreEntity(List<QrData> codeList,QrPackageBatch qrPackageBatch){
+		//保存所有码的列表
 		List<TableStoreEntity> qrCodes = new ArrayList<>();
+		//将每一列的码拆成多条码保存
 		codeList.forEach(sout-> {
 			List<QrInnerData> innerDataList = sout.getInnerDataList();
 			List<QrCode> qrCodeCope = PojoConverterUtils.copyList(innerDataList, QrCode.class);
-			//			批次号 码包
+			//批次号 码包
 			qrCodeCope.forEach(stable->{
 				stable.setBatchNumberId(qrPackageBatch.getId());
 				stable.setPackageId(qrPackageBatch.getPackageId());
@@ -359,15 +361,18 @@ public class GenerateCodeConsumer {
 			});
 			qrCodes.addAll(qrCodeCope);
 		});
-		List<TableStoreEntity> qrlist = new ArrayList<>();
-		for (int i = 0; i < qrCodes.size(); i++) {
-			qrlist.add(qrCodes.get(i));
-			if(qrlist.size()==200){
-				tableStorePlusUtils.putRow(qrlist);
-				qrlist = new ArrayList<>();
-			}
-		}
-		sum+=qrCodes.size();
+
+		return qrCodes;
+	}
+
+	/**
+	 * 批量新增tableStore,码表
+	 */
+	private void saveTableStore(List<TableStoreEntity> tableStoreEntityList) {
+		//保存tablestore
+		tableStorePlusUtils.putRow(tableStoreEntityList);
+
+		sum+=tableStoreEntityList.size();
 	}
 
 	/**
@@ -411,11 +416,8 @@ public class GenerateCodeConsumer {
 						Long qrRepertoryColumnId = qrRepertoryColumn.getId();
 						//声明拆分数量
 						int splitNum = 0;
-						if(splitCache.size() != 0 && !splitCache.isEmpty()){
-							Integer splitNum1 = splitCache.get(qrRepertoryColumnId).getSplitNum();
-							if(null != splitNum1){
-								splitNum = splitNum1;
-							}
+						if(splitCache.containsKey(qrRepertoryColumnId)){
+							splitNum = splitCache.get(qrRepertoryColumnId).getSplitNum();
 						}
 						// 非clone列,设置code和url
 						String code = buildUniqueCode(uniqueSet, qrRepertoryColumn, codeSb, paramMap, qrRepertorySerialNumberMap, 0);

+ 26 - 13
abi-cloud-qr-platform-server/src/main/resources/application.yml

@@ -24,19 +24,32 @@ spring:
   datasource:
     driver-class-name: org.mariadb.jdbc.Driver
 
-#    rabbitmq:
-#      host: 192.168.2.53
-#      port: 5672
-#      username: guest
-#      password: guest
-#      # 启用发布确认
-#      publisher-confirm-type: correlated
-#      listener:
-#        simple:
-#          # 消息确认方式:手动
-#          acknowledge-mode: manual
-#          concurrency: 1
-#          max-concurrency: 5
+
+###################在家办公#####################
+#    password: qmsqrdev@2021
+#    username: qmsqrdev
+#    url: jdbc:mysql://rm-uf6t3m8yo40d5b905go.mysql.rds.aliyuncs.com:3306/qms_qr_platform_dev?characterEncoding=utf8&serverTimezone=Asia/Shanghai
+#  rabbitmq:
+#    password: guest
+#    username: guest
+#    virtual-host: /
+#    addresses: 127.0.0.1:5672
+#  redis:
+#    password: Abi@12345
+#    database: 0
+#    host: r-uf6kxuqc1iw6f8rwrjpd.redis.rds.aliyuncs.com
+#  cloud:
+#    kubernetes:
+#      enabled: false
+#      discovery:
+#        enabled: false
+#
+#sms-center-service:
+#  url: xxx
+#basic-service:
+#  url: xxx
+##############################################
+
 
 management:
   endpoint:

+ 20 - 0
abi-cloud-qr-platform-server/src/test/java/com/abi/qms/platform/TableStorePlusTest.java

@@ -2,6 +2,7 @@ package com.abi.qms.platform;
 
 import com.abi.qms.platform.dao.tablestore.entity.QrCode;
 import com.abi.task.common.tablestore.TableStorePlusUtils;
+import com.abi.task.common.tablestore.common.TableStoreEntity;
 import com.abi.task.common.tablestore.common.TableStoreReq;
 import com.abi.task.common.tablestore.common.TableStoreRes;
 import com.abi.task.common.tablestore.entity.DemoEntity;
@@ -11,6 +12,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 
 import java.time.LocalDateTime;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
@@ -42,6 +44,24 @@ public class TableStorePlusTest {
         tableStorePlusUtils.putRow(Arrays.asList(entity, entity2));
     }
 
+    @Test
+    public void testBatchAddRow(){
+        List<TableStoreEntity> qrCodeList = new ArrayList<>();
+        for(int i=0;i<20000;i++){
+            QrCode qrCode = new QrCode();
+            qrCode.setCode(System.currentTimeMillis()+"-"+i);
+            qrCode.setPackageId(0L);
+            qrCode.setBatchNumberId(0L);
+            qrCode.setCreateTime(LocalDateTime.now());
+            qrCode.setCodeIndex(0L);
+            qrCode.setQrRepertoryColumnId(0L);
+            qrCode.setInvalid(0);
+            qrCodeList.add(qrCode);
+        }
+
+        tableStorePlusUtils.putRow(qrCodeList);
+    }
+
     @Test
     public void testGetRow() {
         QrCode entity = tableStorePlusUtils.selectOne(QrCode.class, "1397521498780426242");