返回首页

gbase数据、南大通用产品文档:GBase8aKafka 数据同步

更新日期:2024年09月11日

数据同步系统通过Oracle Goldengate、GBase RTSync 等工具复制Oracle、GBase 8s
等数据库的业务数据到GBase 8a MPP Cluster,为了应对业务系统可能的峰值,在
系统中加入Kafka 消息队列作为缓冲区。以Oracle 同步实时数据到GBase 8a 集群
为例,总体流程如下:
图2- 5 kafka 数据同步总体流程图
OGG 发送端(GoldenGate Extract)从Oracle 的在线日志和归档日志中抽取事务信
息,生成Trail 文件。OGG 接收端(GoldenGate Replicat)收到Trail 文件,抽取事
务信息转换为目标格式,并生产事务消息到Kafka。集群的Consumer 模块从Kafka
中消费事务消息,将数据更新到GBase 8a MPP Cluster 中。
Kafka consumer 的主要功能就是同步Kafka 数据到GBase 8a MPP Cluster:

根据配置,可以指定需要同步的业务;

在同步过程中,提供同步状态查询功能;

实现数据同步的高可用性和事务数据一致性。

本部分描述如何使用 sqlda 结构来处理 SELECT 语句。

要使用 sqlda 结构来处理未知的选择列表列:

1. 声明变量来保存 sqlda 结构的地址。
2. (以 PREPARE 语句)准备 SELECT 语句,来给它一个语句标识符。
该 SELECT 语句不可包括 INTO TEMP 子句。
3. 使用 DESCRIBE...INTO 语句来执行两项任务:
a) 分配 sqlda 结构。将分配了的结构的地址存储在您声明的
sqlda 指针中。
b) 确定选择列表列的数目和数据类型。DESCRIBE 语句为选择
列表的每一列填充 sqlvar_struct 结构。
4. 对于每一选择列表列检测 sqlda 的 sqltype 和 sqllen 字段,来
确定需要为该数目分配的内存量。
5. 保存存储在主变量中的 sqld 字段中的选择列表列的数目。
6. 声明并打开游标,然后,使用 FETCH...USING DESCRIPTOR 语句来
将列值访存至分配了的 sqlda 结构内,一次一行。
7. 将行输入从 sqlda 结构检索至带有 C 语言语句的主变量内,其访
问每一选择列表列的 sqldata 字段。

GBase 8s ESQL/C 编程指南
南大通用数据技术股份有限公司
- 577 -

8. 释放分配给 sqldata 字段和 sqlda 结构的内存。

重要: 如果 SELECT 语句在 WHERE 子句中有未知数目和类型的输入参数,则您
的程序还必须以 sqlda 结构来处理这些输入参数。
执行返回多行的 SELECT
demo3.ec 样例程序以下列条件执行动态 SELECT 语句:
该 SELECT 返回多行。
该 SELECT 必须与游标相关联,以 OPEN 语句执行,并以 FETCH...USING
DESCRIPTOR 语句检索它的检索的值。

该 SELECT 或者没有输入参数,或者没有 WHERE 子句。
OPEN 语句不需要包括 USING 子句。

该 SELECT 在它的选择列表中有未知的列。
该 FETCH 语句包括 USING DESCRIPTOR 子句来将返回值存储在 sqlda 结构中。
demo3.ec 样例程序
demo4 样例程序
(demo4.ec 样例程序)
假设这些相同的条件,
在 demo4 使
用系统描述符区域来定义选择列表列时,demo3 使用 sqlda 结构。demo3 程序
不执行异常处理。
=================================================================
=====
1. #include
2. EXEC SQL include sqlda;
3. EXEC SQL include sqltypes;
4. main()
5. {
6. struct sqlda *demo3_ptr;
7. struct sqlvar_struct *col_ptr;
8. static char data_buff[1024];
9. int pos, cnt, size;
10. EXEC SQL BEGIN DECLARE SECTION;
11. int2 i, desc_count;
12. char demoquery[80];
13. EXEC SQL END DECLARE SECTION;

GBase 8s ESQL/C 编程指南
南大通用数据技术股份有限公司
- 578 -

14. printf("DEMO3 Sample ESQL program running.\n\n");
15. EXEC SQL connect to 'stores7';
=================================================================
======

2 行

该程序必须包括 GBase 8s ESQL/Csqlda.h 头文件来提供对 sqlda 结构的定义。


6 - 13 行

6 和 7 行声明该程序需要的 sqlda 变量。demo3_ptr 变量指向将保存从数据库访存
的数据的 sqlda 结构。col_ptr 变量指向 sqlvar_struct 结构,以便于该代码可逐步经过
sqlda 的变长部分中的每一 sqlvar_struct 结构。
这些变量都不声明作为 GBase 8s ESQL/C
主变量。10 - 13 行声明主变量来保存从用户取得的数据,以及从 sqlda 结构检索的数据。


=================================================================
======
16. /* These next four lines have hard-wired both the query and
17. * the value for the parameter. This information could have
18. * been entered from the terminal and placed into the strings
19. * demoquery and a query value string (queryvalue), respectively.
20. */
21. sprintf(demoquery, "%s %s",
22. "select fname, lname from customer",
23. "where lname < 'C' ");
24. EXEC SQL prepare demo3id from :demoquery;
25. EXEC SQL declare demo3cursor cursor for demo3id;
26. EXEC SQL describe demo3id into demo3_ptr;
=================================================================
======

16 - 24 行

GBase 8s ESQL/C 编程指南
南大通用数据技术股份有限公司
- 579 -


这些行为 SELECT 语句组装字符串
(在 demoquery 中)

并准备它作为 demo3id 语
句标识符。
25 行

此行为准备好的语句标识符 demo3id 声明 demo3cursor。


26 行

该 DESCRIBE 语句为 demo3id 语句标识符内的准备好的语句描述选择列表列。为
此,您必须在使用 DESCRIBE 之前准备该语句。此 DESCRIBE 包括 INTO 子句来执行
sqlda 结构,
demo3_ptr 指向其作为这些列描述的位置。
DESCRIBE...INTO 语句还为 sqlda
结构分配内存,并将此结构的地址存储在 demo3_ptr 变量中。

demo3 程序假设在运行时刻组装下列 SELECT 语句,并存储在 demoquery 字符串
中:
SELECT fname, lname FROM customer WHERE lname < 'C'

在 26 行中的 DESCRIBE 语句之后,sqlda 结构的组件包含如下:
sqlda 组件 demo3_ptr->sqld 有值 2,因为从 customer 表选择了两列。
组件 demo3_ptr->sqlvar[0],包含关于 customer 表的 fname 列信息的 sqlvar_struct
结构。例如,demo3_ptr->sqlvar[0].sqlname 组件给出第一列的名称(fname)。
组件 demo3_ptr->sqlvar[1],包含关于 customer 表的 lname 列的信息的 sqlvar_struct
结构。

=================================================================
======
27. desc_count = demo3_ptr->sqld;
28. printf("There are %d returned columns:\n", desc_count);
29. /* Print out what DESCRIBE returns */
30. for (i = 1; i <= desc_count; i++)
31. prsqlda(i, demo3_ptr->sqlvar[i-1]);
32. printf("\n\n");

GBase 8s ESQL/C 编程指南
南大通用数据技术股份有限公司
- 580 -

=================================================================
======

27 和 28 行

27 行将由 DESCRIBE 语句找到的选择列表列的数目指定给 desc_count 主变量。28
行将此信息显示给用户。


29 - 32 行

此 for 循环仔细检查该选择列表的列的 sqlvar_struct 结构。它使用 desc_count 主
变量来确定由 DESCRIBE 初始化的这些结构的数目。对于每一 sqlvar_struct 结构,
prsqlda() 函数(31 行)显示诸如数据类型、长度和名称这样的信息。要了解 prsqlda() 的
描述,请参阅 75 - 81 行的描述。

=================================================================
======
33. for(col_ptr=demo3_ptr->sqlvar, cnt=pos=0; cnt < desc_count;
34. cnt++, col_ptr++)
35. {
36. /* Allow for the trailing null character in C
37. character arrays */
38. if(col_ptr->sqltype==SQLCHAR)
39. col_ptr->sqllen += 1;
40. /* Get next word boundary for column data and
41. assign buffer position to sqldata */
42. pos = (int)rtypalign(pos, col_ptr->sqltype);
43. col_ptr->sqldata = &data_buff[pos];
44. /* Determine size used by column data and increment
45. buffer position */
46. size = rtypmsize(col_ptr->sqltype, col_ptr->sqllen);
47. pos += size;
48. }

GBase 8s ESQL/C 编程指南
南大通用数据技术股份有限公司
- 581 -

=================================================================
======

33 - 48 行

这第二个 for 循环为 sqldata 字段分配内存,并将设置 sqldata 字段来指向此内存。


对于每一选择列表列,40 - 47 行检测 sqlda 的 sqltype 和 sqllen 字段,来确定您需
要为该数据分配的内存的数量。该程序不使用 malloc() 来动态地分配空间。反而,它使用
静态数据缓冲区
(8 行上定义的 data_buff 变量)
来保存该列数据。
对于列数据类型,
GBase
8s ESQL/Crtypalign() 函数(42 行)返回下一词边界的位置(在 col_ptr->sqltype 中)。
然后,
43 行将 data_buff 数据缓冲区内此位置的地址指定给 sqldata 字段
(对于接收由该
查询返回的值的列)。

GBase 8s ESQL/Crtypmsize() 函数(46 行)返回由 sqltype 和 sqllen 字段指定的
SQL 数据类型所需要的字节数。然后,47 行按该数据需要的大小增加该数据缓冲区指针
(pos)。
=================================================================
======
49. EXEC SQL open demo3cursor;
50. for (;;)
51. {
52. EXEC SQL fetch demo3cursor using descriptor demo3_ptr;
53. if (strncmp(SQLSTATE, "00", 2) != 0)
54. break;
55. /* Print out the returned values */
56. for (i=0; i57. printf("Column: %s\tValue:%s\n", demo3_ptr-
>sqlvar[i].sqlname,
58. demo3_ptr->sqlvar[i].sqldata);
59. printf("\n");
60. }
=================================================================
======


GBase 8s ESQL/C 编程指南
南大通用数据技术股份有限公司
- 582 -

49 行

当数据库服务器打开 demo3cursor 游标时,它执行 SELECT 语句。如果您的
SELECT 语句的 WHERE 子句包含输入参数,则您还需要指定 OPEN 的 USING
DESCRIPTOR 子句。


50 - 60 行

对于从数据库访存的每一行,执行此内层的 for 循环。FETCH 语句(52 行)包括
USING DESCRIPTOR 子句来指定 demo3_ptr 指向的 sqlda 结构作为列值的位置。在此
FETCH 之后,将列值存储在指定的 sqlda 结构中。

if 语句(53 和 54 行)检测 SQLSTATE 变量的值来确定该 FETCH 是否成功。如
果 SQLSTATE 指示任何不成功的状态,则执行 54 行并终止该 for 循环。对于选择列表
的每一列,56 - 60 行显示 sqlname 和 sqldata 字段的内容。

重要: demo3 程序假设返回的列为字符数据类型。如果该程序未进行此假设,则它
会需要检查 sqltype 和 sqllen 字段,来确定保存 sqldata 值的主变量的恰当的数据类型。

=================================================================
======
61. if (strncmp(SQLSTATE, "02", 2) != 0)
62. printf("SQLSTATE after fetch is %s\n", SQLSTATE);
63. EXEC SQL close demo3cursor;
=================================================================
======

61 和 62 行

在该 for 循环之外,程序再次检测 SQLSTATE 变量,如果执行成功、发生运行时刻
错误或警告(类代码不等于 "02"),以便于它可通知用户。

63 行

在访存所有行之后,CLOSE 语句关闭 demo3cursor 游标。

GBase 8s ESQL/C 编程指南
南大通用数据技术股份有限公司
- 583 -


=================================================================
======
64. EXEC SQL free demo3id;
65. EXEC SQL free demo3cursor;
66. /* No need to explicitly free data buffer in this case because
67. * it wasn't allocated with malloc(). Instead, it is a static char
68. * buffer
69. */
70. /* Free memory assigned to sqlda pointer. */
71. free(demo3_ptr);
72. EXEC SQL disconnect current;
73. printf("\nDEMO3 Sample Program Over.\n\n");
74. }
75. prsqlda(index, sp)
76. int2 index;
77. register struct sqlvar_struct *sp;
78. {
79. printf(" Column %d: type = %d, len = %d, data = %s\n",
80. index, sp->sqltype, sp->sqllen, sp->sqldata, sp->sqlname);
81. }
=================================================================
======

64 和 65 行

这些 FREE 语句释放为 demo3id 准备好的语句和 demo3cursor 数据库游标分配的
资源。

66 - 71 行

在该程序的结尾,
释放分配给 sqlda 结构的内存。
由于此程序不使用 malloc()
来分配数据缓冲区,因此,它不使用 free() 系统调用来释放 sqldata 指针。
虽然从静态缓冲区分配内存是简单的,但它有缺点:保持分配此缓冲区,直到

GBase 8s ESQL/C 编程指南
南大通用数据技术股份有限公司
- 584 -

程序结束为止。

free() 系统调用(71 行)释放 demo3_ptr 指向的 sqlda 结构。

75 - 81 行

prsqlda() 函数显示关于选择列表列的信息。它从 sqlvar_struct 结构读取此信息,将
其地址传至该函数内(sp)。
提示: GBase
8s
ESQL/C 演示程序 unload.ec 和 dyn_sql.ec(在 dyn_sql 程
序中描述)还使用 sqlda 来描述选择列表的列。
执行单个 SELECT
demo3 程序假设 SELECT 语句返回多行,因此,该程序与游标相关联。如果在此时
您知道编写的是动态 SELECT 总是只返回一行的程序,则您可省略游标并使用
EXECUTE...INTO DESCRIPTOR 语句,而不用 FETCH...USING DESCRIPTOR。您必须仍
使用 DESCRIBE 语句来定义选择列表列。

DEFAULTESCCHAR 配置参数指定用于 LIKE 和 MATCHES 的缺省转义符。
onconfig.std 值
DEFAULTESCCHAR 反斜线字符(\)。
如未出现
如果在 onconfig 文件中未设置值,则使用反斜线字符(\)。

\ = 使用反斜线字符作为转义符。
NONE = 无缺省转义符。
character = 任何一字符值都可用作转义符。
生效
编辑 onconfig 文件并重启数据库服务器之后。
用法
通过使用带有您想使用的转义符的 SET ENVIRONMENT DEFAULTESCCHAR 语句,
可以
在会话中取代缺省值。例如:
SET ENVIRONMENT DEFAULTESCCHAR '\'