Flink Java code fails after importing the Scala Tuple2 class

Mixing Java and Scala code

While writing Java code, I imported a Scala class (scala.Tuple2) without noticing. The project uses this dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

package cn.putact.datastream;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import scala.Tuple2; // the culprit: a Scala class, not Flink's Java tuple

public class NcWordCount {
    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitters())
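                .keyBy(value -> value._1)       // _1 is the Scala accessor for the first element
                .timeWindow(Time.seconds(5))
                .sum(1);                        // throws: positional access is not supported on scala.Tuple2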
        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitters implements FlatMapFunction<String,Tuple2<String,Integer>>{
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {

            for (String word:s.split(" ")){
                collector.collect(new Tuple2<String ,Integer>(word,1));
            }
        }
    }
}


Exception in thread "main" org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException: Cannot reference field by position on GenericType<scala.Tuple2>Referencing a field by position is supported on tuples, case classes, and arrays. Additionally, you can select the 0th field of a primitive/basic type (e.g. int).
	at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:87)
	at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:43)
	at org.apache.flink.streaming.api.datastream.WindowedStream.sum(WindowedStream.java:1352)
	at cn.putact.datastream.NcWordCount.main(NcWordCount.java:21)

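After a long debugging session I found that the error came from mixing a Scala class into Java code. IDEA compiles the code without complaint, so the problem only shows up when the program runs: sum(1) asks Flink for a field by position, and Flink's Java API treats scala.Tuple2 as a GenericType rather than as a tuple. It is best to write pure Java: remove the Scala packages from the Maven dependencies and use Flink's Java tuple in the code.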

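// import scala.Tuple2;                        // remove the Scala tuple
import org.apache.flink.api.java.tuple.Tuple2; // use Flink's Java tuple instead
// Flink's Tuple2 exposes public fields f0 and f1, so keyBy(value -> value._1) becomes keyBy(value -> value.f0)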
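
To make the error message easier to read, here is a minimal sketch of the idea behind "Cannot reference field by position on GenericType". It uses only the JDK and is not Flink's implementation. KnownTuple2, ForeignPair, fieldAt and sumAt are hypothetical names made up for this illustration. Flink performs its check against type information while the job is being built; the sketch checks each object at call time to stay short.

```java
import java.util.Arrays;
import java.util.List;

class PositionalAccessSketch {

    // Hypothetical stand-in for a tuple type the framework recognizes:
    // public fields named by position, like f0/f1 on Flink's Java Tuple2.
    static final class KnownTuple2<A, B> {
        public final A f0;
        public final B f1;
        KnownTuple2(A f0, B f1) { this.f0 = f0; this.f1 = f1; }
    }

    // Hypothetical stand-in for a foreign pair class (such as a Scala tuple seen from Java):
    // same data, but the framework has no positional mapping for it.
    static final class ForeignPair<A, B> {
        private final A first;
        private final B second;
        ForeignPair(A first, B second) { this.first = first; this.second = second; }
        A _1() { return first; }
        B _2() { return second; }
    }

    // Returns the field at position pos for recognized tuples and rejects every other type.
    static Object fieldAt(Object value, int pos) {
        if (value instanceof KnownTuple2) {
            KnownTuple2<?, ?> t = (KnownTuple2<?, ?>) value;
            if (pos == 0) return t.f0;
            if (pos == 1) return t.f1;
            throw new IndexOutOfBoundsException("No field at position " + pos);
        }
        throw new IllegalArgumentException(
                "Cannot reference field by position on generic type "
                        + value.getClass().getSimpleName());
    }

    // Adds up the Integer field at position pos, the way a positional sum(1) would.
    static int sumAt(List<?> values, int pos) {
        int total = 0;
        for (Object v : values) {
            total += (Integer) fieldAt(v, pos);
        }
        return total;
    }

    public static void main(String[] args) {
        List<Object> known = Arrays.asList(
                new KnownTuple2<String, Integer>("a", 1),
                new KnownTuple2<String, Integer>("a", 2));
        System.out.println(sumAt(known, 1)); // prints 3

        List<Object> foreign = Arrays.asList(new ForeignPair<String, Integer>("a", 1));
        try {
            sumAt(foreign, 1);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // the foreign pair is rejected
        }
    }
}
```

The point of the sketch is that both classes hold the same pair of values, yet only the one the framework knows how to index can be used with a position such as 1. That is why swapping the import is enough to fix the original program.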

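Copyright notice: this is an original article by pop_xiaohao, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.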