Spark

A fast and general engine for large-scale data processing

Speed

Ease of Use

Generality

Language Selection

  • Scala
  • Java
  • Python
  • R

PySpark

PySpark Basics

  • Based on the Anaconda Python distribution
  • Over 720 packages for data preparation, data analysis, data visualization, machine learning and interactive data science (see the sketch below)
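
Since PySpark runs on top of the Anaconda distribution, those packages can be combined freely with the Spark API. A minimal sketch, assuming an interactive pyspark session where sqlContext is already defined (the sample data is made up for illustration):

        # Move data between pandas and Spark DataFrames inside a pyspark session.
        # Assumes `sqlContext` is provided by the shell; the sample data is invented.
        import pandas as pd

        pdf = pd.DataFrame({'name': ['Joe', 'Gordon'], 'age': [41, 38]})
        df = sqlContext.createDataFrame(pdf)   # pandas DataFrame -> Spark DataFrame
        df.show()
        print(df.toPandas())                   # Spark DataFrame -> pandas DataFrame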

Running pyspark interactively

  • Using a Jupyter notebook
  • Running from the command line using IPython:
    PYSPARK_DRIVER_PYTHON=ipython pyspark

Example


          [jlopez@login7 ~]$ PYSPARK_DRIVER_PYTHON=ipython pyspark
          >>>  from pyspark.sql import Row
          >>>  Person = Row('name', 'surname')
          >>>  data = []
          >>>  data.append(Person('Joe', 'MacMillan'))
          >>>  data.append(Person('Gordon', 'Clark'))
          >>>  data.append(Person('Cameron', 'Howe'))
          >>>  df = sqlContext.createDataFrame(data)
          >>>  df.show()
            +-------+---------+
            |   name|  surname|
            +-------+---------+
            |    Joe|MacMillan|
            | Gordon|    Clark|
            |Cameron|     Howe|
            +-------+---------+
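
From here the session could continue with the usual DataFrame operations on the same df; a short sketch using the standard Spark DataFrame API:

          >>> df.filter(df.surname == 'Howe').show()   # rows whose surname is 'Howe'
          >>> df.select('name').show()                 # keep only the name column
          >>> df.count()                               # number of rows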
        

SparkR

SparkR Basics

  • Based on the Anaconda R distribution
  • The r-essentials bundle contains the IRkernel and more than 80 of the most popular R packages for data science, including dplyr, shiny, ggplot2, tidyr, caret and nnet.

Running SparkR interactively

  • Using a Jupyter notebook
  • Running from the command line:
    [jlopez@login6 ~]$ sparkR

Example


          [jlopez@login7 ~]$ sparkR
          > df <- createDataFrame(sqlContext, faithful)
          > head(df)
            eruptions waiting
            1     3.600      79
            2     1.800      54
            3     3.333      74
            4     2.283      62
            5     4.533      85
            6     2.883      55
        

Jupyter Setup 1/2

Start an SSH session with X11 support:
ssh -X login.hdp.cesga.es

Jupyter Setup 2/2

Inside the notebook, initialize the Spark context and the SQL context:

          Sys.setenv(SPARK_HOME='/usr/hdp/2.4.2.0-258/spark')
          .libPaths(c(file.path(Sys.getenv('SPARK_HOME'), 'R', 'lib'), .libPaths()))

          library(SparkR)

          sc <- sparkR.init(master="yarn-client")
          sqlContext <- sparkRSQL.init(sc)
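
For a PySpark notebook, the equivalent initialization would look roughly like this. This is a sketch, assuming the same Spark 1.x API used above and that the pyspark module is already on the notebook's Python path; the application name is arbitrary:

          # Rough PySpark counterpart of the SparkR initialization above (Spark 1.x API).
          # The application name is arbitrary; "yarn-client" matches the R example.
          from pyspark import SparkConf, SparkContext
          from pyspark.sql import SQLContext

          conf = SparkConf().setMaster('yarn-client').setAppName('notebook')
          sc = SparkContext(conf=conf)
          sqlContext = SQLContext(sc)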
          

spark-submit

Submit a job to the YARN queue

Spark Components

spark-submit Python


        # client mode
        spark-submit --master yarn \
          --name testWC test.py input output
        # cluster mode
        spark-submit --master yarn --deploy-mode cluster \
          --name testWC test.py input output
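
The test.py script itself is not shown here; a minimal word count of that shape could look like the following sketch (hypothetical file, Spark RDD API, with the input and output paths taken from the two positional arguments):

        # Hypothetical test.py: minimal word count matching the spark-submit examples.
        # sys.argv[1] is the input path, sys.argv[2] the output path (e.g. on HDFS).
        import sys
        from pyspark import SparkConf, SparkContext

        if __name__ == '__main__':
            input_path, output_path = sys.argv[1], sys.argv[2]
            sc = SparkContext(conf=SparkConf().setAppName('testWC'))

            counts = (sc.textFile(input_path)
                        .flatMap(lambda line: line.split())
                        .map(lambda word: (word, 1))
                        .reduceByKey(lambda a, b: a + b))

            counts.saveAsTextFile(output_path)
            sc.stop()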
        

spark-submit Scala/Java


        # client mode
        spark-submit --master yarn --name testWC \
          --class es.cesga.hadoop.Test test.jar \
          input output
        # cluster mode
        spark-submit --master yarn --deploy-mode cluster \
          --name testWC \
          --class es.cesga.hadoop.Test test.jar \
          input output
        

spark-submit options


--num-executors NUM    Number of executors to launch (Default: 2)
--executor-cores NUM   Number of cores per executor (Default: 1)
--driver-cores NUM     Number of cores for the driver (cluster mode only)
--executor-memory MEM  Memory per executor (Default: 1G)
--queue QUEUE_NAME     The YARN queue to submit to (Default: "default")
--proxy-user NAME      User to impersonate
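
As an example, the options above could be combined into a single submission like the following (the resource numbers are arbitrary and only for illustration):

        spark-submit --master yarn --deploy-mode cluster \
          --name testWC \
          --num-executors 4 --executor-cores 2 \
          --executor-memory 2G --queue default \
          test.py input output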