Node-RED Blockchain

Build your own block chain in 15 minutes on Node-RED using Node.js, JavaScript, Cloudant/CouchDB on a free IBM Cloud account… Note: To do the tutorial you need a free Bluemix (IBM PaaS Cloud) account. You can obtain one here and the raw file (JSON) for this NodeRED flow is here. Tutorial Objective In this exercise […]

via Node-RED Blockchain — romeokienzler

R on the Raspberry Pi


I’m interested in running R on the Raspberry Pi, and on Raspbian in particular. There are loads of Debian packages for R, and I’m hoping that many of these find their way into Raspbian eventually. Right now it is possible to install and run R from Raspbian, but relatively few packages are available. However, the package r-base can be installed, and that is enough to get up and running with a basic R installation. So,

% sudo apt-get install r-base
% R

should be enough to get started. Indeed, here’s a little Raspbian session to illustrate R running on the Pi:

pi@raspberrypi ~/src/r $ uname -a
Linux raspberrypi 3.1.9+ #168 PREEMPT Sat Jul 14 18:56:31 BST 2012 armv6l GNU/Linux
pi@raspberrypi ~/src/r $ R
R version 2.15.1 (2012-06-22) -- "Roasted Marshmallows"
Copyright (C) 2012 The R Foundation for Statistical Computing
ISBN 3-900051-07-0
Platform: arm-unknown-linux-gnueabihf (32-bit)
R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type 'license()' or 'licence()' for distribution details.

  Natural language support but running in an English locale

R is a collaborative project with many contributors.
Type 'contributors()' for more information and
'citation()' on how to cite R or R packages in publications.
Type 'demo()' for some demos, 'help()' for on-line help, or
'help.start()' for an HTML browser interface to help.
Type 'q()' to quit R.
> rnorm(5)
[1] -1.8385888 -1.1114294  0.7943391  1.0070076  0.7702747
>

Nice! Base graphics such as scatter plots and histograms all work fine, and can be piped to a remote X server if needed. So even without all the add-on packages it is a perfectly reasonable platform for basic data analysis. To benchmark it, I used my standard Gibbs sampling script, gibbs.R

gibbs=function(N,thin)
{
        x=0
        y=0
        cat(paste("Iter","x","y","\n"))
        for (i in 1:N) {
                # inner loop thins the chain: only every thin-th draw is recorded
                for (j in 1:thin) {
                        x=rgamma(1,3,y*y+4)
                        y=rnorm(1,1/(x+1),1/sqrt(2*x+2))
                }
                cat(paste(i,x,y,"\n"))
        }
}
gibbs(50000,1000)

which I can run and time from the linux command line with

% time Rscript gibbs.R > /dev/null

Unfortunately, this takes over 400 minutes, which is around 3 times slower than the equivalent python benchmarking script that I have run on Raspbian. On Intel, R is around half the speed of python, so there’s a bit of a gap there, but actually python runs slower than it should on the Pi anyway. Comparing against R on Intel, on my fast i7 laptop, this R script takes around 7 minutes, and on my Atom based netbook, it takes around 57 minutes. This is consistent with my other findings – namely that the speed difference between C and higher level languages is greater on the Pi than on Intel. Nevertheless, for many basic data analysis tasks, speed isn’t that much of an issue, and it’s certainly going to be very convenient to have R on the Pi.

A Hadoop data lab project on Raspberry Pi


Carsten Mönning and Waldemar Schiller
Hadoop has developed into a key enabling technology for all kinds of Big Data analytics scenarios. Although Big Data applications have started to move beyond the classic batch-oriented Hadoop architecture towards near real-time architectures such as Spark, Storm, etc., [1] a thorough understanding of the Hadoop, MapReduce and HDFS principles and of services such as Hive, HBase, etc. operating on top of the Hadoop core still remains one of the best starting points for getting into the world of Big Data. Renting a Hadoop cloud service or even getting hold of an on-premise Big Data appliance will get you Big Data processing power but no real understanding of what is going on behind the scenes.
To inspire your own little Hadoop data lab project, this four-part blog will provide a step-by-step guide for the installation of open source Apache Hadoop from scratch on Raspberry Pi 2 Model B over the course of the next three to four weeks. Hadoop is designed for operation on commodity hardware, so it will do just fine for tutorial purposes on a Raspberry Pi. We will start with a single node Hadoop setup, will move on to the installation of Hive on top of Hadoop, followed by using the Apache Hive connector of the free SAP Lumira desktop trial edition to visually explore a Hive database. We will finish the series with the extension of the single node setup to a Hadoop cluster on multiple, networked Raspberry Pis. If things go smoothly, and depending on your level of Linux expertise, you can expect your Hadoop Raspberry Pi data lab project to be up and running within approximately 4 to 5 hours.
We will use a simple, widely known processing example (word count) throughout this blog series. No prior technical knowledge of Hadoop, Hive, etc. is required. Some basic Linux/Unix command line skills will prove helpful throughout. We are assuming that you are familiar with basic Big Data notions and the Hadoop processing principle. If not, you will find useful pointers in [3] and at: http://hadoop.apache.org/. Further useful references will be provided in due course.
Part 1 – Single node Hadoop on Raspberry Pi 2 Model B (~120 mins)

Part 2 – Hive on Hadoop (~40 mins), http://bit.ly/1Biq7Ta

Part 3 – Hive access with SAP Lumira (~30mins), http://bit.ly/1cbPz68

Part 4 – A Hadoop cluster on Raspberry Pi 2 Model B(s) (~45mins), http://bit.ly/1eO766g

 

HiveServices5.jpg

 

Part 1 – Single node Hadoop on Raspberry Pi 2 Model B (~120 mins)

 

Preliminaries

To get going with your single node Hadoop setup, you will need the following Raspberry Pi 2 Model B bits and pieces:

  • One Raspberry Pi 2 Model B, i.e. the latest Raspberry Pi model featuring a quad core CPU with 1 GB RAM.
  • 8GB microSD card with NOOBS (“New Out-Of-the-Box Software”) installer/boot loader pre-installed (https://www.raspberrypi.org/tag/noobs/).
  • Wireless LAN USB card.
  • Mini USB power supply, heat sinks and HDMI display cable.
  • Optional, but recommended: A case to hold the Raspberry circuit board.

To make life a little easier for yourself, we recommend going for a Raspberry Pi accessory bundle, which typically comes with all of these components pre-packaged and will set you back approx. €60-70.

RaspberryPiBundle.png

We intend to install the latest stable Apache Hadoop and Hive releases available from any of the Apache Software Foundation download mirror sites, http://www.apache.org/dyn/closer.cgi/hadoop/common/, alongside the free SAP Lumira desktop trial edition, http://saplumira.com/download/, i.e.

  • Hadoop 2.6.0
  • Hive 1.1.0
  • SAP Lumira 1.23 desktop edition

The initial Raspberry setup procedure is described by, amongst others, Jonas Widriksson at http://www.widriksson.com/raspberry-pi-hadoop-cluster/. His blog also provides some pointers in case you are not starting off with a Raspberry Pi accessory bundle but prefer obtaining the hardware and software bits and pieces individually. We will follow his approach for the basic Raspbian setup in this part, but updated to reflect Raspberry Pi 2 Model B-specific aspects and providing some more detail on various Raspberry Pi operating system configuration steps. To keep things nice and easy, we are assuming that you will be operating the environment within a dedicated local wireless network, thereby avoiding any firewall and port setting (and the Hadoop node & rack network topology) discussion. The basic Hadoop installation and configuration descriptions in this part make use of [3].
The subsequent blog parts will be based on this basic setup.

 

Raspberry Pi setup

Powering on your Raspberry Pi will automatically launch the pre-installed NOOBS installer on the SD card. Select “Raspbian”, a Debian 7 Wheezy-based Linux distribution for ARM CPUs, from the installation options and wait for its subsequent installation procedure to complete. Once the Raspbian operating system has been installed successfully, your Raspberry Pi will reboot automatically and you will be asked to provide some basic configuration settings using raspi-config. Note that since we are assuming that you are using NOOBS, you will not need to expand your SD card storage (menu option Expand Filesystem). NOOBS will already have done so for you. By the way, if you want or need to run NOOBS again at some point, press & hold the shift key on boot and you will be presented with the NOOBS screen.

 

Basic configuration

What you might want to do though is to set a new password for the default user “pi” via the configuration option Change User Password. Similarly, set your internationalisation options, as required, via the option Internationalisation Options.

BasicConfiguration Menu.png

More interestingly in our context, go for menu item Overclock and set the CPU speed to your liking, taking into account any potential implications for your power supply/consumption (“voltmodding”) and the lifetime of your Raspberry hardware. If you are somewhat optimistic about these things, go for the “Pi2” setting featuring 1 GHz CPU and 500 MHz RAM speeds to make the single node Raspberry Pi Hadoop experience a little more enjoyable.

AdvancedOptions_Overclocking.png

Under Advanced Options, followed by submenu item Hostname, set the hostname of your device to “node1”.  Selecting Advanced Options again, followed by Memory Split, set the GPU memory to 32 MB.

AdvancedOptions_Hostname.png

Finally, under Advanced Options, followed by SSH, enable the SSH server and reboot your Raspberry Pi by selecting <Finish> in the configuration menu. You will need the SSH server to allow for Hadoop cluster-wide operations.
Once rebooted and with your “pi” user logged in again, the basic configuration setup of your Raspberry device has been successfully completed and you are ready for the next set of preparation steps.

 

Network configuration

To make life a little easier, launch the Raspbian GUI environment by entering startx in the Raspbian command line. (Alternatively, you can use, for example, the vi editor, of course.) Use the GUI text editor, “Leafpad”, to edit the /etc/network/interfaces text file as shown below to change the local ethernet settings for eth0 from DHCP to the static IP address 192.168.0.110. Also add the netmask and gateway entries shown. This is the preparation for our multi-node Hadoop cluster which is the subject of Part 4 of this blog series.

Ethernet2.png
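In case the screenshot does not render here, a minimal sketch of the resulting /etc/network/interfaces entries looks as follows (the netmask and gateway values are assumptions for a typical 192.168.0.x network and should be adapted to your own setup):

auto lo
iface lo inet loopback

auto eth0
iface eth0 inet static
address 192.168.0.110
netmask 255.255.255.0
gateway 192.168.0.1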

Check whether the nameserver entry in file /etc/resolv.conf is given and looks ok. Restart your device afterwards.

 

 

Java environment

Hadoop is written in Java and requires Java 6 or later to operate. Check whether the pre-installed Java environment is in place by executing:

 

java -version
You should be prompted with a Java 1.8, i.e. Java 8, response.
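If, contrary to expectation, no Java runtime is found, a JDK can be installed via apt first. A minimal sketch (the package name is an assumption and may differ between Raspbian releases):

     sudo apt-get update
     sudo apt-get install oracle-java8-jdk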

 

 

Hadoop user & group accounts

Set up dedicated user and group accounts for the Hadoop environment to separate the Hadoop installation from other services. The account IDs can be chosen freely, of course. We are sticking here with the ID examples in Widriksson’s blog posting, i.e. group account ID “hadoop” and user account ID “hduser” within this and the sudo user groups.

     sudo addgroup hadoop

     sudo adduser --ingroup hadoop hduser

     sudo adduser hduser sudo
UserGroup2.png

SSH server configuration

Generate an RSA key pair to allow the “hduser” to access slave machines seamlessly with an empty passphrase. The public key will be stored in a file with the default name “id_rsa.pub” and then appended to the list of SSH authorised keys in the file “authorized_keys”. Note that this public key file will need to be shared by all Raspberry Pis in a Hadoop cluster (Part 4).

 

su hduser

mkdir ~/.ssh

ssh-keygen -t rsa -P ""

cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

SSHKeys2.png
Verify your SSH server access via: ssh localhost
This completes the Raspberry Pi preparations and you are all set for downloading and installing the Hadoop environment.

 

Hadoop installation & configuration

Similar to the Raspbian installation & configuration description above, we will talk you through the basic Hadoop installation first, followed by the various environment variable and configuration settings.

 

Basic setup

You need to get your hands on the latest stable Hadoop version (here: version 2.6.0) so initiate the download from any of the various Apache mirror sites (here: spacedump.net).

 

     cd ~/
     wget http://apache.mirrors.spacedump.net/hadoop/core/stable/hadoop-2.6.0.tar.gz
Once the download has been completed, unpack the archive to a sensible location, e.g., /opt represents a typical choice.
     sudo mkdir /opt

     sudo tar -xvzf hadoop-2.6.0.tar.gz -C /opt/
Following extraction, rename the newly created hadoop-2.6.0 folder into something a little more convenient such as “hadoop”.
     cd /opt

     sudo mv hadoop-2.6.0 hadoop
Running, for example, ls -al, you will notice that your “pi” user is the owner of the “hadoop” directory, as expected. To allow for the dedicated Hadoop user “hduser” to operate within the Hadoop environment, change the ownership of the Hadoop directory to “hduser”.
     sudo chown -R hduser:hadoop hadoop
This completes the basic Hadoop installation and we can proceed with its configuration.

 

Environment settings

Switch to the “hduser” and add the export statements listed below to the end of the shell startup file ~/.bashrc. Instead of using the standard vi editor, you could, of course, make use of the Leafpad text editor within the GUI environment again.
     su hduser

     vi ~/.bashrc
Export statements to be added to ~/.bashrc:
     export JAVA_HOME=$(readlink -f /usr/bin/java | sed "s:bin/java::")

     export HADOOP_INSTALL=/opt/hadoop

     export PATH=$PATH:$HADOOP_INSTALL/bin:$HADOOP_INSTALL/sbin
This way both the Java and the Hadoop installation as well as the Hadoop binary paths become known to your user environment. Note that you may add the JAVA_HOME setting to the hadoop-env.sh script instead, as shown below.
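To apply the new settings to your current session and double-check them, something along these lines should do (hadoop version simply reports the release it finds on the PATH):

     source ~/.bashrc
     hadoop version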
Apart from these environment variables, modify the /opt/hadoop/etc/hadoop/hadoop-env.sh script as follows. If you are using an older version of Hadoop, this file can be found in /opt/hadoop/conf/. Note that in case you decide to relocate this configuration directory, you will have to pass on the directory location when starting any of the Hadoop daemons (see the daemon table below) using the --config option.
     vi /opt/hadoop/etc/hadoop/hadoop-env.sh
Hadoop assigns 1 GB of memory to each daemon, so this default value needs to be reduced via the parameter HADOOP_HEAPSIZE to allow for Raspberry Pi conditions. The JAVA_HOME setting for the location of the Java implementation may be omitted if already set in your shell environment, as shown above. Finally, set the datanode’s Java virtual machine to client mode. (Note that with the Raspberry Pi 2 Model B’s ARMv7 processor, this ARMv6-specific setting is not strictly necessary anymore.)
     # The java implementation to use. Required, if not set in the shell environment
     export JAVA_HOME=$(readlink -f /usr/bin/java | sed "s:bin/java::")

     # The maximum amount of heap to use, in MB. Default is 1000.
     export HADOOP_HEAPSIZE=250

     # Command specific options appended to HADOOP_OPTS when specified
     export HADOOP_DATANODE_OPTS="-Dcom.sun.management.jmxremote $HADOOP_DATANODE_OPTS -client"

 

Hadoop daemon properties

With the environment settings completed, you are ready for the more advanced Hadoop daemon configurations. Note that the configuration files are not held globally, i.e. each node in a Hadoop cluster holds its own set of configuration files, which need to be kept in sync by the administrator using, for example, rsync.
Modify the following files, as shown below, to configure the Hadoop system for operation in pseudo-distributed mode. You can find these files in directory /opt/hadoop/etc/hadoop. In the case of older Hadoop versions, look for the files in /opt/hadoop/conf.

core-site.xml – Common configuration settings for Hadoop Core.

hdfs-site.xml – Configuration settings for the HDFS daemons: the namenode, the secondary namenode and the datanodes.

mapred-site.xml – General configuration settings for the MapReduce daemons. Since we are running MapReduce using YARN, the MapReduce jobtracker and tasktrackers are replaced with a single resource manager running on the namenode.

 

File: core-site.xml

<configuration>
   <property>
      <name>hadoop.tmp.dir</name>
      <value>/hdfs/tmp</value>
   </property>
   <property>
      <name>fs.default.name</name>
      <value>hdfs://localhost:54310</value>
   </property>
</configuration>
File: hdfs-site.xml

<configuration>
   <property>
      <name>dfs.replication</name>
      <value>1</value>
   </property>
</configuration>
File: mapred-site.xml.template ("mapred-site.xml", if dealing with older Hadoop versions)

<configuration>
   <property>
      <name>mapred.job.tracker</name>
      <value>localhost:54311</value>
   </property>
</configuration>

Hadoop Data File System (HDFS) creation

HDFS has been automatically installed as part of the Hadoop installation. Create a local /hdfs/tmp directory (the location referenced by hadoop.tmp.dir in core-site.xml above) to hold the temporary HDFS data and change the directory ownership to your Hadoop user of choice. A new HDFS installation needs to be formatted prior to use. This is achieved via the namenode's -format option.
     sudo mkdir -p /hdfs/tmp

     sudo chown hduser:hadoop /hdfs/tmp

     sudo chmod 750 /hdfs/tmp

     hadoop namenode -format

Launch HDFS and YARN daemons

Hadoop comes with a set of scripts for starting and stopping the various daemons. They can be found in the sbin directory of the Hadoop installation. Since you are dealing with a single node setup, you do not need to tell Hadoop about the various machines in the cluster to execute any script on, and you can simply execute the following scripts straightaway to launch the Hadoop file system (namenode, datanode and secondary namenode) and YARN resource manager daemons. If you need to stop these daemons, use the stop-dfs.sh and stop-yarn.sh scripts, respectively.
     /opt/hadoop/sbin/start-dfs.sh

     /opt/hadoop/sbin/start-yarn.sh
Check the resource manager web UI at http://localhost:8088 for a node overview. Similarly, http://localhost:50070 will provide you with details on your HDFS. If you find yourself in need of issue diagnostics at any point, consult the log4j.log file in the logs subdirectory of the Hadoop installation directory first. If preferred, you can separate the log files from the Hadoop installation directory by setting a new log directory in HADOOP_LOG_DIR and adding it to the hadoop-env.sh script.
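As a quick command-line sanity check to complement the web UIs, the JDK's jps tool should list the running daemons, and a simple HDFS listing should succeed without errors (a hedged example; process IDs will differ):

     jps
     # expected entries: NameNode, DataNode, SecondaryNameNode, ResourceManager, NodeManager
     hadoop fs -ls /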

WebUI_NodeOverview2.png

With all the implementation work completed, it is time for a little Hadoop processing example.

 

An example

We will run some word count statistics on the standard Apache Hadoop license file to give your Hadoop core setup a simple test run. The word count executable is part of the Hadoop examples jar file that ships with the distribution. To get going, you need to upload the Apache Hadoop license file into HDFS.

 

     hadoop fs -copyFromLocal /opt/hadoop/LICENSE.txt /license.txt
Run word count against the license file and write the result into license-out.txt.

 

     hadoop jar /opt/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount /license.txt /license-out.txt
You can get hold of the HDFS output file via:

 

     hadoop fs -copyToLocal /license-out.txt ~/
Have a look at ~/license-out.txt/part-r-00000 with your preferred text editor to see the word count results. It should look like the extract shown below.

WordCount_ResultExtract2.png
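If you prefer the command line over a text editor, the same output can be inspected directly; a brief sketch using the paths from above:

     hadoop fs -ls /license-out.txt          # lists the MapReduce output directory in HDFS
     head -20 ~/license-out.txt/part-r-00000 # local copy retrieved via -copyToLocal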
We will build on these results in the subsequent parts of this blog series on HiveQL and its SAP Lumira integration.

 

Links

Apache Software Foundation Hadoop Distribution – http://www.apache.org/dyn/closer.cgi/hadoop/common/

Jonas Widriksson blog – http://www.widriksson.com/raspberry-pi-hadoop-cluster/

NOOBS – https://www.raspberrypi.org/tag/noobs/

SAP Lumira desktop trial edition – http://saplumira.com/download/

A BOBI document dashboard with Raspberry Pi – http://bit.ly/1Mv2Rv5

 

References

[1] V. S. Agneeswaran, “Big Data Beyond Hadoop”, Pearson, USA, 2014

[2] K. Shvachko, H. Kuang, S. Radia and R. Chansler, “The Hadoop Distributed File System”, Proc. of MSST 2010, 05/2010

[3] T. White, “Hadoop: The Definitive Guide”, 3rd edition, O’Reilly, USA, 2012

Posted by Carsten Mönning in BI Platform

 

Part 2 – Hive on Hadoop (~40 mins)


Part 1 – Single node Hadoop on Raspberry Pi 2 Model B (~120 mins), http://bit.ly/1dqm8yO

Part 2 – Hive on Hadoop (~40 mins)

Part 3 – Hive access with SAP Lumira (~30mins), http://bit.ly/1cbPz68

Part 4 – A Hadoop cluster on Raspberry Pi 2 Model B(s) (~45mins), http://bit.ly/1eO766g

Following on from the Hadoop core installation on a Raspberry Pi 2 Model B in Part 1 of this blog series, in this Part 2, we will proceed with installing Apache Hive on top of HDFS and show its basic principles with the help of last part’s word count Hadoop processing example.
Hive represents a distributed relational data warehouse featuring a SQL-like query language, HiveQL, inspired by the MySQL SQL dialect. A high-level comparison of HiveQL and SQL is provided in [1]. For a HiveQL command reference, see: https://cwiki.apache.org/confluence/display/Hive/LanguageManual.
The Hive data sits in HDFS, with HiveQL queries getting translated into MapReduce jobs by the Hadoop run-time environment. Whilst traditional relational data warehouses enforce a pre-defined metadata schema when writing data to the warehouse, Hive performs schema on read, i.e., the data is checked when a query is launched against it. Hive, alongside the NoSQL data store HBase, represents a frequently used component of the Hadoop data processing layer for external applications to push query workloads towards data in Hadoop. This is exactly what we are going to do in Part 3 of this series when connecting to the Hive environment via the SAP Lumira Apache Hive standard connector and pushing queries through this connection against the word count output file.

 

HiveServices5.jpg
First, let us get Hive up and running on top of HDFS.

 

Hive installation
The latest stable Hive release will operate alongside the latest stable Hadoop release and can be obtained from Apache Software Foundation mirror download sites. Initiate the download, for example, from spacedump.net and unpack the latest stable Hive release as follows. You may also want to rename the binary directory to something a little more convenient.
cd ~/
wget http://apache.mirrors.spacedump.net/hive/stable/apache-hive-1.1.0-bin.tar.gz
tar -xzvf apache-hive-1.1.0-bin.tar.gz
mv apache-hive-1.1.0-bin hive-1.1.0
Add the paths to the Hive installation and the binary directory, respectively, to your user environment.
cd hive-1.1.0
export HIVE_HOME=$(pwd)
export PATH=$HIVE_HOME/bin:$PATH
export HADOOP_USER_CLASSPATH_FIRST=true
Make sure your Hadoop user chosen in Part 1 (here: hduser) has ownership rights to your Hive directory.
sudo chown -R hduser:hadoop ~/hive-1.1.0
To be able to generate tables within Hive, run the Hadoop start scripts start-dfs.sh and start-yarn.sh (see also Part 1). You may also want to create the following directories and access settings.
hadoop fs -mkdir /tmp
hadoop fs -chmod g+w /tmp
hadoop fs -mkdir -p /user/hive/warehouse
hadoop fs -chmod g+w /user/hive/warehouse
Strictly speaking, these directory and access settings assume that you are intending to have more than one Hive user sharing the Hadoop cluster and are not required for our current single Hive user scenario.
By typing in hive, you should now be able to launch the Hive command line interface. By default, Hive issues information to standard error in both interactive and non-interactive mode. We will see this effect in action in Part 3 when connecting to Hive via SAP Lumira. The -S parameter of the hive command will suppress this feedback.
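For example, a single HiveQL statement can be run non-interactively from the shell; a minimal sketch combining both options (-e executes the quoted statement, -S suppresses the informational output):

hive -S -e 'show tables;'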
Typing in hive --service help will provide you with a list of all available services [1]:

cli – Command-line interface to Hive; the default service.
hiveserver – Hive operating as a server for programmatic client access via, for example, JDBC and ODBC; listens on port 10000 (port configuration parameter HIVE_PORT).
hwi – Hive web interface for exploring the Hive schemas; HTTP, port 9999 (port configuration parameter hive.hwi.listen.port).
jar – Hive equivalent to hadoop jar; runs Java applications with both the Hadoop and Hive classpath.
metastore – Central repository of Hive metadata.

If you are curious about the Hive web interface, launch hive --service hwi, enter http://localhost:9999/hwi in your browser and you will be shown something along the lines of the screenshot below.

HWI.png
If you run into any issues, check out the Hive error log at /tmp/$USER/hive.log. Similarly, the Hadoop error logs presented in Part 1 can prove useful for Hive debugging purposes.

An example (continued)

Following on from our word count example in Part 1 of this blog series, let us upload the word count output file into Hive’s local managed data store. You need to generate the Hive target table first. Launch the Hive command line interface and proceed as follows.
create table wcount_t(word string, count int) row format delimited fields terminated by '\t' stored as textfile;
In other words, we just created a two-column table consisting of a string and an integer field delimited by tabs and featuring newlines for each new row. Note that HiveQL expects a command line to be finished with a semicolon.

 

The word count output file can now be loaded into this target table.
load data local inpath '~/license-out.txt/part-r-00000' overwrite into table wcount_t;
Effectively, the local file part-r-00000 is stored in the Hive warehouse directory, which is set to /user/hive/warehouse by default. More specifically, part-r-00000 can be found in the Hive directory /user/hive/warehouse/wcount_t and you may query the table contents.
show tables;

select * from wcount_t;
If everything went according to plan, your screen should show a result similar to the screenshot extract below.

 

ShowTables2.png
If so, it means you managed to both install Hive on top of Hadoop on Raspberry Pi 2 Model B and load the word count output file generated in Part 1 into the Hive data warehouse environment. In the process, you should have developed a basic understanding of the Hive processing environment, its SQL-like query language and its interoperability with the underlying Hadoop environment.
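As a small, optional follow-up exercise, the same table can also be queried straight from the shell; a hedged example ranking the most frequent terms (count is quoted with backticks to avoid any clash with the built-in count() function):

hive -e 'select word, `count` from wcount_t order by `count` desc limit 10;'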

 

In the next part of this series, we will bring the implementation and configuration effort of Parts 1 & 2 to fruition by running SAP Lumira as a client against the Hive server and will submit queries against the word count result file in Hive using standard SQL, with the Raspberry Pi doing all the MapReduce work. Lumira’s Hive connector will translate these standard SQL queries into HiveQL so that things appear pretty standard from the outside. Having worked your way through the first two parts of this blog series, however, you will be very much aware of what is actually going on behind the scenes.

 

Links

Apache Software Foundation Hive Distribution – Index of /hive

Apache Hive wiki – https://cwiki.apache.org/confluence/display/Hive/GettingStarted

Apache Hive command reference – https://cwiki.apache.org/confluence/display/Hive/LanguageManual

A Hadoop data lab project Part 1 – http://bit.ly/1dqm8yO

A BOBI document dashboard with Raspberry Pi – http://bit.ly/1Mv2Rv5

Configuring Hive ports – http://docs.hortonworks.com/HDP2Alpha/index.htm#Appendix/Ports_Appendix/Hive_Ports.htm

References

[1] T. White, “Hadoop: The Definitive Guide”, 3rd edition, O’Reilly, USA, 2012

 

Raspberry Pi 2 cluster with NAT routing


Introduction

In this post I will describe how to build a 12-node cluster of Raspberry Pi 2, how to set it up and configure it with a NAT-routing, DHCP-serving head node, and some simple ways to manage the slaves. In order to minimise typing/copy-and-pasting, and to make everything as simple and automated as possible, all of the commands, scripts and config files associated with this post are available in an associated github repo, which the reader is encouraged to browse in conjunction with reading this post. All of the scripts are very short and simple (just a few lines), and could easily be entered directly at the command line. There is no hidden magic here.

Hardware

To give an idea of what I’m talking about, a picture of my finished cluster is given below. Click on it to view additional detail.

A 12-node Pi 2 cluster

In the foreground are 12 Pis in off-the-shelf cases, stacked in 3 columns of 4. They are connected to a 16-port switch in the background via the regular Pi ethernet ports. They are powered by three 4-port USB chargers (one per column), which are resting on top of the switch. The head node has a USB ethernet dongle, which is in turn connected to an upstream switch, providing internet connectivity. In the picture, the head node is also connected to a display via an HDMI cable (and to a wireless keyboard), though once up and running, the display and keyboard are optional.

For this cluster you need: 12 x Raspberry Pi 2, 12 x (stackable) Pi 2 case, 12 x uSD card (64GB, Class 10), 12 x short patch cable, 12 x short USB to uUSB cable, 3 x 4-port, high power USB charger, 1 x 16 port switch (12 port would do), 1 x USB ethernet dongle, 1 x HDMI display and cable, 1 x USB keyboard, 1 x long patch cable for internet uplink, 1 x 6-way mains power adaptor/extension.

Software

I am using vanilla Raspbian as the base OS. Using an Ubuntu laptop, I flashed the 12 uSD cards one at a time with

sudo dd bs=32M if=2015-05-05-raspbian-wheezy.img of=/dev/mmcblk0
sync

It takes between 3 and 4 minutes per card. Before assembling the cluster, I booted up each card in turn in a Pi 2 to do some very basic initial config. On first boot “raspi-config” will run. I made 4 changes to the default settings. From the main settings I selected “Expand filesystem” and set “Overclock” to “Pi2”. From the advanced settings I changed the RAM split to “16” and enabled SSH. I then finished and rebooted to re-size the card, before logging back in (pi/raspberry) and doing a “shutdown -h now” and then powering down. Again, this took between 3 and 4 minutes per card.

After assembling the cluster, put the cards in all of the Pis, but don’t power up any of the Pis. Pick a head node and attach the ethernet dongle, the internet uplink, a keyboard and display, and then boot up just this head node. Assuming that your internet uplink will respond to DHCP requests made by the dongle, the Pi should auto-detect the fact that there is internet via the dongle (eth1) and not via the internal NIC (eth0), and configure itself appropriately. Check you have connectivity by logging in and doing a ping www.google.com or some such. Also change the password on this node to something secure (using passwd). Make sure you have internet connectivity on the head node before proceeding (ifconfig provides useful info).

Grab my code from github and install a few required packages with

wget https://github.com/darrenjw/blog/archive/master.zip
unzip master.zip
cd blog-master/pi-cluster
sudo sh install-packages

This could take around half an hour, and the node will reboot when it is finished. Log back in and return to the script directory to continue. Configure the network with

sudo sh setup-network

This will first configure the two network interfaces correctly, then set up a DHCP server to manage the network settings of the nodes on the internal network, and finally will set up iptables correctly for NAT routing of internal traffic from the worker nodes to the internet and back, ensuring that the LAN-side nodes can all connect to the internet properly. Again, this script will reboot when it is finished (it only takes a few seconds to run).
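The script takes care of all the details; purely for reference, the core of a NAT setup of this kind (internal eth0, upstream eth1, as in this post) typically boils down to something like the following sketch:

sudo sysctl -w net.ipv4.ip_forward=1   # enable IP forwarding for the current session
sudo iptables -t nat -A POSTROUTING -o eth1 -j MASQUERADE
sudo iptables -A FORWARD -i eth1 -o eth0 -m state --state RELATED,ESTABLISHED -j ACCEPT
sudo iptables -A FORWARD -i eth0 -o eth1 -j ACCEPT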

Once the head node comes back up, log back in, check you still have internet connectivity, and then return to the script directory.

Now boot up all of the other nodes. When they boot, they will send out DHCP requests which the head node should respond to, correctly configuring all of the nodes for internet connectivity. Wait at least 2-3 minutes for all of the nodes to have a chance to boot up properly.

Now on the head node run

sh setup-cluster

Note that this does not need to run as root. This will first generate SSH keys on the head node (for passwordless SSH). Just hit return (3 times) to prompts. Once it has generated the keys, it will scan the internal network to find the other nodes. It will store a list of workers in the file "workers.txt", which will be stored in both the script directory and the home directory. It will then copy the keys to each worker node in turn. For this, you will have to accept the connection and type in the password (raspberry) for each node in turn. This should be the last time you need to enter a password for the worker nodes. Finally, it will upgrade Raspbian on the workers and the workers will all reboot when they have finished upgrading. This last stage will take a long time (up to half an hour, depending on your internet connection). Once this command has completed, wait 2-3 minutes for the workers to reboot before continuing.

Note that the script setup-cluster calls three other scripts to do its work, and these scripts can be useful on their own. The script map-network scans the network to see what nodes are available, and stores them in the file workers.txt. You should re-run this whenever nodes are added or removed from the cluster. The script copy-keys copies the SSH keys to all of the workers. You should re-run this after adding new nodes to the cluster (after running map-network). Finally, the script upgrade-workers upgrades Raspbian on all of the worker nodes and then reboots them.

Finally, there is one more script, shutdown-workers which shuts down all of the workers in anticipation of the head node being shut down and the cluster being powered down.
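For ad-hoc administration beyond these scripts, the parallel-ssh tool (from the same pssh package that provides parallel-scp, assuming it is among the installed packages) can run a command on every worker in one go, e.g.

parallel-ssh -h ~/workers.txt -i "uptime"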

You may be tempted to customise the worker nodes by logging in to them individually and giving them host names, etc. It’s best to resist this temptation. The Zen of cloud computing is to treat nodes like sheep and not like ponies! Don’t name them, don’t pick favourites, don’t treat them individually, and don’t care if the odd one occasionally goes missing. You might imagine that this can make it difficult to track down hardware problems with individual nodes, but it’s usually possible to track these down quite quickly by looking at indicator lights on the nodes and the network switch when the system is under load.

That’s it – you now have your own private cloud.

Use case: a standalone Spark cluster

There are lots of things that one can do with a cluster like this, but for illustration, I will now show how to use the cluster as a standalone Apache Spark cluster. In previous posts I have described how to install Spark on a Pi 2 and how to create a small Spark cluster. It may be worth quickly reviewing those posts before proceeding. On the head node run

cd
wget http://www.eu.apache.org/dist//spark/spark-1.4.1/spark-1.4.1-bin-hadoop2.6.tgz
tar xvfz spark-1.4.1-bin-hadoop2.6.tgz
cd spark-1.4.1-bin-hadoop2.6

to get and unpack a recent version of Spark. Then configure Spark appropriately with:

cp ~/workers.txt conf/slaves
echo '#!/usr/bin/env bash' > conf/spark-env.sh
echo "" >> conf/spark-env.sh
echo "SPARK_MASTER_IP=192.168.0.1" >> conf/spark-env.sh
echo "SPARK_WORKER_MEMORY=512m" >> conf/spark-env.sh

Having configured Spark on the head node (which will also act as the Spark Master), we can copy it to the workers with

parallel-scp -h ~/workers.txt -r -p 100 -t 0 /home/pi/spark-1.4.1-bin-hadoop2.6 /home/pi/spark-1.4.1-bin-hadoop2.6

Note that if you subsequently change the config, you don’t need to re-copy the entire Spark distribution. Just re-copy the conf directory with

parallel-scp -h ~/workers.txt -r -p 100 -t 0 /home/pi/spark-1.4.1-bin-hadoop2.6/conf /home/pi/spark-1.4.1-bin-hadoop2.6/conf

A simple Spark session can then be run with

sbin/start-all.sh
bin/spark-shell --master spark://192.168.0.1:7077

When Spark eventually starts up, you can enter the following into the Spark shell

sc.textFile("README.md").count

After exiting the Spark shell (Ctrl-D), you can shut everything down with

sbin/stop-all.sh

See my other posts for additional pointers and further reading.

Some useful links/references

The main relevant link for this post is that of the associated code on GitHub:

https://github.com/darrenjw/blog/tree/master/pi-cluster

If you are a Git person you might want to clone or fork this repo. The main other post I found useful for setting up the Pi as a NAT router was:

http://qcktech.blogspot.co.uk/2012/08/raspberry-pi-as-router.html

There was also an article on turning your Pi into a Wifi access point in Issue 11 of MagPi that I found somewhat useful.

PUBLISHED BY

darrenjw

I am Professor of Stochastic Modelling within the School of Mathematics & Statistics at Newcastle University, UK. I am also a computational systems biologist.

Setting up a standalone Apache Spark cluster of Raspberry Pi 2


In the previous post I explained how to install Apache Spark in “local” mode on a Raspberry Pi 2. In this post I will explain how to link together a collection of such nodes into a standalone Apache Spark cluster. Here, “standalone” refers to the fact that Spark is managing the cluster itself, and that it is not running on top of Hadoop or some other cluster management solution.

I will assume at least two Raspberry Pi 2 nodes on the same local network, with identical Spark distributions installed in the same directory of the same user account on each node. See the previous post for instructions on how to do this. I will use two such nodes, with Spark installed under the user account spark.

First, you must decide on one of your nodes to be the master. I have two nodes, raspi08 and raspi09. I will set up raspi08 as the master. The spark account on the master needs to be able to SSH into the same account on all of the other nodes without the need to provide a password, so it makes sense to begin by setting up passwordless SSH. Log in to the spark account on the master and generate SSH keys with:

ssh-keygen

Just press return when asked for a password to keep it password free. Copy the identity to each other node. eg. to copy it to raspi09 I use:

ssh-copy-id spark@raspi09

You will obviously need to provide a password at this point. Once the identity is copied, SSH into each node to ensure that you can indeed connect without the need for a password.

Once passwordless SSH is set up and working, log into the master node to configure the Spark installation. Within the conf/ directory of the Spark installation, create a file called slaves and enter a list of all nodes you want to have as “workers”. eg. mine looks like:

raspi08
raspi09

Note that in my case raspi08 is listed as a worker despite also acting as master. That is perfectly possible. If you have plenty of nodes you might not want to do this, as the Pi 2 doesn’t really have quite enough RAM to be able to do this well, but since I have only two nodes, it seems like a good idea here.

Also within the conf/ directory, take a copy of the environment template:

cp spark-env.sh.template spark-env.sh

and then edit spark-env.sh according to your needs. I solved some obtuse Akka errors about workers not being able to connect back to the master by hardcoding the IP address of the Master node into the config file:

SPARK_MASTER_IP=192.168.1.177

You can find out the IP address of your node by running ifconfig. You should also set the memory that can be used by each worker node. This is a bit tricky, as RAM is a bit tight on the Pi 2. I went for 512MB, leaving nearly half a gig for the OS.

SPARK_WORKER_MEMORY=512m

Once you are done, the environment template (but not the slaves list) needs to be copied to each worker. eg.

scp spark-env.sh spark@raspi09:spark-1.3.0-bin-hadoop2.4/conf/

You shouldn’t need to supply a password…

At this point, you should be ready to bring up the cluster. You can bring up the master and workers all in one go with:

sbin/start-all.sh

When the master comes up, it starts a web service on port 8080. eg. I connect to it from any machine on the local network by pointing my browser at: http://raspi08:8080/

If a web page comes up, then the master is running. The page should give other diagnostic information, including a list of workers that have been brought up and are registered with the master. You can also access a lot of debugging info. If everything seems to look OK, make a note of the URL for the Spark master, which is displayed in large text at the top of the page. It should just be spark://192.168.1.177:7077 where the IP address is the IP address of your master node.

Try bringing up a spark shell on the master with:

bin/spark-shell --master spark://192.168.1.177:7077

Once the shell comes up, go back to your web browser and refresh the page to see the connection. Go back to the shell and try a simple test like:

sc.textFile("README.md").count

As usual, it is Ctrl-D to exit the shell. To bring down the cluster, use

sbin/stop-all.sh

Once you are happy that everything is working as it should, you probably want to reduce the amount of diagnostic debugging info that is echoed to the console. Do this by going back into the conf/ directory and copying the log4j template:

cp log4j.properties.template log4j.properties

and then editing log4j.properties. There is a line near the beginning of the file:

log4j.rootCategory=INFO, console

Change INFO to WARN so it reads:

log4j.rootCategory=WARN, console

Then when you next bring up the cluster, everything should be a bit less noisy.

That’s it. You’ve built a Spark cluster! Note that when accessing files from Spark scripts (and applications) it is assumed that the file exists in the same directory on every worker node. For testing purposes, it is easy enough to use scp before running the script to copy the files to all of the workers. But that is obviously somewhat unsatisfactory in the long term. Another possibility is to set up an NFS file server and mount it at the same mount point on each worker. Then make sure that any files you access are shared via the NFS file server. Even that solution isn’t totally satisfactory, due to the slow interconnect on the Pi 2. Ultimately, it would be better to set up a proper distributed file system such as Hadoop’s HDFS on your cluster and then share files via HDFS. That is how most production Spark clusters are set up. I may look at that in another post, but in the meantime check out the Spark standalone documentation for further information.
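For the scp approach mentioned above, a small loop over the slaves file keeps things manageable; a rough sketch (mydata.txt is a hypothetical example file, assumed to live in the spark account's home directory on the master):

for host in $(cat conf/slaves); do
  [ "$host" = "$(hostname)" ] && continue   # the master already has the file
  scp ~/mydata.txt "$host":~/mydata.txt
done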

 

PUBLISHED BY

darrenjw (blog)

I am Professor of Stochastic Modelling within the School of Mathematics & Statistics at Newcastle University, UK. I am also a computational systems biologist.

 

Installing Apache Spark on a Raspberry Pi 2


In this post I will walk through how to install and run Apache Spark on a Raspberry Pi 2. In the next post I will show how to make a Spark cluster from multiple (two!) Raspberry Pi 2. Very little of this is actually specific to the Raspberry Pi – most of this post will apply to any Linux system with a recent Java JVM (7 or 8) and Python 2.x installed. Also note that “installation” of Spark is really just unpacking a tarball – it doesn’t need a “system-wide” installation or root access, so it’s not a big deal – you can just trash it later if you decide you don’t want it.

Apache Spark is a Scala library for the analysis of “big data”. Spark clusters are used for a huge range of tasks in big data analytics. It is something that every (aspiring) data scientist needs to know about. Spark runs fine on a (cluster of) Raspberry Pi 2. It does not run well on any Pi prior to the Raspberry Pi 2. Spark really needs at least a gig of RAM and multiple cores to be useful. Even a cluster of Raspberry Pi 2 will struggle for “real” big data applications due to its limited RAM per node and slow networking. That said, the Pi 2 provides a simple and cheap way of learning about how this powerful technology works.

Raspbian provides an ideal base on which to install Spark – it comes with Oracle’s ARM-optimised JDK8 and Python 2.7.x pre-installed on the default image. So, starting from a Raspbian install, go straight to the Spark downloads page and download a pre-built package. It doesn’t really matter exactly which version, but it should be a 1.3.x release, and probably pre-built for a recent Hadoop. eg. spark-1.3.0-bin-hadoop2.4.tgz, which is the version I used.

It is possible to install and run Spark from any user account, including the default “pi” account. This will be fine if you only have one Pi 2 and you are just going to run Spark on a single node in “local” mode. However, if you have multiple Pi 2, and you want to build any kind of Spark cluster, then it is really much better to create a dedicated user account (with the same name) on each Pi. With that in mind, I recommend creating a new user account. eg.

sudo adduser spark

Once created, log out and back in to the new user account, and then unpack Spark into the new home directory:

tar xvfz ~pi/spark-1.3.0-bin-hadoop2.4.tgz
cd spark-1.3.0-bin-hadoop2.4

Obviously, update the path to wherever you downloaded the tarball. Basically, that’s it! Test the installation with:

bin/run-example SparkPi 10

This will churn away for several seconds dumping tons of junk to the console, but buried among it should be a (poor) approximation to π. Assuming that works, next test that the Spark shell works:

bin/spark-shell --master local[4]

This will start the shell in local mode with 4 cores. Try entering:

sc.textFile("README.md").count

at the prompt to do a simple file line count. Note that Spark starts a diagnostic web server on port 4040, so while the shell is running, point a web browser at it. eg. I connect to http://raspi08.home:4040/ from any machine on my home network. This is the Spark UI, which is very useful for debugging purposes.

When you are done, Ctrl-D to exit the shell. The dumping of logging information to the console is pretty annoying. It’s possible to turn it off. I’ll explain how to do it in the next post – I recommend putting up with it until your cluster is set up and working properly, as it does contain a lot of very useful debugging info.

Next, try the pySpark shell:

bin/pyspark --master local[4]

The python test code is a bit more verbose than the Scala version 😉

sc.textFile("README.md").count()

Again, Ctrl-D to exit.

So, that is Spark installed and working in “local” mode on a single node. In the next post I’ll show how to create a simple Spark “standalone” cluster using two Raspberry Pi 2. For this, I will start by assuming that Spark has been installed under the same user account in the same directory on every Pi 2. For further information about Spark, start with the official Spark documentation overview.

Useful links

Below are some links I found useful in preparing this post (and the next one):

https://spark.apache.org/docs/latest/spark-standalone.html

https://docs.sigmoidanalytics.com/index.php/Installing_Spark_and_Setting_Up_Your_Cluster

http://mbonaci.github.io/mbo-spark/

http://jugnu-life.blogspot.com/2013/08/spark-standalone-mode-installation-steps.html

http://ju-jutsu.com/wp/?p=550

 

PUBLISHED BY

darrenjw (blog)

I am Professor of Stochastic Modelling within the School of Mathematics & Statistics at Newcastle University, UK. I am also a computational systems biologist.

Spark Machine Learning Library (MLlib)



  • Dependencies
  • Binary Classification
  • Linear Regression
  • Clustering
  • Collaborative Filtering
    • Explicit vs Implicit Feedback
  • Gradient Descent Primitive
  • Using MLLib in Scala
    • Binary Classification
    • Linear Regression
    • Clustering
    • Collaborative Filtering
  • Using MLLib in Java
  • Using MLLib in Python
    • Binary Classification
    • Linear Regression
    • Clustering
    • Collaborative Filtering

 

MLlib is a Spark implementation of some common machine learning (ML) functionality, as well as associated tests and data generators. MLlib currently supports four common types of machine learning problem settings, namely binary classification, regression, clustering and collaborative filtering, as well as an underlying gradient descent optimization primitive. This guide will outline the functionality supported in MLlib and also provide an example of invoking MLlib.

Dependencies

MLlib uses the jblas linear algebra library, which itself depends on native Fortran routines. You may need to install the gfortran runtime library if it is not already present on your nodes. MLlib will throw a linking error if it cannot detect these libraries automatically.

To use MLlib in Python, you will need NumPy version 1.7 or newer and Python 2.7.

Binary Classification

Binary classification is a supervised learning problem in which we want to classify entities into one of two distinct categories or labels, e.g., predicting whether or not emails are spam. This problem involves executing a learning algorithm on a set of labeled examples, i.e., a set of entities represented via (numerical) features along with underlying category labels. The algorithm returns a trained model that can predict the label for new entities for which the underlying label is unknown.

MLlib currently supports two standard model families for binary classification, namely Linear Support Vector Machines (SVMs) and Logistic Regression, along with L1 and L2 regularized variants of each model family. The training algorithms all leverage an underlying gradient descent primitive (described below), and take as input a regularization parameter (regParam) along with various parameters associated with gradient descent (stepSize, numIterations, miniBatchFraction).

Available algorithms for binary classification:

  • SVMWithSGD
  • LogisticRegressionWithSGD

Linear Regression

Linear regression is another classical supervised learning setting. In this problem, each entity is associated with a real-valued label (as opposed to a binary label as in binary classification), and we want to predict labels as closely as possible given numerical features representing entities. MLlib supports linear regression as well as L1 (lasso) and L2 (ridge) regularized variants. The regression algorithms in MLlib also leverage the underlying gradient descent primitive (described below), and have the same parameters as the binary classification algorithms described above.

Available algorithms for linear regression:

  • LinearRegressionWithSGD
  • RidgeRegressionWithSGD
  • LassoWithSGD

Clustering

Clustering is an unsupervised learning problem whereby we aim to group subsets of entities with one another based on some notion of similarity. Clustering is often used for exploratory analysis and/or as a component of a hierarchical supervised learning pipeline (in which distinct classifiers or regression models are trained for each cluster). MLlib supports k-means clustering, one of the most commonly used clustering algorithms, which clusters the data points into a predefined number of clusters. The MLlib implementation includes a parallelized variant of the k-means++ method called k-means||. The implementation in MLlib has the following parameters:

  • k is the number of desired clusters.
  • maxIterations is the maximum number of iterations to run.
  • initializationMode specifies either random initialization or initialization via k-means||.
  • runs is the number of times to run the k-means algorithm (k-means is not guaranteed to find a globally optimal solution, and when run multiple times on a given dataset, the algorithm returns the best clustering result).
  • initializationSteps determines the number of steps in the k-means|| algorithm.
  • epsilon determines the distance threshold within which we consider k-means to have converged.

Available algorithms for clustering:

  • KMeans

Collaborative Filtering

Collaborative filtering is commonly used for recommender systems. These techniques aim to fill in the missing entries of a user-item association matrix. MLlib currently supports model-based collaborative filtering, in which users and products are described by a small set of latent factors that can be used to predict missing entries. In particular, we implement the alternating least squares (ALS) algorithm to learn these latent factors. The implementation in MLlib has the following parameters:

  • numBlocks is the number of blocks used to parallelize computation (set to -1 to auto-configure).
  • rank is the number of latent factors in our model.
  • iterations is the number of iterations to run.
  • lambda specifies the regularization parameter in ALS.
  • implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data.
  • alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations.

Explicit vs Implicit Feedback

The standard approach to matrix factorization based collaborative filtering treats the entries in the user-item matrix as explicit preferences given by the user to the item.

It is common in many real-world use cases to only have access to implicit feedback (e.g. views, clicks, purchases, likes, shares etc.). The approach used in MLlib to deal with such data is taken from Collaborative Filtering for Implicit Feedback Datasets. Essentially instead of trying to model the matrix of ratings directly, this approach treats the data as a combination of binary preferences and confidence values. The ratings are then related to the level of confidence in observed user preferences, rather than explicit ratings given to items. The model then tries to find latent factors that can be used to predict the expected preference of a user for an item.

Available algorithms for collaborative filtering:

  • ALS (alternating least squares)

Gradient Descent Primitive

Gradient descent (along with stochastic variants thereof) is a first-order optimization method that is well-suited for large-scale and distributed computation. Gradient descent methods aim to find a local minimum of a function by iteratively taking steps in the direction of the negative gradient of the function at the current point, i.e., the current parameter value. Gradient descent is included as a low-level primitive in MLlib, upon which various ML algorithms are developed, and has the following parameters:

  • gradient is a class that computes the stochastic gradient of the function being optimized, i.e., with respect to a single training example, at the current parameter value. MLlib includes gradient classes for common loss functions, e.g., hinge, logistic, least-squares. The gradient class takes as input a training example, its label, and the current parameter value.
  • updater is a class that updates weights in each iteration of gradient descent. MLlib includes updaters for cases without regularization, as well as L1 and L2 regularizers.
  • stepSize is a scalar value denoting the initial step size for gradient descent. All updaters in MLlib use a step size at the t-th step equal to stepSize / sqrt(t).
  • numIterations is the number of iterations to run.
  • regParam is the regularization parameter when using L1 or L2 regularization.
  • miniBatchFraction is the fraction of the data used to compute the gradient at each iteration.

Available algorithms for gradient descent:

Using MLLib in Scala

The following code snippets can be executed in spark-shell.

Binary Classification

The following code snippet illustrates how to load a sample dataset, execute a training algorithm on this training data using a static method in the algorithm object, and make predictions with the resulting model to compute the training error.

import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.regression.LabeledPoint

// Load and parse the data file
val data = sc.textFile("mllib/data/sample_svm_data.txt")
val parsedData = data.map { line =>
  val parts = line.split(' ')
  LabeledPoint(parts(0).toDouble, parts.tail.map(x => x.toDouble).toArray)
}

// Run training algorithm to build the model
val numIterations = 20
val model = SVMWithSGD.train(parsedData, numIterations)

// Evaluate model on training examples and compute training error
val labelAndPreds = parsedData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}
val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / parsedData.count
println("Training Error = " + trainErr)

The SVMWithSGD.train() method by default performs L2 regularization with the regularization parameter set to 1.0. If we want to configure this algorithm, we can customize SVMWithSGD further by creating a new object directly and calling setter methods. All other MLlib algorithms support customization in this way as well. For example, the following code produces an L1 regularized variant of SVMs with regularization parameter set to 0.1, and runs the training algorithm for 200 iterations.

import org.apache.spark.mllib.optimization.L1Updater

val svmAlg = new SVMWithSGD()
svmAlg.optimizer.setNumIterations(200)
  .setRegParam(0.1)
  .setUpdater(new L1Updater)
val modelL1 = svmAlg.run(parsedData)

Linear Regression

The following example demonstrates how to load training data and parse it as an RDD of LabeledPoint. The example then uses LinearRegressionWithSGD to build a simple linear model to predict label values. We compute the Mean Squared Error at the end to evaluate goodness of fit.

import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.mllib.regression.LabeledPoint

// Load and parse the data
val data = sc.textFile("mllib/data/ridge-data/lpsa.data")
val parsedData = data.map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, parts(1).split(' ').map(x => x.toDouble).toArray)
}

// Building the model
val numIterations = 20
val model = LinearRegressionWithSGD.train(parsedData, numIterations)

// Evaluate model on training examples and compute training error
val valuesAndPreds = parsedData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}
val MSE = valuesAndPreds.map{ case(v, p) => math.pow((v - p), 2)}.reduce(_ + _)/valuesAndPreds.count
println("training Mean Squared Error = " + MSE)

Similarly you can use RidgeRegressionWithSGD and LassoWithSGD and compare training Mean Squared Errors.
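
As a sketch (reusing the parsedData RDD and numIterations value from the example above), only the companion object changes:

import org.apache.spark.mllib.regression.RidgeRegressionWithSGD
import org.apache.spark.mllib.regression.LassoWithSGD

// Train L2- and L1-regularized linear models on the same data
val ridgeModel = RidgeRegressionWithSGD.train(parsedData, numIterations)
val lassoModel = LassoWithSGD.train(parsedData, numIterations)
// Compute valuesAndPreds and the MSE for each model exactly as above and compare.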

Clustering

In the following example, after loading and parsing data, we use the KMeans object to cluster the data into two clusters. The number of desired clusters is passed to the algorithm. We then compute the Within Set Sum of Squared Errors (WSSSE). You can reduce this error measure by increasing k; in fact, the optimal k is usually one where there is an “elbow” in the WSSSE graph (a sketch of such a k sweep follows the example below).

import org.apache.spark.mllib.clustering.KMeans

// Load and parse the data
val data = sc.textFile("kmeans_data.txt")
val parsedData = data.map( _.split(' ').map(_.toDouble))

// Cluster the data into two classes using KMeans
val numIterations = 20
val numClusters = 2
val clusters = KMeans.train(parsedData, numClusters, numIterations)

// Evaluate clustering by computing Within Set Sum of Squared Errors
val WSSSE = clusters.computeCost(parsedData)
println("Within Set Sum of Squared Errors = " + WSSSE)

Collaborative Filtering

In the following example we load rating data. Each row consists of a user, a product and a rating. We use the default ALS.train() method which assumes ratings are explicit. We evaluate the recommendation model by measuring the Mean Squared Error of rating prediction.

import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating

// Load and parse the data
val data = sc.textFile("mllib/data/als/test.data")
val ratings = data.map(_.split(',') match {
    case Array(user, item, rate) =>  Rating(user.toInt, item.toInt, rate.toDouble)
})

// Build the recommendation model using ALS
val rank = 1
val numIterations = 20
val model = ALS.train(ratings, rank, numIterations, 0.01)

// Evaluate the model on rating data
val usersProducts = ratings.map{ case Rating(user, product, rate)  => (user, product)}
val predictions = model.predict(usersProducts).map{
    case Rating(user, product, rate) => ((user, product), rate)
}
val ratesAndPreds = ratings.map{
    case Rating(user, product, rate) => ((user, product), rate)
}.join(predictions)
val MSE = ratesAndPreds.map{
    case ((user, product), (r1, r2)) =>  math.pow((r1- r2), 2)
}.reduce(_ + _)/ratesAndPreds.count
println("Mean Squared Error = " + MSE)

If the rating matrix is derived from another source of information (i.e., it is inferred from other signals), you can use the trainImplicit method to get better results.

val model = ALS.trainImplicit(ratings, 1, 20, 0.01)

Using MLLib in Java

All of MLlib’s methods use Java-friendly types, so you can import and call them there the same way you do in Scala. The only caveat is that the methods take Scala RDD objects, while the Spark Java API uses a separate JavaRDD class. You can convert a Java RDD to a Scala one by calling .rdd() on your JavaRDD object.

Using MLLib in Python

The following examples can be tested in the PySpark shell.

Binary Classification

The following example shows how to load a sample dataset, build a logistic regression model, and make predictions with the resulting model to compute the training error.

from pyspark.mllib.classification import LogisticRegressionWithSGD
from numpy import array

# Load and parse the data
data = sc.textFile("mllib/data/sample_svm_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

# Build the model
model = LogisticRegressionWithSGD.train(parsedData)

# Evaluate the model on training data
labelsAndPreds = parsedData.map(lambda point: (int(point.item(0)),
        model.predict(point.take(range(1, point.size)))))
trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))

Linear Regression

The following example demonstrates how to load training data and parse it as an RDD of LabeledPoint. The example then uses LinearRegressionWithSGD to build a simple linear model to predict label values. We compute the Mean Squared Error at the end to evaluate goodness of fit.

from pyspark.mllib.regression import LinearRegressionWithSGD
from numpy import array

# Load and parse the data
data = sc.textFile("mllib/data/ridge-data/lpsa.data")
parsedData = data.map(lambda line: array([float(x) for x in line.replace(',', ' ').split(' ')]))

# Build the model
model = LinearRegressionWithSGD.train(parsedData)

# Evaluate the model on training data
valuesAndPreds = parsedData.map(lambda point: (point.item(0),
        model.predict(point.take(range(1, point.size)))))
MSE = valuesAndPreds.map(lambda (v, p): (v - p)**2).reduce(lambda x, y: x + y)/valuesAndPreds.count()
print("Mean Squared Error = " + str(MSE))

Clustering

In the following example, after loading and parsing data, we use the KMeans object to cluster the data into two clusters. The number of desired clusters is passed to the algorithm. We then compute the Within Set Sum of Squared Errors (WSSSE). You can reduce this error measure by increasing k; in fact, the optimal k is usually one where there is an “elbow” in the WSSSE graph.

from pyspark.mllib.clustering import KMeans
from numpy import array
from math import sqrt

# Load and parse the data
data = sc.textFile("kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

# Build the model (cluster the data)
clusters = KMeans.train(parsedData, 2, maxIterations=10,
        runs=30, initializationMode="random")

# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))

Similarly you can use RidgeRegressionWithSGD and LassoWithSGD and compare training Mean Squared Errors.

Collaborative Filtering

In the following example we load rating data. Each row consists of a user, a product and a rating. We use the default ALS.train() method which assumes ratings are explicit. We evaluate the recommendation model by measuring the Mean Squared Error of rating prediction.

from pyspark.mllib.recommendation import ALS
from numpy import array

# Load and parse the data
data = sc.textFile("mllib/data/als/test.data")
ratings = data.map(lambda line: array([float(x) for x in line.split(',')]))

# Build the recommendation model using Alternating Least Squares
model = ALS.train(ratings, 1, 20)

# Evaluate the model on training data
testdata = ratings.map(lambda p: (int(p[0]), int(p[1])))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).reduce(lambda x, y: x + y)/ratesAndPreds.count()
print("Mean Squared Error = " + str(MSE))

If the rating matrix is derived from another source of information (i.e., it is inferred from other signals), you can use the trainImplicit method to get better results.

# Build the recommendation model using Alternating Least Squares based on implicit ratings
model = ALS.trainImplicit(ratings, 1, 20)