中文字幕精品亚洲无线码二区,国产黄a三级三级三级看三级,亚洲七七久久桃花影院,丰满少妇被猛烈进入,国产小视频在线观看网站

keycloak~EventListenerProvider初(chu)始化(hua)kafka引(yin)出的類加載(zai)問題(ti)

EventListenerProvider初始

keycloak提供的(de)事(shi)件處(chu)理機制,可以通過(guo)實現(xian)(xian)EventListenerProvider接(jie)口來實現(xian)(xian)自定義的(de)事(shi)件處(chu)理邏(luo)輯。在keycloak啟動(dong)時,會通過(guo)ServiceLoader機制加(jia)載所有的(de)EventListenerProvider實現(xian)(xian)類(lei),并將其注冊到keycloak的(de)事(shi)件處(chu)理機制中(zhong)。

  • 構造方法,在每個keycloak后臺操作時,它都會重新構建實例
  • OnEvent方法,在事件發生時執行,不會出現類加載問題,因為這樣類已經被加載了

EventListenerProviderFactory

EventListenerProviderFactory是進行事件處理器的生(sheng)產工廠(chang),用于創建EventListenerProvider實例。在keycloak啟動時,會通過(guo)ServiceLoader機制加(jia)載所有的EventListenerProviderFactory實現(xian)類,并將其(qi)注冊到keycloak的事件處理機制中(zhong)。

  • init方法:keycloak啟動時會執行,用于初始化EventListenerProviderFactory實例,可以在此方法中進行一些初始化操作。
  • postInit方法:keycloak啟動時會執行,在init方法之后,會執行這個方法
  • create方法:在kc后臺開啟這個EventListenerProviderFactory之后,每次請求都會執行這個create方法,對于它生產的provider對象,可能考慮使用單例的方式, 避免每次請求都創建一個新的對象
  • close方法:在keycloak程序關閉后或者當前事件被注冊時,這個方法才會執行

問題

  • 問題描述:在EventListenerProviderFactory的init方法中,通過kafka發送消息,會出現類加載問題,因為在keycloak啟動時,kafka的類的加載器還沒有被加載,所以會出現類加載問題。
  • 解決:需要將類加載器這塊,修改成當前類加載器去加載對應的文件,如下代碼解決了類無法加載的問題
  @Override
  public void postInit(KeycloakSessionFactory keycloakSessionFactory) {
    try {
      this.executorService = Executors.newFixedThreadPool(2);
      Properties kafkaProperties = new Properties();
      kafkaProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
          ConfigFactory.getInstance().getStrPropertyValue("kafka.host"));
      kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "kc-ListenerProviderFactory");
      kafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
      kafkaProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
      kafkaProperties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
      kafkaProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
      kafkaProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
      kafkaProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
      // 需要使用當前類加載器,否則會出現無法加載StringDeserializer的情況
      Class<?> stringDeserializerClass =
          getClass().getClassLoader().loadClass("org.apache.kafka.common.serialization.StringDeserializer");
      kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringDeserializerClass);
      kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeserializerClass);
      this.kafkaConsumerAdd = new KcKafkaConsumer(keycloakSessionFactory,kafkaProperties, "black_list_add");
      executorService.submit(kafkaConsumerAdd);
      this.kafkaConsumerRemove=new KcKafkaConsumer(keycloakSessionFactory,kafkaProperties, "black_list_remove");
      executorService.submit(kafkaConsumerRemove);
    } catch (ClassNotFoundException e) {
      throw new RuntimeException(e);
    }
  }
  • 對于kafka-clients實現消費者的話,代碼還是比較簡單的
public class KcKafkaConsumer implements Runnable {
  private static final Logger logger = Logger.getLogger(ConfigFactory.class);
  private final AtomicBoolean closed = new AtomicBoolean(false);
  private final KeycloakSessionFactory keycloakSessionFactory;
  private KafkaConsumer<String, String> kafkaConsumer;

  public KcKafkaConsumer(KeycloakSessionFactory keycloakSessionFactory, Properties properties, String topic)
      throws ClassNotFoundException {
    this.keycloakSessionFactory = keycloakSessionFactory;
    this.kafkaConsumer = new KafkaConsumer<>(properties);
    this.kafkaConsumer.subscribe(Collections.singleton(topic));
  }

@Override
  public void run() {
    try {

      while (!closed.get()) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
        // 處理Kafka消息
        for (ConsumerRecord<String, String> record : records) {
          System.out.println("Topic:" + record.topic() + ",Received message: " + record.value());
          //TODO: 處理Kafka消息的具體邏輯
        }
      }
    } finally {
      kafkaConsumer.close();
    }
  }
  public void shutdown() {
    closed.set(true);
    kafkaConsumer.close();
  }
}
posted @ 2023-07-19 17:11  張占嶺  閱讀(90)  評論(0)    收藏  舉報