فهرست منبع

封装批量插入多条的tablestore

tanzhongran 4 سال پیش
والد
کامیت
276c276d9a

+ 86 - 58
abi-cloud-qr-platform-common/src/main/java/com/abi/task/common/tablestore/TableStorePlusUtils.java

@@ -1,18 +1,19 @@
 package com.abi.task.common.tablestore;
 
+import cn.hutool.core.collection.CollectionUtil;
 import cn.hutool.core.date.DateUtil;
 import com.abi.task.common.api.exception.BusinessException;
 import com.abi.task.common.tablestore.common.TableStore;
 import com.abi.task.common.tablestore.common.TableStoreEntity;
 import com.abi.task.common.utils.IStringUtil;
 import com.abi.task.common.utils.TableStoreUtils;
-import com.alicloud.openservices.tablestore.model.Column;
-import com.alicloud.openservices.tablestore.model.ColumnValue;
-import com.alicloud.openservices.tablestore.model.Row;
+import com.alicloud.openservices.tablestore.model.*;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.poi.ss.formula.functions.T;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
+import sun.security.provider.certpath.SunCertPathBuilderException;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
@@ -30,70 +31,97 @@ public class TableStorePlusUtils {
     private TableStoreUtils tableStoreUtils;
 
     /**
-     * 存储一行数据(如果主键key相同,则覆盖)
+     * 存储单条 重构方法
      * @param entity
      */
     public void putRow(TableStoreEntity entity){
-        try {
-            List<Column> columns = new ArrayList<>();
-            //表名
-            TableStore tableStore = entity.getClass().getAnnotation(TableStore.class);
-            String tableName = tableStore.tableName();
-            if (StringUtils.isBlank(tableName)) {
-                throw new BusinessException("存储表名为空");
-            }
-            //主键
-            String primaryKeyName = tableStore.primaryKeyName();
-            if (StringUtils.isBlank(primaryKeyName)) {
-                throw new BusinessException("存储主键为空");
-            }
-            //主键的驼峰转下划线
-            primaryKeyName = IStringUtil.camelToUnderline(primaryKeyName);
-            //主键的值
-            String primaryKeyVal = null;
-
+        putRow(Arrays.asList(entity));
+    }
 
-            //column列表  循环放入字段
-            Field[] fields = entity.getClass().getDeclaredFields();
-            for (Field field : fields) {
-                String fieldName = field.getName();
-                String underlineFieldName = IStringUtil.camelToUnderline(fieldName);
-                //获取method
-                Method method = entity.getClass().getMethod("get" + fieldName.substring(0, 1).toUpperCase() + fieldName.substring(1));
-                if (method == null){
-                    throw new BusinessException("get方法不存在");
+    /**
+     * 存储多行数据(如果主键key相同,则覆盖)
+     * @param entityList
+     */
+    public void putRow(List<TableStoreEntity> entityList){
+        if(CollectionUtil.isEmpty(entityList)){
+            throw new BusinessException("入参列表为空");
+        }
+        try {
+            String primaryKeyName = null;
+            String tableName = null;
+            List<String> pkValueList = new ArrayList<>();
+            List<List<Column>> columnsList = new ArrayList<>();
+
+            for(TableStoreEntity entity:entityList){
+                List<Column> columns = new ArrayList<>();
+                //表名
+                TableStore tableStore = entity.getClass().getAnnotation(TableStore.class);
+                tableName = tableStore.tableName();
+                if (StringUtils.isBlank(tableName)) {
+                    throw new BusinessException("存储表名为空");
                 }
-                //反射调用拿到结果
-                Object fieldValue = method.invoke(entity);
-                //如果是主键则赋值,主键值不放入column,不然阿里会报错
-                if (primaryKeyVal == null && underlineFieldName.equals(primaryKeyName)) {
-                    primaryKeyVal = (String) fieldValue;
-                    continue;
+                //主键
+                primaryKeyName = tableStore.primaryKeyName();
+                if (StringUtils.isBlank(primaryKeyName)) {
+                    throw new BusinessException("存储主键为空");
                 }
-                //放入table的column
-                Column column = null;
-                if (fieldValue instanceof String) {
-                    column = new Column(underlineFieldName, ColumnValue.fromString(String.valueOf(fieldValue)));
-                } else if (fieldValue instanceof Integer) {
-                    column = new Column(underlineFieldName, ColumnValue.fromLong(Long.valueOf((Integer) fieldValue)));
-                } else if (fieldValue instanceof Long) {
-                    column = new Column(underlineFieldName, ColumnValue.fromLong((Long) fieldValue));
-                } else if (fieldValue instanceof Double) {
-                    column = new Column(underlineFieldName, ColumnValue.fromDouble((Double) fieldValue));
-                } else if (fieldValue instanceof Boolean) {
-                    column = new Column(underlineFieldName, ColumnValue.fromBoolean((Boolean) fieldValue));
-                } else if (fieldValue instanceof Date) {
-                    column = new Column(underlineFieldName, ColumnValue.fromString(DateUtil.format((Date) fieldValue, "yyyy-MM-dd HH:mm:ss")));
-                } else if (fieldValue instanceof LocalDateTime) {
-                    column = new Column(underlineFieldName, ColumnValue.fromString(DateUtil.format((LocalDateTime) fieldValue, "yyyy-MM-dd HH:mm:ss")));
-                } else {
-                    log.info("不识别的字段类型 fieldName=" + fieldName);
-                    continue;
+                //主键的驼峰转下划线
+                primaryKeyName = IStringUtil.camelToUnderline(primaryKeyName);
+                //主键的值
+                String primaryKeyVal = null;
+
+
+                //column列表  循环放入字段
+                Field[] fields = entity.getClass().getDeclaredFields();
+                for (Field field : fields) {
+                    String fieldName = field.getName();
+                    String underlineFieldName = IStringUtil.camelToUnderline(fieldName);
+                    //获取method
+                    Method method = entity.getClass().getMethod("get" + fieldName.substring(0, 1).toUpperCase() + fieldName.substring(1));
+                    if (method == null){
+                        throw new BusinessException("get方法不存在");
+                    }
+                    //反射调用拿到结果
+                    Object fieldValue = method.invoke(entity);
+                    //如果是主键则赋值,主键值不放入column,不然阿里会报错
+                    if (primaryKeyVal == null && underlineFieldName.equals(primaryKeyName)) {
+                        primaryKeyVal = (String) fieldValue;
+                        continue;
+                    }
+                    //放入table的column
+                    Column column = null;
+                    if (fieldValue instanceof String) {
+                        column = new Column(underlineFieldName, ColumnValue.fromString(String.valueOf(fieldValue)));
+                    } else if (fieldValue instanceof Integer) {
+                        column = new Column(underlineFieldName, ColumnValue.fromLong(Long.valueOf((Integer) fieldValue)));
+                    } else if (fieldValue instanceof Long) {
+                        column = new Column(underlineFieldName, ColumnValue.fromLong((Long) fieldValue));
+                    } else if (fieldValue instanceof Double) {
+                        column = new Column(underlineFieldName, ColumnValue.fromDouble((Double) fieldValue));
+                    } else if (fieldValue instanceof Boolean) {
+                        column = new Column(underlineFieldName, ColumnValue.fromBoolean((Boolean) fieldValue));
+                    } else if (fieldValue instanceof Date) {
+                        column = new Column(underlineFieldName, ColumnValue.fromString(DateUtil.format((Date) fieldValue, "yyyy-MM-dd HH:mm:ss")));
+                    } else if (fieldValue instanceof LocalDateTime) {
+                        column = new Column(underlineFieldName, ColumnValue.fromString(DateUtil.format((LocalDateTime) fieldValue, "yyyy-MM-dd HH:mm:ss")));
+                    } else {
+                        log.info("不识别的字段类型 fieldName=" + fieldName);
+                        continue;
+                    }
+                    columns.add(column);
                 }
-                columns.add(column);
+                //放入调用入参
+                pkValueList.add(primaryKeyVal);
+                columnsList.add(columns);
+            }
+
+            //真实去调用
+            if(pkValueList.size()>1){
+                tableStoreUtils.batchPutRow(primaryKeyName, pkValueList, tableName, columnsList);
+            }else{
+                tableStoreUtils.putRow(primaryKeyName, pkValueList.get(0), tableName, columnsList.get(0));
             }
 
-            tableStoreUtils.putRow(primaryKeyName, primaryKeyVal, tableName, columns);
         }catch (Exception e){
             log.info("setRow error",e);
             throw new BusinessException("存储table异常");

+ 62 - 32
abi-cloud-qr-platform-common/src/main/java/com/abi/task/common/utils/TableStoreUtils.java

@@ -1,5 +1,7 @@
 package com.abi.task.common.utils;
 
+import cn.hutool.core.collection.CollectionUtil;
+import com.abi.task.common.api.exception.BusinessException;
 import com.alicloud.openservices.tablestore.SyncClient;
 import com.alicloud.openservices.tablestore.model.BatchWriteRowRequest;
 import com.alicloud.openservices.tablestore.model.BatchWriteRowResponse;
@@ -102,6 +104,65 @@ public class TableStoreUtils {
      * @param tableName      表名
      */
     public void putRow(String primaryKeyName, String pkValue, String tableName, List<Column> columns) {
+        RowPutChange rowPutChange = buildRowPutChange(primaryKeyName, pkValue, tableName, columns);
+        client().putRow(new PutRowRequest(rowPutChange));
+    }
+
+    /**
+     * 批量插入
+     * @param primaryKeyName
+     * @param pkValueList
+     * @param tableName
+     * @param columnsList
+     */
+    public void batchPutRow(String primaryKeyName, List<String> pkValueList, String tableName, List<List<Column>> columnsList) {
+        if(CollectionUtil.isEmpty(pkValueList)){
+            throw new BusinessException("主键内容为空");
+        }
+        if(CollectionUtil.isEmpty(columnsList)){
+            throw new BusinessException("字段列表为空");
+        }
+        if(pkValueList.size() != columnsList.size()){
+            throw new BusinessException("主键与字段数量不匹配");
+        }
+
+        BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest();
+        //构造rowPutChange
+        for(int i=0;i<pkValueList.size();i++){
+            String pkValue = pkValueList.get(0);
+            List<Column> columns = columnsList.get(0);
+            RowPutChange rowPutChange = buildRowPutChange(primaryKeyName, pkValue, tableName, columns);
+            //添加属性列
+            rowPutChange.addColumns(columns);
+            //添加到batch操作中。
+            batchWriteRowRequest.addRowChange(rowPutChange);
+        }
+
+        BatchWriteRowResponse response = client().batchWriteRow(batchWriteRowRequest);
+
+        log.info("是否全部成功:{}", response.isAllSucceed());
+        if (!response.isAllSucceed()) {
+            for (BatchWriteRowResponse.RowResult rowResult : response.getFailedRows()) {
+                log.info("失败的行:{}", batchWriteRowRequest.getRowChange(rowResult.getTableName(), rowResult.getIndex()).getPrimaryKey());
+                log.info("失败原因:{}", rowResult.getError());
+            }
+            /**
+             * 可以通过createRequestForRetry方法再构造一个请求对失败的行进行重试。此处只给出构造重试请求的部分。
+             * 推荐的重试方法是使用SDK的自定义重试策略功能,支持对batch操作的部分行错误进行重试。设置重试策略后,调用接口处无需增加重试代码。
+             */
+            batchWriteRowRequest.createRequestForRetry(response.getFailedRows());
+        }
+    }
+
+    /**
+     * 构造RowPutChange
+     * @param primaryKeyName
+     * @param pkValue
+     * @param tableName
+     * @param columns
+     * @return
+     */
+    private RowPutChange buildRowPutChange(String primaryKeyName, String pkValue, String tableName, List<Column> columns){
         //构造主键。
         PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
         //设置主键名称和主键值
@@ -113,7 +174,7 @@ public class TableStoreUtils {
         //添加属性列
         rowPutChange.addColumns(columns);
 
-        client().putRow(new PutRowRequest(rowPutChange));
+        return rowPutChange;
     }
 
     /**
@@ -205,37 +266,6 @@ public class TableStoreUtils {
         client().deleteRow(new DeleteRowRequest(rowDeleteChange));
     }
 
-
-    public void batchWriteRow(String primaryKeyName, String pkValue, String tableName, List<Column> columns) {
-        BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest();
-
-        //构造rowPutChange。
-        PrimaryKeyBuilder pkBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
-        pkBuilder.addPrimaryKeyColumn(primaryKeyName, PrimaryKeyValue.fromString(pkValue));
-        RowPutChange rowPutChange = new RowPutChange(tableName, pkBuilder.build());
-        //添加属性列
-        rowPutChange.addColumns(columns);
-        //添加到batch操作中。
-        batchWriteRowRequest.addRowChange(rowPutChange);
-
-
-        BatchWriteRowResponse response = client().batchWriteRow(batchWriteRowRequest);
-
-        log.info("是否全部成功:{}", response.isAllSucceed());
-        if (!response.isAllSucceed()) {
-            for (BatchWriteRowResponse.RowResult rowResult : response.getFailedRows()) {
-                log.info("失败的行:{}", batchWriteRowRequest.getRowChange(rowResult.getTableName(), rowResult.getIndex()).getPrimaryKey());
-                log.info("失败原因:{}", rowResult.getError());
-            }
-            /**
-             * 可以通过createRequestForRetry方法再构造一个请求对失败的行进行重试。此处只给出构造重试请求的部分。
-             * 推荐的重试方法是使用SDK的自定义重试策略功能,支持对batch操作的部分行错误进行重试。设置重试策略后,调用接口处无需增加重试代码。
-             */
-            batchWriteRowRequest.createRequestForRetry(response.getFailedRows());
-        }
-    }
-
-
     private SingleRowQueryCriteria queryCriteria(String primaryKeyName, String pkValue, String tableName) {
         //构造主键。
         PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();

+ 11 - 1
abi-cloud-qr-platform-server/src/test/java/com/abi/qms/platform/TableStorePlusTest.java

@@ -1,5 +1,6 @@
 package com.abi.qms.platform;
 import java.time.LocalDateTime;
+import java.util.Arrays;
 
 import com.abi.task.common.tablestore.TableStorePlusUtils;
 import com.abi.task.common.tablestore.common.TableStoreEntity;
@@ -27,7 +28,16 @@ public class TableStorePlusTest {
         entity.setActiveCount(33);
         entity.setIsDog(false);
 
-        tableStorePlusUtils.putRow(entity);
+        DemoEntity entity2 = new DemoEntity();
+        entity.setQrCode("10293827182716");
+        entity.setActName("活动2");
+        entity.setPayAmount(990.12);
+        entity.setCurrentTime(LocalDateTime.now());
+        entity.setUserId(10001L);
+        entity.setActiveCount(33);
+        entity.setIsDog(false);
+
+        tableStorePlusUtils.putRow(Arrays.asList(entity,entity2));
     }
 
     @Test

+ 43 - 0
deploy-back.sh

@@ -0,0 +1,43 @@
+starttime=`date +'%Y-%m-%d %H:%M:%S'`
+mvn clean package -DskipTests
+
+currentTimeStamp=$(cat version.txt)
+sed -i "s%#version_num#%$currentTimeStamp%g" deployment_local.yaml
+
+docker build . -t registry.cn-shanghai.aliyuncs.com/ab-inbev-apac/abi-cloud-qr-platform-service:$currentTimeStamp
+docker push registry.cn-shanghai.aliyuncs.com/ab-inbev-apac/abi-cloud-qr-platform-service:$currentTimeStamp
+kubectl apply -f deployment_local.yaml -n $1
+kubectl get pods -n $1  -o wide | grep abi-cloud-qr-platform-service
+if  [ $2 ]  ; then
+  kubectl apply -f deployment_local.yaml -n $2
+  kubectl get pods -n $2 -o wide | grep abi-cloud-qr-platform-service
+  echo __$1__$2"环境发版结束"
+else
+  echo $1"环境发版结束"
+fi
+sed -i "s%$currentTimeStamp%#version_num#%g" deployment_local.yaml
+COUNTER=$(kubectl get pods -n $1 -o wide| grep abi-cloud-qr-platform-service  | awk 'END{print NR}')
+
+    endtime=`date +'%Y-%m-%d %H:%M:%S'`
+    start_seconds=$(date --date="$starttime" +%s)
+    end_seconds=$(date --date="$endtime" +%s)
+    echo "运行时间: "$((end_seconds-start_seconds))"s"
+
+timeout=0
+while [ "$COUNTER" -gt 1 ]; do
+      sleep 10
+      COUNTER=$(kubectl get pods -n $1 -o wide| grep abi-cloud-qr-platform-service  | awk 'END{print NR}')
+      kubectl get pods -n $1 -o wide| grep abi-cloud-qr-platform-service
+     echo $COUNTER
+     timeout=$(($timeout+10))
+     if [ "$timeout" -gt 100 ]; then
+       echo "发版异常,请访问https://cs.console.aliyun.com/"
+       break
+     fi
+#strin=$(kubectl get pods -n $1  -o wide| grep abi-cloud-qr-platform-service | grep Running | awk '{print$1}')
+#     kubectl logs  --tail 100  -f  strin   -n $1
+    done
+    sh lookLog.sh $1
+
+
+