BD|CESGA

Providing quick access to ready-to-use Big Data solutions

Because Big Data doesn't have to be complicated



Javier Cacheiro / Cloudera Certified Developer for Spark & Hadoop / @javicacheiro

Big Data

Introduction

3Vs of Big Data

Volume, Velocity, Variety

The importance of data

We don’t have better algorithms.
We just have more data.

Peter Norvig, Google's Research Director

Big Data

Important concepts

Vertical Scaling

Horizontal Scaling

Data Centric

Compute centric: bring the data to the computation

Data centric: bring the computation to the data

MPI Shortcomings

Jonathan Dursi: HPC is dying, and MPI is killing it

  • Wrong level of abstraction for application writers
  • No fault-tolerance

BD|CESGA

Hardware

Hardware Infrastructure

  • 38 nodes: 4 masters + 34 slaves
  • Storage capacity 816TB
  • Aggregated I/O throughput 30GB/s
  • 64GB RAM per node
  • 10GbE connectivity between all nodes

Hardware Master Nodes

  • Model: Lenovo System x3550 M5
  • CPU: 2x Intel Xeon E5-2620 v3 @ 2.40GHz
  • Cores: 12 (2x6)
  • HyperThreading: On (24 threads)
  • Total memory: 64GB
  • Network: 1x10Gbps + 2x1Gbps
  • Disks: 8x 480GB SSD SATA 2.5" MLC G3HS
  • Controller: ServeRAID M5210 1GB Cache FastPath

Hardware Slave Nodes

  • Model: Lenovo System x3650 M5
  • CPU: 2x Intel Xeon E5-2620 v3 @ 2.40GHz
  • Cores: 12 (2x6)
  • HyperThreading: On (24 threads)
  • Total memory: 64GB
  • Network: 1x10Gbps + 2x1Gbps
  • Disks: 12x 2TB NL SATA 6Gbps 3.5" G2HS
  • Controller: N2215 SAS/SATA HBA

BD|CESGA

Software

Platforms available

  • Hadoop Platform
  • PaaS Platform (beta)

Hadoop Platform

  • Ready to use Hadoop ecosystem
  • Covers most of the use cases
  • Production ready
  • Fully optimized for Big Data applications

PaaS Platform

  • When you need something outside the Hadoop ecosystem
  • Enables you to deploy custom Big Data clusters
  • Advanced resource planning based on Mesos
  • No virtualization overheads: based on Docker
  • Includes a catalog of ready-to-use products: e.g. Cassandra, MongoDB, PostgreSQL

BD|CESGA Portal

bigdata.cesga.es

Front Page

General info

Tools available

WebUI

Tutorials

Accessing Hadoop 3 Service

VPN

VPN

The VPN software allows you not only to connect to CESGA in a secure way, but also to access internal resources that you cannot reach otherwise.

Installing the Forticlient VPN software

Forticlient Configuration

  • Enter the following configuration options:
    Gateway: gateway.cesga.es
    Port: 443
    Username: your email address registered at CESGA
    Password: your password on the supercomputers

VPN Installation in old Linux versions

  • If your Linux distribution is not supported on the official download page, use our CESGA Local Repository
  • Follow these steps:
    unrar e vpn-fortissl.rar
    tar xvzf forticlientsslvpn_linux_4.4.2323.tar.gz
    cd forticlientsslvpn
    ./fortisslvpn.sh
  • Accept the license agreement presented, then run:
    ../forticlientsslvpn_cli \
       --server gateway.cesga.es:443 \
       --vpnuser usuario@dominio.com

Alternative Open Source Linux Client OpenFortiVPN

  • For Linux there is also an alternative open-source client that you can use instead of the official FortiClient software
  • Some Linux distributions like Ubuntu, Debian, openSUSE or Arch Linux provide openfortivpn packages
  • Check the project GitHub page for details: openfortivpn
  • It also has a GUI that you can use: openfortigui

For more info check the VPN section of the User Guide

How to connect

How to connect: Setup

  1. Configure the Fortigate VPN
  2. Start the VPN

How to connect: SSH

Using a powerful CLI through SSH:

ssh username@hadoop3.cesga.es

How to connect: WebUI

Using a simple Web User Interface

How to connect: Remote Desktop

No VPN needed if connecting from a Remote Desktop

How to transfer data

How to transfer data efficiently

Use our dedicated Data Transfer Node: dtn.srv.cesga.es

SCP/SFTP is fine for small transfers

But for large transfers use Globus

Our endpoint is cesga#dtn

For more info check the DTN User Guide

Expected upload times

How to transfer data: non-efficient way

Direct SCP/SFTP to hadoop3.cesga.es

Useful only for internal transfers: e.g. FT to BD

Not recommended for external transfers because it will be slowed down by the VPN server and the firewall

Filesystem Quotas

  • HOMEBD and HDFS have quotas
  • To check current usage you can use the command:
  • myquota
  • Verify that you have enough space before transferring files or submitting jobs

Default Filesystem Quotas

Defaults:

  • HDFS: 18TB
  • HOMEBD: 800GB

If you need additional space you can request Additional Storage

Backup policies

HDFS and HOMEBD do not have automatic backups configured

Only HOME FT has automatic daily backups

If you need to backup data in HDFS or HOMEBD contact us

Migrating Data from Hadoop 2

We recommend that you use the distcp tool

hadoop distcp -i -pat -update hdfs://10.121.13.19:8020/user/uscfajlc/wcresult hdfs://nameservice1/user/uscfajlc/wcresult

Run it from hadoop3.cesga.es so that it takes HA into account

Hadoop

Core Concepts

Core Components

  • HDFS: Parallel filesystem
  • YARN: Resource manager

Ecosystem

HDFS

The Hadoop Distributed Filesystem

HDFS Architecture

HDFS Replicas

Upload a file to HDFS

To upload a file from local disk to HDFS:

hdfs dfs -put file.txt file.txt

It will copy the file to /user/username/file.txt in HDFS.

List files

To list files in HDFS:

hdfs dfs -ls

Lists the files in our HDFS HOME directory: /user/username/

To list files in the root directory:

hdfs dfs -ls /

Working with directories

Create a directory:

hdfs dfs -mkdir /tmp/test

Delete a directory:

hdfs dfs -rm -r -f /tmp/test

Working with files

Read a file:

hdfs dfs -cat file.txt

Download a file from HDFS to local disk:

hdfs dfs -get file.txt

Web File Explorer

You can easily access the HUE File Explorer from the WebUI:

YARN

Yet Another Resource Negotiator

YARN Architecture

Launching an application

yarn jar application.jar DriverClass input output

List running jobs

yarn application -list
yarn top

See application logs

yarn logs -applicationId applicationId

Kill an application

yarn application -kill applicationId

Fair Scheduler

  • Resources are shared with the rest of the users through the YARN Fair Scheduler
  • Dominant Resource Fairness: both CPU and memory are considered
  • Jobs should be composed of lots of short-running tasks so they share resources nicely with other jobs
  • Long-running tasks that monopolize resources for long periods can be preempted to allow other applications to run

Fair Scheduler Queues

  • root.users.[username]: default queue, one per user
  • interactive: Jupyter Notebooks and interactive jobs
  • urgent: limited resources that can be used for urgent jobs
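
For example, a Spark application (introduced later in these slides) can be sent to one of these queues with the --queue option of spark-submit, or programmatically through the spark.yarn.queue property. A minimal PySpark sketch, assuming your user is allowed to submit to the urgent queue:

        # Minimal sketch: request the "urgent" YARN queue when creating the Spark session
        from pyspark.sql import SparkSession

        spark = (SparkSession.builder
                 .appName("urgent-job")
                 .config("spark.yarn.queue", "urgent")  # equivalent to spark-submit --queue urgent
                 .getOrCreate())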

Web Job Browser

You can access the HUE Job Browser from the WebUI:

Provided Tools

Spark

A fast and general engine for large-scale data processing

Speed

Easy

Generality

Language Selection

  • Scala
  • Java
  • Python
  • R

Updated to Spark 2.4

Now the main entry point is spark instead of sc and sqlContext
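
A minimal PySpark sketch of the unified entry point; the old contexts are still reachable from it:

        # spark is a SparkSession, created automatically in pyspark and Jupyter sessions
        df = spark.range(5)         # DataFrame work that previously went through sqlContext
        df.show()

        sc = spark.sparkContext     # the classic SparkContext is still available
        print(sc.parallelize([1, 2, 3]).sum())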

Spark Python

PySpark

PySpark Basics

  • Can be used together with the Anaconda Python distribution
  • Over 720 packages for data preparation, data analysis, data visualization, machine learning and interactive data science

Running pyspark interactively

  • Running from the command line:
    pyspark
  • Running from the command line using ipython:
    
                  module load anaconda2
                  PYSPARK_DRIVER_PYTHON=ipython pyspark
                
  • Running inside a Jupyter notebook

Example


          from pyspark.sql import Row
          Person = Row('name', 'surname')
          data = []
          data.append(Person('Joe', 'MacMillan'))
          data.append(Person('Gordon', 'Clark'))
          data.append(Person('Cameron', 'Howe'))
          df = spark.createDataFrame(data)
          df.show()
           +-------+---------+
           |   name|  surname|
           +-------+---------+
           |    Joe|MacMillan|
           | Gordon|    Clark|
           |Cameron|     Howe|
           +-------+---------+
        

spark-submit

Submit job to queue

Spark Components

spark-submit Python


        # client mode
        spark-submit --master yarn \
          --name testWC test.py input output
        # cluster mode
        spark-submit --master yarn --deploy-mode cluster \
          --name testWC test.py input output
        

spark-submit Scala/Java


        # client mode
        spark-submit --master yarn --name testWC \
          --class es.cesga.hadoop.Test test.jar \
          input output
        # cluster mode
        spark-submit --master yarn --deploy-mode cluster \
          --name testWC \
          --class es.cesga.hadoop.Test test.jar \
          input output
        

spark-submit options


--num-executors NUM    Number of executors to launch (Default: 2)
--executor-cores NUM   Number of cores per executor. (Default: 1)
--driver-cores NUM     Number of cores for driver (cluster mode)
--executor-memory MEM  Memory per executor (Default: 1G)
--queue QUEUE_NAME     The YARN queue to submit to (Default: "default")
--proxy-user NAME      User to impersonate
        

Jupyter

Interactive Computing

Jupyter

The Jupyter Notebook is a web application that allows you to create and share documents that contain live code, equations, visualizations and explanatory text.

Launching Jupyter

  • Connect to hadoop3.cesga.es
  • Go to your working directory
  • Launch the Jupyter server
    start_jupyter
  • Point your browser to the provided URL
  • VPN must be running if not using a remote desktop
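
Inside the notebook you can then create a Spark session as usual. A minimal sketch, assuming the pyspark libraries are importable from the notebook kernel:

        # Minimal sketch: create a SparkSession from a notebook cell
        from pyspark.sql import SparkSession

        spark = (SparkSession.builder
                 .appName("notebook-session")
                 .master("yarn")
                 .getOrCreate())

        spark.range(10).show()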

Jupyter

Using Python 3

You can also use Jupyter with Python 3:


          module load anaconda3/2018.12
          start_jupyter
        

Just load the desired python version first

Keep in mind that CDH 6.1 does not officially support Python 3 yet

JupyterLab

You can also try the new Jupyter Lab:


          module load anaconda2/2018.12
          start_jupyter-lab
        

JupyterLab

JupyterLab

Hive

SQL-like interface

Hive

Hive offers the possibility to use Hadoop through a SQL-like interface

Using Hive: HUE

You can use Hive from the WebUI through HUE:

Using Hive: Beeline

beeline
beeline> !connect jdbc:hive2://c14-19.bd.cluster.cesga.es:10000/default;ssl=true;sslTrustStore=/opt/cesga/cdh61/hiveserver2.jks;trustStorePassword=notsecret

Using Hive: deprecated

The Hive CLI is now deprecated and its use is not recommended:

hive

Field delimiter

  • Default field delimiter: Ctrl+A (0x01)
  • It can be changed when creating a table
    
                ROW FORMAT DELIMITED
                FIELDS TERMINATED BY ':'
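
As an illustration of what the default delimiter means in practice, table data stored as text can be split on the 0x01 character when read directly from HDFS. A minimal PySpark sketch using a hypothetical path:

        # Split Hive default-delimited (Ctrl+A, 0x01) text lines into fields
        # /user/username/table_data is a hypothetical HDFS path
        lines = spark.sparkContext.textFile("/user/username/table_data")
        fields = lines.map(lambda line: line.split("\x01"))
        print(fields.take(5))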
              

Important Considerations

  • Do not create tables in the default database
  • Create a database with your username and then create your tables in this database:
              create database if not exists uscfajlc;
              use uscfajlc;
              

Important Considerations

  • To restrict access to your database, set the permissions of its directory:
  • hdfs dfs -chmod go-rwx /user/hive/warehouse/uscfajlc.db
  • Always use external tables that are stored in your HDFS HOME
  • Customize the permissions of the external directory
  • Use HDFS ACLs to fine-tune the permissions to further fit your needs

Impala

Low-latency SQL queries

Impala

impala-shell --ssl --impalad=c14-2

Point it to any worker node in the cluster

Impala

Hive and Impala use the same SQL syntax: HiveQL

Sqoop

Transferring data between Hadoop and relational databases

Sqoop

List tables


        sqoop list-tables \
          --username ${USER} -P \
          --connect jdbc:postgresql://${SERVER}/${DB}
        

Import one table


        sqoop import \
          --username ${USER} --password ${PASSWORD} \
          --connect jdbc:postgresql://${SERVER}/${DB} \
          --table mytable \
          --target-dir /user/username/mytable \
          --num-mappers 1
        

Import into Hive


        sqoop import \
          --username ${USER} --password ${PASSWORD} \
          --connect jdbc:postgresql://${SERVER}/${DB} \
          --table mytable \
          --target-dir /user/username/mytable \
          --num-mappers 1 \
          --hive-import
        

Create only the table structure into Hive


        sqoop create-hive-table \
          --username ${USER} --password ${PASSWORD} \
          --connect jdbc:postgresql://${SERVER}/${DB} \
          --table mytable
        

Sqoop Export

Export

First create the table in PostgreSQL


        sqoop export \
          --username ${USER} --password ${PASSWORD} \
          --connect jdbc:postgresql://${SERVER}/${DB} \
          --table mytable \
          --export-dir /user/username/mytable \
          --input-fields-terminated-by '\001' \
          --num-mappers 1
        

Direct mode

For MySQL and PostgreSQL you can use direct mode (--direct option) for faster performance

Modules

Additional Software Versions

Choosing software versions

module avail

Anaconda

We recommend that you use the Anaconda version of Python instead of the OS one

module load anaconda2/2018.12

You can even try Python 3 (not officially supported)

module load anaconda3/2018.12

Build tools

You can load the following build tools

  • maven
  • sbt

Technology Selection

Where to get additional information

Tutorials

User Guide

DTN User Guide

Official Documentation

Cloudera Official Documentation:

Reference Documentation for each component:

Upcoming courses

  • Spark Course: covering pyspark and sparklyr
  • Additional courses (Hive, HBase, Kafka, Flume, Sqoop): depending on your interests

Documentation Pack

We have prepared a documentation pack including today's slides as well as all related material:

Q&A

Big Data is a multi-disciplinary domain.

Collaboration is a key factor in Big Data projects.

Tell us your project and we will help you:

Extra Material

MapReduce

MapReduce

MapReduce is a programming model and an associated implementation for processing and generating large data sets with a parallel, distributed algorithm on a cluster.

MapReduce

Launching a job

To launch a job:

yarn jar job.jar DriverClass input output

List MR jobs

To list running MR jobs:

mapred job -list

Cancelling a job

To cancel a job:

mapred job -kill [jobid]

Monitoring

You can easily monitor your jobs using the YARN UI from the WebUI:

Job History

You can see finished jobs using the MR2 UI from the WebUI:

MapReduce for Java developers

Development Environment Setup

  • For a sample Maven-based project use:
    git clone https://github.com/bigdatacesga/mr-wordcount
  • Import the project in Eclipse (using m2e) or in IntelliJ
  • If using an IDE like Eclipse or IntelliJ, it can be useful to:
    
                  # Download sources and javadoc
                  mvn dependency:sources
                  mvn dependency:resolve -Dclassifier=javadoc
                  # Update the existing Eclipse project
                  mvn eclipse:eclipse
                  # Or if you are using IntelliJ IDEA
                  mvn idea:idea
                

Maven Basic Usage

Compile:

mvn compile

Run the tests

mvn test

Package your app

mvn package

Manual Process

If you prefer to compile and package manually:


            javac -classpath $(hadoop classpath) *.java
            jar cvf wordcount.jar *.class
          

MapReduce Program

Basic components of a program:

  • Driver: management code for the job or sequence of jobs
  • map function of the Mapper
  • reduce function of the Reducer

Driver Code


public class Driver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(Driver.class);
    job.setJobName("Word Count");
    job.setMapperClass(WordMapper.class);
    job.setCombinerClass(SumReducer.class);
    job.setReducerClass(SumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));    
    boolean success = job.waitForCompletion(true);
    System.exit(success ? 0 : 1);
  }
}
        

Map Code


public class WordMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {
	private final static IntWritable one = new IntWritable(1);
	private Text word = new Text();
	@Override
	public void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		String line = value.toString();
		for (String field : line.split("\\W+")) {
			if (field.length() > 0) {
				word.set(field);
				context.write(word, one);
			}
		}
	}
}
        

Reduce Code


public class SumReducer
  extends Reducer<Text, IntWritable, Text, IntWritable> {
	@Override
	public void reduce(
            Text key, Iterable<IntWritable> values, Context context)
			throws IOException, InterruptedException {
		int wordCount = 0;

		for (IntWritable value : values) {
			wordCount += value.get();
		}
		context.write(key, new IntWritable(wordCount));
	}
}
        

MapReduce Streaming API

Quick how-to

  • We write two separate scripts: Mapper and Reducer
  • The Mapper script receives the input file line by line on stdin
  • The stdout of the Mapper and the Reducer must be key-value pairs separated by a tab (see the Python sketch below)
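
For reference, a minimal word-count version of such a pair of scripts in Python; the launch example that follows uses Perl scripts (mapper.pl / reducer.pl), but the invocation is identical apart from the file names:

        # mapper.py: reads input lines from stdin and emits "word<TAB>1" pairs
        import sys

        for line in sys.stdin:
            for word in line.split():
                print("%s\t%d" % (word, 1))

        # reducer.py: input arrives sorted by key, so counts can be accumulated per word
        import sys

        current, count = None, 0
        for line in sys.stdin:
            word, value = line.rstrip("\n").split("\t")
            if word != current:
                if current is not None:
                    print("%s\t%d" % (current, count))
                current, count = word, 0
            count += int(value)
        if current is not None:
            print("%s\t%d" % (current, count))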

Example


yarn jar \
    /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input input -output output \
    -mapper mapper.pl -reducer reducer.pl \
    -file mapper.pl -file reducer.pl
        

Optimizations

Improving the performance

BLAS

Low-level routines for performing common linear algebra operations

NumPy

Adds support to Python for fast operations with multi-dimensional arrays and matrices

Already configured to use Intel MKL
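
A quick way to check which BLAS backend NumPy is using and to exercise it; a minimal sketch:

        import numpy as np

        np.show_config()                  # lists the BLAS/LAPACK libraries NumPy was built against (MKL here)

        a = np.random.rand(2000, 2000)
        b = np.random.rand(2000, 2000)
        c = np.dot(a, b)                  # dense matrix multiply dispatched to the BLAS dgemm routine
        print(c.shape)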

Storage Formats

Success Stories

Gaia (UDC)

FilmYou (CITIC)