Error:(52, 65) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[com.icsoc.report.model.Message[Object]] val inputStream: DataStream[Message[Object]] = env.addSource(flinkKafkaConsumer) 这个问题主要是在程序里需要一个隐式参数,我们可以看大上面的addSource在Flink的源码实现如下
/**
* Create a DataStream using a user defined source function for arbitrary
* source functionality. By default sources have a parallelism of 1.
* To enable parallel execution, the user defined source should implement
* ParallelSourceFunction or extend RichParallelSourceFunction.
* In these cases the resulting source will have the parallelism of the environment.
* To change this afterwards call DataStreamSource.setParallelism(int)
*
*/
def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
require(function != null, "Function must not be null.")val cleanFun = scalaClean(function)
val typeInfo = implicitly[TypeInformation[T]]
asScalaStream(javaEnv.addSource(cleanFun).returns(typeInfo))
}
在addSource定义中有一个[T: TypeInformation],但是我们的程序中并没有指定任何有关隐式参数的定义,这时候无法创建TypeInformantion,所以出现上线的错误信息. 解决方法一:我们可以直接在代码中加入下列代码: implicit val typeInfo = TypeInformation.of(new TypeHint[Message[Object]] {})
文章图片
接着我们运行程序发现,这个错误又出现了只是这次出现的是FlatMap上,如下图:
Error:(59, 17) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[com.icsoc.report.proxy.EventMap]
.flatMap((message: Message[Object], collector) => {
经过发现,该问题与上面的问题一样,所以处理可以按照上面的方式一样,在程序中国加入代码如下:
implicit val typeInfo1 = TypeInformation.of(classOf[EventMap])
val flinkKafkaConsumer = new FlinkKafkaConsumer010[Message[Object]](
parameterTool.getRequired("input-topic"),
new MessageSchema,
parameterTool.getProperties
).assignTimestampsAndWatermarks(new CustomWatermarkExtractor)
其中最重要的是MessageSchema,其实现如下:
/*******************************************************************************
* 版权信息:北京中通天鸿武汉分公司
* @author xuchang
* Copyright: Copyright (c) 2007北京中通天鸿武汉分公司,Inc.All Rights Reserved.
* Description:
******************************************************************************/
public class MessageSchema implements DeserializationSchema>, SerializationSchema {
@Override
public Message