Parcourir la source

fix: 修复tableStore 写入异常

Marko552 il y a 3 ans
Parent
commit
9ec4abd50d

+ 8 - 6
abi-cloud-qr-platform-common/src/main/java/com/abi/task/common/tablestore/TableStorePlusUtils.java

@@ -16,6 +16,7 @@ import com.alicloud.openservices.tablestore.model.Row;
 import com.alicloud.openservices.tablestore.model.search.SearchResponse;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.ListUtils;
+import org.apache.commons.lang3.ObjectUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.poi.ss.formula.functions.T;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -50,11 +51,9 @@ public class TableStorePlusUtils {
 
     /**
      * 存储单条 重构方法
-     *
-     * @param entity
      */
-    public void putRow(TableStoreEntity entity) {
-        putRow(Arrays.asList(entity));
+    public <T> void putRow(T t) {
+        putRow(Arrays.asList(t));
     }
 
     /**
@@ -62,7 +61,7 @@ public class TableStorePlusUtils {
      *
      * @param entityList
      */
-    public void putRow(List<TableStoreEntity> entityList) {
+    public <T> void putRow(List<T> entityList) {
         if (CollectionUtil.isEmpty(entityList)) {
             throw new BusinessException(ErrorCodeEnum.ERROR_PARAM.getCode(), "入参列表为空");
         }
@@ -72,10 +71,13 @@ public class TableStorePlusUtils {
             List<String> pkValueList = new ArrayList<>();
             List<List<Column>> columnsList = new ArrayList<>();
 
-            for (TableStoreEntity entity : entityList) {
+            for (T entity : entityList) {
                 List<Column> columns = new ArrayList<>();
                 //表名
                 TableStore tableStore = entity.getClass().getAnnotation(TableStore.class);
+                if (ObjectUtils.isEmpty(tableStore)){
+                    continue;
+                }
                 tableName = tableStore.tableName();
                 if (StringUtils.isBlank(tableName)) {
                     throw new BusinessException(ErrorCodeEnum.ERROR_PARAM.getCode(), "存储表名为空");

+ 29 - 29
abi-cloud-qr-platform-common/src/main/java/com/abi/task/common/tablestore/TableStoreUtils.java

@@ -19,7 +19,6 @@ import com.alicloud.openservices.tablestore.model.search.sort.Sort;
 import com.alicloud.openservices.tablestore.model.search.sort.SortOrder;
 import com.google.common.collect.Maps;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections4.ListUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -51,6 +50,7 @@ public class TableStoreUtils {
     @Value("${tableStore.instanceName}")
     private String instanceName;
 
+
     /**
      * 单词交互条数 200条
      */
@@ -135,34 +135,34 @@ public class TableStoreUtils {
             throw new BusinessException(ErrorCodeEnum.ERROR_PARAM.getCode(), "主键与字段数量不匹配");
         }
 
-        //<=最大传输数量直接执行
-        if(CollectionUtil.isEmpty(pkValueList) && pkValueList.size()<=MAX_COUNT_ONCE){
-            doBatchPutRow(primaryKeyName,pkValueList,tableName,columnsList);
-            return;
-        }
-
-        //拆成多个list,按核数去拆(200一个list)
-        List<List<String>> pkValuePartitionList = ListUtils.partition(pkValueList, MAX_COUNT_ONCE);
-        List<List<List<Column>>> columnsPartitionList = ListUtils.partition(columnsList, MAX_COUNT_ONCE);
-
-        log.info("batchPutRow 拆分任务数:"+pkValuePartitionList.size());
-
-        //丢给执行器去多线程执行
-        for(int i=0;i<pkValuePartitionList.size();i++){
-            final int num = i;
-            List<String> tempPkList = pkValuePartitionList.get(i);
-            List<List<Column>> tempColumnsList = columnsPartitionList.get(i);
-
-
-            taskExecutor.execute(()->{
-                long begin = System.currentTimeMillis();
-                log.info("第"+num+"个任务开始");
-                doBatchPutRow(primaryKeyName,tempPkList,tableName,tempColumnsList);
-                long end = System.currentTimeMillis();
-                log.info("第"+num+"个任务结束,耗时"+(end-begin));
-
-            });
-        }
+        doBatchPutRow(primaryKeyName,pkValueList,tableName,columnsList);
+
+//        //<=最大传输数量直接执行
+//        if(CollectionUtil.isEmpty(pkValueList) && pkValueList.size()<=MAX_COUNT_ONCE){
+//            doBatchPutRow(primaryKeyName,pkValueList,tableName,columnsList);
+//            return;
+//        }
+//
+//        //拆成多个list,按核数去拆(200一个list)
+//        List<List<String>> pkValuePartitionList = ListUtils.partition(pkValueList, MAX_COUNT_ONCE);
+//        List<List<List<Column>>> columnsPartitionList = ListUtils.partition(columnsList, MAX_COUNT_ONCE);
+//
+//        log.info("batchPutRow 拆分任务数:"+pkValuePartitionList.size());
+//
+//        //丢给执行器去多线程执行
+//        for(int i=0;i<pkValuePartitionList.size();i++){
+//            final int num = i;
+//            List<String> tempPkList = pkValuePartitionList.get(i);
+//            List<List<Column>> tempColumnsList = columnsPartitionList.get(i);
+//            taskExecutor.execute(()->{
+//                long begin = System.currentTimeMillis();
+//                log.info("第"+num+"个任务开始");
+//                doBatchPutRow(primaryKeyName,tempPkList,tableName,tempColumnsList);
+//                long end = System.currentTimeMillis();
+//                log.info("第"+num+"个任务结束,耗时"+(end-begin));
+//
+//            });
+//        }
     }
 
     /**

+ 32 - 7
abi-cloud-qr-platform-server/src/main/java/com/abi/qms/platform/infrastructure/mq/GenerateCodeConsumer.java

@@ -35,6 +35,7 @@ import org.apache.commons.lang3.ObjectUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.redisson.api.RLock;
 import org.redisson.api.RedissonClient;
+import org.springframework.amqp.core.AmqpTemplate;
 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 import org.springframework.amqp.support.AmqpHeaders;
@@ -111,6 +112,9 @@ public class GenerateCodeConsumer {
 	@Autowired
 	private BaseMaterialMapper baseMaterialMapper;
 
+	@Autowired
+	private AmqpTemplate amqpTemplate;
+
 	//生成码数据
 	final List<QrData> qrDataList = new LinkedList<>();
 
@@ -260,7 +264,7 @@ public class GenerateCodeConsumer {
 			log.info("6 loopGenerateCode transform tableStoreEntityList -->"+System.currentTimeMillis());
 
 			// table数据库  批量插入
-			saveTableStore(tableStoreEntityList);
+			saveTableStore(qrPackageBatch, codeList);
 			log.info("7 loopGenerateCode save tablestore async -->"+System.currentTimeMillis());
 
 			// 创建码文件
@@ -308,7 +312,7 @@ public class GenerateCodeConsumer {
 		BaseFactory baseFactory = baseFactoryMapper.selectOne(baseQuery);
 		//发送邮件
 		String aliPath = qrPackage.getDownloadPath().substring(qrPackage.getDownloadPath().lastIndexOf("/") + 1);
-		qrPackageService.sendEmailForQrPackageCreated(baseFactory.getEmail(),qrPackage.getSapOrderNo(),aliPath,qrPackage.getZipPassword());
+//		qrPackageService.sendEmailForQrPackageCreated(baseFactory.getEmail(),qrPackage.getSapOrderNo(),aliPath,qrPackage.getZipPassword());
 		//邮件发送后修改状态
 		qrPackageMapper.updateById(qrPackage);
 		QrIndex index = new QrIndex();
@@ -371,13 +375,34 @@ public class GenerateCodeConsumer {
 	/**
 	 * 批量新增tableStore,码表
 	 */
-	private void saveTableStore(List<TableStoreEntity> tableStoreEntityList) {
-		//保存tablestore
-		tableStorePlusUtils.putRow(tableStoreEntityList);
-
-		sum+=tableStoreEntityList.size();
+	private void saveTableStore(QrPackageBatch qrPackageBatch, List<QrData> codeList) {
+		List<QrCode> qrCodes = new ArrayList<>();
+		for (QrData sout : codeList) {
+			List<QrInnerData> innerDataList = sout.getInnerDataList();
+			List<QrCode> qrCodeCope = PojoConverterUtils.copyList(innerDataList, QrCode.class);
+			//			批次号 码包
+			qrCodeCope.forEach(stable -> {
+				stable.setBatchNumberId(qrPackageBatch.getId());
+				stable.setPackageId(qrPackageBatch.getPackageId());
+				stable.setCreateTime(LocalDateTime.now());
+				stable.setInvalid(InvalidEnum.NOT_INVALID.getCode());
+			});
+			qrCodes.addAll(qrCodeCope);
+		}
+		// 开始多线程写tableStore
+		int repeatTimes = qrCodes.size() / 200;
+		int leftover = qrCodes.size() % 200;
+		for (int i = 0; i < repeatTimes+1; i++) {
+			if (i == repeatTimes){
+				amqpTemplate.convertAndSend(TableStoreBatchInsertConsumer.TABLE_STORE_BATCH_INSERT_QUEUE, qrCodes.subList(i*200,i*200+leftover));
+				continue;
+			}
+			amqpTemplate.convertAndSend(TableStoreBatchInsertConsumer.TABLE_STORE_BATCH_INSERT_QUEUE, qrCodes.subList(i*200,i*200+200));
+		}
+		sum+=qrCodes.size();
 	}
 
+
 	/**
      * qrRepertoryColumnList  码库的列
      * qrRepertoryId 码库id

+ 23 - 2
abi-cloud-qr-platform-server/src/main/java/com/abi/qms/platform/infrastructure/mq/TableStoreBatchInsertConsumer.java

@@ -1,13 +1,19 @@
 package com.abi.qms.platform.infrastructure.mq;
 
+import cn.hutool.json.JSONUtil;
+import com.abi.qms.platform.dao.tablestore.entity.QrCode;
+import com.abi.task.common.tablestore.TableStorePlusUtils;
 import com.rabbitmq.client.Channel;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.Message;
 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.util.List;
+
 /**
  * @className: com.abi.qms.platform.infrastructure.mq-> TableStoreBatchInsertConsumer
  * @description: tableStore batchInsert
@@ -20,14 +26,29 @@ import org.springframework.stereotype.Component;
 @Component
 public class TableStoreBatchInsertConsumer implements ChannelAwareMessageListener {
 
-    public static final String TABLE_STORE_BATCH_INSERT_QUEUE = "table_store__queue";
-
+    public static final String TABLE_STORE_BATCH_INSERT_QUEUE = "qms_table_store_queue";
 
+    @Autowired
+    private TableStorePlusUtils tableStorePlusUtils;
 
     @Override
     @SneakyThrows
     @RabbitListener(queues = TABLE_STORE_BATCH_INSERT_QUEUE)
     public void onMessage(Message message, Channel channel){
+        long deliveryTag = message.getMessageProperties().getDeliveryTag();
+        try {
+            String messageInfo = new String(message.getBody(), "utf-8");
+            operateTableStore(messageInfo);
+            channel.basicAck(deliveryTag, false);
+        } catch (Exception e) {
+            log.error("DEMO异常:{}", e);
+            channel.basicAck(deliveryTag, false);
+        }
+    }
 
+    private void operateTableStore(String messageInfo) {
+        List<QrCode> qrCodes = JSONUtil.toList(messageInfo,QrCode.class);
+        tableStorePlusUtils.putRow(qrCodes);
     }
+
 }

+ 0 - 1
abi-cloud-qr-platform-server/src/main/java/com/abi/qms/platform/service/impl/QrPackageServiceImpl.java

@@ -141,7 +141,6 @@ public class QrPackageServiceImpl implements QrPackageService {
 
         //4-保存码包批次子表信息
         saveQrPackageBatch(qrPackage, isAdd, req);
-
     }
 
     /**

+ 2 - 2
pom.xml

@@ -39,8 +39,8 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
         <tomcat.embed.core.version>9.0.43</tomcat.embed.core.version>
-        <tablestore.version>5.10.6</tablestore.version>
-        <protobuf-java>3.17.3</protobuf-java>
+        <tablestore.version>5.10.3</tablestore.version>
+        <protobuf-java>2.4.1</protobuf-java>
         <commons.lang.version>2.6</commons.lang.version>
     </properties>
 

+ 1 - 1
version.txt

@@ -1 +1 @@
-feature1.0.02106161123
+feature1.0.02106161608