Spark comes with an interactive Python console, which can be opened this way:
    # Load the pyspark console
    pyspark --master yarn --queue <your_queue>
This interactive console can be used for prototyping or debugging, or just running simple jobs.
The following example counts the lines in a text file, returns its first line, and counts the lines that contain the string “words”. You can use any text file you have for this example:
    >>> textFile = sc.textFile("test.txt")
    >>> textFile.count()
    >>> textFile.first()
    >>> textFile.filter(lambda line: "words" in line).count()
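To make clear what each console call returns, the following plain-Python sketch mirrors the same steps on an in-memory list instead of an RDD. The sample lines are hypothetical and no Spark installation is needed. It illustrates the semantics only, not how Spark executes the calls.

```python
# Plain-Python stand-in for the console session above (no Spark required).
# The sample lines are hypothetical; in the console they come from test.txt.
lines = [
    "some words on the first line",
    "nothing relevant here",
    "more words, and even more words",
]

# textFile.count(): the number of lines in the file
line_count = len(lines)

# textFile.first(): the first line of the file
first_line = lines[0]

# textFile.filter(lambda line: "words" in line).count():
# the number of lines that contain "words", not the number of occurrences
matching_lines = len([line for line in lines if "words" in line])

print(line_count, first_line, matching_lines)
```

Note that the last sample line contains “words” twice but is counted once, because the filter keeps or drops whole lines.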
You can also submit a job using PySpark without using the interactive console.
Save the following script as job.py.
    from pyspark import SparkConf, SparkContext
    import sys

    # This script takes two arguments, an input and output
    if len(sys.argv) != 3:
        print('Usage: ' + sys.argv[0] + ' <in> <out>')
        sys.exit(1)

    input_path = sys.argv[1]
    output_path = sys.argv[2]

    # Set up the configuration and job context
    conf = SparkConf().setAppName('AnnualWordLength')
    sc = SparkContext(conf=conf)

    # Read in the dataset and immediately transform all the lines in arrays.
    # Assumed column layout of each tab-separated line:
    # arr[0] = word, arr[1] = year, arr[2] = number of occurrences
    data = sc.textFile(input_path).map(lambda line: line.split('\t'))

    # Create the 'length' dataset as mentioned above. This is done using the
    # next two variables, and the 'length' dataset ends up in 'yearlyLength'.
    yearlyLengthAll = data.map(
        lambda arr: (int(arr[1]), float(len(arr[0])) * float(arr[2]))
    )
    yearlyLength = yearlyLengthAll.reduceByKey(lambda a, b: a + b)

    # Create the 'words' dataset as mentioned above.
    yearlyCount = data.map(
        lambda arr: (int(arr[1]), float(arr[2]))
    ).reduceByKey(
        lambda a, b: a + b
    )

    # Create the 'average_length' dataset as mentioned above.
    # After the join, each element is (year, (total length, total count)).
    yearlyAvg = yearlyLength.join(yearlyCount).map(
        lambda tup: (tup[0], tup[1][0] / tup[1][1])
    )

    # Save the results in the specified output directory.
    yearlyAvg.saveAsTextFile(output_path)

    # Finally, let Spark know that the job is done.
    sc.stop()
The above script averages the lengths of words in the NGrams dataset by year. There are two main operations in the code: ‘map’ and ‘reduceByKey’. ‘map’ applies a function to each RDD element and returns a new RDD containing the results. ‘reduceByKey’ operates on an RDD of (key, value) pairs: for every key, it combines all values that share that key using the given function and returns an RDD with one (key, result) pair per key.
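The two operations can be tried out locally without Spark. The sketch below is a plain-Python illustration, not Spark's implementation: `reduce_by_key` is a hypothetical helper, and the sample rows and their (word, year, count) layout are assumptions made for the example. In Spark, the function passed to ‘reduceByKey’ should be associative and commutative, because values are combined in parallel across partitions.

```python
from collections import defaultdict
from functools import reduce


def reduce_by_key(pairs, func):
    """Combine the values of each key with func (local stand-in for reduceByKey)."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return {key: reduce(func, values) for key, values in grouped.items()}


# Hypothetical rows in the assumed layout: (word, year, count)
rows = [
    ("spark", "2008", "10"),
    ("hadoop", "2008", "30"),
    ("yarn", "2009", "5"),
]

# 'map' step: build one (year, value) pair per row
length_pairs = [(int(r[1]), float(len(r[0])) * float(r[2])) for r in rows]
count_pairs = [(int(r[1]), float(r[2])) for r in rows]

# 'reduceByKey' step: sum the values that share a year
yearly_length = reduce_by_key(length_pairs, lambda a, b: a + b)
yearly_count = reduce_by_key(count_pairs, lambda a, b: a + b)

# Join on year and divide to get the average word length per year
yearly_avg = {year: yearly_length[year] / yearly_count[year] for year in yearly_length}
print(yearly_avg)  # {2008: 5.75, 2009: 4.0}
```

For 2008 the weighted length is 5 × 10 + 6 × 30 = 230 over 40 occurrences, which gives 5.75.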
The job can be submitted by running:
    spark-submit \
      --master yarn \
      --num-executors 35 \
      --executor-memory 5g \
      --executor-cores 4 \
      job.py /var/ngrams/data ngrams-out

    # View the results
    hdfs dfs -cat ngrams-out/*
The only required argument in the above job submission command is ‘--master yarn’. The values passed to the other arguments may be modified to get better performance or to conform to the limits of your queue.
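When adjusting these values, it can help to work out what the job requests in total before comparing it with your queue's limits. The following is a rough sketch: `total_resources` is a hypothetical helper, and the figures cover executors only. The driver and the additional per-executor memory overhead that YARN allocates are not included, so the real footprint is somewhat larger.

```python
def total_resources(num_executors, executor_cores, executor_memory_gb):
    """Return (total cores, total executor memory in GB) requested by a job."""
    return num_executors * executor_cores, num_executors * executor_memory_gb


# Values taken from the spark-submit command above
cores, memory_gb = total_resources(
    num_executors=35, executor_cores=4, executor_memory_gb=5
)
print(cores, memory_gb)  # 140 175
```

With the values shown, the job asks for 140 executor cores and 175 GB of executor memory.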
*Note: Our default Python is Anaconda 2-5.0.1. If you would like to use Anaconda 3-5.0.1 for your PySpark job, run the following command: