# Interactively Analyse 100GB of JSON data with Spark

Do you know which is the heaviest book ever printed? Let’s find out by exploring the Open Library data set using Spark in Python.

The purpose of this tutorial is educational. You will get a taster of some of the operations available in Spark and how you can use them to interactively explore a dataset that would be inconvenient (because of size and structure) to work with in simpler tools like Excel. You will also see how MapReduce operations can easily be expressed in Spark. Note that we use Spark to run an ad-hoc analysis in a convenient manner. There are other ways to analyse this dataset, such as using Impala.

In this tutorial you will learn:

• how to get started with Spark,
• how to use the map, flatMap, filter, and reduce patterns, and
• how to use the groupByKey and reduceByKey functions.

The Open Library is an initiative intended to create “one web page for every book ever published.” You can download their dataset, which is about 20GB of compressed data, using the following command in your terminal. The --continue flag lets you download the data in several goes.

wget --continue http://openlibrary.org/data/ol_cdump_latest.txt.gz

You can then extract the data – you need about 100GB of free space – using the following command:

gunzip -c ol_cdump_latest.txt.gz | cut -f 5 > ol_cdump.json
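If you want to see what the `cut -f 5` step does to a single line, it can be sketched in Python. This assumes each line of the dump holds tab-separated fields with the JSON record in the fifth one, which is the field `cut -f 5` selects. The function name and the sample line are made up for illustration and are not taken from the real dump.

```python
def extract_json_column(line, column=5):
    """Return the given 1-based tab-separated field of a dump line."""
    fields = line.rstrip("\n").split("\t")
    return fields[column - 1]

# Hypothetical sample line (not copied from the real dump).
sample = '/type/edition\t/books/OL1M\t3\t2010-04-13\t{"title": "Example"}\n'

json_text = extract_json_column(sample)
print(json_text)
```

For the full 100GB file the shell pipeline above remains the practical choice; the sketch is only meant to show which part of each line ends up in ol_cdump.json.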

To use the dataset on EC2, upload it to Amazon S3. Use the following command, with your own S3 bucket name, to upload the data to S3. Like the previous steps, this one takes time to complete.

aws s3 cp ol_cdump.json s3://my_bucket

Alternatively, if you want to work with a smaller data set to save time, you can download a sample of the data from https://s3-eu-west-1.amazonaws.com/csparkdata/ol_cdump.json. The techniques below will still work, but the results will differ.

If you want to analyse the data locally you can install PySpark on your own machine, ignore the Amazon setup and jump straight to the data analysis.

# Starting Amazon EMR

If you would like to get started with Spark on a cluster, a simple option is Amazon Elastic MapReduce (EMR). It gives you a cluster of several machines with Spark pre-configured. This is particularly useful if you quickly need to process a large file which is stored over S3.

Here is a YouTube video to show you how you can get started:

# Starting a Zeppelin notebook

Amazon EMR Spark instances come with Zeppelin notebooks: an alternative to Jupyter notebooks which runs directly on top of Spark. Open a Zeppelin notebook by clicking on the Zeppelin link on Amazon EMR.

Zeppelin allows for multiple languages to be used within the same notebook. Use %pyspark at the top of a cell to run Python commands.

%pyspark
print("Hello")

Alternatively, you can execute shell instructions:

%sh
echo "World !"

# Importing the dataset

The first step is to load the dataset into a Spark RDD: a data structure that abstracts how the data is processed – in distributed mode the data is split among machines – and lets you apply different data processing patterns such as filter, map and reduce. To learn more about RDDs as well as the rest of the topics of this tutorial, check out our big data bootcamp.

You can read the file and turn each line into an element of the RDD using the operation textFile.

path = "s3://my_bucket/ol_cdump.json"
raw_data = sc.textFile(path)

Note that if you are working with a local copy of the file, you can just pass a standard file path (e.g., ol_cdump.json) to the function.

Each element in the RDD is a single string representing a JSON value. Thus, the second step turns each of these elements into a Python dictionary so they can be analysed more easily. The json.loads function parses a JSON value into a Python dictionary, and the method .map(f) returns a new RDD where f has been applied to each element in the original RDD. Combine the two to parse all the lines of the RDD.

import json
dataset = raw_data.map(json.loads)
dataset.persist()

Note that the code also calls the method .persist() to cache the RDD in memory so it can be reused directly later.

Each element in the RDD dataset is now a dictionary mapping keys to values.
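To get a feel for this parsing step without a cluster, here is a minimal plain-Python sketch. It uses the built-in map on an ordinary list as a stand-in for the RDD .map method, and the two sample lines are invented for illustration.

```python
import json

# Hypothetical sample lines standing in for the elements of raw_data.
lines = [
    '{"title": "Example A", "number_of_pages": 120}',
    '{"title": "Example B", "weight": "1.2 pounds"}',
]

# map(f, xs) applies f to every element, much like .map(f) does on an RDD.
records = list(map(json.loads, lines))

print(records[0]["title"])
print(sorted(records[1].keys()))
```

Unlike this local list, an RDD is evaluated lazily and split across machines, so the sketch only illustrates the one-to-one shape of the transformation.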

# Exploratory data analysis

You can start by finding out the number of entries:

dataset.count()

This returns 126,107,177. Not bad, quite a lot of books!

You can take a sneak peek at the data using the first operation to return the very first element. Alternatively, take(k) returns a list of the first k elements.

dataset.take(10)

The output is quite long. But you will see that the entries returned contain book attributes such as number_of_pages, title, weight, isbn_10, etc.

In order to understand the shape of the data you have, you can extract all the distinct keys available in the dictionaries. You may be tempted to use the map operation again, but you have to use flatMap instead. Indeed, for each dictionary, you extract a list of keys, so map would produce an RDD of lists of keys. Using flatMap, all the keys are collapsed into a single, flat RDD. One way to think about flatMap is that it lets you apply a one-to-many transformation for each element instead of one-to-one like map does.

On this RDD of keys, you can use distinct to remove duplicate keys. Finally, use the collect operation to extract this RDD of unique keys into a Python list.

keys = dataset.flatMap(lambda d: d.keys()).distinct().collect()
len(keys)

There are 504 unique keys! That is quite a lot of different attributes for a book.
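The difference between map and flatMap described above can be tried on a small list in plain Python. This is only a local sketch: itertools.chain plays the role of flatMap, set plays the role of distinct, and the sample records are made up.

```python
from itertools import chain

# Hypothetical records standing in for the parsed dataset.
records = [
    {"title": "Example A", "number_of_pages": 120},
    {"title": "Example B", "weight": "1.2 pounds"},
]

# map-like: one output per input, so the result is a list of key lists.
mapped = [list(d.keys()) for d in records]

# flatMap-like: the per-record key lists are chained into one flat sequence.
flat = list(chain.from_iterable(d.keys() for d in records))

# distinct-like: drop duplicates (sorted only to make the result stable).
unique_keys = sorted(set(flat))

print(mapped)
print(flat)
print(unique_keys)
```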

In practice not all JSON objects have the same attributes: there are often missing attributes. E.g., the database may mention the number of pages in a book but not necessarily its printed dimensions. That is what you get with real-world data. In the Open Library dataset you find a lot of diversity! To explore that diversity, you could group the elements by their number of attributes using groupByKey:

groups = dataset.map(lambda e: (len(e.keys()), e)).groupByKey()

But hold on! This is actually not good practice at all! The most common number of keys could have a lot of data associated with it, say 20GB. In that case, you would create a 20GB Python list, which would crash your machine or cause swapping.

A better way of computing the same result is by using reduceByKey:

count_per_key = (
    dataset
    .map(lambda e: (len(e.keys()), 1))
    .reduceByKey(lambda x, y: x + y)
    .collect()
)

The reduceByKey operation adds up the 1s generated for each key, which ends up returning the count per number of attributes. Zeppelin is quite useful here as it lets you visualise the result directly through its interface:

print("%table")
for e in count_per_key:
    print("%d\t%d" % (e[0], e[1]))
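To see why the counting version is lighter than the grouping version, here is a rough plain-Python sketch of the two ideas. It is not how Spark implements either operation; it only contrasts what has to be held per key. The sample records are made up.

```python
from collections import defaultdict

# Hypothetical records standing in for the parsed dataset.
records = [
    {"title": "A"},
    {"title": "B", "weight": "1 kg"},
    {"title": "C", "weight": "2 kg"},
]

pairs_grouped = [(len(r), r) for r in records]  # (number of keys, record)
pairs_counted = [(len(r), 1) for r in records]  # (number of keys, 1)

# groupByKey-like: every full record is kept in memory under its key.
groups = defaultdict(list)
for key, record in pairs_grouped:
    groups[key].append(record)

# reduceByKey-like: only one running total per key is kept.
counts = defaultdict(int)
for key, one in pairs_counted:
    counts[key] = counts[key] + one

print(dict(counts))
```

With grouping, the memory needed for a key grows with the number of records under it; with the running total it stays at a single integer per key.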

# Tell me about the weight

You managed to get some insights about the data. How about something more fun which you can bring up during fancy dinner parties? You are in luck. One of the JSON attributes is the weight of each book. Are you curious to find out which book is the heaviest? Let’s hope it is an interesting book at least. When you explored the dataset you will have noticed that there are different units for the weight attribute: kg, g, ounces, pounds, etc. It is all very messy! You will need a function that can normalise the weights so you can compare each book:

def sanitizedWeight(weight_str):
    w = convertToKilograms(weight_str)
    if w > 1e6:  # books above 1e6 kg are errors
        return 0.0
    else:
        return w

def convertToKilograms(weight_str):
    result = weight_str.split()
    if len(result) != 2:
        return 0
    try:
        number = float(result[0])
    except ValueError:
        return 0
    if result[1] in ('pounds', 'lb', 'lbs'):
        return number * 453.592 * 1e-3
    elif result[1] in ('ounces', 'oz', 'oz.'):
        return number * 28.35 * 1e-3
    elif result[1] in ('grams', 'gms', 'g'):
        return number * 1e-3
    elif result[1] in ('kilograms', 'kilo', 'kg'):
        return number
    else:
        return 0

Note that there are still a few books with weight attributes that are ignored by this function. Some weights appear with no space between the number and the unit, some with unusual capitalisation (GM, KGms), some with typos (“ounds”), some with other oddities. Let's keep a focus on analysing the dataset – but feel free to improve this parser if you want.
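If you do want to improve the parser, one possible direction is sketched below. It is an assumption-laden variant rather than the parser used in this tutorial: it lower-cases the input, allows the space between number and unit to be missing, and looks the unit up in a table. The function name, the regular expression and the extra unit spellings are all made up for illustration.

```python
import re

# Grams per unit. The spellings listed here are assumptions; extend as needed.
GRAMS_PER_UNIT = {
    "pounds": 453.592, "pound": 453.592, "lb": 453.592, "lbs": 453.592,
    "ounces": 28.35, "ounce": 28.35, "oz": 28.35,
    "grams": 1.0, "gram": 1.0, "gms": 1.0, "gm": 1.0, "g": 1.0,
    "kilograms": 1000.0, "kilo": 1000.0, "kg": 1000.0, "kgs": 1000.0,
}

# A plain decimal number, optional whitespace, a unit word, optional full stop.
WEIGHT_RE = re.compile(r"^\s*(\d+(?:\.\d+)?)\s*([a-z]+)\.?\s*$")

def convert_to_kilograms_lenient(weight_str):
    """Return the weight in kilograms, or 0.0 if the string is not understood."""
    match = WEIGHT_RE.match(weight_str.lower())
    if match is None:
        return 0.0
    number, unit = match.groups()
    grams = GRAMS_PER_UNIT.get(unit)
    if grams is None:
        return 0.0
    return float(number) * grams * 1e-3

for text in ["500g", "2 KG", "1.5 lbs", "3 ounds"]:
    print(text, convert_to_kilograms_lenient(text))
```

Typos such as “ounds” are still rejected here. Whether to guess at them is a judgment call, since a wrong guess silently corrupts the comparison.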

To find the heaviest book, you simply need to iterate through the data and reduce it, selecting the heaviest book each time.

heaviest_book = (
    dataset
    .filter(lambda e: "weight" in e and "title" in e)
    .map(lambda e: (e, sanitizedWeight(e["weight"])))
    .reduce(lambda x, y: x if x[1] > y[1] else y)
)

So what is the answer? Unfortunately it is a bit of a let-down: the output is a book of 200,000 pounds (just over 90 metric tons) with a strange title!

({… u'weight': u'200000 pounds', …,  u'title': u'u fool,stupid', …}, 90718.40000000001)

Clearly someone inserted a dummy entry in the database!
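The reduce step used to pick the heaviest book can be tried locally with functools.reduce. This is a plain-Python sketch on made-up (record, weight) pairs, using the same combining function as the Spark query.

```python
from functools import reduce

# Hypothetical (record, weight in kg) pairs, as produced by the map step.
weighted = [
    ({"title": "Example A"}, 0.4),
    ({"title": "Example B"}, 2.5),
    ({"title": "Example C"}, 1.1),
]

# Same combining function as in the query: keep whichever pair is heavier.
heaviest = reduce(lambda x, y: x if x[1] > y[1] else y, weighted)

print(heaviest)
```

Spark applies the function within each partition and then across partitions, so the order of combination is not fixed. Keeping the heavier of two pairs gives the same maximum weight whatever the order, which is why it suits reduce.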

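How about getting a visualisation of when books were published? You can use the operations you’ve learnt to count books per publication date. The first query below groups on the raw publish_date string; because those strings come in many formats, the second query keeps only entries whose last four characters form an integer and uses that as the year: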

booksWithDate = (
    dataset
    .filter(lambda e: "publish_date" in e)
    .map(lambda e: (e["publish_date"], 1))
    .reduceByKey(lambda x, y: x + y)
    .collect()
)

def is_int(s):
    try:
        int(s)
        return True
    except ValueError:
        return False

booksWithDate = (
    dataset
    .filter(lambda e: "publish_date" in e)
    .filter(lambda e: len(e["publish_date"]) >= 4)
    .filter(lambda e: is_int(e["publish_date"][-4:]))
    .map(lambda e: (int(e["publish_date"][-4:]), 1))
    .reduceByKey(lambda x, y: x + y)
    .collect()
)

As with the weights, you need to clean the data before you can work with real years, which is what the second query does. You can now use the visualisation feature in Zeppelin to get a nice distribution:

print("%table")
for r in booksWithDate:
    print("%d\t%d" % (r[0], r[1]))
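The year-cleaning rule can be checked on a handful of strings before running it on the cluster. The sketch below is plain Python with collections.Counter standing in for the map and reduceByKey steps, and the sample dates are invented.

```python
from collections import Counter

def is_int(s):
    try:
        int(s)
        return True
    except ValueError:
        return False

# Hypothetical publish_date strings in a few different formats.
dates = ["1999", "March 2004", "2004", "n.d.", "c. 1850", "199"]

# Keep only dates whose last four characters parse as an integer.
years = [
    int(d[-4:])
    for d in dates
    if len(d) >= 4 and is_int(d[-4:])
]

# Counter plays the role of map to (year, 1) followed by reduceByKey.
books_per_year = Counter(years)

print(sorted(books_per_year.items()))
```

Note that int also accepts strings with surrounding whitespace or a sign, so a date ending in " 850" would be counted as the year 850. A stricter check, such as requiring four digits, may be worth adding.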

If you are interested in learning more about Spark and big data systems, check out our intensive bootcamp: https://cambridgespark.com/courses/bigdata. In the meantime, you can test your Spark skills by trying to find answers to these questions:

• Which author has written or co-written the most books in the dataset?
• Which category of books is the most popular in the dataset?