Many company notebooks do not allow running virtual machines (VT-x is disabled), and mine is no exception. An alternative to using a VM is to use the Docker image. After installing Docker natively on Windows, the Docker CLI inside WSL can talk to the Windows Docker engine, because the engine can expose a TCP endpoint that the CLI (i.e., WSL) attaches to.
Note: The TCP endpoint on Windows is turned off by default. To activate it, right-click the Docker icon in your taskbar and choose Settings, and tick the box next to “Expose daemon on tcp://localhost:2375 without TLS”.
With that done, all we need to do is instruct the CLI under Bash to connect to the engine running under Windows instead of to the non-existent engine running under Bash, like this:
echo "export DOCKER_HOST='tcp://0.0.0.0:2375'" >> ~/.bashrc
source ~/.bashrc
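Before continuing, it is worth verifying that the engine is actually reachable from WSL. A minimal check, assuming Python 3 is available inside WSL, is to query the Docker engine's HTTP API on the freshly exposed TCP endpoint:
import json
import urllib.request

# Hit the engine's /version endpoint over the TCP socket exposed above;
# a successful response means the CLI inside WSL can reach the Windows engine.
with urllib.request.urlopen("http://localhost:2375/version") as resp:
    info = json.loads(resp.read().decode("utf-8"))
print(info.get("Version"), info.get("ApiVersion"))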
Download the QuickStart image for CDH 5.12. Start the image. Examine whether the hadoop-hdfs-*, hadoop-mapreduce-*, and hadoop-yarn-* daemons are running. If those daemons are not running, start all of them. If any of the daemons fails to run, try to fix it.
docker pull cloudera/quickstart:latest
docker run --hostname=quickstart.cloudera --privileged=true -it --rm -p 8888:8888 \
cloudera/quickstart /usr/bin/docker-quickstart
The Docker image starts as the root user by default. We switch to the preinstalled cloudera user (password: cloudera).
su cloudera
cd ~
for x in `cd /etc/init.d ; ls hadoop-*`; do sudo service $x status ; done
[root@quickstart /]# for x in `cd /etc/init.d ; ls hadoop-*`; do sudo service $x status ; done
Hadoop datanode is running [ OK ]
Hadoop journalnode is running [ OK ]
Hadoop namenode is running [ OK ]
Hadoop secondarynamenode is running [ OK ]
Hadoop httpfs is running [ OK ]
Hadoop historyserver is running [ OK ]
Hadoop nodemanager is running [ OK ]
Hadoop proxyserver is not running [FAILED]
Hadoop resourcemanager is running [ OK ]
The proxyserver failed at startup. To fix this issue, we can define the web proxy address in the yarn-site.xml file.
First we stop the services:
for x in `cd /etc/init.d ; ls hadoop-*`; do sudo service $x stop ; done
Next we open the YARN configuration file:
sudo vi /etc/hadoop/conf.pseudo/yarn-site.xml
We have to add the following property:
<property>
  <description>web proxy</description>
  <name>yarn.web-proxy.address</name>
  <value>localhost:3122</value>
</property>
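To make sure the edit actually took effect, the file can be parsed and the new property printed back. A small sanity check, a sketch assuming the same file path as above:
import xml.etree.ElementTree as ET

# Print the value of yarn.web-proxy.address from the pseudo-distributed config.
conf_root = ET.parse("/etc/hadoop/conf.pseudo/yarn-site.xml").getroot()
for prop in conf_root.findall("property"):
    if prop.findtext("name") == "yarn.web-proxy.address":
        print(prop.findtext("value"))  # expected: localhost:3122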
For Spark to run, we have to set the SPARK_HOME and JAVA_HOME environment variables. We add the following lines to the .bashrc file.
vi /home/cloudera/.bashrc
# Environment variables
JAVA_HOME=/usr/java/jdk1.7.0_67-cloudera
export JAVA_HOME
SPARK_HOME=/usr/lib/spark
export SPARK_HOME
PATH=$PATH:$HOME/bin:$JAVA_HOME/bin:$SPARK_HOME/bin
Next we have to source the .bashrc startup file.
source .bashrc
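A quick way to confirm that the two variables are now visible and point at existing directories (a minimal check, run as the cloudera user after sourcing .bashrc):
import os

# Both variables should be set and point at existing directories.
for var in ("JAVA_HOME", "SPARK_HOME"):
    path = os.environ.get(var, "")
    print("%s=%s (exists: %s)" % (var, path, os.path.isdir(path)))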
Next we start all the services again:
for x in `cd /etc/init.d ; ls hadoop-*`; do sudo service $x start ; done
sudo service hadoop-yarn-resourcemanager restart
To get rid of the noisy INFO and WARN messages in the spark-shell, we create the log4j.properties file from the template and set the log level to ERROR (log4j.rootCategory=ERROR).
# Create log4j.properties
sudo mv /etc/spark/conf/log4j.properties.template /etc/spark/conf/log4j.properties
# Edit file
sudo vi /etc/spark/conf/log4j.properties
# Replace INFO, WARN with ERROR.
:g /WARN/s//ERROR/g
:g /INFO/s//ERROR/g
:wq
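As a side note, once a SparkContext is available (as in the PySpark sessions further below), the log level can also be lowered per session without editing any file; sc.setLogLevel exists from Spark 1.4 onward:
# Suppress INFO/WARN output for the current SparkContext only.
sc.setLogLevel("ERROR")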
for x in `cd /etc/init.d ; ls hadoop-*`; do sudo service $x status ; done
[root@quickstart /]# for x in `cd /etc/init.d ; ls hadoop-*`; do sudo service $x status ; done
Hadoop datanode is running [ OK ]
Hadoop journalnode is running [ OK ]
Hadoop namenode is running [ OK ]
Hadoop secondarynamenode is running [ OK ]
Hadoop httpfs is running [ OK ]
Hadoop historyserver is running [ OK ]
Hadoop nodemanager is running [ OK ]
Hadoop proxyserver is running [ OK ]
Hadoop resourcemanager is running [ OK ]
Examine whether there are HDFS home directories for users: spark, hive, oozie, and cloudera. If the directories are present, find the content of those directories. If the directories are not present, create them. Please do not format the namenode.
hadoop fs -ls /user
drwxr-xr-x - cloudera cloudera 0 2016-04-06 02:25 /user/cloudera
drwxr-xr-x - mapred hadoop 0 2016-04-06 02:26 /user/history
drwxrwxrwx - hive supergroup 0 2016-04-06 02:27 /user/hive
drwxrwxrwx - hue supergroup 0 2016-04-06 02:26 /user/hue
drwxrwxrwx - jenkins supergroup 0 2016-04-06 02:26 /user/jenkins
drwxrwxrwx - oozie supergroup 0 2016-04-06 02:27 /user/oozie
drwxrwxrwx - root supergroup 0 2016-04-06 02:26 /user/root
drwxr-xr-x - hdfs supergroup 0 2016-04-06 02:27 /user/spark
The above output shows that home directories already exist for the spark, hive, oozie, and cloudera users, so nothing has to be created.
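For completeness, a sketch of the create-if-missing step the problem asks for (assuming the hadoop CLI is on the PATH; the chosen chown owner below is an assumption):
import subprocess

# Create the HDFS home directory for each user only if it does not exist yet.
for user in ("spark", "hive", "oozie", "cloudera"):
    path = "/user/%s" % user
    if subprocess.call(["hadoop", "fs", "-test", "-d", path]) != 0:
        subprocess.call(["sudo", "-u", "hdfs", "hadoop", "fs", "-mkdir", "-p", path])
        subprocess.call(["sudo", "-u", "hdfs", "hadoop", "fs", "-chown", user, path])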
hdfs fsck /
.............Status: HEALTHY
Total size: 837790910 B (Total open files size: 166 B)
Total dirs: 76
Total files: 913
Total symlinks: 0 (Files currently being written: 3)
Total blocks (validated): 911 (avg. block size 919638 B) (Total open file blocks (not validated): 2)
Minimally replicated blocks: 911 (100.0 %)
Over-replicated blocks: 0 (0.0 %)
Under-replicated blocks: 0 (0.0 %)
Mis-replicated blocks: 0 (0.0 %)
Default replication factor: 1
Average block replication: 1.0
Corrupt blocks: 0
Missing replicas: 0 (0.0 %)
Number of data-nodes: 1
Number of racks: 1
FSCK ended at Sat Oct 07 09:37:21 UTC 2017 in 337 milliseconds
The filesystem under path '/' is HEALTHY
The filesystem appears to be healthy. We can move on to the next exercise.
Create a new Linux user smith. Make that user a member of the mapred Linux group. Make that user a sudo user. Create the home directory of user smith in HDFS. Download the provided files bible.tar and shakespeare.tar. Unpack both tar files and copy the resulting files into the input directory of user smith in HDFS. As user smith, run Hadoop grep on both the bible and shakespeare texts. Every Hadoop run requires a separate output directory. Examine the content of the first 20 lines of the files generated by Hadoop grep.
# add new user
sudo useradd smith
# add password for the new user
sudo passwd smith
Changing password for user smith.
New password:
Retype new password:
passwd: all authentication tokens updated successfully.
# Verify presence of new user
grep smith /etc/passwd
smith:x:502:504::/home/smith:/bin/bash
# make the new user a member of group mapred
sudo usermod -a -G mapred smith
sudo usermod -a -G mapred cloudera
# login to the new user account using su command.
su smith
cd ~
#create HDFS directory for the new user
sudo -u hdfs hadoop fs -mkdir -p /user/smith
sudo -u hdfs hadoop fs -mkdir -p /user/cloudera
#change permissions 777 (read/write/exec for owner/group/other)
sudo -u hdfs hadoop fs -chmod -R 777 /user/smith
sudo -u hdfs hadoop fs -chmod -R 777 /user/cloudera
#change ownership to the new user
sudo -u hdfs hadoop fs -chown smith:mapred /user/smith
sudo -u hdfs hadoop fs -chown cloudera:mapred /user/cloudera
#verify HDFS dir created
sudo -u hdfs hadoop fs -ls -R /user/smith
sudo -u hdfs hadoop fs -ls -R /user/cloudera
git clone https://github.com/greenore/e63
tar xf e63/bible.tar
tar xf e63/shakespeare.tar
# put the files into HDFS
hadoop fs -put /home/smith/all-bible /user/smith/
hadoop fs -put /home/smith/all-shakespeare /user/smith/
# List files in hadoop directory
hadoop fs -ls -R /user/smith
-rw-r--r-- 1 smith mapred 5258688 2017-10-07 09:44 /user/smith/all-bible
-rw-r--r-- 1 smith mapred 5284231 2017-10-07 09:44 /user/smith/all-shakespeare
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar grep \
/user/smith/all-bible /user/smith/bible-freq '\w+'
hadoop fs -cat /user/smith/bible-freq/part-r-00000 | head -20
[smith@quickstart ~]$ hadoop fs -cat /user/smith/bible-freq/part-r-00000 | head -20
62394 the
38985 and
34654 of
13526 to
12846 And
12603 that
12445 in
9764 shall
9672 he
8940 unto
8854 I
8385 his
8057 a
7270 for
6974 they
6913 be
6884 is
6649 him
6647 LORD
6591 not
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar grep \
/user/smith/all-shakespeare /user/smith/shake-freq '\w+'
hadoop fs -cat /user/smith/shake-freq/part-r-00000 | head -20
[smith@quickstart ~]$ hadoop fs -cat /user/smith/shake-freq/part-r-00000 | head -20
25578 the
23027 I
19654 and
17462 to
16444 of
13524 a
12697 you
11296 my
10699 in
8857 is
8851 that
8402 not
8033 me
8020 s
7800 And
7231 with
7165 it
6812 his
6753 be
6246 your
Create your own version of the Hadoop grep program using Spark. Compare your results with the results of Hadoop grep when applied to the texts of the King James Bible and all of Shakespeare's works, contained in files bible.tar and shakespeare.tar respectively. Notice small differences between the results obtained by your Spark program and Hadoop grep. Try to explain what causes those differences. Save the results of your Spark grep operations both in HDFS and on your local file system. You can implement your solution using one of the interactive shells or a standalone program.
# Import libraries
import findspark
findspark.init("/usr/lib/spark")  # SPARK_HOME as set in .bashrc above
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
import re
from operator import add
# Start session
conf = SparkConf().setMaster("local").setAppName("rdd")
sc = SparkContext(conf = conf)
rdd_bible = sc.textFile("/home/smith/all-bible")
rdd_counts = rdd_bible.flatMap(lambda x:x.split(" ")).map(lambda x: (x,1)).reduceByKey(add)
print(rdd_counts.take(5))
[('', 605968), ('King', 60), ('James', 33), ('Bible', 4), ('<!--', 68)]
rdd_counts2 = rdd_counts.map(lambda x: (x[1],x[0]))
print(rdd_counts2.take(5))
[(605968, ''), (60, 'King'), (33, 'James'), (4, 'Bible'), (68, '<!--')]
rdd_sorted = rdd_counts2.sortBy(lambda x: x[0], ascending=False)
print(rdd_sorted.take(20))
[(605968, ''), (62384, 'the'), (38711, 'and'), (34618, 'of'), (13505, 'to'),
(12735, 'And'), (12478, 'that'), (12279, 'in'), (9764, 'shall'), (9513, 'he'),
(8930, 'unto'), (8708, 'I'), (8362, 'his'), (8054, 'a'), (7183, 'for'), (6897,
'they'), (6754, 'be'), (6747, 'is'), (6047, 'with'), (5878, 'not')]
rdd_shake = sc.textFile("/home/smith/all-shakespeare")
rdd_counts = rdd_shake.flatMap(lambda x:x.split(" ")).map(lambda x:(x,1)).reduceByKey(add)
print(rdd_counts.take(5))
[('', 64531), ("\tALL'S", 25), ('WELL', 52), ('THAT', 27), ('ENDS', 26)]
rdd_counts2 = rdd_counts.map(lambda x: (x[1],x[0]))
print(rdd_counts2.take(5))
[(64531, ''), (25, "\tALL'S"), (52, 'WELL'), (27, 'THAT'), (26, 'ENDS')]
rdd_sorted = rdd_counts2.sortBy(lambda x: x[0], ascending=False)
print(rdd_sorted.take(20))
[(64531, ''), (25069, 'the'), (18793, 'and'), (16436, 'to'), (16069, 'of'),
(15223, 'I'), (12982, 'a'), (11180, 'my'), (10134, 'in'), (9109, 'you'), (8109,
'is'), (7773, 'that'), (7123, 'not'), (7001, 'with'), (6594, 'his'), (6202, 'be'),
(6119, 'your'), (5955, '\tAnd'), (5781, 'for'), (5311, 'have')]
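The problem also asks to save the results of the Spark grep both in HDFS and on the local file system, which the interactive session above does not show. A minimal sketch, reusing rdd_sorted from above (and analogously for the bible RDD); the output paths and the quickstart NameNode URI are assumptions:
# Write the (count, word) pairs in the same "count<TAB>word" format as Hadoop grep.
lines = rdd_sorted.map(lambda x: "%d\t%s" % (x[0], x[1]))
lines.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/smith/spark-shake-freq")  # HDFS copy
lines.saveAsTextFile("file:///home/smith/spark-shake-freq")                          # local copy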
The two grep results, i.e., Hadoop grep and my own Spark version, differ slightly. The reason is the tokenization: Hadoop grep counts matches of the regular expression '\w+', so it never produces empty tokens and strips punctuation, whereas the Spark version splits each line on a single space, which yields empty strings and tokens with punctuation or tabs attached. This shifts some of the counts (e.g., 'the' appears 62394 times in the Hadoop output versus 62384 in the Spark output).
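To confirm this explanation, the Spark job can tokenize with the same regular expression that Hadoop grep was given. A short sketch, reusing sc, re, and add from the listings above:
# Tokenize with '\w+' instead of splitting on single spaces: no empty tokens,
# punctuation and tabs are stripped, and the counts move much closer to the
# Hadoop grep output.
rdd_regex = sc.textFile("/home/smith/all-bible") \
    .flatMap(lambda line: re.findall(r"\w+", line)) \
    .map(lambda w: (w, 1)) \
    .reduceByKey(add) \
    .sortBy(lambda x: x[1], ascending=False)
print(rdd_regex.take(5))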
Create your own table KINGJAMES with columns for words and frequencies and insert into the table the result of your Spark grep program which produces word counts for the bible file. Find all words in table KINGJAMES which start with the letter w, are 4 or more characters long, and appear more than 250 times. Write a query that will tell us the number of such words. Before counting, turn all words into lower case. When comparing a word with a string, you use the LIKE operator, like
word like ‘a%’ or word like ‘%th%’
Symbol % means any number of characters. You measure the length of a string using function length() and you change the case of a word to all lower characters using function lower().
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
import re
from operator import add
sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)
# clean_up is not defined in the original listing; a plausible stand-in
# (split on spaces, strip punctuation, lowercase, drop empty tokens):
clean_up = lambda line: [t for t in (re.sub("[^a-zA-Z0-9\t]", "", w).lower() for w in line.split(" ")) if t]
rdd_bible = sc.textFile("all-bible")
rdd_bible = rdd_bible.flatMap(clean_up)
bible_lines = sc.textFile("all-bible").flatMap(lambda l: l.split())\
.map(lambda x: re.sub("[^a-zA-Z]+", "", x.lower()))\
.filter(lambda x: x != "")
print(bible_lines.take(10))
['king', 'james', 'bible', 'body', 'backgroundfaebd', 'margin',
'textalignjustify', 'p', 'textindent', 'em']
rdd_words = rdd_bible.map(lambda p: Row(words=str(p)))
print(rdd_words.take(10))
[Row(words='king'), Row(words='james'), Row(words='bible'), Row(words='\t\tbody'),
Row(words='backgroundfaebd7'), Row(words='margin10'), Row(words='textalignjustify'),
Row(words='p'), Row(words='textindent'), Row(words='1em')]
df_bible = sqlContext.createDataFrame(rdd_words)
print(df_bible.show(10))
+----------------+
| words|
+----------------+
| king|
| james|
| bible|
| body|
|backgroundfaebd7|
| margin10|
|textalignjustify|
| p|
| textindent|
| 1em|
+----------------+
only showing top 10 rows
None
df_bible.registerTempTable("KINGJAMES")
tbl_bible = sqlContext.sql("SELECT words, COUNT(*) freq FROM KINGJAMES WHERE lower(words) \
like 'w%' AND length(words) > 4 GROUP BY words HAVING COUNT(*) > 250")
tbl_bible = tbl_bible.orderBy(tbl_bible['freq'].desc())
print(tbl_bible.show())
+----------+----+
| words|freq|
+----------+----+
| which|4427|
| words| 548|
| would| 451|
| without| 442|
| where| 407|
| water| 396|
| woman| 357|
| wherefore| 348|
| wicked| 344|
| whose| 314|
|wilderness| 304|
| works| 302|
| waters| 287|
| world| 287|
| written| 283|
+----------+----+
None
Transfer content of your Hive KINGJAMES table to a Spark DataFrame. Perform the analysis from problem 5 using any available API in Spark. Please note that you are working with Spark 1.6.
The necessary Python code can be found in the problem_6.py script; it can also be run directly from the RMarkdown file.
from pyspark.sql import HiveContext

hivecontext = HiveContext(sc)
df_bible_freq = hivecontext.sql("SELECT freq, lower(word) word FROM kingjames \
WHERE lower(word) like 'w%' AND length(word) > 4 AND freq > 250 ORDER BY freq \
DESC")
print(df_bible_freq.show(20))
+----+----------+
|freq| word|
+----+----------+
|4297| which|
| 546| words|
| 443| would|
| 436| without|
| 396| water|
| 355| woman|
| 343| wicked|
| 335| where|
| 304|wilderness|
| 301| works|
| 288| world|
| 286| waters|
| 284| whose|
| 283| written|
| 261| wherefore|
+----+----------+
df_bible_freq.agg({"freq": "sum"}).show()
+---------+
|sum(freq)|
+---------+
| 9158|
+---------+
print("Number of words:")
print(df_bible_freq.count())
Number of words:
15
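The same analysis can also be expressed with the DataFrame API instead of SQL, which is what "any available API" in the problem statement allows. A sketch against Spark 1.6, assuming the kingjames Hive table with columns word and freq as used above:
from pyspark.sql.functions import col, length, lower

# DataFrame-API version of the problem 5 filter: words starting with 'w',
# longer than 4 characters, appearing more than 250 times.
df_w = hivecontext.table("kingjames") \
    .filter(lower(col("word")).like("w%")) \
    .filter(length(col("word")) > 4) \
    .filter(col("freq") > 250) \
    .orderBy(col("freq").desc())
df_w.show(20)
print(df_w.count())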
Use Sqoop to transfer the content of MySQL database retail_db which is present on the Cloudera VM into Hive. Demonstrate that new Hive tables are created and correspond to the original MySQL tables. Find the number of rows in each table. Compare those row counts with row counts in MySQL database.
sudo mysql --user=retail_dba -p
Show databases
show databases;
mysql> show databases;
+--------------------+
| Database |
+--------------------+
| information_schema |
| retail_db |
+--------------------+
2 rows in set (0.00 sec)
use retail_db;
mysql> use retail_db;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
show tables;
mysql> show tables;
+---------------------+
| Tables_in_retail_db |
+---------------------+
| categories |
| customers |
| departments |
| order_items |
| orders |
| products |
+---------------------+
6 rows in set (0.00 sec)
Next we import the data into Hive.
sqoop import-all-tables \
--connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
--username retail_dba \
--password cloudera \
--compression-codec=snappy --as-parquetfile \
--warehouse-dir=/user/hive/warehouse \
--hive-import \
--m 1
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
17/10/07 11:02:13 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.5.0
....................
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=0
...
Map output records=1345
Input split bytes=87
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=252
CPU time spent (ms)=2980
Physical memory (bytes) snapshot=181051392
Virtual memory (bytes) snapshot=1522274304
Total committed heap usage (bytes)=60620800
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=0
17/10/07 11:06:00 INFO mapreduce.ImportJobBase: Transferred 46.1318 KB in 41.3821 seconds (1.2983 KB/sec)
17/10/07 11:06:00 INFO mapreduce.ImportJobBase: Retrieved 1345 records.
The above output shows that the data was imported successfully. To check the results, we can have a look at the tables in Hive.
hive
Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j.properties
WARNING: Hive CLI is deprecated and migration to Beeline is recommended.
SHOW TABLES;
hive> SHOW TABLES;
OK
apachelog
categories
customers
departments
kingjames
merged
order_items
orders
products
Next we look at the orders with the largest total item quantities:
SELECT order_item_order_id, sum(order_item_quantity) AS total_order_item_quantity
FROM order_items
GROUP BY order_item_order_id
ORDER BY total_order_item_quantity DESC LIMIT 10;
hive> SELECT order_item_order_id, sum(order_item_quantity) AS total_order_item_quantity
FROM order_items GROUP BY order_item_order_id ORDER BY total_order_item_quantity
DESC LIMIT 10;
Query ID = cloudera_20171007181414_a8ca115b-c6a1-549f-bd26-e043b5f80ede
Total jobs = 2
Launching Job 1 out of 2
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
....................
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1 Reduce: 1 Cumulative CPU: 5.97 sec HDFS Read: 513123
HDFS Write: 1220606 SUCCESS
Stage-Stage-2: Map: 1 Reduce: 1 Cumulative CPU: 4.33 sec HDFS Read:
1225407 HDFS Write: 90 SUCCESS
Total MapReduce CPU Time Spent: 10 seconds 300 msec
OK
49405 24
49429 24
21920 24
24967 24
33283 23
43848 23
29272 22
56414 22
17455 22
56559 22
Time taken: 59.2 seconds, Fetched: 10 row(s)
The SQL and Hive code needed to run the comparisons can be found in problem_7.sql and problem_7.q, respectively. However, the code can also be run from the RMarkdown file.
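Instead of running the per-table queries below by hand, the row counts of all imported tables can also be collected in one loop from PySpark (a sketch, assuming the same HiveContext as in problem 6):
# Count the rows of every table imported by Sqoop.
for tbl in ("categories", "customers", "departments", "order_items", "orders", "products"):
    print("%s: %d" % (tbl, hivecontext.table(tbl).count()))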
SELECT count(*) FROM categories;
mysql> SELECT count(*) FROM categories;
+----------+
| count(*) |
+----------+
| 58 |
+----------+
1 row in set (0.00 sec)
jdbc:hive2://127.0.0.1:10000/default> select count(*) from categories;
0: jdbc:hive2://127.0.0.1:10000/default> select count(*) from categories;
+------+--+
| _c0 |
+------+--+
| 58 |
+------+--+
1 row selected (28.531 seconds)
SELECT count(*) FROM departments;
mysql> SELECT count(*) FROM departments;
+----------+
| count(*) |
+----------+
| 6 |
+----------+
1 row in set (0.00 sec)
jdbc:hive2://127.0.0.1:10000/default> select count(*) from departments;
0: jdbc:hive2://127.0.0.1:10000/default> select count(*) from departments;
+------+--+
| _c0 |
+------+--+
| 6 |
+------+--+
1 row selected (21.434 seconds)
SELECT count(*) FROM order_items;
mysql> SELECT count(*) FROM order_items;
+----------+
| count(*) |
+----------+
| 172198 |
+----------+
1 row in set (0.05 sec)
jdbc:hive2://127.0.0.1:10000/default> select count(*) from order_items;
0: jdbc:hive2://127.0.0.1:10000/default> select count(*) from order_items;
+---------+--+
| _c0 |
+---------+--+
| 172198 |
+---------+--+
1 row selected (27.422 seconds)
SELECT count(*) FROM orders;
mysql> SELECT count(*) FROM orders;
+----------+
| count(*) |
+----------+
| 68883 |
+----------+
1 row in set (0.04 sec)
jdbc:hive2://127.0.0.1:10000/default> select count(*) from orders;
0: jdbc:hive2://127.0.0.1:10000/default> select count(*) from orders;
+--------+--+
| _c0 |
+--------+--+
| 68883 |
+--------+--+
1 row selected (30.201 seconds)
SELECT count(*) FROM products;
mysql> SELECT count(*) FROM products;
+----------+
| count(*) |
+----------+
| 1345 |
+----------+
1 row in set (0.00 sec)
jdbc:hive2://127.0.0.1:10000/default> select count(*) from products;
0: jdbc:hive2://127.0.0.1:10000/default> select count(*) from products;
+-------+--+
| _c0 |
+-------+--+
| 1345 |
+-------+--+
1 row selected (27.983 seconds)
As can be seen above, the row counts of the Hive tables match the row counts of the original MySQL tables.