From 207570a3e71a33abc1a3d4690d8b2772567aca0d Mon Sep 17 00:00:00 2001
From: Marco Mariani <marco.mariani@nexedi.com>
Date: Tue, 18 Feb 2014 14:16:12 +0000
Subject: [PATCH] hadoop: python demo

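Add a streaming word-count demo after Michael Noll's tutorial (linked
in the sources): a Python mapper/reducer pair run over a small Project
Gutenberg corpus, plus generated helper scripts to deploy the Hadoop
configuration (deploy-config.sh), upload the texts to HDFS
(put-files.sh) and launch the streaming job (run-demo.sh).

The wikipedia-*.py.in scripts and the commented-out [wikipedia-dataset]
section are a work-in-progress variant that extracts article titles
from a bzip2-compressed Wikipedia dump; their reducer is still a
placeholder copy of the mapper.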
---
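Notes, kept below the fold so git-am drops them:

The mapper/reducer pair only talks over stdin/stdout, so it can be
smoke-tested without a cluster; a minimal sketch, assuming one of the
downloaded Gutenberg texts sits in the current directory:

  cat pg103.txt | ./gutenberg-mapper.py | sort -k1,1 | ./gutenberg-reducer.py

(sort -k1,1 stands in for the shuffle phase between map and reduce.)

Inside a deployed instance the intended sequence is, roughly:

  ./deploy-config.sh   # copy the stock $HADOOP_PREFIX/etc tree into the instance
  ./put-files.sh       # create the 'gutenberg' HDFS dir and upload the texts
  ./run-demo.sh        # run the streaming job; output lands in gutenberg-output/
  hdfs dfs -cat 'gutenberg-output/part-*' | sort -k2,2nr | head
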
 software/hadoop/deploy-config.sh.in     |  8 +++
 software/hadoop/gutenberg-mapper.py.in  | 19 +++++++
 software/hadoop/gutenberg-reducer.py.in | 40 ++++++++++++++
 software/hadoop/instance.cfg.in         | 44 ++++++++++++++--
 software/hadoop/put-files.sh.in         | 17 ++++++
 software/hadoop/run-demo.sh.in          |  5 ++
 software/hadoop/software.cfg            | 69 +++++++++++++++++++++++++
 software/hadoop/wikipedia-mapper.py.in  | 61 ++++++++++++++++++++++
 software/hadoop/wikipedia-reducer.py.in | 61 ++++++++++++++++++++++
 9 files changed, 320 insertions(+), 4 deletions(-)
 create mode 100644 software/hadoop/deploy-config.sh.in
 create mode 100644 software/hadoop/gutenberg-mapper.py.in
 create mode 100644 software/hadoop/gutenberg-reducer.py.in
 create mode 100644 software/hadoop/put-files.sh.in
 create mode 100644 software/hadoop/run-demo.sh.in
 create mode 100644 software/hadoop/wikipedia-mapper.py.in
 create mode 100644 software/hadoop/wikipedia-reducer.py.in

diff --git a/software/hadoop/deploy-config.sh.in b/software/hadoop/deploy-config.sh.in
new file mode 100644
index 000000000..abb6f680c
--- /dev/null
+++ b/software/hadoop/deploy-config.sh.in
@@ -0,0 +1,8 @@
+#!/bin/bash
+
+# exit on error
+set -e
+
+source environment.sh
+cp -a $HADOOP_PREFIX/etc ${buildout:directory}/
+
diff --git a/software/hadoop/gutenberg-mapper.py.in b/software/hadoop/gutenberg-mapper.py.in
new file mode 100644
index 000000000..094fd3c5b
--- /dev/null
+++ b/software/hadoop/gutenberg-mapper.py.in
@@ -0,0 +1,19 @@
+#!/usr/bin/env python
+# http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
+
+import sys
+
+# input comes from STDIN (standard input)
+for line in sys.stdin:
+    # remove leading and trailing whitespace
+    line = line.strip()
+    # split the line into words
+    words = line.split()
+    # increase counters
+    for word in words:
+        # write the results to STDOUT (standard output);
+        # what we output here will be the input for the
+        # Reduce step, i.e. the input for reducer.py
+        #
+        # tab-delimited; the trivial word count is 1
+        print '%s\t%s' % (word, 1)
diff --git a/software/hadoop/gutenberg-reducer.py.in b/software/hadoop/gutenberg-reducer.py.in
new file mode 100644
index 000000000..cccd32105
--- /dev/null
+++ b/software/hadoop/gutenberg-reducer.py.in
@@ -0,0 +1,40 @@
+#!/usr/bin/env python
+# http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
+
+from operator import itemgetter
+import sys
+
+current_word = None
+current_count = 0
+word = None
+
+# input comes from STDIN
+for line in sys.stdin:
+    # remove leading and trailing whitespace
+    line = line.strip()
+
+    # parse the input we got from mapper.py
+    word, count = line.split('\t', 1)
+
+    # convert count (currently a string) to int
+    try:
+        count = int(count)
+    except ValueError:
+        # count was not a number, so silently
+        # ignore/discard this line
+        continue
+
+    # this IF-switch only works because Hadoop sorts map output
+    # by key (here: word) before it is passed to the reducer
+    if current_word == word:
+        current_count += count
+    else:
+        if current_word:
+            # write result to STDOUT
+            print '%s\t%s' % (current_word, current_count)
+        current_count = count
+        current_word = word
+
+# do not forget to output the last word if needed!
+if current_word == word:
+    print '%s\t%s' % (current_word, current_count)
diff --git a/software/hadoop/instance.cfg.in b/software/hadoop/instance.cfg.in
index 18c87a0e9..98e832c42 100644
--- a/software/hadoop/instance.cfg.in
+++ b/software/hadoop/instance.cfg.in
@@ -3,6 +3,10 @@
 parts =
   deploy-config
   sh-environment
+  put-files
+  mapper
+  reducer
+  run-demo
 
 eggs-directory = ${buildout:eggs-directory}
 develop-eggs-directory = ${buildout:develop-eggs-directory}
@@ -15,6 +19,7 @@ output = $${buildout:directory}/environment.sh
 input = inline:
   export JAVA_HOME="${java:location}"
   export HADOOP_PREFIX="${hadoop:location}"
+  export PATH=$PATH:$HADOOP_PREFIX/bin
   export HADOOP_HOME="${hadoop:location} "
   export HADOOP_COMMON_HOME="${hadoop:location}"
   export HADOOP_CONF_DIR="$${buildout:directory}/etc/hadoop"
@@ -25,11 +30,42 @@ input = inline:
 
 
 [deploy-config]
-recipe = cp.recipe.cmd
-update_cmd = /bin/true
-install_cmd =
-  cp -a ${hadoop:location}/etc $${buildout:directory}/
+recipe = slapos.recipe.template
+url = ${:_profile_base_location_}/deploy-config.sh.in
+output = $${buildout:directory}/deploy-config.sh
+# md5sum = 
+mode = 0755
 
 
+[put-files]
+recipe = slapos.recipe.template
+url = ${:_profile_base_location_}/put-files.sh.in
+output = $${buildout:directory}/put-files.sh
+# md5sum = 
+mode = 0755
 
 
+# http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
+
+[mapper]
+recipe = slapos.recipe.template
+url = ${:_profile_base_location_}/gutenberg-mapper.py.in
+output = $${buildout:directory}/gutenberg-mapper.py
+# md5sum =
+mode = 0755
+
+
+[reducer]
+recipe = slapos.recipe.template
+url = ${:_profile_base_location_}/gutenberg-reducer.py.in
+output = $${buildout:directory}/gutenberg-reducer.py
+# md5sum =
+mode = 0755
+
+[run-demo]
+recipe = slapos.recipe.template
+url = ${:_profile_base_location_}/run-demo.sh.in
+output = $${buildout:directory}/run-demo.sh
+# md5sum =
+mode = 0755
+
diff --git a/software/hadoop/put-files.sh.in b/software/hadoop/put-files.sh.in
new file mode 100644
index 000000000..204e4feb4
--- /dev/null
+++ b/software/hadoop/put-files.sh.in
@@ -0,0 +1,17 @@
+#!/bin/bash
+
+# do not use 'set -e': 'hdfs dfs -mkdir' fails if the directory already exists
+# set -e
+
+source environment.sh
+
+hdfs dfs -mkdir gutenberg
+
+RAW_DATA=${buildout:directory}/software_release/gutenberg
+
+for file in "$RAW_DATA"/*; do
+  hdfs dfs -put "$file" gutenberg/
+done
+
+
+
diff --git a/software/hadoop/run-demo.sh.in b/software/hadoop/run-demo.sh.in
new file mode 100644
index 000000000..8b02023e0
--- /dev/null
+++ b/software/hadoop/run-demo.sh.in
@@ -0,0 +1,5 @@
+#!/bin/bash
+
+. environment.sh
+# ship the scripts to the task nodes with -file; quote the HDFS input glob
+hadoop jar software_release/parts/hadoop-streaming/*.jar -file gutenberg-mapper.py -file gutenberg-reducer.py -mapper gutenberg-mapper.py -reducer gutenberg-reducer.py -input 'gutenberg/*' -output gutenberg-output
diff --git a/software/hadoop/software.cfg b/software/hadoop/software.cfg
index a140291c7..310625be4 100644
--- a/software/hadoop/software.cfg
+++ b/software/hadoop/software.cfg
@@ -9,6 +9,8 @@ parts =
   eggs
   java
   hadoop
+  hadoop-streaming
+  gutenberg-dataset
   instance
 
 
@@ -18,6 +20,7 @@ eggs =
   slapos.cookbook
   collective.recipe.template
   cp.recipe.cmd
+  plone.recipe.command
 
 
 [hadoop]
@@ -27,6 +30,15 @@ md5sum = 25f27eb0b5617e47c032319c0bfd9962
 strip-top-level-dir = true
 
 
+[hadoop-streaming]
+recipe = hexagonit.recipe.download
+url = http://repo1.maven.org/maven2/org/apache/hadoop/hadoop-streaming/0.20.203.0/hadoop-streaming-0.20.203.0.jar
+download-only = true
+#md5sum = 
+
+
+
+
 [instance]
 recipe = slapos.recipe.template
 url = ${:_profile_base_location_}/instance.cfg.in
@@ -36,3 +48,60 @@ mode = 0644
 
 
 
+[gutenberg-dataset]
+recipe = cp.recipe.cmd
+update_cmd = /bin/true
+install_cmd =
+  mkdir -p ${buildout:directory}/gutenberg
+  cd ${buildout:directory}/gutenberg
+  wget -c http://www.gutenberg.org/cache/epub/103/pg103.txt
+  wget -c http://www.gutenberg.org/cache/epub/18857/pg18857.txt
+  wget -c http://www.gutenberg.org/cache/epub/2488/pg2488.txt
+  wget -c http://www.gutenberg.org/cache/epub/164/pg164.txt
+  wget -c http://www.gutenberg.org/cache/epub/1268/pg1268.txt
+  wget -c http://www.gutenberg.org/cache/epub/800/pg800.txt
+  wget -c http://www.gutenberg.org/cache/epub/4791/pg4791.txt
+  wget -c http://www.gutenberg.org/cache/epub/3526/pg3526.txt
+  wget -c http://www.gutenberg.org/cache/epub/2083/pg2083.txt
+
+
+
+
+#[wikipedia-dataset]
+#recipe = cp.recipe.cmd
+#update_cmd = /bin/true
+##update_cmd = ${:install_cmd}
+#install_cmd =
+#  mkdir -p ${buildout:directory}/raw-data
+#  cd ${buildout:directory}/raw-data
+#  wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current1.xml-p000000010p000010000.bz2
+#  wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current2.xml-p000010001p000025000.bz2
+#  wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current3.xml-p000025001p000055000.bz2
+#  wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current4.xml-p000055002p000104998.bz2
+#  wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current5.xml-p000105001p000184999.bz2
+#  wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current6.xml-p000185003p000305000.bz2
+#  wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current7.xml-p000305002p000464997.bz2
+#  wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current8.xml-p000465001p000665000.bz2
+#  wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current9.xml-p000665001p000925000.bz2
+#  wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current10.xml-p000925001p001325000.bz2
+#  wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current11.xml-p001325001p001825000.bz2
+#  wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current12.xml-p001825001p002425000.bz2
+#  wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current13.xml-p002425001p003124998.bz2
+#  wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current14.xml-p003125001p003924999.bz2
+#  wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current15.xml-p003925001p004825000.bz2
+#  wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current16.xml-p004825002p006025000.bz2
+#  wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current17.xml-p006025001p007524997.bz2
+#  wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current18.xml-p007525002p009225000.bz2
+#  wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current19.xml-p009225001p011125000.bz2
+#  wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current20.xml-p011125001p013324998.bz2
+#  wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current21.xml-p013325001p015725000.bz2
+#  wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current22.xml-p015725003p018225000.bz2
+#  wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current23.xml-p018225001p020925000.bz2
+#  wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current24.xml-p020925002p023725000.bz2
+#  wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current25.xml-p023725001p026624999.bz2
+#  wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current26.xml-p026625002p029625000.bz2
+#  wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current27.xml-p029625001p041836446.bz2
+
+
+
+
diff --git a/software/hadoop/wikipedia-mapper.py.in b/software/hadoop/wikipedia-mapper.py.in
new file mode 100644
index 000000000..d800bb51b
--- /dev/null
+++ b/software/hadoop/wikipedia-mapper.py.in
@@ -0,0 +1,61 @@
+#!/usr/bin/env python
+# read a bzip2-compressed Wikipedia XML dump from stdin and print one
+# article title per line, skipping talk/user/project namespaces
+import bz2
+import xml.sax
+
+class WikipediaTitleHandler(xml.sax.ContentHandler):
+    def startElement(self, name, attrs):
+        self.chars = []
+        self.tag = name
+
+    def characters(self, content):
+        if self.tag == 'title':
+            self.chars.append(content)
+
+    def endElement(self, name):
+        if name == 'title':
+            title = ''.join(self.chars)
+            if title.startswith('Talk:'):
+                return
+            if title.startswith('User talk:'):
+                return
+            if title.startswith('Wikipedia:'):
+                return
+            if title.startswith('Wikipedia talk:'):
+                return
+            if title.startswith('User:'):
+                return
+            print title.encode('utf8')
+
+
+
+
+
+def process_xml(input):
+    # the handler prints matching titles as the dump is parsed
+    parser = xml.sax.make_parser()
+    parser.setContentHandler(WikipediaTitleHandler())
+    parser.parse(input)
+
+
+
+
+if __name__ == '__main__':
+    input = bz2.BZ2File('/dev/fd/0')
+    process_xml(input)
+
+
+
+#  dirname = '/srv/slapgrid/slappart20/srv/runner/instance/slappart0/software_release/raw-data/'
+#  filenames = os.listdir(dirname)
+#  # ['enwiki-20140203-pages-meta-current1.xml-p000000010p000010000.bz2']
+#  for fname in filenames:
+#      process_xml(os.path.join(dirname, fname))
+#      input = bz2.BZ2File(process_xml(os.path.join(dirname, fname)))
+
+
+
+
+
+
diff --git a/software/hadoop/wikipedia-reducer.py.in b/software/hadoop/wikipedia-reducer.py.in
new file mode 100644
index 000000000..d800bb51b
--- /dev/null
+++ b/software/hadoop/wikipedia-reducer.py.in
@@ -0,0 +1,61 @@
+#!/usr/bin/env python
+# NOTE: placeholder, currently identical to wikipedia-mapper.py.in;
+# a real reducer would aggregate the mapper output here
+import bz2
+import xml.sax
+
+class WikipediaTitleHandler(xml.sax.ContentHandler):
+    def startElement(self, name, attrs):
+        self.chars = []
+        self.tag = name
+
+    def characters(self, content):
+        if self.tag == 'title':
+            self.chars.append(content)
+
+    def endElement(self, name):
+        if name == 'title':
+            title = ''.join(self.chars)
+            if title.startswith('Talk:'):
+                return
+            if title.startswith('User talk:'):
+                return
+            if title.startswith('Wikipedia:'):
+                return
+            if title.startswith('Wikipedia talk:'):
+                return
+            if title.startswith('User:'):
+                return
+            print title.encode('utf8')
+
+
+
+
+
+def process_xml(input):
+    # the handler prints matching titles as the dump is parsed
+    parser = xml.sax.make_parser()
+    parser.setContentHandler(WikipediaTitleHandler())
+    parser.parse(input)
+
+
+
+
+if __name__ == '__main__':
+    input = bz2.BZ2File('/dev/fd/0')
+    process_xml(input)
+
+
+
+#  dirname = '/srv/slapgrid/slappart20/srv/runner/instance/slappart0/software_release/raw-data/'
+#  filenames = os.listdir(dirname)
+#  # ['enwiki-20140203-pages-meta-current1.xml-p000000010p000010000.bz2']
+#  for fname in filenames:
+#      process_xml(os.path.join(dirname, fname))
+#      input = bz2.BZ2File(process_xml(os.path.join(dirname, fname)))
+
+
+
+
+
+
-- 
2.30.9