TemporalTableJoin

2022-01-13 10:04 来自:TechWeb 收藏 分享 邀请  阅读量:11490   

摘要: 前言 实时数仓,难免会遇到join维表的业务。现总结几种方案,供各位看官选择: 查找关联状态编程,预加载数据到状态中,按需取冷热数据广播维表TemporalTableJoinLookupTableJoin 其中...

前言

TemporalTableJoin

实时数仓,难免会遇到join维表的业务。现总结几种方案,供各位看官选择:

查找关联 状态编程,预加载数据到状态中,按需取 冷热数据 广播维表 Temporal Table Join Lookup Table Join

其中中间留下两个问题,供大家思考,可留言一起讨论。

查找关联

查找关联就是在主流数据中直接访问外部数据去根据主键或者某种关键条件去关联取值。

适合: 维表数据量大,但是主数据不大的业务实时计算。

缺点:数据量大的时候,会给外部数据源库带来很大的压力,因为某条数据都需要关联。

同步

访问数据库是同步调用,导致 subtak 线程会被阻塞,影响吞吐量

defanalyses:Unit=valenv:StreamExecutionEnvironment=FlinkStreamEnv.getvalsource:DataStream=KafkaSourceEnv.getKafkaSourceStream).map(JSON.parseObject(_)).filter(_!=null).flatMap(newFlatMapFunction(JSONObject,String)overridedefflatMap(jSONObject:JSONObject,collector:Collector):Unit=//如果topic就一张表,不用区分,如果多张表,可以通过database与table区分,放到下一步去处理//表的名字valdatabaseName:String=jSONObject.getString("database")//表的名字valtableName:String=jSONObject.getString("table")//数据操作类型INSERTUPDATEDELETEvaloperationType:String=jSONObject.getString("type")//主体数据valtableData:JSONArray=jSONObject.getJSONArray("data")//oldvalold:JSONArray=jSONObject.getJSONArray("old")//canaljson可能存在批处理出现data数据多条for(i首先把维表数据初始化到state中,设置好更新时间,定时去把维表

优点:flink 自己维护状态数据,"荣辱与共",不需要频繁链接外部数据源,达到解耦。

缺点:不适合大的维表和变化大的维表。

思考下:直接定义一个Map集合这样的优缺点是什么可以留言说出自己的看法

冷热数据

思想:先去状态去取,如果没有,去外部查询,同时去存到状态里面StateTtlConfig 的过期时间可以设置短点

优点:中庸取值方案,热备常用数据到内存,也避免了数据join相对过多外部数据源。

缺点:也不能一劳永逸解决某些问题,热备数据过多,或者冷数据过大,都会对state 或者 外部数据库造成压力。。

比如上面提到的字典表,每一个Task都需要这份数据,那么需要join这份数据的时候就可以使用广播维表。

valdimStream=env.addSource//广播状态valbroadcastStateDesc=newMapStateDescriptor(String,String)("broadcaststate",BasicTypeInfo.STRING_TYPE_INFO,newMapTypeInfolt,gt,(Long.class,Dim.class))//广播流valbroadStream=dimStream.broadcast()//主数据流valmainConsumer=newFlinkKafkaConsumer("topic",newSimpleStringSchema(),kafkaConfig)valmainStream=env.addSource(mainConsumer)//广播状态与维度表关联valconnectedStream=mainStream.connect(broadStream).map(..User(id,name)).key(_.1)connectedStream.process(newKeyedBroadcastProcessFunction(String,User,Map(Long,Dim),String)overridedefprocessElement(value:User,ctx:KeyedBroadcastProcessFunction(String,User,Map(Long,Dim),String)#ReadOnlyContext,out:Collector):Unit=//取到数据就可以愉快的玩耍了valstate=ctx.getBroadcastState(broadcastStateDesc)xxxxxx

「思考:」 如果把维表流也通过实时监控binlog到kafka,当维度数据发生变化时,更新放到状态中,这种方式,是不是更具有时效性呢。

通过canal把变更binlog方式发送到kafka中。

数据流定义成为广播流,广播到数据到主数据流中。

定义一个广播状态存储数据,在主数据进行查找匹配,符合要求则join成功。

Temporal Table Join

由于维表是一张不断变化的表那如何 JOIN 一张不断变化的表呢如果用传统的 JOIN 语法来表达维表 JOIN,是不完整的因为维表是一直在更新变化的,如果用这个语法那么关联上的是哪个时刻的维表呢我们是不知道的,结果是不确定的所以 Flink SQL 的维表 JOIN 语法引入了Temporal Table 的标准语法,用来声明关联的是维表哪个时刻的快照

普通关联会一直保留关联双侧的数据,数据也就会一直膨胀,直到撑爆内存导致任务失败,Temporal Join则可以定期清理过期数据,在合理的内存配置下即可避免内存溢出。

Event Time Temporal Join

语法

SELECTFROMtable1(AS)(LEFT)JOINtable2FORSYSTEM_TIMEASOFtable1.proctime

使用事件时间属性,可以检索过去某个时间点的键值这允许在一个共同的时间点连接两个表

举例

假设我们有一个订单表,每个订单都有不同货币的价格为了将此表正确地规范化为单一货币,每个订单都需要与下订单时的适当货币兑换率相结合

CREATETABLEorders,currencySTRING,order_timeTIMESTAMP(3),WATERMARKFORorder_timeASorder_time)WITH(/*...*/),CREATETABLEcurrency_rates(currencySTRING,conversion_rateDECIMAL(32,2),update_timeTIMESTAMP(3)METADATAFROM`values.source.timestamp`VIRTUALWATERMARKFORupdate_timeASupdate_time,PRIMARYKEY(currency)NOTENFORCED)WITH('connector'='upsert—kafka',/*...*/),event—timetemporaljoin需要temporaljoin条件的等价条件中包含的主键SELECTorder_id,price,currency,conversion_rate,order_time,FROMordersLEFTJOINcurrency_ratesFORSYSTEMTIMEASOForders.order_timeONorders.currency=currency_rates.currency Processing Time Temporal Join

处理时间时态表连接使用处理时间属性将行与外部版本表中键的最新版本相关联。

根据定义,使用processing—time属性,连接将始终返回给定键的最新值可以将查找表看作是一个简单的HashMap,它存储来自构建端的所有记录这种连接的强大之处在于,当在Flink中无法将表具体化为动态表时,它允许Flink直接针对外部系统工作

使用FOR SYSTEM_TIME AS OF table1.proctime表示当左边表的记录与右边的维表join时,只匹配当前处理时间维表所对应的的快照数据。,TP-LINK现已推出800万像素旗舰级画质家用摄像头,型号为TL-IPC48AW全彩,首发尝鲜价489元。

Lookup Table Join

Lookup Join 通常用于通过连接外部表补充信息,要求一个表具有处理时间属性,另一个表使 Lookup Source Connector。

JDBC 连接器可以用在时态表关联中作为一个可 lookup 的 source 用到的语法是 Temporal Joins 的语法

)|WITH(|'connector'='jdbc',|'url'='xxxx',|'driver'='$DRIVER_CLASS_NAME',|'table—name'='$tableName',|'lookup.cache.max—rows'='100',|'lookup.cache.ttl'='30s'|)|""".stripMargins"""|CREATETABLEcar(|`id`bigint,|`user_id`bigint,|`proctime`asPROCTIME()|)|WITH(|'connector'='kafka',|'topic'='$topic',|'scan.startup.mode'='latest—offset',|'properties.bootstrap.servers'='$KAFKA_SERVICE',|'properties.group.id'='indicator',|'format'='canal—json'|)|""".stripMarginSELECTmc.user_iduser_id,count(1)AS`value`FROMcarmcinnerjoinusersFORSYSTEM_TIMEASOFmc.proctimeasuonmc.user_id=s.idgroupbymc.user_id 总结

总体来讲,关联维表有四个基础的方式:

查找外部数据源关联

预加载维表关联(内存,状态)

冷热数据储备(算是1和2的结合使用)

维表变更日志关联(广播也好,其他方式的流关联也好)

「同时考虑:」 吞吐量,时效性,外部数据源的负载,内存资源,解耦性等等方面。

四种join方式不存在绝对的一劳永逸,更多的是针对业务场景在各指标上的权衡取舍,因此看官需要结合场景来选择适合的。

中国商业网资讯门户;更多内容请关注中国商业网各频道、栏目资讯免责声明:凡本站注明 “来自:(非中国商业网)”的新闻稿件和图片作品,系本站转载自其它媒体,转载目的在于信息传递,并不代表本站赞同其观点和对其真实性负责
鲜花
鲜花
握手
握手
雷人
雷人
路过
路过
鸡蛋
鸡蛋
这个人很懒,什么也没留下...
粉丝 阅读53185 回复0
ads2

Powered by 中国商业网 Licensed © 2001-

, Processed in 1.132592 second(s), 12 queries

01 02 03 04 05