记录kafka-flink-kafka的end-to-end的exactly-once语义
步骤
- 开启checkpoint、stateBackend的设置和checkpoint配置
- 设置kafka source的配置
- 读取kafka source message
- 随意的transformation;并打印结果
- kafka sink端的配置（exactly-once必须设置transactionalIdPrefix，且transaction.timeout.ms要大于checkpoint间隔、不超过broker端的transaction.max.timeout.ms）
- 输出到kafka sink端（下游消费者需设置isolation.level=read_committed，才能真正做到端到端exactly-once）
- 执行
代码
package com.javaye.demo.exactly;
import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
 * End-to-end exactly-once demo: Kafka source -> Flink (checkpointed transform) -> Kafka sink.
 *
 * <p>Exactly-once here relies on three cooperating pieces, all configured below:
 * <ul>
 *   <li>Flink checkpointing in {@code EXACTLY_ONCE} mode with a durable state backend;</li>
 *   <li>a Kafka source whose offsets are committed as part of the checkpoint;</li>
 *   <li>a transactional Kafka sink ({@code DeliveryGuarantee.EXACTLY_ONCE}), which requires
 *       a transactional-id prefix and a transaction timeout longer than the checkpoint
 *       interval. Downstream consumers must read with {@code isolation.level=read_committed}.</li>
 * </ul>
 */
public class ExactlyOnce {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // --- 1. Checkpointing: mandatory for exactly-once; one checkpoint per second. ---
        env.enableCheckpointing(1000L);
        if (SystemUtils.IS_OS_WINDOWS) {
            env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
        } else {
            // NOTE(review): 9870 is usually the HDFS NameNode web-UI port; the RPC port is
            // typically 8020 or 9000 — confirm against the cluster's core-site.xml.
            env.setStateBackend(new FsStateBackend("hdfs://only:9870/flink-checkpoints"));
        }
        CheckpointConfig config = env.getCheckpointConfig();
        config.setMinPauseBetweenCheckpoints(500L);
        config.setTolerableCheckpointFailureNumber(5);
        // Retain externalized checkpoints on cancel so the job can be resumed manually.
        config.setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        config.setCheckpointTimeout(60 * 1000);
        config.setMaxConcurrentCheckpoints(1);
        // The flatMap below throws on purpose; restart up to 3 times, 10s apart.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

        // --- 2. Kafka source configuration. ---
        String kafkaServer = "only:9092";
        String sourceTopic = "flink_kafka_source";
        String groupId = "flink_kafka_source_exactly_once";
        String clientIdPrefix = "flink_exactly_once";
        Properties kafkaSourceProp = new Properties();
        KafkaSource<String> kafkaSource = KafkaSource
                .<String>builder()
                .setBootstrapServers(kafkaServer)
                .setTopics(sourceTopic)
                .setGroupId(groupId)
                .setClientIdPrefix(clientIdPrefix)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setProperty("partition.discovery.interval.ms", "50000")
                .setProperty("auto.offset.reset", "latest")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperties(kafkaSourceProp)
                .build();

        // --- 3. Read from the source (no event-time semantics needed for this demo). ---
        DataStreamSource<String> kafkaDS = env.fromSource(
                kafkaSource,
                WatermarkStrategy.noWatermarks(),
                "flink_kafka_exactly_once",
                TypeInformation.of(String.class));

        // --- 4. Transformation that randomly fails to exercise restart + exactly-once. ---
        SingleOutputStreamOperator<String> flatMapDS = kafkaDS.flatMap(new FlatMapFunction<String, String>() {
            // Hoisted out of the per-word loop: allocating a new Random per element is
            // wasteful and seeds from the clock, which can repeat within one millisecond.
            private final Random random = new Random();

            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                for (String word : value.split(",")) {
                    int i = random.nextInt(5);
                    if (i > 3) { // i == 4, i.e. ~20% chance: simulate a failure
                        System.out.println("模拟出现bug...");
                        throw new RuntimeException("模拟出现bug...");
                    }
                    System.out.println(word + "===" + i);
                    out.collect(word + "===" + i);
                }
            }
        });
        flatMapDS.print();

        // --- 5. Kafka sink configuration (transactional). ---
        Properties kafkaSinkProp = new Properties();
        // BUG FIX: a sink transaction stays open for a whole checkpoint cycle, so the
        // timeout must comfortably exceed the 1s checkpoint interval (the original 5s left
        // no headroom and risked ProducerFencedException on any checkpoint delay) while
        // staying <= the broker's transaction.max.timeout.ms (15 min by default).
        kafkaSinkProp.setProperty("transaction.timeout.ms", String.valueOf(5 * 60 * 1000));
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers(kafkaServer)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("flink_kafka_sink")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // BUG FIX: EXACTLY_ONCE requires a transactional-id prefix; without it
                // KafkaSinkBuilder.build() throws IllegalStateException at job submission.
                .setTransactionalIdPrefix("flink_exactly_once_tx")
                .setKafkaProducerConfig(kafkaSinkProp)
                .build();

        // --- 6. Write out and run. ---
        flatMapDS.sinkTo(kafkaSink);
        env.execute(ExactlyOnce.class.getName());
    }
}