Flink|Flink 框架下scala与java混合编程问题

最近在应用Flink做相关业务设计,使用scala与java的混合编程,遇到一些问题,在这里做个记录.
问题1:
【Flink|Flink 框架下scala与java混合编程问题】Flink|Flink 框架下scala与java混合编程问题
文章图片

Error:(85, 23) value foreach is not a member of java.util.ArrayList[com.icsoc.report.model.Message[_]] for (msg <- messages) {

这是由于在scala中没有ArrayList这个类,所以在scala中需要引用java的ArrayList对象的的话,需要在scala代码中引入一个隐式转换.
import scala.collection.JavaConversions._
这样就可以在scala中直接是用java.lang.ArrayList了.
问题2:
Flink|Flink 框架下scala与java混合编程问题
文章图片

Error:(52, 65) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[com.icsoc.report.model.Message[Object]]
val inputStream: DataStream[Message[Object]] = env.addSource(flinkKafkaConsumer)
这个问题主要是在程序里需要一个隐式参数,我们可以看大上面的addSource在Flink的源码实现如下
/** * Create a DataStream using a user defined source function for arbitrary * source functionality. By default sources have a parallelism of 1. * To enable parallel execution, the user defined source should implement * ParallelSourceFunction or extend RichParallelSourceFunction. * In these cases the resulting source will have the parallelism of the environment. * To change this afterwards call DataStreamSource.setParallelism(int) * */ def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T] = { require(function != null, "Function must not be null.")val cleanFun = scalaClean(function) val typeInfo = implicitly[TypeInformation[T]] asScalaStream(javaEnv.addSource(cleanFun).returns(typeInfo)) }

在addSource定义中有一个[T: TypeInformation],但是我们的程序中并没有指定任何有关隐式参数的定义,这时候无法创建TypeInformantion,所以出现上线的错误信息.
解决方法一:我们可以直接在代码中加入下列代码:
implicit val typeInfo = TypeInformation.of(new TypeHint[Message[Object]] {})
Flink|Flink 框架下scala与java混合编程问题
文章图片

接着我们运行程序发现,这个错误又出现了只是这次出现的是FlatMap上,如下图:
Error:(59, 17) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[com.icsoc.report.proxy.EventMap] .flatMap((message: Message[Object], collector) => {

经过发现,该问题与上面的问题一样,所以处理可以按照上面的方式一样,在程序中国加入代码如下:
implicit val typeInfo1 = TypeInformation.of(classOf[EventMap])

这个方法能够决问题,但是太繁琐了,如果有像许隐式类型需要转换,就需要在代码中加入很多这种变量.
方法二:
我们只需要在代码中引入
import org.apache.flink.streaming.api.scala._

如果数据是有限的(静态数据集),我们可以引入以下包:
import org.apache.flink.api.scala._

只需要引入这个一个包,在不需要加任何代码,无论有多少该类型的隐式转换,都能够处理.
问题三:在Flink中从kafka中消费数据使用反序列化,将数据转成我们实际时常用的数据类型,代码如下:
val flinkKafkaConsumer = new FlinkKafkaConsumer010[Message[Object]]( parameterTool.getRequired("input-topic"), new MessageSchema, parameterTool.getProperties ).assignTimestampsAndWatermarks(new CustomWatermarkExtractor)

其中最重要的是MessageSchema,其实现如下:
/******************************************************************************* * 版权信息:北京中通天鸿武汉分公司 * @author xuchang * Copyright: Copyright (c) 2007北京中通天鸿武汉分公司,Inc.All Rights Reserved. * Description: ******************************************************************************/ public class MessageSchema implements DeserializationSchema>, SerializationSchema { @Override public Message deserialize(byte[] bytes) { return Message.fromString(new String(bytes)); }@Override public boolean isEndOfStream(Message message) { return false; }@Override public byte[] serialize(Message message) { return message.toString().getBytes(); }@Override public TypeInformation> getProducedType() { return TypeInformation.of(new TypeHint>() { }); } }
这个MessageSchema在Java编写的Flink程序中国是没有问题的,但是在scala编写的Flink程序中一直报错,如下:
Error:(41, 30) overloaded method constructor FlinkKafkaConsumer010 with alternatives: (x$1: java.util.regex.Pattern,x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[com.icsoc.report.model.Message[_]],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[com.icsoc.report.model.Message[_]] (x$1: java.util.regex.Pattern,x$2: org.apache.flink.api.common.serialization.DeserializationSchema[com.icsoc.report.model.Message[_]],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[com.icsoc.report.model.Message[_]] (x$1: java.util.List[String],x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[com.icsoc.report.model.Message[_]],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[com.icsoc.report.model.Message[_]] (x$1: java.util.List[String],x$2: org.apache.flink.api.common.serialization.DeserializationSchema[com.icsoc.report.model.Message[_]],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[com.icsoc.report.model.Message[_]] (x$1: String,x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[com.icsoc.report.model.Message[_]],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[com.icsoc.report.model.Message[_]] (x$1: String,x$2: org.apache.flink.api.common.serialization.DeserializationSchema[com.icsoc.report.model.Message[_]],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[com.icsoc.report.model.Message[_]] cannot be applied to (String, com.icsoc.report.flink.MessageSchema, java.util.Properties) val flinkKafkaConsumer = new FlinkKafkaConsumer010[Message[_]](

查看Flink的TypeInformation官方文档,在某些API中,手动创建一个TypeInformation类可能是必须的,因为Java泛型的类型擦除特性会使得Flink无法推断数据类型.所以需要自己手动创建一个TypeInformation,修改后的代码如下:
/******************************************************************************* * 版权信息:北京中通天鸿武汉分公司 * @author xuchang * Copyright: Copyright (c) 2007北京中通天鸿武汉分公司,Inc.All Rights Reserved. * Description: ******************************************************************************/ public class MessageSchema implements DeserializationSchema>, SerializationSchema { @Override public Message deserialize(byte[] bytes) { return Message.fromString(new String(bytes)); }@Override public boolean isEndOfStream(Message message) { return false; }@Override public byte[] serialize(Message message) { return message.toString().getBytes(); }@Override public TypeInformation> getProducedType() { return TypeInformation.of(new TypeHint>() { }); } }
如果关于Flink有其他问题,可以在Flink官方文档中找到答案:
http://flink.iteblog.com/dev/types_serialization.html

    推荐阅读