使用kafka consumer 需要按照如下方式进行配置,可变更参数的配置参考补充说 明。 1. 配置gcluster 参数 $GCLUSTER_BASE/config/gbase_8a_gcluster.cnf _gbase_transaction_disable=1(注意一定不要用0) gcluster_lock_level=10(不建议用2) _gcluster_insert_cache_buffer_flag=1 gcluster_assign_kafka_topic_period=20 gcluster_kafka_max_message_size=1000000 gcluster_kafka_batch_commit_dml_count=100000 gcluster_kafka_local_queue_size=210000 gcluster_kafka_consume_batch=100 gcluster_kafka_user_allowed_max_latency=15 gcluster_kafka_consumer_enable= 1 gcluster_suffix_consistency_check=1 gcluster_kafka_primarykey_can_be_null=0 gcluster_kafka_parallel_commit = 1 gcluster_kafka_consumer_latency_time_statistics = 1 gcluster_kafka_ignore_pos_field = 1 gcluster_kafka_consumer_escape_zero=1 说明(可变更参数) gcluster_assign_kafka_topic_period,自动接管consumer 的时间周期,单 位为秒,例如A 节点宕机了,最大需要等待gcluster_assign_kafka_topic_period 秒之后, A 节点负责的同步任务会被其他节点接管。 最小值20s, 最大值120s。 gcluster_kafka_max_message_size,从kafka topic 获得消息的最大长度, GBase 8a MPP Cluster 产品手册 5 数据库管理指南 文档版本953(2022-09-15) 南大通用数据技术股份有限公司 1265 单位为字节,最大值1000000000 字节,这个值需要大于等于kafka server 的配 置(message.max.bytes),否则可能造成消费问题,如果kafka 队列中存在一 条消息,其大小超过gcluster_kafka_max_message_size 就会造成消费卡住。 gcluster_kafka_batch_commit_dml_count,一次提交dml 操作的数量,适 当调大能明显提高性能,但是如果一个topic 涉及的表很多(几百个表)则建 议该参数调小, 表越多越应该调小, 调小的目的是使得一次提交命中的表少一 些,具体需要结合具体用户场景、同步速度、资源占用情况具体对待。未来启 用新事务后, 表数量多对性能的影响会降低, 会再次更新手册。 需要注意的是, 此参数是一个意向值, 程序未必会严格按照此参数来提交, 比如如果一个事务 包含大量DML 操作,那么程序必须确保事务完整性;再比如从kafka 取消息、 解析消息的速度慢于往单机提交数据的速度, 那么程序也会选择先提交, 而不 是一定要等待满足gcluster_kafka_batch_commit_dml_count 参数。 gcluster_kafka_user_allowed_max_latency,允许消息在GBase 8a MPP Cluster 集群层缓存多长时间, 超时之后必须马上提交,单位是毫秒。 此参数与 gcluster_kafka_batch_commit_dml_count 作用类似,都是决定什么时候提交的。 多攒一些数据再提交,有利于降低磁盘占用,如果用户对数据延迟不太敏感, 而对磁盘占用比较敏感,可以通过这个参数来调节。典型值一般可以设置为5 0000~20000,需要注意提交动作本身也需要消耗时间。 gcluster_kafka_local_queue_size,储存dml 操作的队列的长度,建议至 少为gcluster_kafka_batch_commit_dml_count 的二倍多一些。 gcluster_kafka_consume_batch,consumer 一次读取kafka 消息的条数。 如果kafka 队列里的消息size 较小,可以设大,反之设小,此参数对性能的影 响不大,所以一般没必要设太大,建议设为10~1000。 gcluster_kafka_ignore_pos_field 控制单个consumer 是否比对POS (防止 重复消费) 。 客户多线程往kafka 中写入数据, 写入kafka 的数据不能确保POS 有序, 原consumer 消费数据时会做POS 检查导致无序的数据入库时会有遗漏。 现在参数gcluster_kafka_ignore_pos_field,控制consumer 是否进行POS 检查。 POS 检查开启,consumer 消费时会丢弃已消费序号之前的消息;POS 检查关 闭, consumer 会将kafka 的每条消息均入库, 所以需要生产端确保发送到kafka 的消息无重复。默认值为0,即检查重复消息;值为1 时,不检查重复消息。 用于Consumer 消费only insert 消息,客户能保证kafka 消息无重复的特殊场 景。配置方法可以手动修改gclusterdb.kafka_consumers。如:Update gclusterdb.kafka_consumers set common_options=’ gcluster_kafka_ignore_pos_field=1’ where `name`=’consumer_1’;最后重启 consumer_1。 开关参数_t_kafka_varchar_auto_truncate, 在consumer消费kafka 信息时, 遇到长度超数据库定义长度的字段(仅限varchar 类型),开启可以自动进行 截位并正常消费入库模式。 缺省值为0; 设置值为1 时, 表示让consumer 对json 消息中的after 内容进行长度判断,如果长度超过了目标表的列宽,则自动按 列宽(字符长度)截断,只对varchar 列做处理。 控制参数:gcluster_kafka_message_format_type 功能:设定consumer 在解析kafka 消息时,以什么格式来解析。 取值范围:JSON、PUREDATA、AUTO_DETECT 说明: puredata 对应rtsync 生产的protobuf 消息; GBase 8a MPP Cluster 产品手册 5 数据库管理指南 文档版本953(2022-09-15) 南大通用数据技术股份有限公司 1266 AUTO_DETECT(默认)是让consumer 自己侦测消息格式,这时候consumer 会 先尝试用puredata 格式进行解析,通过就认为是puredata 格式,否则就认为是 json 格式。 注: consumer 启动后,只在解析第一条消息时做这个判断, 后面直接用这个判 断结果。 控制单个consumer 是否比对POS(防止重复消费) gcluster_kafka_ignore_pos_field:控制单个consumer 是否比对POS(防止重复 消费)。客户多线程往kafka 中写入数据,写入kafka 的数据不能确保POS 有 序,原consumer 消费数据时会做POS 检查导致无序的数据入库时会有遗漏。 参数gcluster_kafka_ignore_pos_field,控制consumer 是否进行POS 检查。POS 检查开启,consumer 消费时会丢弃已消费序号之前的消息;POS 检查关闭, consumer 会将kafka 的每条消息均入库,所以需要生产端确保发送到kafka 的 消息无重复。 gcluster_ kafka_ig nore_pos _field 控制consumer 是否进行POS 检查 默认值为0,即检查重复消息; 值为1 时,不检查重复消息 适用场景: Consumer 消费 only insert 消息, 客户能保证kafka 消息无重复的特 殊场景 配置方法: 手动修改gclusterdb.kafka_consumers Update gclusterdb.kafka_consumers set common_options=’ gcluster_kafka_ignore_pos_field=1’ where `name`=’consumer_1’; 重启consumer_1。 GBase 8a MPP Cluster 产品手册 5 数据库管理指南 文档版本953(2022-09-15) 南大通用数据技术股份有限公司 1267 注意 下面参数支持consumer 之间独立配置: 一次提交dml 操作的数量:gcluster_kafka_batch_commit_dml_count 延迟提交时间:gcluster_kafka_user_allowd_max_latency 控制单个consumer 是否比对POS(防止重复消费):gcluster_kafk a_ignore_pos_field 配置方法: 手动修改gclusterdb.kafka_consumers Update gclusterdb.kafka_consumers set common_options=’ gcluster_ kafka_batch_commit_dml_count=10000,gcluster_kafka_user_allowed_ max_latency=1000,gcluster_kafka_ignore_pos_field=1’ where `name` =’consumer_1’; 重启consumer_1。 2. 配置gnode 参数 $GBASE_BASE/config/gbase_8a_gbase.cnf _gbase_transaction_disable=1(注意一定不要用0) gbase_buffer_insert=1024M gbase_tx_log_flush_time=5 _gbase_kafka_transaction_mode = 1 GBase 8a MPP Cluster 产品手册 5 数据库管理指南 文档版本953(2022-09-15) 南大通用数据技术股份有限公司 1268 说明 gbase_buffer_insert,insert buffer 的大小,随gcluster_kafka_batch_ commit_dml_count 的设置进行调整,如果数据量大,且consumer 任务多,建议调大。需要保证单机insert buffer 足够,否则会导致 异常。 gbase_tx_log_flush_time,单机内存数据刷新频率,单位为秒。建议 设为5 秒。 gbase_kafka_transaction_mode:kafka consumer 需要单机有最基本 的事务支持,即,在一个事务内,允许对同表多次写入。单机需要 打开标准事务或者打开此参数,否则启动kafka consumer 时会报 错。打开此参数后,单机支持在一个事务内,对同表先执行1 次d elete,再执行1 次insert。 注意 各节点的配置都要一致,只改动部分节点可能产生未知错误。