Parquet Files


If you’re familiar with Spark, you know that a DataFrame is essentially a data structure that holds tabular data in memory: rows and columns that can, for example, store the results of an SQL-style query. DataFrames can be saved into HDFS as Parquet files. Parquet files preserve the DataFrame’s schema and also compress the data as it is written into HDFS. The saved file therefore takes up less space in HDFS and generally loads faster when you read the data again later, which makes Parquet a useful storage format for data you may want to analyze multiple times.

The PySpark example below uses Reddit data, which is available to all Cavium Hadoop users in HDFS at ‘/var/reddit’. This data consists of information about all posts made on the popular website Reddit, including their score, subreddit, text body, and author, all of which can make for interesting data analysis.

#First, launch the pyspark shell

pyspark --master yarn --queue <your_queue> --num-executors 35 --executor-cores 4 --executor-memory 5g

#Load the reddit data into a dataframe

>>> reddit = sqlContext.read.json("/var/reddit/RS_2016-0*")

#Set compression type to snappy

>>> sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

#Write data into a parquet file - this example puts it into your HDFS home directory as “reddit.parquet”

>>> reddit.write.parquet("reddit.parquet")

#Create a new dataframe from parquet file 

>>> parquetFile = sqlContext.read.parquet("reddit.parquet")

#Register dataframe as a SQL temporary table

>>> parquetFile.registerTempTable("reddit_table")

#Query the table

#Can really be any query, but this query will find some of the more highly rated posts

>>> ask = sqlContext.sql("SELECT title FROM reddit_table WHERE score > 1000 AND subreddit = 'AskReddit'")

#Since we created the dataframe “ask” with the previous query, we can write it out to HDFS as a parquet file so it can be accessed again later

>>> ask.write.parquet("ask.parquet")

#Exit the pyspark console - you’ll view the contents of your parquet file after

>>> exit()

 

To view the contents of your Parquet file, use Parquet tools. Parquet tools is a command line tool for inspecting Parquet files, for example by printing a file’s contents or its schema.

#view the output 

hadoop jar /sw/dsi/noarch/parquet-tools-1.7.0.jar cat \
ask.parquet

#view the schema; in this case, just the “title” of the askreddit thread 

hadoop jar /sw/dsi/noarch/parquet-tools-1.7.0.jar schema \
ask.parquet

#to get a full list of all of the options when using Parquet tools  
hadoop jar /sw/dsi/noarch/parquet-tools-1.7.0.jar -h

Introduction to Spark


Spark and PySpark use a container called a Resilient Distributed Dataset (RDD) for storing and operating on data. The most important characteristic of Spark’s RDD is that it is immutable: once created, the data it contains cannot be updated. New RDDs are created by transforming the data in another RDD, which is how analysis is done with Spark.
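The immutability described above can be illustrated without a cluster. The following is a minimal sketch in plain Python, not Spark code: the class `MiniRDD` is a hypothetical stand-in invented for this example, and it only mimics the idea that each transformation returns a new dataset and leaves the original untouched.

```python
from typing import Callable, Iterable


class MiniRDD:
    """Hypothetical stand-in for an RDD; this class is not part of Spark."""

    def __init__(self, items: Iterable):
        # Store the data in a tuple so it cannot be modified in place.
        self._items = tuple(items)

    def map(self, fn: Callable) -> "MiniRDD":
        # Build and return a new dataset; self is left unchanged.
        return MiniRDD(fn(x) for x in self._items)

    def filter(self, pred: Callable) -> "MiniRDD":
        # Keep only the elements for which pred returns True.
        return MiniRDD(x for x in self._items if pred(x))

    def collect(self) -> list:
        # Return the contents as an ordinary list.
        return list(self._items)


lines = MiniRDD(["a few words", "no match here", "more words"])
with_words = lines.filter(lambda line: "words" in line)
lengths = with_words.map(len)

print(lines.collect())       # the original data is unchanged
print(with_words.collect())
print(lengths.collect())
```

Real RDDs are also distributed across the cluster and evaluated lazily, which this sketch does not attempt to model.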

Using Spark’s native language, Scala, requires more setup than using PySpark. Some example Scala jobs, including the same example job in the PySpark documentation, can be found on this website. That code needs only a small amount of setup to run a Spark job, and all of the actual logic is in the ‘run’ function.

PySpark


Spark comes with an interactive Python console, which can be opened this way:

# Load the pyspark console 
pyspark --master yarn --queue <your_queue>

This interactive console can be used for prototyping or debugging, or just running simple jobs.

The following example runs a simple line count on a text file, prints its first line, and then counts the lines that contain the word “words”. You can use any text file you have for this example:

>>> textFile = sc.textFile("test.txt")
>>> textFile.count()
>>> textFile.first()
>>> textFile.filter(lambda line: "words" in line).count()

 

You can also submit a job using PySpark without using the interactive console.

Save this file as job.py.

from pyspark import SparkConf, SparkContext
import sys

# This script takes two arguments, an input and output
if len(sys.argv) != 3:
  print('Usage: ' + sys.argv[0] + ' <in> <out>')
  sys.exit(1)

input = sys.argv[1]
output = sys.argv[2]

# Set up the configuration and job context
conf = SparkConf().setAppName('AnnualWordLength')
sc = SparkContext(conf=conf)


# Read in the dataset and immediately split each tab-separated line into an array
data = sc.textFile(input).map(lambda line: line.split('\t'))

# Build the 'length' dataset: for each row, the word length (column 0) times its count (column 2), keyed by year (column 1) in 'yearlyLengthAll', then summed per year in 'yearlyLength'.
yearlyLengthAll = data.map(
    lambda arr: (int(arr[1]), float(len(arr[0])) * float(arr[2]))
)
yearlyLength = yearlyLengthAll.reduceByKey(lambda a, b: a + b)

# Build the 'words' dataset: the total count (column 2) for each year.
yearlyCount = data.map(
    lambda arr: (int(arr[1]), float(arr[2]))
).reduceByKey(
    lambda a, b: a + b
)

# Build the 'average_length' dataset: total length divided by total count for each year.
yearlyAvg = yearlyLength.join(yearlyCount).map(
    lambda tup: (tup[0], tup[1][0] / tup[1][1])
)

# Save the results in the specified output directory.
yearlyAvg.saveAsTextFile(output)

# Finally, let Spark know that the job is done.
sc.stop()

The script above averages the lengths of words in the NGrams dataset by year. There are two main operations in the code: ‘map’ and ‘reduceByKey’. ‘map’ applies a function to each RDD element and returns a new RDD containing the results. ‘reduceByKey’ merges all values that share a key by repeatedly applying a function to pairs of them, and returns an RDD with one result per key.
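To see what ‘map’ and ‘reduceByKey’ compute, the same calculation can be traced in plain Python on a handful of rows. This is only a sketch of the semantics, not Spark code: the rows are made up for illustration, the (word, year, count) layout is inferred from how job.py indexes its columns, and `reduce_by_key` is a hypothetical helper written for this example.

```python
# Made-up rows in the (word, year, count) layout that job.py appears to expect.
rows = [
    ("cat", 1990, 2.0),
    ("horse", 1990, 1.0),
    ("dog", 1991, 4.0),
]


def reduce_by_key(pairs, fn):
    """Merge all values that share a key, two at a time (hypothetical helper)."""
    result = {}
    for key, value in pairs:
        result[key] = fn(result[key], value) if key in result else value
    return result


# 'map' step: produce one (year, value) pair per input row.
length_pairs = [(year, len(word) * count) for word, year, count in rows]
count_pairs = [(year, count) for word, year, count in rows]

# 'reduceByKey' step: sum the values for each year.
yearly_length = reduce_by_key(length_pairs, lambda a, b: a + b)
yearly_count = reduce_by_key(count_pairs, lambda a, b: a + b)

# Join on year and divide, as the final 'map' in job.py does.
yearly_avg = {
    year: yearly_length[year] / yearly_count[year] for year in yearly_length
}
print(yearly_avg)
```

In Spark the same steps run in parallel across partitions, which is why the function passed to ‘reduceByKey’ should give the same result regardless of the order in which values are combined.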

The job can be submitted by running:

spark-submit \
 --master yarn \
 --num-executors 35 \
 --executor-memory 5g \
 --executor-cores 4 \
 job.py /var/ngrams/data ngrams-out


hdfs dfs -cat ngrams-out/*

 

The only required argument in the job submission command above is ‘--master yarn’. The values passed to the other arguments may be modified to get better performance or to conform to the limits of your queue.

Spark Shell


Spark has an easy-to-use interactive shell that can be used to learn the API and to analyze data interactively. Below is a simple example written in Scala. You can use any text file that you have:

spark-shell --master yarn --queue <your_queue>
scala> val textFile = spark.read.textFile("test.txt")
scala> textFile.count()
scala> textFile.first()
//Count how many lines contain the word "words"
//You can replace "words" with any word you'd like
scala> textFile.filter(line => line.contains("words")).count()

Using Hadoop and HDFS


Hadoop consists of two components: HDFS, a filesystem built for high read speeds, and YARN, a resource manager. HDFS is not a POSIX filesystem, so normal command line tools like “cp” and “mv” will not work on it. Most of the common tools have been reimplemented for HDFS and can be run using the “hdfs dfs” command. All data must be in HDFS for jobs to be able to read it.

Here are a few basic commands:

# List the contents of your HDFS home directory
hdfs dfs -ls

# Copy local file data.csv to your HDFS home directory
hdfs dfs -put data.csv data.csv

# Copy HDFS file data.csv back to your local home directory
hdfs dfs -get data.csv data2.csv

A complete reference of HDFS commands can be found on the Apache website.
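If you want to run these commands from a Python script rather than typing them, one option is to call the “hdfs” command through the standard library. The sketch below is an assumption about how you might wrap it: `hdfs_command` and `run_hdfs` are hypothetical helper names, and actually running a command requires a machine where the “hdfs” client is installed.

```python
import subprocess


def hdfs_command(action, *paths):
    """Build the argument list for an 'hdfs dfs' call, e.g. action='put'."""
    return ["hdfs", "dfs", "-" + action, *paths]


def run_hdfs(action, *paths):
    """Run the command; raises CalledProcessError if hdfs exits non-zero."""
    return subprocess.run(hdfs_command(action, *paths), check=True)


# Show the commands that would be run, without touching HDFS.
print(hdfs_command("ls"))
print(hdfs_command("put", "data.csv", "data.csv"))

# On a cluster login node you could then call, for example:
# run_hdfs("put", "data.csv", "data.csv")
```

Passing the arguments as a list avoids shell quoting problems with file names that contain spaces.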

Understanding MapReduce


Writing Hadoop MapReduce code in Java is the lowest-level way to program against a Hadoop cluster. Hadoop’s libraries do not provide higher-level abstractions such as Spark RDDs or languages like Hive or Pig. All code must implement the MapReduce paradigm.

This video provides a great introduction to MapReduce. This documentation provides a written explanation and an example.
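As a rough illustration of the paradigm, here is a word count written as three separate phases in plain Python. It is a single-process sketch, not Hadoop code: the function names `map_phase`, `shuffle_phase`, and `reduce_phase` are chosen for this example, and on a real cluster the framework performs the grouping step and runs the map and reduce functions on many machines.

```python
from collections import defaultdict


def map_phase(line):
    """Emit a (word, 1) pair for every word in one input line."""
    return [(word.lower(), 1) for word in line.split()]


def shuffle_phase(pairs):
    """Group values by key, as the framework does between map and reduce."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def reduce_phase(key, values):
    """Collapse all values for one key into a single result."""
    return key, sum(values)


lines = ["the quick brown fox", "the lazy dog", "The end"]

mapped = [pair for line in lines for pair in map_phase(line)]
grouped = shuffle_phase(mapped)
counts = dict(reduce_phase(key, values) for key, values in grouped.items())
print(counts)
```

Because each call to `map_phase` sees only one line and each call to `reduce_phase` sees only one key, the work can be split across machines without the pieces needing to communicate.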

Creating an Account


Using the Cavium Hadoop cluster requires an ARC user login. Please fill out this form to request a login if you do not already have one.
When you create an account, you will automatically be added to our default queue. If you are using our Hadoop cluster for a class or another specific purpose, or you need a significant allocation, please open a ticket with us so you can be added to the appropriate queue. Many of the examples in this user guide include a “--queue <your_queue>” flag. Please fill in the name of your queue when running these examples, or simply “default” if you do not have one.

Overview


The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across a cluster of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage. Rather than rely on hardware to deliver high-availability, the library itself is designed to detect and handle failures at the application layer, so delivering a highly-available service on top of a cluster of computers, each of which may be prone to failures. (From hadoop.apache.org)

The software available is:

Please see our workshop training material: