中文字幕精品亚洲无线码二区,国产黄a三级三级三级看三级,亚洲七七久久桃花影院,丰满少妇被猛烈进入,国产小视频在线观看网站

springboot~ApplicationContextAware和(he)Interceptor產生了(le)真感情

看著題目,有點一頭污水吧,事實上,沒有經歷過,很難去說ApplicationContextAware在什么時候會用到,直接在一個bean對象里,你可以直接使用構造方法注入或者Autowired屬性注入的方式來使用其它的bean對象,這(zhe)在springboot里(li)是(shi)非(fei)常自(zi)然(ran)的(de)(de)(de),也是(shi)天然(ran)支(zhi)持的(de)(de)(de);但(dan)如果你(ni)的(de)(de)(de)這(zhe)個(ge)bean不(bu)是(shi)由(you)spring ioc自(zi)動(dong)注入(ru)的(de)(de)(de),而是(shi)通過攔(lan)截(jie)器(qi)動(dong)態配置的(de)(de)(de),這(zhe)時(shi)你(ni)使(shi)用@Autowired時(shi),是(shi)無(wu)法獲取到其它bean對象的(de)(de)(de);這(zhe)時(shi)你(ni)需要使(shi)用ApplicationContextAware接口,再(zai)定義一個(ge)靜(jing)態的(de)(de)(de)ApplicationContext實例,在你(ni)的(de)(de)(de)攔(lan)截(jie)器(qi)執(zhi)行方法里(li)使(shi)用它就可以了。【應(ying)該和攔(lan)截(jie)器(qi)里(li)的(de)(de)(de)動(dong)態代(dai)理有關】

一個kafka的ConsumerInterceptor實例

在這個(ge)例(li)子中,我們(men)通(tong)過(guo)ConsumerInterceptor實現了(le)一個(ge)TTL的延時(shi)隊(dui)列,當topic過(guo)期時(shi),再通(tong)過(guo)KafkaTemplate將消(xiao)息轉發到其它(ta)隊(dui)列里

  • DelayPublisher.publish發送延時topic的方法
	/**
	 * 發送延時消息
	 * @param message 消息體
	 * @param delaySecondTime 多個秒后過期
	 * @param delayTopic 過期后發送到的話題
	 */
	public void publish(String message, long delaySecondTime, String delayTopic) {
		ProducerRecord producerRecord = new ProducerRecord<>(topic, 0, System.currentTimeMillis(), delayTopic, message,
				new RecordHeaders().add(new RecordHeader("ttl", toBytes(delaySecondTime))));
		kafkaTemplate.send(producerRecord);
	}

  • ConsumerInterceptorTTL
/**
 * @author lind
 * @date 2023/8/18 8:33
 * @since 1.0.0
 */
@Component
public class ConsumerInterceptorTTL implements ConsumerInterceptor<String, String>, ApplicationContextAware {

	// 靜態化的上下文,用于獲取bean,因為ConsumerInterceptor是通過反射創建的,所以無法通過注入的方式獲取bean
	private static ApplicationContext applicationContext;

	@Override
	public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
		long now = System.currentTimeMillis();
		Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();
		for (TopicPartition tp : records.partitions()) {
			List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
			List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();
			for (ConsumerRecord<String, String> record : tpRecords) {
				Headers headers = record.headers();
				long ttl = -1;
				for (Header header : headers) {
					if (header.key().equals("ttl")) {
						ttl = toLong(header.value());
					}
				}
				// 消息超時判定
				if (ttl > 0 && now - record.timestamp() < ttl * 1000) {
					// 可以放在死信隊列中
					System.out.println("消息超時了,需要發到topic:" + record.key());
					KafkaTemplate kafkaTemplate = applicationContext.getBean(KafkaTemplate.class);
					kafkaTemplate.send(record.key(), record.value());
				}
				else { // 沒有設置TTL,不需要超時判定
					newTpRecords.add(record);
				}

			}
			if (!newRecords.isEmpty()) {
				newRecords.put(tp, newTpRecords);
			}
		}
		return new ConsumerRecords<>(newRecords);
	}

	@Override
	public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
		offsets.forEach((tp, offset) -> System.out.println(tp + ":" + offset.offset()));
	}

	@Override
	public void close() {
	}

	@Override
	public void configure(Map<String, ?> configs) {

	}

	// 它的時機是在KafkaListenerAnnotationBeanPostProcessor的postProcessAfterInitialization方法中,applicationContext應該定時成static,否則在實例對象中,它的值可能是空
	@Override
	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
		this.applicationContext = applicationContext;
	}

}
  • 配置文件中注入攔截器
spring:
  kafka:
    consumer:
      properties:
        interceptor.classes: com.example.ConsumerInterceptorTTL 
posted @ 2023-08-18 10:28  張占嶺  閱讀(107)  評論(0)    收藏  舉報