User guide: https://github.com/alibaba/DataX/blob/master/userGuid.md
DataX is an offline data synchronization tool/platform that is widely used inside Alibaba Group. It provides efficient data synchronization between heterogeneous data sources such as MySQL, SQL Server, Oracle, PostgreSQL, HDFS, Hive, HBase, OTS and ODPS.
As a data synchronization framework, DataX abstracts the synchronization of different data sources into Reader plugins, which read data from the source, and Writer plugins, which write data to the target. In theory, the DataX framework can therefore synchronize between arbitrary types of data sources. At the same time, the plugin system works as an ecosystem: each newly integrated data source immediately becomes interoperable with every existing one.
Tutorial: https://www.shangmayuan.com/a/f52f63deed0a4e2fbbfaa64b.html
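Once a job JSON file is written, it is submitted through the Python launcher shipped with DataX, roughly python {DATAX_HOME}/bin/datax.py ./your_job.json (see the user guide linked above for the exact invocation).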
**1.1** Configure a job that extracts data from a ClickHouse database and synchronizes it to the local machine
DataX job script:
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "clickhousereader",
"parameter": {
"username": "aaa",
"password": "123456",
"column": [
"account_id",
"account_no",
"app_name",
"id",
"content"
],
"splitPk": "id",
"connection": [
{
"table": [
"table"
],
"jdbcUrl": [
"jdbc:clickhouse://127.0.0.1:8123/default"
]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true
}
}
}
]
}
}
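Because the writer here is streamwriter with print set to true, every record read from ClickHouse is simply printed to the job log. That makes this job a convenient way to verify the clickhousereader connection, column list and splitPk before pointing the writer at a real target.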
**1.2** Configure a job that uses a custom SQL query to synchronize data from ClickHouse into Elasticsearch
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [
{
"reader": {
"name": "clickhousereader",
"parameter": {
"username": "aaa",
"password": "bbb",
"connection": [
{
"querySql": [
"select account_id as accountId, account_no as accountNo, app_name as appName, id as dataId, content as nick, toUnixTimestamp(now()) as orderTimeLong from yx_device_event_info"
],
"jdbcUrl": [
"jdbc:clickhouse://192.168.2.181:8123/ph_eeanalysis_dev"
]
}
]
}
},
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"endpoint": "http://192.168.167.244:9200",
"index": "yx-search-chat-other-datax",
"type": "_doc",
"cleanup": "false",
"batchSize": 2000,
"encoding": "UTF-8",
// a custom analyzer is configured here
"settings": {
"index": {
"refresh_interval": "30s",
"number_of_shards": "2",
"provided_name": "yx-search-chat-other-datax",
"analysis": {
"analyzer": {
"my_custom_analyzer": {
"type": "custom",
"char_filter": [
"html_strip"
],
"tokenizer": "ik_max_word"
}
}
},
"number_of_replicas": "1",
}
},
"column": [
{
"name": "accountId",
"type": "keyword"
},
{
"name": "accountNo",
"type": "keyword"
},
{
"name": "appName",
"type": "keyword"
},
{
"name": "dataId",
"type": "keyword"
},
{
"name": "nick",
"type": "keyword"
},
{
"name": "orderTimeLong",
"type": "long"
}
]
}
}
}
]
}
}
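One thing to be aware of: my_custom_analyzer in the settings above relies on the ik_max_word tokenizer, which is provided by the elasticsearch-analysis-ik plugin, so the target cluster needs that plugin installed for the index settings to be accepted.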
Issues to note:
1. By default, the officially provided pre-built DataX download does not include the clickhousereader and elasticsearchwriter plugins. You have to build and package the source code yourself, then copy the built plugins under target/datax into DataX's plugin/reader and plugin/writer directories.
2. The official source tree does not contain a clickhousereader either. Following https://blog.csdn.net/yuuuu1214/article/details/118978353, adapt rdbmsreader into a clickhousereader.
3. elasticsearchwriter enables authentication by default. If the Elasticsearch cluster in your environment has no password set, modify the source class com.alibaba.datax.plugin.writer.elasticsearchwriter.ESClient and remove the call .setPreemptiveAuth(new HttpHost(endpoint)). This method enables preemptive authentication for the given target host by pre-populating the authentication cache; a credentials provider must be configured for preemptive authentication to be used, and without one it throws an exception.
public Builder setPreemptiveAuth(HttpHost targetHost) {
    return preemptiveAuthTargetHosts(Collections.singleton(targetHost));
}
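For reference, a minimal sketch of what the client construction could look like after the change, with the preemptive-auth call kept only when credentials are actually configured. The helper method, field names and timeout values are assumptions for illustration, not the exact DataX source:

import org.apache.http.HttpHost;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;

public class EsClientSketch {
    // Hypothetical helper: build a Jest client and only enable auth when credentials are given.
    public static JestClient createClient(String endpoint, String user, String passwd) {
        HttpClientConfig.Builder builder = new HttpClientConfig.Builder(endpoint)
                .multiThreaded(true)
                .connTimeout(30000)
                .readTimeout(60000);
        if (user != null && !user.isEmpty()) {
            // Preemptive auth requires a credentials provider, so keep these two calls together.
            builder.defaultCredentials(user, passwd)
                   .setPreemptiveAuth(new HttpHost(endpoint));
        }
        JestClientFactory factory = new JestClientFactory();
        factory.setHttpClientConfig(builder.build());
        return factory.getObject();
    }
}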
4. One more important issue: if your index mapping is set up with anything more elaborate, elasticsearchwriter will tamper with the index structure. For example, after adding copy_to to a field, the copy_to setting was stripped from the index once the data sync finished. Solution: modify the source class com.alibaba.datax.plugin.writer.elasticsearchwriter.ESWriter and comment out the code below, so that the index is no longer created automatically.
// Force creation; an already-existing index is silently ignored internally
if (!esClient.createIndex(indexName, typeName, mappings, settings, dynamic)) {
    throw new IOException("create index or mapping failed");
}
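Note that once automatic index creation is disabled this way, the target index, together with its full mapping (including settings such as copy_to), has to be created in Elasticsearch beforehand; the DataX job then only writes documents into the existing index.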
Script reference
Since the source is a CSV file, txtfilereader can be used directly.
{
"job": {
"content": [
{
"reader": {
"name": "txtfilereader",
"parameter": {
"path": [
"/opt/xxx.csv"
],
"column": [
{
"name": "id",
"index": 0,
"type": "string"
},
{
"name": "xxx",
"index": 1,
"type": "string"
},
{
"name": "xx",
"index": 2,
"type": "string"
},
{
"name": "xxxx",
"index": 3,
"type": "string"
}
],
"encoding": "UTF-8",
"csvReaderConfig": {
// column delimiter: for unstructured data a timestamp-like token can be used, so that messy content in the data does not interfere with parsing
"multiDelimiter": "1642123847196",
// record (row) delimiter: this token marks the end of one record
"multiRecordDelimiter": "1642123859380",
"safetySwitch": false,
"trimWhitespace": false,
"useComments": false,
"useTextQualifier": false,
"disableEscape": true
}
}
},
"writer": {
"name": "clickhousewriter",
"parameter": {
"username": "11",
"password": "111",
"connection": [
{
"jdbcUrl": "jdbc:clickhouse://111:8123/222",
"table": [
"t_xxxx_sss"
]
}
],
"column": [
"id",
"xxx",
"xx",
"xxxx"
]
}
}
}
],
"setting": {
"speed": {
"channel": 5,
"batchSize": 32
}
}
}
}
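With the csvReaderConfig above, the reader treats the token 1642123847196 as the column separator and 1642123859380 as the record separator; the timestamp strings are just arbitrary markers chosen because they are unlikely to occur in the messy source data.

OSS synchronized to ClickHouse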
{
"job": {
"content": [
{
"reader": {
"name": "ossreader",
"parameter": {
"endpoint": "http://ossaddress",
"accessId": "accessId",
"accessKey": "accessKey",
"bucket": "o-bucket-name",
"object": [
"event/target_file.csv"
],
"nullFormat": "",
"column": [
{
"name": "id",
"index": 0,
"type": "string"
},
{
"name": "aaa",
"index": 1,
"type": "string"
},
{
"name": "bbb",
"index": 2,
"type": "string"
}
],
"encoding": "UTF-8",
"csvReaderConfig": {
"multiDelimiter": "\b\b\b\t\t",
"multiRecordDelimiter": "\f\f\f\n\r",
"safetySwitch": false,
"trimWhitespace": false,
"useComments": false,
"useTextQualifier": false,
"disableEscape": true
}
}
},
"writer": {
"name": "clickhousewriter",
"parameter": {
"writeMode": "insert",
"url": "192.168.2.181:8123",
"schema": "default",
"table": "111",
"username": "111",
"password": "111",
"column": [
"id",
"aaa",
"bbb",
]
}
}
}
],
"setting": {
"speed": {
"channel": 5,
"batchSize": 32
}
}
}
}
OSS synchronized to ODPS
{
"job": {
"content": [
{
"reader": {
"name": "ossreader",
"parameter": {
"fieldDelimiter": "\b\b\b\t\t",
"endpoint": "http://oss-ccom",
"accessId": "accessId",
"accessKey": "accessId",
"bucket": "o-bucket",
"object": [
"olddata\\/aaa.csv__a88c7c5db0124bd2a7c9dbeecc4246b4"
],
"nullFormat": "",
"column": [
"*"
],
"encoding": "UTF-8",
"csvReaderConfig": {
"multiDelimiter": "\b\b\b\t\t",
"multiRecordDelimiter": "\f\f\f\n\r",
"safetySwitch": false,
"trimWhitespace": false,
"useComments": false,
"useTextQualifier": false,
"disableEscape": true
}
}
},
"writer": {
"name": "odpswriter",
"parameter": {
"project": "project_name",
"table": "t_xxxx",
"partition": "ds=19900101",
"column": [
"*"
],
"accessId": "accessId",
"accessKey": "accessId",
"truncate": true,
"odpsServer": "http://odps/api",
"tunnelServer": "http://tunnelServer",
"accountType": "aliyun",
"visible": false,
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 5,
"batchSize": 32
}
}
}
}
// TODO