Flink 1.16.0源码编译
本机环境操作系统:win10
1234567891011121314151617181920> scala -versionScala code runner version 2.12.7 -- Copyright 2002-2018, LAMP/EPFL and Lightbend, Inc.> java -versionjava version "1.8.0_271"Java(TM) SE Runtime Environment (build 1.8.0_271-b09)Java HotSpot(TM) 64-Bit Server VM (build 25.271-b09, mixed mode)> mvn -versionApache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)Maven home: E:\apache-maven-3.6.3\bin\..Java version: 1.8.0_261, vendor: Oracle Corporation, runtime: ...
Rust-NoteBook
泛型 & 声明周期T: 'static
T 没有生命周期泛型参数
T 类型对象所包含的所有的生命周期参数都是 'static
‘static 含义参考
示例:123fn my_func<F>(handle: F) where F: 'static { todo!()}
即函数my_func拥有入参handle的所有权
类型变量
T
&T
&mut T
例子
i32, &i32, &mut i32, &&i32, &mut &mut i32, …
&i32, &&i32, &&mut i32, …
&mut i32, &mut &mut i32, &mut &i32, …
结构体中的引用成员变量123struct A<'a> { name: &'a str}
表示程序运行过程中name应 ...
Deltalake CDF & CDC
目录格式
CDC提交信息
注:以下元数据信息不是来自同一delta table,此处只是为了说明元数据包含的内容
Create1234567{"commitInfo":{"timestamp":1666008673760,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[\"part\"]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"4","numOutputRows& ...
kafka升级-Security
搭建环境
kafka-2.6.2_2.13
zookeeper-3.5.9
jdk-8u201 ( jdk1.8.0_201 )
centos7
Zookeeper开启SASL认证vim conf/zoo.cfg,增加如下安全配置
123456...authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProviderrequireClientAuthScheme=sasl# 授权登录时限jaasLoginRenew=3600000
创建JASS配置vim conf/zookeeper_sasl_jaas.conf
123456789101112131415# 配置账户信息Server { org.apache.zookeeper.server.auth.DigestLoginModule required user_super="super!@#" # 创建超级管理员,用户名:super,密码:super!@# user_kafka="kaf ...
kafka && SCRAM + ACL配置
环境
kafka-2.6.2_2.13
zookeeper-3.5.9
OS-Centos7
java:1.8.0_201
kafka-broker 与 zk节点混部
配置Zookeeperzoo.cfg1234567891011121314151617181920212223242526272829303132tickTime=2000dataDir=/data/zk/datadataLogDir=/data/zk/logsmaxClientCnxns=50minSessionTimeout=60000maxSessionTimeout=120000clientPort=2181syncLimit=5initLimit=10autopurge.snapRetainCount=20autopurge.purgeInterval=24lw.commands.whitelist=*server.1=node1:2888:3888server.2=node2:2888:3888server.3=node3:2888:3888authProvider.1=org.apache.zooke ...
kafka答疑
kafka消息大小参数brokermessage.max.bytes
kafka.log.Log#analyzeAndValidateRecords
如果生产者设置压缩, 校验batch大小是否 <= message.max.bytes
调大此参数后, 消费者获取大小也必须增加
topicmax.message.bytes
作用和message.max.bytes一样, max.message.bytes针对指定topic生效, message.max.bytes是全局
校验batch大小
producermax.request.size
org.apache.kafka.clients.producer.KafkaProducer#ensureValidRecordSize
校验单条消息大小是否大于max.request.size, 若大于直接抛出异常
参考
https://zhuanlan.zhihu.com/p/142139663
kafka自定义quota规则
背景现在运行kafka集群未做限流,为预防流量暴增影响集群稳定性,需对kafka集群做限流
目标可通过指定client-id限流
kafka自带的user认证部署较复杂,不考虑
现状应用app: 接入标准mq sdk的app自带client-id参数,但需要修改,格式:[固定前缀]_[sdkName]_[appName]_[topic]_xxxx
flink: 未设置client-id,且sdk未推广,格式:[固定前缀]_flink_[jobId]_[topic]_xxxx
方案修改参数:client.quota.callback.class扩展ClientQuotaCallback接口
实现client-Id解析类123456789101112131415161718192021222324252627282930313233import java.util.HashMap;import java.util.Map;public class HbClientIdParser { public static final String DEFAULT_CLIENT_I ...
Flink RPC
主要抽象类RpcEndpoint: 具体服务的尸体抽象,线程安全类
RpcGateway: 用于远程调用的代理接口,提供对应RpcEndpoint的地址方法
RpcService: RpcEndpoint的运行时环境,并提供异步任务或周期性任务调度
RpcServer: RpcEndpoint自身的代理对象
FencedRpcEndpoint & FencedRpcGateway: 在调用RPC方法时需提供token信息
Flink Sql-Increment Window
Flink Sql-Increment Window场景描述:希望能够绘制一天内的 pv/uv 曲线,即在一天内或一个大的窗口内,输出多次结果,而非等窗口结束之后统一输出一次结果。
使用示例如下:
12345678SELECT INCREMENT_START(ts, INTERVAL '10' SECOND, INTERVAL '1' MINUTE), username, COUNT(click_url)FROM user_clicksGROUP BY INCREMENT(ts, INTERVAL '10' SECOND, INTERVAL '1' MINUTE),username
底层逻辑
添加 INCREMENT 窗口集合函数解析规则修改 flink-sql-parser 模块1234567891011121314151617181920212223public class FlinkSqlOperatorTable { // ---------- ...
Flink Sql 侧流输出(二)
Flink Sql 侧流输出(二)使用示例如下:
1、定义处理逻辑
12345678public static class MyProcessFunction extends ScalarFunction { @DataTypeHint("ROW<id1 STRING, id2 STRING> NOT NULL") public Row eval(String id1, String id2) { return Row.of(id1, id2); }}
2、注册UDF函数
1tEnv.createTemporarySystemFunction("MyProcessFunction", MyProcessFunction.class);
3、创建sink表
1234567## sideOutput 输出端CREATE TABLE sideOutput_table( `data` Row < id1 INT, id2 VARCHAR >) WI ...