Consider two attached text files: bible.txt and 4300.txt. The first contains the ASCII text of the King James Bible and the second the text of James Joyce's novel Ulysses.
NOTE: For this assignment, a Docker container running Jupyter Notebook was used as the IDE. The advantage of this approach is its reproducibility.
Download and parse a list of stop words from the web page: http://www.lextek.com/manuals/onix/stopwords1.html.
# Load libraries
import requests
from bs4 import BeautifulSoup
# Download page
page = requests.get("http://www.lextek.com/manuals/onix/stopwords1.html")
# Parse page: the word list sits inside a <pre> element
html = BeautifulSoup(page.content, 'html.parser').pre
text = html.get_text().split()
# Remove the introductory header (first 21 tokens) so only stop words remain
stopwords = text[21:]
# Export data to a datafile, one stop word per line
with open("stopwords.csv", 'w') as result_file:
    for word in stopwords:
        result_file.write(word + "\n")
Use Spark transformation and action functions from the RDD API to transform those texts into RDDs that contain words and the numbers of occurrences of those words in the respective text. From the King James Bible eliminate all verse numbers of the form 03:019:024. Eliminate the so-called stop words from both RDDs. List for us the 30 most frequent words in each RDD (text).
# Cleanup function
import re  # Regex library

def clean_up(rdd_words):
    rdd_words_clean1 = re.sub(r'\d+:\d+:\d+', '', rdd_words)            # Verse numbers such as 03:019:024
    rdd_words_clean2 = re.sub(r'[^A-Za-z0-9\s]', '', rdd_words_clean1)  # Non-word characters
    rdd_words_split = rdd_words_clean2.split(' ')                       # Split line into tokens
    return [word.lower() for word in rdd_words_split if word != '']     # Lower case, drop empty tokens
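As a quick sanity check, the function can be applied to a made-up sample line (the verse number and text below are illustrative only, not taken from the file):
# Hypothetical input line; the verse number and punctuation should disappear
print(clean_up("03:019:024 And the LORD spake unto Moses,"))
# -> ['and', 'the', 'lord', 'spake', 'unto', 'moses']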
# Import libraries
import findspark
findspark.init("/usr/local/spark")
from pyspark import SparkContext, SparkConf
# Start Spark context
conf = SparkConf().setMaster("local").setAppName("rdd")
sc = SparkContext(conf = conf)
# Read data
rdd_ulysses = sc.textFile("4300.txt")
rdd_bible = sc.textFile("bible.txt")
rdd_stopwords = sc.textFile("stopwords.csv")
# Clean data, remove stop words and verse numbers
rdd_ulysses_words = rdd_ulysses.flatMap(clean_up)
rdd_ulysses_cleaned = rdd_ulysses_words.subtract(rdd_stopwords)
rdd_bible_words = rdd_bible.flatMap(clean_up)
rdd_bible_cleaned = rdd_bible_words.subtract(rdd_stopwords)
# Number of occurrences (MapReduce)
rdd_ulysses_all = rdd_ulysses_cleaned.map(lambda x: (x, 1)) \
    .reduceByKey(lambda x, y: x + y) \
    .sortBy(lambda x: x[1], ascending=False)
rdd_bible_all = rdd_bible_cleaned.map(lambda x: (x, 1)) \
    .reduceByKey(lambda x, y: x + y) \
    .sortBy(lambda x: x[1], ascending=False)
print("Bible - Top 30 word pairs:")
print(rdd_bible_all.take(30))
print("Ulysess - Top 30 word pairs:")
print(rdd_ulysess_all.take(30))
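Since rdd_ulysses_all and rdd_bible_all are reused several times below (top-30 lists, the join, and the sorted reports), it can help to cache them; a minimal sketch using the standard persist() call:
# Cache the word-count RDDs so repeated actions do not recompute the whole pipeline
rdd_ulysses_all.persist()
rdd_bible_all.persist()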
Create RDDs that contain only the words unique to each text.
# Get distinct values
rdd_ulysses_dist = rdd_ulysses_cleaned.distinct()
rdd_bible_dist = rdd_bible_cleaned.distinct()
# Number of unique words
print("Ulysses - unique words:", rdd_ulysses_dist.count())
print("Bible - unique words:", rdd_bible_dist.count())
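If "unique to each text" is instead read as words that appear in one text but not in the other, a small sketch with subtract() covers that reading as well (an assumption about the intended interpretation, not part of the original solution; the variable names are illustrative):
# Words that occur in only one of the two texts
rdd_only_ulysses = rdd_ulysses_dist.subtract(rdd_bible_dist)
rdd_only_bible = rdd_bible_dist.subtract(rdd_ulysses_dist)
print("Words only in Ulysses:", rdd_only_ulysses.count())
print("Words only in the Bible:", rdd_only_bible.count())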
Finally, create an RDD that contains only the words common to both texts. In this RDD preserve the numbers of occurrences in the two texts; in other words, a row in your RDD will look like (love 45 32). Print or store the words and the numbers of occurrences.
rdd_combined = rdd_ulysses_all.join(rdd_bible_all)
print("Common Words:")
print(rdd_combined.sortByKey(False).take(10))
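The join yields pairs of the form (word, (count_ulysses, count_bible)); to match the (love 45 32) layout and store the result, the nested tuple can be flattened. A sketch (the rdd_triples name and the output directory are illustrative):
# Flatten (word, (count_u, count_b)) into (word, count_u, count_b)
rdd_triples = rdd_combined.map(lambda kv: (kv[0], kv[1][0], kv[1][1]))
print(rdd_triples.take(5))
# rdd_triples.saveAsTextFile("common_words")  # hypothetical output directory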
Create for us the list of 20 most frequently used words common to both texts. In your report, print (store) the words, followed by the number of occurrences in Ulysses and then the Bible. Order your report in descending order starting by the number of occurrences in Ulysses. Present the same data this time ordered by the number of occurrences in the Bible.
print("Top 20 word pairs (Ulysess):")
rdd_combined = rdd_ulysess_all.join(rdd_bible_all)
print(rdd_combined.sortBy(lambda a:a[1], False).take(5))
print("Top 20 word pairs (Bible):")
rdd_combined = rdd_bible_all.join(rdd_ulysess_all)
print(rdd_combined.sortBy(lambda a:a[1], False).take(5))
List for us a random sample containing 5% of the words in the final RDD.
# Take a 5% random sample (without replacement) of the common words
rdd_5perc = rdd_combined.takeSample(False,
                                    int(rdd_combined.count() * 5 / 100),
                                    seed=123)
print("5 percent sample of common words in both books")
print(rdd_5perc)
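takeSample() collects the sample into the driver as a Python list; if the sample should stay distributed, the sample() transformation is an alternative (a sketch with the same 5% fraction and seed; the variable name is illustrative):
# Keep the 5% sample as an RDD instead of a local list
rdd_5perc_rdd = rdd_combined.sample(False, 0.05, seed=123)
print(rdd_5perc_rdd.take(10))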
Implement problem 1 using the DataFrame API.
Use Spark transformation and action functions from the DataFrame API to transform those texts into DFs that contain words and the numbers of occurrences of those words in the respective text. From the King James Bible eliminate all verse numbers of the form 03:019:024. Eliminate the so-called stop words from both DFs. List for us the 30 most frequent words in each DF (text).
# Function to clean a column: drop verse numbers (e.g. 03:019:024), punctuation, and upper case
from pyspark.sql.functions import regexp_replace, trim, col, lower

def removePunctuation(column):
    no_verses = regexp_replace(column, r'\d+:\d+:\d+', '')       # Verse numbers
    no_punct = regexp_replace(no_verses, r'[^A-Za-z0-9\s]', '')  # Non-word characters
    return trim(lower(no_punct)).alias('words')
# Import libraries
import findspark
findspark.init("/usr/local/spark")
from pyspark.sql import SparkSession
from pyspark.sql.functions import split # Function to split data
from pyspark.sql.functions import explode # Equivalent to flatMap
# Create Session
spark = SparkSession.builder.master("local") \
    .appName("df").getOrCreate()
# Read data
df_ulysses = spark.read.text("4300.txt")
df_bible = spark.read.text("bible.txt")
df_stopwords = spark.read.text("stopwords.csv")
# Select words
df_ulysses_all = df_ulysses.select(split(df_ulysses.value, " ").alias("words"))
df_ulysses_all = df_ulysses_all.select(explode(df_ulysses_all.words).alias("words"))
df_ulysses_all = df_ulysses_all.select(removePunctuation(col('words')))
df_ulysses_all = df_ulysses_all.filter("words IS NOT NULL AND words != ''")
df_bible_all = df_bible.select(split(df_bible.value, " ").alias("words"))
df_bible_all = df_bible_all.select(explode(df_bible_all.words).alias("words"))
df_bible_all = df_bible_all.select(removePunctuation(col('words')))
df_bible_all = df_bible_all.filter("words IS NOT NULL AND words != ''")
# Remove stopwords with a left anti join against the stop-word list
df_ulysses_cleaned = df_ulysses_all.join(
    df_stopwords, df_ulysses_all.words == df_stopwords.value, 'left_anti')
df_bible_cleaned = df_bible_all.join(
    df_stopwords, df_bible_all.words == df_stopwords.value, 'left_anti')
# Get word frequencies
df_ulysses_unique = df_ulysses_cleaned.groupBy("words").count()
df_ulysses_unique = df_ulysses_unique.orderBy(["count"], ascending=False)
print("Ulysses - 30 most frequent words:")
df_ulysses_unique.show(30)
# Get word frequencies
df_bible_unique = df_bible_cleaned.groupBy("words").count()
df_bible_unique = df_bible_unique.orderBy(["count"], ascending=False)
print("Bible - 30 most frequent words:")
df_bible_unique.show(30)
Create DFs that contain only the words unique to each text.
df_ulysses_dist = df_ulysses_cleaned.distinct()
print("Ulysses - unique words:", df_ulysses_dist.count())
df_bible_dist = df_bible_cleaned.distinct()
print("Bible - unique words:", df_bible_dist.count())
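As in the RDD part, if "unique to each text" is meant as words appearing in only one of the two texts, a left anti join gives that directly (again an assumption about the intended reading; the variable names are illustrative):
# Words that occur in only one of the two texts
df_only_ulysses = df_ulysses_dist.join(df_bible_dist, "words", "left_anti")
df_only_bible = df_bible_dist.join(df_ulysses_dist, "words", "left_anti")
print("Words only in Ulysses:", df_only_ulysses.count())
print("Words only in the Bible:", df_only_bible.count())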
Finally, create a DF that contains only the words common to both texts. In this DF preserve the numbers of occurrences in the two texts; in other words, a row in your DF will look like (love 45 32). Print or store the words and the numbers of occurrences.
df_combined = df_ulysses_unique.join(
    df_bible_unique, df_ulysses_unique.words == df_bible_unique.words, 'inner')
df_combined = df_combined.toDF("words_ulysses", "count_ulysses",
                               "words_bible", "count_bible")
df_combined.show(5)
Create for us the list of 20 most frequently used words common to both texts. In your report, print (store) the words, followed by the number of occurrences in Ulysses and then the Bible. Order your report in descending order starting by the number of occurrences in Ulysses. Present the same data this time ordered by the number of occurrences in the Bible.
df_combined = df_combined.select(['words_ulysses', 'count_ulysses', 'count_bible'])
print(df_combined.count())
df_combined.orderBy(col('count_ulysses').desc()).show(20)
df_combined.orderBy(col('count_bible').desc()).show(20)
List for us a random sample containing 5% of the words in the final DF.
# Sample 5% of the rows without replacement, with a fixed seed
final_df_sample = df_combined.sample(False, 0.05, 123)
final_df_sample.show()
Consider the attached files transactions.txt and products.txt.
Each line in the transactions.txt file contains a transaction date, time, customer id, product id, quantity bought, and price paid, delimited with the hash (#) sign. Each line in the products.txt file contains a product id, product name, unit price, and quantity available in the store. Bring those data into Spark and organize them as DataFrames with named columns.
# Read data, infer column types, and name the columns
df_transactions = spark.read.csv("transactions.txt", sep="#", inferSchema=True) \
    .toDF("transaction_date", "time", "customer_id", "product_id",
          "quantity_bought", "price_paid")
df_products = spark.read.csv("products.txt", sep="#", inferSchema=True) \
    .toDF("product_id", "product_name", "unit_price", "quantity")
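A quick check that the delimiter and column names were applied as intended (printSchema and show are standard DataFrame methods):
# Inspect the parsed schema and a few rows
df_transactions.printSchema()
df_products.show(5)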
Using either DataFrame methods or plain SQL statements, find the 5 customers with the largest spend on the day. Find the names of the products each of those 5 customers bought.
# Total spend per customer per day
df_cust_spend = df_transactions.groupBy('customer_id', 'transaction_date') \
    .agg({'price_paid': 'sum'})
df_cust_spend = df_cust_spend.orderBy('sum(price_paid)', ascending=False)
# Create tables
df_cust_spend.createOrReplaceTempView("tbl_cust_spend")
df_transactions.createOrReplaceTempView("tbl_transactions")
df_products.createOrReplaceTempView("tbl_products")
df_top5 = spark.sql("SELECT * FROM tbl_cust_spend ORDER BY `sum(price_paid)` DESC LIMIT 5")
df_top5.show()
df_transactions.show(5)
df_transactions.show(5)
# Keep only the transactions of the top-5 customers, then look up the product names
df_top5_products = df_transactions.join(
        df_top5, df_transactions.customer_id == df_top5.customer_id, "inner") \
    .select(df_transactions.customer_id, df_transactions.product_id)
df_top5_list = df_top5_products.join(
        df_products, df_top5_products.product_id == df_products.product_id, "left") \
    .select(df_top5_products.customer_id, df_products.product_name)
df_top5_list.orderBy("customer_id").show()
Find the names and total number sold of the 10 most popular products. Order the products once by the number sold and then by the total value (quantity * price) sold.
# List the sum of sold products
df_sum_products = df_transactions.groupBy('product_id') \
    .agg({'quantity_bought': 'sum'})
df_sum_products = df_sum_products.orderBy('sum(quantity_bought)', ascending=False)
# Get top ten results
df_sum_products.createOrReplaceTempView("tbl_sum_products")
df_top10_products = spark.sql(
    "SELECT * FROM tbl_sum_products ORDER BY `sum(quantity_bought)` DESC LIMIT 10")
# Calculate the total value
df_products_distinct = df_products.select(df_products.product_id,
                                           df_products.product_name,
                                           df_products.unit_price).distinct()
df_top10_products = df_top10_products.join(
    df_products_distinct,
    df_top10_products.product_id == df_products_distinct.product_id, "left")
df_top10_products = df_top10_products.select(
    df_top10_products['product_name'],
    df_top10_products['sum(quantity_bought)'].alias("quantity"),
    df_top10_products['unit_price'],
    (df_top10_products['sum(quantity_bought)'] *
     df_top10_products['unit_price']).alias("total_value"))
# Once ordered by the number sold, then by the total value sold
df_top10_products.orderBy('quantity', ascending=False).show()
df_top10_products.orderBy('total_value', ascending=False).show()
Implement problem 3 using RDD APIs.
Each line in the transactions.txt file contains a transaction date, time, customer id, product id, quantity bought, and price paid, delimited with the hash (#) sign. Each line in the products.txt file contains a product id, product name, unit price, and quantity available in the store. Bring those data into Spark and organize them as DataFrames with named columns.
from pyspark.sql import Row
rdd_transactions = sc.textFile("transactions.txt")
rdd_transactions = rdd_transactions.map(lambda x: x.split("#"))
rdd_transactions = rdd_transactions.map(lambda x: Row(transaction_date = x[0],
time = x[1],
customer_id = int(x[2]),
product_id = int(x[3]),
quantity_bought = int(x[4]),
price_paid = float(x[5])))
rdd_products = sc.textFile("products.txt")
rdd_products = rdd_products.map(lambda x: x.split("#"))
rdd_products = rdd_products.map(lambda x: Row(product_id = int(x[0]),
product_name = x[1],
unit_price = float(x[2]),
quantity = int(x[3])))
rdd_transactions.take(5)
rdd_products.take(5)
Using either RDD methods or plain SQL statements, find the 5 customers with the largest spend on the day. Find the names of the products each of those 5 customers bought.
# Import data types
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
# The schema is encoded in a string; numeric fields get matching types
schemaString1 = "transaction_date time customer_id product_id quantity_bought price_paid"
schemaString2 = "product_id product_name unit_price quantity"
field_types = {"customer_id": IntegerType(), "product_id": IntegerType(),
               "quantity_bought": IntegerType(), "quantity": IntegerType(),
               "price_paid": DoubleType(), "unit_price": DoubleType()}
schema1 = StructType([StructField(name, field_types.get(name, StringType()), True)
                      for name in schemaString1.split()])
schema2 = StructType([StructField(name, field_types.get(name, StringType()), True)
                      for name in schemaString2.split()])
# Create DataFrames from the Row RDDs
# (Spark 3.x preserves the keyword order of Row(...), so it lines up with the schemas)
sch_transactions = spark.createDataFrame(rdd_transactions, schema1)
sch_products = spark.createDataFrame(rdd_products, schema2)
# Creates a temporary view using the DataFrame
sch_transactions.createOrReplaceTempView("tbl_transactions")
sch_products.createOrReplaceTempView("tbl_products")
Note: The SQL GROUP BY approach somehow didn't work here, which is why the DF method was used below. In order to solve the problem, the following SQL statement should work:
SELECT customer_id, SUM(CAST(quantity_bought AS DOUBLE) * CAST(price_paid AS DOUBLE)) AS revenue
FROM tbl_transactions
GROUP BY customer_id
ORDER BY revenue DESC
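For reference, this is how the statement would be submitted through spark.sql against the tbl_transactions view registered above (a sketch; the note above reports that this approach did not work in the author's environment, and with the typed schema the casts are not strictly needed):
# Daily-spend alternative via SQL aggregation over the registered view
spark.sql("""
    SELECT customer_id,
           SUM(CAST(quantity_bought AS DOUBLE) * CAST(price_paid AS DOUBLE)) AS revenue
    FROM tbl_transactions
    GROUP BY customer_id
    ORDER BY revenue DESC
""").show(5)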
sch_cust_spend = df_transactions.groupBy('customer_id', 'transaction_date').agg({'price_paid': 'sum'})
sch_cust_spend = sch_cust_spend.orderBy('sum(price_paid)', ascending=False)
sch_cust_spend.createOrReplaceTempView("tbl_cust_spend2")
tbl_cust_spend = spark.sql("SELECT * FROM tbl_cust_spend2 ORDER BY `sum(price_paid)` DESC")
print(tbl_cust_spend.rdd.take(5))
Somehow the code below doesn't run in the notebook; however, these would be the necessary SQL statements.
df_top5_products = spark.sql("SELECT t.customer_id, t.product_id, p.product_name \
                              FROM tbl_transactions t \
                              LEFT JOIN tbl_products p ON t.product_id = p.product_id")
df_top5_products.createOrReplaceTempView("tbl_top5_products")
df_top5_list = spark.sql("SELECT t.customer_id, p.product_name \
                          FROM tbl_products p \
                          LEFT JOIN tbl_top5_products t ON t.product_id = p.product_id")
df_top5_list.createOrReplaceTempView("tbl_top5_list")
df_top5_list = spark.sql("SELECT * FROM tbl_cust_spend \
                          ORDER BY `sum(price_paid)` DESC")
df_top5_list.take(5)
Find the names and total number sold of the 10 most popular products. Order the products once by the number sold and then by the total value (quantity * price) sold.
Note: It is the same issue as above, so the DF method is used here as well.
# List the sum of sold products
df_sum_products = df_transactions.groupBy('product_id').agg({'quantity_bought': 'sum'})
df_sum_products = df_sum_products.orderBy('sum(quantity_bought)', ascending=False)
# Get top ten results
df_sum_products.createOrReplaceTempView("tbl_sum_products")
df_top10_products = spark.sql(
    "SELECT * FROM tbl_sum_products ORDER BY `sum(quantity_bought)` DESC LIMIT 10")
# Calculate the total value
df_products_distinct = df_products.select(df_products.product_id,
                                           df_products.product_name,
                                           df_products.unit_price).distinct()
df_top10_products = df_top10_products.join(
    df_products_distinct,
    df_top10_products.product_id == df_products_distinct.product_id, "left")
df_top10_products = df_top10_products.select(
    df_top10_products['product_name'],
    df_top10_products['sum(quantity_bought)'].alias("quantity"),
    df_top10_products['unit_price'],
    (df_top10_products['sum(quantity_bought)'] *
     df_top10_products['unit_price']).alias("total_value"))
# Once ordered by the number sold, then by the total value sold
df_top10_products.orderBy('quantity', ascending=False).show()
df_top10_products.orderBy('total_value', ascending=False).show()