Flink CDC 同步Oracle 到Doris操作过程

By | 2025-11-13

Oracle 11G 配置

1.启用归档日志(ARCHIVELOG)模式

检查当前模式

检查当前模式
-- 以SYSDBA身份登录
SQL> SELECT log_mode FROM v$database;

-- 如果显示NOARCHIVELOG,需要启用归档模式

启用归档模式

-- 1. 关闭数据库
SQL> SHUTDOWN IMMEDIATE;

-- 2. 启动到mount状态
SQL> STARTUP MOUNT;

-- 3. 启用归档模式
SQL> ALTER DATABASE ARCHIVELOG;

-- 4. 打开数据库
SQL> ALTER DATABASE OPEN;

-- 5. 验证
SQL> SELECT log_mode FROM v$database;
-- 应该显示: ARCHIVELOG

2. 配置补充日志(Supplemental Logging)

启用最小补充日志

-- 启用数据库级最小补充日志
SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

-- 为CDC表启用主键/唯一键补充日志
SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, UNIQUE INDEX) COLUMNS;

为同步的表启用标识键日志

-- 为每个需要同步的表执行(以您的SIMPLE_USER表为例)
SQL> ALTER TABLE B2B.USERS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

-- 或者只启用主键补充日志(推荐)
SQL> ALTER TABLE B2B.USERS ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;

#建议使用
也可以考虑添加 ALL COLUMN 日志(捕获所有列变更):
ALTER TABLE B2B.USERS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

3. 创建专用CDC用户

创建用户并授权

-- 创建CDC专用用户
SQL> CREATE USER flink_cdc IDENTIFIED BY "1234qwer";

-- 授予基本权限
SQL> GRANT CREATE SESSION TO flink_cdc;
SQL> GRANT SELECT ANY TABLE TO flink_cdc;
SQL> GRANT SELECT ANY TRANSACTION TO flink_cdc;

-- 授予CDC相关权限
SQL> GRANT EXECUTE_CATALOG_ROLE TO flink_cdc;
SQL> GRANT SELECT ON V_$DATABASE TO flink_cdc;
SQL> GRANT SELECT ON V_$LOG TO flink_cdc;
SQL> GRANT SELECT ON V_$LOGFILE TO flink_cdc;
SQL> GRANT SELECT ON V_$ARCHIVED_LOG TO flink_cdc;
SQL> GRANT SELECT ON V_$LOGMNR_CONTENTS TO flink_cdc;
SQL> GRANT SELECT ON V_$LOGMNR_LOGS TO flink_cdc;
SQL> GRANT SELECT ON V_$LOG_HISTORY TO flink_cdc;
SQL> GRANT SELECT ON V_$STANDBY_LOG TO flink_cdc;
SQL> GRANT SELECT ON V_$TRANSPORTABLE_PLATFORM TO flink_cdc;
SQL> GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flink_cdc;
SQL> GRANT SELECT ON V_$LOGMMR_SESSION TO flink_cdc; --失败
SQL> GRANT SELECT ON ALL_INDEXES TO flink_cdc;
SQL> GRANT SELECT ON ALL_CATALOG TO flink_cdc;
SQL> GRANT SELECT ON ALL_CONSTRAINTS TO flink_cdc;
SQL> GRANT SELECT ON ALL_OBJECTS TO flink_cdc;
SQL> GRANT SELECT ON ALL_TAB_COLS TO flink_cdc;
SQL> GRANT SELECT ON ALL_USERS TO flink_cdc;
SQL> GRANT SELECT ON ALL_ENCRYPTED_COLUMNS TO flink_cdc;

补充的权限
GRANT SELECT_CATALOG_ROLE TO flink_cdc;
GRANT EXECUTE ON SYS.DBMS_METADATA TO flink_cdc;
-- 授予对同步表的SELECT权限
SQL> GRANT SELECT ON B2B.USERS TO flink_cdc;
GRANT SELECT ANY TABLE TO flink_cdc;
GRANT SELECT_CATALOG_ROLE TO flink_cdc;

GRANT SELECT ON ALL_TAB_COLUMNS TO flink_cdc;
GRANT SELECT ON ALL_TABLES TO flink_cdc;
GRANT SELECT ON ALL_CONSTRAINTS TO flink_cdc;

GRANT EXECUTE_CATALOG_ROLE TO flink_cdc;
GRANT SELECT ANY TRANSACTION TO flink_cdc;
GRANT LOGMINING TO flink_cdc; --失败
GRANT SELECT ON V_$LOG TO flink_cdc;
GRANT SELECT ON V_$LOG_HISTORY TO flink_cdc;
GRANT SELECT ON V_$LOGFILE TO flink_cdc;
GRANT SELECT ON V_$ARCHIVED_LOG TO flink_cdc;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flink_cdc;

GRANT FLASHBACK ON B2B.USERS TO flink_cdc;
GRANT FLASHBACK ANY TABLE TO flink_cdc;

-- 支持 LogMiner 变更捕获
GRANT EXECUTE ON DBMS_LOGMNR TO flink_cdc;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flink_cdc;

-- 支持内部表创建(如 flush 表)
GRANT CREATE TABLE TO flink_cdc;
GRANT ALTER SESSION TO flink_cdc;

#ORA-01950: no privileges on tablespace 'USERS'表示 flink_cdc用户在 USERS表空间上没有足够的权限。
LogMiner需要创建一个临时表来存储日志挖掘的中间结果,但 flink_cdc用户没有在 USERS表空间上创建表的权限。
-- 授予USERS表空间的配额权限
ALTER USER flink_cdc QUOTA UNLIMITED ON USERS;
-- 或者授予默认表空间权限
ALTER USER flink_cdc DEFAULT TABLESPACE USERS;
-- 授予创建表的权限
GRANT CREATE TABLE TO flink_cdc;
-- 授予表空间使用权限
GRANT UNLIMITED TABLESPACE TO flink_cdc;

GRANT SELECT ON DBA_LOG_GROUPS TO flink_cdc;
GRANT SELECT ON DBA_LOG_GROUP_COLUMNS TO flink_cdc;

GRANT SELECT ANY DICTIONARY TO flink_cdc;

-- 3. 检查配置是否生效
SELECT supplemental_log_data_min, supplemental_log_data_pk, supplemental_log_data_all 
FROM v$DATABASE;
-- 启用全列级别的补充日志
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

#添加锁表权限
-- DBA 执行一次就行
GRANT SELECT ANY TABLE TO flink_cdc;
GRANT LOCK ANY TABLE TO flink_cdc;
GRANT SELECT ON DBA_TABLES TO flink_cdc;
GRANT SELECT ON DBA_TAB_PARTITIONS TO flink_cdc;
GRANT EXECUTE ON DBMS_LOGMNR TO flink_cdc;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flink_cdc;
GRANT SELECT ON V_$LOG TO flink_cdc;
GRANT SELECT ON V_$LOGFILE TO flink_cdc;
GRANT SELECT ON V_$ARCHIVED_LOG TO flink_cdc;

4. Oracle参数调整

检查关键参数

-- 检查日志相关参数
SQL> SHOW PARAMETER LOG_ARCHIVE_DEST;
SQL> SHOW PARAMETER LOG_ARCHIVE_FORMAT;
SQL> SHOW PARAMETER DB_RECOVERY_FILE_DEST_SIZE;

-- 检查LogMiner相关参数
SQL> SHOW PARAMETER UTL_FILE_DIR;

5. 验证配置

验证归档模式

SQL> SELECT log_mode, supplemental_log_data_min, supplemental_log_data_pk, supplemental_log_data_ui 
     FROM v$database;

验证表级补充日志

SQL> SELECT owner, table_name, log_group_type 
     FROM dba_log_groups 
     WHERE owner = 'B2B' AND table_name = 'USER';

验证用户权限

SQL> SELECT * FROM dba_sys_privs WHERE grantee = 'FLINK_CDC';
SQL> SELECT * FROM dba_tab_privs WHERE grantee = 'FLINK_CDC';

doirs 中建表sql

CREATE TABLE IF NOT EXISTS b2b.USERS (
    USERID BIGINT NOT NULL,
    USERNAME STRING,
    USERSTATUS STRING,
    EMAIL STRING,
    MOBILE1 STRING,
    MOBILE2 STRING,
    OFFICETEL1 STRING,
    OFFICETEL2 STRING,
    EXPIRATIONTIME DATETIME,
    CATEGORYID STRING,
    CREATETIME DATE,
    QQ STRING,
    CHANNEL_TYPE STRING,
    REFEREE STRING,
    QQ2 STRING,
    SOURCE STRING,
    ORIGINAL_QQ STRING,
    ISAPP STRING
)
UNIQUE KEY(USERID)
DISTRIBUTED BY HASH(USERID) BUCKETS 10
PROPERTIES (
    "replication_num" = "3",
    "enable_unique_key_merge_on_write" = "true"
);

Flink sql client配置

同步oracle到doris配置实例

CREATE TABLE doris_simple_user_170 (
    id DECIMAL(20, 0),        -- 与 Oracle 保持一致
    name VARCHAR(100),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'doris',
    'fenodes' = '172.20.200.170:8030,172.20.200.170:8031,172.20.200.170:8032',
    'table.identifier' = 'app.simple_user',
    'username' = 'root',
    'password' = '1234qwer',

    -- 核心参数
    'sink.buffer-flush.max-rows' = '10000',
    'sink.buffer-flush.interval' = '10s',
    'sink.max-retries' = '3',

    -- 数据格式
    'sink.properties.format' = 'json',
    'sink.properties.read_json_by_line' = 'true'
);
CREATE TABLE oracle_simple_user_75 (
    ID DECIMAL(20, 0),
    NAME VARCHAR(100),
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector' = 'oracle-cdc',
    'hostname' = '172.20.200.75',
    'port' = '1521',
    'username' = 'APP',
    'password' = '1234qwer',
    'database-name' = 'RTTEST',
    'schema-name' = 'APP',
    'table-name' = 'SIMPLE_USER',
    'scan.startup.mode' = 'initial',
    'debezium.log.mining.strategy' = 'online_catalog',
    'debezium.log.mining.continuous.mine' = 'true',
    'debezium.database.connection.autocommit' = 'false',
    'debezium.database.connection.isolation.level' = 'read_uncommitted'
);

INSERT INTO doris_simple_user_170 SELECT ID, NAME FROM oracle_simple_user_75;

真实表同步实例

doris 中建表语句

CREATE TABLE b2b.USERS (
    USERID          BIGINT          NOT NULL,
    USERNAME        VARCHAR(100),
    USERSTATUS      CHAR(1),
    EMAIL           VARCHAR(100),
    MOBILE1         VARCHAR(50),
    MOBILE2         VARCHAR(50),
    OFFICETEL1      VARCHAR(50),
    OFFICETEL2      VARCHAR(50),
    EXPIRATIONTIME  DATETIME,
    CATEGORYID      CHAR(1),
    CREATETIME      VARCHAR(30)     COMMENT '创建时间字符串版',
    QQ              VARCHAR(100),
    CHANNEL_TYPE    VARCHAR(2),
    REFEREE         VARCHAR(100),
    QQ2             VARCHAR(100),
    SOURCE          VARCHAR(2),
    ORIGINAL_QQ     VARCHAR(100),
    ISAPP           CHAR(1)                  -- ← 最后一列,后面没逗号!
)
UNIQUE KEY(USERID)
DISTRIBUTED BY HASH(USERID) BUCKETS 10
PROPERTIES (
    "replication_num" = "3",
    "enable_unique_key_merge_on_write" = "true"
);
CREATE TABLE oracle_b2b_users (
    USERID DECIMAL(20, 0),
    USERNAME VARCHAR(100),
    USERSTATUS CHAR(1),
    EMAIL VARCHAR(100),
    MOBILE1 VARCHAR(50),
    MOBILE2 VARCHAR(50),
    OFFICETEL1 VARCHAR(50),
    OFFICETEL2 VARCHAR(50),
    EXPIRATIONTIME TIMESTAMP(6),
    CATEGORYID CHAR(1),
    CREATETIME STRING,  -- 改为STRING
    QQ VARCHAR(100),
    CHANNEL_TYPE VARCHAR(2),
    REFEREE VARCHAR(100),
    QQ2 VARCHAR(100),
    SOURCE VARCHAR(2),
    ORIGINAL_QQ VARCHAR(100),
    ISAPP CHAR(1),
    PRIMARY KEY (USERID) NOT ENFORCED
) WITH (
    'connector' = 'oracle-cdc',
    'hostname' = '192.168.114.133',
    'port' = '1521',
    'username' = 'flink_cdc',
    'password' = '1234qwer',
    'database-name' = 'BTOB',
    'schema-name' = 'B2B',
    'table-name' = 'USERS',
    'scan.startup.mode' = 'initial',
    'debezium.log.mining.strategy' = 'online_catalog',
    'debezium.log.mining.continuous.mine' = 'true',
    -- 添加日期异常处理
    -- 'debezium.log.mining.start.scn' = '281470000000000',
    -- 'debezium.snapshot.select.overrides' = '"B2B"."USERS"',
    'debezium.column.propagate.source.type' = '"B2B"."USERS"."CREATETIME"'
);
CREATE TABLE doris_b2b_users (
    USERID          DECIMAL(20, 0),
    USERNAME        VARCHAR(100),
    USERSTATUS      CHAR(1),
    EMAIL           VARCHAR(100),
    MOBILE1         VARCHAR(50),
    MOBILE2         VARCHAR(50),
    OFFICETEL1      VARCHAR(50),
    OFFICETEL2      VARCHAR(50),
    EXPIRATIONTIME  TIMESTAMP(6),
    CATEGORYID      CHAR(1),
    CREATETIME      STRING,
    QQ              VARCHAR(100),
    CHANNEL_TYPE    VARCHAR(2),
    REFEREE         VARCHAR(100),
    QQ2             VARCHAR(100),
    SOURCE          VARCHAR(2),
    ORIGINAL_QQ     VARCHAR(100),
    ISAPP           CHAR(1),

    PRIMARY KEY (USERID) NOT ENFORCED
) WITH (
    'connector'             = 'doris',
    'fenodes'                = '172.20.200.170:8030,172.20.200.170:8031,172.20.200.170:8032',
    'table.identifier'       = 'b2b.USERS', 
    'username'               = 'root',
    'password'               = '1234qwer',
    'sink.buffer-flush.max-rows' = '10000',
    'sink.buffer-flush.interval' = '10s',
    'sink.max-retries'           = '3',
    'sink.properties.format'          = 'json',
    'sink.properties.read_json_by_line' = 'true',
    'sink.enable-2pc'                 = 'true',
    'sink.enable-delete'              = 'true',
    'sink.batch.size'                 = '10000'
);

启动任务

INSERT INTO doris_b2b_users SELECT * FROM oracle_b2b_users;

测试库实例

1. doris建表

CREATE TABLE yq.T_PROD_COMP_SHELF (
    ID              BIGINT          NOT NULL COMMENT '主键',
    PROD_ID         BIGINT          NOT NULL COMMENT '商品ID',
    COMPANY_ID      BIGINT          NOT NULL COMMENT '公司ID',
    UP_SHELF_DATE   DATETIME(6)              COMMENT '上架时间',
    DOWN_SHELF_DATE DATETIME(6)              COMMENT '下架时间',
    STATUS          VARCHAR(32)              COMMENT '状态',
    CREATED_DATE    DATETIME(6)              COMMENT '创建时间',
    CREATOR_ID      BIGINT                   COMMENT '创建人ID',
    CREATOR_NAME    VARCHAR(32)              COMMENT '创建人姓名',
    MODI_DATE       DATETIME(6)              COMMENT '修改时间',
    MODIFIER_ID     BIGINT                   COMMENT '修改人ID',
    MODIFIER_NAME   VARCHAR(32)              COMMENT '修改人姓名',
    GRAIL_PRICE     DECIMAL(20,4)            COMMENT '价格',
    IS_TIGHT        VARCHAR(2)               COMMENT '是否紧俏',
    IS_FC           VARCHAR(2)               COMMENT '是否FC'
)
UNIQUE KEY(ID)                                   -- 主键就是 ID
DISTRIBUTED BY HASH(ID) BUCKETS 32               -- 52万行建议 32~64 个 bucket
PROPERTIES (
    "replication_num" = "3",
    "enable_unique_key_merge_on_write" = "true",
    "compression" = "zstd",
    "light_schema_change" = "true"
);
-- 删除旧表(防止残留)
DROP TABLE IF EXISTS oracle_prod_comp_shelf;

-- Oracle CDC 源表(有主键!)
CREATE TABLE oracle_prod_comp_shelf (
    ID              DECIMAL(12,0),
    PROD_ID         DECIMAL(12,0),
    COMPANY_ID      DECIMAL(12,0),
    UP_SHELF_DATE   TIMESTAMP(6),
    DOWN_SHELF_DATE TIMESTAMP(6),
    STATUS          STRING,
    CREATED_DATE    TIMESTAMP(6),
    CREATOR_ID      DECIMAL(12,0),
    CREATOR_NAME    STRING,
    MODI_DATE       TIMESTAMP(6),
    MODIFIER_ID     DECIMAL(12,0),
    MODIFIER_NAME   STRING,
    GRAIL_PRICE     DECIMAL(20,4),
    IS_TIGHT        STRING,
    IS_FC           STRING,

    PRIMARY KEY (ID) NOT ENFORCED                 -- 关键:声明主键
) WITH (
    'connector'                     = 'oracle-cdc',
    'hostname'                      = '172.20.200.39',           -- 改成你的 Oracle IP
    'port'                          = '1521',
    'username'                      = 'flink_cdc',
    'password'                      = '1234qwer',
    'database-name'                 = 'DZDZ38',                      -- 你的实例名
    'schema-name'                   = 'YQ',
    'table-name'                    = 'T_PROD_COMP_SHELF',

    -- 首次全量 + 增量(52万行预计 2~5 分钟)
    'scan.startup.mode'             = 'initial',
    'debezium.log.mining.strategy'  = 'online_catalog',
    'debezium.log.mining.continuous.mine' = 'true',
    'debezium.column.propagate.source.type' = '"DZDZ38"."YQ"."T_PROD_COMP_SHELF"'
);
#建表后查询建表状态,在flink sql client中查看
Flink SQL> select count(*) from oracle_prod_comp_shelf;
#图形化查看任务状态
http://172.20.200.170:8081/
-- 删除旧表
DROP TABLE IF EXISTS doris_prod_comp_shelf;

-- Doris Sink 表
CREATE TABLE doris_prod_comp_shelf (
    ID              DECIMAL(12,0),
    PROD_ID         DECIMAL(12,0),
    COMPANY_ID      DECIMAL(12,0),
    UP_SHELF_DATE   TIMESTAMP(6),
    DOWN_SHELF_DATE TIMESTAMP(6),
    STATUS          STRING,
    CREATED_DATE    TIMESTAMP(6),
    CREATOR_ID      DECIMAL(12,0),
    CREATOR_NAME    STRING,
    MODI_DATE       TIMESTAMP(6),
    MODIFIER_ID     DECIMAL(12,0),
    MODIFIER_NAME   STRING,
    GRAIL_PRICE     DECIMAL(20,4),
    IS_TIGHT        STRING,
    IS_FC           STRING,

    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector'                     = 'doris',
    'fenodes'                       = '172.20.200.170:8030,172.20.200.170:8031,172.20.200.170:8032',
    'table.identifier'              = 'yq.T_PROD_COMP_SHELF',
    'username'                      = 'root',
    'password'                      = '1234qwer',

    'sink.buffer-flush.max-rows'    = '10000',
    'sink.buffer-flush.interval'    = '10s',
    'sink.max-retries'              = '3',

    'sink.properties.format'                = 'json',
    'sink.properties.read_json_by_line'     = 'true',

    'sink.enable-2pc'                       = 'true',     -- Exactly-Once
    'sink.enable-delete'                    = 'true',     -- 支持 DELETE
    'sink.label-prefix'                     = 'flink_prod_shelf_20251111'
);

4.启动任务

-- 调试时避免 5MB 内存限制(生产作业可省略)
SET 'state.checkpoint-storage' = 'filesystem';
SET 'state.checkpoints.dir' = 'file:///tmp/flink-checkpoints';

-- 一行启动(字段完全一致,可用 SELECT *)
INSERT INTO doris_prod_comp_shelf 
SELECT * FROM oracle_prod_comp_shelf;

5. 字段太多情况的处理(只处理部分字段)

# 第 1 步:保存为文件(必须分文件!不能直接粘贴多条)
cat > /opt/flink-sql-scripts/product_top35_safe.sql << 'EOF'
SET 'execution.checkpointing.interval' = '10s';
SET 'execution.checkpointing.timeout' = '10min';
SET 'state.checkpoints.dir' = 'file:///opt/flink-checkpoints';
SET 'parallelism.default' = '8';
SET 'table.exec.source.cdc.snapshot.fetch.size' = '100000';
SET 'debezium.snapshot.fetch.size' = '100000';

DROP TABLE IF EXISTS oracle_product_top35_src;

CREATE TABLE oracle_product_top35_src (
  ID                  DECIMAL(12,0),
  PROD_CODE           STRING,
  PROD_MODL_ID        DECIMAL(12,0),
  SCALE               STRING,
  SEO_TITLE           STRING,
  NAME                STRING,
  TYPE_CODE           STRING,
  PROD_SERIES         STRING,
  COLOR_CODE          STRING,
  VOL_WIDTH           DECIMAL(20,8),
  VOL_HIGH            DECIMAL(20,8),
  VOL_LENGTH          DECIMAL(20,8),
  VOL                 DECIMAL(20,8),
  VOL_UNIT_CODE       STRING,
  ONE_BOX_NUM         INT,
  NET_WEIGHT          DECIMAL(20,8),
  GROSS_WEIGHT        DECIMAL(20,8),
  WEIGHT_UNIT_CODE    STRING,
  CURRENCY            STRING,
  PUR_UNIT_CODE       STRING,
  PUR_UNIT_NAME       STRING,
  INV_UNIT_CODE       STRING,
  INV_UNIT_NAME       STRING,
  MATERIAL_CODE       STRING,
  SMALL_IMG           STRING,
  BIG_IMG             STRING,
  STATUS              STRING,
  STATUS_REASON       STRING,
  IS_TAX_PROTECT      STRING,
  COST_PRICE          DECIMAL(20,8),
  PRICE_UNIT_CODE     STRING,
  PRICE               DECIMAL(20,8),
  MARKET_PRICE        DECIMAL(20,8),
  MIN_PRICE           DECIMAL(20,8),
  MAX_PRICE           DECIMAL(20,8)
) WITH (
  'connector' = 'oracle-cdc',
  'hostname' = '172.20.200.39',
  'port' = '1521',
  'username' = 'flink_cdc',
  'password' = '1234qwer',
  'database-name' = 'DZDZ38',
  'schema-name' = 'YQ',
  'table-name' = 'T_PRODUCT',
  'scan.startup.mode' = 'initial',
  'debezium.snapshot.locking.mode' = 'none',
  'debezium.log.mining.strategy' = 'online_catalog',
  'debezium.log.mining.continuous.mine' = 'true'
);

DROP TABLE IF EXISTS doris_top35;
CREATE TABLE doris_top35 (
  ID                  DECIMAL(12,0),
  PROD_CODE           STRING,
  PROD_MODL_ID        DECIMAL(12,0),
  SCALE               STRING,
  SEO_TITLE           STRING,
  NAME                STRING,
  TYPE_CODE           STRING,
  PROD_SERIES         STRING,
  COLOR_CODE          STRING,
  VOL_WIDTH           DECIMAL(20,8),
  VOL_HIGH            DECIMAL(20,8),
  VOL_LENGTH          DECIMAL(20,8),
  VOL                 DECIMAL(20,8),
  VOL_UNIT_CODE       STRING,
  ONE_BOX_NUM         INT,
  NET_WEIGHT          DECIMAL(20,8),
  GROSS_WEIGHT        DECIMAL(20,8),
  WEIGHT_UNIT_CODE    STRING,
  CURRENCY            STRING,
  PUR_UNIT_CODE       STRING,
  PUR_UNIT_NAME       STRING,
  INV_UNIT_CODE       STRING,
  INV_UNIT_NAME       STRING,
  MATERIAL_CODE       STRING,
  SMALL_IMG           STRING,
  BIG_IMG             STRING,
  STATUS              STRING,
  STATUS_REASON       STRING,
  IS_TAX_PROTECT      STRING,
  COST_PRICE          DECIMAL(20,8),
  PRICE_UNIT_CODE     STRING,
  PRICE               DECIMAL(20,8),
  MARKET_PRICE        DECIMAL(20,8),
  MIN_PRICE           DECIMAL(20,8),
  MAX_PRICE           DECIMAL(20,8)
) WITH (
  'connector' = 'doris',
  'fenodes' = '172.20.200.170:8030',
  'table.identifier' = 'yq.T_PRODUCT_TOP40',
  'username' = 'root',
  'password' = '1234qwer',
  'sink.label-prefix' = 'top35_safe_20251113',
  'sink.enable-2pc' = 'false',
  'sink.buffer-count' = '4',
  'sink.buffer-size' = '1mb'
);

INSERT INTO doris_top35 SELECT * FROM oracle_product_top35_src;
EOF
分类: IT 标签: