记录kafka-flink-kafka的end-to-end的exactly-once语义

记录kafka-flink-kafka的end-to-end的exactly-once语义

  • 步骤
  • 代码

步骤

  1. 开启checkpoint、stateBackend的设置和checkpoint配置
  2. 设置kafka source的配置
  3. 读取kafka source message
  4. 随意的transformation;并打印结果
  5. kafka sink端的配置
  6. 输出到kafka sink端
  7. 执行

代码

package com.javaye.demo.exactly;

import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * @Author: Java页大数据
 * @Date: 2024-04-11:17:59
 * @Describe:
 *  kafka - flink - kafka 验证end-to-end的exactly once
 */
public class ExactlyOnce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//        1.1. 开启checkpoint,间隔为1000L ms
        env.enableCheckpointing(1000L);

//        1.2. stateBackend:checkpoint持久化目录
        if (SystemUtils.IS_OS_WINDOWS) {
            env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
        } else {
            env.setStateBackend(new FsStateBackend("hdfs://only:9870/flink-checkpoints"));
        }

        CheckpointConfig config = env.getCheckpointConfig();
//        1.3. ckp的配置
//        1.3.1. 前后两次checkpoint的最小间隔:防止前后两次的checkpoint重叠
        config.setMinPauseBetweenCheckpoints(500L);
//        1.3.2. 容忍5次checkpoint失败
        config.setTolerableCheckpointFailureNumber(5);
//        1.3.3. job被取消时,保留外部的checkpoint
        config.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//        1.3.4. 设置checkpoint的语义为 exactly-once
        config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//        1.3.5. 设置checkpoint的超时时间,若checkpoint超过该超时时间则说明该次checkpoint失败,丢弃该checkpoint
        config.setCheckpointTimeout(60 * 1000);
//        1.3.6. 设置同一时刻允许多少个checkpoint同时执行
        config.setMaxConcurrentCheckpoints(1);

//        1.4. 设置重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));


//        2. 设置kafka source的配置
        String kafkaServer = "only:9092";
        String sourceTopic = "flink_kafka_source";
        String groupId = "flink_kafka_source_exactly_once";
        String clientIdPrefix = "flink_exactly_once";
        Properties kafkaSourceProp = new Properties();
        KafkaSource<String> kafkaSource = KafkaSource
                .<String>builder()
                .setBootstrapServers(kafkaServer)
                .setTopics(sourceTopic)
                .setGroupId(groupId)
                .setClientIdPrefix(clientIdPrefix)
                .setStartingOffsets(OffsetsInitializer.latest()) // Start from latest offset
                .setProperty("partition.discovery.interval.ms", "50000") // discover new partitions per 50 seconds
                .setProperty("auto.offset.reset", "latest")
                .setValueOnlyDeserializer(new SimpleStringSchema())
//                执行checkpoint时提交offset到checkpoint,flink内部使用,并且提交一份到默认主题__consumer_offsets
//                .setCommitOffsetsOnCheckpoints(true) // checkpoint开启默认为true,否则为false;不支持该方法
                .setProperties(kafkaSourceProp)
                .build();

//        3. 读取kafka source message
        DataStreamSource<String> kafkaDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "flink_kafka_exactly_once", TypeInformation.of(String.class));

//        4. 随意的transformation
        SingleOutputStreamOperator<String> flatMapDS = kafkaDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] words = value.split(",");
                for (String word : words) {
                    Random random = new Random();
                    int i = random.nextInt(5);
                    if (i > 3) {
                        System.out.println("模拟出现bug...");
                        throw new RuntimeException("模拟出现bug...");
                    }
                    System.out.println(word + "===" + i);
                    out.collect(word + "===" + i);
                }
            }
        });

//        4.1. 打印结果容易观察
        flatMapDS.print();

//        5. kafka sink端的配置
        Properties kafkaSinkProp = new Properties();
        kafkaSinkProp.setProperty("transaction.timeout.ms", 1000 * 5 + ""); //设置事务超时时间,也可在kafka配置中设置
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers(kafkaServer)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("flink_kafka_sink")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setKafkaProducerConfig(kafkaSinkProp)
                .build();

//        6. 输出到kafka sink端
        flatMapDS.sinkTo(kafkaSink);

//        7. 执行
        env.execute(ExactlyOnce.class.getName());
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/554814.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

大量excel文件私密性较强 需要密码保护 如何给excel文件批量加密

一&#xff0c;前言 在现代办公环境中&#xff0c;Excel文件已成为数据存储和交流的常见工具。然而&#xff0c;随着数据泄露和信息安全问题的日益严重&#xff0c;如何保护Excel文件的安全性成为了我们关注的焦点。批量加密Excel文件成为了一种有效的解决方案&#xff0c;它可…

【光伏行业】光伏发电的应用领域

光伏发电作为一种清洁、可再生的能源技术&#xff0c;在多个领域都有广泛的应用。以下是一些光伏发电的主要应用领域&#xff1a; 住宅和商业建筑&#xff1a;光伏板可以安装在屋顶上&#xff0c;为建筑提供电力。这不仅降低了对电网的依赖&#xff0c;还减少了电费支出&#x…

Spring (四) 之配置及配置文件的操作

文章目录 1、Spring 基于注解的配置基于注解的配置引入依赖包配置实体类数据访问层业务层业务层实现测试 2、Bean和Component和Configuration的区别1 Bean:2 Component:3 Configuration:总结&#xff1a; 区别Component和Configuration区别 3、Spring读取properties配置文件准备…

【C++】开始使用优先队列

送给大家一句话: 这世上本来就没有童话&#xff0c;微小的获得都需要付出莫大的努力。 – 简蔓 《巧克力色微凉青春》 开始使用优先队列 1 前言2 优先队列2.1 什么是优先队列2.2 使用手册2.3 仿函数 3 优先队列的实现3.1 基本框架3.2 插入操作3.3 删除操作3.4 其他函数 4 总结T…

【ZYNQ】PS和PL数据交互丨AXI总线(主机模块RTL代码实现)

文章目录 一、PS-PL数据交互桥梁&#xff1a;AXI总线1.1 AXI总线和AXI4总线协议1.2 PS-PL数据传输的主要场景1.2.1 PL通过AXI_HP操作DDR3 Controller读写DDR31.2.2 PS作主机使用GP接口传输数据 1.3 AXI端口带宽理论1.4 AXI 总线的读写分离机制1.5 握手机制1.6 AXI_Lite总线1.7 …

【软考】设计模式之命令模式

目录 1. 说明2. 应用场景3. 结构图4. 构成5. 优缺点5.1 优点5.2 缺点 6. 适用性7.java示例 1. 说明 1.命令模式&#xff08;Command Pattern&#xff09;是一种数据驱动的设计模式。2.属于行为型模式。3.请求以命令的形式被封装在对象中&#xff0c;并传递给调用对象。4.调用对…

制作直通网线和交叉网线

制作直通网线和交叉网线 1. 网络直通线2. 网络交叉线References 双绞线的连接方法有两种&#xff1a;直通连接和交叉连接 。 直通连接是将双绞线的两端分别都依次按白橙、橙、白绿、蓝、白蓝、绿、白棕、棕色的顺序 (国际 EIA/TIA 568B 标准) 压入 RJ45 水晶头内。这种方法制作…

剧本杀小程序:线上剧本杀成为行业必然趋势

剧本杀作为一个社交娱乐游戏方式&#xff0c;受到了年轻人的喜爱。剧本杀是一个新型的游戏方式&#xff0c;能够带大众带来新鲜感和刺激感&#xff0c;让玩家通过角色扮演进行游戏体验&#xff1b;并且剧本杀还具有较强的社交性&#xff0c;在当下快节奏生活下&#xff0c;以游…

【AI】在Windows10下部署本地LLM RAG服务

【背景】 上一篇介绍了如何用Ubuntu命令行部署ollama LLM+RAG服务。部署后等于拥有了基于内网的AI Saas服务,其它内网用户可以通过默认的网址访问Playground对AI进行问答。 【概念】 RAG:通过词向量技术,将文件内容向量化后,通过语言模型以自然交流的形式得到文本相关的…

MySQL表级锁——技术深度+1

引言 本文是对MySQL表级锁的学习&#xff0c;MySQL一直停留在会用的阶段&#xff0c;需要弄清楚锁和事务的原理并DEBUG查看。 PS:本文涉及到的表结构均可从https://github.com/WeiXiao-Hyy/blog中获取&#xff0c;欢迎Star&#xff01; MySQL表级锁 MySQL中表级锁主要有表锁…

【CAD建模号】学习笔记(三):图形绘制区1

图形绘制区介绍 CAD建模号的图形绘制区可以绘制我们所需要的各种3D模型&#xff0c;绘制的图形即为模型对象&#xff0c;包括线、面、体等。 1. 二维图形绘制组 二维图形是建模的基础&#xff0c;大多数复杂的模型都是基于二维图形制作出来的&#xff0c;掌握二维图形的绘制等…

png静图转换gif动图如何操作?轻松一键快速转换gif动图

想要把多张Png格式图片转换成gif格式动图时要怎么操作&#xff1f;图片常见的有静图和动图&#xff0c;而jpg、png、gif等是最常见的图片格式。想要把png格式图片转换成gif动画还不想下载任何软件的时候就可以使用gif制作工具。不需要下载软件在线就能操作。能够轻轻松松就能快…

uniapp开发之【上传图片上传文件】的功能

一、上传图片功能&#xff0b;图片回显点击图片预览&#xff1a; 是通过uview框架的u-upload进行开发的&#xff0c;先导入uview&#xff01; <template><view class""><!-- 按钮 --><view class"listBtn" click"uploadDesign()…

透过内核收包流程理解DPDK

前言 网络通信作为互联网的底座&#xff0c;其网络服务质量直接影响着用户的上网体验。如微信这类级别的应用&#xff0c;拥有上亿级别的日活&#xff0c;是典型的高并发的场景&#xff0c;简单的堆硬件无法有效的解决该类问题&#xff0c;提高单台服务器的性能成为问题的焦点…

百度文心一言与谷歌Gemini的对比

版权声明 本文原创作者&#xff1a;谷哥的小弟作者博客地址&#xff1a;http://blog.csdn.net/lfdfhl 本文从多角度将百度文心一言与谷歌Gemini进行对比。因为不同评测基准的侧重点和难度可能有所不同&#xff0c;所以本文涉及到的评测结果仅供参考。Gemini和文心一言都是非常…

文件 IO

IO 的概念 I&#xff1a;Input 输入 O&#xff1a;Output 输出 输入和输出的规定 人为规定&#xff1a; 以CPU为视角&#xff0c;数据远离 CPU 的是输出&#xff0c;数据朝着 CPU 过来的是输入 例子&#xff1a; 1.在电脑上&#xff0c;通过网络下载文件 > 数据通过网卡…

IDM的实用功能介绍+下载地址

下载地址 &#xff1a; 下载到idm 互联网下载管理器&#xff08;IDM&#xff09;实用功能概述 1. 多线程下载 IDM使用多线程技术&#xff0c;将文件分割成多个部分同时下载&#xff0c;显著提高下载速度。 2. 计划任务 用户可以设定下载任务的开始时间&#xff0c;甚至在特…

如何解决msvcp140.dll文件丢失的问题?有效修复msvcp140.dll的方法分析

在使用Windows操作系统时&#xff0c;经常会遇到一些烦人的问题&#xff0c;其中&#xff0c;缺少dll文件是比较常见的情况之一。而其中&#xff0c;缺少msvcp140.dll文件是常见的一种情况。今天&#xff0c;我们将重点介绍如何解决msvcp140.dll文件丢失的问题&#xff0c;并向…

Docker 磁盘占用过多问题处理过程记录

一、问题描述 突然发现服务器磁盘使用超过95%了&#xff08;截图时2.1 和 2.2 已经执行过了&#xff09; 二、问题分析与解决 2.1&#xff0c;docker 无用镜像占用磁盘 # 使用 docker images 查看服务的镜像 docker images# 可以手动删除一些很大不用的 docker rmi ***## 也…

javaWeb项目-校园交友网站功能介绍

项目关键技术 开发工具&#xff1a;IDEA 、Eclipse 编程语言: Java 数据库: MySQL5.7 框架&#xff1a;ssm、Springboot 前端&#xff1a;Vue、ElementUI 关键技术&#xff1a;springboot、SSM、vue、MYSQL、MAVEN 数据库工具&#xff1a;Navicat、SQLyog 1、Java语言 Java语…
最新文章