kettle 日志解析:kettle 日志解析功能的实现

解析 kettle 日志文件

将文本文件读取为字符串(按 GBK 编码解码)

/**
 * Reads the whole content of a text file into a string, decoding the bytes as GBK.
 *
 * @param file the file to read
 * @return the decoded file content, or {@code null} when the file cannot be opened or read
 *         (the exception is printed to standard error)
 */
private String readInput(File file) {
    StringBuilder content = new StringBuilder();
    // try-with-resources guarantees the stream is closed even when read() throws;
    // the original only closed it on the success path.
    try (FileInputStream fis = new FileInputStream(file);
         Reader in = new BufferedReader(new InputStreamReader(fis, "GBK"))) {
        char[] chunk = new char[8192];
        int count;
        while ((count = in.read(chunk)) != -1) {
            content.append(chunk, 0, count);
        }
        return content.toString();
    } catch (IOException e) {
        e.printStackTrace();
        return null;
    }
}

将字符串写入文件中

/**
 * Writes a string to a file, encoding it as GBK and replacing any previous content.
 *
 * <p>I/O failures are printed to standard error and otherwise ignored, so the caller
 * is not told whether the write succeeded.
 *
 * @param str  the text to write
 * @param file the destination file
 */
private void writeOutput(String str, File file) {
    // try-with-resources closes the writer (flushing it) and the stream even when
    // write() throws; the original leaked both on failure.
    try (FileOutputStream fos = new FileOutputStream(file);
         Writer out = new OutputStreamWriter(fos, "GBK")) {
        out.write(str);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

存储 kettle 日志解析结果的实体类:

/**
 * Holds the summary parsed from a kettle log for one table extraction.
 */
public class KettleLogParse {

    /** Ordinal of the table within the log (which table this is). */
    private int tableNumber;

    /** Name of the table. */
    private String tableName;

    /** Whether the extraction succeeded. */
    private boolean isSuccess;

    /** Number of warnings. */
    private int warnNumber;

    /** Amount of extracted data. */
    private int dataNumber;

    /** Index of the log line at which this summary is added. */
    private int lineNumber;

    public int getTableNumber() {
        return this.tableNumber;
    }

    public void setTableNumber(int tableNumber) {
        this.tableNumber = tableNumber;
    }

    public String getTableName() {
        return this.tableName;
    }

    public void setTableName(String tableName) {
        this.tableName = tableName;
    }

    public boolean isSuccess() {
        return this.isSuccess;
    }

    public void setSuccess(boolean isSuccess) {
        this.isSuccess = isSuccess;
    }

    public int getWarnNumber() {
        return this.warnNumber;
    }

    public void setWarnNumber(int warnNumber) {
        this.warnNumber = warnNumber;
    }

    public int getDataNumber() {
        return this.dataNumber;
    }

    public void setDataNumber(int dataNumber) {
        this.dataNumber = dataNumber;
    }

    public int getLineNumber() {
        return this.lineNumber;
    }

    public void setLineNumber(int lineNumber) {
        this.lineNumber = lineNumber;
    }

    /**
     * Renders the one-line summary: table ordinal and name, success or failure,
     * warning count and extracted amount.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("表");
        text.append(tableNumber).append(":").append(tableName);
        if (isSuccess) {
            text.append(",抽取成功,共");
        } else {
            text.append(",抽取失败,共");
        }
        text.append(warnNumber).append("个警报,抽取量为").append(dataNumber).append("条");
        return text.toString();
    }
}

解析 kettle 的日志文件,并返回解析好的信息:

// 传入日志文件解析得到的字符串

private List parseKettleLog(String kettle) {

String[] strs=kettle.split("\r\n");

// 默认第一张表为表1

int tableNumber = 1;

int lineNumber = 0;

List recordList = new ArrayList<>();

Stack stack = new Stack<>();

// 错误数

int errorNum = 0;

// 警告数

int warningNum = 0;

// 遍历解析日志文件

for(int i=0; i

//获取每行的字符串

String str = strs[i];

// 遇到开始项,则向stack里面压入一个 KettleLogParse (kettlelog解析实例),开始对解析实例做处理

if (str.contains("开始项")) {

KettleLogParse addLogEntity = new KettleLogParse();

addLogEntity.setTableNumber(tableNumber);

addLogEntity.setLineNumber(lineNumber);

errorNum = 0;

warningNum = 0;

stack.push(addLogEntity);

}

// 获取该解析项的一些初始信息,表名,抽取文件路径等等

if (str.contains("Loading transformation from XML file")) {

KettleLogParse addLogEntity = stack.peek();

int tableNameStart = str.lastIndexOf("[");

int tableNameEnd = str.lastIndexOf("]");

String tableName = str.substring(tableNameStart+1, tableNameEnd);

int lastIndexOf;

if (tableName.contains("file")) {

lastIndexOf = tableName.lastIndexOf("/");

} else {

lastIndexOf = tableName.lastIndexOf("\\");

}

String tableName1 = tableName.substring(lastIndexOf+1);

addLogEntity.setTableName(tableName1);

tableNumber++;

}

// 对该处理项的结果进行解析

if (str.contains("完成处理")) {

KettleLogParse addLogEntity = stack.peek();

int beginIndex = str.lastIndexOf("(");

int endIndex = str.lastIndexOf(")");

String record = str.substring(beginIndex+1, endIndex);

List asList = Arrays.asList(record.split(","));

Map map = getKettleMap(asList);

addLogEntity.setWarnNumber(warningNum);

addLogEntity.setDataNumber(map.get(" W"));

if (errorNum == 0) {

addLogEntity.setSuccess(true);

}

}

完成时出栈,并设置最终的解析结果

if (str.contains("完成作业项")) {

KettleLogParse addLogEntity = stack.pop();

if(addLogEntity.getTableName()!=null) {

recordList.add(addLogEntity);

}

}

// 记录错误数

if (str.contains("- ERROR")) {

errorNum++;

}

// 记录警告数

if (str.contains("- Warning:")) {

warningNum++;

}

lineNumber++;

}

return recordList;

}

根据解析信息,获取新的日志文本字符串

/**
 * Rebuilds the log text line by line, prefixing each line for which
 * {@code isInsertLine} returns an entry with that entry's summary text.
 *
 * NOTE(review): the string literals in this method are split across several lines and
 * are empty apart from line breaks; presumably they originally held HTML markup that
 * was stripped when the snippet was extracted. As shown the method does not compile,
 * and the lost literal content must be restored from the original source — TODO confirm.
 *
 * @param kettle     the log text, split on "\r\n"
 * @param recordList the parsed entries passed to {@code isInsertLine}
 * @return the rebuilt log text
 */
private String pageKettle (String kettle, List recordList) {

String[] strs=kettle.split("\r\n");

StringBuilder result = new StringBuilder("");

// Walk the log lines by index so each index can be matched against the entries.
for (int i = 0; i < strs.length; i++) {

String string = strs[i];

KettleLogParse insertLine = isInsertLine(i, recordList);

if(insertLine!=null) {

String warning = insertLine.getWarnNumber() > 0 ? "

" : "

";

if (insertLine.isSuccess() == false) {

warning = "

";

}

result.append(warning+insertLine.toString()+"

"+string+"
\r\n");

}else {

result.append("

"+string+"
"+"\r\n");

}

}

return result.toString();

}

/**
 * Decides whether a summary has to be inserted at the given line.
 *
 * @param index the zero-based line index being written
 * @param list  the parsed entries; may be {@code null}
 * @return the first entry whose line number equals {@code index}, or {@code null}
 *         when there is none
 */
private KettleLogParse isInsertLine(int index, List<KettleLogParse> list) {
    if (list == null) {
        return null;
    }
    // The element type is declared so the for-each below type-checks.
    for (KettleLogParse logEntity : list) {
        if (logEntity != null && index == logEntity.getLineNumber()) {
            return logEntity;
        }
    }
    return null;
}

对文本文件进行修改,在对应的日志行之前插入解析得到的摘要信息。

/**
 * Rebuilds the log text with the parsed summaries inserted as plain-text lines.
 *
 * <p>The text is split on "\r\n". Every line is written back followed by "\r\n"; when
 * {@code isInsertLine} returns an entry for a line index, that entry's
 * {@code toString()} is written on its own line first.
 *
 * @param kettle     the log text
 * @param recordList the parsed entries passed to {@code isInsertLine}
 * @return the rebuilt log text
 */
private String pageLogFile(String kettle, List recordList) {
    final String lineBreak = "\r\n";
    StringBuilder annotated = new StringBuilder();
    int index = 0;
    for (String line : kettle.split(lineBreak)) {
        KettleLogParse summary = isInsertLine(index, recordList);
        index++;
        if (summary != null) {
            annotated.append(summary.toString()).append(lineBreak);
        }
        annotated.append(line).append(lineBreak);
    }
    return annotated.toString();
}

日志解析过程中,获取其中的特殊数据:

/**
 * Turns the counters of a log line, e.g. {@code I=0, O=0, R=77175, W=77175, U=0, E=0}
 * already split on ",", into a map from counter name to value.
 *
 * <p>Keys are stored exactly as they appear before the "=", so when the input was split
 * on "," every key but the first keeps its leading space (for example {@code " W"}).
 * Items without "=" or with a non-numeric value are skipped instead of aborting the parse.
 *
 * @param list the {@code key=value} items; may be {@code null}
 * @return the parsed counters, never {@code null}
 */
private Map<String, Integer> getKettleMap(List<String> list) {
    Map<String, Integer> map = new HashMap<>();
    if (list == null) {
        return map;
    }
    for (String str : list) {
        if (str == null) {
            continue;
        }
        String[] split = str.split("=");
        if (split.length < 2) {
            // Not a key=value item.
            continue;
        }
        try {
            map.put(split[0], Integer.parseInt(split[1].trim()));
        } catch (NumberFormatException ignored) {
            // The value is not an integer; leave this counter out.
        }
    }
    return map;
}

获取日志文件

/**
 * Returns the annotated log file for a log record, creating or annotating it on demand.
 *
 * <p>The path is built from the "data.kettle.path" configuration value, a prefix chosen
 * by {@code isTrusted} and the record's "logDate" with the dashes removed. If a
 * non-empty file already exists it is annotated in place unless it already contains a
 * summary line. Otherwise the file is created from the record's "dataLog" value with
 * the summaries inserted.
 *
 * <p>The content is annotated in memory and written once through {@code writeOutput}.
 * The original first wrote the raw content with {@code FileWriter} (platform default
 * charset) and then read it back as GBK, which corrupts non-ASCII text whenever the
 * platform charset is not GBK.
 *
 * @param kettleLog the log record; read for the keys "logDate" and "dataLog"
 * @param isTrusted selects the "Trusted_Log_" prefix instead of "Print_Log_"
 * @return the log file, or {@code null} if the path could not be built; failures are
 *         printed to standard error
 */
private File requiredLogFile(Map kettleLog, boolean isTrusted) {
    File logFile = null;
    try {
        Object logTime = kettleLog.get("logDate");
        String rootPath = ResourceUtil.getConfigByName("data.kettle.path");
        // Build the file path.
        String filePath = rootPath + "/" + (isTrusted ? "Trusted_Log_" : "Print_Log_")
                + logTime.toString().replace("-", "") + ".log";
        logFile = new File(filePath);

        String rawLog;
        if (logFile.exists() && logFile.isFile() && logFile.length() != 0) {
            // The file is already there: annotate it unless that was done before.
            rawLog = readInput(logFile);
            if (rawLog == null || rawLog.contains("个警报,抽取量为")) {
                // Unreadable (readInput already reported it) or already annotated.
                return logFile;
            }
        } else {
            // The file exists but is empty: start over.
            if (logFile.exists() && logFile.length() == 0) {
                logFile.delete();
            }
            File parent = logFile.getParentFile();
            if (parent != null && !parent.exists()) {
                parent.mkdirs();
            }
            logFile.createNewFile();
            Object fileContent = kettleLog.get("dataLog");
            if (fileContent == null) {
                // Nothing to write; hand back the empty file.
                return logFile;
            }
            rawLog = fileContent.toString();
        }

        List parsedEntries = parseKettleLog(rawLog);
        writeOutput(pageLogFile(rawLog, parsedEntries), logFile);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return logFile;
}

对日志文件进行压缩处理

/**
 * Compresses all files of a collection into a temporary zip file and returns it.
 *
 * <p>Each entry is named after its source file with "_" and the position in the list
 * inserted before the extension, so files that share a name do not collide.
 *
 * @param fileList the files to compress
 * @return the zip file; it may be incomplete when compression failed part-way, and is
 *         {@code null} when the temporary file could not be created. Failures are
 *         printed to standard error.
 */
private File logToZip(List<File> fileList) {
    File zipFile = null;
    try {
        zipFile = File.createTempFile("ketteLogZip", ".zip");
        try (ZipOutputStream zos = new ZipOutputStream(new FileOutputStream(zipFile))) {
            byte[] buf = new byte[2048];
            int i = 0;
            for (File srcFile : fileList) {
                // Make the entry name unique so duplicate file names do not fail the archive.
                String fileName = srcFile.getName();
                int dot = fileName.lastIndexOf('.');
                // A name without an extension used to make substring(-1) throw and abort
                // the whole archive; treat it as having an empty extension instead.
                String baseName = dot >= 0 ? fileName.substring(0, dot) : fileName;
                String extension = dot >= 0 ? fileName.substring(dot) : "";
                zos.putNextEntry(new ZipEntry(baseName + "_" + i + extension));
                // Close the source stream even when copying fails.
                try (FileInputStream in = new FileInputStream(srcFile)) {
                    int len;
                    while ((len = in.read(buf)) != -1) {
                        zos.write(buf, 0, len);
                    }
                }
                zos.closeEntry();
                i++;
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return zipFile;
}

查询接口

/**
 * Queries one page of extraction logs and summarises every log.
 *
 * <p>Each returned row gets two extra keys: "sumRecord" (sum of the extracted amounts of
 * all parsed entries) and "resultFlag" (0 normal, 1 error, 2 warning). When
 * {@code logFlag} is given, rows whose computed flag differs from it are removed from the
 * page. "total" comes from the count query and is not adjusted for those removals.
 *
 * @param beginTime lower bound for LOGDATE (inclusive), blank for none
 * @param endTime   upper bound for LOGDATE (inclusive), blank for none
 * @param logFlag   blank for all logs; "1" selects rows with ERRORS != '0', any other
 *                  value rows with ERRORS = '0'
 * @param pageId    page argument handed to the paged query
 * @param pageNum   page-size argument handed to the paged query
 * @return a map with "ketteLog" (the rows of the page) and "total" (the count)
 * @throws IllegalArgumentException if {@code beginTime} or {@code endTime} contains
 *         characters that cannot be part of a date/time literal
 */
@Override
public Map queryKettleLog(String beginTime, String endTime, String logFlag, Integer pageId, Integer pageNum) {
    // SQL that fetches the logs.
    String sql = "SELECT ID_JOB as id, left(LOGDATE,10) as logDate, `ERRORS` as logFlag, REPLAYDATE AS STARTDATE, LOGDATE AS ENDDATE, LOG_FIELD as dataLog FROM t_sdrs_data_extraction_log";
    // SQL that counts the logs.
    String sql1 = "SELECT COUNT(*) FROM t_sdrs_data_extraction_log";

    // Filter conditions. The time bounds end up inside the SQL text, so they are
    // checked against a strict whitelist first to rule out SQL injection.
    StringBuilder condition = new StringBuilder(" WHERE 1=1");
    if (!StringUtils.isBlank(beginTime)) {
        requireDateLiteral(beginTime, "beginTime");
        condition.append(" AND LOGDATE >='").append(beginTime).append("'");
    }
    if (!StringUtils.isBlank(endTime)) {
        requireDateLiteral(endTime, "endTime");
        condition.append(" AND LOGDATE <='").append(endTime).append("'");
    }
    if (!StringUtils.isBlank(logFlag)) {
        if (logFlag.equals("1")) {
            condition.append(" AND `ERRORS` !='0'");
        } else {
            condition.append(" AND `ERRORS` ='0'");
        }
    }
    // Ordering only matters for the row query; it is left off the count query.
    sql = sql + condition + " ORDER BY LOGDATE DESC";
    sql1 = sql1 + condition;

    // DAO-layer queries.
    Map<String, Object> map = Maps.newHashMap();
    Long total = systemService.getCountForJdbc(sql1);
    List<Map<String, Object>> kettleLogList = systemService.findForJdbc(sql, pageId, pageNum);

    // Parse every log to find out whether it contains errors or warnings.
    Iterator<Map<String, Object>> iterator = kettleLogList.iterator();
    while (iterator.hasNext()) {
        Map<String, Object> queryMap = iterator.next();
        Object dataLog = queryMap.get("dataLog");
        // A row without log text is treated as an empty log instead of raising an NPE.
        List<KettleLogParse> parsedEntries = parseKettleLog(dataLog == null ? "" : dataLog.toString());

        int sumRecord = 0;  // total amount over all entries of this log
        int resultFlag = 0; // 0 normal, 1 error, 2 warning
        for (KettleLogParse parse : parsedEntries) {
            sumRecord += parse.getDataNumber();
            if (!parse.isSuccess()) {
                // Any failed entry marks the whole log as failed.
                resultFlag = 1;
            } else if (resultFlag == 0 && parse.getWarnNumber() > 0) {
                resultFlag = 2;
            }
        }

        // Drop the row when its parsed result does not match the requested flag.
        if (!StringUtils.isBlank(logFlag) && !logFlag.equals(String.valueOf(resultFlag))) {
            iterator.remove();
            continue;
        }
        queryMap.put("sumRecord", sumRecord);
        queryMap.put("resultFlag", resultFlag);
    }

    map.put("ketteLog", kettleLogList);
    map.put("total", total);
    return map;
}

/**
 * Rejects values that contain anything other than digits and the separators used in
 * date/time literals, so the value can be embedded in a quoted SQL literal safely.
 */
private void requireDateLiteral(String value, String parameterName) {
    for (int i = 0; i < value.length(); i++) {
        char c = value.charAt(i);
        boolean allowed = (c >= '0' && c <= '9')
                || c == '-' || c == ':' || c == ' ' || c == '/' || c == '.' || c == 'T';
        if (!allowed) {
            throw new IllegalArgumentException(
                    parameterName + " is not a valid date/time value: illegal character at index " + i);
        }
    }
}


版权声明:本文为weixin_35815036原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。