소스 검색

fix: 修复mq手动创建queue错误

Marko552 3 년 전
부모
커밋
f4a523f8b0
1개의 변경된 파일26개의 추가작업 그리고 6개의 파일을 삭제
  1. 26 6
      abi-cloud-qr-platform-server/src/main/java/com/abi/qms/platform/infrastructure/config/RabbitmqConfig.java

+ 26 - 6
abi-cloud-qr-platform-server/src/main/java/com/abi/qms/platform/infrastructure/config/RabbitmqConfig.java

@@ -1,19 +1,19 @@
 package com.abi.qms.platform.infrastructure.config;
 
-import com.abi.qms.platform.infrastructure.mq.TableStoreBatchInsertConsumer;
-import com.abi.qms.platform.infrastructure.mq.TableStoreBatchUpdateConsumer;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.amqp.core.Message;
-import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.core.*;
 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
 import org.springframework.amqp.rabbit.connection.CorrelationData;
+import org.springframework.amqp.rabbit.core.RabbitAdmin;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
 import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
-import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.core.annotation.Order;
 
 import static com.abi.qms.platform.infrastructure.mq.TableStoreBatchInsertConsumer.TABLE_STORE_BATCH_INSERT_QUEUE;
 import static com.abi.qms.platform.infrastructure.mq.TableStoreBatchUpdateConsumer.TABLE_STORE_BATCH_UPDATE_QUEUE;
@@ -26,6 +26,7 @@ import static com.abi.qms.platform.infrastructure.mq.TableStoreBatchUpdateConsum
  */
 @Configuration
 @Slf4j
+@Order(Integer.MIN_VALUE)
 public class RabbitmqConfig {
 
 	@Value("${spring.profiles.active:local}")
@@ -41,6 +42,18 @@ public class RabbitmqConfig {
 		return new Queue(TABLE_STORE_BATCH_UPDATE_QUEUE+"_"+env);
 	}
 
+	@Bean
+	Binding bindingTableBatchInsertDirect() {
+		return BindingBuilder.bind(declareTableBatchInsertQueue()).to(DirectExchange.DEFAULT).with(TABLE_STORE_BATCH_INSERT_QUEUE+"_"+env);
+	}
+
+	@Bean
+	Binding bindingTableBatchUpdateDirect() {
+		return BindingBuilder.bind(declareTableBatchUpdateQueue()).to(DirectExchange.DEFAULT).with(TABLE_STORE_BATCH_UPDATE_QUEUE+"_"+env);
+	}
+
+
+
 	@Value("${spring.rabbitmq.addresses}")
 	private String addresses;
 	@Value("${spring.rabbitmq.username}")
@@ -62,6 +75,14 @@ public class RabbitmqConfig {
 		return connectionFactory;
 	}
 
+	/**
+	 * 因为使用了自定义的ConnectionFactory,所以需要定义RabbitAdmin
+	 * */
+	@Bean(value = "qmsRabbitAdmin")
+	public RabbitAdmin pmsRabbitAdmin(){
+		return new RabbitAdmin(connectionFactory());
+	}
+
 	@Bean(name = "qmsRabbitTemplate")
 	public RabbitTemplate rabbitTemplate(){
 		RabbitTemplate template = new RabbitTemplate(this.connectionFactory());
@@ -73,5 +94,4 @@ public class RabbitmqConfig {
 				log.info("message:{}, replyCode:{}, replyText:{}, exchange:{}, routingKey:{}", message, replyCode, replyText, exchange, routingKey));
 		return template;
 	}
-
 }