Offline Log Analysis System (Part 3)

In the previous installments we built the cluster and set up Flume NG to collect data onto HDFS. The next phase uses MapReduce to give the data an initial pass, in three stages:

1. Filter out useless records, such as hits on static resources and requests with error status codes (400 and above).

2. Enrich the logs based on the results of step 1: attach a sessionId to every record and, after sorting by access time, number each record with an incrementing step within its session.

3. Compute initial per-visit statistics: start time, end time, entry page, exit page, total pages visited, and so on.
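
To make the hand-offs concrete, here is the record layout between stages, reconstructed from the field order the code below writes and reads; the values are illustrative and \001 is the actual delimiter:

Stage 1 output (one line per valid hit):
true\0011.2.3.4\001-\0012013-09-18 06:49:18\001GET\001/about\001200\00119939\001"-"\001"Mozilla/5.0"

Stage 2 output (one line per page view, now carrying a sessionId, step, and stay time):
<sessionId>\0011.2.3.4\0011\00160\001-\0012013-09-18 06:49:18\001/about\001"-"\001"Mozilla/5.0"\00119939\001200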

Overall code structure:

(screenshot: snapshot.png — overall project structure)

Step 1: Filtering and Normalization

The first step is implemented in WebLogPreProcess.java:


package com.zonegood.hive.mr.pre;

import com.zonegood.hive.mrbean.WebLog;
import com.zonegood.hive.util.ParseUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

public class WebLogPreProcess {

    static class WebLogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        Set<String> filter = new HashSet<String>();
        Text k = new Text();
        NullWritable v = NullWritable.get();

        // whitelist of real page URLs; anything else (static resources etc.) is dropped
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            filter.add("/about");
            filter.add("/black-ip-list/");
            // ... remaining page URLs omitted
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            WebLog bean = ParseUtil.parse(line);
            // drop static resources and malformed records
            ParseUtil.filter(bean, filter);
            // note: getInvalid() actually carries the "valid" flag -- true means keep
            if (bean.getInvalid()) {
                k.set(bean.toString());
                context.write(k, v);
            }
        }
    }
}
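
The driver main is not shown in this excerpt, although the Job/FileInputFormat/FileOutputFormat imports suggest the class has one. Below is a minimal sketch consistent with the drivers of the two jobs further down; since the filtering needs no aggregation, it is written as a map-only job:

    // inside WebLogPreProcess (sketch)
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(WebLogPreProcess.class);
        job.setMapperClass(WebLogPreProcessMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(0); // map-only: the filter has nothing to reduce
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }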

Now let's look at the ParseUtil class:

package com.zonegood.hive.util;

import com.zonegood.hive.mrbean.WebLog;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.Set;

public class ParseUtil {

    // note: SimpleDateFormat is not thread-safe; acceptable here because each map task runs in its own JVM
    public static SimpleDateFormat df1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
    public static SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);


    public static WebLog parse(String line) {
        WebLog webLogBean = new WebLog();
        String[] arr = line.split(" ");
        if (arr.length > 11) {
            webLogBean.setIp(arr[0]);
            webLogBean.setU_info(arr[1]);
            // arr[3] looks like "[18/Sep/2013:06:49:18"; strip the leading '['
            String time_local = formatDate(arr[3].substring(1));
            if (null == time_local) time_local = "-invalid_time-";
            webLogBean.setTime(time_local);
            webLogBean.setRequest_method(arr[5]);
            webLogBean.setRequest_url(arr[6]);
            webLogBean.setStatus(arr[8]);
            webLogBean.setSent_body_bytes(arr[9]);
            webLogBean.setRequest_referer(arr[10]);

            // the user agent may contain spaces; re-join the remaining tokens
            if (arr.length > 12) {
                StringBuilder sb = new StringBuilder();
                for (int i = 11; i < arr.length; i++) {
                    sb.append(arr[i]);
                }
                webLogBean.setUser_agent(sb.toString());
            } else {
                webLogBean.setUser_agent(arr[11]);
            }

            if (Integer.parseInt(webLogBean.getStatus()) >= 400) { // 400 and above is an HTTP error
                webLogBean.setInvalid(false);
            }

            if ("-invalid_time-".equals(webLogBean.getTime())) {
                webLogBean.setInvalid(false);
            }
        } else {
            webLogBean.setInvalid(false);
        }

        return webLogBean;
    }

    // keep only URLs on the page whitelist; everything else is marked invalid
    public static void filter(WebLog bean, Set<String> filter) {
        if (!filter.contains(bean.getRequest_url())) {
            bean.setInvalid(false);
        }
    }

    public static String formatDate(String time_local) {
        try {
            return df2.format(df1.parse(time_local));
        } catch (ParseException e) {
            return null;
        }
    }

}
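
To make the index arithmetic in parse() concrete, here is a self-contained sketch using a hypothetical combined-format log line, printing which space-split tokens land in which fields:

// standalone sketch; the sample line and class name are illustrative
public class SplitDemo {
    public static void main(String[] args) {
        String line = "1.2.3.4 - - [18/Sep/2013:06:49:18 +0000] \"GET /black-ip-list/ HTTP/1.1\" 200 19939 \"-\" \"Mozilla/5.0\"";
        String[] arr = line.split(" ");
        System.out.println("ip      = " + arr[0]);  // 1.2.3.4
        System.out.println("u_info  = " + arr[1]);  // -
        System.out.println("time    = " + arr[3]);  // [18/Sep/2013:06:49:18  (leading '[' stripped by parse)
        System.out.println("method  = " + arr[5]);  // "GET
        System.out.println("url     = " + arr[6]);  // /black-ip-list/
        System.out.println("status  = " + arr[8]);  // 200
        System.out.println("bytes   = " + arr[9]);  // 19939
        System.out.println("referer = " + arr[10]); // "-"
        System.out.println("agent   = " + arr[11]); // "Mozilla/5.0" (re-joined when it contains spaces)
    }
}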

The following shell script automates running the job:

#!/bin/bash

# Purpose: weblog batch-processing script
# Author: 赵一好

# Java environment
export JAVA_HOME=/data/jdk1.8.0_161
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar
export PATH=$PATH:${JAVA_HOME}/bin:${JRE_HOME}/bin

# Hadoop environment
export HADOOP_HOME=/data/hadoop-2.7.3
export PATH=$PATH:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin

# dates
yesterday=`date -d'-1 day' +%Y-%m-%d`
s_year=`date -d'-1 day' +%Y`
s_month=`date -d'-1 day' +%m`
s_day=`date -d'-1 day' +%d`

# input path
in_path=/syslog/preprocess/inpath

# output path
out_path=/syslog/preprocess/outpath

# jar name
jar_name=weblog_pre_process.jar


# only run if the input directory has data for yesterday
files=`hdfs dfs -ls $in_path | grep $yesterday | wc -l`
if [ $files -gt 0 ];then
    echo "Running the preprocess batch job, output directory: $out_path/$yesterday"
    hadoop jar $jar_name $in_path/$yesterday $out_path/$yesterday
    # send an alert email if the job failed (the check must directly follow hadoop jar)
    if [ $? -ne 0 ];then
        echo "Job failed, sending alert email..."
    fi
fi

Step 2: Log Enrichment

Log enrichment is handled by ClickStreamPageView.java; the core code follows:

package com.zonegood.hive.mr;

import com.zonegood.hive.mrbean.WebLog;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;

/**
 * Click-stream PV statistics.
 *
 * Attaches a sessionId to every record (a session expires after 30 minutes
 * of inactivity) together with the time spent on the current page,
 * sorts each session by access time and numbers the records (step),
 * and keeps a subset of the WebLog fields.
 *
 * @author zyh
 * @create 18-11-3 3:41 PM
 */
public class ClickStreamPageView {

    static class ClickStreamPageViewMapper extends Mapper<LongWritable, Text, Text, WebLog> {

        WebLog v = new WebLog();
        Text k = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("\001");
            // fields[0] is the valid flag written by the preprocess step;
            // fields[4] (presumably the request method) is not needed here
            if (fields.length >= 10 && "true".equals(fields[0])) {
                v.setIp(fields[1]);
                v.setU_info(fields[2]);
                v.setTime(fields[3]);
                v.setRequest_url(fields[5]);
                v.setStatus(fields[6]);
                v.setSent_body_bytes(fields[7]);
                v.setRequest_referer(fields[8]);
                v.setUser_agent(fields[9]);
                // group by IP so one reduce call sees all hits from one visitor
                k.set(v.getIp());
                context.write(k, v);
            }
        }
    }

    static class ClickStreamPageViewReducer extends Reducer<Text, WebLog, NullWritable, Text> {

        public static SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);

        NullWritable k = NullWritable.get();
        Text v = new Text();

        /**
         * @param key ip
         * @param values <PageView,PageView,PageView,PageView>
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text key, Iterable<WebLog> values, Context context) throws IOException, InterruptedException {
            ArrayList<WebLog> beans = new ArrayList<WebLog>();
            try {
                // Hadoop reuses the value object, so copy each bean before caching it
                for (WebLog webLog : values) {
                    WebLog pv = new WebLog();
                    try {
                        BeanUtils.copyProperties(pv, webLog);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                    beans.add(pv);
                }
                // sort this visitor's hits by access time
                Collections.sort(beans, new Comparator<WebLog>() {
                    @Override
                    public int compare(WebLog o1, WebLog o2) {
                        try {
                            Date d1 = formatDate(o1.getTime());
                            Date d2 = formatDate(o2.getTime());
                            if (d1 == null || d2 == null) return 0;
                            return d1.compareTo(d2);
                        } catch (Exception e) {
                            return 0;
                        }
                    }
                });

                /*
                 * 1. Only one record:
                 *    1.1 set sessionId and step, emit it, reset sessionId.
                 *
                 * 2. Multiple records:
                 *    2.1 first record: skip without emitting (it gets emitted as record n-1 of the next pair).
                 *    2.2 last record: set sessionId and step, emit it.
                 *    2.3 otherwise compare record n with record n-1:
                 *        2.3.1 gap greater than 30 minutes:
                 *              set sessionId and step, emit record n-1, reset sessionId and step.
                 *        2.3.2 gap at most 30 minutes:
                 *              set sessionId and step, emit record n-1, step++.
                 */

                int step = 1;
                String sessionId = UUID.randomUUID().toString();
                for (int i = 0; i < beans.size(); i++) {
                    if (beans.size() == 1) {
                        v.set(sessionId + "\001" +
                                beans.get(i).getIp() + "\001" +
                                step + "\001" +
                                (60) + "\001" +                     // default stay time: 60s
                                beans.get(i).getU_info() + "\001" +
                                beans.get(i).getTime() + "\001" +
                                beans.get(i).getRequest_url() + "\001" +
                                beans.get(i).getRequest_referer() + "\001" +
                                beans.get(i).getUser_agent() + "\001" +
                                beans.get(i).getSent_body_bytes() + "\001" +
                                beans.get(i).getStatus());
                        // emit the only pv
                        context.write(k, v);
                        break;
                    }

                    if (i == 0) {
                        continue;
                    }

                    long diffTime = 0;
                    try {
                        diffTime = diffTime(beans.get(i).getTime(), beans.get(i - 1).getTime());
                    } catch (Exception e) {
                        // unparsable time: treat as the same session
                    }
                    if (diffTime > 30 * 60 * 1000) {                // more than 30 minutes: new session
                        v.set(sessionId + "\001" +
                                beans.get(i - 1).getIp() + "\001" +
                                step + "\001" +
                                (diffTime / 1000) + "\001" +
                                beans.get(i - 1).getU_info() + "\001" +
                                beans.get(i - 1).getTime() + "\001" +
                                beans.get(i - 1).getRequest_url() + "\001" +
                                beans.get(i - 1).getRequest_referer() + "\001" +
                                beans.get(i - 1).getUser_agent() + "\001" +
                                beans.get(i - 1).getSent_body_bytes() + "\001" +
                                beans.get(i - 1).getStatus());
                        // emit the previous pv, then start a new session
                        context.write(k, v);
                        sessionId = UUID.randomUUID().toString();
                        step = 1;
                    } else {
                        v.set(sessionId + "\001" +
                                beans.get(i - 1).getIp() + "\001" +
                                step + "\001" +
                                (diffTime / 1000) + "\001" +
                                beans.get(i - 1).getU_info() + "\001" +
                                beans.get(i - 1).getTime() + "\001" +
                                beans.get(i - 1).getRequest_url() + "\001" +
                                beans.get(i - 1).getRequest_referer() + "\001" +
                                beans.get(i - 1).getUser_agent() + "\001" +
                                beans.get(i - 1).getSent_body_bytes() + "\001" +
                                beans.get(i - 1).getStatus());
                        // emit the previous pv within the same session
                        context.write(k, v);
                        step++;
                    }

                    if (i == beans.size() - 1) {
                        v.set(sessionId + "\001" +
                                beans.get(i).getIp() + "\001" +
                                step + "\001" +
                                (60) + "\001" +                     // last page of the session: default 60s
                                beans.get(i).getU_info() + "\001" +
                                beans.get(i).getTime() + "\001" +
                                beans.get(i).getRequest_url() + "\001" +
                                beans.get(i).getRequest_referer() + "\001" +
                                beans.get(i).getUser_agent() + "\001" +
                                beans.get(i).getSent_body_bytes() + "\001" +
                                beans.get(i).getStatus());
                        // emit the final pv
                        context.write(k, v);
                    }
                }

            } catch (Exception e) {
                e.printStackTrace();
            }

        }

        private Date formatDate(String timeStr) throws Exception {
            return df2.parse(timeStr);
        }

        private long diffTime(String t1, String t2) throws Exception {
            return df2.parse(t1).getTime() - df2.parse(t2).getTime();
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(ClickStreamPageView.class);

        job.setMapperClass(ClickStreamPageViewMapper.class);
        job.setReducerClass(ClickStreamPageViewReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(WebLog.class);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);

    }
}
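
The heart of the reducer is the 30-minute session cutoff. A standalone sketch of just that rule, with hypothetical timestamps:

import java.text.SimpleDateFormat;
import java.util.Locale;

// illustrative class name and timestamps
public class SessionCutoffDemo {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
        long diff = df.parse("2018-11-03 16:10:00").getTime()
                  - df.parse("2018-11-03 15:30:00").getTime();
        // 40 minutes between hits exceeds the 30-minute timeout, so a new session starts
        System.out.println(diff > 30 * 60 * 1000); // true
    }
}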

The automation script click_stream_page_view.sh:

#!/bin/bash

# Purpose: PageView batch-processing script
# Author: 赵一好

# Java environment
export JAVA_HOME=/data/jdk1.8.0_161
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar
export PATH=$PATH:${JAVA_HOME}/bin:${JRE_HOME}/bin

# Hadoop environment
export HADOOP_HOME=/data/hadoop-2.7.3
export PATH=$PATH:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin

# dates
yesterday=`date -d'-1 day' +%Y-%m-%d`
s_year=`date -d'-1 day' +%Y`
s_month=`date -d'-1 day' +%m`
s_day=`date -d'-1 day' +%d`

# input path (the preprocess job's output)
in_path=/syslog/preprocess/outpath

# output path
out_path=/syslog/pageview/outpath

# jar name
jar_name=click_stream_page_view.jar

# only run if the input directory has data for yesterday
files=`hdfs dfs -ls $in_path | grep $yesterday | wc -l`
if [ $files -gt 0 ];then
    echo "Running the PageView batch job, output directory: $out_path/$yesterday"
    hadoop jar $jar_name $in_path/$yesterday $out_path/$yesterday
    # send an alert email if the job failed
    if [ $? -ne 0 ];then
        echo "Job failed, sending alert email..."
    fi
fi

Step 3: Generating Initial Metrics

This step is mainly handled by ClickStreamVisit.java; the core code follows:


package com.zonegood.hive.mr;

import com.zonegood.hive.mrbean.PageView;
import com.zonegood.hive.mrbean.VisitBean;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;

/**
 * Click-stream visits.
 * For each session: start time, end time, entry page, exit page,
 * number of pages visited, sessionId, IP, and referer.
 *
 * @author zyh
 * @create 18-11-3 5:24 PM
 */
public class ClickStreamVisit {

    static class ClickStreamVisitMapper extends Mapper<LongWritable, Text, Text, PageView> {

        Text k = new Text();
        PageView v = new PageView();


        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().split("\001");
            if (fields.length < 11) return; // skip malformed records
            v.setSessionId(fields[0]);
            v.setIp(fields[1]);
            v.setStep(fields[2]);
            v.setStayTime(fields[3]);
            v.setU_info(fields[4]);
            v.setTime(fields[5]);
            v.setRequest_url(fields[6]);
            v.setRequest_referer(fields[7]);
            v.setUser_agent(fields[8]);
            v.setSent_body_bytes(fields[9]);
            v.setStatus(fields[10]);
            // group by sessionId so one reduce call sees a whole session
            k.set(v.getSessionId());
            context.write(k, v);

        }
    }

    static class ClickStreamVisitReducer extends Reducer<Text, PageView, NullWritable, VisitBean> {

        NullWritable k = NullWritable.get();

        @Override
        protected void reduce(Text key, Iterable<PageView> values, Context context) throws IOException, InterruptedException {

            ArrayList<PageView> beans = new ArrayList<PageView>();

            // copy the reused value objects before caching them
            for (PageView pv : values) {
                PageView bean = new PageView();
                try {
                    BeanUtils.copyProperties(bean, pv);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                beans.add(bean);
            }

            // sort the session's page views by step
            Collections.sort(beans, new Comparator<PageView>() {
                @Override
                public int compare(PageView o1, PageView o2) {
                    return Integer.compare(Integer.parseInt(o1.getStep()), Integer.parseInt(o2.getStep()));
                }
            });

            // the first/last page views give the entry/exit points of the visit
            VisitBean visitBean = new VisitBean();
            visitBean.setInPage(beans.get(0).getRequest_url());
            visitBean.setOutPage(beans.get(beans.size() - 1).getRequest_url());
            visitBean.setInTime(beans.get(0).getTime());
            visitBean.setOutTime(beans.get(beans.size() - 1).getTime());
            visitBean.setPageVisits(beans.size());
            visitBean.setRemote_addr(beans.get(0).getIp());
            visitBean.setSession(beans.get(0).getSessionId());
            visitBean.setReferal(beans.get(0).getRequest_referer());
            context.write(k, visitBean);

        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(ClickStreamVisit.class);
        job.setMapperClass(ClickStreamVisitMapper.class);
        job.setReducerClass(ClickStreamVisitReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(PageView.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(VisitBean.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);

    }
}
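
The reducer boils down to: sort the session's page views by step, then read the visit off the two ends. A standalone sketch with illustrative data:

import java.util.*;

// illustrative class name, steps, and URLs
public class VisitDemo {
    public static void main(String[] args) {
        // {step, url} pairs, deliberately out of order as they would arrive at the reducer
        List<String[]> pvs = new ArrayList<>(Arrays.asList(
                new String[]{"2", "/black-ip-list/"},
                new String[]{"1", "/about"},
                new String[]{"3", "/donate"}));
        pvs.sort(Comparator.comparingInt(pv -> Integer.parseInt(pv[0])));
        System.out.println("inPage     = " + pvs.get(0)[1]);              // /about
        System.out.println("outPage    = " + pvs.get(pvs.size() - 1)[1]); // /donate
        System.out.println("pageVisits = " + pvs.size());                 // 3
    }
}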

The companion automation script click_stream_visit.sh:

#!/bin/bash

# Purpose: Visit batch-processing script
# Author: 赵一好

# Java environment
export JAVA_HOME=/data/jdk1.8.0_161
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar
export PATH=$PATH:${JAVA_HOME}/bin:${JRE_HOME}/bin

# Hadoop environment
export HADOOP_HOME=/data/hadoop-2.7.3
export PATH=$PATH:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin

# dates
yesterday=`date -d'-1 day' +%Y-%m-%d`
s_year=`date -d'-1 day' +%Y`
s_month=`date -d'-1 day' +%m`
s_day=`date -d'-1 day' +%d`

# input path (the pageview job's output)
in_path=/syslog/pageview/outpath

# output path
out_path=/syslog/visit/outpath

# jar name
jar_name=click_stream_visit.jar

# only run if the input directory has data for yesterday
files=`hdfs dfs -ls $in_path | grep $yesterday | wc -l`
if [ $files -gt 0 ];then
    echo "Running the Visit batch job, output directory: $out_path/$yesterday"
    hadoop jar $jar_name $in_path/$yesterday $out_path/$yesterday
    # send an alert email if the job failed
    if [ $? -ne 0 ];then
        echo "Job failed, sending alert email..."
    fi
fi

Automation

The three shell scripts are scheduled with the previously deployed Azkaban scheduler, completing the automated ETL pipeline.

1. Write the auto_run.sh script:

#!/bin/bash

# Purpose: run the preprocess, pageview, and visit batch scripts in order

set -e  # each stage feeds the next, so abort the chain if any stage fails

sh weblog_pre_process.sh
sh click_stream_page_view.sh
sh click_stream_visit.sh

2. Package the three MR programs into executable jars, e.g. with Maven's package goal (not covered in detail here). Note that the scripts invoke hadoop jar without a main-class argument, so each jar's manifest needs a Main-Class entry:

click_stream_page_view.jar
click_stream_visit.jar
weblog_pre_process.jar

3. Write the Azkaban job file, zip it, and upload it to the Azkaban platform:

# foo.job
type=command
command=sh auto_run.sh

foo.zip must contain all of the files above, as shown below:

(screenshot: snapshot.png — contents of foo.zip)

Run the foo flow; a green bar means the job completed successfully:

(screenshot: snapshot.png — Azkaban execution result)

The exact steps are a bit tedious; an earlier post covered them in detail, so I'll skip them here.
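
One refinement worth mentioning: instead of serializing the three stages inside auto_run.sh, the chain can be expressed in Azkaban itself, since Azkaban starts a job only after its dependencies succeed. A sketch with illustrative job names:

# preprocess.job
type=command
command=sh weblog_pre_process.sh

# pageview.job
type=command
dependencies=preprocess
command=sh click_stream_page_view.sh

# visit.job
type=command
dependencies=pageview
command=sh click_stream_visit.sh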