Fuse HDFS


Fuse HDFS allows you to use standard POSIX file system commands with HDFS. This can be useful, for example, if you have a program that needs to work with data stored in HDFS.

To use Fuse HDFS, change directories to /hadoop-fuse/user/<your_uniqname>

Once in this directory, you can use commands on your HDFS files just as you would on any other files. For example, the ls command will list the contents of your HDFS home directory.
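
Because the mount point behaves like an ordinary directory tree, a program can also reach it through standard file-system calls. The following is only a minimal sketch (substitute your own uniqname in the path) that lists your HDFS home directory from Python:

#!/usr/bin/python
# List the contents of your HDFS home directory through the FUSE mount.
# Replace <your_uniqname> with your actual uniqname before running.
import os
for name in os.listdir("/hadoop-fuse/user/<your_uniqname>"):
    print name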

You could also run a Python or R program that uses a file in HDFS.

You can save the file below and run it as you would any other Python program; it reads an example data file that is available to all users in HDFS.

#!/usr/bin/python
# Read the example file through the FUSE mount point
f = open("/hadoop-fuse/var/examples/romeojuliet.txt", "r")
data = f.read()
f.close()

# Count how many times each word appears (split on any whitespace)
d = {}
for word in data.split():
        if word in d:
                d[word] += 1
        else:
                d[word] = 1

# Print each word with its count, separated by a tab
for word, count in d.items():
        print word + "\t" + str(count)

Java MapReduce


You can compile and run a MapReduce program written in Java on our Hadoop cluster. Below is an example of how to do so. You can get the WordCount.java file from HDFS:

hdfs dfs -get /var/examples/WordCount.java

 

Then run the following:

javac -cp `hadoop classpath` WordCount.java
jar cf wc.jar WordCount*.class
hadoop jar wc.jar WordCount \
/var/examples/romeojuliet.txt /user/<your_uniqname>/wc-output

 

To view the output:

hdfs dfs -cat /user/<your_uniqname>/wc-output/part-r-00000

mrjob


Another way to run Hadoop jobs is through mrjob. mrjob is useful for testing code on smaller data sets on another system (such as your laptop) and then running the same job on something larger, like a Hadoop cluster. To run an mrjob job on your laptop, simply remove "-r hadoop" from the command in the example below.

A classic example is a word count, taken from the official mrjob documentation.

Save this file as mrjob_test.py.

"""The classic MapReduce job: count the frequency of words.
"""
from mrjob.job import MRJob
import re

WORD_RE = re.compile(r"[\w']+")


class MRWordFreqCount(MRJob):

    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner(self, word, counts):
        yield (word, sum(counts))

    def reducer(self, word, counts):
        yield (word, sum(counts))


if __name__ == '__main__':
    MRWordFreqCount.run()

Then, run the following command:

python mrjob_test.py -r hadoop /etc/motd

You should see the word counts for the file /etc/motd. You can also try this with any other text file you have.
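
If you would rather launch the job from another Python script than from the command line, mrjob also provides a programmatic runner. The sketch below follows the pattern in the official mrjob documentation; exact method names vary between mrjob releases (older releases use runner.stream_output() and parse_output_line() instead of cat_output() and parse_output()).

# driver.py - a sketch of running the word count job from Python.
# Assumes mrjob_test.py (above) is in the same directory.
from mrjob_test import MRWordFreqCount

if __name__ == '__main__':
    job = MRWordFreqCount(args=['-r', 'hadoop', '/etc/motd'])
    with job.make_runner() as runner:
        runner.run()
        # parse_output()/cat_output() exist in recent mrjob releases;
        # older releases use parse_output_line()/stream_output() instead
        for word, count in job.parse_output(runner.cat_output()):
            print("%s\t%d" % (word, count))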

SparkR


SparkR lets users combine the ease of data analysis in R with the speed and capacity of Spark on our Hadoop cluster. Those familiar with R should have no trouble using it: after opening a SparkR session, simply begin typing your program in R.

Run this to open a SparkR session:

sparkR --master yarn --queue <your_queue> --num-executors 4 --executor-memory 1g --executor-cores 4

 

The following is an example you can run to get a feel for how SparkR works. This example was taken from the official SparkR documentation, which also provides other examples.

families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# Return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# Print the summary of each model
print(model.summaries)

Overview


The Flux Hadoop cluster will be retired from service on July 1, 2019. Persistent data stored in the Hadoop Distributed File System (HDFS) will be destroyed and unavailable after July 1. If you have persistent data that must be preserved and need assistance finding a storage location for it, or if you would like to move it to the Cavium Hadoop cluster, please contact hpc-support@umich.edu as soon as possible.

The Cavium Hadoop cluster is replacing the Flux Hadoop cluster. You may request access to the Cavium Hadoop cluster by contacting hpc-support@umich.edu.

Please contact us if you have any questions or issues.

Hadoop is designed to scale up from single servers to thousands of machines, each offering local computation and storage. Rather than rely on hardware to deliver high-availability, the library itself is designed to detect and handle failures at the application layer, so delivering a highly-available service on top of a cluster of computers, each of which may be prone to failures. (From hadoop.apache.org)

Flux Hadoop is a technology preview and is available at no cost. It may have less technical support than the other Flux services. The Flux Hadoop cluster consists of 12 nodes offering 100TB of HDFS space and is based on the Kerberos-enabled Hortonworks Data Platform 2.6.3.0. Available software includes Spark 1.x and 2.x (with SparkR), Hive 2 with Hive on Tez, and Anaconda Python 2 and 3.


Introduction to Spark


Spark and PySpark store and operate on data using a structure called a Resilient Distributed Dataset (RDD). The most important characteristic of Spark's RDD is that it is immutable: once created, the data it contains cannot be updated. New RDDs can be created by transforming the data in an existing RDD, which is how analysis is done with Spark.
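
For example, the following minimal PySpark sketch (run inside the pyspark shell, where sc is the SparkContext the shell creates for you) builds a word count entirely out of transformations; each step returns a new RDD and never modifies the one it came from:

#Load the example file into an RDD

>>> lines = sc.textFile("/var/examples/romeojuliet.txt")

#Each transformation returns a new RDD; 'lines' itself is never changed

>>> words = lines.flatMap(lambda line: line.split())
>>> counts = words.map(lambda w: (w.lower(), 1)).reduceByKey(lambda a, b: a + b)

#take() is an action, which actually triggers the computation

>>> counts.take(5)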

Using Spark's native language, Scala, requires more setup than using PySpark. Some example Scala jobs, including the same example job used in the PySpark documentation, can be found on this website. The Scala code has some trivial setup required to run a Spark job, and all of the actual logic is in the 'run' function.

On our cluster, Spark 2 is the default. If you would like to use Spark 1, you need to run the following command:

export SPARK_MAJOR_VERSION=1

Beeline


Beeline is an alternative to using the Hive CLI. While the Hive CLI connects directly to HDFS and the Hive Metastore, Beeline connects to HiveServer2. More information comparing the two can be found here.

To launch Hive using Beeline, run the command below. All Hive queries can be run as usual once you are connected through Beeline.

beeline -u 'jdbc:hive2://fladoop-nn02.arc-ts.umich.edu:2181,fladoop-nn01.arc-ts.umich.edu:2181,fladoop-rm01.arc-ts.umich.edu:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2'

#Set your queue in the CLI
set tez.queue.name=<your_queue>;

Streaming (Other Programming Methods)


It is also possible to write a job in any programming language that operates on tab-separated key-value pairs, such as Python or C, and submit it using Hadoop Streaming. The same example done above with Hive and Pig can also be written in Python and submitted as a Hadoop Streaming job.

Submitting a job with Hadoop Streaming requires writing a mapper and a reducer. The mapper reads input line by line and generates key-value pairs for the reducer to "reduce" into some sort of sensible data. In our case, the mapper reads each line and, if the ngram on that line appeared in only a single volume, outputs the year as the key and a '1' as the value. The Python code to do this is:

(Save this file as map.py)

#!/usr/bin/env python2.7
# Mapper: emit (year, 1) for every ngram that appeared in only one volume
import fileinput

for line in fileinput.input():
    arr = line.split("\t")
    try:
        if int(arr[3]) == 1:
            print("\t".join([arr[1], '1']))
    except (IndexError, ValueError):
        # Skip malformed lines
        pass

 

Now that the mapper has done its job, the reducer merely needs to sum the values for each key:

(Save this file as red.py)

#!/usr/bin/env python2.7
# Reducer: sum the counts for each year
import fileinput

data = dict()

for line in fileinput.input():
    arr = line.split("\t")
    if arr[0] not in data:
        data[arr[0]] = int(arr[1])
    else:
        data[arr[0]] = data[arr[0]] + int(arr[1])

for key in data:
    print("\t".join([key, str(data[key])]))

 

Submit the streaming job to the cluster by running the command below:

yarn jar $HADOOP_STREAMING \
 -Dmapreduce.job.queuename=<your_queue> \
 -input /var/ngrams/data \
 -output ngrams-out \
 -mapper map.py \
 -reducer red.py \
 -file map.py \
 -file red.py \
 -numReduceTasks 10


To view the last few lines of the output:

hdfs dfs -cat ngrams-out/* | tail -5

When you are done, you can remove the output directory:

hdfs dfs -rm -r -skipTrash /user/<your_uniqname>/ngrams-out

Parquet Files


If you’re familiar with Spark, you know that a dataframe is essentially a data structure that contains “tabular” data in memory. That is, it consists of rows and columns of data that can, for example, store the results of an SQL-style query. Dataframes can be saved into HDFS as Parquet files. Parquet files not only preserve the schema information of the dataframe, but will also compress the data when it gets written into HDFS. This means that the saved file will take up less space in HDFS and it will load faster if you read the data again later. Therefore, it is a useful storage format for data you may want to analyze multiple times.

The PySpark example below uses Reddit data, which is available to all Flux Hadoop users in HDFS under /var/reddit. The data consists of information about posts made on the popular website Reddit, including their score, subreddit, text body, and author, all of which can make for interesting data analysis.

#First, launch the pyspark shell

pyspark --master yarn --queue <your_queue> --num-executors 35 --executor-cores 4 --executor-memory 5g

#Load the reddit data into a dataframe

>>> reddit = sqlContext.read.json("/var/reddit/RS_2016-0*")

#Set compression type to snappy

>>> sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

#Write data into a parquet file - this example puts it into your HDFS home directory as "reddit.parquet"

>>> reddit.write.parquet("reddit.parquet")

#Create a new dataframe from parquet file 

>>> parquetFile = sqlContext.read.parquet("reddit.parquet")

#Register dataframe as a SQL temporary table

>>> parquetFile.registerTempTable("reddit_table")

#Query the table

#Can really be any query, but this query will find some of the more highly rated posts

>>> ask = sqlContext.sql("SELECT title FROM reddit_table WHERE score > 1000 and subreddit = 'AskReddit'")

#Show first 20 lines of results (actually executes query)

>>> ask.show()

#Since we created the dataframe "ask" with the previous query, we can write it out to HDFS as a parquet file so it can be accessed again later

>>> ask.write.parquet("ask.parquet")

#Exit the pyspark console - you'll view the contents of your parquet file afterwards

>>> exit()

 

To view the contents of your Parquet file, use Parquet tools. Parquet tools is a command-line utility that aids in the inspection of Parquet files, for example by viewing their contents or schema.

#view the output

hadoop jar /sw/dsi/dsi/noarch/parquet-tools-1.7.0.jar cat ask.parquet

#view the schema; in this case, just the “title” of the askreddit thread

hadoop jar /sw/dsi/dsi/noarch/parquet-tools-1.7.0.jar schema ask.parquet

#to get a full list of all of the options when using Parquet tools

hadoop jar /sw/dsi/dsi/noarch/parquet-tools-1.7.0.jar -h


Spark Shell


Spark has an easy-to-use interactive shell that can be used to learn the API and to analyze data interactively. Below is a simple example written in Scala. You can use any text file that you have:

spark-shell --master yarn --queue <your_queue>
scala> val textFile = spark.read.textFile("test.txt")
scala> textFile.count()
scala> textFile.first()
//Count how many lines contain the word "words"
//You can replace "words" with any word you'd like
scala> textFile.filter(line => line.contains("words")).count()