返回首页

gbase数据、南大通用产品文档:GBase8agcluster_kafka_consumer_latency_time_statistics

更新日期:2024年09月11日

取值:[0|1]
默认值:0
说明:consumer 数据入库延时时间记录
设置此参数=1 表示让kafka consumer 记录每一个kafka 消息的时间戳,从该消息被
consumer 接收到开始计时,至该消息被正确同步到8A 数据落地计时结束,用户通
过这两个时间戳,能够了解到数据在consumer 环节的延迟时间。由于consumer 采
用批量提交的方式,
所以实际上采用的是本批次数据中第一条数据的接收时间和提
交完成时间。
此外,用户如果要核对同步结果的数据条数,对每次提交涉及的总数据量(Insert、
Delete、Update)做统计,统计方法与gcluster_kafka_result_check 的方法相同。
这些统计信息最终被记录到checkpoint table 里。
配置此参数=1 后,consumer 启动时会改变checkpoint table 表结构,如下所示:
gbase> select * from gclusterdb.kafka_consumer_offset_jx5;
+--------+----------------------+---------------------+---------------------+----------------+
| OFFSET | POS
| recieve_time
| commit_time
|
dml_count
|
+--------+----------------------+---------------------+---------------------+----------------+
|
0 | 00000000010000000600 | NULL
| NULL
| I:68,D:10,U:20 |
|
0 | 00000000010000000600 | 2019-09-17 15:54:00 | 2019-09-17 15:54:02 |
I:68,D:10,U:20 |

GBase 8a MPP Cluster 参数手册
文档版本2022-06-07
南大通用数据技术股份有限公司
14
|
1 | 00000000010000001100 | NULL
| NULL
| I:68,D:10,U:20 |
|
1 | 00000000010000001100 | 2019-09-17 15:54:02 | 2019-09-17 15:54:03 |
I:68,D:10,U:20 |
表结构一旦修改,即使在配置此参数=0,checkpoint table 依然保持当前表结构。
由于本批次数据的提交时间只有在提交完成后才能得到,
而checkpoint table 的数据
是在提交前就确定的,所以在提交完成、得到时间戳后,出于性能考虑,这个信息
需要在下一次提交再追加记录。因此用户stop consumer 的话,最后一次提交的时
间戳信息是无法记录的。
记录checkpoint table 是通过insert 操作来实现的,
随着提交次数的增加,
checkpoint
table 的数据量也越来越多,之前的设计是每当consumer 启动时,读取上次提交的
offset 后,
旧数据就不再有价值,
会自动对旧数据做清理,
只保留最大的一个offset。
当打开gcluster_kafka_consumer_latency_time_statistics,
因为用户需要追溯数据提交
情况,因此不能再用以前的机制,所以consumer 要根据时间信息定期淘汰旧数据,
目前设计是checkpoint table 里的数据保留7 天,
consumer 启动时记录最旧的时间戳,
每批提交完成后检查当前时间与这个时间戳的差值,
达到7 天就会淘汰与最旧时间
戳差值小于1 天的所有记录。
修改方式:
可使用set 语句修改值也可在配置文件中修改值。
适用于session、
global
范围均可。



sword GCILobRead2(
GCISvcCtx *svchp,
GCIError *errhp,
GCILobLocator *locp,
gbsub8 *byte_amtp,
gbsub8 *char_amtp,
gbsub8 offset,
void *bufp,
gbsub8 bufl,
ub1 piece,
void *ctxp,
GCICallbackLobRead2 (cbfp)
(void *ctxp,
const void *bufp,
gbsub8 lenp,
ub1 piecep
void **changed_bufpp,
gbsub8 *changed_lenp
),
ub2 csid,
ub1 csfrm

);

连续向前读取某个大对象中指定长度的内容。暂不支持大于4G的LOB对象读取。

参数
输入
/输出


svchp
输入/输出
上下文句柄指针
errhp
输入/输出
错误信息句柄,该接口调用失败时,错误信息会存在该句柄上
locp
输入
存储大字段描述符指针
byte_amtp
输入/输出
调用函数时,该参数表示想读取的字节数,函数执行完以后,
该参数为实际读到的字节数,对于 CLOB,仅在 char_amtp 为零
时使用
char_amtp
输入/输出
对于CLOB,该参数表示想读取的字节数,函数执行完以后,该
参数为实际读到的字节数
offset
输入
输入时,这是从LOB 值开始的绝对偏移。它是字节数。第一位
是1。
如果使用流式传输(通过轮询或回调),请在第一个调用中指
定偏移量;在随后的轮询调用中,将忽略offset 参数。使用回调
时,没有offset 参数
bufp
输入/输出
存放读到数据的缓冲区指针
bufl
输入
缓冲区的长度,以字节为单位指定
piece
输入
GCI_ONE_PIECE-呼叫永远不会进行轮询。如果指示的数量大
于缓冲区长度,则缓冲区将尽可能填充。要进行轮询,请
GCI_FIRST_PIECE 在第一次和GCI_NEXT_PIECE 以后的通话
中传递。GCI_FIRST_PIECE 应该在使用回调时传递

GBase 8s GCI 接口使用指南

南大通用数据技术股份有限公司 75 参数
输入
/输出


ctxp
输入
回调函数的上下文指针,可以为NULL
cbfp
输入
可以注册为每个片段调用的回调。如果是NULL,
GCI_NEED_DATA 则为每块返回。
回调函数必须返回GCI_CONTINUE 才能继续读取。如果返回
任何其他错误代码,则将终止LOB 读取。
回调采用以下参数:

ctxp(输入)
回调函数的上下文。可以NULL。

bufp (输入/输出)
片段的缓冲区指针。

lenp(输入)
bufp 指向的当前片段的长度(以字节为单位)

piecep (输入)
可能的值:GCI_FIRST_PIECE,GCI_NEXT_PIECE 或
GCI_LAST_PIECE。

changed_bufpp(输出)
保留参数,目前不使用(仅用于和Oracle 保持兼容)

changed_lenp(输出)
保留参数,目前不使用(仅用于和Oracle 保持兼容)
csid
输入
保留参数,目前不使用(仅用于和Oracle 保持兼容)
csfrm
输入
保留参数,目前不使用(仅用于和Oracle 保持兼容)

如果执行成功,返回GCI_SUCCESS,否则返回GCI_ERROR。

编写如下servlet,然后把项目部署到GlassFish 上。
package com.gbase.glassfish;


GBase UP 产品手册 6 应用开发指南
文档版本04(2021-04-21) 南大通用数据技术股份有限公司 1065
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import javax.naming.Context;
import javax.naming.InitialContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.sql.DataSource;

public class TestServlet extends HttpServlet {

/**
*
*/
private static final long serialVersionUID = 1L;

/**
* The doGet method of the servlet.

*
* This method is called when a form has its tag value method equals to get.
*
* @param request the request send by the client to the server
* @param response the response send by the server to the client
* @throws ServletException if an error occurred
* @throws IOException if an error occurred
*/
public void doGet(HttpServletRequest request, HttpServletResponse response)


throws ServletException, IOException {

Connection conn = null;
ResultSet rs = null;
Statement st = null;
try {
Context ctx = new InitialContext();
DataSource ds = (DataSource) ctx.lookup("gbase2");
conn = ds.getConnection();
st = conn.createStatement();
rs = st.executeQuery("select id,name,age,department from emp");
while(rs.next()){

GBase UP 产品手册 6 应用开发指南
文档版本04(2021-04-21) 南大通用数据技术股份有限公司 1066
System.out.println("员工id:"+rs.getInt(1)+" 员工姓名:"+rs.getString(2)+"
员工年龄:"+rs.getInt(3)+" 员工部门:"+rs.getString(4));
}
} catch (Exception e) {
e.printStackTrace();
}finally{
if(rs != null){
try {
rs.close();
rs = null;
} catch (SQLException e) {
e.printStackTrace();
}
if(st != null){
try {
st.close();
st = null;
} catch (SQLException e) {
e.printStackTrace();
}
if(conn != null){
try {







conn.close();







conn = null;
} catch (SQLException e) {







e.printStackTrace();
}
}
}
}
}
}

/**
* The doPost method of the servlet.

*
* This method is called when a form has its tag value method equals to post.
*
* @param request the request send by the client to the server
* @param response the response send by the server to the client

GBase UP 产品手册 6 应用开发指南
文档版本04(2021-04-21) 南大通用数据技术股份有限公司 1067
* @throws ServletException if an error occurred
* @throws IOException if an error occurred
*/
public void doPost(HttpServletRequest request, HttpServletResponse response)


throws ServletException, IOException {

this.doGet(request, response);
}
}
web.xml 文件如下:


xmlns="http://java.sun.com/xml/ns/javaee"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://java.sun.com/xml/ns/javaee

http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd">

This is the description of my J2EE component
This is the display name of my J2EE component
TestServlet
com.gbase.glassfish.TestServlet



TestServlet
/TestServlet


index.jsp


部署成功后,启动glassfish 服务器,访问如下url:
http://localhost:8088/TestGBase/TestServlet
该测试用例会打印出数据库为test 表为emp 中的内容。

GBase UP 产品手册 6 应用开发指南
文档版本04(2021-04-21) 南大通用数据技术股份有限公司 1068