Benutzer-Werkzeuge

Webseiten-Werkzeuge


nosql:mapreduce_hadoop_20_job

Unterschiede

Hier werden die Unterschiede zwischen zwei Versionen angezeigt.

Link zu dieser Vergleichsansicht

Beide Seiten der vorigen RevisionVorhergehende Überarbeitung
Nächste Überarbeitung
Vorhergehende Überarbeitung
nosql:mapreduce_hadoop_20_job [2014/09/24 16:36] – [Einen eigenen MapReduce in Java für Hadoop entwickeln] gpipperrnosql:mapreduce_hadoop_20_job [2014/09/28 22:06] (aktuell) – [Quellen] gpipperr
Zeile 1: Zeile 1:
 +=====Einen eigenen ersten MapReduce Job erstellen=====
  
 +
 +==== Einleitung====
 +
 +  * Ein MapReduce Job verdichtet die Daten Schritt für Schritt
 +  * Die Verarbeitung erfolgt Listen orientiert
 +  * Sehr gut für die Parallelisierung geeignet
 +  * Ursprung in der funktionalen Programmierung (Funktion map() und fold() bzw reduce())
 +
 +Zwei Haupt Phasen:
 +
 +  * Map
 +    * Liest Key/Value Paare ein und gibt Key/Value Paare je nach Bedarf wieder aus ( wie z.b. sortiert)
 +    * map (input) -> list(intermediate_value) 
 +
 +
 +  * Reduce
 +    * Verarbeitet die Key/Value Paare und giebt ebenfalls Key/Value Paare wieder aus
 +    * reduce (out_key, list(intermediate_value)) -> list(out_value)
 +
 +
 +
 +
 +
 +=== Ein erstes Beispiel ===
 +
 +Übersicht - eine Liste aller in den Texten vorkommenden Auto Hersteller erstellen:
 +
 +{{ :hadoop:mapreduce_v01.png?400 |Übersicht MapReduce}}
 +
 +
 +
 +==== Einen eigenen MapReduce  in Java für Hadoop entwickeln====
 +
 +=== Übersicht über die generelle Architektur ===
 +
 +Im Beispiel sollen die Buchstaben in den Daten gezählt werden:
 +
 +{{ :hadoop:mapreduce_hadoop_v01.png?500 |MapReduce mit Hadoop}}
 +
 +
 +Die Elemente im einzelnen:
 +
 +
 +**Client**:
 +  * Der Client Driver konfiguriert den Job und sendet Job Anfrage an das Cluster
 +  * Über das Interface "JobConf" wird der Job konfiguriert
 +
 +**InputFormater**:
 +  * Der Mapper Prozesse verwendet einen InputFormater um die Daten zu lesen
 +
 +**Mapper**:
 +  * Der Mapper ließt die Daten in Key/Value Format ein
 +  * Pro Zeile (je nach InputFormater  ) wird die map Methode einmal aufgerufen
 +    * Im Default ist der Key der Offset der Zeile (Zeilennummer)
 +  * Der Mapper erstellt eine Liste mit Output Values im Key/Value Format
 +  * Mapper speicher Daten lokal und werden vom Reducer dann abgeholt
 +    * Konfigurierbar ab wievel % bereits vor Abschluss Daten bereitgestellt werden sollen 
 +  * Der eigene Mapper wird von der Klasse **"Mapper<Object, Text, Text, IntWritable>"** abgeleitet
 +  * Pro split/Block einer Datei wird eine Mapper Instance gestartet
 +
 +**Shuffle-And-Sort**:
 +  * Zuständig für das Sortieren der Ergebnisse der einzelnen Mapper als Input für den Reducer
 +  * Ein Teil wird im Mapper,ein Teil im Reducer durchgeführt
 +
 +**Combiner**:
 +  * Läuft auf dem selben Knoten wie der Mapper
 +  * Dient zur Verdichtung von Ergebnissen des Mappers
 +  * Wird aber nur bei Bedarf verwendet
 +  * Kann gleiche Klasse wie der Reducer sein falls (nur bei distributiven Funktionen wie A=B+C B=A+C)
 +
 +**Partitioner**:
 +  * Legt fest wie die Daten auf die Reducer verteilt werden sollen
 +
 +
 +**Sort**:
 +  * Vor der Verarbeitung durch den Reducer werden die Daten sortiert
 +
 +
 +**Reducer**:
 +  * Verarbeitet die Zwischenergebnisse 
 +  * Erzeugt das Endergebnis
 +  * Die eigene Reducer Klasse wird von der Klasse **"Reducer<Text, IntWritable, Text, IntWritable>"** abgeleitet
 +
 +**OutputFormat**:
 +  * Das OutputFormat legt fest wie der Reducer die Daten im HDFS ablegt
 +  * Default: Tab Format
 +
 +
 +=== Implementierung ===
 +
 +Mit Hadoop 2 hat sich die API geändert, es muss daher das Package **"org.apache.hadoop.mapreduce"** verwendet werden.
 +
 +== Umgebung einrichten ===
 +
 +Ziel ist es den Job mit dem JDeveloper auf einem Windows7 Rechner zu erstellen.
 +\\
 +Größte Herausforderung ist dabei eine laufähige Umgebung unter Win7 einzurichten.
 +\\
 +=> [[nosql:hadoop_connect_from_windows|Mit MS Windows Clients mit Hadoop arbeiten]]
 +
 +
 +
 +Für den lokalen Test kann ist die Implementierung des "hadoop Tool interfaces" notwendig!
 +
 +Beispiel siehe => http://hadoopi.wordpress.com/2013/06/05/hadoop-implementing-the-tool-interface-for-mapreduce-driver/
 +
 +<code bash>
 +hadoop jar <jar file> <startClass> -fs file:/// -jt local <local input Dir> <local output Dir>
 +</code>
 +
 +== Die Aufgabe ===
 +
 +Es soll das in Oracle typische Beispiel, wie viele Mitarbeiter arbeiten in welcher Abteilung umgesetzt werden:
 +
 +<code sql>
 +select count(*), deptno from emp
 +/
 +</code>
 +
 +Zur Zeit allerdings erstmal das Sum Beispiel implementiert um einen ersten einfachen Test zumzuseten.
 +
 +
 +Die Daten werden im dem typischem emp format als CSV Liste im HDFS abgelegt.
 +
 +Die klassischen Testdaten erzeugen:
 +<code sql>
 +set linesize 1000 
 +set trimspool on 
 +set pagesize 0 
 +set feedback off
 +
 +spool /tmp/emp.csv
 +
 +select empno || ':' ||
 +       ename || ':' ||
 +       job   || ':' ||
 +       mgr   || ':' ||
 +       to_char(hiredate,'dd.mm.yyyy')|| ':' ||
 +       sal   || ':' ||
 +       comm  || ':' ||
 +       deptno
 + from  scott.emp
 +order by ename
 +/
 +
 +spool off
 +</code>
 +
 +CSV auf das HDFS kopieren
 +
 +<code bash>
 +hdfs dfs -put /tmp/emp.csv /user/gpipperr
 +hdfs dfs -cat /user/gpipperr/emp.csv
 +
 +7499:ALLEN:SALESMAN:7698:20.02.1981:1600:300:30
 +7782:CLARK:MANAGER:7839:09.06.1981:2450::10
 +7566:JONES:MANAGER:7839:02.04.1981:2975::20
 +7839:KING:PRESIDENT::17.11.1981:5000::10
 +7654:MARTIN:SALESMAN:7698:28.09.1981:1250:1400:30
 +7934:MILLER:CLERK:7782:23.01.1982:1300::10
 +7844:TURNER:SALESMAN:7698:08.09.1981:1500:0:30
 +7521:WARD:SALESMAN:7698:22.02.1981:1250:500:30
 +
 +
 +</code>
 +
 +
 +== Die Mapper Klasse ==
 +
 +
 +<code java>
 +package gpi.hadoop;
 +
 +
 +import java.io.IOException;
 +
 +import java.util.StringTokenizer;
 +
 +import org.apache.hadoop.io.IntWritable;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.mapreduce.Mapper;
 +
 +// input Key - input Value - output Key - output Value
 +public class DeptCountMapper extends Mapper<Object, Text, Text, IntWritable> {
 +
 +
 +    static IntWritable oneValue = new IntWritable(1);
 +    
 +    @Override
 +    // input Key - input Value - output Value
 +    public void map(Object key, Text value, Context contex) throws IOException, InterruptedException {
 +        /*
 +        input:
 +        0    1          3    4               7
 +        7876:ADAMS:CLERK:7788:12.01.1983:1100:300:20
 +
 +        Split in Key/Values pairs
 +
 +        20,1 
 +
 +       */
 +
 +        // read on row
 +        String[] emprow = value.toString().split(":");
 +
 +        String deptno = emprow[7];
 +        
 +        contex.write(new Text(deptno), oneValue);      
 +
 +    }
 +}
 +</code>
 +
 +
 +== Die Reducer Klasse ==
 +
 +<code java>
 +package gpi.hadoop;
 +
 +import java.io.IOException;
 +
 +import java.util.Iterator;
 +
 +import org.apache.hadoop.io.IntWritable;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.mapreduce.Reducer;
 +
 +
 +public class DeptCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
 +
 +    private IntWritable totalWordCount = new IntWritable();
 +
 +       @Override
 +    public void reduce(Text deptno, Iterable<IntWritable> counts, Context context) throws IOException,
 +                                                                                          InterruptedException {
 +        //logic
 +
 +        /*
 +         
 +        30,1,1,1,
 +        50,1,1
 +
 +        Result should look like this
 +        30 3
 +        60 2
 +        
 +        */
 +        int deptcount = 0;
 +        for (IntWritable count : counts) {
 +            deptcount += 1;
 +        }
 +        context.write(deptno, new IntWritable(deptcount));
 +    }
 +}
 +</code>
 +
 +
 +== Driver ==
 +
 +<code java>
 +package gpi.hadoop;
 +
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.IntWritable;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.mapreduce.Job;
 +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 +import org.apache.hadoop.util.GenericOptionsParser;
 +
 +public class DeptCount {
 +
 +        public static void main(String[] args) throws Exception {
 +
 +        Configuration conf = new Configuration();
 +        
 +        
 +        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
 +        if (otherArgs.length != 3) {
 +            System.err.println("Usage: DeptCount [input] [output]");
 +            System.exit(2);
 +        }
 +        else  {
 +            System.out.println("Call DeptCount with Parameter 1::"+args[1]+" Parameter 2::"+args[2]);
 +        }
 +
 +        // create a new Configuration
 +        Job job =  Job.getInstance(conf);
 +        
 +        job.setJobName(args[0]);
 +
 +        // Mapper
 +        job.setInputFormatClass(TextInputFormat.class);
 +        job.setMapperClass(DeptCountMapper.class);
 +
 +
 +        // Reducer
 +        job.setReducerClass(DeptCountReducer.class);
 +        job.setOutputFormatClass(TextOutputFormat.class);
 +
 +        // Input and Output Path to the data
 +        FileInputFormat.setInputPaths(job, new Path(args[1]));
 +        FileOutputFormat.setOutputPath(job, new Path(args[2]));
 +
 +        //main driver Class
 +        job.setJarByClass(DeptCountT.class);
 +
 +        //set Output Class
 +        job.setOutputKeyClass(Text.class);
 +        job.setOutputValueClass(IntWritable.class);
 +
 +        job.submit();
 +
 +    }
 +}
 +
 +</code>
 +
 +
 +Beispiel für die Verwendung des Tool Interfaces für die Startklasse:
 +<code java>
 +package gpi.hadoop;
 +
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.conf.Configured;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.IntWritable;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.mapreduce.Job;
 +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 +import org.apache.hadoop.util.Tool;
 +import org.apache.hadoop.util.ToolRunner;
 +
 +public class DeptCountT extends Configured implements Tool {
 +
 +    public static void main(String[] args) throws Exception {
 +        int res = ToolRunner.run(new Configuration(), new DeptCountT(), args);
 +        System.exit(res);
 +    }
 +
 +    @Override
 +    public int run(String[] args) throws Exception {
 +        //get Config object
 +        Configuration conf = this.getConf();
 +
 +        // create a new Configuration
 +        Job job = Job.getInstance(conf);
 +
 +        job.setJobName(args[0]);
 +
 +        // Mapper
 +        job.setInputFormatClass(TextInputFormat.class);
 +        job.setMapperClass(DeptCountMapper.class);
 +
 +
 +        // Reducer
 +        job.setReducerClass(DeptCountReducer.class);
 +        job.setOutputFormatClass(TextOutputFormat.class);
 +
 +        // Input and Output Path to the data
 +        FileInputFormat.setInputPaths(job, new Path(args[0]));
 +        FileOutputFormat.setOutputPath(job, new Path(args[1]));
 +
 +        //main driver Class
 +        job.setJarByClass(DeptCount.class);
 +
 +        //set Output Class
 +        job.setOutputKeyClass(Text.class);
 +        job.setOutputValueClass(IntWritable.class);
 +
 +        // Execute job and return status
 +        return job.waitForCompletion(true) ? 0 : 1;
 +    }
 +}
 +
 +</code>
 +
 +
 +Jar file erzeugen und auf den Hadoop Server kopieren
 +
 +=> [[https://blogs.oracle.com/bwb/resource/Jdev_jar_deployments/Creating_Jar_Deployments_with_JDeveloper.html|Jar File im Oracle  JDeveloper]]
 +
 +
 +Jar File aufrufen:
 +
 +<code bash>
 +
 +yarn jar GpiHadoopExamples.jar gpi.hadoop.DeptCount /user/gpipperr/emp.csv /user/gpipperr/empRun1
 +
 +yarn application -list
 +
 +
 +hdfs dfs -cat /user/gpipperr/empRun1/part-r-00000
 +
 +10 3 
 +20 1
 +30 4
 +
 +</code>
 +
 +**Hinweise**
 +
 +
 +Wird mit **yarn jar <class.jar> <class_name> <in> <out>"** der Job aufgerufen, ist der Parameter 1 der Klassenamen, der Parameter 2  das in Verzeichnis und der Parameter 3 das out Verzeichnis.
 +
 +D.h. passt aber nicht zu der Verwendung der Eingabe Parameter in den verbreiteten Dokumentationen und Beispielen.
 +
 +Fehler:
 +<code>
 +istsException: Output directory hdfs://quickstart.cloudera:8020/user/gpipperr/emp.csv already exists
 +Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://quickstart.cloudera:8020/user/gpipperr/emp.csv already exists
 +</code>
 +
 +
 +Nur wenn in der Jar Datei die Main Klasse definiert wird und ein Aufruf nach diesem Muster durchgeführt wird, passt
 +das im Detail: **yarn jar <class.jar> <class_name> <in> <out>"** 
 +
 +Ansonsten muss bei Fehler mit den Aufrufparamentern (Typischerweise "Output directory xxx already exists" ) der Index der Parameter im Code entsprechend angepasst werden.
 +
 +
 +
 +
 +=== Weiteres Beispiel===
 +
 +=> [[nosql:oracle_nosql_hadoop_integration|Die Oracle NoSQL per MapReduce mit Hadoop verwenden]]
 +
 +==== Quellen ====
 +
 +Gute weitere Beispiele:
 +
 +
 +
 +  * http://wiki.apache.org/hadoop/WordCount
 +  * http://stevekrenzel.com/finding-friends-with-mapreduce
 +  * http://www.javacodegeeks.com/2013/07/mapreduce-algorithms-understanding-data-joins-part-1.html
 +
 +MapAndReduce Patterns:
 +  * http://highlyscalable.wordpress.com/2012/02/01/mapreduce-patterns/
 +  * http://courses.cs.washington.edu/courses/cse490h/08au/lectures/MapReduceDesignPatterns-UW2.pdf
 +  * http://www.ccs.neu.edu/home/mirek/classes/2012-F-CS6240/Slides/4-DesignPatterns.pdf
 +  * http://chandramanitiwary.wordpress.com/2012/08/18/mapreduce-secondary-sort/
 +
 +Dell Zhang
 +  * http://www.dcs.bbk.ac.uk/~dell/teaching/cc/
 +  * http://www.dcs.bbk.ac.uk/~dell/teaching/cc/book/ditp/ditp_ch3.pdf
 +  * https://github.com/lintool/MapReduce-course-2013s
 +
 +
 +Buch:
 +  *  MapReduce cookbook
nosql/mapreduce_hadoop_20_job.txt · Zuletzt geändert: 2014/09/28 22:06 von gpipperr