Debugging Hadoop Jobs with Eclipse

Debugging Hadoop jobs from Eclipse is simple and convenient. I have developed Hadoop programs in Eclipse before, but I never looked closely at how Eclipse actually runs them, so from time to time I would hit puzzling exceptions. The most common one is the following:

java.lang.RuntimeException: java.lang.ClassNotFoundException: com.qin.sort.TestSort$SMapper
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:857)
    at org.apache.hadoop.mapreduce.JobContext.getMapperClass(JobContext.java:199)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:718)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)

This exception is the most baffling one: the Mapper inner class is clearly right there in your MR class, yet as soon as you run the program it complains that the class cannot be found, and no amount of digging through your own code turns up the cause.
In fact the problem is not with the program but with not understanding how Eclipse runs Hadoop jobs. Broadly speaking, Hadoop can be run from Eclipse in two modes. The first is Local mode (also called standalone mode); the second is submitting to the real online cluster. In Local mode the job is not submitted to the Hadoop cluster at all; it runs on the local machine, although the results can still be written to HDFS. It simply does not use the cluster's resources, and no jar has to be shipped to the cluster, so Local mode is what we normally use to check that an MR program runs correctly at all. (The ClassNotFoundException above typically appears when the job is submitted to the cluster without a job jar being set, so the remote task JVMs have no way to load the Mapper class; the conf.setJar call and the jar-packaging steps later in this post address exactly that.)
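Which of the two modes you get is decided entirely by client-side configuration: if mapred.job.tracker is left at its default value of local, the job runs in-process through LocalJobRunner; if it points at a real JobTracker, the job is submitted to the cluster and a packaged job jar must go with it. A minimal sketch of that switch (the host, port and jar name simply mirror the examples used later in this post):

Java code
import org.apache.hadoop.mapred.JobConf;

public class ModeCheck {
    public static void main(String[] args) {
        JobConf conf = new JobConf();

        // Local mode: leave mapred.job.tracker alone (it defaults to "local"),
        // so the job runs in the current JVM and no jar needs to be shipped.

        // Cluster mode: point the client at HDFS and the JobTracker, and name the
        // packaged job jar so remote task JVMs can load the Mapper/Reducer classes
        // (forgetting the jar is what produces the ClassNotFoundException above):
        // conf.set("fs.default.name", "hdfs://192.168.75.130:9000");
        // conf.set("mapred.job.tracker", "192.168.75.130:9001");
        // conf.setJar("tt.jar");

        System.out.println("Mode: " + conf.get("mapred.job.tracker")); // "local" or host:port
    }
}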
Let's first look at a sort job run in Local mode.

Data to sort:
a 784
b 12
c -11
dd 99999
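The job reads this data from HDFS; the input directory used throughout this post is hdfs://192.168.75.130:9000/root/output, and the run logs below show the file inside it is called sort.txt. If you are reproducing the example, something along these lines (run from the Hadoop installation directory, assuming the local copy is also named sort.txt) stages the data first:

bin/hadoop fs -mkdir /root/output
bin/hadoop fs -put sort.txt /root/output/sort.txt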

Program source:

Java code
package com.qin.sort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * MR job class that tests sorting.
 *
 * QQ tech discussion group: 324714439
 * @author qindongliang
 **/
public class TestSort {

    /**
     * Map class
     **/
    private static class SMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

        private Text text = new Text(); // output value
        private static final IntWritable one = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String s = value.toString();
            //System.out.println("abc: " + s);
            String ss[] = s.split(" ");
            one.set(Integer.parseInt(ss[1].trim())); // the number becomes the key so the framework sorts on it
            text.set(ss[0].trim());
            context.write(one, text);
        }
    }

    /**
     * Reduce class
     **/
    private static class SReduce extends Reducer<IntWritable, Text, Text, IntWritable> {
        private Text text = new Text();

        @Override
        protected void reduce(IntWritable arg0, Iterable<Text> arg1, Context context)
                throws IOException, InterruptedException {
            for (Text t : arg1) {
                text.set(t.toString());
                context.write(text, arg0);
            }
        }
    }

    /**
     * Sort comparator class
     **/
    private static class SSort extends WritableComparator {

        public SSort() {
            super(IntWritable.class, true); // register the comparator for IntWritable keys
        }

        @Override
        public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
                int arg4, int arg5) {
            // the minus sign flips the natural order, giving a descending sort
            return -super.compare(arg0, arg1, arg2, arg3, arg4, arg5);
        }

        @Override
        public int compare(Object a, Object b) {
            return -super.compare(a, b); // minus sign again for descending order
        }
    }

    /**
     * main method
     **/
    public static void main(String[] args) throws Exception {
        String inputPath = "hdfs://192.168.75.130:9000/root/output";
        String outputPath = "hdfs://192.168.75.130:9000/root/outputsort";
        JobConf conf = new JobConf();
        //Configuration conf = new Configuration();
        // prepends hdfs://master:9000/ to your file paths:
        //conf.set("fs.default.name", "hdfs://192.168.75.130:9000");
        // ip and port of the jobtracker; master can be configured in /etc/hosts
        //conf.set("mapred.job.tracker", "192.168.75.130:9001");
        //conf.get("mapred.job.tracker");
        System.out.println("Mode: " + conf.get("mapred.job.tracker"));
        //conf.setJar("tt.jar");
        FileSystem fs = FileSystem.get(conf);
        Path pout = new Path(outputPath);
        if (fs.exists(pout)) {
            fs.delete(pout, true);
            System.out.println("Output path exists, deleted......");
        }
        Job job = new Job(conf, "sort123");
        job.setJarByClass(TestSort.class);
        job.setOutputKeyClass(IntWritable.class); // tells map/reduce the output key type
        FileInputFormat.setInputPaths(job, new Path(inputPath));   // input path
        FileOutputFormat.setOutputPath(job, new Path(outputPath)); // output path
        job.setMapperClass(SMapper.class);   // map class
        job.setReducerClass(SReduce.class);  // reduce class
        job.setSortComparatorClass(SSort.class); // sort comparator class
        //job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);
        //job.setOutputFormatClass(TextOutputFormat.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The console output is:

Mode: local
Output path exists, deleted......
WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
WARN - JobClient.copyAndConfigureFiles(870) | No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1
WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_local1242054158_0001
INFO - LocalJobRunner$Job.run(340) | Waiting for map tasks
INFO - LocalJobRunner$Job$MapTaskRunnable.run(204) | Starting task: attempt_local1242054158_0001_m_000000_0
INFO - Task.initialize(534) |  Using ResourceCalculatorPlugin : null
INFO - MapTask.runNewMapper(729) | Processing split: hdfs://192.168.75.130:9000/root/output/sort.txt:0+28
INFO - MapTask$MapOutputBuffer.<init>(949) | io.sort.mb = 100
INFO - MapTask$MapOutputBuffer.<init>(961) | data buffer = 79691776/99614720
INFO - MapTask$MapOutputBuffer.<init>(962) | record buffer = 262144/327680
INFO - MapTask$MapOutputBuffer.flush(1289) | Starting flush of map output
INFO - MapTask$MapOutputBuffer.sortAndSpill(1471) | Finished spill 0
INFO - Task.done(858) | Task:attempt_local1242054158_0001_m_000000_0 is done. And is in the process of commiting
INFO - LocalJobRunner$Job.statusUpdate(466) |
INFO - Task.sendDone(970) | Task 'attempt_local1242054158_0001_m_000000_0' done.
INFO - LocalJobRunner$Job$MapTaskRunnable.run(229) | Finishing task: attempt_local1242054158_0001_m_000000_0
INFO - LocalJobRunner$Job.run(348) | Map task executor complete.
INFO - Task.initialize(534) |  Using ResourceCalculatorPlugin : null
INFO - LocalJobRunner$Job.statusUpdate(466) |
INFO - Merger$MergeQueue.merge(408) | Merging 1 sorted segments
INFO - Merger$MergeQueue.merge(491) | Down to the last merge-pass, with 1 segments left of total size: 35 bytes
INFO - LocalJobRunner$Job.statusUpdate(466) |
INFO - Task.done(858) | Task:attempt_local1242054158_0001_r_000000_0 is done. And is in the process of commiting
INFO - LocalJobRunner$Job.statusUpdate(466) |
INFO - Task.commit(1011) | Task attempt_local1242054158_0001_r_000000_0 is allowed to commit now
INFO - FileOutputCommitter.commitTask(173) | Saved output of task 'attempt_local1242054158_0001_r_000000_0' to hdfs://192.168.75.130:9000/root/outputsort
INFO - LocalJobRunner$Job.statusUpdate(466) | reduce > reduce
INFO - Task.sendDone(970) | Task 'attempt_local1242054158_0001_r_000000_0' done.
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 100%
INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_local1242054158_0001
INFO - Counters.log(585) | Counters: 19
INFO - Counters.log(587) |   File Output Format Counters
INFO - Counters.log(589) |     Bytes Written=26
INFO - Counters.log(587) |   File Input Format Counters
INFO - Counters.log(589) |     Bytes Read=28
INFO - Counters.log(587) |   FileSystemCounters
INFO - Counters.log(589) |     FILE_BYTES_READ=393
INFO - Counters.log(589) |     HDFS_BYTES_READ=56
INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=135742
INFO - Counters.log(589) |     HDFS_BYTES_WRITTEN=26
INFO - Counters.log(587) |   Map-Reduce Framework
INFO - Counters.log(589) |     Map output materialized bytes=39
INFO - Counters.log(589) |     Map input records=4
INFO - Counters.log(589) |     Reduce shuffle bytes=0
INFO - Counters.log(589) |     Spilled Records=8
INFO - Counters.log(589) |     Map output bytes=25
INFO - Counters.log(589) |     Total committed heap usage (bytes)=455475200
INFO - Counters.log(589) |     Combine input records=0
INFO - Counters.log(589) |     SPLIT_RAW_BYTES=112
INFO - Counters.log(589) |     Reduce input records=4
INFO - Counters.log(589) |     Reduce input groups=4
INFO - Counters.log(589) |     Combine output records=0
INFO - Counters.log(589) |     Reduce output records=4
INFO - Counters.log(589) |     Map output records=4

The sorted result:

dd  99999
a   784
b   12
c   -11

Once the job passes in Local mode, we can move on to running it against the Hadoop cluster. There are two ways to do this. The first is to package the whole project into a single jar, upload it to the Linux machine, and run the shell command:

bin/hadoop jar tt.jar com.qin.sort.TestSort

Note that I hard-coded the input and output paths in the program for convenience, so nothing needs to be passed on the command line; in real development you would normally pass the input and output paths in from outside for flexibility, as sketched below.
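A rough sketch of that more flexible variant, which replaces only the two hard-coded path strings at the top of main with command-line arguments (everything else in the job setup stays exactly as in the TestSort listing above):

Java code
public static void main(String[] args) throws Exception {
    if (args.length < 2) {
        System.err.println("Usage: hadoop jar tt.jar com.qin.sort.TestSort <input path> <output path>");
        System.exit(2);
    }
    String inputPath = args[0];   // e.g. hdfs://192.168.75.130:9000/root/output
    String outputPath = args[1];  // e.g. hdfs://192.168.75.130:9000/root/outputsort
    // ... job configuration and submission continue exactly as in TestSort.main above
}

It would then be invoked as, for example:

bin/hadoop jar tt.jar com.qin.sort.TestSort hdfs://192.168.75.130:9000/root/output hdfs://192.168.75.130:9000/root/outputsort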
The second way, which is also convenient, is to submit the job to the Hadoop cluster directly from Eclipse. Even then the whole project still has to be packaged into a jar; the difference is that Eclipse submits the job for us, which lets us submit and run Hadoop jobs straight from Windows. The mainstream approach, though, is still to upload the jar. To package the whole project into one jar, I use the Ant script below; running it bundles the project together with its dependent classes, so on Hadoop you only need to point at the jar, give the fully qualified class name, and supply the input and output paths. The Ant script:

XML code
<project name="${component.name}" basedir="." default="jar">
    <property environment="env"/>
    <!--
    <property name="hadoop.home" value="${env.HADOOP_HOME}"/>
    -->
    <property name="hadoop.home" value="D:/hadoop-1.2.0"/>
    <!-- name of the jar to build -->
    <property name="jar.name" value="tt.jar"/>
    <path id="project.classpath">
        <fileset dir="lib">
            <include name="*.jar" />
        </fileset>
        <fileset dir="${hadoop.home}">
            <include name="**/*.jar" />
        </fileset>
    </path>
    <target name="clean" >
        <delete dir="bin" failonerror="false" />
        <mkdir dir="bin"/>
    </target>
    <target name="build" depends="clean">
        <echo message="${ant.project.name}: ${ant.file}"/>
        <javac destdir="bin" encoding="utf-8" debug="true" includeantruntime="false" debuglevel="lines,vars,source">
            <src path="src"/>
            <exclude name="**/.svn" />
            <classpath refid="project.classpath"/>
        </javac>
        <copy todir="bin">
            <fileset dir="src">
                <include name="*config*"/>
            </fileset>
        </copy>
    </target>

    <target name="jar" depends="build">
        <copy todir="bin/lib">
            <fileset dir="lib">
                <include name="**/*.*"/>
            </fileset>
        </copy>

        <path id="lib-classpath">
            <fileset dir="lib" includes="**/*.jar" />
        </path>

        <pathconvert property="my.classpath" pathsep=" " >
            <mapper>
                <chainedmapper>
                    <!-- strip the absolute path -->
                    <flattenmapper />
                    <!-- add the lib/ prefix -->
                    <globmapper from="*" to="lib/*" />
                </chainedmapper>
            </mapper>
            <path refid="lib-classpath" />
        </pathconvert>

        <jar basedir="bin" destfile="${jar.name}" >
            <include name="**/*"/>
            <!-- define MANIFEST.MF -->
            <manifest>
                <attribute name="Class-Path" value="${my.classpath}" />
            </manifest>
        </jar>
    </target>
</project>
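With Ant installed and the script saved as build.xml in the project root, building the jar is then a single command (the default target is already jar):

ant jar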

After running this Ant script, the project is packaged into a single jar (tt.jar).
With the jar in hand, let's first test submitting the job to the Hadoop cluster from Eclipse; only a small change to the main method is needed:

Java code
/**
 * main method
 **/
public static void main(String[] args) throws Exception {
    String inputPath = "hdfs://192.168.75.130:9000/root/output";
    String outputPath = "hdfs://192.168.75.130:9000/root/outputsort";
    JobConf conf = new JobConf();
    //Configuration conf = new Configuration(); // this conf can be used to test Local mode
    // if mapred-site.xml is present under src, the following line is not needed;
    // note that it is only used in non-Local (cluster) mode
    conf.set("mapred.job.tracker", "192.168.75.130:9001");
    //conf.get("mapred.job.tracker");
    System.out.println("Mode: " + conf.get("mapred.job.tracker"));
    //conf.setJar("tt.jar"); // used in non-Local mode
    FileSystem fs = FileSystem.get(conf);
    Path pout = new Path(outputPath);
    if (fs.exists(pout)) {
        fs.delete(pout, true);
        System.out.println("Output path exists, deleted......");
    }
    Job job = new Job(conf, "sort123");
    job.setJarByClass(TestSort.class);
    job.setOutputKeyClass(IntWritable.class); // tells map/reduce the output key type
    FileInputFormat.setInputPaths(job, new Path(inputPath));   // input path
    FileOutputFormat.setOutputPath(job, new Path(outputPath)); // output path
    job.setMapperClass(SMapper.class);   // map class
    job.setReducerClass(SReduce.class);  // reduce class
    job.setSortComparatorClass(SSort.class); // sort comparator class
    //job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);
    //job.setOutputFormatClass(TextOutputFormat.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

Running the program produces the following output:

Mode: 192.168.75.130:9001
Output path exists, deleted......
WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1
WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201403252058_0035
INFO - JobClient.monitorAndPrintJob(1393) |  map 0% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 33%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 100%
INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201403252058_0035
INFO - Counters.log(585) | Counters: 29
INFO - Counters.log(587) |   Job Counters
INFO - Counters.log(589) |     Launched reduce tasks=1
INFO - Counters.log(589) |     SLOTS_MILLIS_MAPS=8498
INFO - Counters.log(589) |     Total time spent by all reduces waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Total time spent by all maps waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Launched map tasks=1
INFO - Counters.log(589) |     Data-local map tasks=1
INFO - Counters.log(589) |     SLOTS_MILLIS_REDUCES=9667
INFO - Counters.log(587) |   File Output Format Counters
INFO - Counters.log(589) |     Bytes Written=26
INFO - Counters.log(587) |   FileSystemCounters
INFO - Counters.log(589) |     FILE_BYTES_READ=39
INFO - Counters.log(589) |     HDFS_BYTES_READ=140
INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=117654
INFO - Counters.log(589) |     HDFS_BYTES_WRITTEN=26
INFO - Counters.log(587) |   File Input Format Counters
INFO - Counters.log(589) |     Bytes Read=28
INFO - Counters.log(587) |   Map-Reduce Framework
INFO - Counters.log(589) |     Map output materialized bytes=39
INFO - Counters.log(589) |     Map input records=4
INFO - Counters.log(589) |     Reduce shuffle bytes=39
INFO - Counters.log(589) |     Spilled Records=8
INFO - Counters.log(589) |     Map output bytes=25
INFO - Counters.log(589) |     Total committed heap usage (bytes)=176033792
INFO - Counters.log(589) |     CPU time spent (ms)=1140
INFO - Counters.log(589) |     Combine input records=0
INFO - Counters.log(589) |     SPLIT_RAW_BYTES=112
INFO - Counters.log(589) |     Reduce input records=4
INFO - Counters.log(589) |     Reduce input groups=4
INFO - Counters.log(589) |     Combine output records=0
INFO - Counters.log(589) |     Physical memory (bytes) snapshot=259264512
INFO - Counters.log(589) |     Reduce output records=4
INFO - Counters.log(589) |     Virtual memory (bytes) snapshot=1460555776
INFO - Counters.log(589) |     Map output records=4

We can see the job ran fine; the sorted result is:

dd  99999
a   784
b   12
c   -11

The result is identical to the Local-mode run. One difference from Local mode is that the job we just ran now shows up on the JobTracker page at http://192.168.75.130:50030/jobtracker.jsp, which is not the case when the program runs in Local mode.
Finally, let's look at submitting the job by uploading the jar to Linux. The jar was already built above, so all that remains is to copy it to the Linux machine and launch the job with a shell command.
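For example (a sketch: the copy step assumes the jar is placed in root's home directory on the 192.168.75.130 node used throughout this post, and the hadoop command is run from the Hadoop installation directory there):

scp tt.jar root@192.168.75.130:/root/
bin/hadoop jar /root/tt.jar com.qin.sort.TestSort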
With that, everything has run successfully. One final thing to watch out for: if the following exception appears while running the sort job:

java.lang.Exception: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.LongWritable, recieved org.apache.hadoop.io.IntWritable
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:354)
Caused by: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.LongWritable, recieved org.apache.hadoop.io.IntWritable
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1019)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:690)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at com.qin.sort.TestSort$SMapper.map(TestSort.java:51)
    at com.qin.sort.TestSort$SMapper.map(TestSort.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)

This exception usually means the output key or value class was not specified, or was specified with a type that does not match what the Mapper actually emits (here the framework falls back to the default LongWritable key while the map writes IntWritable). Setting the output key and value types correctly fixes it:

Java code
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
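As a side note, when the map output types differ from the job's final output types, as they do in this job (the map emits IntWritable/Text while the reduce writes Text/IntWritable), Hadoop also lets you declare the map-side types separately, which avoids the same mismatch without bending the job-level output classes:

Java code
job.setMapOutputKeyClass(IntWritable.class);  // key type emitted by the Mapper
job.setMapOutputValueClass(Text.class);       // value type emitted by the Mapper
job.setOutputKeyClass(Text.class);            // key type written by the Reducer
job.setOutputValueClass(IntWritable.class);   // value type written by the Reducer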

Once the output types are set correctly, the job can be tested and run normally again.

