教育行業(yè)A股IPO第一股(股票代碼 003032)

全國(guó)咨詢/投訴熱線:400-618-4000

Kafka怎樣創(chuàng)建事務(wù)編程?【完整代碼編寫(xiě)和測(cè)試流程】

更新時(shí)間:2022年01月12日15時(shí)35分 來(lái)源:傳智教育 瀏覽次數(shù):

需求

在Kafka的topic 「ods_user」中有一些用戶數(shù)據(jù),數(shù)據(jù)格式如下:

| 姓名,性別,出生日期

| 張三,1,1980-10-09

| 李四,0,1985-11-01

我們需要編寫(xiě)程序,將用戶的性別轉(zhuǎn)換為男、女(1-男,0-女),轉(zhuǎn)換后將數(shù)據(jù)寫(xiě)入到topic 「dwd_user」中。要求使用事務(wù)保障,要么消費(fèi)了數(shù)據(jù)同時(shí)寫(xiě)入數(shù)據(jù)到 topic,提交offset。要么全部失敗。

啟動(dòng)生產(chǎn)者控制臺(tái)程序模擬數(shù)據(jù)

# 創(chuàng)建名為ods_user和dwd_user的主題
bin/kafka-topics.sh --create --bootstrap-server node1.itcast.cn:9092 --topic ods_user
bin/kafka-topics.sh --create --bootstrap-server node1.itcast.cn:9092 --topic dwd_user
# 生產(chǎn)數(shù)據(jù)到 ods_user
bin/kafka-console-producer.sh --broker-list node1.itcast.cn:9092 --topic ods_user
# 從dwd_user消費(fèi)數(shù)據(jù)
bin/kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic dwd_user 
--from-beginning  --isolation-level read_committed

編寫(xiě)創(chuàng)建消費(fèi)者代碼

編寫(xiě)一個(gè)方法 createConsumer,該方法中返回一個(gè)消費(fèi)者,訂閱「ods_user」主題。注意:需要配置事務(wù)隔離級(jí)別、關(guān)閉自動(dòng)提交。

實(shí)現(xiàn)步驟:

1. 創(chuàng)建Kafka消費(fèi)者配置

Properties props = new Properties();
 props.setProperty("bootstrap.servers", "node1.itcast.cn:9092");
 props.setProperty("group.id", "ods_user");
 props.put("isolation.level","read_committed");
 props.setProperty("enable.auto.commit", "false");
 props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

2. 創(chuàng)建消費(fèi)者,并訂閱 ods_user 主題

//1.創(chuàng)建消費(fèi)者
publicstaticConsumer<String,String>createConsumer(){
//1.創(chuàng)建Kafka消費(fèi)者配置
Propertiesprops=newProperties();
props.setProperty("bootstrap.servers","node1.itcast.cn:9092");
props.setProperty("group.id","ods_user");
props.put("isolation.level","read_committed");
props.setProperty("enable.auto.commit","false");
props.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

//2.創(chuàng)建Kafka消費(fèi)者
KafkaConsumer<String,String>consumer=newKafkaConsumer<>(props);

//3.訂閱要消費(fèi)的主題
consumer.subscribe(Arrays.asList("ods_user"));

returnconsumer;
}

編寫(xiě)創(chuàng)建生產(chǎn)者代碼

編寫(xiě)一個(gè)方法 createProducer,返回一個(gè)生產(chǎn)者對(duì)象。注意:需要配置事務(wù)的id,開(kāi)啟了事務(wù)會(huì)默認(rèn)開(kāi)啟冪等性。

1. 創(chuàng)建生產(chǎn)者配置

Propertiesprops=newProperties();
props.put("bootstrap.servers","node1.itcast.cn:9092");
props.put("transactional.id","dwd_user");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

2.創(chuàng)建生產(chǎn)者對(duì)象

publicstaticProducer < String, String > createProduceer() {
    //1.創(chuàng)建生產(chǎn)者配置
    Propertiesprops = newProperties();
    props.put("bootstrap.servers", "node1.itcast.cn:9092");
    props.put("transactional.id", "dwd_user");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    //2.創(chuàng)建生產(chǎn)者
    Producer < String, String > producer = newKafkaProducer < > (props);
    returnproducer;
}

編寫(xiě)代碼消費(fèi)并生產(chǎn)數(shù)據(jù)

實(shí)現(xiàn)步驟:

1. 調(diào)用之前實(shí)現(xiàn)的方法,創(chuàng)建消費(fèi)者、生產(chǎn)者對(duì)象

2. 生產(chǎn)者調(diào)用initTransactions初始化事務(wù)

3. 編寫(xiě)一個(gè)while死循環(huán),在while循環(huán)中不斷拉取數(shù)據(jù),進(jìn)行處理后,再寫(xiě)入到指定的topic

(1) 生產(chǎn)者開(kāi)啟事務(wù)

(2) 消費(fèi)者拉取消息

(3) 遍歷拉取到的消息,并進(jìn)行預(yù)處理(將1轉(zhuǎn)換為男,0轉(zhuǎn)換為女)

(4) 生產(chǎn)消息到dwd_user topic中

(5) 提交偏移量到事務(wù)中

(6) 提交事務(wù)

(7) 捕獲異常,如果出現(xiàn)異常,則取消事務(wù)

publicstaticvoidmain(String[] args) {
    Consumer < String, String > consumer = createConsumer();
    Producer < String, String > producer = createProducer();
    //初始化事務(wù)
    producer.initTransactions();
    while (true) {
        try {
            //1.開(kāi)啟事務(wù)
            producer.beginTransaction();
            //2.定義Map結(jié)構(gòu),用于保存分區(qū)對(duì)應(yīng)的offset
            Map < TopicPartition, OffsetAndMetadata > offsetCommits = newHashMap < > ();
            //2.拉取消息
            ConsumerRecords < String, String > records = consumer.poll(Duration.ofSeconds(2));
            for (ConsumerRecord < String, String > record: records) {
                //3.保存偏移量
                offsetCommits.put(newTopicPartition(record.topic(), record.partition()), newOffsetAndMetadata(record.offset() + 1));
                //4.進(jìn)行轉(zhuǎn)換處理
                String[] fields = record.value().split(",");
                fields[1] = fields[1].equalsIgnoreCase("1") ? "男" : "女";
                Stringmessage = fields[0] + "," + fields[1] + "," + fields[2];
                //5.生產(chǎn)消息到dwd_user
                producer.send(newProducerRecord < > ("dwd_user", message));
            }
            //6.提交偏移量到事務(wù)
            producer.sendOffsetsToTransaction(offsetCommits, "ods_user");
            //7.提交事務(wù)
            producer.commitTransaction();
        } catch (Exceptione) {
            //8.放棄事務(wù)
            producer.abortTransaction();
        }
    }
}

測(cè)試

往之前啟動(dòng)的console-producer中寫(xiě)入消息進(jìn)行測(cè)試,同時(shí)檢查console-consumer是否能夠接收到消息:

kafka 創(chuàng)建事務(wù)編程

逐個(gè)測(cè)試一下消息:

  • 張三,1,1980-10-09
  • 李四,0,1985-11-01
//3.保存偏移量
offsetCommits.put(newTopicPartition(record.topic(),record.partition()),
       newOffsetAndMetadata(record.offset()+1));
//4.進(jìn)行轉(zhuǎn)換處理
String[]fields=record.value().split(",");
fields[1]=fields[1].equalsIgnoreCase("1")?"男":"女";
Stringmessage=fields[0]+","+fields[1]+","+fields[2];

//模擬異常
inti=1/0;

//5.生產(chǎn)消息到dwd_user
producer.send(newProducerRecord<>("dwd_user",message));

啟動(dòng)程序一次,拋出異常。

再啟動(dòng)程序一次,還是拋出異常。

直到我們處理該異常為止。

我們發(fā)現(xiàn),可以消費(fèi)到消息,但如果中間出現(xiàn)異常的話,offset是不會(huì)被提交的,除非消費(fèi)、生產(chǎn)消息都成功,才會(huì)提交事務(wù)。






猜你喜歡:

怎樣一鍵啟動(dòng)或關(guān)閉Kafka?有快捷的方法嗎?

為什么選擇kafka采集數(shù)據(jù)?

Kafka的常用API介紹[大數(shù)據(jù)培訓(xùn)]

SparkStreaming連接Kafka兩種方式

傳智教育python+大數(shù)據(jù)開(kāi)發(fā)培訓(xùn)

0 分享到:
和我們?cè)诰€交談!