返回首页

gbase数据、南大通用产品文档:GBase8sifx_int8soint() 函数

更新日期:2024年09月11日


GBase 8s ESQL/C 编程指南
南大通用数据技术股份有限公司
- 763 -
ifx_int8soint() 函数将 int8 类型数值转换为 C int 类型数值。
语法
mint ifx_int8soint(int8_val, int_val)
ifx_int8_t *int8_val;
mint *int_val;
int8_val
指向 ifx_int8soint() 将其值转换为 mint 类型值的 int8 结构的指针。
int_val
指向 ifx_int8soint() 放置转换的结果处的 mint 值的指针。

用法
ifx_int8soint() 库函数将 int8 值转换为 C 整数。C 整数的大小依赖于您正在使用的
计算机的硬件和操作系统。因此,ifx_int8soint() 函数将整数值与 SQL SMALLINT 数据类
型同等看待。SMALLINT 的有效范围在 32767 与 -32767 之间。要将较大的 int8 值转换
为较大的整数,请使用 ifx_int8solong() 库函数。

返回代码
0
转换成功。
<0
转换失败。

示例
demo 目录中的文件 int8soint.ec 包含下列样例程序。
/*
* ifx_int8soint.ec *
The following program converts three strings to INT8 types and
converts the INT8 type values to C integer type values.
Then the values are displayed.

*/


GBase 8s ESQL/C 编程指南
南大通用数据技术股份有限公司
- 764 -
#include

EXEC SQL include "int8.h";

char string1[] = "-12,555";
char string2[] = "480";
char string3[] = "5.2";

main()
{
mint x;
mint i =0;
ifx_int8_t num1, num2, num3;

printf("IFX_INT8sOINT Sample ESQL Program running.\n\n");
if (x = ifx_int8cvasc(string1, strlen(string1), &num1))
{
printf("Error %d in converting string1 to INT8\n", x);
exit(1);
}
if (x = ifx_int8cvasc(string2, strlen(string2), &num2))
{
printf("Error %d in converting string2 to INT8\n", x);
exit(1);
}
if (x = ifx_int8cvasc(string3, strlen(string3), &num3))
{
printf("Error %d in converting string3 to INT8\n", x);
exit(1);
}
printf("\nConverting INT8 to integer\n");
if (x= ifx_int8soint(&num1, &i))
{

GBase 8s ESQL/C 编程指南
南大通用数据技术股份有限公司
- 765 -
printf("\tError %d in converting INT8 to integer\n", x);
exit(1);
}
else
{
printf("String 1= %s\n", string1);
printf("INT8 value is = %d\n", i);
}
printf("\nConverting second INT8 to integer\n");
if (x= ifx_int8soint(&num2, &i))
{
printf("\tError %d in converting INT8 to integer\n", x);
exit(1);
}
else
{
printf("String2 = %s\n", string2);
printf("INT8 value is = %d\n", i);
}
printf("\nConverting third INT8 to integer\n");

/* note that the decimal will be truncated */

if (x= ifx_int8soint(&num3, &i))
{
printf("\tError %d in converting INT8 to integer\n", x);
exit(1);
}
else
{
printf("String3 = %s\n", string3);
printf("INT8 value is = %d\n",i);
}

GBase 8s ESQL/C 编程指南
南大通用数据技术股份有限公司
- 766 -
printf("\nIFX_INT8sOINT Sample Program over.\n\n");
exit(0);
}
输出
IFX_INT8sOINT Sample ESQL Program running.
Converting INT8 to integer

Executing: ifx_int8soint(&num1,&i)
String 1= -12,555
The value of the first integer is = -12555


Converting second INT8 to integer

Executing: ifx_int8soint(&num2, &i)
String2 = 480
The value of the second integer is = 480


Converting third INT8 to integer

Executing: ifx_int8soint(&num3, &i)
String3 = 5.2
The value of the third integer is = 5
IFX_INT8sOINT Sample Program over.

增量同步支持同步工具与第三方应用集成,且仅支持部分部署读数据组件
和管理组件的方式。
读数据组件部分部署的使用方式与正常部署读写数据组件的使用方式基本
相同,区别是不需要部署写数据组件,写数据组件由第三方应用替代。同步工
具只负责将数据从源端数据库同步到消息队列,消息队列中的数据如何消费由
第三方工具决定。
需要注意的是,为了保证同步工具异常或者用户消费者工具异常情况下数
据不丢失,需要第三方工具记录消费数据的lsn 号(包含在协议数据中,由
mappingId,lowestLSN,highestLSN 三个属性组成),在任何一方异常或其他
原因导致的需要重启服务的情况下,需要在重启前将上一次正确记录的lsn 号
设置到同步工具的./lsndata/下面的配置文件[server]_LSN.properties 中,
其中[server]名称是动态获取的,
该值由config_task.xml 的server 节点配置
的id 值决定。
支持以SQL 语句、数据两种方式传输同步数据。修改server 标签中的
dataFormatType 属性为SQL 即可使用SQL 语句传输数据。注意:目前版本的同
步工具只有读数据组件支持传输SQL 语句。
由于消息队列的消息大小限制,读数据组件在发送数据前会根据消息大小
限制进行分包发送,所以第三方应用在接收数据时需要将分包数据合并。样例
代码请参见附录D。

GBase RTSync 同步工具手册
南大通用数据技术股份有限公司
- 165 -
第三方应用在接收到数据后,需要使用Protocol Buffer 将数据转换为数
据对象,再从中提取需要的元数据信息和数据。数据传输协议请参见附录E。
使用方式:将config_task.xml 中的target 标签注释掉或者删除即可,这
样同步工具将只启动source 节点配置的对应的读数据组件。
示例:
在本示例中,仅部署管理组件及读数据组件,源端数据库为GBase 8t, 消
息队列采用RabbitMQ,消息队列存储的数据格式为SQL。
源数据库信息:
IP 为192.168.5.4;
数据库用户名为informix,
密码为1;
数据库test,表为t1。
RabbitMQ 信息:IP 为192.168.5.13;用户名为root,密码为root。
读数据组件信息:IP 为192.168.5.1;用户名为root,密码为root。
管理组件信息:IP 为192.168.5.3;用户名为root,密码为root。
下面按照步骤详细介绍:
修改任务文件config_task.xml,该文件在读端,管理端各有一份,如果
同步工具安装后再修改该文件请确保两处配置文件一致。注意:当第三方应用
为GBase 8a consumer 时,必须将source 的dbObjToUpperCase 属性配置为
“true”。


mqType="rabbitmq" queueName="8tto8tMQ" dataRecoveryMode="auto"
isHighAvailable="false">
httpPort="8080" isTableHotPatch="true"/>
readParseAdapter="adapter" user="root" password="root" queueSize="10000"
openMonitor="true" monitorInterval="300" rpcPort="9191" sshPort="22"
dbObjToUpperCase="true" />


GBase RTSync 同步工具手册
- 166 -
南大通用数据技术股份有限公司
id="8t_8a_5.1_5.2">

charset="GB18030"
type="GBASE8T"
startLSN="0"
fetchSize="500"
oracleScnStep="50000"
timestampWithFraction="false"
maxRecordsPerRead="200"
maxSizeOfPerRecord="1024"
timeOut="2"
driver="com.informix.jdbc.IfxDriver"
url="jdbc:informix-sqli://192.168.5.4:31267/
syscdcv1:informixserver=ol_informix1210"
user="informix"
password="1"
catalog="test">



deleteMode="NORMAL"
sourceTableName="t1" sourcePkColName="" targetTableName=""
targetPkColName="" >







修改消息队列配置文件:config_rabbitmq_8tto8tMQ.properties

GBase RTSync 同步工具手册
南大通用数据技术股份有限公司
- 167 -
#rabbitmq 服务器信息
rabbitmq.host=192.168.5.13
#服务端口
rabbitmq.port=5672
#rabbitmq访问用户名
rabbitmq.username=admin
#rabbitmq访问密码
rabbitmq.password=admin
#rabbitmq 虚拟主机名称
rabbitmq.virtualhost=/
#rabbitmq发送消息失败后重发机制-1:代表一直尝试重发,0:代表不重发,发送失败就
退出。n(n>0):重发指定次数。
rabbitmq.resend.max.retries=-1
#rabbitmq 消息队列相关属性
rabbitmq.queue.name=8tto8aMQ
rabbitmq.queue.declare.durable=true
rabbitmq.queue.declare.exclusive=false
rabbitmq.queue.declare.autoDelete=false
#rabbitmq交换机相关属性
rabbitmq.exchange.name=8tto8tExc
rabbitmq.exchange.routingKey=sqldata8t8a
rabbitmq.exchange.type=direct
rabbitmq.exchange.durable=true
#消息持久化设置,默认按照该值即可
rabbitmq.message.durable=PERSISTENT_TEXT_PLAIN
#数据被消费时是否需要发送ack信息给rabbitmq
rabbitmq.consumer.autoAck=true
#发送单条给rabbitmq的最大值,如果超过该值将进行拆包发送。
rabbit.send.data.max.size=1000000
完成后,即可启动同步工具。使用root 用户登录192.168.5.3,执行如下
命令启动同步工具组件:
# cd /opt/RTSync /
# sh RTSyncManagerServer.sh start
启动后,同步工具将开始同步数据工作。

GBase RTSync 同步工具手册
- 168 -
南大通用数据技术股份有限公司
可以使用如下命令监控同步工具运行状况。
# tail -f logs/sync.log
7 功能约束
1. 同步工具各组件正常启动后才可对源数据库进行操作,否则有可能导致部
分数据丢失。
2. 只支持插入,更新,删除操作的同步。
3. 在同步更新、删除操作时,源数据表应尽量有唯一标识列,否则将进行整
行数据匹配。
4. 不同的写数据组件不能向同一个目标数据库的同一个表写入数据,以保证
目标数据库不会锁表。
5. 同步工具的增量同步依赖于各个数据库的JDBC 驱动,因此需要保证JDBC
驱动配置的正确性,在JDBC 的url 配置参数中请不要填写用户名和密码。
6. 同步过程中,目标GBase 8a 集群出现宕机、服务中断等异常情况时,同步
任务将继续进行,但数据可能会丢失。
7. 由于GBase 8t 和GBase 8a 的部分数据类型不完全匹配,因此当同步数据
的数据类型、长度不一致时,可能会出现同步数据的更新、删除操作失败。
8. 在目标数据库为GBase 8a 时,如果GBase 8a 版本为8.5.1,则在某些特定
场景下无法保证事务的完整性;如果GBase 8a 版本为8.6.2,则需要保证
在源数据表中含有主键(或者在同步工具中配置表主键),且需要忽略
Timestamp 类型的毫秒值,推荐使用同步工具与GBase 8a Kafka Consumer
集成使用。
9. 在源数据库和目标数据库同为GBase 8t 数据库的场景中,同步工具需要在
用户数据库下创建table_lsn 表,该表用于记录已同步的数据的LSN 号。

GBase RTSync 同步工具手册
南大通用数据技术股份有限公司
- 169 -
因此同步工具使用的目标数据库用户需要有在目标数据库下的操作权限。
10. 目前版本的同步工具不支持将某一个表的数据同步到多个表中,也不支持
将多个表中的数据同步到某一个表中。
11. 在同步部分列数据时,同步工具不支持将某一列数据同步到其他多列中,
也不支持将多列数据同步到某一列中。
12. 在源数据库为Oracle 数据库的场景中,同步工具需要从Oracle 中获取表
的元数据,并将获取到的元数据与配置中的表信息进行匹配校验。由于
Oracle 的数据库对象名称是大小写敏感的,因此,需要保证配置文件中的
schema 名称、表名、列名与Oracle 系统库中的大小写一致。
13. 在使用网闸的场景中,目前支持的网闸型号为:南瑞SysKeeper-2000。其
他型号的网闸暂不支持。
14. 目前版本的同步工具只支持解析GBase 8t 的部分数据类型,具体如下:
序号
支持的数据类型
1
INT8
2
INTEGER
3
SMALLINT
4
DECIMAL(m,p)
5
DOUBLE PRECISION
6
DATETIME YEAR TO FRACTION (5)
7
CHAR(n)
8
VARCHAR(n)
9
LVARCHAR(n)
10
FLOAT
11
DATE
12
SERIAL

GBase RTSync 同步工具手册
- 170 -
南大通用数据技术股份有限公司
序号
支持的数据类型
13
SERIAL8
14
BLOB
15
CLOB
16
BYTE
17
TEXT
15. 目前版本的同步工具只支持Oracle 的部分数据类型,具体如下:
序号
支持的数据类型
1
INT
2
INTEGER
3
SMALLINT
4
DECIMAL(p,s)
5
DOUBLE PRECISION
6
FLOAT
7
CHAR(n)
8
VARCHAR2(n)
9
DATE
10
TIMESTAMP
16. 目前版本的同步工具只支持MySQL 的部分数据类型,具体如下:
序号
支持的数据类型
1
INT
2
INTEGER
3
SMALLINT

GBase RTSync 同步工具手册
南大通用数据技术股份有限公司
- 171 -
序号
支持的数据类型
4
TINYINT
5
MEDIUMINT
6
BIGINT
7
INTEGER
8
DECIMAL
9
NUMERIC
10
FLOAT
11
REAL
12
DOUBLE
13
CHAR
14
VARCHAR
15
DATE
16
TIME
17
DATETIME
18
TIMESTAMP
19
BOOL
20
ENUM
21
SET
22
BLOB
23
TINYBLOB
24
MEDIUMBLOB
25
LONGBLOB
26
TEXT

GBase RTSync 同步工具手册
- 172 -
南大通用数据技术股份有限公司
序号
支持的数据类型
27
TINYTEXT
28
MEDIUMTEXT
29
LONGTEXT
30
BINARY
31
VARBINARY
17. 目标端为oracle 时如果oracle 表名含有小写,必须配置为表级或者列级
同步;如果列中含有小写,必须配置成列级同步。
18. 部分部署时,
只有开启了dbObjToUpperCase=”true”才支持与目标端进行
元数据映射。
19. 源端是MySQL 时,只有在《sourcedb》标签中设置
isParallelForMysql="true",才开启针对MySQL binlog 的日志的并行解
析功能。且并行的线程数,需要根据《source》标签中的
dataFormatParallel 来设置。
20. 源端与目标端都为8t 时支持ddl 同步
1)如果配置为operationType="ddl_dml",
代表同时支持ddl 和dml 同步,
有如下限制:
1
blob,
clob列必须建立在所有列的最前面,
且同一行不支持多个blob,
clob。
2
每个ddl 操作需要为独立的事务。
3
表中有历史数据,
如果该表发生ddl
(主要是alter 操作)
后再有dml
进入,可能造成该表的数据不一致,多数据或者少数据。
4
同步工具不支持按组分源的特性,即不支持一个组下配置多个
source-target 节点,且要求每个库仅能出现在一个source-target 下
5
支持库级映射,但是不支持表级,列级映射。
6
Ddl 语句有限支持如下几种:
1.create table,支持有主键,但不支持复杂约束条件。
2.drop table 支持。
3.alter table xxx add column 支持,但不能含有复杂约束,且只能追

GBase RTSync 同步工具手册
南大通用数据技术股份有限公司
- 173 -
加列到最后
4.alter table xxx drop column 支持,但只能从最后移除。
5.alter table modify column 支持,但不能含有复杂约束。
6.其他ddl 不支持,其他ddl 解析异常不会导致服务退出,不影响数
据的dml 同步。
2)如果配置为operationType="ddl ",代表仅同步ddl,有如下限制:
1
同步不支持任何映射,包括库,表,列的映射。
2
ddl 同步依赖于ddloperation 表,
原则上ddloperation 表不记录的ddl
sql 是不会同步的。
8 附录

GBase RTSync 同步工具手册
- 174 -
南大通用数据技术股份有限公司
附录A config_task.xml
该配置文件为同步任务配置文件,涉及读组件,写组件,管理组件的部署
信息,源端及目标端数据库信息,以及待同步的表及列信息。具体如下:




id="server1"
mqType="kafka" queueName="8tto8tMQ" isHighAvailable="false"
dataRecoveryMode="file" >

httpPort="8080" isTableHotPatch="true"/>

GBase RTSync 同步工具手册
南大通用数据技术股份有限公司
- 175 -

readParseAdapter="adapter" user="root" password="111111" queueSize="10000"
openMonitor="true" monitorInterval="300" rpcPort="9191" sshPort="22"
dbObjToUpperCase="false" isConvertSingleQuote="true"
queuePollTimeOut="600" isEmptyStrPkEqualsNull="true"
isAllowInsertPkNull="true"/>

writeDataAdapter="adapter" user="root" password="111111"
errorishandle="true" sendDataBySocket="true"/>



id="5.1_5.2">



charset="GB18030"
type="GBASE8T"
startLSN="0"

GBase RTSync 同步工具手册
南大通用数据技术股份有限公司
- 179 -
fetchSize="500"
oracleScnStep="50000"
timestampWithFraction="false"
maxRecordsPerRead="200"
maxSizeOfPerRecord="1024"
timeOut="2"
driver="com.informix.jdbc.IfxDriver"
url="jdbc:informix-sqli://192.168.5.4:31267/syscdcv1:informixserver=
OL_informix1210;CLIENT_LOCALE=zh_cn.GB18030-2000;DB_LOCALE=zh_cn.GB18030
-2000;NEWCODESET=GB18030,GB18030-2000,5488;"
user="informix"
password="informix"
catalog="test"
filter=""
directQuery=""
allowPrimaryKeyNull="true"
showCommitTime="true"
parallel="1"
pdburl=""
pdbuser=""
pdbpassword=""
pdbcatalog=""
dccHost="192.168.7.117"
dccPort="8011"
isGetDBspace="false"
onlyDml="false"
haNodeName=""
haCheckTime="30"
maxMinerQueueSize="10"
cacheMaxSize="104857600"
packetMaxSize="5242880"
bootstrapServers="192.168.7.128:9092,192.168.7.129:9092"
zookeeperServers="192.168.7.128:2181,192.168.7.129:2181"
consumerNums="1"

GBase RTSync 同步工具手册
- 180 -
南大通用数据技术股份有限公司
groupid="test"
kafkaConsumerParamers="zookeeper.sync.time.ms=4000;zookeeper.session
.timeout.ms=8000;fetch.message.max.bytes=10240000;auto.commit.enable=fal
se"
maxCountWait="500"
recordTopicUntilCount="10"
separator="\\u007C"
logRetentionDays="3"
logRetentionCheckIntervals="1800"
logArchivedIntervals="60"
longTxMaxWaitTime="3600"
longTxCheckIntervalTime="600"
ignoreDeletedArchivelog="false"
>


charset="UTF8"
type="GCLUSTER"
commitSize="10000"

GBase RTSync 同步工具手册
南大通用数据技术股份有限公司
- 181 -
queueSize="200000"
user="gbase"
password="gbase20110531"
driver="com.gbase.jdbc.Driver"
catalog="full"
timeOut="2"
url="jdbc:gbase://192.168.7.187:5258/full?useOldAliasMetadataBehavio
r=true&rewriteBatchedStatements=true&connectTimeout=0&socket
Timeout=0">




isContinueSyncWhileError="true" deleteMode="NORMAL"
sourceTableName="t1"
sourcePkColName="" targetTableName="t1" targetPkColName=""
sourceTextColName="">

targetColName="a1"/>
targetColName="b1"/>

sourceTableName="t2" sourcePkColName="" targetTableName="t2"
targetPkColName="" />







GBase RTSync 同步工具手册
南大通用数据技术股份有限公司
- 183 -
附录B config_[mqtype]_[name].properties
mqtype,
name 的实际值根据config_task.xml 中配置的实际值确定消息队
列使用的配置文件信息。
1) config_task.xml中mqtype=kafka,name=8tto8tMQ,说明使用的消息队列为
kafka,对应配置文件为:
config_kafka_8tto8tMQ.properties
同步工具启动前,需要配置正确的Kafka 信息,内容如下:
#kafka 主题
topic.name=8tto8a
#是否自动创建topic,默认值为false
topic.enable.auto.create=true
#topic备份数量, 0<取值范围<=节点数量
topic.replication.num=2
#producer conf
#kafka集群ip及服务端口
bootstrap.servers=192.168.5.11:9092,192.168.5.12:9092
#批量发送数据到kafka的批次数
kafka.batch.commit.count=400
#批量发送数据到kafka等待的超时时间,单位毫秒
kafka.batch.commit.time=100
#kafka 发送数据到kafka异常重发的次数,超过该次数仍然未成功同步工具将退出服务
kafka.resend.max.retries=3
#kafka 接收数据的应答机制。
0代表立即返回,
无论数据是否成功写入kafka;1代表leader
节点成功写入磁盘后才返回结果;all代表所有节点都要写入磁盘才返回结果。根据数据安
全性要求可以实际配置。
kafka.acks=all
#kafka生产者其他必要参数配置,参数信息为标准的kafka生产者参数。
kafka.producer.paramers=request.timeout.ms=30000;metadata.fetch.timeout.
ms=30000
#consumer conf
#kafka所使用zookeeper信息

GBase RTSync 同步工具手册
- 184 -
南大通用数据技术股份有限公司
zookeeper.connect=192.168.5.21:2181,192.168.5.22:2181,192.168.5.23:2181
#kafka消费者组名称
group.id=test
#自动提交时间间隔,一般不用修改
auto.commit.interval.ms=1000
#zookeeper follower能落后leader多久还被认为是活着的
zookeeper.sync.time.ms=2000
#zookeeper session超时时间,如果这段时间没有收到zk的心跳,则认为kafka服务异常
zookeeper.session.timeout.ms=4000
#发送数据到kafka单条数据的最大值,单位字节,超过该大小经进行分包发送
send.data.max.size=1000000
#从kafka获取单条数据的最大值,单位字节,该值应该大于等于send.data.max.size
fetch.message.max.bytes=1000000
注:以上参数大部分为Kafka 配置文件中标准参数,如果有不明之处,可
以参考Kafka 官网了解。
2) config_task.xml 中mqtype=rabbitmq,name=8tto8tMQ,对应配置文件为:
config_rabbitmq_8tto8tMQ.properties
同步工具启动前,需要配置正确的rabbitmq 信息,内容如下:

GBase RTSync 同步工具手册
南大通用数据技术股份有限公司
- 185 -
#rabbitmq 服务器信息
rabbitmq.host=192.168.5.13
#服务端口
rabbitmq.port=5672
#rabbitmq访问用户名
rabbitmq.username=admin
#rabbitmq访问密码
rabbitmq.password=admin
#rabbitmq 虚拟主机名称
rabbitmq.virtualhost=/
#rabbitmq发送消息失败后重发机制-1:代表一直尝试重发,0:代表不重发,发送失败就
退出。n(n>0):重发指定次数。
rabbitmq.resend.max.retries=-1
#rabbitmq 消息队列相关属性
rabbitmq.queue.name=8tto8aMQ
rabbitmq.queue.declare.durable=true
rabbitmq.queue.declare.exclusive=false
rabbitmq.queue.declare.autoDelete=false
#rabbitmq 交换机相关属性
rabbitmq.exchange.name=8tto8tExc
rabbitmq.exchange.routingKey=sqldata8t8a
rabbitmq.exchange.type=direct
rabbitmq.exchange.durable=true
#消息持久化设置,默认按照该值即可
rabbitmq.message.durable=PERSISTENT_TEXT_PLAIN
#数据被消费时是否需要发送ack信息给rabbitmq
rabbitmq.consumer.autoAck=true
#发送单条给rabbitmq的最大值,如果超过该值将进行拆包发送。
rabbit.send.data.max.size=1000000
注:以上参数大部分为rabbitmq 相关属性,详细信息可以参考rabbitmq
官网。

GBase RTSync 同步工具手册
- 186 -
南大通用数据技术股份有限公司
附录C 合并数据包样例代码
数据接收端在接收数据后,需要将分包数据合并,样例代码如下:
//processData(ObjectToByteArrayUtil.ByteArrayToProtoData(delivery.ge
tBody())); //从消息队列中取数据,转换为Data,再调用processData方法进行合包操作,
再调用入库方法完成sql语句入库
private static List dataList = new ArrayList(); //缓存一
个事务的数据
private static long lowLsn = 0;
private static long highLsn = 0;
private static void processData(Data data) {
try {
if (data == null) { //System.out.println("发生问题,程序需要退
出,具体问题请检查log日志。");
System.exit(-1);
}
if (data.getDataType() != DataContentType.FLAG) {//
DataContentType.FLAG是一个事务的结束标记,判断是否是一个事务的结束,如果不是,
则将数据加进dataList
dataList.add(data);
} else if (data.getDataType() == DataContentType.FLAG) { // 如
果一个事务结束了
if (data.getFlagValue()) {// 如果结束标记是true,说明数据包
正常,可以继续进行处理,false则说明本次数据包数据有误,不做处理
Data.Builder txDataBuilder = null;
List txDataList = new ArrayList();
for (Data realData : dataList) {
if (txDataList.size() > 0 &&
txDataList.get(0).getTxPacketId() != realData.getTxPacketId()) { //
logger.error("事务数据不完整,为保证数据完整性,同步工具暂停工作。");
System.exit(-1);
}

GBase RTSync 同步工具手册
南大通用数据技术股份有限公司
- 187 -
txDataList.add(realData);
if (realData.getIsPacketFinish()) {// 如果完整数
据包结束,通过获取isPacketFinish来判断,如果不是结束,则继续循环
if (txDataList.size() !=
realData.getPacketLength()) {// logger.error("事务数据不完整,为保证数据完整
性,同步工具暂停工作。");
System.exit(-1);
} else {
if (txDataList.size() > 1) { // 有多个数
据包,需要进行包合并
for (Data txData : txDataList) { //
将多个数据包数据组合为一个完整事务
if (txData.getPacketSn() == 1)
{// 事务的第一个包
txDataBuilder =
txData.toBuilder(); //构建结构
} else {
for (String sl :
txData.getSqlList()) { // 将多个包中的sql组合进一个包中,形成一个事务的完整数
据包
txDataBuilder.addSql(sl);
}
}
}
Data newData =
txDataBuilder.build();// 合包后的newData是完整的事务,
包括一个事务中的所有sql
/*for (int i = 0; i <
newData.getSqlCount(); i++) {
System.out.println(newData.getSql(i));
}*/
/** TODO: 此处应调用入库方法,消费数据
包数据进行入库
*/
txDataBuilder = null;// 清理
txDataList.clear();
lowLsn = newData.getLowestLSN();

GBase RTSync 同步工具手册
- 188 -
南大通用数据技术股份有限公司
highLsn = newData.getHighestLSN();
} else {// 不需要合包,realData即为完整的
事务
/**
TODO:此处应调用入库方法,
消费数据
包数据进行入库
*/
/*for (int i = 0; i <
realData.getSqlCount(); i++) {
System.out.println(realData.getSql(i));
}*/
txDataList.clear();// 清理
lowLsn = realData.getLowestLSN();//
需要记录事务的lowLsn和highLsn 为了断点续传:如果同步工具异常退出,当再次启动时
需要告知读端此数值。//可以将此值输出到日志或者记录到文件等
highLsn = realData.getHighestLSN();
}
}
}
}
dataList.clear(); //消费完一个完整数据包即一个事务后清
理dataList
} else {
dataList.clear();
}
}
} catch (Exception e) {
}
}

GBase RTSync 同步工具手册
南大通用数据技术股份有限公司
- 189 -
附录D 数据传输协议说明
增量同步数据传输协议为Protocol Buffer 对象,协议文件内容如下:
syntax = "proto3";
option java_package = "cn.gbase.sync.intf";
option java_outer_classname = "DataProto";
//数据协议
message Data{
//源数据库类型定义,枚举值
enum DatabaseType{
GBASE8T=0;
ORACLE=1;
ORACLE12C=2;
ORACLEMODE2=3;
GBASE8TADV=4;
MYSQL=5;
GCLUSTER=6;
SQLSERVER=7;
ORACLELOB=8;
GBASE8S=9;
KAFKA=10;
CDCFILE=11;
}

GBase RTSync 同步工具手册
- 190 -
南大通用数据技术股份有限公司
//源数据内容类型定义,枚举值:目前包括纯数据、sql 语句、数据批次结
束标记(用于判断kafka 批量发送的当前批次数据是否结束,当值为结束标记
时,不包含实际数据,只包含flagValue 值)
enum DataContentType{
PUREDATA=0;
SQL=1;
FLAG=2;
HANDSHAKE=3;
}
//数据表元数据定义,供TableRowData 使用
message TableMetadata{
//数据库名称
string dbName=1;
//表名称
string tableName=2;
//列名称列表
repeated string colNames=3;
//列类型列表,与列名称一一对应
repeated string colTypes=4;
//主键列名称列表
repeated string pkColNames=5;
//是否含有大对象列
bool hasLO=6;
}
//DDL 数据内容定义

GBase RTSync 同步工具手册
南大通用数据技术股份有限公司
- 191 -
message DdlData{
//ddl 数据内容格式类型
enum DdlDataFormatType{
STRING=0;
MAP=1;
}
//ddl 数据格式类型
DdlDataFormatType type=1;
//ddl 操作数据
string ddlDataString=2;
//ddl 操作数据map 类型
map ddlDataMap=3;
}
//行数据内容定义
message TableRowData{
//数据库名称
string dbName=1;
//表名称
string tableName=2;
//表元数据
TableMetadata tableMetadata = 3;
//操作类型定义,枚举值
enum TableOperType{
INSERT=0;
UPDATE=1;

GBase RTSync 同步工具手册
- 192 -
南大通用数据技术股份有限公司
DELETE=2;
TRUNCATE=3;
}
//操作类型
TableOperType operType=4;
//暂不使用
int32 sourceServerIndex=5;
//暂不使用
int64 lsn=6;
//列数据值列表
repeated string colValues=7;
//修改前列数据值列表,仅对update 有效
repeated string colValuesBefore=8;
//标记对应列的是否包含,针对于oracle logminer 取主键表的列不
全的情况
repeated bool isColContains=9;
//数据值是否为空列表,由于列表中无法放置空值增加
repeated bool isColValuesNull=10;
//修改前数据值是否为空列表
repeated bool isColValuesBeforeNull=11;
//是否为ddl
bool isDDL = 12;
//ddl 数据列表,按照顺序可以拼成sql 语句

GBase RTSync 同步工具手册
南大通用数据技术股份有限公司
- 193 -
repeated DdlData ddlData = 13;
//存放ddl 操作类型,类型在代码中定义,可以直接匹配
int32 ddlType=14;
//tablerowdatabean 对象是否包含有含有大对象的SQL
bool hasLO=15;
//存放列类型的列表
repeated string colTypes=16;
}
//源数据库类型
DatabaseType dbType = 1;
//源数据内容类型
DataContentType dataType = 2;
//行数据列表,可以存放一个事务内的数据,当
dataType=DataContentType.PUREDATA 时使用(本项目不使用)
repeated TableRowData tableRowData = 3;
//上次停止时未完成的事务的lowest LSN,需要在提交数据时持久化,以
保证同步工具重启后的数据一致性
int64 lowestLSN = 4;
//上次停止时已完成的事务的highest LSN,需要在提交数据时持久化,以
保证同步工具重启后的数据一致性
int64 highestLSN = 5;
//组件配置的server id
string serverId = 6;
//组件配置的mappingid
string mappingId = 7;

GBase RTSync 同步工具手册
- 194 -
南大通用数据技术股份有限公司
//sql 语句列表,可以存放一个事务内的sql 语句,当
dataType=DataContentType.SQL 时使用
repeated string sql = 8;
//本批次发送标记,当dataType=DataContentType.FLAG 时使用,true 为
成功(本批次数据有效),false 为失败(本批次数据无效,可以抛弃)
bool flagValue = 9;
//大事务数据分包传输,分包大小以config_kafka.properties 配置文件
的send.data.max.size 值为依据
//同一事务下的分包id 为相同值
int32 txPacketId = 10;
//数据分包传输,包的个数
int32 packetLength = 11;
//数据分包传输,当前包的序号
int32 packetSn = 12;
//数据分包传输,当前事务是否传输完成,true 为完成,false 为未完成
bool isPacketFinish = 13;
//事务内data 的最后offset 值,由消费者维护
int64 offset = 14;
//用于计算事务中每个条数据对应的位置编号
int64 position=15;
//事务提交时间
int64 commitTime=16;
//已经解析并发送的事务个数

GBase RTSync 同步工具手册
南大通用数据技术股份有限公司
- 195 -
int64 commitNum=17;
//所属的用户8t 时userid oracle 时用户名
string user=18;
//用于存放ddl 操作的数据库名称,如果为空则表示事务中没有ddl 操作
string ddlDbName=19;
//data 对象是否包含有含有大对象的SQL
bool hasLO=20;
//用于存放当前事务的字节大小
int64 byteSize=21;
//针对mysql 源新增以下三个参数
//binlog 文件名称
string blFileName=22;
//binglog 文件中的开始位置
int64 blFileNamePos=23;
//时间戳
int64 blTime=24;
//数据恢复点,为8a consumer 记录用
string recoveryPoint=25;
}
其中,option java_package = "cn.gbase.sync.intf"表示Java 包名称为
cn.gbase.sync.intf,如果需要生成C++代码文件,则需要修改为option
package="",引号中应改为需要的namespace 名称;option
java_outer_classname = "DataProto"表示输出的Java 类名称为DataProto,
如果生产C++文件则该行不起作用。
将协议文件修改好后,需要使用protoc.exe 生成相应的文件,假设
data.proto 文件位于D:\protobuf\目录下,在命令行中执行:

GBase RTSync 同步工具手册
- 196 -
南大通用数据技术股份有限公司
protoc -ID:\protobuf\ --cpp_out=D:\protobuf\java\
D:\protobuf\data.proto
如果需要生成Java 文件,则将cpp_out 改为java_out 后执行。
C++使用到的类库需要通过官方提供的protobuf-cpp-3.0.0 中的源码进行
编译。
Java 使用的类库会随同步工具安装包一起,
名称为protobuf_3.0.0.jar,
该类库使用Java7 编译生成。如有特殊需求,请使用官方提供的
protobuf-java-3.0.0 中的源码进行编译。

GBase RTSync 同步工具手册
南大通用数据技术股份有限公司
- 197 -
附录E Zookeeper、Kafka 快速自动化安装
Zookeeper、kafka 的安装部署使用RTSync 安装包自带的版本及自动化安
装工具进行快速安装,具体的部署安装方法如下:
备注:如果需要手动部署安装zookeeper、kafka,请参考章节8 附录中的
附录F zookeeper 及kafka 安装使用说明
1. 解压zookeeper 和kafka 自动化安装包
将安装光盘中的zookeeper 和kafka 的自动化安装包
zkandkfkautoinstall.tar.gz 拷贝到服务器上,例如:以Redhat 为例,将其
放置到/opt/目录下。需要执行以下命令完成解压步骤:
#tar –zxvf zkandkfkautoinstall.tar.gz
进入安装目录,
cd /opt/zkandkfkautoinstall/
#目录列表如下:
[root@localhost zkandkfkautoinstall]# ll
total 59560
drwxr-xr-x 2 root root
4096 Feb 20 10:58 install
-rwxr-xr-x 1 root root
569765 Feb 20 11:01 install.log
-rwxr-xr-x 1 root root
11155 Feb 20 10:49 installzkandkafka.py
-rwxr-xr-x 1 root root 37664956 Feb 20 10:52 kafka_2.11-0.10.2.1.tgz
-rwxr-xr-x 1 root root
402 Feb 20 10:49 readme.txt
-rwxr-xr-x 1 root root 22724574 Feb 20 10:52 zookeeper-3.4.9.tar.gz
2. 配置ipList.cfg 文件
进入安装目录的install 目录下,
# cd /opt/zkandkfkautoinstall/install
编辑ipList.cfg 文件,主要需要配置zookeepr、kafka 的安装服务器ip、端
口号、安装用户(安装前提所有服务器节点的用户名的密码必须一致)、kakfa
存放路径等信息,详细如下ipList.cfg 内容(下面是以单个节点zookeeper

GBase RTSync 同步工具手册
- 198 -
南大通用数据技术股份有限公司
和kafka 为例):
[hosts]
#填写所有需要远程安装zookeeper 和kafka 的服务器ip(kafka 节点必须在
zookeeper 服务器上)
zk_hostList=172.16.3.76
#填写安装程序所在的服务器ip
srchost=172.16.3.76
#填写安装zookeeper、kafka 的操作系统用户
user=root
[zookeeper_conf]
#填写zookeeper 端口号,默认2181,根据现场要求可以进行修改
clientPort=2181
#填写zookeeper 的内部通讯端口,默认2888:3888 不需要修改,根据现场要求
可以进行修改
zkinsideport=2888:3888
[kafka_conf]
#填写所有安装kafka 的服务器ip(必须是zookeeper 服务器所在的机器上部
属),以逗号分隔
host.name=172.16.3.76
#填写kafka 的端口号,默认9092,根据现场要求可以进行修改
port=9092
#填写每个kafka 节点的数据日志存放路径(两个路径之间以逗号分隔,分别与
host.name 参数ip 位置顺序对应),存放磁盘空间建议至少1T 以上
log.dirs=/opt/wgh/autozkkfk/kfk-logs
以上内容配置完成后,开始执行后续的安装脚本。
3. 执行安装脚本
进入到安装目录,执行installzkandkafka.py 脚本,进入交互式界面,

GBase RTSync 同步工具手册
南大通用数据技术股份有限公司
- 199 -
[root@localhost zkandkfkautoinstall]# ./installzkandkafka.py
当前安装程序所在路径:/opt/zkandkfkautoinstall
Please entry zookeeper Install Path(default:/opt):/opt/wgh/autozkkfk
===============================================================================
==
zookeeper Install Path:/opt/wgh/autozkkfk
===============================================================================
==
Are you sure to use above configuration to install zookeeper? (y/n):y
ip list
['172.16.3.76']
Please entry system password:******

GBase 8s 产品可以支持多种语言、文化和代码集。由给定地域和编码中语言使用的与数字
数据、货币、日期和时间的字符集、整理和表示法相关的所有信息都集中在称为 Global
Language Support (GLS) 语言环境的单个环境中。
GBase 8s OLE DB Provider 遵循用于日期、时间和货币的 ISO 字符串格式,如 Microsoft™
OLE DB 标准所定义。可以通过设置 GBase 8s 环境变量或注册表项(如 DBDATE)来覆
盖该缺省值。
本出版物中的示例在编写时假设您使用以下某种语言环境:UNIX™ 平台上的 en_us.8859-
1 (ISO 8859-1) 。这些语言环境支持用于显示和输入日期、时间、数字和货币值的美国英
语格式约定。它们还支持 ISO 8859-1 代码集(在 UNIX 和 Linux™ 上),这些代码集包
括 ASCII 代码集以及很多 8 位字符(如 é、è 和 ñ)。
如果您计划在数据或 SQL 标识中使用其他语言环境中的字符,或者希望符合字符数据的
其他整理规则,那么可以指定其他语言环境。