更新時(shí)間:2022年01月12日15時(shí)35分 來(lái)源:傳智教育 瀏覽次數(shù):
在Kafka的topic 「ods_user」中有一些用戶數(shù)據(jù),數(shù)據(jù)格式如下:
| 姓名,性別,出生日期
| 張三,1,1980-10-09
| 李四,0,1985-11-01
我們需要編寫(xiě)程序,將用戶的性別轉(zhuǎn)換為男、女(1-男,0-女),轉(zhuǎn)換后將數(shù)據(jù)寫(xiě)入到topic 「dwd_user」中。要求使用事務(wù)保障,要么消費(fèi)了數(shù)據(jù)同時(shí)寫(xiě)入數(shù)據(jù)到 topic,提交offset。要么全部失敗。
# 創(chuàng)建名為ods_user和dwd_user的主題 bin/kafka-topics.sh --create --bootstrap-server node1.itcast.cn:9092 --topic ods_user bin/kafka-topics.sh --create --bootstrap-server node1.itcast.cn:9092 --topic dwd_user # 生產(chǎn)數(shù)據(jù)到 ods_user bin/kafka-console-producer.sh --broker-list node1.itcast.cn:9092 --topic ods_user # 從dwd_user消費(fèi)數(shù)據(jù) bin/kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic dwd_user --from-beginning --isolation-level read_committed
編寫(xiě)一個(gè)方法 createConsumer,該方法中返回一個(gè)消費(fèi)者,訂閱「ods_user」主題。注意:需要配置事務(wù)隔離級(jí)別、關(guān)閉自動(dòng)提交。
實(shí)現(xiàn)步驟:
1. 創(chuàng)建Kafka消費(fèi)者配置
Properties props = new Properties(); props.setProperty("bootstrap.servers", "node1.itcast.cn:9092"); props.setProperty("group.id", "ods_user"); props.put("isolation.level","read_committed"); props.setProperty("enable.auto.commit", "false"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
2. 創(chuàng)建消費(fèi)者,并訂閱 ods_user 主題
//1.創(chuàng)建消費(fèi)者 publicstaticConsumer<String,String>createConsumer(){ //1.創(chuàng)建Kafka消費(fèi)者配置 Propertiesprops=newProperties(); props.setProperty("bootstrap.servers","node1.itcast.cn:9092"); props.setProperty("group.id","ods_user"); props.put("isolation.level","read_committed"); props.setProperty("enable.auto.commit","false"); props.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); //2.創(chuàng)建Kafka消費(fèi)者 KafkaConsumer<String,String>consumer=newKafkaConsumer<>(props); //3.訂閱要消費(fèi)的主題 consumer.subscribe(Arrays.asList("ods_user")); returnconsumer; }
編寫(xiě)一個(gè)方法 createProducer,返回一個(gè)生產(chǎn)者對(duì)象。注意:需要配置事務(wù)的id,開(kāi)啟了事務(wù)會(huì)默認(rèn)開(kāi)啟冪等性。
1. 創(chuàng)建生產(chǎn)者配置
Propertiesprops=newProperties(); props.put("bootstrap.servers","node1.itcast.cn:9092"); props.put("transactional.id","dwd_user"); props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
2.創(chuàng)建生產(chǎn)者對(duì)象
publicstaticProducer < String, String > createProduceer() { //1.創(chuàng)建生產(chǎn)者配置 Propertiesprops = newProperties(); props.put("bootstrap.servers", "node1.itcast.cn:9092"); props.put("transactional.id", "dwd_user"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //2.創(chuàng)建生產(chǎn)者 Producer < String, String > producer = newKafkaProducer < > (props); returnproducer; }
實(shí)現(xiàn)步驟:
1. 調(diào)用之前實(shí)現(xiàn)的方法,創(chuàng)建消費(fèi)者、生產(chǎn)者對(duì)象
2. 生產(chǎn)者調(diào)用initTransactions初始化事務(wù)
3. 編寫(xiě)一個(gè)while死循環(huán),在while循環(huán)中不斷拉取數(shù)據(jù),進(jìn)行處理后,再寫(xiě)入到指定的topic
(1) 生產(chǎn)者開(kāi)啟事務(wù)
(2) 消費(fèi)者拉取消息
(3) 遍歷拉取到的消息,并進(jìn)行預(yù)處理(將1轉(zhuǎn)換為男,0轉(zhuǎn)換為女)
(4) 生產(chǎn)消息到dwd_user topic中
(5) 提交偏移量到事務(wù)中
(6) 提交事務(wù)
(7) 捕獲異常,如果出現(xiàn)異常,則取消事務(wù)
publicstaticvoidmain(String[] args) { Consumer < String, String > consumer = createConsumer(); Producer < String, String > producer = createProducer(); //初始化事務(wù) producer.initTransactions(); while (true) { try { //1.開(kāi)啟事務(wù) producer.beginTransaction(); //2.定義Map結(jié)構(gòu),用于保存分區(qū)對(duì)應(yīng)的offset Map < TopicPartition, OffsetAndMetadata > offsetCommits = newHashMap < > (); //2.拉取消息 ConsumerRecords < String, String > records = consumer.poll(Duration.ofSeconds(2)); for (ConsumerRecord < String, String > record: records) { //3.保存偏移量 offsetCommits.put(newTopicPartition(record.topic(), record.partition()), newOffsetAndMetadata(record.offset() + 1)); //4.進(jìn)行轉(zhuǎn)換處理 String[] fields = record.value().split(","); fields[1] = fields[1].equalsIgnoreCase("1") ? "男" : "女"; Stringmessage = fields[0] + "," + fields[1] + "," + fields[2]; //5.生產(chǎn)消息到dwd_user producer.send(newProducerRecord < > ("dwd_user", message)); } //6.提交偏移量到事務(wù) producer.sendOffsetsToTransaction(offsetCommits, "ods_user"); //7.提交事務(wù) producer.commitTransaction(); } catch (Exceptione) { //8.放棄事務(wù) producer.abortTransaction(); } } }
往之前啟動(dòng)的console-producer中寫(xiě)入消息進(jìn)行測(cè)試,同時(shí)檢查console-consumer是否能夠接收到消息:
逐個(gè)測(cè)試一下消息:
//3.保存偏移量 offsetCommits.put(newTopicPartition(record.topic(),record.partition()), newOffsetAndMetadata(record.offset()+1)); //4.進(jìn)行轉(zhuǎn)換處理 String[]fields=record.value().split(","); fields[1]=fields[1].equalsIgnoreCase("1")?"男":"女"; Stringmessage=fields[0]+","+fields[1]+","+fields[2]; //模擬異常 inti=1/0; //5.生產(chǎn)消息到dwd_user producer.send(newProducerRecord<>("dwd_user",message));
啟動(dòng)程序一次,拋出異常。
再啟動(dòng)程序一次,還是拋出異常。
直到我們處理該異常為止。
我們發(fā)現(xiàn),可以消費(fèi)到消息,但如果中間出現(xiàn)異常的話,offset是不會(huì)被提交的,除非消費(fèi)、生產(chǎn)消息都成功,才會(huì)提交事務(wù)。
怎樣一鍵啟動(dòng)或關(guān)閉Kafka?有快捷的方法嗎?
北京校區(qū)