中文字幕精品亚洲无线码二区,国产黄a三级三级三级看三级,亚洲七七久久桃花影院,丰满少妇被猛烈进入,国产小视频在线观看网站

springboot~kafka-stream實現實時統計

實(shi)時(shi)統計(ji),也可以理解為流式(shi)計(ji)算,一個(ge)輸(shu)入流,一個(ge)輸(shu)出流,源源不斷。

Kafka Stream

Kafka Stream是Apache Kafka從0.10版本引入的一個新Feature。它是提供了對存儲于Kafka內(nei)的數據進行流式處理和分析的功能。

Kafka Stream的特點

  • Kafka Stream提供了一個非常簡單而輕量的Library,它可以非常方便地嵌入任意Java應用中,也可以任意方式打包和部署
  • 除了Kafka外,無任何外部依賴
  • 充分利用Kafka分區機制實現水平擴展和順序性保證
  • 通過可容錯的state store實現高效的狀態操作(如windowed join和aggregation)
  • 支持正好一次處理語義
  • 提供記錄級的處理能力,從而實現毫秒級的低延遲
  • 支持基于事件時間的窗口操作,并且可處理晚到的數據(late arrival of records)
  • 同時提供底層的處理原語Processor(類似于Storm的spout和bolt),以及高層抽象的DSL(類似于Spark的map/group/reduce)

相關術語

源處(chu)(chu)理器和Sink處(chu)(chu)理器是Kafka Streams中的(de)(de)(de)兩個重要(yao)組(zu)件,它們分別用于從輸(shu)入(ru)流(liu)(liu)獲取(qu)數據并將處(chu)(chu)理后的(de)(de)(de)數據發送到輸(shu)出流(liu)(liu)。以下是它們的(de)(de)(de)工作流(liu)(liu)程的(de)(de)(de)文字圖(tu)示表(biao)達:

[Source Processor] -> [Processor Topology] -> [Sink Processor]
  1. 源處理器(Source Processor)

    • 源處理器負責從一個或多個輸入主題(topics)中提取數據,并將數據轉換為KStream或KTable對象。
    • 它通常是處理拓撲結構的起點,從一個或多個輸入主題中讀取數據,并將其發送到處理拓撲中的下一個處理器。
  2. Sink 處理器(Sink Processor)

    • Sink處理器負責將經過處理的數據發送到一個或多個輸出主題,或者執行其他終端操作。
    • 它通常是處理拓撲結構的終點,在處理拓撲的最后階段接收處理后的數據,并將其發送到輸出主題,或者執行其他終端操作,如存儲到數據庫、發送到外部系統等。
  3. Processor Topology

    • 處理拓撲包含了源處理器、中間處理器和Sink處理器,它定義了數據流的處理邏輯。
    • 在處理拓撲中,數據流會通過一系列的處理器進行轉換、聚合和處理,最終到達Sink處理器,完成整個處理流程。

通過這種處理(li)流(liu)(liu)(liu)程,Kafka Streams可以實現對數據流(liu)(liu)(liu)的靈活處理(li)和轉換,使得你(ni)能(neng)夠方便(bian)地構建實時流(liu)(liu)(liu)處理(li)應用(yong)程序(xu)。

kafka stream demo

依賴

<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.5.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
  <version>2.5.1</version>
</dependency>

環境準備

  • 安裝kafka
  • 建立topic,我以keycloak為例,它有login_in這個主題,用來記錄登錄信息
  • 建立topic,如total_record,用來存儲login_in的實時統計的結果
  • 可使用springboot繼承的消費者,去消費total_record,如寫入數據庫進行持久化

業務代碼

  • 配置類
@Configuration
@EnableKafkaStreams
public class KafkaStreamConfig {

	private static final int MAX_MESSAGE_SIZE = 16 * 1024 * 1024;

	@Value("${spring.kafka.bootstrap-servers}")
	private String hosts;

	@Value("${spring.kafka.consumer.group-id}")
	private String group;

	@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
	public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
		Map<String, Object> props = new HashMap<>();
		props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
		props.put(StreamsConfig.APPLICATION_ID_CONFIG, group + "_stream_aid");
		props.put(StreamsConfig.CLIENT_ID_CONFIG, group + "_stream_cid");
		props.put(StreamsConfig.RETRIES_CONFIG, 3);
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");//從最近的消息開始消費
		props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		return new KafkaStreamsConfiguration(props);
	}

}
  • 消費類
@Configuration
@Slf4j
public class KafkaStreamListener {

	@Autowired
	ReportLoginTypeMapper reportLoginTypeMapper;
	@KafkaListener(topics = "total_record")
	public void listen(ConsumerRecord<String, String> record) {
		// 將時間戳轉換為本地日期時間
		LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(record.timestamp()), ZoneId.systemDefault());
		ReportLoginType reportLoginType=new ReportLoginType();
		reportLoginType.setLoginType(record.key());
		reportLoginType.setCreateAt(dateTime);
		reportLoginType.setCount(Integer.parseInt(record.value()));
		reportLoginTypeMapper.insert(reportLoginType);
	}

	@Bean
	public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
		KStream<String, String> stream = streamsBuilder.stream("KC_LOGIN");
		KStream<String, String> serializedStream = stream.mapValues(jsonString -> {
			// 分組依據
			if (JSONUtil.parseObj(jsonString).containsKey("details")) {
				JSONObject details = JSONUtil.parseObj(jsonString).getJSONObject("details");
				if (details.containsKey("loginType")) {
					String loginType = details.getStr("loginType");
					return loginType;
				}
				return "";
			}
			else {
				return "";
			}
		});
		/**
		 * 處理消息的value
		 */
		serializedStream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
			@Override
			public Iterable<String> apply(String value) {
				return Arrays.asList(value.split(" "));
			}
		}).filter((key, value) -> !value.equals(""))
				// 按照value進行聚合處理
				.groupBy((key, value) -> value)// 這進而的value是kafka的消息內容
				// 時間窗口
				.windowedBy(TimeWindows.of(Duration.ofSeconds(60)))
				// 統計單詞的個數
				.count()
				// 轉換為kStream
				.toStream().map((key, value) -> {
					// key是分組的key,它是一個window對象,它里面有分組key和時間窗口的開始時間和結束時間,方便后期我們統計,value是分組count的結果
					return new KeyValue<>(key.toString(), value.toString());
				})
				// 發送消息
				.to("topic-out");
		return stream;
	}

}

上面代(dai)碼(ma)在分(fen)組統計(ji)(ji)之后(hou),給(gei)把(ba)數(shu)據(ju)發到topic-out的(de)(de)(de)(de)(de)kafka主題(ti)里,需(xu)要注意kafka主題(ti)的(de)(de)(de)(de)(de)key是一(yi)個代(dai)碼(ma)分(fen)組key和窗(chuang)口期(qi)的(de)(de)(de)(de)(de)字(zi)符串(chuan),方便我(wo)(wo)們后(hou)期(qi)做數(shu)據(ju)統計(ji)(ji),一(yi)般這些窗(chuang)口期(qi)的(de)(de)(de)(de)(de)數(shu)據(ju)和key一(yi)樣,會寫到數(shu)據(ju)表里,像我(wo)(wo)們查詢數(shu)據(ju)表時,會根據(ju)它們選擇最大的(de)(de)(de)(de)(de)value值,因為同一(yi)窗(chuang)口里的(de)(de)(de)(de)(de)計(ji)(ji)數(shu),我(wo)(wo)們取最大就可(ke)以,它已經包含(han)了(le)相同窗(chuang)口期(qi)的(de)(de)(de)(de)(de)其它值。

select login_type,window_start,window_end,max(count) FROM report_login_type
where login_type='password' and create_at>='2024-01-10 14:00:00' 
group by login_type,window_start,window_end

最后看一下total_record的內容

posted @ 2024-01-09 16:20  張占嶺  閱讀(1265)  評論(0)    收藏  舉報