nosql:mapreduce_hadoop_20_job
Unterschiede
Hier werden die Unterschiede zwischen zwei Versionen angezeigt.
Beide Seiten der vorigen RevisionVorhergehende ÜberarbeitungNächste Überarbeitung | Vorhergehende Überarbeitung | ||
nosql:mapreduce_hadoop_20_job [2014/09/24 15:46] – [Einen eigenen MapReduce in Java für Hadoop entwickeln] gpipperr | nosql:mapreduce_hadoop_20_job [2014/09/28 22:06] (aktuell) – [Quellen] gpipperr | ||
---|---|---|---|
Zeile 1: | Zeile 1: | ||
+ | =====Einen eigenen ersten MapReduce Job erstellen===== | ||
+ | |||
+ | ==== Einleitung==== | ||
+ | |||
+ | * Ein MapReduce Job verdichtet die Daten Schritt für Schritt | ||
+ | * Die Verarbeitung erfolgt Listen orientiert | ||
+ | * Sehr gut für die Parallelisierung geeignet | ||
+ | * Ursprung in der funktionalen Programmierung (Funktion map() und fold() bzw reduce()) | ||
+ | |||
+ | Zwei Haupt Phasen: | ||
+ | |||
+ | * Map | ||
+ | * Liest Key/Value Paare ein und gibt Key/Value Paare je nach Bedarf wieder aus ( wie z.b. sortiert) | ||
+ | * map (input) -> list(intermediate_value) | ||
+ | |||
+ | |||
+ | * Reduce | ||
+ | * Verarbeitet die Key/Value Paare und giebt ebenfalls Key/Value Paare wieder aus | ||
+ | * reduce (out_key, list(intermediate_value)) -> list(out_value) | ||
+ | |||
+ | |||
+ | |||
+ | |||
+ | |||
+ | === Ein erstes Beispiel === | ||
+ | |||
+ | Übersicht - eine Liste aller in den Texten vorkommenden Auto Hersteller erstellen: | ||
+ | |||
+ | {{ : | ||
+ | |||
+ | |||
+ | |||
+ | ==== Einen eigenen MapReduce | ||
+ | |||
+ | === Übersicht über die generelle Architektur === | ||
+ | |||
+ | Im Beispiel sollen die Buchstaben in den Daten gezählt werden: | ||
+ | |||
+ | {{ : | ||
+ | |||
+ | |||
+ | Die Elemente im einzelnen: | ||
+ | |||
+ | |||
+ | **Client**: | ||
+ | * Der Client Driver konfiguriert den Job und sendet Job Anfrage an das Cluster | ||
+ | * Über das Interface " | ||
+ | |||
+ | **InputFormater**: | ||
+ | * Der Mapper Prozesse verwendet einen InputFormater um die Daten zu lesen | ||
+ | |||
+ | **Mapper**: | ||
+ | * Der Mapper ließt die Daten in Key/Value Format ein | ||
+ | * Pro Zeile (je nach InputFormater | ||
+ | * Im Default ist der Key der Offset der Zeile (Zeilennummer) | ||
+ | * Der Mapper erstellt eine Liste mit Output Values im Key/Value Format | ||
+ | * Mapper speicher Daten lokal und werden vom Reducer dann abgeholt | ||
+ | * Konfigurierbar ab wievel % bereits vor Abschluss Daten bereitgestellt werden sollen | ||
+ | * Der eigene Mapper wird von der Klasse **" | ||
+ | * Pro split/Block einer Datei wird eine Mapper Instance gestartet | ||
+ | |||
+ | **Shuffle-And-Sort**: | ||
+ | * Zuständig für das Sortieren der Ergebnisse der einzelnen Mapper als Input für den Reducer | ||
+ | * Ein Teil wird im Mapper,ein Teil im Reducer durchgeführt | ||
+ | |||
+ | **Combiner**: | ||
+ | * Läuft auf dem selben Knoten wie der Mapper | ||
+ | * Dient zur Verdichtung von Ergebnissen des Mappers | ||
+ | * Wird aber nur bei Bedarf verwendet | ||
+ | * Kann gleiche Klasse wie der Reducer sein falls (nur bei distributiven Funktionen wie A=B+C B=A+C) | ||
+ | |||
+ | **Partitioner**: | ||
+ | * Legt fest wie die Daten auf die Reducer verteilt werden sollen | ||
+ | |||
+ | |||
+ | **Sort**: | ||
+ | * Vor der Verarbeitung durch den Reducer werden die Daten sortiert | ||
+ | |||
+ | |||
+ | **Reducer**: | ||
+ | * Verarbeitet die Zwischenergebnisse | ||
+ | * Erzeugt das Endergebnis | ||
+ | * Die eigene Reducer Klasse wird von der Klasse **" | ||
+ | |||
+ | **OutputFormat**: | ||
+ | * Das OutputFormat legt fest wie der Reducer die Daten im HDFS ablegt | ||
+ | * Default: Tab Format | ||
+ | |||
+ | |||
+ | === Implementierung === | ||
+ | |||
+ | Mit Hadoop 2 hat sich die API geändert, es muss daher das Package **" | ||
+ | |||
+ | == Umgebung einrichten === | ||
+ | |||
+ | Ziel ist es den Job mit dem JDeveloper auf einem Windows7 Rechner zu erstellen. | ||
+ | \\ | ||
+ | Größte Herausforderung ist dabei eine laufähige Umgebung unter Win7 einzurichten. | ||
+ | \\ | ||
+ | => [[nosql: | ||
+ | |||
+ | |||
+ | |||
+ | Für den lokalen Test kann ist die Implementierung des " | ||
+ | |||
+ | Beispiel siehe => http:// | ||
+ | |||
+ | <code bash> | ||
+ | hadoop jar <jar file> < | ||
+ | </ | ||
+ | |||
+ | == Die Aufgabe === | ||
+ | |||
+ | Es soll das in Oracle typische Beispiel, wie viele Mitarbeiter arbeiten in welcher Abteilung umgesetzt werden: | ||
+ | |||
+ | <code sql> | ||
+ | select count(*), deptno from emp | ||
+ | / | ||
+ | </ | ||
+ | |||
+ | Zur Zeit allerdings erstmal das Sum Beispiel implementiert um einen ersten einfachen Test zumzuseten. | ||
+ | |||
+ | |||
+ | Die Daten werden im dem typischem emp format als CSV Liste im HDFS abgelegt. | ||
+ | |||
+ | Die klassischen Testdaten erzeugen: | ||
+ | <code sql> | ||
+ | set linesize 1000 | ||
+ | set trimspool on | ||
+ | set pagesize 0 | ||
+ | set feedback off | ||
+ | |||
+ | spool / | ||
+ | |||
+ | select empno || ':' | ||
+ | ename || ':' | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | order by ename | ||
+ | / | ||
+ | |||
+ | spool off | ||
+ | </ | ||
+ | |||
+ | CSV auf das HDFS kopieren | ||
+ | |||
+ | <code bash> | ||
+ | hdfs dfs -put / | ||
+ | hdfs dfs -cat / | ||
+ | |||
+ | 7499: | ||
+ | 7782: | ||
+ | 7566: | ||
+ | 7839: | ||
+ | 7654: | ||
+ | 7934: | ||
+ | 7844: | ||
+ | 7521: | ||
+ | |||
+ | |||
+ | </ | ||
+ | |||
+ | |||
+ | == Die Mapper Klasse == | ||
+ | |||
+ | |||
+ | <code java> | ||
+ | package gpi.hadoop; | ||
+ | |||
+ | |||
+ | import java.io.IOException; | ||
+ | |||
+ | import java.util.StringTokenizer; | ||
+ | |||
+ | import org.apache.hadoop.io.IntWritable; | ||
+ | import org.apache.hadoop.io.Text; | ||
+ | import org.apache.hadoop.mapreduce.Mapper; | ||
+ | |||
+ | // input Key - input Value - output Key - output Value | ||
+ | public class DeptCountMapper extends Mapper< | ||
+ | |||
+ | |||
+ | static IntWritable oneValue = new IntWritable(1); | ||
+ | | ||
+ | @Override | ||
+ | // input Key - input Value - output Value | ||
+ | public void map(Object key, Text value, Context contex) throws IOException, | ||
+ | /* | ||
+ | input: | ||
+ | 0 1 | ||
+ | 7876: | ||
+ | |||
+ | Split in Key/Values pairs | ||
+ | |||
+ | 20,1 | ||
+ | |||
+ | */ | ||
+ | |||
+ | // read on row | ||
+ | String[] emprow = value.toString().split(":" | ||
+ | |||
+ | String deptno = emprow[7]; | ||
+ | | ||
+ | contex.write(new Text(deptno), | ||
+ | |||
+ | } | ||
+ | } | ||
+ | </ | ||
+ | |||
+ | |||
+ | == Die Reducer Klasse == | ||
+ | |||
+ | <code java> | ||
+ | package gpi.hadoop; | ||
+ | |||
+ | import java.io.IOException; | ||
+ | |||
+ | import java.util.Iterator; | ||
+ | |||
+ | import org.apache.hadoop.io.IntWritable; | ||
+ | import org.apache.hadoop.io.Text; | ||
+ | import org.apache.hadoop.mapreduce.Reducer; | ||
+ | |||
+ | |||
+ | public class DeptCountReducer extends Reducer< | ||
+ | |||
+ | private IntWritable totalWordCount = new IntWritable(); | ||
+ | |||
+ | | ||
+ | public void reduce(Text deptno, Iterable< | ||
+ | InterruptedException { | ||
+ | //logic | ||
+ | |||
+ | /* | ||
+ | |||
+ | 30,1,1,1, | ||
+ | 50,1,1 | ||
+ | |||
+ | Result should look like this | ||
+ | 30 3 | ||
+ | 60 2 | ||
+ | | ||
+ | */ | ||
+ | int deptcount = 0; | ||
+ | for (IntWritable count : counts) { | ||
+ | deptcount += 1; | ||
+ | } | ||
+ | context.write(deptno, | ||
+ | } | ||
+ | } | ||
+ | </ | ||
+ | |||
+ | |||
+ | == Driver == | ||
+ | |||
+ | <code java> | ||
+ | package gpi.hadoop; | ||
+ | |||
+ | import org.apache.hadoop.conf.Configuration; | ||
+ | import org.apache.hadoop.fs.Path; | ||
+ | import org.apache.hadoop.io.IntWritable; | ||
+ | import org.apache.hadoop.io.Text; | ||
+ | import org.apache.hadoop.mapreduce.Job; | ||
+ | import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; | ||
+ | import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; | ||
+ | import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; | ||
+ | import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; | ||
+ | import org.apache.hadoop.util.GenericOptionsParser; | ||
+ | |||
+ | public class DeptCount { | ||
+ | |||
+ | public static void main(String[] args) throws Exception { | ||
+ | |||
+ | Configuration conf = new Configuration(); | ||
+ | | ||
+ | | ||
+ | String[] otherArgs = new GenericOptionsParser(conf, | ||
+ | if (otherArgs.length != 3) { | ||
+ | System.err.println(" | ||
+ | System.exit(2); | ||
+ | } | ||
+ | else { | ||
+ | System.out.println(" | ||
+ | } | ||
+ | |||
+ | // create a new Configuration | ||
+ | Job job = Job.getInstance(conf); | ||
+ | | ||
+ | job.setJobName(args[0]); | ||
+ | |||
+ | // Mapper | ||
+ | job.setInputFormatClass(TextInputFormat.class); | ||
+ | job.setMapperClass(DeptCountMapper.class); | ||
+ | |||
+ | |||
+ | // Reducer | ||
+ | job.setReducerClass(DeptCountReducer.class); | ||
+ | job.setOutputFormatClass(TextOutputFormat.class); | ||
+ | |||
+ | // Input and Output Path to the data | ||
+ | FileInputFormat.setInputPaths(job, | ||
+ | FileOutputFormat.setOutputPath(job, | ||
+ | |||
+ | //main driver Class | ||
+ | job.setJarByClass(DeptCountT.class); | ||
+ | |||
+ | //set Output Class | ||
+ | job.setOutputKeyClass(Text.class); | ||
+ | job.setOutputValueClass(IntWritable.class); | ||
+ | |||
+ | job.submit(); | ||
+ | |||
+ | } | ||
+ | } | ||
+ | |||
+ | </ | ||
+ | |||
+ | |||
+ | Beispiel für die Verwendung des Tool Interfaces für die Startklasse: | ||
+ | <code java> | ||
+ | package gpi.hadoop; | ||
+ | |||
+ | import org.apache.hadoop.conf.Configuration; | ||
+ | import org.apache.hadoop.conf.Configured; | ||
+ | import org.apache.hadoop.fs.Path; | ||
+ | import org.apache.hadoop.io.IntWritable; | ||
+ | import org.apache.hadoop.io.Text; | ||
+ | import org.apache.hadoop.mapreduce.Job; | ||
+ | import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; | ||
+ | import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; | ||
+ | import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; | ||
+ | import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; | ||
+ | import org.apache.hadoop.util.Tool; | ||
+ | import org.apache.hadoop.util.ToolRunner; | ||
+ | |||
+ | public class DeptCountT extends Configured implements Tool { | ||
+ | |||
+ | public static void main(String[] args) throws Exception { | ||
+ | int res = ToolRunner.run(new Configuration(), | ||
+ | System.exit(res); | ||
+ | } | ||
+ | |||
+ | @Override | ||
+ | public int run(String[] args) throws Exception { | ||
+ | //get Config object | ||
+ | Configuration conf = this.getConf(); | ||
+ | |||
+ | // create a new Configuration | ||
+ | Job job = Job.getInstance(conf); | ||
+ | |||
+ | job.setJobName(args[0]); | ||
+ | |||
+ | // Mapper | ||
+ | job.setInputFormatClass(TextInputFormat.class); | ||
+ | job.setMapperClass(DeptCountMapper.class); | ||
+ | |||
+ | |||
+ | // Reducer | ||
+ | job.setReducerClass(DeptCountReducer.class); | ||
+ | job.setOutputFormatClass(TextOutputFormat.class); | ||
+ | |||
+ | // Input and Output Path to the data | ||
+ | FileInputFormat.setInputPaths(job, | ||
+ | FileOutputFormat.setOutputPath(job, | ||
+ | |||
+ | //main driver Class | ||
+ | job.setJarByClass(DeptCount.class); | ||
+ | |||
+ | //set Output Class | ||
+ | job.setOutputKeyClass(Text.class); | ||
+ | job.setOutputValueClass(IntWritable.class); | ||
+ | |||
+ | // Execute job and return status | ||
+ | return job.waitForCompletion(true) ? 0 : 1; | ||
+ | } | ||
+ | } | ||
+ | |||
+ | </ | ||
+ | |||
+ | |||
+ | Jar file erzeugen und auf den Hadoop Server kopieren | ||
+ | |||
+ | => [[https:// | ||
+ | |||
+ | |||
+ | Jar File aufrufen: | ||
+ | |||
+ | <code bash> | ||
+ | |||
+ | yarn jar GpiHadoopExamples.jar gpi.hadoop.DeptCount / | ||
+ | |||
+ | yarn application -list | ||
+ | |||
+ | |||
+ | hdfs dfs -cat / | ||
+ | |||
+ | 10 3 | ||
+ | 20 1 | ||
+ | 30 4 | ||
+ | |||
+ | </ | ||
+ | |||
+ | **Hinweise** | ||
+ | |||
+ | |||
+ | Wird mit **yarn jar < | ||
+ | |||
+ | D.h. passt aber nicht zu der Verwendung der Eingabe Parameter in den verbreiteten Dokumentationen und Beispielen. | ||
+ | |||
+ | Fehler: | ||
+ | < | ||
+ | istsException: | ||
+ | Exception in thread " | ||
+ | </ | ||
+ | |||
+ | |||
+ | Nur wenn in der Jar Datei die Main Klasse definiert wird und ein Aufruf nach diesem Muster durchgeführt wird, passt | ||
+ | das im Detail: **yarn jar < | ||
+ | |||
+ | Ansonsten muss bei Fehler mit den Aufrufparamentern (Typischerweise " | ||
+ | |||
+ | |||
+ | |||
+ | |||
+ | === Weiteres Beispiel=== | ||
+ | |||
+ | => [[nosql: | ||
+ | |||
+ | ==== Quellen ==== | ||
+ | |||
+ | Gute weitere Beispiele: | ||
+ | |||
+ | |||
+ | |||
+ | * http:// | ||
+ | * http:// | ||
+ | * http:// | ||
+ | |||
+ | MapAndReduce Patterns: | ||
+ | * http:// | ||
+ | * http:// | ||
+ | * http:// | ||
+ | * http:// | ||
+ | |||
+ | Dell Zhang | ||
+ | * http:// | ||
+ | * http:// | ||
+ | * https:// | ||
+ | |||
+ | |||
+ | Buch: | ||
+ | * MapReduce cookbook |
nosql/mapreduce_hadoop_20_job.txt · Zuletzt geändert: 2014/09/28 22:06 von gpipperr