案例:
使用电商网站的用户行为日志进行统计分析
一:准备
1.指标
PV:网页浏览量(Page View)
UV:独立访客数
VV:访客的访问次数,即 session 次数
IP:独立的IP数
2.上传测试数据
3.查看第一条记录
注意点(字符显示):
二:程序
1.分析
省份ID -> key
value -> 1
经过 shuffle 按 key 分组后 -> <provinceId, list(1,1,1)>
2.数据类型
key:Text
value:IntWritable
3.map 端的业务
4.reduce端的业务
5.整合运行
6.结果
三:计数器
1.程序
2.结果
结果完全吻合。
四:完整程序
1.PV程序
package com.senior.network;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Counts page views (PV) per province from tab-separated web access log lines.
 *
 * <p>The mapper emits {@code (provinceId, 1)} for every usable log line; the reducer sums the
 * ones per province.
 */
public class WebPvCount extends Configured implements Tool {

  /** Default HDFS paths used only when no paths are given on the command line. */
  private static final String[] DEFAULT_ARGS = {
    "hdfs://linux-hadoop01.ibeifeng.com:8020/user/beifeng/mapreduce/wordcount/inputWebData",
    "hdfs://linux-hadoop01.ibeifeng.com:8020/user/beifeng/mapreduce/wordcount/outputWebData1"
  };

  /**
   * Parses one log line and emits {@code (provinceId, 1)}.
   *
   * <p>Lines with fewer than 30 tab-separated fields, a blank URL (field 1) or a blank province
   * id (field 23) are skipped. A province id that is not an integer is reported on stderr and
   * the record is still emitted under the key {@link Integer#MAX_VALUE}.
   */
  public static class WebPvCountMapper
      extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    private static final int MIN_FIELD_COUNT = 30;
    private static final int URL_INDEX = 1;
    private static final int PROVINCE_ID_INDEX = 23;
    private static final IntWritable ONE = new IntWritable(1);

    // Reused across map() calls to avoid allocating one Writable per record.
    private final IntWritable mapOutputKey = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] fields = value.toString().split("\t");
      if (fields.length < MIN_FIELD_COUNT) {
        return;
      }
      String provinceIdValue = fields[PROVINCE_ID_INDEX];
      String urlValue = fields[URL_INDEX];
      if (StringUtils.isBlank(provinceIdValue) || StringUtils.isBlank(urlValue)) {
        return;
      }
      int provinceId = Integer.MAX_VALUE;
      try {
        provinceId = Integer.parseInt(provinceIdValue);
      } catch (NumberFormatException e) {
        // Unparseable ids are kept and bucketed under Integer.MAX_VALUE.
        System.err.println("Invalid province id: '" + provinceIdValue + "'");
      }
      mapOutputKey.set(provinceId);
      context.write(mapOutputKey, ONE);
    }
  }

  /** Sums the per-record counts for one province; also usable as a combiner. */
  public static class WebPvCountReducer
      extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    private final IntWritable outputValue = new IntWritable();

    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }
      outputValue.set(sum);
      context.write(key, outputValue);
    }
  }

  /**
   * Configures and runs the PV job.
   *
   * @param args {@code args[0]} is the input path, {@code args[1]} the output path
   * @return 0 on success, 1 if the job failed, 2 on bad usage
   */
  @Override
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.println("Usage: " + getClass().getSimpleName() + " <input path> <output path>");
      return 2;
    }
    Configuration conf = getConf();
    Job job = Job.getInstance(conf, getClass().getSimpleName());
    job.setJarByClass(WebPvCount.class);

    // input / output
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // map
    job.setMapperClass(WebPvCountMapper.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(IntWritable.class);

    // shuffle: summing is associative and the reducer's input/output types match,
    // so it can pre-aggregate map output and shrink shuffle traffic.
    job.setCombinerClass(WebPvCountReducer.class);

    // reduce
    job.setReducerClass(WebPvCountReducer.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(IntWritable.class);

    return job.waitForCompletion(true) ? 0 : 1;
  }

  /** Entry point; falls back to {@link #DEFAULT_ARGS} when no arguments are supplied. */
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Compress intermediate map output with Snappy.
    conf.set("mapreduce.map.output.compress", "true");
    conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
    String[] jobArgs = args.length == 0 ? DEFAULT_ARGS : args;
    // Pass conf explicitly: ToolRunner.run(tool, args) would build a fresh Configuration
    // and silently drop the compression settings above.
    int status = ToolRunner.run(conf, new WebPvCount(), jobArgs);
    System.exit(status);
  }
}
2.计数器
计数器全部设置在 mapper 端,用于统计各类被过滤或异常的记录:字段数少于 30、省份ID为空、URL为空,以及省份ID无法转换为整数(这类记录仍以 Integer.MAX_VALUE 为 key 输出)。
package com.senior.network;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Counts page views (PV) per province from tab-separated web access log lines, and records
 * user counters in the mapper for every kind of record it filters out or cannot parse.
 */
public class WebPvCount extends Configured implements Tool {

  /** Default HDFS paths used only when no paths are given on the command line. */
  private static final String[] DEFAULT_ARGS = {
    "hdfs://linux-hadoop01.ibeifeng.com:8020/user/beifeng/mapreduce/wordcount/inputWebData",
    "hdfs://linux-hadoop01.ibeifeng.com:8020/user/beifeng/mapreduce/wordcount/outputWebData2"
  };

  /**
   * Parses one log line and emits {@code (provinceId, 1)}.
   *
   * <p>Counters in group {@code webPvMapper_counter}:
   * <ul>
   *   <li>{@code length_LT_30} - line has fewer than 30 fields (skipped)
   *   <li>{@code priviceIdValue_null} - province id (field 23) is blank (skipped)
   *   <li>{@code url_null} - URL (field 1) is blank (skipped)
   *   <li>{@code switch_fail} - province id is not an integer; the record is still emitted
   *       under the key {@link Integer#MAX_VALUE}
   * </ul>
   */
  public static class WebPvCountMapper
      extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    private static final String COUNTER_GROUP = "webPvMapper_counter";
    private static final int MIN_FIELD_COUNT = 30;
    private static final int URL_INDEX = 1;
    private static final int PROVINCE_ID_INDEX = 23;
    private static final IntWritable ONE = new IntWritable(1);

    // Reused across map() calls to avoid allocating one Writable per record.
    private final IntWritable mapOutputKey = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] fields = value.toString().split("\t");
      if (fields.length < MIN_FIELD_COUNT) {
        context.getCounter(COUNTER_GROUP, "length_LT_30").increment(1L);
        return;
      }
      String provinceIdValue = fields[PROVINCE_ID_INDEX];
      String urlValue = fields[URL_INDEX];
      // Checked in this order so a line with both fields blank is counted only once,
      // under priviceIdValue_null.
      if (StringUtils.isBlank(provinceIdValue)) {
        context.getCounter(COUNTER_GROUP, "priviceIdValue_null").increment(1L);
        return;
      }
      if (StringUtils.isBlank(urlValue)) {
        context.getCounter(COUNTER_GROUP, "url_null").increment(1L);
        return;
      }
      int provinceId = Integer.MAX_VALUE;
      try {
        provinceId = Integer.parseInt(provinceIdValue);
      } catch (NumberFormatException e) {
        // The counter already tracks how many ids failed; keep the log line short.
        context.getCounter(COUNTER_GROUP, "switch_fail").increment(1L);
        System.err.println("Invalid province id: '" + provinceIdValue + "'");
      }
      mapOutputKey.set(provinceId);
      context.write(mapOutputKey, ONE);
    }
  }

  /** Sums the per-record counts for one province; also usable as a combiner. */
  public static class WebPvCountReducer
      extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    private final IntWritable outputValue = new IntWritable();

    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }
      outputValue.set(sum);
      context.write(key, outputValue);
    }
  }

  /**
   * Configures and runs the PV job.
   *
   * @param args {@code args[0]} is the input path, {@code args[1]} the output path
   * @return 0 on success, 1 if the job failed, 2 on bad usage
   */
  @Override
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.println("Usage: " + getClass().getSimpleName() + " <input path> <output path>");
      return 2;
    }
    Configuration conf = getConf();
    Job job = Job.getInstance(conf, getClass().getSimpleName());
    job.setJarByClass(WebPvCount.class);

    // input / output
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // map
    job.setMapperClass(WebPvCountMapper.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(IntWritable.class);

    // shuffle: summing is associative and the reducer's input/output types match,
    // so it can pre-aggregate map output and shrink shuffle traffic.
    job.setCombinerClass(WebPvCountReducer.class);

    // reduce
    job.setReducerClass(WebPvCountReducer.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(IntWritable.class);

    return job.waitForCompletion(true) ? 0 : 1;
  }

  /** Entry point; falls back to {@link #DEFAULT_ARGS} when no arguments are supplied. */
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Compress intermediate map output with Snappy.
    conf.set("mapreduce.map.output.compress", "true");
    conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
    String[] jobArgs = args.length == 0 ? DEFAULT_ARGS : args;
    // Pass conf explicitly: ToolRunner.run(tool, args) would build a fresh Configuration
    // and silently drop the compression settings above.
    int status = ToolRunner.run(conf, new WebPvCount(), jobArgs);
    System.exit(status);
  }
}