需求:将log文件中的ip地址转换为主机名
日志文件的格式如下:
10.100.122.132 - [17/Jun/2013:22:53:58] "GET /bgs/greenbg.gif HTTP 1.1" 200 50 10.100.122.133 - [17/Jun/2013:22:53:58] "GET /bgs/redbg.gif HTTP 1.1" 200 50 |
PC-20161220MYVT- [17/Jun/2013:22:53:58] "GET /bgs/greenbg.gif HTTP 1.1" 200 50 sog- [17/Jun/2013:22:53:58] "GET /bgs/redbg.gif HTTP 1.1" 200 50 |
1.解决方案1:顺序处理
public class MainThread {
public static void main(String[] args) {
try (BufferedReader in = new BufferedReader(
new InputStreamReader(new FileInputStream("a.txt"), "UTF-8"));
BufferedWriter bw = new BufferedWriter(new FileWriter("a_01.txt",true))) {
for (String entry = in.readLine();
entry != null;
entry = in.readLine()) {
int index = entry.indexOf(' ');
String address = entry.substring(0, index);
String theRest = entry.substring(index);
String hostname = InetAddress.getByName(address).getHostName();
bw.append(hostname + " " + theRest);
bw.newLine();
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
存在的问题:程序耗费了大量时间等待DNS返回请求,在此期间什么也不做
2.解决方案2:使用线程池 由一个主线程读取日志文件,使用线程池将各个日志项(每行)传递给其他线程进行处理。
通过这种方式,由于DNS转换耗时,那么可以在堵塞的时候进行其他线程的执行(如果DNS转换不耗时,那么就没有什么必要使用多线程),注意主线程中仍是顺序执行的,future是按照读取的顺序逐次返回。
2.1DNSResolverTask
public class DNSResolverTask implements Callable {private String line;
public DNSResolverTask(String line) {
this.line = line;
}
@Override
public String call() {
try {
// separate out the IP address
int index = line.indexOf(' ');
String address = line.substring(0, index);
String theRest = line.substring(index);
//很多访问者访问网站时会请求多个页面。
//DNS查找成本很高,如果每个网站每次出现在日志文件中时都要查找,这样做并不合适。
//InetAddress类会缓存请求过的地址。如果再次请求相同的地址,它可以从缓存中获取,这比从DNS获取要快得多。
String hostname = InetAddress.getByName(address).getHostName();
return hostname + " " + theRest;
} catch (Exception ex) {
return line;
}
}
}
2.2MainThread
public class MainThread {private final static int NUM_THREADS = 4;
public static void main(String[] args) throws IOException {
ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
Queue results = new LinkedList();
try (BufferedReader in = new BufferedReader(
new InputStreamReader(new FileInputStream("a.txt"), "UTF-8"))) {
//主线程读取文件项的速度要比各个线程解析域名并结束的速度快得多//会读取文件,并为每一行创建一个LookupTask.
//通过for循环保证顺序
for (String entry = in.readLine();
entry != null;
entry = in.readLine()) {
DNSResolverTask task = new DNSResolverTask(entry);
//如果通过DNS转换不堵塞,那么使用多线程就没有什么必要
//由于DNS转换耗时,那么可以在堵塞的时候进行其他线程的执行,快是快在这个地方
Future future = executor.submit(task);
LogEntry result = new LogEntry(entry, future);
//想法1:我直接向文件写 怎么样?
//直接向文件写,可选 速度应该也不会太慢results.add(result);
}
}BufferedWriter bw = new BufferedWriter(new FileWriter("a_02.txt", true));
for (LogEntry result : results) {
try {
bw.append(result.future.get());
} catch (InterruptedException e) {
bw.append(result.original);
} catch (ExecutionException e) {
bw.append(result.original);
}
bw.newLine();
//不要忘记flush
bw.flush();
}
executor.shutdown();
}private static class LogEntry {
//最初的一行log记录
String original;
Future future;
LogEntry(String original, Future future) {
this.original = original;
this.future = future;
}
}
}
存在的问题:日志文件可能很庞大,所以使用LinkedList会导致这个程序占用大量内存
3.解决方案3:使用生产者消费者队列 为避免这一点,可以把输出放在一个单独的线程中,它与输入线程共享同一个队列。由于解析输入的同时可以处理之前的日志文件项,所以队列不会膨胀得过大。但是这又会带来另一个问题。你需要一个单独的信号指示输出已经完成,因为空队列已经不足以证明任务已经完成。最容易的方法是统计输入行数,确保它与输出行数一致。
3.1DNSResolveTask
public class DNSResolveTask implements Callable {Logger logger = LoggerFactory.getLogger(DNSResolveTask.class);
private String line;
public DNSResolveTask(String line) {
this.line = line;
}
@Override
public String call() {
try {
// separate out the IP address
int index = line.indexOf(' ');
String address = line.substring(0, index);
String theRest = line.substring(index);
//很多访问者访问网站时会请求多个页面。
//DNS查找成本很高,如果每个网站每次出现在日志文件中时都要查找,这样做并不合适。
//InetAddress类会缓存请求过的地址。如果再次请求相同的地址,它可以从缓存中获取,这比从DNS获取要快得多。
String hostname = InetAddress.getByName(address).getHostName();
//logger.info("return a line to queue");
return hostname + " " + theRest;
} catch (Exception ex) {
return line;
}
}
}
3.2WriteTask
public class WriterTask implements Runnable {Logger logger = LoggerFactory.getLogger(WriterTask.class);
private int lineCount;
private LinkedBlockingQueue queue;
public WriterTask(LinkedBlockingQueue queue, int lineCount) {
this.queue = queue;
this.lineCount = lineCount;
}@Override
public void run() {
BufferedWriter bw = null;
try {
bw = new BufferedWriter(new FileWriter("a_03.txt", true));
while (!Thread.interrupted() && lineCount != 0) {
if(!queue.isEmpty()) {
MainThread.LogEntry remove = queue.remove();
try {
logger.info("write a line");
bw.append(remove.future.get());
} catch (InterruptedException e) {
bw.append(remove.original);
} catch (ExecutionException e) {
bw.append(remove.original);
}
bw.newLine();
bw.flush();
lineCount--;
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
3.3MainThread
public class MainThread {static Logger logger = LoggerFactory.getLogger(MainThread.class);
private final static int NUM_THREADS = 4;
public static void main(String[] args) throws IOException {final String fileName = "a.txt";
//计算txt文件行数
int lineCount = getLineCount(fileName);
ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
LinkedBlockingQueue results = new LinkedBlockingQueue<>();
executor.execute(new WriterTask(results, lineCount));
BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(fileName), "UTF-8"));
for (String entry = in.readLine();
entry != null;
entry = in.readLine()) {
DNSResolveTask task = new DNSResolveTask(entry);
//如果通过DNS转换不堵塞,那么使用多线程就没有什么必要
//由于DNS转换耗时,那么可以在堵塞的时候进行其他线程的执行,快是快在这个地方
Future future = executor.submit(task);
LogEntry result = new LogEntry(entry, future);
//想法1:我直接向文件写 怎么样?
//直接向文件写,可选 速度应该也不会太慢//想法2:放到list中 作为一个生产者队列
logger.info("add a line to queue");
results.add(result);
}executor.shutdown();
}private static int getLineCount(String fileName) throws IOException {
BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(fileName), "UTF-8"));
String line;
int lineCount = 0;
while((line = in.readLine())!=null){
lineCount++;
}
return lineCount;
}static class LogEntry {
//最初的一行log记录
String original;
Future future;
LogEntry(String original, Future future) {
this.original = original;
this.future = future;
}
}
}
3.4执行结果
文章图片
推荐阅读
- 计算机网络|计算机网络——DHCP协议详解
- Linux|Linux--网络基础
- 网络|一文彻底搞懂前端监控
- 网络夺命连环问系列|网络夺命连环问5--HTTP怎么传输大文件()
- 网络|网络编程释疑(TCP连接拔掉网线后会发生什么)
- 网络|简单聊聊压缩网络
- Java|图解四大IO模型与原理
- 卷积|吃透空洞卷积(Dilated Convolutions)
- 计算机网络|网桥与交换机
- Karpenter : 新一代 Kubernetes auto scaling 工具