package com.qin.test.hadoop.jobctrol;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Hadoop version: 1.2
 * JDK: 1.6
 * Uses the new-API ControlledJob + JobControl classes
 * to run a chain of dependent jobs:
 * the first job counts word frequencies,
 * the second job sorts the counts in descending order.
 *
 * Done as plain MapReduce, this would mean submitting 2 MR jobs
 * separately; with JobControl + ControlledJob, the whole DAG of
 * dependent jobs can be wired up and run from a single class.
 *
 * @author qindongliang
 **/
public class MyHadoopControl {
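/*
 * For illustration only (hypothetical sample data, not from the original
 * post): with input lines of the form "word;count", which is what
 * SumMapper's split(";") below expects, the pipeline behaves roughly as:
 *
 *   input to job 1:   a;1  b;2  a;3
 *   output of job 1:  "4\t a" and "2\t b"   (count, then the word)
 *   output of job 2:  "a\t4" and "b\t2"     (re-ordered by count, descending)
 */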
/***
 * Mapper of MapReduce job 1.
 *
 * LongWritable: the input key, by default the byte offset of the line
 * Text: the content of each line
 * Text: the output key type
 * IntWritable: the output value type
 ***/
private static class SumMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private Text t=new Text();//holds the output key (the word)
private IntWritable one=new IntWritable(1);
/**
 * Emits each word and its count in the map stage.
 **/
@Override
protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String data=value.toString();
    String words[]=data.split(";");//input lines are expected to look like "word;count"
    if(words.length==2 && !words[0].trim().isEmpty()){//trim() never returns null; check the shape of the line instead
        t.set(" "+words[0]);//set the key; the leading space is what job 2 later splits on
        one.set(Integer.parseInt(words[1]));//parse this word's count
        context.write(t, one);
}
}
}
/**
 * Reducer of MapReduce job 1.
 * Accumulates the word frequencies and emits (count, word) pairs.
 **/
private static class SumReduce extends Reducer<Text, IntWritable, IntWritable, Text>{
    //holds the accumulated word frequency
    private IntWritable iw=new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> value, Context context)
        throws IOException, InterruptedException {
int sum=0;
for(IntWritable count:value){
sum+=count.get();//accumulate the word frequency
}
iw.set(sum);//Set word frequency
context.write(iw, key);//emit (count, word)
}
}
/**
 * Mapper of MapReduce job 2, which does the sorting.
 **/
private static class SortMapper extends Mapper<LongWritable, Text, IntWritable, Text>{
    private IntWritable iw=new IntWritable();//stores the word frequency
    private Text t=new Text();//stores the word
@Override
protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
    String words[]=value.toString().split(" ");//job 1 wrote lines like "count\t word"; the leading space in the key makes this split work
    System.out.println("Array length: "+words.length);
    System.out.println("Line read by map: "+value.toString());
    System.out.println("=====> "+words[0]+" =====> "+words[1]);
    if(words.length==2){//split() never returns null elements, so check the number of fields instead
        iw.set(Integer.parseInt(words[0].trim()));//the count becomes the key, so the shuffle sorts by it
        t.set(words[1].trim());
        context.write(iw, t);//map output is sorted by key, here descending via DescSort
}
}
}
/**
 * Reducer of MapReduce job 2, which does the sorting.
 **/
private static class SortReduce extends Reducer<IntWritable, Text, Text, IntWritable>{
/**
 * Emits the sorted (word, count) pairs.
 **/
@Override
protected void reduce(IntWritable key, Iterable<Text> value, Context context)
        throws IOException, InterruptedException {
for(Text t:value){
context.write(t, key);//Output the sorted K, V
}
}
}
/***
 * Comparator used by the sorting job.
 * Sorts by key in descending order.
 ***/
public static class DescSort extends WritableComparator{
public DescSort() {
    super(IntWritable.class, true);//register IntWritable as the key class; true creates key instances for the object compare
}
@Override
public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
        int arg4, int arg5) {
    return -super.compare(arg0, arg1, arg2, arg3, arg4, arg5);//negate the result to turn the default ascending order into descending
}
@Override
public int compare(Object a, Object b) {
return -super.compare(a, b);//negate the result for descending order
}
}
/**
 * Driver class.
 **/
public static void main(String[] args) throws Exception {
JobConf conf=new JobConf(MyHadoopControl.class);
conf.set("mapred.job.tracker", "192.168.75.130:9001");//address of the JobTracker to submit to
conf.setJar("tt.jar");
System.out.println("Mode: "+conf.get("mapred.job.tracker"));
/**
 * Configuration of job 1:
 * counts word frequencies.
 **/
Job job1=new Job(conf,"Join1");
job1.setJarByClass(MyHadoopControl.class);
job1.setMapperClass(SumMapper.class);
job1.setReducerClass(SumReduce.class);
job1.setMapOutputKeyClass(Text.class);//key type of the map output
job1.setMapOutputValueClass(IntWritable.class);//value type of the map output
job1.setOutputKeyClass(IntWritable.class);//key type of the reduce output
job1.setOutputValueClass(Text.class);//value type of the reduce output
//Add to the control container
ControlledJob ctrljob1=new ControlledJob(conf);
ctrljob1.setJob(job1);
FileInputFormat.addInputPath(job1, new Path("hdfs://192.168.75.130:9000/root/input"));
FileSystem fs=FileSystem.get(conf);
Path op=new Path("hdfs://192.168.75.130:9000/root/op");
if(fs.exists(op)){
fs.delete(op, true);
System.out.println("This output path exists and has been deleted ! ! ! ");
}
FileOutputFormat.setOutputPath(job1, op);
/**========================================================================*/
/**
 * Configuration of job 2:
 * sorts the counts in descending order.
 **/
Job job2=new Job(conf,"Join2");
job2.setJarByClass(MyHadoopControl.class);
//job2.setInputFormatClass(TextInputFormat.class);
job2.setMapperClass(SortMapper.class);
job2.setReducerClass(SortReduce.class);
job2.setSortComparatorClass(DescSort.class);//sort by key in descending order
job2.setMapOutputKeyClass(IntWritable.class);//key type of the map output
job2.setMapOutputValueClass(Text.class);//value type of the map output
job2.setOutputKeyClass(Text.class);//key type of the reduce output
job2.setOutputValueClass(IntWritable.class);//value type of the reduce output
//Job 2 is added to the control container
ControlledJob ctrljob2=new ControlledJob(conf);
ctrljob2.setJob(job2);
/***
 * Declare the dependency between the jobs.
 * Written as below, it means that job 2 will not start
 * until job 1 has completed successfully.
 ***/
ctrljob2.addDependingJob(ctrljob1);
//The input path is the output path of the previous job
FileInputFormat.addInputPath(job2, new Path("hdfs://192.168.75.130:9000/root/op/part*"));
FileSystem fs2=FileSystem.get(conf);
Path op2=new Path("hdfs://192.168.75.130:9000/root/op2");
if(fs2.exists(op2)){
fs2.delete(op2, true);
System.out.println("The output path already existed and has been deleted!!!");
}
FileOutputFormat.setOutputPath(job2, op2);
//System.exit(job2.waitForCompletion(true) ? 0 : 1);
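/***
 * The listing breaks off here, before the JobControl wiring itself. What
 * follows is a minimal sketch of the usual submission pattern (variable
 * names illustrative), using the standard
 * org.apache.hadoop.mapreduce.lib.jobcontrol API already imported above:
 * create a JobControl group, add both controlled jobs, run the controller
 * on a background thread (JobControl implements Runnable), and poll until
 * everything has finished.
 ***/
JobControl jobCtrl=new JobControl("myctrl");//the group name is arbitrary
jobCtrl.addJob(ctrljob1);
jobCtrl.addJob(ctrljob2);//runs only after ctrljob1, per addDependingJob above
Thread ctrlThread=new Thread(jobCtrl);
ctrlThread.start();
while(true){
    if(jobCtrl.allFinished()){//both jobs are done, successfully or not
        System.out.println(jobCtrl.getSuccessfulJobList());
        jobCtrl.stop();
        break;
    }
    if(jobCtrl.getFailedJobList().size()>0){//bail out early if anything failed
        System.out.println(jobCtrl.getFailedJobList());
        jobCtrl.stop();
        break;
    }
    Thread.sleep(500);//avoid a busy wait; main throws Exception, so the checked InterruptedException is fine
}
}
}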