前言
当用 DataX 将 MySQL 或 SQL Server 的大量表数据同步到 Hive 时,经常需要手动调整脚本中每个表的字段、类型以及 Hive 对应的建表语句,耗费大量机械性的比对、复制粘贴时间和精力。下面介绍的脚本能自动生成这些改动点,只需将结果复制过去稍作修改即可,既快速又准确。
需同步的mysql或sqlserver原表结构
//sqlserver表结构
-- Sample SQL Server source table used as input for the generator script below.
-- Only the part above the WITH (...) clause needs to be copied into the script.
CREATE TABLE [dbo].[TravelOrderTOBody] (
[Id] char(21) COLLATE Chinese_PRC_CI_AS NOT NULL,
[ParkId] int NOT NULL,
[TravelOrderTOHeaderId] char(18) COLLATE Chinese_PRC_CI_AS NOT NULL,
[CustomerId] bigint NULL,
[AgencySaleTicketClassId] int NOT NULL,
[Seq] int NOT NULL,
[Persons] int NOT NULL,
[Qty] int NOT NULL,
[OrderState] int NOT NULL,
[Price] decimal(18,2) NOT NULL,
[SalePrice] decimal(18,2) NOT NULL,
[SettlementPrice] decimal(18,2) NOT NULL,
[ParkSettlementPrice] decimal(18,2) NOT NULL,
[Amount] decimal(18,2) NOT NULL,
[Remark] nvarchar(128) COLLATE Chinese_PRC_CI_AS NULL,
[TicketClassId] int NOT NULL,
[IsMustUseOrderIDNumberActiveVipcard] bit NOT NULL,
[LastModificationTime] datetime NULL,
[LastModifierUserId] bigint NULL,
[CreationTime] datetime NOT NULL,
[CreatorUserId] bigint NULL,
CONSTRAINT [PK_dbo.TravelOrderTOBody] PRIMARY KEY CLUSTERED ([Id])
WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON)
ON [PRIMARY]
)
//mysql表结构
-- Sample MySQL source table used as the alternative input for the generator
-- script below (the commented-out str9 variant in main).
CREATE TABLE `upgraderecord` (
`Id` int(11) NOT NULL AUTO_INCREMENT,
`UserId` varchar(50) NOT NULL,
`CreationTime` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP,
`BeforeLevelId` int(11) NOT NULL,
`LevelId` int(11) NOT NULL,
`UpgradeSource` varchar(255) NOT NULL,
`UpgradeSourceDetail` varchar(255) NOT NULL,
`Remark` varchar(255) DEFAULT NULL,
PRIMARY KEY (`Id`),
KEY `IX_UserId` (`UserId`) USING BTREE,
KEY `IX_CreationTime` (`CreationTime`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=594477 DEFAULT CHARSET=utf8mb4;
目标结构(脚本自动生成)
1.datax同步脚本所需改动地方
2.hive对应建表语句
3.脚本
若觉得还不够方便,可以在此脚本基础上进一步改造:自动生成 DataX 所需的 .json 配置文件,再将文件上传到 DataX 同步任务指定的表结构位置即可。
/**
 * Generates, from a SQL Server or MySQL CREATE TABLE statement, the three
 * artifacts needed to sync the table to Hive with DataX:
 * the DataX column list, the per-column JSON fragments for the DataX job
 * file, and the matching Hive CREATE TABLE statement.
 * Because SQL Server and MySQL DDL differ, {@link #main} shows both parsing
 * variants; paste in the DDL that matches your source database.
 */
public class Utils {

    /**
     * Returns a copy of the given array with all {@code null} and
     * empty-string entries removed.
     *
     * @param string array to filter; not modified
     * @return a new array containing the non-null, non-empty entries in order
     */
    private static String[] deleteArrayNull(String[] string) {
        // Single pass instead of repeated List.remove() scans (O(n) vs O(n^2)).
        List<String> kept = new ArrayList<String>();
        for (String s : string) {
            if (s != null && !s.isEmpty()) {
                kept.add(s);
            }
        }
        return kept.toArray(new String[kept.size()]);
    }

    /**
     * Maps a MySQL / SQL Server column type (e.g. {@code int(11)},
     * {@code decimal(18,2)}) to the corresponding Hive type.
     * Unknown types fall back to {@code string}, which Hive can always
     * ingest; extend the mapping here if a new source type shows up.
     *
     * @param str source column type, matched case-insensitively by prefix
     * @return the Hive type name
     */
    public static String findColumnType(String str) {
        String t = str.toLowerCase();
        // startsWith() keeps the prefixes unambiguous: "bigint" never
        // matches "int", "datetime" never matches "bit", etc.
        if (t.startsWith("bigint")) {
            return "bigint";
        }
        if (t.startsWith("int") || t.startsWith("tinyint")
                || t.startsWith("smallint") || t.startsWith("mediumint")) {
            return "int";
        }
        if (t.startsWith("decimal") || t.startsWith("numeric")
                || t.startsWith("float") || t.startsWith("double")
                || t.startsWith("real") || t.startsWith("money")) {
            return "double";
        }
        if (t.startsWith("bit")) {
            return "boolean";
        }
        if (t.startsWith("datetime") || t.startsWith("timestamp")) {
            return "timestamp";
        }
        return "string";
    }

    /**
     * Demo entry point. Paste the source table's CREATE TABLE statement into
     * {@code str9}, enable the parsing branch matching the source database
     * (SQL Server or MySQL), then run. Prints the DataX column list, the
     * per-column JSON fragments, the Hive DDL, and finally the parsed column
     * count so the result can be checked against the source table.
     */
    public static void main(String[] args) {
        // SQL Server source DDL: copy everything above the WITH keyword.
        String str9 = "CREATE TABLE [dbo].[TravelOrderTOBody] (\n" +
                " [Id] char(21) COLLATE Chinese_PRC_CI_AS NOT NULL,\n" +
                " [ParkId] int NOT NULL,\n" +
                " [TravelOrderTOHeaderId] char(18) COLLATE Chinese_PRC_CI_AS NOT NULL,\n" +
                " [CustomerId] bigint NULL,\n" +
                " [AgencySaleTicketClassId] int NOT NULL,\n" +
                " [Seq] int NOT NULL,\n" +
                " [Persons] int NOT NULL,\n" +
                " [Qty] int NOT NULL,\n" +
                " [OrderState] int NOT NULL,\n" +
                " [Price] decimal(18,2) NOT NULL,\n" +
                " [SalePrice] decimal(18,2) NOT NULL,\n" +
                " [SettlementPrice] decimal(18,2) NOT NULL,\n" +
                " [ParkSettlementPrice] decimal(18,2) NOT NULL,\n" +
                " [Amount] decimal(18,2) NOT NULL,\n" +
                " [Remark] nvarchar(128) COLLATE Chinese_PRC_CI_AS NULL,\n" +
                " [TicketClassId] int NOT NULL,\n" +
                " [IsMustUseOrderIDNumberActiveVipcard] bit NOT NULL,\n" +
                " [LastModificationTime] datetime NULL,\n" +
                " [LastModifierUserId] bigint NULL,\n" +
                " [CreationTime] datetime NOT NULL,\n" +
                " [CreatorUserId] bigint NULL,\n" +
                " CONSTRAINT [PK_dbo.TravelOrderTOBody] PRIMARY KEY CLUSTERED ([Id])";
        // MySQL source DDL (alternative input):
        // String str9 = "CREATE TABLE `upgraderecord` (\n" +
        //         " `Id` int(11) NOT NULL AUTO_INCREMENT,\n" +
        //         " `UserId` varchar(50) NOT NULL,\n" +
        //         " `CreationTime` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP,\n" +
        //         " `BeforeLevelId` int(11) NOT NULL,\n" +
        //         " `LevelId` int(11) NOT NULL,\n" +
        //         " `UpgradeSource` varchar(255) NOT NULL,\n" +
        //         " `UpgradeSourceDetail` varchar(255) NOT NULL,\n" +
        //         " `Remark` varchar(255) DEFAULT NULL,\n" +
        //         " PRIMARY KEY (`Id`),\n" +
        //         " KEY `IX_UserId` (`UserId`) USING BTREE,\n" +
        //         " KEY `IX_CreationTime` (`CreationTime`) USING BTREE\n" +
        //         ") ENGINE=InnoDB AUTO_INCREMENT=594477 DEFAULT CHARSET=utf8mb4;";

        // MySQL parsing: table name sits between backticks; columns end at PRIMARY KEY.
        // String tableName = str9.split("` \\(\n")[0].split("`")[1].toLowerCase();
        // String[] columnLine = str9.split("` \\(\n")[1].split("PRIMARY KEY \\(")[0].split(",\n");
        // SQL Server parsing: table name sits between "].[" and "] ("; columns end at CONSTRAINT.
        String tableName = str9.split("] \\(\n")[0].split("].\\[")[1].toLowerCase();
        String[] columnLine = str9.split("] \\(\n")[1].split("CONSTRAINT")[0].split(",\n");
        System.out.println(tableName);

        StringBuilder hiveSqlStr = new StringBuilder();
        StringBuilder dataxColumnStr = new StringBuilder();
        StringBuilder dataxSqlStr = new StringBuilder();
        int columnNum = 0;
        hiveSqlStr.append(("CREATE TABLE `").concat(tableName).concat("`(")).append("\n");
        for (String line : columnLine) {
            // A column line looks like "[Name] type ..."; splitting on blanks
            // can yield empty tokens, so filter them before indexing.
            String[] column = deleteArrayNull(line.replace("\n", "").split(" "));
            if (column.length >= 2) {
                String columnName = column[0].replace("[", "").replace("]", "").replace("`", "");
                String typeName = findColumnType(column[1]);
                dataxColumnStr.append(columnName).append(",").append(" ");
                dataxSqlStr.append("{\"name\": \"").append(columnName)
                        .append("\",\"type\": \"").append(typeName).append("\"},").append("\n");
                hiveSqlStr.append(" " + columnName.toLowerCase() + " " + typeName + ",").append("\n");
                columnNum++;
            }
        }
        // Strip the trailing ", " / ",\n" separators left by the loop; guard
        // against zero parsed columns, which would otherwise throw
        // StringIndexOutOfBoundsException here.
        if (columnNum > 0) {
            dataxColumnStr.delete(dataxColumnStr.length() - 2, dataxColumnStr.length());
            dataxSqlStr.delete(dataxSqlStr.length() - 2, dataxSqlStr.length());
            hiveSqlStr.delete(hiveSqlStr.length() - 2, hiveSqlStr.length());
        }
        hiveSqlStr.append(")\n").append("row format delimited fields terminated by '\\t'")
                .append("\nstored as ORC;");
        // Sanity check: the parsed column count must match the source DDL
        // (columnLine always carries one trailing whitespace-only element).
        if (columnLine.length - 1 == columnNum) {
            System.out.println(dataxColumnStr.toString() + "\n");
            System.out.println(dataxSqlStr.toString() + "\n");
            System.out.println(hiveSqlStr.toString() + "\n");
        } else {
            System.out.println((columnLine.length - 1 == columnNum)
                    + " columnLine length : " + (columnLine.length - 1) + " ==== " + columnNum);
        }
        System.out.println(tableName + " 表结构列数为 : " + (columnLine.length - 1));
    }
}
脚本执行结果

附录
1.datax同步数据完整示例(批量同步mysql或sqlserver脚本完整脚本)
https://blog.csdn.net/qq_25073261/article/details/105220976
版权声明:本文为qq_25073261原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。