nifi探索之ExecuteScript处理器

Abstract

本文的主要内容是介绍ExecuteScript这个功能强大的处理器,用户可以根据自己的需求提供脚本代码,然后ExecuteScript就可以根据脚本来处理flowfile。ExectuteScript支持多种语言Groovy/JPython等等,本文的代码采用Groovy。

常用变量

  • session变量:类型:ProcessSession,能够完成对flowfile的一些操作,例如create,putAttribute,transfer,read,write
  • context变量:类型:ProcessContext,能够获取处理器的一些properties,relationship,controllerService和StateManager等内容
  • log变量:用于写日志
  • REL_SUCCESS: Success Relationship的引用
  • REL_FAILURE: Failure Relationship的引用

常用的方法

  • session.get():获取输入的flowfile,从队列中获取优先级最高的flowfile或者返回null
flowfile=session.get()
if(!flowfile)return 
//对flowfile进行处理
  • session.get(maxNum):获取包含0到maxNum条flowfile组成的list,如果输入的flowfile有多重来源,那么获取的flowfile list中的flowfile可能来源于同一个flowfile queue也可能是多个 flowfile queue
flowfileList=session.get(100)
if(!flowfileList.isEmpty()){
    flowfileList.each{flowfile->//对每个flowfile的处理方法
}
  • session.create():获取一个新的flowfile对象
flowfile=session.create()
  • session.create(parentFlowfile):根据现有的parentFlowfile创建一个新的flowfile,新创建的flowfile将会继承parentFlowfile的属性以及其他内容(除了UUID)
flowfile=session.get()
if(!flowfile)return
newFlowfile=session.create(flowfile)
  • session.putAttribute():给flowfile添加或者更新属性,其中UUID这个属性是不可以更改的,如果试图更改,则忽略
    注意:flowfile是不可更改的,因此每次putAttribute时,将会创建一个新的flowfile,将旧的flowfile舍弃
flowfile=session.get()
if(!flowfile)return
flowfile=session.putAttribute(flowfile,'myAttr','myValue')
  • session.putAllAttribute(attrMap):一次性添加或者更改多个属性
attrMap=['myAttr1':'1','myAttr2':Integer.toString(2)]
flowfile=session.get()
if(!flowfile)return 
flowfile=session.putAllAttribute(flowfile,attrMap)
  • session.getAttribute(‘attrName’):返回string value或者null(如果该attrname的属性不存在)
flowfile=session.get()
if(!flowfile)return
myAttr=flowfile.getAttribute('filename')
  • session.getAllAttributes():获取flowfile所有的属性,返回的是一个map
flowfile=session.get()
if(!flowfile)return
flowfile.getAllAttributes().each{
    key,value->//处理操作
}

-session.transfer(flowfile,relationship):将处理之后的flowfile transfer到指定的relationship

flowfile=session.get()
if(!flowfile)return
if(errorOccured)
    session.transfer(flowfile,REL_FAILURE)
else
    session.transfer(flowfile,REL_SUCCESS)
  • log.info(…):日志操作,分为多个等级 warn/trace/debug/info/error
//无参数
log.info('字符串')
//有参数
log.info('字符串:{}{}{}',[参数1,参数2,参数3]as Object[])

原文来源自: https://community.hortonworks.com/articles/75545/executescript-cookbook-part-2.html