Oracle 11G 配置
1.启用归档日志(ARCHIVELOG)模式
检查当前模式
检查当前模式
-- 以SYSDBA身份登录
SQL> SELECT log_mode FROM v$database;
-- 如果显示NOARCHIVELOG,需要启用归档模式
启用归档模式
-- 1. 关闭数据库
SQL> SHUTDOWN IMMEDIATE;
-- 2. 启动到mount状态
SQL> STARTUP MOUNT;
-- 3. 启用归档模式
SQL> ALTER DATABASE ARCHIVELOG;
-- 4. 打开数据库
SQL> ALTER DATABASE OPEN;
-- 5. 验证
SQL> SELECT log_mode FROM v$database;
-- 应该显示: ARCHIVELOG
2. 配置补充日志(Supplemental Logging)
启用最小补充日志
-- 启用数据库级最小补充日志
SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
-- 为CDC表启用主键/唯一键补充日志
SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, UNIQUE INDEX) COLUMNS;
为同步的表启用标识键日志
-- 为每个需要同步的表执行(以您的SIMPLE_USER表为例)
SQL> ALTER TABLE B2B.USERS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- 或者只启用主键补充日志(推荐)
SQL> ALTER TABLE B2B.USERS ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;
#建议使用
也可以考虑添加 ALL COLUMN 日志(捕获所有列变更):
ALTER TABLE B2B.USERS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
3. 创建专用CDC用户
创建用户并授权
-- 创建CDC专用用户
SQL> CREATE USER flink_cdc IDENTIFIED BY "1234qwer";
-- 授予基本权限
SQL> GRANT CREATE SESSION TO flink_cdc;
SQL> GRANT SELECT ANY TABLE TO flink_cdc;
SQL> GRANT SELECT ANY TRANSACTION TO flink_cdc;
-- 授予CDC相关权限
SQL> GRANT EXECUTE_CATALOG_ROLE TO flink_cdc;
SQL> GRANT SELECT ON V_$DATABASE TO flink_cdc;
SQL> GRANT SELECT ON V_$LOG TO flink_cdc;
SQL> GRANT SELECT ON V_$LOGFILE TO flink_cdc;
SQL> GRANT SELECT ON V_$ARCHIVED_LOG TO flink_cdc;
SQL> GRANT SELECT ON V_$LOGMNR_CONTENTS TO flink_cdc;
SQL> GRANT SELECT ON V_$LOGMNR_LOGS TO flink_cdc;
SQL> GRANT SELECT ON V_$LOG_HISTORY TO flink_cdc;
SQL> GRANT SELECT ON V_$STANDBY_LOG TO flink_cdc;
SQL> GRANT SELECT ON V_$TRANSPORTABLE_PLATFORM TO flink_cdc;
SQL> GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flink_cdc;
SQL> GRANT SELECT ON V_$LOGMNR_SESSION TO flink_cdc; -- 失败原因:原文写作 V_$LOGMMR_SESSION,视图名拼写错误(应为 LOGMNR),修正拼写后即可执行
SQL> GRANT SELECT ON ALL_INDEXES TO flink_cdc;
SQL> GRANT SELECT ON ALL_CATALOG TO flink_cdc;
SQL> GRANT SELECT ON ALL_CONSTRAINTS TO flink_cdc;
SQL> GRANT SELECT ON ALL_OBJECTS TO flink_cdc;
SQL> GRANT SELECT ON ALL_TAB_COLS TO flink_cdc;
SQL> GRANT SELECT ON ALL_USERS TO flink_cdc;
SQL> GRANT SELECT ON ALL_ENCRYPTED_COLUMNS TO flink_cdc;
补充的权限
GRANT SELECT_CATALOG_ROLE TO flink_cdc;
GRANT EXECUTE ON SYS.DBMS_METADATA TO flink_cdc;
-- 授予对同步表的SELECT权限
SQL> GRANT SELECT ON B2B.USERS TO flink_cdc;
GRANT SELECT ANY TABLE TO flink_cdc;
GRANT SELECT_CATALOG_ROLE TO flink_cdc;
GRANT SELECT ON ALL_TAB_COLUMNS TO flink_cdc;
GRANT SELECT ON ALL_TABLES TO flink_cdc;
GRANT SELECT ON ALL_CONSTRAINTS TO flink_cdc;
GRANT EXECUTE_CATALOG_ROLE TO flink_cdc;
GRANT SELECT ANY TRANSACTION TO flink_cdc;
GRANT LOGMINING TO flink_cdc; -- 失败原因:LOGMINING 系统权限自 Oracle 12c 才引入,11g 中不存在该权限,此报错在 11g 上可忽略
GRANT SELECT ON V_$LOG TO flink_cdc;
GRANT SELECT ON V_$LOG_HISTORY TO flink_cdc;
GRANT SELECT ON V_$LOGFILE TO flink_cdc;
GRANT SELECT ON V_$ARCHIVED_LOG TO flink_cdc;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flink_cdc;
GRANT FLASHBACK ON B2B.USERS TO flink_cdc;
GRANT FLASHBACK ANY TABLE TO flink_cdc;
-- 支持 LogMiner 变更捕获
GRANT EXECUTE ON DBMS_LOGMNR TO flink_cdc;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flink_cdc;
-- 支持内部表创建(如 flush 表)
GRANT CREATE TABLE TO flink_cdc;
GRANT ALTER SESSION TO flink_cdc;
#ORA-01950: no privileges on tablespace 'USERS'表示 flink_cdc用户在 USERS表空间上没有足够的权限。
LogMiner需要创建一个临时表来存储日志挖掘的中间结果,但 flink_cdc用户没有在 USERS表空间上创建表的权限。
-- 授予USERS表空间的配额权限
ALTER USER flink_cdc QUOTA UNLIMITED ON USERS;
-- 或者授予默认表空间权限
ALTER USER flink_cdc DEFAULT TABLESPACE USERS;
-- 授予创建表的权限
GRANT CREATE TABLE TO flink_cdc;
-- 授予表空间使用权限
GRANT UNLIMITED TABLESPACE TO flink_cdc;
GRANT SELECT ON DBA_LOG_GROUPS TO flink_cdc;
GRANT SELECT ON DBA_LOG_GROUP_COLUMNS TO flink_cdc;
GRANT SELECT ANY DICTIONARY TO flink_cdc;
-- 3. 检查配置是否生效
SELECT supplemental_log_data_min, supplemental_log_data_pk, supplemental_log_data_all
FROM v$DATABASE;
-- 启用全列级别的补充日志
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
#添加锁表权限
-- DBA 执行一次就行
GRANT SELECT ANY TABLE TO flink_cdc;
GRANT LOCK ANY TABLE TO flink_cdc;
GRANT SELECT ON DBA_TABLES TO flink_cdc;
GRANT SELECT ON DBA_TAB_PARTITIONS TO flink_cdc;
GRANT EXECUTE ON DBMS_LOGMNR TO flink_cdc;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flink_cdc;
GRANT SELECT ON V_$LOG TO flink_cdc;
GRANT SELECT ON V_$LOGFILE TO flink_cdc;
GRANT SELECT ON V_$ARCHIVED_LOG TO flink_cdc;
4. Oracle参数调整
检查关键参数
-- 检查日志相关参数
SQL> SHOW PARAMETER LOG_ARCHIVE_DEST;
SQL> SHOW PARAMETER LOG_ARCHIVE_FORMAT;
SQL> SHOW PARAMETER DB_RECOVERY_FILE_DEST_SIZE;
-- 检查LogMiner相关参数
SQL> SHOW PARAMETER UTL_FILE_DIR;
5. 验证配置
验证归档模式
SQL> SELECT log_mode, supplemental_log_data_min, supplemental_log_data_pk, supplemental_log_data_ui
FROM v$database;
验证表级补充日志
SQL> SELECT owner, table_name, log_group_type
FROM dba_log_groups
WHERE owner = 'B2B' AND table_name = 'USERS'; -- 注意:表名是 USERS,写成 'USER' 会查不到任何结果
验证用户权限
SQL> SELECT * FROM dba_sys_privs WHERE grantee = 'FLINK_CDC';
SQL> SELECT * FROM dba_tab_privs WHERE grantee = 'FLINK_CDC';
doris 中建表 SQL
-- Doris target table for Oracle B2B.USERS (first draft; a refined version with
-- VARCHAR/CHAR types appears later in this document).
-- UNIQUE KEY merge-on-write model: USERID must be the leading column, since
-- Doris requires key columns to come first in the column list.
CREATE TABLE IF NOT EXISTS b2b.USERS (
USERID BIGINT NOT NULL,
USERNAME STRING,
USERSTATUS STRING,
EMAIL STRING,
MOBILE1 STRING,
MOBILE2 STRING,
OFFICETEL1 STRING,
OFFICETEL2 STRING,
EXPIRATIONTIME DATETIME,
CATEGORYID STRING,
CREATETIME DATE,
QQ STRING,
CHANNEL_TYPE STRING,
REFEREE STRING,
QQ2 STRING,
SOURCE STRING,
ORIGINAL_QQ STRING,
ISAPP STRING
)
UNIQUE KEY(USERID)
DISTRIBUTED BY HASH(USERID) BUCKETS 10
PROPERTIES (
-- 3 replicas; merge-on-write gives read-time dedup without MERGE cost
"replication_num" = "3",
"enable_unique_key_merge_on_write" = "true"
);
Flink sql client配置
同步oracle到doris配置实例
1.创建Flink 目标表
-- Flink sink table writing to Doris app.simple_user (demo pipeline).
CREATE TABLE doris_simple_user_170 (
id DECIMAL(20, 0), -- keep consistent with the Oracle NUMBER column
name VARCHAR(100),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'doris',
'fenodes' = '172.20.200.170:8030,172.20.200.170:8031,172.20.200.170:8032',
'table.identifier' = 'app.simple_user',
'username' = 'root',
'password' = '1234qwer',
-- core batching parameters
'sink.buffer-flush.max-rows' = '10000',
'sink.buffer-flush.interval' = '10s',
'sink.max-retries' = '3',
-- stream-load payload format
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true'
);
2.创建Flink 源表
-- Flink CDC source table over Oracle APP.SIMPLE_USER (demo pipeline).
CREATE TABLE oracle_simple_user_75 (
ID DECIMAL(20, 0),
NAME VARCHAR(100),
PRIMARY KEY (ID) NOT ENFORCED
) WITH (
'connector' = 'oracle-cdc',
'hostname' = '172.20.200.75',
'port' = '1521',
'username' = 'APP',
'password' = '1234qwer',
'database-name' = 'RTTEST',
'schema-name' = 'APP',
'table-name' = 'SIMPLE_USER',
-- full snapshot first, then stream redo-log changes
'scan.startup.mode' = 'initial',
'debezium.log.mining.strategy' = 'online_catalog',
-- NOTE(review): CONTINUOUS_MINE works on 11g but was removed in Oracle 19c;
-- revisit this option before any database upgrade.
'debezium.log.mining.continuous.mine' = 'true',
'debezium.database.connection.autocommit' = 'false',
-- NOTE(review): Oracle does not implement READ UNCOMMITTED isolation —
-- verify this setting actually takes effect rather than being ignored.
'debezium.database.connection.isolation.level' = 'read_uncommitted'
);
3.创建Flink 任务
-- Launch the continuous sync job: snapshot + ongoing CDC from Oracle to Doris.
INSERT INTO doris_simple_user_170 SELECT ID, NAME FROM oracle_simple_user_75;
真实表同步实例
doris 中建表语句
-- Doris target table for B2B.USERS (production version: sized VARCHAR/CHAR
-- types instead of the earlier draft's STRING columns).
CREATE TABLE b2b.USERS (
USERID BIGINT NOT NULL,
USERNAME VARCHAR(100),
USERSTATUS CHAR(1),
EMAIL VARCHAR(100),
MOBILE1 VARCHAR(50),
MOBILE2 VARCHAR(50),
OFFICETEL1 VARCHAR(50),
OFFICETEL2 VARCHAR(50),
EXPIRATIONTIME DATETIME,
CATEGORYID CHAR(1),
CREATETIME VARCHAR(30) COMMENT '创建时间字符串版',
QQ VARCHAR(100),
CHANNEL_TYPE VARCHAR(2),
REFEREE VARCHAR(100),
QQ2 VARCHAR(100),
SOURCE VARCHAR(2),
ORIGINAL_QQ VARCHAR(100),
ISAPP CHAR(1) -- last column: no trailing comma!
)
UNIQUE KEY(USERID)
DISTRIBUTED BY HASH(USERID) BUCKETS 10
PROPERTIES (
"replication_num" = "3",
"enable_unique_key_merge_on_write" = "true"
);
flink 中 oracle USERS 表建表语句
-- Flink CDC source table over Oracle B2B.USERS (production sync).
CREATE TABLE oracle_b2b_users (
USERID DECIMAL(20, 0),
USERNAME VARCHAR(100),
USERSTATUS CHAR(1),
EMAIL VARCHAR(100),
MOBILE1 VARCHAR(50),
MOBILE2 VARCHAR(50),
OFFICETEL1 VARCHAR(50),
OFFICETEL2 VARCHAR(50),
EXPIRATIONTIME TIMESTAMP(6),
CATEGORYID CHAR(1),
CREATETIME STRING, -- mapped to STRING instead of TIMESTAMP
QQ VARCHAR(100),
CHANNEL_TYPE VARCHAR(2),
REFEREE VARCHAR(100),
QQ2 VARCHAR(100),
SOURCE VARCHAR(2),
ORIGINAL_QQ VARCHAR(100),
ISAPP CHAR(1),
PRIMARY KEY (USERID) NOT ENFORCED
) WITH (
'connector' = 'oracle-cdc',
'hostname' = '192.168.114.133',
'port' = '1521',
'username' = 'flink_cdc',
'password' = '1234qwer',
'database-name' = 'BTOB',
'schema-name' = 'B2B',
'table-name' = 'USERS',
'scan.startup.mode' = 'initial',
'debezium.log.mining.strategy' = 'online_catalog',
'debezium.log.mining.continuous.mine' = 'true',
-- date-anomaly handling options kept from troubleshooting (disabled):
-- 'debezium.log.mining.start.scn' = '281470000000000',
-- 'debezium.snapshot.select.overrides' = '"B2B"."USERS"',
-- NOTE(review): Debezium's column.propagate.source.type expects an unquoted
-- fully-qualified regex like B2B.USERS.CREATETIME — verify the quoted form
-- below actually matches, otherwise the option is a silent no-op.
'debezium.column.propagate.source.type' = '"B2B"."USERS"."CREATETIME"'
);
flink中 doris USERS 表建表语句
-- Flink sink table writing to Doris b2b.USERS. Column list and order mirror
-- oracle_b2b_users exactly so INSERT ... SELECT * maps positionally.
CREATE TABLE doris_b2b_users (
USERID DECIMAL(20, 0),
USERNAME VARCHAR(100),
USERSTATUS CHAR(1),
EMAIL VARCHAR(100),
MOBILE1 VARCHAR(50),
MOBILE2 VARCHAR(50),
OFFICETEL1 VARCHAR(50),
OFFICETEL2 VARCHAR(50),
EXPIRATIONTIME TIMESTAMP(6),
CATEGORYID CHAR(1),
CREATETIME STRING,
QQ VARCHAR(100),
CHANNEL_TYPE VARCHAR(2),
REFEREE VARCHAR(100),
QQ2 VARCHAR(100),
SOURCE VARCHAR(2),
ORIGINAL_QQ VARCHAR(100),
ISAPP CHAR(1),
PRIMARY KEY (USERID) NOT ENFORCED
) WITH (
'connector' = 'doris',
'fenodes' = '172.20.200.170:8030,172.20.200.170:8031,172.20.200.170:8032',
'table.identifier' = 'b2b.USERS',
'username' = 'root',
'password' = '1234qwer',
'sink.buffer-flush.max-rows' = '10000',
'sink.buffer-flush.interval' = '10s',
'sink.max-retries' = '3',
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true',
-- two-phase commit for exactly-once; enable-delete propagates CDC deletes
'sink.enable-2pc' = 'true',
'sink.enable-delete' = 'true',
'sink.batch.size' = '10000'
);
启动任务
-- Launch the job; SELECT * is safe only while both tables keep identical column order.
INSERT INTO doris_b2b_users SELECT * FROM oracle_b2b_users;
测试库实例
1. doris建表
-- Doris target table for the test-database sync (YQ.T_PROD_COMP_SHELF).
CREATE TABLE yq.T_PROD_COMP_SHELF (
ID BIGINT NOT NULL COMMENT '主键',
PROD_ID BIGINT NOT NULL COMMENT '商品ID',
COMPANY_ID BIGINT NOT NULL COMMENT '公司ID',
UP_SHELF_DATE DATETIME(6) COMMENT '上架时间',
DOWN_SHELF_DATE DATETIME(6) COMMENT '下架时间',
STATUS VARCHAR(32) COMMENT '状态',
CREATED_DATE DATETIME(6) COMMENT '创建时间',
CREATOR_ID BIGINT COMMENT '创建人ID',
CREATOR_NAME VARCHAR(32) COMMENT '创建人姓名',
MODI_DATE DATETIME(6) COMMENT '修改时间',
MODIFIER_ID BIGINT COMMENT '修改人ID',
MODIFIER_NAME VARCHAR(32) COMMENT '修改人姓名',
GRAIL_PRICE DECIMAL(20,4) COMMENT '价格',
IS_TIGHT VARCHAR(2) COMMENT '是否紧俏',
IS_FC VARCHAR(2) COMMENT '是否FC'
)
UNIQUE KEY(ID) -- the unique key is just ID
DISTRIBUTED BY HASH(ID) BUCKETS 32 -- ~520k rows: 32-64 buckets recommended
PROPERTIES (
"replication_num" = "3",
"enable_unique_key_merge_on_write" = "true",
"compression" = "zstd",
"light_schema_change" = "true"
);
2.flink sql client 创建oracle 源表
-- Drop any stale definition first (protects against leftovers from earlier runs)
DROP TABLE IF EXISTS oracle_prod_comp_shelf;
-- Oracle CDC source table (with a declared primary key!)
CREATE TABLE oracle_prod_comp_shelf (
ID DECIMAL(12,0),
PROD_ID DECIMAL(12,0),
COMPANY_ID DECIMAL(12,0),
UP_SHELF_DATE TIMESTAMP(6),
DOWN_SHELF_DATE TIMESTAMP(6),
STATUS STRING,
CREATED_DATE TIMESTAMP(6),
CREATOR_ID DECIMAL(12,0),
CREATOR_NAME STRING,
MODI_DATE TIMESTAMP(6),
MODIFIER_ID DECIMAL(12,0),
MODIFIER_NAME STRING,
GRAIL_PRICE DECIMAL(20,4),
IS_TIGHT STRING,
IS_FC STRING,
PRIMARY KEY (ID) NOT ENFORCED -- key point: declare the primary key
) WITH (
'connector' = 'oracle-cdc',
'hostname' = '172.20.200.39', -- replace with your Oracle IP
'port' = '1521',
'username' = 'flink_cdc',
'password' = '1234qwer',
'database-name' = 'DZDZ38', -- your instance name
'schema-name' = 'YQ',
'table-name' = 'T_PROD_COMP_SHELF',
-- initial snapshot + incremental (520k rows take roughly 2-5 minutes)
'scan.startup.mode' = 'initial',
'debezium.log.mining.strategy' = 'online_catalog',
'debezium.log.mining.continuous.mine' = 'true',
-- NOTE(review): Debezium expects an unquoted schema.table regex here —
-- confirm the quoted 3-part form below is actually matched.
'debezium.column.propagate.source.type' = '"DZDZ38"."YQ"."T_PROD_COMP_SHELF"'
);
#建表后查询建表状态,在flink sql client中查看
Flink SQL> select count(*) from oracle_prod_comp_shelf;
#图形化查看任务状态
http://172.20.200.170:8081/
3. Flink SQL Client 创建 Sink 表(Doris)
-- Drop any stale definition first
DROP TABLE IF EXISTS doris_prod_comp_shelf;
-- Doris sink table; schema mirrors the CDC source so SELECT * maps positionally
CREATE TABLE doris_prod_comp_shelf (
ID DECIMAL(12,0),
PROD_ID DECIMAL(12,0),
COMPANY_ID DECIMAL(12,0),
UP_SHELF_DATE TIMESTAMP(6),
DOWN_SHELF_DATE TIMESTAMP(6),
STATUS STRING,
CREATED_DATE TIMESTAMP(6),
CREATOR_ID DECIMAL(12,0),
CREATOR_NAME STRING,
MODI_DATE TIMESTAMP(6),
MODIFIER_ID DECIMAL(12,0),
MODIFIER_NAME STRING,
GRAIL_PRICE DECIMAL(20,4),
IS_TIGHT STRING,
IS_FC STRING,
PRIMARY KEY (ID) NOT ENFORCED
) WITH (
'connector' = 'doris',
'fenodes' = '172.20.200.170:8030,172.20.200.170:8031,172.20.200.170:8032',
'table.identifier' = 'yq.T_PROD_COMP_SHELF',
'username' = 'root',
'password' = '1234qwer',
'sink.buffer-flush.max-rows' = '10000',
'sink.buffer-flush.interval' = '10s',
'sink.max-retries' = '3',
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true',
'sink.enable-2pc' = 'true', -- two-phase commit for exactly-once
'sink.enable-delete' = 'true', -- propagate CDC DELETE events
'sink.label-prefix' = 'flink_prod_shelf_20251111'
);
4.启动任务
-- While debugging, store checkpoints on disk to avoid the 5 MB in-memory
-- checkpoint limit (can be omitted for production jobs)
SET 'state.checkpoint-storage' = 'filesystem';
SET 'state.checkpoints.dir' = 'file:///tmp/flink-checkpoints';
-- Single-statement start (column lists match exactly, so SELECT * is safe)
INSERT INTO doris_prod_comp_shelf
SELECT * FROM oracle_prod_comp_shelf;
5. 字段太多情况的处理(只处理部分字段)
# 第 1 步:保存为文件(必须分文件!不能直接粘贴多条)
cat > /opt/flink-sql-scripts/product_top35_safe.sql << 'EOF'
-- Job script: sync the first 35 columns of Oracle YQ.T_PRODUCT into Doris.
-- Saved as a file and run in one sql-client session (pasting many statements
-- interactively is unreliable).
SET 'execution.checkpointing.interval' = '10s';
SET 'execution.checkpointing.timeout' = '10min';
SET 'state.checkpoints.dir' = 'file:///opt/flink-checkpoints';
SET 'parallelism.default' = '8';
-- NOTE(review): the next two keys are not standard Flink configuration keys;
-- debezium.* options normally belong in the table WITH clause, not SET —
-- confirm the sql-client accepts (rather than silently ignores) them.
SET 'table.exec.source.cdc.snapshot.fetch.size' = '100000';
SET 'debezium.snapshot.fetch.size' = '100000';
DROP TABLE IF EXISTS oracle_product_top35_src;
-- Oracle CDC source: first 35 columns of YQ.T_PRODUCT.
-- NOTE(review): no PRIMARY KEY is declared here, unlike the other source
-- tables in this runbook — verify the connector runs snapshot+incremental
-- correctly without one.
CREATE TABLE oracle_product_top35_src (
ID DECIMAL(12,0),
PROD_CODE STRING,
PROD_MODL_ID DECIMAL(12,0),
SCALE STRING,
SEO_TITLE STRING,
NAME STRING,
TYPE_CODE STRING,
PROD_SERIES STRING,
COLOR_CODE STRING,
VOL_WIDTH DECIMAL(20,8),
VOL_HIGH DECIMAL(20,8),
VOL_LENGTH DECIMAL(20,8),
VOL DECIMAL(20,8),
VOL_UNIT_CODE STRING,
ONE_BOX_NUM INT,
NET_WEIGHT DECIMAL(20,8),
GROSS_WEIGHT DECIMAL(20,8),
WEIGHT_UNIT_CODE STRING,
CURRENCY STRING,
PUR_UNIT_CODE STRING,
PUR_UNIT_NAME STRING,
INV_UNIT_CODE STRING,
INV_UNIT_NAME STRING,
MATERIAL_CODE STRING,
SMALL_IMG STRING,
BIG_IMG STRING,
STATUS STRING,
STATUS_REASON STRING,
IS_TAX_PROTECT STRING,
COST_PRICE DECIMAL(20,8),
PRICE_UNIT_CODE STRING,
PRICE DECIMAL(20,8),
MARKET_PRICE DECIMAL(20,8),
MIN_PRICE DECIMAL(20,8),
MAX_PRICE DECIMAL(20,8)
) WITH (
'connector' = 'oracle-cdc',
'hostname' = '172.20.200.39',
'port' = '1521',
'username' = 'flink_cdc',
'password' = '1234qwer',
'database-name' = 'DZDZ38',
'schema-name' = 'YQ',
'table-name' = 'T_PRODUCT',
'scan.startup.mode' = 'initial',
-- skip table locking during snapshot to avoid blocking the source
'debezium.snapshot.locking.mode' = 'none',
'debezium.log.mining.strategy' = 'online_catalog',
'debezium.log.mining.continuous.mine' = 'true'
);
DROP TABLE IF EXISTS doris_top35;
-- Doris sink ("safe" debug variant: 2pc disabled, small buffers).
CREATE TABLE doris_top35 (
ID DECIMAL(12,0),
PROD_CODE STRING,
PROD_MODL_ID DECIMAL(12,0),
SCALE STRING,
SEO_TITLE STRING,
NAME STRING,
TYPE_CODE STRING,
PROD_SERIES STRING,
COLOR_CODE STRING,
VOL_WIDTH DECIMAL(20,8),
VOL_HIGH DECIMAL(20,8),
VOL_LENGTH DECIMAL(20,8),
VOL DECIMAL(20,8),
VOL_UNIT_CODE STRING,
ONE_BOX_NUM INT,
NET_WEIGHT DECIMAL(20,8),
GROSS_WEIGHT DECIMAL(20,8),
WEIGHT_UNIT_CODE STRING,
CURRENCY STRING,
PUR_UNIT_CODE STRING,
PUR_UNIT_NAME STRING,
INV_UNIT_CODE STRING,
INV_UNIT_NAME STRING,
MATERIAL_CODE STRING,
SMALL_IMG STRING,
BIG_IMG STRING,
STATUS STRING,
STATUS_REASON STRING,
IS_TAX_PROTECT STRING,
COST_PRICE DECIMAL(20,8),
PRICE_UNIT_CODE STRING,
PRICE DECIMAL(20,8),
MARKET_PRICE DECIMAL(20,8),
MIN_PRICE DECIMAL(20,8),
MAX_PRICE DECIMAL(20,8)
) WITH (
'connector' = 'doris',
'fenodes' = '172.20.200.170:8030',
'table.identifier' = 'yq.T_PRODUCT_TOP40',
'username' = 'root',
'password' = '1234qwer',
'sink.label-prefix' = 'top35_safe_20251113',
'sink.enable-2pc' = 'false',
'sink.buffer-count' = '4',
-- NOTE(review): sink.buffer-size is usually a byte count — verify this
-- connector version parses the '1mb' suffix.
'sink.buffer-size' = '1mb'
);
INSERT INTO doris_top35 SELECT * FROM oracle_product_top35_src;
EOF