CheckPoint-Java开发
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License");
you may not use this file except in compliance with
* the License.You may obtain a copy of the License at
*
*http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/package org.apache.spark.examples.streaming;
import java.io.File;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import scala.Tuple2;
import com.google.common.io.Files;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.util.LongAccumulator;
/**
* Use this singleton to get or register a Broadcast variable.
*/
class JavaWordExcludeList {private static volatile Broadcast> instance = null;
public static Broadcast> getInstance(JavaSparkContext jsc) {
if (instance == null) {
synchronized (JavaWordExcludeList.class) {
if (instance == null) {
List> wordExcludeList = Arrays.asList("a", "b", "c");
instance = jsc.broadcast(wordExcludeList);
}
}
}
return instance;
}
}/**
* Use this singleton to get or register an Accumulator.
*/
class JavaDroppedWordsCounter {private static volatile LongAccumulator instance = null;
public static LongAccumulator getInstance(JavaSparkContext jsc) {
if (instance == null) {
synchronized (JavaDroppedWordsCounter.class) {
if (instance == null) {
instance = jsc.sc().longAccumulator("DroppedWordsCounter");
}
}
}
return instance;
}
}/**
* Counts words in text encoded with UTF8 received from the network every second. This example also
* shows how to use lazily instantiated singleton instances for Accumulator and Broadcast so that
* they can be registered on driver failures.
*
* Usage: JavaRecoverableNetworkWordCount
*and describe the TCP server that Spark Streaming would connect to receive
*data. directory to HDFS-compatible file system which checkpoint data
* file to which the word counts will be appended
*
* and must be absolute paths
*
* To run this on your local machine, you need to first run a Netcat server
*
*`$ nc -lk 9999`
*
* and run the example as
*
*`$ ./bin/run-example org.apache.spark.examples.streaming.JavaRecoverableNetworkWordCount \
*localhost 9999 ~/checkpoint/ ~/out`
*
* If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
* a new StreamingContext (will print "Creating new context" to the console). Otherwise, if
* checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
* the checkpoint data.
*
* Refer to the online documentation for more details.
*/
public final class JavaRecoverableNetworkWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
private static JavaStreamingContext createContext(String ip,
int port,
String checkpointDirectory,
String outputPath) {// If you do not see this printed, that means the StreamingContext has been loaded
// from the new checkpoint
System.out.println("Creating new context");
File outputFile = new File(outputPath);
if (outputFile.exists()) {
outputFile.delete();
}
SparkConf sparkConf = new SparkConf().setAppName("JavaRecoverableNetworkWordCount");
// Create the context with a 1 second batch size
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
ssc.checkpoint(checkpointDirectory);
// Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
JavaReceiverInputDStream> lines = ssc.socketTextStream(ip, port);
JavaDStream> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
JavaPairDStream, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
.reduceByKey((i1, i2) -> i1 + i2);
wordCounts.foreachRDD((rdd, time) -> {
// Get or register the excludeList Broadcast
Broadcast> excludeList =
JavaWordExcludeList.getInstance(new JavaSparkContext(rdd.context()));
// Get or register the droppedWordsCounter Accumulator
LongAccumulator droppedWordsCounter =
JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
// Use excludeList to drop words and use droppedWordsCounter to count them
String counts = rdd.filter(wordCount -> {
if (excludeList.value().contains(wordCount._1())) {
droppedWordsCounter.add(wordCount._2());
return false;
} else {
return true;
}
}).collect().toString();
String output = "Counts at time " + time + " " + counts;
System.out.println(output);
System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally");
System.out.println("Appending to " + outputFile.getAbsolutePath());
Files.append(output + "\n", outputFile, Charset.defaultCharset());
});
return ssc;
}public static void main(String[] args) throws Exception {
if (args.length != 4) {
System.err.println("You arguments were " + Arrays.asList(args));
System.err.println(
"Usage: JavaRecoverableNetworkWordCount \n" +
".and 【CheckPoint-Java开发】 describe the TCP server that Spark\n" +
"Streaming would connect to receive data. directory to\n" +
"HDFS-compatible file system which checkpoint data file to which\n" +
"the word counts will be appended\n" +
"\n" +
"In local mode, should be 'local[n]' with n > 1\n" +
"Both and must be absolute paths");
System.exit(1);
}String ip = args[0];
int port = Integer.parseInt(args[1]);
String checkpointDirectory = args[2];
String outputPath = args[3];
// Function to create JavaStreamingContext without any output operations
// (used to detect the new context)
Function0 createContextFunc =
() -> createContext(ip, port, checkpointDirectory, outputPath);
JavaStreamingContext ssc =
JavaStreamingContext.getOrCreate(checkpointDirectory, createContextFunc);
ssc.start();
ssc.awaitTermination();
}
}