引言:1.上游如何通过接口把数据传到nifi并存储到数据库?
2.数据库中的表应该如何设计?
目录
1.常见组件的使用
HandleHttpRequest
- 用于上游数据传输到nifi设置接口功能组件,启动一个http的端口。
allowed paths :.*/(qq|hh)
-

2.StandardHttpContextMap
- StandardHttpContextMap 用于设置HTTP的请求和响应
-

3.RouteOnAttribute
- 该处理器使用属性表达式语言,根据流文件的属性去计算然后进行路由。
简单来说,就是对HandleHttpRequest接收到的请求按路由进行分发
oa_zjxbxz ${http.request.uri:equals("/oa_zjxbxz")}
通过不同的路由进入到不同的处理组中对应的处理组件


4.ExecuteGroovyScript
负责编写类Java的Groovy脚本,对数据进行接收、转换并存储到中台。

2.数据库中表的设计
主表oa_zjxbxz表多加fid和dr字段,fid自增,dr用来存储最新数据状态(dr=0代表最新状态,dr=1代表旧数据),一般不涉及到删除数据,要保留痕迹!

子表oa_zjxbxz_h表保存主表的主键并且也多加一个自增字段sid和状态字段dr(同上)

3.groovy脚本接收和存储数据
1.新增数据接口
//资金下拨新增
import com.fasterxml.jackson.databind.ObjectMapper
import groovy.json.JsonSlurper
//数据转化为json格式
import groovy.sql.Sql
import com.yonyou.util.NiFiUtil
import java.util.ArrayList;
import java.util.List;
// Fund-allocation "add" handler.
// Reads one JSON document from the incoming flow file, validates it, retires any
// active copy of the same record (dr=1) and inserts the main row plus its child
// rows. The flow file is always routed to REL_SUCCESS or REL_FAILURE.

// Fetch the next queued flow file; stop when nothing is waiting.
def flowfile = session.get();
if (!flowfile) return;
// Holds the parsed JSON payload of the flow file.
def jsonObject = null;
ObjectMapper mapper = new ObjectMapper();
def response = [:];
// Connect to the middle-platform database.
def conn = CTL.mydb.getConnection();
if (!conn) {
    // Route the flow file instead of returning with it still un-transferred.
    REL_FAILURE << flowfile;
    return;
}
Sql sql = new Sql(conn);
// Connect to the NC database.
// NOTE(review): this connection is opened but never queried in this script.
def conn1 = CTL.ncdb.getConnection();
if (!conn1) {
    sql.close();
    REL_FAILURE << flowfile;
    return;
}
Sql sql1 = new Sql(conn1);
// Both connections are open: start from the "success" response.
response.put("success", "true");
response.put("code", "1000000000");
int success = 0; // number of rows stored
int fail = 0;    // number of validation / insert failures
NiFiUtil nftUtil = new NiFiUtil();
// Error entries returned to the caller, one map per problem.
def restlist = [];

// Null-safe conversions for JSON fields: missing text becomes "", missing numbers 0.
def asText = { value -> value == null ? "" : value.toString() };
def asAmount = { value -> (double) (value == null ? 0.00 : value) };

// Records one validation error as its own map so earlier entries are not overwritten.
def recordError = { int index, String message ->
    restlist.add([index: index, errormessage: message]);
    fail++;
    response.put("code", "1000000002");
};

// Inserts one row using bind parameters; column names come from the map keys
// (script literals), values are never concatenated into the SQL text.
def insertRow = { String table, Map row ->
    String statement = "insert into " + table + " (" + row.keySet().join(",") + ") values (" + (["?"] * row.size()).join(",") + ")";
    log.info(statement);
    return sql.executeUpdate(statement, row.values().toList());
};

// Marks every given main row and its child rows as superseded (dr=1).
def retireRows = { List activeRows ->
    activeRows.each { row ->
        int fid = (int) row.fid;
        sql.executeUpdate("update oa_zjxb set dr=1 where fid=?", [fid]);
        sql.executeUpdate("update oa_zjxb_h set dr=1 where fid=?", [fid]);
    }
};

try {
    // Deserialize the flow file content into a map, e.g. {"age":18,"name":"Tom"} -> [age:18, name:Tom].
    jsonObject = new groovy.json.JsonSlurper().parseText(flowfile.read().getText("UTF-8"));
    // Single-pass loop: "continue" skips the remaining steps and falls through to
    // the response handling placed after the loop.
    for (int i = 0; i < 1; i++) {
        log.info("=======报销单新增=========");
        // Required main-table fields.
        String oaid = asText(jsonObject.oaid);
        if ("".equals(oaid)) {
            recordError(i, "oaid不能为空");
            continue;
        }
        String ymonth = asText(jsonObject.ymonth);
        if ("".equals(ymonth)) {
            recordError(i, "ymonth不能为空");
            continue;
        }
        String finance = asText(jsonObject.finance);
        // Main-table row; key order defines the column order of the insert.
        def mainRow = [
            oaid           : oaid,
            ymonth         : ymonth,
            finance        : finance,
            transactiontype: asText(jsonObject.transactiontype),
            homemoney      : asAmount(jsonObject.homemoney),
            totalmoney     : asAmount(jsonObject.totalmoney),
            actualmoney    : asAmount(jsonObject.actualmoney),
            paymoney       : asAmount(jsonObject.paymoney),
            repaymentmoney : asAmount(jsonObject.repaymentmoney),
            borrowermoney  : asAmount(jsonObject.borrowermoney),
            payingunit     : asText(jsonObject.payingunit),
            corporatebank  : asText(jsonObject.corporatebank),
            paymenttype    : asText(jsonObject.paymenttype),
            expenseunit    : asText(jsonObject.expenseunit),
            expensedept    : asText(jsonObject.expensedept),
            balanceitem    : asText(jsonObject.balanceitem),
            supplier       : asText(jsonObject.supplier),
            bx_expenseunit : asText(jsonObject.bx_expenseunit),
            bx_expensedept : asText(jsonObject.bx_expensedept),
            expense        : asText(jsonObject.expense),
            receipttype    : asText(jsonObject.receipttype),
            payee          : asText(jsonObject.payee),
            personagebank  : asText(jsonObject.personagebank),
            supplierbank   : asText(jsonObject.supplierbank)
        ];
        log.info("数据判断正确,资金下拨新增正在准备存储到中台!!!");
        // 1. Retire any active (dr=0) copy of this record before inserting the new one.
        retireRows(sql.rows("select * from oa_zjxb where oaid=? and ymonth=? and dr =0", [oaid, ymonth]));
        // 2. Insert the main row into oa_zjxb, the table the lookups above and below read.
        int count = insertRow("oa_zjxb", mainRow);
        log.info("主表影响的行数:"+count);
        if (count > 0) {
            success++;
            log.info("主表数据插入成功");
        } else {
            fail++;
            log.info("主表数据插入失败");
            continue;
        }
        // 3. Insert the child rows; a missing "itmes" array is treated as no children.
        def items = jsonObject.itmes ?: [];
        int m = items.size();
        log.info("子表有" + m+ "条数据");
        for (int k = 0; k < m; k++) {
            def item = items[k];
            String receipitem = asText(item.receipitem);
            if ("".equals(receipitem)) {
                recordError(i, "receipitem不能为空");
                continue;
            }
            // Look up the freshly inserted main row to obtain its generated fid.
            List rowsFid = sql.rows("select * from oa_zjxb where oaid=? and ymonth=? and finance=? and dr=0 ", [oaid, ymonth, finance]);
            if (rowsFid.size() <= 0) {
                recordError(i, "资金下拨新增(oa_zjxb)主表数据异常,请联系管理员!");
                continue;
            }
            int fid = (int) rowsFid.get(0).fid;
            def childRow = [
                fid              : fid,
                receipitem       : receipitem,
                budgetaccount    : asText(item.budgetaccount),
                account          : asText(item.account),
                amount           : asAmount(item.amount),
                exrate           : asAmount(item.exrate),
                offsetamount     : asAmount(item.offsetamount),
                summary          : asText(item.summary),
                cxreceipitem     : asText(item.cxreceipitem),
                borrowaccount    : asText(item.borrowaccount),
                borrower         : asText(item.borrower),
                offsetmoney      : asText(item.offsetmoney),
                zb_repaymentmoney: asAmount(item.repaymentmoney),
                borrowdept       : asText(item.borrowdept),
                lastdate         : asText(item.lastdate),
                effectivedate    : asText(item.effectivedate)
            ];
            int childCount = insertRow("oa_zjxb_h", childRow);
            if (childCount > 0) {
                success++;
            }
        }
    }
    // Response handling sits after the loop so every validation failure reaches it.
    if (fail > 0) {
        response.put("success", "false");
        response.put("code", "1000000099");
        response.put("message", "数据新增成功:" + success + "条! 失败" + fail + "条!");
        response.put("data", restlist);
        response.put("errorStack", null);
        flowfile.write("UTF-8", mapper.writeValueAsString(response))
        flowfile.putAttribute("lognum", fail.toString());
        REL_FAILURE << flowfile
        return
    }
    // No errors: pass the accepted payload downstream.
    flowfile.write("UTF-8",mapper.writeValueAsString(jsonObject))
    log.info("===========数据执行完毕============")
    REL_SUCCESS << flowfile
    return
} catch (Exception e) {
    // Any parsing or database error is reported to the caller on the failure route.
    response.put("success",false);
    response.put("code", "1000000003");
    response.put("message", "资金下拨-新增异常");
    response.put("data", null);
    response.put("errorStack", e.getMessage());
    flowfile.write("utf-8", mapper.writeValueAsString(response));
    REL_FAILURE << flowfile;
    return;
} finally {
    // Release both database connections.
    sql.close();
    conn.close();
    conn1.close();
    sql1.close();
}
使用postman去调用新增接口(http://192.168.1.100:9998/qq)

2.删除数据接口
//资金下拨-删除
import com.fasterxml.jackson.databind.ObjectMapper
import groovy.json.JsonSlurper
//数据转化为json格式
import groovy.sql.Sql
import com.yonyou.util.NiFiUtil
import java.util.ArrayList;
import java.util.List;
// Fund-allocation "delete" handler.
// Reads one JSON document from the incoming flow file and logically deletes the
// matching record by setting dr=1 on the main row and its child rows.
// The flow file is always routed to REL_SUCCESS or REL_FAILURE.

// Fetch the next queued flow file; stop when nothing is waiting.
def flowfile = session.get();
if (!flowfile) return;
// Holds the parsed JSON payload of the flow file.
def jsonObject = null;
ObjectMapper mapper = new ObjectMapper();
def response = [:];
// Connect to the middle-platform database.
def conn = CTL.mydb.getConnection();
if (!conn) {
    // Route the flow file instead of returning with it still un-transferred.
    REL_FAILURE << flowfile;
    return;
}
Sql sql = new Sql(conn);
// Connect to the NC database.
// NOTE(review): this connection is opened but never queried in this script.
def conn1 = CTL.ncdb.getConnection();
if (!conn1) {
    sql.close();
    REL_FAILURE << flowfile;
    return;
}
Sql sql1 = new Sql(conn1);
// Both connections are open: start from the "success" response.
response.put("success", "true");
response.put("code", "1000000000");
int success = 0; // number of records retired
int fail = 0;    // number of validation failures
NiFiUtil nftUtil = new NiFiUtil();
// Error entries returned to the caller, one map per problem.
def restlist = [];
try {
    // Deserialize the flow file content into a map, e.g. {"age":18,"name":"Tom"} -> [age:18, name:Tom].
    jsonObject = new groovy.json.JsonSlurper().parseText(flowfile.read().getText("UTF-8"));
    // Single-pass loop: "continue" skips the remaining steps and falls through to
    // the response handling placed after the loop.
    for (int i = 0; i < 1; i++) {
        // oaid identifies the record to delete and is mandatory.
        String oaid = jsonObject.oaid == null ? "" : jsonObject.oaid.toString();
        if ("".equals(oaid)) {
            restlist.add([index: i, errormessage: "oaid不能为空"]);
            fail++;
            response.put("code", "1000000002");
            continue;
        }
        log.info("数据判断正确,正在删除报销单!!!");
        // Look up active rows in oa_zjxb, the same table the updates below modify;
        // the value is bound as a parameter rather than concatenated into the SQL.
        List rows = sql.rows("select * from oa_zjxb where oaid=? and dr =0", [oaid]);
        if (rows.size() > 0) {
            log.info("本条数据已经存在oa_zjxb中,正在修改主子表状态dr=1")
            // Retire every active copy together with its child rows.
            rows.each { row ->
                int fid = (int) row.fid;
                sql.executeUpdate("update oa_zjxb set dr=1 where fid=?", [fid]);
                sql.executeUpdate("update oa_zjxb_h set dr=1 where fid=?", [fid]);
            }
            success++;
        }
    }
    // Response handling sits after the loop so every validation failure reaches it.
    if (fail > 0) {
        response.put("success", "false");
        response.put("code", "1000000099");
        response.put("message", "数据删除成功:" + success + "条! 失败" + fail + "条!");
        response.put("data", restlist);
        response.put("errorStack", null);
        flowfile.write("UTF-8", mapper.writeValueAsString(response))
        flowfile.putAttribute("lognum", fail.toString());
        REL_FAILURE << flowfile
        return
    }
    log.info("===========数据执行完毕============")
    // Success report.
    // NOTE(review): the success code below equals the failure code used above
    // ("1000000099") while the initial success code is "1000000000" - confirm which is intended.
    response.put("success", "true");
    response.put("data",null)
    response.put("code", "1000000099");
    response.put("message","报销业务删除成功");
    response.put("errorStack",null)
    flowfile.write("UTF-8", mapper.writeValueAsString(response));
    REL_SUCCESS << flowfile;
    return
} catch (Exception e) {
    // Any parsing or database error is reported to the caller on the failure route.
    response.put("success", false);
    response.put("code", "1000000003");
    response.put("message", "资金下拨-删除异常");
    response.put("data", null);
    response.put("errorStack", e.getMessage());
    flowfile.write("utf-8", mapper.writeValueAsString(response));
    REL_FAILURE << flowfile;
    return;
} finally {
    // Release both database connections.
    sql.close();
    conn.close();
    conn1.close();
    sql1.close();
}
使用postman去调用删除接口(http://192.168.1.100:9998/hh)
3.修改数据接口
//资金下拨修改数据存储
import com.fasterxml.jackson.databind.ObjectMapper
import groovy.json.JsonSlurper
import groovy.sql.Sql
import com.yonyou.util.NiFiUtil
import java.util.ArrayList;
import java.util.List;
// Fund-allocation "modify" handler.
// Step 1: take the JSON flow file and connect to the middle-platform and NC databases.
// Step 2: retire the active copy of the record (dr=1), then insert the new main row
//         and all of its child rows.
// The flow file is always routed to REL_SUCCESS or REL_FAILURE.

// 01. Fetch the next queued flow file; stop when nothing is waiting.
def flowfile = session.get();
if (!flowfile) return;
// 02. Connect to the databases.
def jsonObject = null; // parsed JSON payload
ObjectMapper mapper = new ObjectMapper();
// Middle-platform database.
def conn = CTL.mydb.getConnection();
if (!conn) {
    // Route the flow file instead of returning with it still un-transferred.
    REL_FAILURE << flowfile;
    return;
}
Sql sql = new Sql(conn);
// NC database.
// NOTE(review): this connection is opened but never queried in this script.
def conn1 = CTL.ncdb.getConnection();
if (!conn1) {
    sql.close();
    REL_FAILURE << flowfile;
    return;
}
Sql sql1 = new Sql(conn1);
// Both connections are open: start from the "success" response.
def response = [:];
response.put("success", "true");
response.put("status", "数据库连接成功");
// Error entries returned to the caller, one map per problem.
def restlist = [];
// Counters for stored rows and failures.
def success = 0;
def fail = 0;
// Project helper jar intended for data validation.
NiFiUtil nftUtil = new NiFiUtil();

// Null-safe conversions for JSON fields: missing text becomes "", missing numbers 0.
def asText = { value -> value == null ? "" : value.toString() };
def asAmount = { value -> (double) (value == null ? 0.00 : value) };

// Records one validation error as its own map and flags the response code.
def recordError = { int index, String message ->
    restlist.add([index: index, errormessage: message]);
    fail++;
    response.put("code", "1000000002");
};

// Inserts one row using bind parameters; column names come from the map keys
// (script literals), values are never concatenated into the SQL text.
def insertRow = { String table, Map row ->
    String statement = "insert into " + table + " (" + row.keySet().join(",") + ") values (" + (["?"] * row.size()).join(",") + ")";
    log.info(statement);
    return sql.executeUpdate(statement, row.values().toList());
};

try {
    // Parse the flow file content.
    jsonObject = new groovy.json.JsonSlurper().parseText(flowfile.read().getText("UTF-8"));
    // Single-pass loop: "continue" skips the remaining steps and falls through to
    // the response handling placed after the loop.
    for (def i = 0; i < 1; i++) {
        // oaid identifies the record and is mandatory.
        String oaid = asText(jsonObject.oaid);
        if ("".equals(oaid)) {
            recordError(i, "oaid不能为空");
            continue;
        }
        // Main-table row; key order defines the column order of the insert.
        def mainRow = [
            oaid         : oaid,
            ymonth       : asText(jsonObject.ymonth),
            finance      : asText(jsonObject.finance),
            pk_group     : asText(jsonObject.pk_group),
            bill_type    : asText(jsonObject.bill_type),
            trade_type   : asText(jsonObject.trade_type),
            primal_money : asText(jsonObject.primal_money),
            contractcode : asText(jsonObject.contractcode),
            itemcode     : asText(jsonObject.itemcode),
            contractmoney: asAmount(jsonObject.contractmoney)
        ];
        // 1. Retire every active (dr=0) copy of the record together with its child rows.
        sql.rows("select * from oa_zjxb where oaid=? and dr=0 ", [oaid]).each { row ->
            def fid_old = (int) row.fid;
            sql.executeUpdate("update oa_zjxb set dr=1 where fid=?", [fid_old]);
            sql.executeUpdate("update oa_zjxb_h set dr=1 where fid=?", [fid_old]);
        }
        // 2. Insert the new main row.
        def count = insertRow("oa_zjxb", mainRow);
        log.info("主表影响的行数:" + count);
        if (count > 0) {
            success++;
            log.info("主表数据插入成功");
        } else {
            fail++;
            log.info("主表数据插入失败");
            continue;
        }
        // 3. Insert every child row; a missing "itmes" array is treated as no children.
        def items = jsonObject.itmes ?: [];
        def count_number_zib = items.size();
        for (def k = 0; k < count_number_zib; k++) {
            def item = items[k];
            String pk_org = asText(item.pk_org);
            if ("".equals(pk_org)) {
                recordError(i, "pk_org不能为空");
                continue;
            }
            // Only the active row (dr=0) is the one just inserted; retired rows share the oaid.
            List rowFid = sql.rows("select * from oa_zjxb where oaid=? and dr=0", [oaid]);
            if (rowFid.size() <= 0) {
                log.info("主表数据新增失败");
                recordError(i, "主表数据新增异常,请联系管理员!");
                continue;
            }
            def fid_new = rowFid.get(0).fid;
            def childRow = [
                fid          : fid_new,
                pk_org       : pk_org,
                pk_group_h   : asText(item.pk_group),
                bill_type_h  : asText(item.bill_type),
                trade_type_h : asText(item.trade_type),
                pay_primal   : asText(item.pay_primal),
                direction    : asAmount(item.direction),
                objecttype   : asText(item.objecttype),
                pk_oppaccount: asText(item.pk_oppaccount),
                pk_supplier  : asText(item.pk_supplier),
                pk_account   : asText(item.pk_account)
            ];
            int count_zib = insertRow("oa_zjxb_h", childRow);
            log.info("子表影响的行数:" + count_zib);
            if (count_zib > 0) {
                success++;
            }
        }
    }
    // Response handling sits after both loops so all child rows are processed
    // and every validation failure reaches it.
    if (fail > 0) {
        response.put("success", "false");
        response.put("code", "1000000099");
        response.put("message", "数据新增成功:" + success + "条! 失败" + fail + "条!");
        response.put("data", restlist);
        response.put("errorStack", null);
        flowfile.write("UTF-8", mapper.writeValueAsString(response))
        flowfile.putAttribute("lognum", fail.toString());
        REL_FAILURE << flowfile
        return
    }
    // Success report.
    // NOTE(review): the success code below equals the failure code used above
    // ("1000000099") - confirm which value callers expect.
    response.put("success", "true");
    response.put("data", null)
    response.put("code", "1000000099");
    response.put("message", "资金下拨修改成功");
    response.put("errorStack", null)
    flowfile.write("UTF-8", mapper.writeValueAsString(response));
    log.info("===========数据执行完毕============")
    REL_SUCCESS << flowfile;
    return
} catch (Exception e) {
    // Any parsing or database error is reported with the exception code used by
    // the add and delete handlers.
    response.put("success",false);
    response.put("code", "1000000003");
    response.put("message", "资金下拨-报销单修改异常");
    response.put("data", null);
    response.put("errorStack", e.getMessage());
    flowfile.write("utf-8", mapper.writeValueAsString(response));
    REL_FAILURE << flowfile;
    return;
} finally {
    // Release both database connections.
    sql.close();
    conn.close();
    conn1.close();
    sql1.close();
}
4.nifi接口连接和数据存储整体结构


版权声明:本文为qq_42209718原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。