Commit 207570a3 by Marco Mariani

hadoop: python demo

1 parent e61c686e
#!/bin/bash
# Copy Hadoop's stock configuration tree (etc/) into the buildout directory.
# environment.sh exports HADOOP_CONF_DIR as <buildout directory>/etc/hadoop,
# so that directory only exists once this copy has actually been performed.
#
# Requires: environment.sh in the current directory (defines HADOOP_PREFIX).

# exit on error
set -e
source environment.sh

# Previously this line was prefixed with `echo`, which only printed the
# command and never created the configuration directory.
cp -a "$HADOOP_PREFIX/etc" "${buildout:directory}/"
#!/usr/bin/env python
# http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
"""Word-count mapper for Hadoop Streaming.

Reads text lines from standard input and writes one tab-delimited
``word<TAB>1`` record per word to standard output.  Those records are the
input of the reduce step.
"""
import sys


def map_words(lines):
    """Yield a ``'word\\t1'`` record for every whitespace-separated word.

    lines -- any iterable of text lines (for example ``sys.stdin``).
    Blank or whitespace-only lines produce no records.
    """
    for line in lines:
        # remove leading and trailing whitespace, then split into words
        for word in line.strip().split():
            # tab-delimited; the trivial word count is 1
            yield '%s\t%s' % (word, 1)


def main():
    """Map standard input to standard output."""
    for record in map_words(sys.stdin):
        # print(<single string>) behaves the same on Python 2 and Python 3
        print(record)


if __name__ == '__main__':
    main()
#!/usr/bin/env python
# http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
"""Word-count reducer for Hadoop Streaming.

Reads ``word<TAB>count`` records from standard input, sums the counts of
consecutive records that share the same word, and writes one
``word<TAB>total`` record per word to standard output.
"""
from operator import itemgetter  # not used below; kept so the module's imports are unchanged
import sys


def reduce_counts(lines):
    """Yield ``'word\\ttotal'`` for each run of equal words in `lines`.

    lines -- iterable of ``'word\\tcount'`` text lines.  Grouping only looks
    at adjacent records, so it relies on the input being sorted by word
    (Hadoop sorts map output by key before handing it to the reducer).

    Records that have no tab separator or whose count is not an integer are
    skipped.  Nothing is yielded for empty input.
    """
    current_word = None
    current_count = 0
    for line in lines:
        # remove leading and trailing whitespace
        line = line.strip()
        try:
            # parse the input we got from the mapper; both the unpacking
            # (no tab in the line) and int() raise ValueError on bad records
            word, count = line.split('\t', 1)
            count = int(count)
        except ValueError:
            # malformed record: silently ignore/discard this line
            continue
        if word == current_word:
            current_count += count
        else:
            if current_word is not None:
                yield '%s\t%s' % (current_word, current_count)
            current_word = word
            current_count = count
    # do not forget to output the last word; checking current_word itself
    # (rather than comparing it with the last parsed word) keeps the final
    # group even when the trailing lines were discarded as malformed
    if current_word is not None:
        yield '%s\t%s' % (current_word, current_count)


def main():
    """Reduce standard input to standard output."""
    for record in reduce_counts(sys.stdin):
        # print(<single string>) behaves the same on Python 2 and Python 3
        print(record)


if __name__ == '__main__':
    main()
......@@ -3,6 +3,10 @@
parts =
deploy-config
sh-environment
put-files
mapper
reducer
run-demo
eggs-directory = ${buildout:eggs-directory}
develop-eggs-directory = ${buildout:develop-eggs-directory}
......@@ -15,6 +19,7 @@ output = $${buildout:directory}/environment.sh
input = inline:
# Locations of the Java runtime and the Hadoop installation.
export JAVA_HOME="${java:location}"
export HADOOP_PREFIX="${hadoop:location}"
# Make the hadoop/hdfs launchers reachable by name.
export PATH="$PATH:$HADOOP_PREFIX/bin"
# No trailing blank inside the quotes: it would become part of the path.
export HADOOP_HOME="${hadoop:location}"
export HADOOP_COMMON_HOME="${hadoop:location}"
export HADOOP_CONF_DIR="$${buildout:directory}/etc/hadoop"
......@@ -25,11 +30,42 @@ input = inline:
[deploy-config]
recipe = cp.recipe.cmd
update_cmd = /bin/true
install_cmd =
cp -a ${hadoop:location}/etc $${buildout:directory}/
recipe = slapos.recipe.template
url = ${:_profile_base_location_}/deploy-config.sh.in
output = $${buildout:directory}/deploy-config.sh
# md5sum =
mode = 0755
# Renders the put-files.sh.in template into put-files.sh inside the buildout
# directory and marks it executable (mode 0755).
# NOTE(review): the template URL is built from _profile_base_location_,
# presumably the location this profile was loaded from - confirm.
# NOTE(review): md5sum is commented out, so the template is not checksum-pinned.
[put-files]
recipe = slapos.recipe.template
url = ${:_profile_base_location_}/put-files.sh.in
output = $${buildout:directory}/put-files.sh
# md5sum =
mode = 0755
# Renders the gutenberg-mapper.py.in template into an executable (mode 0755)
# gutenberg-mapper.py inside the buildout directory.
# The script follows the tutorial linked below.
# NOTE(review): md5sum is commented out, so the template is not checksum-pinned.
# http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
[mapper]
recipe = slapos.recipe.template
url = ${:_profile_base_location_}/gutenberg-mapper.py.in
output = $${buildout:directory}/gutenberg-mapper.py
# md5sum =
mode = 0755
# Renders the gutenberg-reducer.py.in template into an executable (mode 0755)
# gutenberg-reducer.py inside the buildout directory.
# NOTE(review): md5sum is commented out, so the template is not checksum-pinned.
[reducer]
recipe = slapos.recipe.template
url = ${:_profile_base_location_}/gutenberg-reducer.py.in
output = $${buildout:directory}/gutenberg-reducer.py
# md5sum =
mode = 0755
# Renders the run-demo.sh.in template into an executable (mode 0755)
# run-demo.sh inside the buildout directory.
# NOTE(review): md5sum is commented out, so the template is not checksum-pinned.
[run-demo]
recipe = slapos.recipe.template
url = ${:_profile_base_location_}/run-demo.sh.in
output = $${buildout:directory}/run-demo.sh
# md5sum =
mode = 0755
#!/bin/bash
# Upload every file of the local Gutenberg data set into the HDFS directory
# "gutenberg".
#
# Requires: environment.sh in the current directory (puts hdfs on PATH).

# exit on error
# set -e
source environment.sh

hdfs dfs -mkdir gutenberg

RAW_DATA="${buildout:directory}/software_release/gutenberg"

# Iterate with a quoted glob instead of parsing `ls` output, so file names
# containing blanks or glob characters are passed through intact.
for file in "$RAW_DATA"/*; do
  # An unmatched glob stays literal; skip it when the directory is empty.
  [ -e "$file" ] || continue
  hdfs dfs -put "$file" gutenberg/
done
#!/bin/bash
# Run the Hadoop Streaming word-count demo over the texts stored in the HDFS
# directory "gutenberg", writing the result to "gutenberg-output".
#
# Requires: environment.sh in the current directory (puts hadoop on PATH).
. environment.sh

# The jar pattern is meant to be expanded by the local shell.  The -input
# pattern is quoted so it reaches Hadoop unchanged and is matched against
# HDFS, even if a local "gutenberg" directory exists in the working directory.
hadoop jar software_release/parts/hadoop-streaming/*jar \
  -mapper gutenberg-mapper.py \
  -reducer gutenberg-reducer.py \
  -input 'gutenberg/*' \
  -output gutenberg-output
......@@ -9,6 +9,8 @@ parts =
eggs
java
hadoop
hadoop-streaming
gutenberg-dataset
instance
......@@ -18,6 +20,7 @@ eggs =
slapos.cookbook
collective.recipe.template
cp.recipe.cmd
plone.recipe.command
[hadoop]
......@@ -27,6 +30,15 @@ md5sum = 25f27eb0b5617e47c032319c0bfd9962
strip-top-level-dir = true
# Fetches the hadoop-streaming 0.20.203.0 jar from the Maven repository.
# download-only = true: presumably the jar is stored as downloaded rather
# than being unpacked - confirm against hexagonit.recipe.download.
# NOTE(review): the URL is plain http and md5sum is commented out, so the
# downloaded jar is not integrity-checked.
[hadoop-streaming]
recipe = hexagonit.recipe.download
url = http://repo1.maven.org/maven2/org/apache/hadoop/hadoop-streaming/0.20.203.0/hadoop-streaming-0.20.203.0.jar
download-only = true
#md5sum =
[instance]
recipe = slapos.recipe.template
url = ${:_profile_base_location_}/instance.cfg.in
......@@ -36,3 +48,60 @@ mode = 0644
# Downloads nine Project Gutenberg plain-text books into a "gutenberg"
# directory under the buildout directory; this is the sample input of the
# word-count demo.
# install_cmd creates the directory, changes into it and fetches each book
# with `wget -c` (continue a partially downloaded file).
# update_cmd is /bin/true, i.e. the update step does nothing.
[gutenberg-dataset]
recipe = cp.recipe.cmd
update_cmd = /bin/true
install_cmd =
mkdir -p ${buildout:directory}/gutenberg
cd ${buildout:directory}/gutenberg
wget -c http://www.gutenberg.org/cache/epub/103/pg103.txt
wget -c http://www.gutenberg.org/cache/epub/18857/pg18857.txt
wget -c http://www.gutenberg.org/cache/epub/2488/pg2488.txt
wget -c http://www.gutenberg.org/cache/epub/164/pg164.txt
wget -c http://www.gutenberg.org/cache/epub/1268/pg1268.txt
wget -c http://www.gutenberg.org/cache/epub/800/pg800.txt
wget -c http://www.gutenberg.org/cache/epub/4791/pg4791.txt
wget -c http://www.gutenberg.org/cache/epub/3526/pg3526.txt
wget -c http://www.gutenberg.org/cache/epub/2083/pg2083.txt
#[wikipedia-dataset]
#recipe = cp.recipe.cmd
#update_cmd = /bin/true
##update_cmd = ${:install_cmd}
#install_cmd =
# mkdir -p ${buildout:directory}/raw-data
# cd ${buildout:directory}/raw-data
# wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current1.xml-p000000010p000010000.bz2
# wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current2.xml-p000010001p000025000.bz2
# wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current3.xml-p000025001p000055000.bz2
# wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current4.xml-p000055002p000104998.bz2
# wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current5.xml-p000105001p000184999.bz2
# wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current6.xml-p000185003p000305000.bz2
# wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current7.xml-p000305002p000464997.bz2
# wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current8.xml-p000465001p000665000.bz2
# wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current9.xml-p000665001p000925000.bz2
# wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current10.xml-p000925001p001325000.bz2
# wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current11.xml-p001325001p001825000.bz2
# wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current12.xml-p001825001p002425000.bz2
# wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current13.xml-p002425001p003124998.bz2
# wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current14.xml-p003125001p003924999.bz2
# wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current15.xml-p003925001p004825000.bz2
# wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current16.xml-p004825002p006025000.bz2
# wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current17.xml-p006025001p007524997.bz2
# wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current18.xml-p007525002p009225000.bz2
# wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current19.xml-p009225001p011125000.bz2
# wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current20.xml-p011125001p013324998.bz2
# wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current21.xml-p013325001p015725000.bz2
# wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current22.xml-p015725003p018225000.bz2
# wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current23.xml-p018225001p020925000.bz2
# wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current24.xml-p020925002p023725000.bz2
# wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current25.xml-p023725001p026624999.bz2
# wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current26.xml-p026625002p029625000.bz2
# wget -c http://dumps.wikimedia.org/enwiki/20140203/enwiki-20140203-pages-meta-current27.xml-p029625001p041836446.bz2
import bz2
import os
import sys
import xml.sax
def _write_title(title):
    """Write `title` to standard output as one UTF-8 encoded line.

    The bytes go to ``sys.stdout.buffer`` when the interpreter provides it
    and to ``sys.stdout`` otherwise, so the output is the same on Python 2
    and Python 3.
    """
    stream = getattr(sys.stdout, 'buffer', sys.stdout)
    stream.write(title.encode('utf8') + b'\n')


class WikipediaTitleHandler(xml.sax.ContentHandler):
    """SAX content handler that emits the text of every ``title`` element.

    Titles that start with one of SKIPPED_PREFIXES are dropped.

    emit -- optional callable receiving each accepted title; by default the
            title is written to standard output as a UTF-8 encoded line.
    """

    # Title prefixes that are filtered out.
    SKIPPED_PREFIXES = (
        'Talk:',
        'User talk:',
        'Wikipedia:',
        'Wikipedia talk:',
        'User:',
    )

    def __init__(self, emit=None):
        xml.sax.ContentHandler.__init__(self)
        self.chars = []
        # Name of the element whose character data is currently collected.
        self.tag = None
        self.emit = _write_title if emit is None else emit

    def startElement(self, name, attrs):
        self.chars = []
        self.tag = name

    def characters(self, content):
        # SAX may deliver one text node in several chunks; collect them all.
        if self.tag == 'title':
            self.chars.append(content)

    def endElement(self, name):
        # Decide on the element that is actually closing.  Looking only at
        # the last opened tag would emit the title a second time when its
        # parent closes right after it.
        closing_title = name == 'title' and self.tag == 'title'
        # Text following a closing tag belongs to no collected element.
        self.tag = None
        if not closing_title:
            return
        title = ''.join(self.chars)
        if title.startswith(self.SKIPPED_PREFIXES):
            return
        self.emit(title)
def process_xml(input):
    """Run a SAX parse over `input`, feeding a WikipediaTitleHandler.

    input -- the XML source handed to the SAX parser's ``parse`` method
             (the caller in this file passes an open BZ2File).
    """
    reader = xml.sax.make_parser()
    title_handler = WikipediaTitleHandler()
    reader.setContentHandler(title_handler)
    reader.parse(input)
if __name__ == '__main__':
    # Standard input is opened by path (/dev/fd/0) and decompressed as bzip2
    # before being handed to the SAX parser.
    process_xml(bz2.BZ2File('/dev/fd/0'))

    # Disabled alternative, kept for reference: process every dump found in
    # a fixed directory instead of reading standard input.
    # dirname = '/srv/slapgrid/slappart20/srv/runner/instance/slappart0/software_release/raw-data/'
    # filenames = os.listdir(dirname)
    # # ['enwiki-20140203-pages-meta-current1.xml-p000000010p000010000.bz2']
    # for fname in filenames:
    # process_xml(os.path.join(dirname, fname))
    # input = bz2.BZ2File(process_xml(os.path.join(dirname, fname)))
import bz2
import os
import sys
import xml.sax
def _write_title(title):
    """Write `title` to standard output as one UTF-8 encoded line.

    The bytes go to ``sys.stdout.buffer`` when the interpreter provides it
    and to ``sys.stdout`` otherwise, so the output is the same on Python 2
    and Python 3.
    """
    stream = getattr(sys.stdout, 'buffer', sys.stdout)
    stream.write(title.encode('utf8') + b'\n')


class WikipediaTitleHandler(xml.sax.ContentHandler):
    """SAX content handler that emits the text of every ``title`` element.

    Titles that start with one of SKIPPED_PREFIXES are dropped.

    emit -- optional callable receiving each accepted title; by default the
            title is written to standard output as a UTF-8 encoded line.
    """

    # Title prefixes that are filtered out.
    SKIPPED_PREFIXES = (
        'Talk:',
        'User talk:',
        'Wikipedia:',
        'Wikipedia talk:',
        'User:',
    )

    def __init__(self, emit=None):
        xml.sax.ContentHandler.__init__(self)
        self.chars = []
        # Name of the element whose character data is currently collected.
        self.tag = None
        self.emit = _write_title if emit is None else emit

    def startElement(self, name, attrs):
        self.chars = []
        self.tag = name

    def characters(self, content):
        # SAX may deliver one text node in several chunks; collect them all.
        if self.tag == 'title':
            self.chars.append(content)

    def endElement(self, name):
        # Decide on the element that is actually closing.  Looking only at
        # the last opened tag would emit the title a second time when its
        # parent closes right after it.
        closing_title = name == 'title' and self.tag == 'title'
        # Text following a closing tag belongs to no collected element.
        self.tag = None
        if not closing_title:
            return
        title = ''.join(self.chars)
        if title.startswith(self.SKIPPED_PREFIXES):
            return
        self.emit(title)
def process_xml(input):
    """Run a SAX parse over `input`, feeding a WikipediaTitleHandler.

    input -- the XML source handed to the SAX parser's ``parse`` method
             (the caller in this file passes an open BZ2File).
    """
    reader = xml.sax.make_parser()
    title_handler = WikipediaTitleHandler()
    reader.setContentHandler(title_handler)
    reader.parse(input)
if __name__ == '__main__':
    # Standard input is opened by path (/dev/fd/0) and decompressed as bzip2
    # before being handed to the SAX parser.
    process_xml(bz2.BZ2File('/dev/fd/0'))

    # Disabled alternative, kept for reference: process every dump found in
    # a fixed directory instead of reading standard input.
    # dirname = '/srv/slapgrid/slappart20/srv/runner/instance/slappart0/software_release/raw-data/'
    # filenames = os.listdir(dirname)
    # # ['enwiki-20140203-pages-meta-current1.xml-p000000010p000010000.bz2']
    # for fname in filenames:
    # process_xml(os.path.join(dirname, fname))
    # input = bz2.BZ2File(process_xml(os.path.join(dirname, fname)))
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!