一、 携程 Dal
开源框架 Dal
是携程开源的数据库访问框架,为大规模的 DB
管理和使用提供一套优质的解决方案。
首先在 DB
管理方面,Dal
统一集成了主流程的数据访问:支持 Java
和 C#
客户端;支持 MySQL
、SQLServer
数据库;支持 ORM
和非 ORM
方式的数据访问;使用了 Emit
映射技术,提供高性能 ORM
;多数据源访问和主从分离(读写分离);日志、监控集成。
其次在 DB
使用方面, Dal
支持代码生成。通过 Dal
平台,可一键生成 Entity
、Dao
、Unit Test
。这不仅可以让开发者脱离 DB
编程、提升开发效率,还可以统一面向 DB
的代码风格和代码质量。
二、 DataX
与 Dal
的兼容 Dal
的主要特点是统一收口和集中管理,在 DB
连接方面,客户端无需配置 DB
的用户名和密码,只需要配置 Dal
提供的 TitanKey
或 ClusterName
即可。简单来说,TitanKey
或 ClusterName
就是 Dal
生成的,供客户端的访问 DB
的钥匙。也正因此,DataX
在 Dal
架构的系统上就不起作用了。需要解决两个问题:DataX
如何配置 TitanKey
或 ClusterName
;DataX
如何通过 Dal
获取数据源。
三、 Datax
配置 TitanKey
或 ClusterName
这是 mysqlwriter
的配置信息模板:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 { "name" : "mysqlwriter" , "parameter" : { "username" : "" , "password" : "" , "writeMode" : "" , "column" : [], "session" : [], "preSql" : [], "connection" : [ { "jdbcUrl" : "" , "table" : [] } ] } }
Dal
不关心 username
和 password
,不妨将 TitanKey
或 ClusterName
放在 jdbcurl
处。
四、DataX
获取 Dal
数据源 DataX
通过如下方式获取数据连接:
1 2 3 4 5 6 7 8 9 private static synchronized Connection connect (DataBaseType dataBaseType, String url, Properties prop) { try { Class.forName(dataBaseType.getDriverClassName()); DriverManager.setLoginTimeout(Constant.TIMEOUT_SECONDS); return DriverManager.getConnection(url, prop); } catch (Exception e) { throw RdbmsException.asConnException(dataBaseType, e, prop.getProperty("user" ), null ); } }
Dal
通过如下方式获取数据连接:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 import javax.annotation.Resource;import javax.sql.DataSource;import java.sql.Connection;import com.ctrip.datasource.configure.DalDataSourceFactory;public final class DBUtil { private DBUtil () { } @Resource private DalDataSourceFactory dsFactory; @Bean public DalDataSourceFactory getCtripDalDataSource () { return new DalDataSourceFactory(); } public static Connection getConnectionByTitanKey (final String titanKey) { try { DataSource dataSource = dsFactory.createDataSource(titanKey); return dataSource.getConnection(); } catch (Exception e) { throw DataXException .asDataXException(DBUtilErrorCode.CONN_DB_ERROR, String.format("数据库连接失败. 因为根据您配置的连接信息:%s获取数据库连接失败. 请检查您的配置并作出修改." , titanKey), e); } } public static Connection getConnectionByClusterName (final String clusterName) { try { DataSource ds = dsFactory.getOrCreateDataSource(clusterName); return dataSource.getConnection(); } catch (Exception e) { throw DataXException .asDataXException(DBUtilErrorCode.CONN_DB_ERROR, String.format("数据库连接失败. 因为根据您配置的连接信息:%s获取数据库连接失败. 请检查您的配置并作出修改." , clusterName), e); } } }
综上:借助 DataSource
工厂,就可以用 Dal
数据连接替换掉 DataX
的数据连接。
五、DataX
兼容 Dal
优化 从以上实现可以看出,获取 Dal
数据连接主要有两步:生成数据源和建立数据连接,并且每次数据同步都要重复一遍。既然 Dal
已经提供了 DataSource
工厂,是否可以考虑将数据源缓存下来呢?
所以,有一套更好的兼容方案:在工具启动过程中,加载数据源并缓存下来, dbName
作为查找数据源的 key
。
DataSourceConfiguration.class
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 import com.alibaba.datax.plugin.rdbms.util.DBUtil;import com.ctrip.datasource.configure.DalDataSourceFactory;import com.google.common.base.Throwables;import com.google.common.collect.Maps;import org.apache.commons.lang.StringUtils;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.util.Assert;import javax.annotation.PostConstruct;import javax.annotation.Resource;import javax.sql.DataSource;import java.util.Map;@Configuration public class DataSourceConfiguration { @Resource private DalDataSourceFactory dsFactory; @Bean public DalDataSourceFactory getCtripDalDataSource () { return new DalDataSourceFactory(); } private static final String CLUSTER_CONN_TYPE_FLAG = "cluster" ; public static final String TITAN_KEY_TEST_DB = "test_titan_db" ; public static final String CLUSTER_NAME_TEST_DB = "test_cluster_db" ; private void fillDataSourceFromTitanKey (String titanKey) { try { Assert.hasText(titanKey, "connect to db failed; titan key cannot be null or empty" ); DataSource dataSource = dsFactory.createDataSource(titanKey); DBUtil.setDataSourceIfAbsent(titanKey, dataSource); } catch (Exception t) { throw Throwables.propagate(t); } } private void fillDataSourceFromClusterName (String clusterName) { try { Assert.hasText(clusterName, "connect to db failed; dal cluster cannot be null or empty" ); Assert.isTrue(clusterName.contains(CLUSTER_CONN_TYPE_FLAG), String.format("%s is not in a cluster format" , clusterName)); DataSource dataSource = dsFactory.getOrCreateDataSource(clusterName); DBUtil.setDataSourceIfAbsent(clusterName, dataSource); } catch (Exception t) { throw Throwables.propagate(t); } } @PostConstruct public void initDataSource () { fillDataSourceFromTitanKey(TITAN_KEY_TEST_DB); fillDataSourceFromClusterName(CLUSTER_NAME_TEST_DB); } }
DBUtil.class
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 import com.alibaba.datax.common.exception.DataXException;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import javax.sql.DataSource;import java.io.File;import java.sql.*;import java.util.*;import java.util.concurrent.*;public final class DBUtil { private DBUtil () { } private static final Logger LOG = LoggerFactory.getLogger(DBUtil.class ) ; private static final Map<String, DataSource> DS_MAP = new ConcurrentHashMap<>(); public static void setDataSourceIfAbsent (String dsName, DataSource ds) { if (DS_MAP.containsKey(dsName)) { return ; } synchronized (DS_MAP) { if (!dsMap.containsKey(dsName)) { DS_MAP.put(dsName, ds); LOG.info("setDataSourceIfAbsent将数据源{}放入Engine" , dsName); } } } private static DataSource getDataSource (String dsName) { if (dsName.contains("?" )) { dsName = dsName.substring(0 , dsName.indexOf("?" )); } return DS_MAP.get(dsName); } public static Connection getConnection (String dsName) { try { DataSource dataSource = getDataSource(dsName); Assert.notNull(dataSource, String.format("获取数据源%s失败" , dsName)); return dataSource.getConnection(); } catch (Exception e) { throw DataXException .asDataXException(DBUtilErrorCode.CONN_DB_ERROR, String.format("数据库连接失败. 因为根据您配置的连接信息:%s获取数据库连接失败. 请检查您的配置并作出修改." , dsName), e); } } }