DataX Usage Tutorial

User guide: https://github.com/alibaba/DataX/blob/master/userGuid.md

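DataX is an offline data synchronization tool/platform widely used within Alibaba Group. It provides efficient data synchronization between heterogeneous data sources such as MySQL, SQL Server, Oracle, PostgreSQL, HDFS, Hive, HBase, OTS and ODPS.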

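As a data synchronization framework, DataX abstracts every synchronization into a Reader plugin, which reads data from the source, and a Writer plugin, which writes data to the target. In principle, the framework can therefore synchronize any type of data source. The plugin system also works as an ecosystem: every newly integrated data source can immediately exchange data with all the existing ones.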
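To make the Reader/Writer split concrete, here is a toy model in Java. The interfaces SimpleReader and SimpleWriter and the class ToyFramework are hypothetical simplifications, not DataX's real plugin API (which splits jobs into tasks, uses typed record columns, and much more). The point is only that the framework moves records through a channel, so any reader can be paired with any writer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical, simplified plugin contracts (not DataX's real SPI).
interface SimpleReader {
    // Pushes every source record into the channel.
    void read(BlockingQueue<List<Object>> channel) throws InterruptedException;
}

interface SimpleWriter {
    // Consumes one record taken from the channel.
    void write(List<Object> record);
}

class ToyFramework {
    // Marker placed on the channel once the reader has finished.
    private static final List<Object> END = new ArrayList<>();

    // Runs the reader in its own thread and drains the channel into the writer.
    // Error handling is deliberately omitted: the reader is assumed not to fail.
    static void transfer(SimpleReader reader, SimpleWriter writer) throws InterruptedException {
        BlockingQueue<List<Object>> channel = new ArrayBlockingQueue<>(16);
        Thread producer = new Thread(() -> {
            try {
                reader.read(channel);
                channel.put(END); // no more records will arrive
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        while (true) {
            List<Object> record = channel.take();
            if (record == END) { // identity check against the marker instance
                break;
            }
            writer.write(record);
        }
        producer.join();
    }

    public static void main(String[] args) throws InterruptedException {
        // An in-memory "source" paired with a console "target", similar in spirit to streamwriter.
        SimpleReader memoryReader = channel -> {
            channel.put(Arrays.asList(1, "alice"));
            channel.put(Arrays.asList(2, "bob"));
        };
        SimpleWriter printWriter = record -> System.out.println(record);
        transfer(memoryReader, printWriter);
    }
}
```

Swapping memoryReader for another SimpleReader implementation requires no change to the writer, which is the interoperability property described above.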

Common Examples

1. ClickHouse to Elasticsearch

Tutorial: https://www.shangmayuan.com/a/f52f63deed0a4e2fbbfaa64b.html

**1.1** Configure a job that extracts data from a ClickHouse database and prints it to the local console

DataX job script:

{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "clickhousereader",
                    "parameter": {
                        "username": "aaa",
                        "password": "123456",
                        "column": [
                            "account_id",
                            "account_no",
                            "app_name",
                            "id",
                            "content"
                        ],
                        "splitPk": "id",
                        "connection": [
                            {
                                "table": [
                                    "table"
                                ],
                                "jdbcUrl": [
                                    "jdbc:clickhouse://127.0.0.1:8123/default"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": true
                    }
                }
            }
        ]
    }
}
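With splitPk set, the reader can cut the table into several ranges on that column so that the configured channels read in parallel. The Java sketch below illustrates the idea for an integer key whose minimum and maximum are already known: it divides [min, max] into contiguous, non-overlapping WHERE conditions, plus one condition for NULL keys so that no rows are skipped. The helper splitRanges and the number of pieces are assumptions for illustration; DataX's own splitter queries the bounds itself and decides how many ranges to create.

```java
import java.util.ArrayList;
import java.util.List;

class SplitPkDemo {
    // Splits the inclusive range [min, max] of an integer split key into at most
    // `pieces` contiguous ranges and renders each one as a WHERE condition.
    // Overflow for extreme long ranges is not handled in this sketch.
    static List<String> splitRanges(String pk, long min, long max, int pieces) {
        if (pieces < 1 || min > max) {
            throw new IllegalArgumentException("need pieces >= 1 and min <= max");
        }
        List<String> conditions = new ArrayList<>();
        long total = max - min + 1;                 // number of key values covered
        int n = (int) Math.min(pieces, total);      // never create empty ranges
        long base = total / n;                      // minimum size of each range
        long extra = total % n;                     // first `extra` ranges get one more value
        long lower = min;
        for (int i = 0; i < n; i++) {
            long size = base + (i < extra ? 1 : 0);
            long upper = lower + size - 1;
            conditions.add(pk + " >= " + lower + " AND " + pk + " <= " + upper);
            lower = upper + 1;
        }
        // Rows whose key is NULL fall outside every range, so read them separately.
        conditions.add(pk + " IS NULL");
        return conditions;
    }

    public static void main(String[] args) {
        for (String condition : splitRanges("id", 1, 10, 3)) {
            System.out.println(condition);
        }
    }
}
```

For ids 1 to 10 split three ways this yields id >= 1 AND id <= 4, id >= 5 AND id <= 7, id >= 8 AND id <= 10, and id IS NULL. Ranges only balance the work when the key values are reasonably evenly distributed, which is why a dense numeric primary key such as id is the usual choice.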

**1.2** Configure a job that syncs the result of a custom SQL query into Elasticsearch

{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "clickhousereader",
                    "parameter": {
                        "username": "aaa",
                        "password": "bbb",
                        "connection": [
                            {
                                "querySql": [
                                    "select account_id as accountId, account_no as accountNo, app_name as appName, id as dataId, content as nick, toUnixTimestamp(now()) as orderTimeLong from yx_device_event_info"
                                ],
                                "jdbcUrl": [
                                    "jdbc:clickhouse://192.168.2.181:8123/ph_eeanalysis_dev"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "elasticsearchwriter",
                    "parameter": {
                        "endpoint": "http://192.168.167.244:9200",
                        "index": "yx-search-chat-other-datax",
                        "type": "_doc",
                        "cleanup": "false",
                        "batchSize": 2000,
                        "encoding": "UTF-8",
                        // A custom analyzer is defined here
                        "settings": {
                            "index": {
                                "refresh_interval": "30s",
                                "number_of_shards": "2",
                                "provided_name": "yx-search-chat-other-datax",
                                "analysis": {
                                    "analyzer": {
                                        "my_custom_analyzer": {
                                            "type": "custom",
                                            "char_filter": [
                                                "html_strip"
                                            ],
                                            "tokenizer": "ik_max_word"
                                        }
                                    }
                                },
                                "number_of_replicas": "1"
                            }
                        },
                        "column": [
                            {
                                "name": "accountId",
                                "type": "keyword"
                            },
                            {
                                "name": "accountNo",
                                "type": "keyword"
                            },
                            {
                                "name": "appName",
                                "type": "keyword"
                            },
                            {
                                "name": "dataId",
                                "type": "keyword"
                            },
                            {
                                "name": "nick",
                                "type": "keyword"
                            },
                            {
                                "name": "orderTimeLong",
                                "type": "long"
                            }
                        ]
                    }
                }
            }
        ]
    }
}
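DataX passes records from reader to writer by column position, not by name: the first value of each row returned by querySql goes to the first entry of the writer's column list, and so on. The aliases in the SQL therefore mainly aid readability; what must match is the order and number of columns. The hypothetical helper toDocument below sketches that positional pairing, turning one row into the field map that would be indexed into Elasticsearch.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PositionalMappingDemo {
    // Pairs the i-th value of a reader row with the i-th writer column name.
    static Map<String, Object> toDocument(List<String> writerColumns, List<Object> row) {
        if (writerColumns.size() != row.size()) {
            throw new IllegalArgumentException(
                "column count mismatch: " + writerColumns.size() + " vs " + row.size());
        }
        Map<String, Object> doc = new LinkedHashMap<>(); // keeps column order for printing
        for (int i = 0; i < row.size(); i++) {
            doc.put(writerColumns.get(i), row.get(i));
        }
        return doc;
    }

    public static void main(String[] args) {
        // Writer column names from the job above, in order.
        List<String> columns = Arrays.asList(
            "accountId", "accountNo", "appName", "dataId", "nick", "orderTimeLong");
        // A made-up row in the same order as the querySql select list.
        List<Object> row = Arrays.asList("a1", "n1", "app", "42", "hello", 1700000000L);
        System.out.println(toDocument(columns, row));
    }
}
```

If the select list and the writer column list drift apart (for example a column is added to one but not the other), values land in the wrong fields, so it pays to keep both lists in the same order.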

Things to watch out for:

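1. By default, the prebuilt DataX package from the official site does not include the clickhousereader or elasticsearchwriter plugins. You have to build the source code yourself and copy the corresponding plugin directories from target/datax into the plugin/reader and plugin/writer directories of your DataX installation.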

2. The official source code does not include a clickhousereader. Following https://blog.csdn.net/yuuuu1214/article/details/118978353, adapt rdbmsreader into a clickhousereader.

3. elasticsearchwriter enables authentication by default. If your Elasticsearch cluster has no password configured, modify com.alibaba.datax.plugin.writer.elasticsearchwriter.ESClient and delete the call .setPreemptiveAuth(new HttpHost(endpoint)).

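setPreemptiveAuth sets up preemptive authentication for the given target host by pre-populating the authentication data cache. Preemptive authentication requires a credentials provider; if none has been set, an exception is thrown.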

public Builder setPreemptiveAuth(HttpHost targetHost) {
    return preemptiveAuthTargetHosts(Collections.singleton(targetHost));
}

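4. Another important issue: if the index has a more complex mapping, elasticsearchwriter will overwrite the index structure. For example, I added copy_to to a field, and after the sync the copy_to setting was gone from the index. Fix: modify com.alibaba.datax.plugin.writer.elasticsearchwriter.ESWriter and comment out the following code so that the index is no longer created automatically.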

// Force creation; an index that already exists is ignored internally
if (!esClient.createIndex(indexName, typeName, mappings, settings, dynamic)) {
	throw new IOException("create index or mapping failed");
}

2. CSV to ClickHouse

Reference script

Because the input is a CSV file, txtfilereader can be used directly:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "txtfilereader",
                    "parameter": {
                        "path": [
                            "/opt/xxx.csv"
                        ],
                        "column": [
                            {
                                "name": "id",
                                "index": 0,
                                "type": "string"
                            },
                            {
                                "name": "xxx",
                                "index": 1,
                                "type": "string"
                            },
                            {
                                "name": "xx",
                                "index": 2,
                                "type": "string"
                            },
                            {
                                "name": "xxxx",
                                "index": 3,
                                "type": "string"
                            }
                        ],
                        "encoding": "UTF-8",
                        "csvReaderConfig": {
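                            // Column (field) delimiter. For unstructured data, a timestamp string works well because stray characters in the data will not collide with it
                            "multiDelimiter": "1642123847196",
                            // Row (record) delimiter: each occurrence marks the end of one row
                            "multiRecordDelimiter": "1642123859380",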
                            "safetySwitch": false,
                            "trimWhitespace": false,
                            "useComments": false,
                            "useTextQualifier": false,
                            "disableEscape": true
                        }
                    }
                },
                "writer": {
                    "name": "clickhousewriter",
                    "parameter": {
                        "username": "11",
                        "password": "111",
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:clickhouse://111:8123/222",
                                "table": [
                                    "t_xxxx_sss"
                                ]
                            }
                        ],
                        "column": [
                            "id",
                            "xxx",
                            "xx",
                            "xxxx"
                        ]
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 5,
                "batchSize": 32
            }
        }
    }
}
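The multiDelimiter and multiRecordDelimiter keys do not appear among the documented upstream csvReaderConfig options, so this config presumably relies on a customized build of the reader. The idea behind them is simple: split the file on an unusual multi-character string so that commas, quotes or newlines inside the data cannot break parsing. A minimal sketch of that splitting, using the hypothetical helper parse:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

class MultiDelimiterDemo {
    // Splits raw text into rows on a multi-character record delimiter, then each
    // row into fields on a multi-character field delimiter. Pattern.quote makes
    // the delimiters literal strings rather than regular expressions.
    static List<String[]> parse(String text, String fieldDelimiter, String recordDelimiter) {
        List<String[]> rows = new ArrayList<>();
        for (String line : text.split(Pattern.quote(recordDelimiter))) {
            if (line.isEmpty()) {
                continue; // ignore empty records
            }
            // limit -1 keeps trailing empty fields instead of dropping them
            rows.add(line.split(Pattern.quote(fieldDelimiter), -1));
        }
        return rows;
    }

    public static void main(String[] args) {
        String field = "1642123847196";
        String record = "1642123859380";
        // A comma and a newline inside the data do not disturb parsing.
        String text = "1" + field + "a,b" + field + "x\ny" + field + "" + record
                    + "2" + field + "c" + field + "z" + field + "w" + record;
        for (String[] row : parse(text, field, record)) {
            System.out.println(row.length + " fields: " + String.join(" | ", row));
        }
    }
}
```

A timestamp delimiter is only safe as long as that exact digit string never occurs in the data; rarely used control characters, as in the OSS examples, are another option.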

3. OSS to ADS

{
	"job": {
		"content": [
			{
				"reader": {
					"name": "ossreader",
					"parameter": {
						"endpoint": "http://ossaddress",
						"accessId": "accessId",
						"accessKey": "accessKey",
						"bucket": "o-bucket-name",
						"object": [
							"event/target_file.csv"
						],
						"nullFormat": "",
						"column": [
							{
								"name": "id",
								"index": 0,
								"type": "string"
							},
							{
								"name": "aaa",
								"index": 1,
								"type": "string"
							},
							{
								"name": "bbb",
								"index": 2,
								"type": "string"
							}
						],
						"encoding": "UTF-8",
						"csvReaderConfig": {
							"multiDelimiter": "\b\b\b\t\t",
							"multiRecordDelimiter": "\f\f\f\n\r",
							"safetySwitch": false,
							"trimWhitespace": false,
							"useComments": false,
							"useTextQualifier": false,
							"disableEscape": true
						}
					}
				},
				"writer": {
					"name": "clickhousewriter",
					"parameter": {
						"writeMode": "insert",
						"url": "192.168.2.181:8123",
						"schema": "default",
						"table": "111",
						"username": "111",
						"password": "111",
						"column": [
							"id",
							"aaa",
							"bbb"
						]
					}
				}
			}
		],
		"setting": {
			"speed": {
				"channel": 5,
				"batchSize": 32
			}
		}
	}
}
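In the reader's column list each entry picks a field by its 0-based index and gives it a name and type, while nullFormat names the string that should be read as NULL (here the empty string). The sketch below splits one line on the same "\b\b\b\t\t" delimiter (three backspaces followed by two tabs, written with JSON/Java escapes) and applies such a column spec. ColumnSpec and project are hypothetical helpers that only model the string type used in this job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

class ColumnSpecDemo {
    // Hypothetical model of one reader column entry: {"name": ..., "index": ...}.
    static final class ColumnSpec {
        final String name;
        final int index;

        ColumnSpec(String name, int index) {
            this.name = name;
            this.index = index;
        }
    }

    // Picks each configured column by index; a value equal to nullFormat becomes null.
    static List<String> project(String[] fields, List<ColumnSpec> specs, String nullFormat) {
        List<String> out = new ArrayList<>();
        for (ColumnSpec spec : specs) {
            if (spec.index >= fields.length) {
                throw new IllegalArgumentException("row has no field at index " + spec.index);
            }
            String value = fields[spec.index];
            out.add(value.equals(nullFormat) ? null : value);
        }
        return out;
    }

    public static void main(String[] args) {
        // Same escapes as in the JSON: three backspaces followed by two tabs.
        String fieldDelimiter = "\b\b\b\t\t";
        String line = "7" + fieldDelimiter + "" + fieldDelimiter + "hello";
        String[] fields = line.split(Pattern.quote(fieldDelimiter), -1);
        List<ColumnSpec> specs = Arrays.asList(
            new ColumnSpec("id", 0), new ColumnSpec("aaa", 1), new ColumnSpec("bbb", 2));
        System.out.println(project(fields, specs, "")); // prints [7, null, hello]
    }
}
```

Because nullFormat is "" here, an empty field is written as NULL rather than as an empty string; set it to a different marker if empty strings must be preserved.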

4. OSS to ODPS

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "ossreader",
                    "parameter": {
                        "fieldDelimiter": "\b\b\b\t\t",
                        "endpoint": "http://oss-ccom",
                        "accessId": "accessId",
                        "accessKey": "accessKey",
                        "bucket": "o-bucket",
                        "object": [
                            "olddata/aaa.csv__a88c7c5db0124bd2a7c9dbeecc4246b4"
                        ],
                        "nullFormat": "",
                        "column": [
                            "*"
                        ],
                        "encoding": "UTF-8",
                        "csvReaderConfig": {
                            "multiDelimiter": "\b\b\b\t\t",
                            "multiRecordDelimiter": "\f\f\f\n\r",
                            "safetySwitch": false,
                            "trimWhitespace": false,
                            "useComments": false,
                            "useTextQualifier": false,
                            "disableEscape": true
                        }
                    }
                },
                "writer": {
                    "name": "odpswriter",
                    "parameter": {
                        "project": "project_name",
                        "table": "t_xxxx",
                        "partition": "ds=19900101",
                        "column": [
                            "*"
                        ],
                        "accessId": "accessId",
                        "accessKey": "accessKey",
                        "truncate": true,
                        "odpsServer": "http://odps/api",
                        "tunnelServer": "http://tunnelServer",
                        "accountType": "aliyun",
                        "visible": false,
                        "encoding": "UTF-8",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 5,
                "batchSize": 32
            }
        }
    }
}

Plugin Development

// TODO


Title: DataX Usage Tutorial
Author: gitsilence
URL: https://blog.lacknb.cn/articles/2022/02/09/1644401254488.html