Question: Install Kafka on your Linux VM.
I installed the Kafka server on my EC2 Ubuntu 16.04 instance. R-Studio Server 1.1, Spark 2.2, Python 2.7, and Python 3 are installed on the instance (see previous homeworks for the installation instructions). The following commands install Kafka on an Ubuntu machine:
wget http://mirror.switch.ch/mirror/apache/dist/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz
sudo mkdir /opt/kafka
sudo tar -xvf kafka_2.11-0.11.0.1.tgz
sudo mv kafka_2.11-0.11.0.1/* /opt/kafka
sudo rm -r kafka_2.11-0.11.0.1
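A quick check that the files ended up in the expected location (path as used above):
ls /opt/kafka/bin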
Question: Create an environmental variable KAFKA_HOME pointing to that directory. Place the directory KAFKA_HOME/bin in the PATH variable in the .bash_profile file in your home directory. Source .bash_profile.
The following lines add kafka to the path:
# Add Kafka to the PATH via .bashrc (single quotes keep $PATH and $KAFKA_HOME
# from being expanded already at echo time)
echo "export KAFKA_HOME='/opt/kafka'" >> /home/tim/.bashrc
echo 'export PATH=$PATH:$KAFKA_HOME/bin' >> /home/tim/.bashrc
# Source file
source /home/tim/.bashrc
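The question asks for .bash_profile specifically; the equivalent lines for that file would be the following sketch (same paths assumed):
echo "export KAFKA_HOME='/opt/kafka'" >> /home/tim/.bash_profile
echo 'export PATH=$PATH:$KAFKA_HOME/bin' >> /home/tim/.bash_profile
source /home/tim/.bash_profile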
Question: Make sure that Zookeeper server is started. Kafka configuration files reside in the directory $KAFKA_HOME/config. Create a topic. Demonstrate that the provided scripting client kafka-console-consumer.sh receives and displays messages produced by the kafka-console-producer.sh client.
After installing Kafka, we use the following to launch Zookeeper:
sudo /opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
The Zookeeper installation can be tested with the following command:
netstat -ant | grep :2181
tim@ip-172-31-24-35:~/e63$ netstat -ant | grep :2181
tcp6 0 0 :::2181 :::* LISTEN
tcp6 0 0 127.0.0.1:2181 127.0.0.1:55044 ESTABLISHED
tcp6 0 0 127.0.0.1:55044 127.0.0.1:2181 ESTABLISHED
As can be seen above, Zookeeper is listening on port 2181.
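Another quick health check is Zookeeper's four-letter-word command ruok, which a healthy server answers with imok (netcat assumed to be installed):
echo ruok | nc localhost 2181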
Next we have to start Spark on the machine. The option localhost[2] indicates that 2 threads are being used.
sudo /home/tim/spark/sbin/start-all.sh --host localhost[2]
Kafka is started with the following command:
sudo /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
I'm creating the topic tim with 1 partition. We could speed up processing by adding more partitions, but Kafka only guarantees message ordering within a single partition, not across partitions. A single partition is also easier to follow in the output, which is why only 1 partition is chosen.
sudo /opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 --topic tim
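For comparison, a topic with more partitions could be created the same way (hypothetical topic name tim3; ordering would then only be guaranteed within each partition):
sudo /opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 3 --topic tim3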
We can list the topic with the following command:
sudo /opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181
tim@ip-172-31-24-35:~/e63$ sudo /opt/kafka/bin/kafka-topics.sh --describe \
--zookeeper localhost:2181
Topic:tim PartitionCount:1 ReplicationFactor:1 Configs:
Topic: tim Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Now we start our producer with:
sudo /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic tim
tim@ip-172-31-24-35:~/e63$ sudo /opt/kafka/bin/kafka-console-producer.sh \
--broker-list localhost:9092 --topic tim
>test
>Hello World!
>
And then our consumer with:
sudo /opt/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic tim
tim@ip-172-31-24-35:~/e63$ sudo /opt/kafka/bin/kafka-console-consumer.sh \
--zookeeper localhost:2181 --topic tim
Using the ConsoleConsumer with old consumer is deprecated and will be removed
in a future major release. Consider using the new consumer by passing
[bootstrap-server] instead of [zookeeper].
test
Hello World!
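Following the deprecation warning above, the same consumer can also be started against the broker directly with the newer flag (available in Kafka 0.11):
sudo /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic tim --from-beginning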
Question: Make supplied python script kafka_consumer.py receive messages produced by supplied python script kafka_producer.py. Modify kafka_producer.py so that you can pass server name and the port of the Kafka broker and the name of Kafka topic on the command line.
Before we can start sending data with the producer we have to get the data onto the instance. We're doing that through git/GitHub.
git clone https://github.com/greenore/e63.git
cd e63
# Untar
tar xvzf orders.tar-1.gz
Next we have to install the kafka library for Python:
pip3 install kafka
I had to rewrite the kafka_producer.py file so that it works with Python 3.
from kafka import KafkaProducer
import time
import sys

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_producer.py <broker_list> <topic>", file=sys.stderr)
        exit(-1)
    broker_list, topic = sys.argv[1:]
    producer = KafkaProducer(bootstrap_servers=broker_list)
    for batch in range(3):
        print('Starting batch #' + str(batch))
        for i in range(4):
            print('sending message #' + str(i))
            message = 'test message #' + str(i)
            producer.send(topic, value=message.encode())
        print('Finished batch #' + str(batch))
        print('Sleeping for 5 seconds ...')
        time.sleep(5)
    print('Done sending messages')
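Since kafka-python buffers and sends messages asynchronously, an explicit flush at the end of the main block would guarantee delivery before the process exits; a minimal optional addition (kafka-python provides this method):
    # make sure all buffered messages are actually sent before exiting
    producer.flush()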
Next we start the producer against the broker on port 9092.
python3 kafka_producer.py localhost:9092 tim
tim@ip-172-31-24-35:~/e63$ python3 kafka_producer.py localhost:9092 tim
Starting batch #0
sending message #0
sending message #1
sending message #2
sending message #3
Finished batch #0
Sleeping for 5 seconds ...
Starting batch #1
sending message #0
sending message #1
sending message #2
sending message #3
Finished batch #1
Sleeping for 5 seconds ...
Starting batch #2
sending message #0
sending message #1
sending message #2
sending message #3
Finished batch #2
Sleeping for 5 seconds ...
Done sending messages
With that running, we can start the kafka_consumer.py script.
python3 kafka_consumer.py localhost:9092 tim
tim@ip-172-31-24-35:~/e63$ python3 kafka_consumer.py localhost:9092 tim
Topic is: tim
Group is: my-group1
...
got msg: ConsumerRecord(topic='tim', partition=0, offset=200967,
timestamp=1509816176691, timestamp_type=0, key=None, value=b'test message #0',
checksum=-501606557, serialized_key_size=-1, serialized_value_size=15)
partition: 0 message offset: 200967
got msg: ConsumerRecord(topic='tim', partition=0, offset=200968,
timestamp=1509816176692, timestamp_type=0, key=None, value=b'test message #1',
checksum=-1543171966, serialized_key_size=-1, serialized_value_size=15)
partition: 0 message offset: 200968
got msg: ConsumerRecord(topic='tim', partition=0, offset=200969,
timestamp=1509816176693, timestamp_type=0, key=None, value=b'test message #2',
checksum=-1138154465, serialized_key_size=-1, serialized_value_size=15)
partition: 0 message offset: 200969
got msg: ConsumerRecord(topic='tim', partition=0, offset=200970,
timestamp=1509816176706, timestamp_type=0, key=None, value=b'test message #3',
checksum=1566005260, serialized_key_size=-1, serialized_value_size=15)
partition: 0 message offset: 200970
got msg: ConsumerRecord(topic='tim', partition=0, offset=200971,
timestamp=1509816181710, timestamp_type=0, key=None, value=b'test message #0',
checksum=-281509412, serialized_key_size=-1, serialized_value_size=15)
partition: 0 message offset: 200971
got msg: ConsumerRecord(topic='tim', partition=0, offset=200972,
timestamp=1509816181711, timestamp_type=0, key=None, value=b'test message #1',
checksum=421188717, serialized_key_size=-1, serialized_value_size=15)
partition: 0 message offset: 200972
got msg: ConsumerRecord(topic='tim', partition=0, offset=200973,
timestamp=1509816181711, timestamp_type=0, key=None, value=b'test message #2',
checksum=-2146204201, serialized_key_size=-1, serialized_value_size=15)
partition: 0 message offset: 200973
got msg: ConsumerRecord(topic='tim', partition=0, offset=200974,
timestamp=1509816181711, timestamp_type=0, key=None, value=b'test message #3',
checksum=-149637823, serialized_key_size=-1, serialized_value_size=15)
partition: 0 message offset: 200974
got msg: ConsumerRecord(topic='tim', partition=0, offset=200975,
timestamp=1509816186716, timestamp_type=0, key=None, value=b'test message #0',
checksum=-1485067759, serialized_key_size=-1, serialized_value_size=15)
partition: 0 message offset: 200975
got msg: ConsumerRecord(topic='tim', partition=0, offset=200976,
timestamp=1509816186717, timestamp_type=0, key=None, value=b'test message #1',
checksum=1364852640, serialized_key_size=-1, serialized_value_size=15)
partition: 0 message offset: 200976
got msg: ConsumerRecord(topic='tim', partition=0, offset=200977,
timestamp=1509816186717, timestamp_type=0, key=None, value=b'test message #2',
checksum=-934236646, serialized_key_size=-1, serialized_value_size=15)
partition: 0 message offset: 200977
got msg: ConsumerRecord(topic='tim', partition=0, offset=200978,
timestamp=1509816186717, timestamp_type=0, key=None, value=b'test message #3',
checksum=-1084776820, serialized_key_size=-1, serialized_value_size=15)
partition: 0 message offset: 200978
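The supplied kafka_consumer.py was not modified. For reference, a kafka-python consumer along the lines of the output above would look roughly like this (a sketch only; the group name my-group1 is taken from the output, and the actual course script may differ):
from kafka import KafkaConsumer
import sys

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_consumer.py <broker_list> <topic>", file=sys.stderr)
        exit(-1)
    broker_list, topic = sys.argv[1:]
    print('Topic is: ' + topic)
    print('Group is: my-group1')
    # subscribe to the topic and poll messages forever
    consumer = KafkaConsumer(topic, bootstrap_servers=broker_list,
                             group_id='my-group1')
    for msg in consumer:
        print('got msg: ' + str(msg))
        print('partition: %d message offset: %d' % (msg.partition, msg.offset))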
Question: Also, modify that script so that it continuously reads your terminal inputs and sends every line to Kafka consumer. Demonstrate that kafka_consumer.py can read and display messages of modified kafka_producer.py. Provide working code of modified kafka_producer.py. Describe to us the process of installing Python packages, if any, you needed for this problem.
The following producer continuously reads lines typed into the terminal and sends each one to Kafka.
from kafka import KafkaProducer
import time
import sys

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_producer2.py <broker_list> <topic>", file=sys.stderr)
        exit(-1)
    broker_list, topic = sys.argv[1:]
    producer = KafkaProducer(bootstrap_servers=broker_list)
    for i in sys.stdin:
        message = 'typed message #' + str(i)
        producer.send(topic, value=message.encode())
While kafka_producer2.py is running we can type arbitrary text into the terminal.
python3 kafka_producer2.py localhost:9092 tim
tim@ip-172-31-24-35:~/e63$ python3 kafka_producer2.py localhost:9092 tim
Hello World!
Test
The kafka_consumer.py script is able to read the input typed above.
python3 kafka_consumer.py localhost:9092 tim
tim@ip-172-31-24-35:~/e63$ python3 kafka_consumer.py localhost:9092 tim
Topic is: tim
Group is: my-group1
...
got msg: ConsumerRecord(topic='tim', partition=0, offset=25,
timestamp=1509730033483, timestamp_type=0, key=None,
value=b'typed message #Hello World!\n', checksum=-2029186398,
serialized_key_size=-1, serialized_value_size=28)
partition: 0 message offset: 25
got msg: ConsumerRecord(topic='tim', partition=3, offset=19,
timestamp=1509730034996, timestamp_type=0, key=None,
value=b'typed message #Test\n', checksum=-1409034517,
serialized_key_size=-1, serialized_value_size=20)
partition: 3 message offset: 19
Question: Rather than using splitAndSend.sh bash script to generate traffic towards Spark Streaming engine, write a Kafka Producer which will read orders.txt file and send 1,000 orders to a Kafka topic every second. Write a Kafka consumer that will deliver those batches of orders to Spark Streaming engine. Base your Kafka consumer on provided direct_word_count.py script. Let Spark streaming engine count the number of orders different stocks were bought in each batch. Display for us a section of results in your solution. Describe to us the process of installing and invoking Python packages, if any, you needed for this problem.
First we install the necessary dependencies:
sudo pip3 install kafka-python
sudo pip3 install future
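The scripts below also import findspark; if it is not already present from the earlier homeworks, it can be installed the same way (an assumption on my part, it may already be installed):
sudo pip3 install findspark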
Then we adapt the producer script so that it reads orders.txt, splits it into batches of 1,000 orders, and sends each batch to Kafka.
# Libraries
from __future__ import print_function
import sys
import time
import itertools
import findspark
findspark.init("/home/tim/spark")
from kafka import KafkaProducer

# __name__
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_producer3.py <broker_list> <topic>", file=sys.stderr)
        exit(-1)
    broker_list, topic = sys.argv[1:]
    producer = KafkaProducer(bootstrap_servers=broker_list)

    # Open file
    file = open('/home/tim/e63/orders.txt')
    rdd_file = file.read()
    rdd_split = rdd_file.split('\n')

    # Loop through the file (the orders file has 500'000 rows)
    for batch in range(500):
        print('Start batch #' + str(batch))
        for i in range(1000):
            place = i + (batch * 1000)
            print(rdd_split[place])
            producer.send(topic, rdd_split[place].encode())
        print('Finish batch #' + str(batch))
        print('Sleep 1 second')
        time.sleep(1)

    # End
    print('Finished!')
Running the script leads to the following output.
python3 /home/tim/e63/kafka_producer3.py localhost:9092 tim
tim@ip-172-31-24-35:~/e63$ python3 kafka_producer3.py localhost:9092 tim
Start batch #0
Finish batch #0
Sleep 1 second
Start batch #1
Finish batch #1
...
Sleep 1 second
Start batch #498
Finish batch #498
Sleep 1 second
Start batch #499
Finish batch #499
Sleep 1 second
Finished!
Next we adapt the direct_word_count.py script into a new kafka_consumer3.py script. In the parse_data function the buy field is turned into a binary True/False flag. The goal is to count, for each batch, how many orders of the different stocks were bought. We do that by summing the buy flag per stock symbol.
# Libraries
from __future__ import print_function
from operator import add, sub
import sys
import findspark
findspark.init("/home/tim/spark")
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import datetime

# Parse function
def parse_data(line):
    s = line.rstrip().split(",")
    try:
        if s[6] != "B" and s[6] != "S":
            raise Exception('Wrong format')
        return [
            {"time": datetime.strptime(s[0], "%Y-%m-%d %H:%M:%S"),
             "orderId": int(s[1]), "clientId": int(s[2]),
             "symbol": s[3], "amount": int(s[4]), "price": float(s[5]),
             "buy": s[6] == "B"}]
    except Exception as err:
        print("Wrong line format (%s): " % line)
        return []

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_consumer3.py <broker_list> <topic>", file=sys.stderr)
        exit(-1)

    # Get brokers and topic
    broker_list, topic = sys.argv[1:]

    # Open spark context (pass the conf so that local[2] is actually used)
    conf = SparkConf().setAppName("PythonStreamingDirectKafkaWordCount")
    conf = conf.setMaster("local[2]")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 2)

    # Open kafka stream
    kvs = KafkaUtils.createDirectStream(ssc, [topic],
                                        {"metadata.broker.list": broker_list})
    filestream = kvs.transform(lambda rdd: rdd.values())

    # Parse file
    rdd_orders = filestream.flatMap(parse_data)

    # Sum buys (True = 1) and sells (False = 0) per symbol
    result = rdd_orders.map(lambda x: (x['symbol'], x['buy'])).reduceByKey(add)

    # Print result
    result.pprint()

    ssc.start()
    ssc.awaitTermination()
We can run the script with the following command:
python3 /home/tim/e63/kafka_consumer3.py localhost:9092 tim
tim@ip-172-31-24-35:~/e63$ python3 kafka_consumer3.py localhost:9092 tim
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
For SparkR, use setLogLevel(newLevel).
17/11/04 19:29:09 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
-------------------------------------------
Time: 2017-11-04 19:29:14
-------------------------------------------
('LEU', 13)
('C', 14)
('AMD', 11)
('PHG', 14)
('FCEL', 16)
('PYPL', 10)
('GE', 13)
('CHU', 8)
('WLL', 15)
('TOT', 9)
...
-------------------------------------------
Time: 2017-11-04 19:29:16
-------------------------------------------
('LEU', 10)
('C', 14)
('AMD', 20)
('TSU', 16)
('FCEL', 7)
('PYPL', 13)
('GE', 11)
('CHU', 7)
('WLL', 17)
('TOT', 6)
...
As can be seen above, the script counts the number of buy orders per stock symbol in each batch.
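If the Kafka streaming classes are not already on Spark's classpath, the same consumer can alternatively be launched through spark-submit with the Kafka 0.8 connector package (a sketch; the version is assumed to match Spark 2.2 and Scala 2.11):
spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
/home/tim/e63/kafka_consumer3.py localhost:9092 tim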
Question: Install Cassandra server on your VM.
We install Cassandra on the Ubuntu instance. That means we have to add the Apache Cassandra Debian repository to sources.list.d:
echo "deb http://www.apache.org/dist/cassandra/debian 311x main" | sudo tee -a \
/etc/apt/sources.list.d/cassandra.sources.list
curl https://www.apache.org/dist/cassandra/KEYS | sudo apt-key add -
sudo apt-key adv --keyserver pool.sks-keyservers.net --recv-key A278B781FE4B2BDA
sudo apt-get update
sudo apt-get install cassandra
We also need to install cqlsh and the Python driver:
pip install cqlsh
pip install cassandra-driver
pip3 install cassandra-driver
conda install -c conda-forge cassandra-driver
Next we can enable and start Cassandra with the following commands:
systemctl enable cassandra
systemctl start cassandra
systemctl -l status cassandra
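We can additionally check that the node is up and running with nodetool, which the Debian package installs alongside the server:
nodetool status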
The installed cqlsh version is not compatible with every Cassandra version by default. However, we can pass the CQL version explicitly when starting cqlsh:
cqlsh --cqlversion=3.4.4
Question: Use Cassandra SQL Client, cqlsh, to create and populate table person. Let every person be described by his or her first and last name, and city where he or she lives. Let every person possess up to three cell phones. Populate your table with three individuals using cqlsh client. Demonstrate that you can select the content of your table person including individuals’ cell phones.
We’re first creating the keyspace hw9.
CREATE KEYSPACE IF NOT EXISTS hw9
WITH replication = {'class':'SimpleStrategy','replication_factor':1};
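The keyspace can be verified with:
DESCRIBE KEYSPACES;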
Next we switch to that keyspace:
USE hw9;
Creating a table can be done with the following CQL statement.
CREATE TABLE person (id int,
first_name text,
last_name text,
city text,
cell1 text,
cell2 text,
cell3 text,
PRIMARY KEY (id));
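An alternative sketch (not used here) would model the up-to-three cell phones as a Cassandra collection instead of three fixed columns:
CREATE TABLE person_alt (id int PRIMARY KEY,
                         first_name text,
                         last_name text,
                         city text,
                         cells list<text>);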
Next we verify that the person table was created as defined above.
DESCRIBE person;
cqlsh:hw9> DESCRIBE person;
CREATE TABLE hw9.person (
id int PRIMARY KEY,
cell1 text,
cell2 text,
cell3 text,
city text,
first_name text,
last_name text
) WITH bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND comment = ''
AND compaction = {'class':
'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy',
'max_threshold': '32', 'min_threshold': '4'}
AND compression = {'chunk_length_in_kb': '64',
'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND crc_check_chance = 1.0
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99PERCENTILE';
Next we can insert data into our table.
INSERT INTO person (id, first_name, last_name, city, cell1, cell2, cell3)
VALUES (1, 'Tim','Hagmann','Basel','111-111-0000', '111-222-0000', '111-333-0000');
INSERT INTO person (id, first_name, last_name, city, cell2, cell3)
VALUES (2, 'Michael','Müller','Zürich','222-222-0000', '222-333-0000');
INSERT INTO person (id, first_name, last_name, city, cell1, cell3)
VALUES (3, 'Linda','Meier','Bern','333-111-0000', '333-333-0000');
Select all data from the table person:
SELECT * FROM person;
cqlsh:hw9> SELECT * FROM person;
id | cell1 | cell2 | cell3 | city | first_name | last_name
----+--------------+--------------+--------------+--------+------------+-----------
1 | 111-111-0000 | 111-222-0000 | 111-333-0000 | Basel | Tim | Hagmann
2 | null | 222-222-0000 | 222-333-0000 | Zürich | Michael | Müller
3 | 333-111-0000 | null | 333-333-0000 | Bern | Linda | Meier
With the following command we can select individual columns:
SELECT first_name, last_name, city, cell1, cell2 FROM person;
cqlsh:hw9> SELECT first_name, last_name, city, cell1, cell2 FROM person;
first_name | last_name | city | cell1 | cell2
------------+-----------+--------+--------------+--------------
Tim | Hagmann | Basel | 111-111-0000 | 111-222-0000
Michael | Müller | Zürich | null | 222-222-0000
Linda | Meier | Bern | 333-111-0000 | null
Question: Write a simple client in a language of your choice that will populate 3 rows in Cassandra's table person, subsequently update one of those rows, for example change the city where a person lives, and finally retrieve that modified row from Cassandra and write its content to the console.
I wrote a client in the cassandra_client.py script. However, I split the script up into the following code chunks so that it is easier to see what I did.
# Load libraries
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement

# Setup connection
cluster = Cluster()
session = cluster.connect('hw9')

# Create a person record
def insert_person(id, first_name, last_name, city, cell1, cell2, cell3):
    session.execute("""INSERT INTO person (id, first_name, last_name, city,
                                           cell1, cell2, cell3)
                       VALUES (%s,%s,%s,%s,%s,%s,%s)""",
                    (id, first_name, last_name, city, cell1, cell2, cell3))

# All
def select_all():
    statement = SimpleStatement("SELECT * FROM person")
    result = session.execute(statement)
    return result

# By id
def select_one(id):
    result = session.execute("""SELECT id, first_name, city FROM person
                                WHERE id = %s""", [id])[0]
    return result

# Update city
def update_city(id, city):
    session.execute("UPDATE person SET city = %s WHERE id = %s", (city, id))

# Add three new records
insert_person(4, 'Hans-Peter', 'Hugentobler', 'Grenchen', '444-111-0000',
              '444-222-0000', '444-333-0000')
insert_person(5, 'Christof', 'Hausmann', 'Bottmingen', '555-111-0000',
              '555-222-0000', '555-333-0000')
insert_person(6, 'Gabriela', 'Tschaggelar', 'Bottmingen',
              '666-111-0000', '666-222-0000', '666-222-0000')

# Select persons
df_persons = select_all()

# Print output
for i in df_persons:
    print(i)
Row(id=5, cell1='555-111-0000', cell2='555-222-0000', cell3='555-333-0000',
city='Bottmingen', first_name='Christof', last_name='Hausmann')
Row(id=1, cell1='111-111-0000', cell2='111-222-0000', cell3='111-333-0000',
city='Bettlach', first_name='Tim', last_name='Hagmann')
Row(id=2, cell1=None, cell2='222-222-0000', cell3='222-333-0000', city='Zürich',
first_name='Michael', last_name='Müller')
Row(id=4, cell1='444-111-0000', cell2='444-222-0000', cell3='444-333-0000',
city='Grenchen', first_name='Hans-Peter', last_name='Hugentobler')
Row(id=6, cell1='666-111-0000', cell2='666-222-0000', cell3='666-222-0000',
city='Bottmingen', first_name='Gabriela', last_name='Tschaggelar')
Row(id=3, cell1='333-111-0000', cell2=None, cell3='333-333-0000', city='Bern',
first_name='Linda', last_name='Meier')
# Update id 1
update_city(1, 'Bettlach')
# Select and print
df_person = select_one(1)
print(df_person.first_name + " has a new city with the name: " + df_person.city)
Tim has a new city with the name: Bettlach
As mentioned before, all of the above code can also be found in the cassandra_client.py script. It can be executed with the following command:
python3 cassandra_client.py
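The update can also be double-checked directly in cqlsh (same keyspace as above):
SELECT first_name, city FROM hw9.person WHERE id = 1;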