返回首页

gbase数据、南大通用产品文档:GBase8akafka consumer 概述

更新日期:2024年09月11日

GBase 8a 集群支持kafka 流式数据消费入库。可以实现OLTP 数据库数据以及程
序产生的数据实时同步到8a 集群中。
kafka 是一款第三方的高吞吐量的分布式发布订阅消息系统,kafka 支持大量消息
数据进入kafka 集群系统,在kafka 集群中持久化存储和排队,等待消息消费者
(kafka consumer)读取消息。
GBase 8a 集群中集成了kafka consumer 组件,该组件支持用户在8a 集群中通过
sql 方式配置和管理consumer task,并从kafka 集群中读取消息同步到8a 集群数
据库内。
GBase 8a 集群中支持运行多个kafka consumer,多个kafka consumer 可以连接同
一个kafka server 也可以连接不同的kafka server。

GBase 8a MPP Cluster 产品手册
5 数据库管理指南
文档版本953(2022-04-10)
南大通用数据技术股份有限公司
1269
GBase 8a 集群也提供了kafka consumer 数据同步的状态监控,用户可以通过查询
系统表查看同步任务(consumer task)的状态。
8a 集群中的kafka consumer 组件工作流程如下:
1. OLTP 数据库通过OGG 或者RTSync 工具实时将变化数据信息发布到kafka 集
群中或者用户程序产生的数据通过API 发布到kafka 集群中。
2. 8a 集群内的kafka consumer 组件实时从kafka 集群中读取发布的数据信息,并
将这些数据信息转化成数据库操作在8a 集群中执行,以达到数据同步的目的。
kafka consumer 的主要功能是从kafka 集群中读取消息,
并按消息的格式解析消息
内容,将消息内容转化成数据库操作在8a 集群中执行。当前8a 集群的kafka
consumer 支持解析两种类型的数据库操作,
分别是kafka transaction topic 和kafka
loader consumer。
kafka transaction topic
可以解析insert、update(包括全列update 和非全列update)、delete 操作。不
支持DDL 和truncate 等其他操作。
transaction topic 的kafka consumer 以消息作为同步的基本单元,一条消息中可
以包含一个或多个数据库操作,
一条消息在8a 集群中要么全部执行成功,
要么
全部失败。
transaction topic 的消息格式有json 格式和puredata 格式:
json 是文本数据,可以来自用户程序产生或者RTSync 工具、OGG 工具。
puredata 是二进制数据,当前均来自RTSync 工具。
json 格式消息举例如下:
{
"table":"BDTEST.TEST4",
"op_type":"I",
"op_ts":"2022-01-16 09:26:29.707674",
"current_ts":"2022-01-16T17:26:34.556001",
"pos":"00000000030000002194",
"after":{
"A":4,
"B":40,
"C":"t4"
}
}
转化的数据库操作为:insert into test4 values(4,40,'t4');

GBase 8a MPP Cluster 产品手册
5 数据库管理指南
文档版本953(2022-04-10)
南大通用数据技术股份有限公司
1270
{
"table":"BDTEST.TEST4",
"op_type":"D",
"op_ts":"2022-01-16 09:36:44.703860",
"current_ts":"2022-01-16T17:36:49.047000",
"pos":"00000000030000003188",
"primary_keys":{"A"},
"before":{
"A":20
}
}
转化的数据库操作为:delete from test4 where a=20;
{
"table":"BDTEST.TEST4",
"op_type":"U",
"op_ts":"2022-01-16 09:32:33.705303",
"current_ts":"2022-01-16T17:32:36.839000",
"pos":"00000000030000002612",
"primary_keys":{"A"},
"before":{
"A":2
}
"after":{
"A":20,
"B":200,
"C":"t20"
}
}
转化的数据库操作为:update test4 set a=20,b=200,c='t20' where a=2;
注意:
json 消息中识别"A":"NULL"是A 的值为NULL 的字符串,
识别"A":"null"
是A 的值为空,识别"A":""是A 的值为’’。

GBase 8a MPP Cluster 产品手册
5 数据库管理指南
文档版本953(2022-04-10)
南大通用数据技术股份有限公司
1271
kafka loader topic
可以解析load 操作。消息内容是需要加载的裸数据,消息格式为裸数据本身的
文本格式。
loader topic 与transaction topic 在创建consumer 时有如下区别:
1. consumer loader topic 因为消息内容为裸数据,内容中不包含库表信息,所以
创建consumer 时需要指定加载到的目标表。而consumer transaction topic 消息
内容中自带目标表信息,因此不需要创建consumer 时指定。
2. consumer loader topic 允许对应的kafka topic 有多个partition,并且用户可以
指定从哪个(哪些)partition 进行消费。而consumer transaction topic 只允许对
应的kafka topic 中有一个partition。
3. consumer loader topic 对读取的消息数据落到库内的时间有延迟要求,需要通
过集群节点参数duration 设置每消费多长时间落地一次。consumer transaction
topic 没有这个要求。
4. consumer loader topic 需要配置加载选项
(字段间隔符、
行间隔符等)

consumer
transaction topic 不需要。
kafka consumer 的操作流程为:
1. 按需在集群各节点配置文件中调整kafka 相关参数
2. 创建consumer task
3. 启动consumer task
4. consumer 实时读取该任务发布的消息存入库中
5. 通过系统表gclusterdb.kafka_consumers 查看kafka consumer 进度和状态
注意

发布到kafka 集群的消息顺序必须与数据变化的发生顺序一致,kafka consumer 读
取消息将会直接按照kafka 集群中的消息顺序读取并同时进行执行。

kafka consumer 同步的消息内容中非全列update 性能比全列update 性能要慢,
使用
中请合理使用非全列update。

建议kafka server 使用UTF8(UTF8MB4)编码,发布到kafka server 的消息也使用
UTF8(UTF8MB4)编码。
因为json 消息中的格式关键字符在GBK 编码中可能是某个
汉子的一个字节,kafka server 转码json 消息可能会造成json 格式无效,导致8a 集
群kafka consumer 无法解析读取到的消息。

kafka consumer 支持任务接管,即kafka consumer task 1 所属的consumerA 因为软
硬件等原因异常停机,
会有其他好的coordinator 节点上启动新的consumerB 来接管
kafka consumer task 1,并从consumerA 的中断处继续进行同步。

kafka consumer task 启动后将持续运行,直到用户stop 该task。

GBase 8a MPP Cluster 产品手册
5 数据库管理指南
文档版本953(2022-04-10)
南大通用数据技术股份有限公司
1272

获取或设置参数的值。

语法
[Visual Basic]
Public Overrides Property Value As Object

Get

Set



GBase 8a 程序员手册ADO.NET 篇
南大通用数据技术股份有限公司

- 293 -
[C#]
public override Object Value { get; set; }

实现
IDataParameter.Value

语法:

-pr 选项执行与 oncheck -cr 相同的检查并显示保留页的信息。
-pR 选项执行与 onchdeck -cR 相同的检查,显示保留页的信息,并显示有关逻辑日志和物
理日志页的详细信息(标记活动物理日志页的开始和结束)。
如果已更改了配置参数信息(通过编辑配置文件),但还未重新初始化共享内存,那么
oncheck -pr 和 oncheck -pR 检测到不一致性并返回错误消息。
有关 oncheck -pr 输出的列表和解释,请参阅 保留页。有关 -cr 选项的描述,请参阅
oncheck -cr 和 -cR: 检查保留页。