NiFi from Beginner to Expert (Part 1): Data Storage

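Introduction: 1. How does an upstream system send data to NiFi through an HTTP interface and have it stored in a database?

           2. How should the database tables be designed?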

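Contents

1. Using common components

HandleHttpRequest

2. Database table design

3. Receiving and storing data with Groovy scripts

4. Overall structure of the NiFi interface connection and data storage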


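1. Using common components

  1. HandleHttpRequest

  • Starts an HTTP port so that upstream systems can push data to NiFi; this is the component that exposes the interface.
  • Allowed Paths: .*/(qq|hh)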
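
To see which request paths an Allowed Paths value such as `.*/(qq|hh)` lets through, the pattern can be tried outside NiFi. The following is a minimal Groovy sketch. It assumes the pattern is compared against the whole request path (a full match); the helper name `isAllowed` and the sample paths are made up for illustration.

```groovy
import java.util.regex.Pattern

// Same pattern as the Allowed Paths value above
Pattern allowedPaths = Pattern.compile('.*/(qq|hh)')

// Hypothetical helper: true when the whole path matches the pattern
def isAllowed = { String path -> allowedPaths.matcher(path).matches() }

// Sample paths (illustrative only)
['/qq', '/api/hh', '/api/qq/extra', '/api/xx'].each { path ->
    println "${path} -> ${isAllowed(path)}"
}
```

Under that assumption, only paths whose last segment is `qq` or `hh` are accepted; a path with an extra trailing segment or a different name is not.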


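  2. StandardHttpContextMap

  • StandardHttpContextMap is the controller service that holds the HTTP request and response context, so that several processors can work on the same HTTP request.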

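  3. RouteOnAttribute

  • This processor uses the NiFi Expression Language to evaluate each FlowFile's attributes and routes the FlowFile according to the result.
  • Put simply, it dispatches the requests received by HandleHttpRequest.

  • oa_zjxbxz ${http.request.uri:equals("/oa_zjxbxz")}

  • Each route leads to the matching processor in the corresponding process group.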
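
The routing rule can be pictured as a table of named predicates over the FlowFile attributes. The sketch below imitates that idea in plain Groovy; it is not NiFi code. The second route name (`oa_zjxbsc`) and the helper `route` are hypothetical, and `unmatched` stands in for the processor's unmatched relationship.

```groovy
// Route table: route name -> predicate over the FlowFile attributes.
// The first entry mirrors ${http.request.uri:equals("/oa_zjxbxz")};
// the second is a hypothetical extra route, shown only to illustrate adding more.
def routes = [
    oa_zjxbxz: { Map attrs -> attrs['http.request.uri'] == '/oa_zjxbxz' },
    oa_zjxbsc: { Map attrs -> attrs['http.request.uri'] == '/oa_zjxbsc' }
]

// Return the names of all routes whose predicate matches, or ['unmatched'] when none do
def route = { Map attrs ->
    def matched = routes.findAll { name, predicate -> predicate(attrs) }.keySet().toList()
    matched ?: ['unmatched']
}

println route(['http.request.uri': '/oa_zjxbxz'])
println route(['http.request.uri': '/other'])
```

Each additional upstream interface then only needs one more entry in the table and one more downstream connection.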


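  4. ExecuteGroovyScript

  • Runs Groovy scripts (Groovy is a Java-like scripting language) that receive the data, transform it, and store it in the middle-platform database.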

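2. Database table design

  • The main table oa_zjxbxz gets two extra columns, fid and dr. fid is auto-incremented; dr records whether a row is current (dr=0 means the latest version, dr=1 means superseded data). Rows are normally never deleted, so that a history is kept.

  • The child table oa_zjxbxz_h stores the main table's primary key and likewise adds an auto-increment column sid and a status column dr (same meaning as above).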
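
The effect of the fid and dr columns can be shown without a database. The sketch below keeps the "main table" as an in-memory list of maps; the helper `saveVersion` and the sample values are hypothetical, and a real implementation would run the equivalent UPDATE and INSERT statements instead.

```groovy
// In-memory stand-in for the main table: each row is a map with fid and dr
def mainTable = []
int nextFid = 1

// Hypothetical helper: store a new version of the record identified by oaid
def saveVersion = { String oaid, Map fields ->
    // Mark every current row for this oaid as superseded instead of deleting it
    mainTable.findAll { it.oaid == oaid && it.dr == 0 }.each { it.dr = 1 }
    // Append the new row as the current version (dr = 0) with the next fid
    def row = [fid: nextFid++, oaid: oaid, dr: 0] + fields
    mainTable << row
    row.fid
}

saveVersion('OA001', [ymonth: '2023-01'])
saveVersion('OA001', [ymonth: '2023-02'])
mainTable.each { println it }
```

After the two calls the list holds both versions: the first row is kept with dr=1 and the second is the current row with dr=0, which is the "keep the history" behavior described above.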

3. Receiving and storing data with Groovy scripts

  1. Insert interface
// Fund allocation: insert
import com.fasterxml.jackson.databind.ObjectMapper
import groovy.json.JsonSlurper

// SQL access and the custom utility jar
import groovy.sql.Sql
import com.yonyou.util.NiFiUtil

import java.util.ArrayList;
import java.util.List;


// Stop if no FlowFile has arrived
def flowfile = session.get();
if (!flowfile) return;


// jsonObject holds the parsed FlowFile content
def jsonObject = null;
// mapper serializes the response to JSON
ObjectMapper mapper = new ObjectMapper();
def response = [:];
// Connect to the middle-platform database
def conn = CTL.mydb.getConnection();
if (!conn) return;
Sql sql = new Sql(conn);
if (!sql) return;



// Connect to the NC database
def conn1 = CTL.ncdb.getConnection();
if (!conn1) return;
Sql sql1 = new Sql(conn1);
if (!sql1) return;



// Both connections are open: start from a success response
response.put("success", "true");
response.put("code", "1000000000");



int success = 0; // number of rows stored successfully

int fail = 0; // number of errors

NiFiUtil nftUtil = new NiFiUtil();

// List that collects the error entries (each entry is a map)
def restlist = [];

try {
    // Deserialize the JSON text into a map, e.g. {"age":18,"name":"Tom"} becomes [age:18, name:Tom]
    jsonObject = new groovy.json.JsonSlurper().parseText(flowfile.read().getText("UTF-8"));

    // Single-pass loop, so that `continue` can be used to leave early after a validation error
    for (int i = 0; i < 1; i++) {
        log.info("======= Fund allocation: insert =========");
        // resert holds one error entry (a map)
        def resert = [:];
        // Read the fields from the incoming JSON
        String oaid = jsonObject.oaid?.toString() ?: "";
        // Validate the required fields
        if ("".equals(oaid)) {
            resert.put("index", i);
            resert.put("errormessage", "oaid must not be empty");
            restlist.add(resert);
            fail++;

            response.put("code", "1000000002");
            continue;
        }

        String ymonth = jsonObject.ymonth?.toString() ?: "";
        if ("".equals(ymonth)) {
            resert.put("index", i);
            resert.put("errormessage", "ymonth must not be empty");
            restlist.add(resert);
            fail++;

            response.put("code", "1000000002");
            continue;
        }
        // Optional text fields default to "", optional amounts default to 0.00
        String finance = jsonObject.finance?.toString() ?: "";
        String transactiontype = jsonObject.transactiontype?.toString() ?: "";
        double homemoney = jsonObject.homemoney == null ? 0.00 : jsonObject.homemoney;
        double totalmoney = jsonObject.totalmoney == null ? 0.00 : jsonObject.totalmoney;
        double actualmoney = jsonObject.actualmoney == null ? 0.00 : jsonObject.actualmoney;
        double paymoney = jsonObject.paymoney == null ? 0.00 : jsonObject.paymoney;
        double repaymentmoney = jsonObject.repaymentmoney == null ? 0.00 : jsonObject.repaymentmoney;
        double borrowermoney = jsonObject.borrowermoney == null ? 0.00 : jsonObject.borrowermoney;
        String payingunit = jsonObject.payingunit?.toString() ?: "";
        String corporatebank = jsonObject.corporatebank?.toString() ?: "";
        String paymenttype = jsonObject.paymenttype?.toString() ?: "";
        String expenseunit = jsonObject.expenseunit?.toString() ?: "";
        String expensedept = jsonObject.expensedept?.toString() ?: "";
        String balanceitem = jsonObject.balanceitem?.toString() ?: "";
        String supplier = jsonObject.supplier?.toString() ?: "";
        String bx_expenseunit = jsonObject.bx_expenseunit?.toString() ?: "";
        String bx_expensedept = jsonObject.bx_expensedept?.toString() ?: "";
        String expense = jsonObject.expense?.toString() ?: "";
        String receipttype = jsonObject.receipttype?.toString() ?: "";
        String payee = jsonObject.payee?.toString() ?: "";
        String personagebank = jsonObject.personagebank?.toString() ?: "";
        String supplierbank = jsonObject.supplierbank?.toString() ?: "";
        log.info("Validation passed; storing the fund allocation record in the middle platform");
        // 1. Check whether this record already exists.
        // Values are passed as bound parameters so that quotes in the data cannot break the SQL.
        List rows = sql.rows("select * from oa_zjxb where oaid = ? and ymonth = ? and dr = 0", [oaid, ymonth]);
        if (rows.size() > 0) { // already present: mark the existing rows as superseded (dr=1)
            int fid = (int) rows.get(0).fid;
            sql.executeUpdate("update oa_zjxb set dr = 1 where fid = ?", [fid]);
            sql.executeUpdate("update oa_zjxb_h set dr = 1 where fid = ?", [fid]);
        }
        // 2. Insert the new main-table row
        String columns = "oaid,ymonth,finance,transactiontype,homemoney,totalmoney,actualmoney,paymoney,repaymentmoney,borrowermoney,payingunit,corporatebank,paymenttype,expenseunit,expensedept,balanceitem,supplier,bx_expenseunit,bx_expensedept,expense,receipttype,payee,personagebank,supplierbank";
        List params = [oaid, ymonth, finance, transactiontype, homemoney, totalmoney, actualmoney, paymoney, repaymentmoney, borrowermoney, payingunit, corporatebank, paymenttype, expenseunit, expensedept, balanceitem, supplier, bx_expenseunit, bx_expensedept, expense, receipttype, payee, personagebank, supplierbank];
        // One "?" placeholder per value
        String insert_a = "insert into oa_zjxb (" + columns + ") values (" + (["?"] * params.size()).join(",") + ")";
        log.info(insert_a)
        // Number of rows inserted
        int count = sql.executeUpdate(insert_a, params);
        log.info("Main table rows affected: " + count);

        if (count > 0) { // exactly one main-table row is expected
            success++;
            log.info("Main table insert succeeded");
        } else {
            fail++;
            log.info("Main table insert failed");
            continue;
        }
        // 3. Insert the child-table rows (the payload key is spelled `itmes` throughout these scripts)
        int m = jsonObject.itmes?.size() ?: 0;
        log.info("Child table rows in payload: " + m);

        for (int k = 0; k < m; k++) {
            // 0. Read the fields of this child row
            def item = jsonObject.itmes[k];
            String receipitem = item.receipitem?.toString() ?: "";
            if ("".equals(receipitem)) {
                // A fresh map per error, so earlier entries are not overwritten
                restlist.add([index: i, errormessage: "receipitem must not be empty"]);
                fail++;

                response.put("code", "1000000002");
                continue;
            }

            double exrate = item.exrate == null ? 0.00 : item.exrate;
            String budgetaccount = item.budgetaccount?.toString() ?: "";
            String account = item.account?.toString() ?: "";
            // Amounts default to 0.00; an empty string cannot be assigned to a double
            double amount = item.amount == null ? 0.00 : item.amount;
            String summary = item.summary?.toString() ?: "";
            double offsetamount = item.offsetamount == null ? 0.00 : item.offsetamount;
            String cxreceipitem = item.cxreceipitem?.toString() ?: "";
            String borrowaccount = item.borrowaccount?.toString() ?: "";
            String borrower = item.borrower?.toString() ?: "";
            String offsetmoney = item.offsetmoney?.toString() ?: "";
            String borrowdept = item.borrowdept?.toString() ?: "";
            String lastdate = item.lastdate?.toString() ?: "";
            String effectivedate = item.effectivedate?.toString() ?: "";
            double zb_repaymentmoney = item.repaymentmoney == null ? 0.00 : item.repaymentmoney;

            // 1. Look up the current main-table row to obtain its fid
            List rowsFid = sql.rows("select * from oa_zjxb where oaid = ? and ymonth = ? and finance = ? and dr = 0", [oaid, ymonth, finance]);
            if (rowsFid.size() <= 0) {
                restlist.add([index: i, errormessage: "Fund allocation insert: main table (oa_zjxb) row not found, please contact the administrator"]);
                fail++;

                response.put("code", "1000000002")

                continue;
            }
            int fid = (int) rowsFid.get(0).fid;
            // 2. Insert the child row with bound parameters
            String insert_b = "insert into oa_zjxb_h (fid,receipitem,budgetaccount,account,amount,exrate,offsetamount,summary,cxreceipitem,borrowaccount,borrower,offsetmoney,zb_repaymentmoney,borrowdept,lastdate,effectivedate) " +
                    "values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
            int childCount = sql.executeUpdate(insert_b, [fid, receipitem, budgetaccount, account, amount, exrate, offsetamount, summary, cxreceipitem, borrowaccount, borrower, offsetmoney, zb_repaymentmoney, borrowdept, lastdate, effectivedate]);
            if (childCount > 0) {
                success++;
            }
        }
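        if (fail > 0) {
            response.put("success", "false");
            response.put("code", "1000000099");
            response.put("message", "Records inserted: " + success + ", failed: " + fail);
            response.put("data", restlist);
            response.put("errorStack", null);
            flowfile.write("UTF-8", mapper.writeValueAsString(response))
            flowfile.putAttribute("lognum", fail.toString());
            REL_FAILURE << flowfile
            return
        }
        // No errors: write the validated data back to the FlowFile
        flowfile.write("UTF-8", mapper.writeValueAsString(jsonObject))
        log.info("=========== Processing finished ============")
        REL_SUCCESS << flowfile
        return
    }

    // Reached only when a `continue` above left the loop after an error:
    // report the collected errors so that the FlowFile is still transferred.
    response.put("success", "false");
    response.put("message", "Records inserted: " + success + ", failed: " + fail);
    response.put("data", restlist);
    response.put("errorStack", null);
    flowfile.write("UTF-8", mapper.writeValueAsString(response))
    flowfile.putAttribute("lognum", fail.toString());
    REL_FAILURE << flowfile
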
} catch (Exception e) {
    response.put("success", "false");
    response.put("code", "1000000003");
    response.put("message", "Fund allocation: insert failed");
    response.put("data", null);
    response.put("errorStack", e.getMessage());
    flowfile.write("UTF-8", mapper.writeValueAsString(response));
    REL_FAILURE << flowfile;
    return;
} finally {
    // Release both database connections
    sql.close();
    conn.close();
    sql1.close();
    conn1.close();
}
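
Reading every field with its own null check makes scripts like the one above long. As an illustration only, the same defaults ("" for text, 0.00 for amounts) can be captured in two small helpers. The names `text` and `number`, the list of required fields, and the sample payload are hypothetical.

```groovy
import groovy.json.JsonSlurper

// Hypothetical helpers: read a field as text or as a number, with a default for null or missing keys
def text = { Map obj, String key -> obj[key]?.toString() ?: '' }
def number = { Map obj, String key -> obj[key] == null ? 0.0d : (obj[key] as double) }

// Sample payload (illustrative only)
def payload = new JsonSlurper().parseText('{"oaid": "OA001", "homemoney": 12.5, "finance": null}')

println text(payload, 'oaid')
println text(payload, 'finance')
println number(payload, 'homemoney')
println number(payload, 'totalmoney')

// Required fields that are empty or absent
def missing = ['oaid', 'ymonth'].findAll { text(payload, it).isEmpty() }
println "missing required fields: ${missing}"
```

Collecting the missing required fields in one pass also makes it possible to report all of them in a single response instead of stopping at the first one.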




  2. Delete interface


// Fund allocation: delete
import com.fasterxml.jackson.databind.ObjectMapper
import groovy.json.JsonSlurper

// SQL access and the custom utility jar
import groovy.sql.Sql
import com.yonyou.util.NiFiUtil

import java.util.ArrayList;
import java.util.List;


// Stop if no FlowFile has arrived
def flowfile = session.get();
if (!flowfile) return;

// jsonObject holds the parsed FlowFile content
def jsonObject = null;
// mapper serializes the response to JSON
ObjectMapper mapper = new ObjectMapper();
def response = [:];
// Connect to the middle-platform database
def conn = CTL.mydb.getConnection();
if (!conn) return;
Sql sql = new Sql(conn);
if (!sql) return;


// Connect to the NC database
def conn1 = CTL.ncdb.getConnection();
if (!conn1) return;
Sql sql1 = new Sql(conn1);
if (!sql1) return;

// Both connections are open: start from a success response
response.put("success", "true");
response.put("code", "1000000000");



int success = 0; // number of records marked as deleted

int fail = 0; // number of errors

NiFiUtil nftUtil = new NiFiUtil();

// List that collects the error entries (each entry is a map)
def restlist = [];
// Flag intended for deciding whether to output the success message
int out_json = 0;

try {
    // Deserialize the JSON text into a map, e.g. {"age":18,"name":"Tom"} becomes [age:18, name:Tom]
    jsonObject = new groovy.json.JsonSlurper().parseText(flowfile.read().getText("UTF-8"));

    // Single-pass loop, so that `continue` can be used to leave early after a validation error
    for (int i = 0; i < 1; i++) {
        // resert holds one error entry (a map)
        def resert = [:];
        // Read the fields from the incoming JSON
        String oaid = jsonObject.oaid?.toString() ?: "";
        // Validate the required field
        if ("".equals(oaid)) {
            resert.put("index", i);
            resert.put("errormessage", "oaid must not be empty");
            restlist.add(resert);
            fail++;

            response.put("code", "1000000002");
            continue;
        }

        log.info("Validation passed; marking the fund allocation record as deleted");
        // 1. Check whether the record exists (bound parameter instead of string interpolation)
        List rows = sql.rows("select * from oa_zjxb where oaid = ? and dr = 0", [oaid]);
        if (rows.size() > 0) { // found: mark the main and child rows as superseded (dr=1)
            log.info("Record found in oa_zjxb; setting dr=1 on the main and child rows")
            int fid = (int) rows.get(0).fid;
            sql.executeUpdate("update oa_zjxb set dr = 1 where fid = ?", [fid]);
            sql.executeUpdate("update oa_zjxb_h set dr = 1 where fid = ?", [fid]);
            success++;
        }

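        if (fail > 0) {
            response.put("success", "false");
            response.put("code", "1000000099");
            response.put("message", "Records deleted: " + success + ", failed: " + fail);
            response.put("data", restlist);
            response.put("errorStack", null);
            flowfile.write("UTF-8", mapper.writeValueAsString(response))
            flowfile.putAttribute("lognum", fail.toString());
            REL_FAILURE << flowfile
            return
        }
        log.info("=========== Processing finished ============")
        // Success response
        response.put("success", "true");
        response.put("data", null)
        response.put("code", "1000000099");
        response.put("message", "Fund allocation record deleted");
        response.put("errorStack", null)
        flowfile.write("UTF-8", mapper.writeValueAsString(response));
        REL_SUCCESS << flowfile;
        return
    }

    // Reached only when a `continue` above left the loop after an error:
    // report the collected errors so that the FlowFile is still transferred.
    response.put("success", "false");
    response.put("message", "Records deleted: " + success + ", failed: " + fail);
    response.put("data", restlist);
    response.put("errorStack", null);
    flowfile.write("UTF-8", mapper.writeValueAsString(response))
    flowfile.putAttribute("lognum", fail.toString());
    REL_FAILURE << flowfile
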
} catch (Exception e) {
    response.put("success", "false");
    response.put("code", "1000000003");
    response.put("message", "Fund allocation: delete failed");
    response.put("data", null);
    response.put("errorStack", e.getMessage());
    flowfile.write("UTF-8", mapper.writeValueAsString(response));
    REL_FAILURE << flowfile;
    return;
} finally {
    // Release both database connections
    sql.close();
    conn.close();
    sql1.close();
    conn1.close();
}
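
SQL statements that carry request values are safer when the values travel as bound parameters rather than as part of the SQL text. The sketch below shows one way to assemble such a statement from a map of column values. The helper `buildInsert` and the sample values are hypothetical; the table name is taken from the scripts and must come from trusted code, never from the request.

```groovy
// Hypothetical helper: build an INSERT statement with one "?" placeholder per column
def buildInsert = { String table, Map<String, Object> fields ->
    def columns = fields.keySet().join(',')
    def marks = (['?'] * fields.size()).join(',')
    [sql: "insert into ${table} (${columns}) values (${marks})".toString(),
     params: fields.values().toList()]
}

// Sample values (illustrative only); the quote in oaid stays data and never becomes SQL
def stmt = buildInsert('oa_zjxb', [oaid: "O'Brien-001", ymonth: '2023-01', homemoney: 12.5])
println stmt.sql
println stmt.params
```

With a `groovy.sql.Sql` instance named `sql`, the result could then be run as `sql.executeUpdate(stmt.sql, stmt.params)`, which sends the values separately from the statement text.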




  3. Update interface


// Fund allocation: update
import com.fasterxml.jackson.databind.ObjectMapper
import groovy.json.JsonSlurper

import groovy.sql.Sql
import com.yonyou.util.NiFiUtil


import java.util.ArrayList;
import java.util.List;

// Step 1: receive the JSON data and connect to the middle-platform and NC databases
// 01. Receive the JSON data
def flowfile = session.get();
if (!flowfile) return;

// 02. Connect to the databases

def jsonObject = null; // holds the parsed JSON data
// Middle-platform database
ObjectMapper mapper = new ObjectMapper();
def conn = CTL.mydb.getConnection();
Sql sql = new Sql(conn);
if (!sql) return;

// NC database
def conn1 = CTL.ncdb.getConnection();
Sql sql1 = new Sql(conn1);
if (!sql1) return;

// Both connections are open: start from a success response
def response = [:];
response.put("success", "true");
response.put("status", "Database connection established");


// Error entries returned to the caller
def resert = [:];
def restlist = [];

// Counters for successes and errors
def success = 0;
def fail = 0;

// Custom jar used to check that the data is valid
NiFiUtil nftUtil = new NiFiUtil();

// Step 2: read the main-table and child-table fields from the JSON and insert them
try {
    // Parse the JSON
    jsonObject = new groovy.json.JsonSlurper().parseText(flowfile.read().getText("UTF-8"));

    // Single-pass loop, so that `continue` can be used to leave early after a validation error
    for (def i = 0; i < 1; i++) {
        // Read the main-table fields
        String oaid = jsonObject.oaid?.toString() ?: "";
        // Validate the required field
        if ("".equals(oaid)) {
            // A fresh map per error, so earlier entries are not overwritten
            restlist.add([index: i, errormessage: "oaid must not be empty"]);
            fail++;

            response.put("code", "1000000002");
            continue;
        }
        String ymonth = jsonObject.ymonth?.toString() ?: "";
        String finance = jsonObject.finance?.toString() ?: "";
        String pk_group = jsonObject.pk_group?.toString() ?: "";
        String bill_type = jsonObject.bill_type?.toString() ?: "";
        String trade_type = jsonObject.trade_type?.toString() ?: "";
        String primal_money = jsonObject.primal_money?.toString() ?: "";
        String contractcode = jsonObject.contractcode?.toString() ?: "";
        String itemcode = jsonObject.itemcode?.toString() ?: "";
        double contractmoney = jsonObject.contractmoney == null ? 0.00 : jsonObject.contractmoney;
        // 1. Check whether the record already exists (bound parameter instead of string interpolation)
        List rows_zb = sql.rows("select * from oa_zjxb where oaid = ? and dr = 0", [oaid]);
        // If it exists, mark the stored rows as superseded (dr=1) before adding the new data
        if (rows_zb.size() > 0) {
            def fid_old = (int) rows_zb.get(0).fid;
            sql.executeUpdate("update oa_zjxb set dr = 1 where fid = ?", [fid_old]);
            sql.executeUpdate("update oa_zjxb_h set dr = 1 where fid = ?", [fid_old]);
        }
        // 2. Insert the new main-table row with bound parameters
        String insert_zb = "insert into oa_zjxb (oaid,ymonth,finance,pk_group,bill_type,trade_type,primal_money,contractcode,itemcode,contractmoney) values (?,?,?,?,?,?,?,?,?,?)"
        def count = sql.executeUpdate(insert_zb, [oaid, ymonth, finance, pk_group, bill_type, trade_type, primal_money, contractcode, itemcode, contractmoney]);
        log.info("Main table rows affected: " + count);
        if (count > 0) {
            success++;
            log.info("Main table insert succeeded");
        } else {
            fail++;
            log.info("Main table insert failed");
            continue;
        }


        // 3. Insert the child-table rows

        // Number of child rows in the payload (the key is spelled `itmes` throughout these scripts)
        def count_number_zib = jsonObject.itmes?.size() ?: 0;
        for (def k = 0; k < count_number_zib; k++) {
            def item = jsonObject.itmes[k];
            String pk_org = item.pk_org?.toString() ?: "";
            // Validate the required child field
            if ("".equals(pk_org)) {
                restlist.add([index: i, errormessage: "pk_org must not be empty"]);
                fail++;
                response.put("code", "1000000002");
                continue;
            }
            String pk_group_h = item.pk_group?.toString() ?: "";
            String bill_type_h = item.bill_type?.toString() ?: "";
            String trade_type_h = item.trade_type?.toString() ?: "";
            // Null check on pay_primal itself, not on bill_type
            String pay_primal = item.pay_primal?.toString() ?: "";
            double direction = item.direction == null ? 0.00 : item.direction;
            String objecttype = item.objecttype?.toString() ?: "";
            String pk_oppaccount = item.pk_oppaccount?.toString() ?: "";
            String pk_supplier = item.pk_supplier?.toString() ?: "";
            String pk_account = item.pk_account?.toString() ?: "";
            // Look up the current main-table row (dr=0) to obtain its fid
            List rowFid = sql.rows("select * from oa_zjxb where oaid = ? and dr = 0", [oaid]);
            if (rowFid.size() <= 0) {
                log.info("Main table row not found");
                restlist.add([index: i, errormessage: "Main table insert failed, please contact the administrator"]);
                fail++;
                response.put("code", "1000000002");
                continue;
            }
            def fid_new = rowFid.get(0).fid;
            def insert_zib = "insert into oa_zjxb_h (fid,pk_org,pk_group_h,bill_type_h,trade_type_h,pay_primal,direction,objecttype,pk_oppaccount,pk_supplier,pk_account) values (?,?,?,?,?,?,?,?,?,?,?)"
            log.info(insert_zib);
            int count_zib = sql.executeUpdate(insert_zib, [fid_new, pk_org, pk_group_h, bill_type_h, trade_type_h, pay_primal, direction, objecttype, pk_oppaccount, pk_supplier, pk_account]);
            log.info("Child table rows affected: " + count_zib);
            if (count_zib > 0) {
                success++;
            }

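        } // end of the child-row loop: the result is reported once, after all rows were processed

        if (fail > 0) {
            response.put("success", "false");
            response.put("code", "1000000099");
            response.put("message", "Records updated: " + success + ", failed: " + fail);
            response.put("data", restlist);
            response.put("errorStack", null);
            flowfile.write("UTF-8", mapper.writeValueAsString(response))
            flowfile.putAttribute("lognum", fail.toString());
            REL_FAILURE << flowfile
            return
        }

        // No errors: write the success response to the FlowFile
        response.put("success", "true");
        response.put("data", null)
        response.put("code", "1000000099");
        response.put("message", "Fund allocation record updated");
        response.put("errorStack", null)
        flowfile.write("UTF-8", mapper.writeValueAsString(response));
        log.info("=========== Processing finished ============")
        REL_SUCCESS << flowfile;
        return
    }

    // Reached only when a `continue` in the outer loop left it after an error:
    // report the collected errors so that the FlowFile is still transferred.
    response.put("success", "false");
    response.put("message", "Records updated: " + success + ", failed: " + fail);
    response.put("data", restlist);
    response.put("errorStack", null);
    flowfile.write("UTF-8", mapper.writeValueAsString(response))
    flowfile.putAttribute("lognum", fail.toString());
    REL_FAILURE << flowfile
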
} catch (Exception e) {
    response.put("success", "false");
    // Same exception code as the insert and delete scripts; 1000000000 is the success code
    response.put("code", "1000000003");
    response.put("message", "Fund allocation: update failed");
    response.put("data", null);
    response.put("errorStack", e.getMessage());
    flowfile.write("UTF-8", mapper.writeValueAsString(response));
    REL_FAILURE << flowfile;
    return;
} finally { // Release both database connections
    sql.close();
    conn.close();
    sql1.close();
    conn1.close();
}
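
All three scripts return the same response envelope (success, code, message, data, errorStack). As a sketch only, the envelope could be built in one place; the function name `buildResponse` is hypothetical, `JsonOutput` is used here instead of the Jackson `ObjectMapper` from the scripts to keep the example dependency-free, and the code value is the validation-error code used above.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Hypothetical helper: build the response envelope used by the scripts above
Map buildResponse(boolean ok, String code, String message, List errors = null, String errorStack = null) {
    [success: ok.toString(), code: code, message: message, data: errors, errorStack: errorStack]
}

// A validation failure with one error entry
def failure = buildResponse(false, '1000000002', 'Records inserted: 0, failed: 1',
        [[index: 0, errormessage: 'oaid must not be empty']])
def json = JsonOutput.toJson(failure)
println json

// Round trip: the JSON parses back into the same fields
def parsed = new JsonSlurper().parseText(json)
println parsed.data[0].errormessage
```

Keeping the envelope in a single function makes it harder for the scripts to drift apart, for example by mixing the string "false" with the boolean false.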


4. Overall structure of the NiFi interface connection and data storage

 

 

 

 


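Copyright notice: This is an original article by qq_42209718, published under the CC 4.0 BY-SA license. When reposting, please include a link to the original source and this notice.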