How to use Hadoop’s JobControl
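A note on the data format before the code (inferred from the mapper logic below; the original post does not show sample data): job 1 expects semicolon-separated word;count lines, for example

hadoop;2
spark;1
hadoop;3

Job 1 sums the counts per word and emits (count, word) pairs; job 2 re-reads that output and sorts it by count in descending order, so the sample above would come out as hadoop with 5, followed by spark with 1.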

package com.qin.test.hadoop.jobctrol;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Hadoop version: 1.2
 * JDK version: 1.6
 *
 * Uses the new-API ControlledJob + JobControl classes to complete
 * a combined task:
 * the first job counts word frequencies,
 * the second job sorts them in descending order.
 *
 * Submitted as plain MapReduce jobs this would require running
 * 2 separate MR tasks, but with JobControl + ControlledJob the
 * DAG of dependent tasks can be completed inside a single class.
 *
 * @author qindongliang
 */

public class MyHadoopControl {

/***
 * Mapper of MapReduce job 1
 *
 * LongWritable 1: the input key, by default the byte offset into the text
 * Text 2: the content of each line
 * Text 3: the output key type
 * IntWritable 4: the output value type
 */
private static class SumMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text t = new Text();
    private IntWritable one = new IntWritable(1);

    /**
     * Emits the word frequency in the map stage
     */

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String data = value.toString();
        String words[] = data.split(";");
        // guard against malformed lines (the original check,
        // words[0].trim() != null, could never be false)
        if (words.length == 2 && !words[0].trim().isEmpty()) {
            // the leading space matters: job 1 writes "count<TAB> word",
            // and job 2's mapper splits that line on a single space
            t.set(" " + words[0]);// assign the key
            one.set(Integer.parseInt(words[1]));
            context.write(t, one);
        }
    }

}

/**
 * Reducer of MapReduce job 1
 * Responsible for accumulating the word frequencies and writing them out
 */
private static class SumReduce extends Reducer<Text, IntWritable, IntWritable, Text> {

    // holds the accumulated word frequency
    private IntWritable iw = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> value, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable count : value) {
            sum += count.get();// accumulate the word frequency
        }
        iw.set(sum);// set the word frequency
        context.write(iw, key);// output the data, swapping key and value
    }

}

/**
 * Mapper of MapReduce job 2, which does the sorting
 */
private static class SortMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

    IntWritable iw = new IntWritable();// stores the word frequency
    private Text t = new Text();// stores the text

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // job 1 wrote "count<TAB> word"; thanks to the leading space added
        // in SumMapper this splits into [count, word]
        String words[] = value.toString().split(" ");
        System.out.println("Array length: " + words.length);
        System.out.println("Line read by map: " + value.toString());
        if (words.length == 2) {
            System.out.println("=====> " + words[0] + " =====> " + words[1]);
            iw.set(Integer.parseInt(words[0].trim()));
            t.set(words[1].trim());
            context.write(iw, t);// map-stage output, sorted by key by default
        }
    }

}

/**
 * Reducer of MapReduce job 2, which writes out the sorted result
 */
private static class SortReduce extends Reducer<IntWritable, Text, Text, IntWritable> {

    /**
     * Writes out the sorted content
     */
    @Override
    protected void reduce(IntWritable key, Iterable<Text> value, Context context)
            throws IOException, InterruptedException {
        for (Text t : value) {
            context.write(t, key);// output the sorted K,V, swapped back to (word, count)
        }
    }

}

/***
 * The sorting comparator used by the sort job,
 * sorting by key in descending order
 */
public static class DescSort extends WritableComparator {

    public DescSort() {
        super(IntWritable.class, true);// register the key class with the comparator
    }

    @Override
    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
            int arg4, int arg5) {
        // negate the comparison to turn the default ascending order into descending
        return -super.compare(arg0, arg1, arg2, arg3, arg4, arg5);
    }

    @Override
    public int compare(Object a, Object b) {
        // negate the comparison to turn the default ascending order into descending
        return -super.compare(a, b);
    }

}

/**
 * Driver class
 */
public static void main(String[] args) throws Exception {

    JobConf conf = new JobConf(MyHadoopControl.class);
    conf.set("mapred.job.tracker", "192.168.75.130:9001");
    conf.setJar("tt.jar");
    System.out.println("Mode: " + conf.get("mapred.job.tracker"));

    /**
     * Configuration of job 1:
     * counting word frequencies
     */
    Job job1 = new Job(conf, "Join1");
    job1.setJarByClass(MyHadoopControl.class);

    job1.setMapperClass(SumMapper.class);
    job1.setReducerClass(SumReduce.class);
    job1.setMapOutputKeyClass(Text.class);// key of the map-stage output
    job1.setMapOutputValueClass(IntWritable.class);// value of the map-stage output
    job1.setOutputKeyClass(IntWritable.class);// key of the reduce-stage output
    job1.setOutputValueClass(Text.class);// value of the reduce-stage output

    // wrap job 1 in a ControlledJob so it can be added to the control container
    ControlledJob ctrljob1 = new ControlledJob(conf);
    ctrljob1.setJob(job1);

    FileInputFormat.addInputPath(job1, new Path("hdfs://192.168.75.130:9000/root/input"));

    FileSystem fs = FileSystem.get(conf);
    Path op = new Path("hdfs://192.168.75.130:9000/root/op");
    if (fs.exists(op)) {
        fs.delete(op, true);
        System.out.println("The output path already exists and has been deleted!!!");
    }
    FileOutputFormat.setOutputPath(job1, op);

    /*==========================================================================*/

    /**
     * Configuration of job 2:
     * sorting
     */
    Job job2 = new Job(conf, "Join2");
    job2.setJarByClass(MyHadoopControl.class);

    //job2.setInputFormatClass(TextInputFormat.class);
    job2.setMapperClass(SortMapper.class);
    job2.setReducerClass(SortReduce.class);
    job2.setSortComparatorClass(DescSort.class);// sort by key in descending order
    job2.setMapOutputKeyClass(IntWritable.class);// key of the map-stage output
    job2.setMapOutputValueClass(Text.class);// value of the map-stage output
    job2.setOutputKeyClass(Text.class);// key of the reduce-stage output
    job2.setOutputValueClass(IntWritable.class);// value of the reduce-stage output

    // wrap job 2 in a ControlledJob and add it to the control container
    ControlledJob ctrljob2 = new ControlledJob(conf);
    ctrljob2.setJob(job2);

    /***
     * Set the dependency between the jobs directly.
     * Written as below, it means that starting job 2
     * depends on the successful completion of job 1.
     */
    ctrljob2.addDependingJob(ctrljob1);

    // the input path of job 2 is the output path of the previous job
    FileInputFormat.addInputPath(job2, new Path("hdfs://192.168.75.130:9000/root/op/part*"));

    FileSystem fs2 = FileSystem.get(conf);
    Path op2 = new Path("hdfs://192.168.75.130:9000/root/op2");
    if (fs2.exists(op2)) {
        fs2.delete(op2, true);
        System.out.println("The output path already exists and has been deleted!!!");
    }
    FileOutputFormat.setOutputPath(job2, op2);

    // System.exit(job2.waitForCompletion(true) ? 0 : 1);
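    // NOTE: the original listing breaks off at this point. The lines below are
    // a minimal sketch of the missing JobControl wiring, based on the standard
    // org.apache.hadoop.mapreduce.lib.jobcontrol API: both ControlledJobs are
    // added to one JobControl group, which runs on its own thread (JobControl
    // implements Runnable) until every job in the group has finished. The
    // group name "myctrl" and the polling interval are illustrative choices.
    JobControl jobCtrl = new JobControl("myctrl");
    jobCtrl.addJob(ctrljob1);
    jobCtrl.addJob(ctrljob2);// the dependency on job 1 was recorded above

    Thread ctrlThread = new Thread(jobCtrl);
    ctrlThread.setDaemon(true);
    ctrlThread.start();

    // poll until both jobs have finished, then report the results and stop
    while (!jobCtrl.allFinished()) {
        Thread.sleep(500);
    }
    System.out.println("Successful jobs: " + jobCtrl.getSuccessfulJobList());
    System.out.println("Failed jobs: " + jobCtrl.getFailedJobList());
    jobCtrl.stop();
}// end of main
}// end of MyHadoopControl

With JobControl wired up this way, the commented-out waitForCompletion call above is unnecessary: JobControl submits job 2 automatically once job 1 completes successfully.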