Rustler User Guide

This document is in progress. Please send any suggestions and comments to data@tacc.utexas.edu.

TACC's new Hadoop cluster, Rustler, is architected specifically for data-driven research. Built on the Hadoop framework and YARN (MapReduce 2.0), Rustler is well suited to storing and analyzing very large amounts of data, structured or unstructured. Rustler differs from traditional high performance computing resources, where storage and computation live in physically separate racks: instead, it provides 1 PB of distributed storage spread across its 64 data nodes, supporting Big Data analytics paradigms such as MapReduce. Rustler supports a variety of Big Data analysis applications and platforms, including Hadoop, Spark, and R, as well as programming in languages such as C/C++, Java, Scala, and Python.

Rustler is ideal for researchers whose work requires repeated access to, transformation of, and analysis of multi-terabyte datasets. Please email data@tacc.utexas.edu for more information.

System Architecture

Rustler is a 66-node computing cluster for data-intensive computing. Each node has dual eight-core Ivy Bridge CPUs (Intel(R) Xeon(R) CPU E5-2650) running at 2.60 GHz, 128 GB of DDR3 memory, and 16 1 TB SATA drives that back the HDFS file system. All nodes are connected via a 10 Gbps network. Of the 66 nodes, 64 are configured as data nodes, one as the name node, and one as the login node. The login node has 13 TB of local storage for staging data and is also connected to the TACC-wide shared work file system, allowing easy transfer of results to a POSIX file system available on all of the other TACC clusters.

File Systems

  • $HOME
  • $WORK/Stockyard
  • HDFS

System Access

The Rustler login node is used both for submitting jobs and for ingesting and retrieving data to and from the HDFS file system. Rustler is now in Friendly User Mode. Please email data@tacc.utexas.edu to request access to Rustler.

SSH access

To create a login session from a local machine you need an SSH client. Wikipedia is a good source of information on SSH and on the clients available for your particular operating system. To ensure a secure login session, users must connect to machines using the secure shell ssh command. Data movement must be done using the secure shell commands scp and sftp.

Do not run the optional ssh-keygen command to set up public-key authentication. This command sets up a passphrase that will interfere with the execution of job scripts in the batch system. If you have already done this, remove the .ssh directory (and the files under it) from your home directory, then log out and log back in to regenerate the keys.
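
For example, a minimal sketch of that cleanup step, run in your home directory on Rustler (this deletes the existing key files; they are regenerated the next time you log in):

login1$ rm -rf ~/.ssh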

To initiate an ssh connection to a Rustler login node from your local system, use an SSH client that supports the SSH-2 protocol (e.g., OpenSSH, PuTTY, SecureCRT), then execute the following command:

localhost$ ssh userid@login.rustler.tacc.utexas.edu

Note: the TACC userid is needed if the user names on the local machine and the TACC machine differ.

Login passwords (which are identical to TACC portal passwords, not XUP passwords) can be changed in the TACC User Portal. Select "Change Password" under the "HOME" tab after you log in. If you've forgotten your password, go to the TACC User Portal and select the "? Forgot Password" button.

Hadoop Installation on Rustler

Cloudera's open-source distribution of Apache Hadoop (Hadoop 2.3.0-cdh5.1.0) is installed on Rustler. The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage. Rather than rely on hardware to deliver high availability, the library itself is designed to detect and handle failures at the application layer, delivering a highly available service on top of a cluster of computers, each of which may be prone to failure.

The project includes these modules:

  • Hadoop Common: The common utilities that support the other Hadoop modules.
  • Hadoop Distributed File System (HDFS): A distributed file system that provides high-throughput access to application data.
  • Hadoop YARN: A framework for job scheduling and cluster resource management.
  • Hadoop MapReduce: A YARN-based system for parallel processing of large data sets.

For backward compatibility, CDH 5 continues to support the original MapReduce framework (i.e. the JobTracker and TaskTrackers), but you should begin migrating to MRv2.

All Java libraries related to CDH 5 are available in /usr/lib/hadoop-mapreduce/. Users should add /usr/lib/hadoop-mapreduce/ to their Java classpath and make sure the required Java library files can be found when compiling their own code.
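
For example, a user-developed MapReduce class could be compiled and packaged as sketched below; MyMapReduce.java and MyJarFile.jar are placeholder names, and the "hadoop classpath" command prints the installation's full classpath:

login1$ javac -classpath "/usr/lib/hadoop-mapreduce/*:$(hadoop classpath)" MyMapReduce.java
login1$ jar cf MyJarFile.jar MyMapReduce*.class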

Running Hadoop MapReduce Jobs

Users can invoke Hadoop commands directly from the login node.

Usage: hadoop [--config confdir] [COMMAND] [GENERIC_OPTIONS] [COMMAND_OPTIONS]
  • To run a user-developed Java application (a sample hadoop jar invocation follows this list):
    login1$ hadoop jar MyJarFile.jar MainClass args
  • In addition to running a jar file directly with the hadoop command, a user can also submit a job specified by a job file with the "mapred" command:
    Usage: mapred [--config confdir] COMMAND
  • To submit a job specified by MyMR.job, run the following:

    login1$ mapred job -submit MyMR.job
  • For example, to list the jobs currently running on the Hadoop cluster:

    login1$ mapred job -list
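
CDH installations typically ship a precompiled examples jar alongside the libraries in /usr/lib/hadoop-mapreduce/. If it is present on Rustler, a word count over data already in HDFS can be run as a quick sanity check; the jar name and the input_dir/output_dir paths below are assumptions rather than verified paths, and note that the output directory must not already exist in HDFS:

login1$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar wordcount input_dir output_dir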

The mapred job command also enables users to interact with an active job by its job ID:

  • To check the status of a job:

    login1$ mapred job -status job-id
  • To kill an active MapReduce job:

    login1$ mapred job -kill job-id

Each user's default working directory in HDFS is that user's HDFS home directory, /user/username. This directory should already have been created for each user; please contact us if it is not available. A user can interact with HDFS using the hdfs command:

Usage: hdfs [--config confdir] [COMMAND] [GENERIC_OPTIONS] [COMMAND_OPTIONS]

Here are a few common actions:

  • List the contents of the current working directory:

    login1$ hdfs dfs -ls
  • Copy a file, foo.test, from the local file system to HDFS, naming it foo_in_hdfs.test:

    login1$ hdfs dfs -copyFromLocal foo.test foo_in_hdfs.test
  • Copy a file, foo_in_hdfs.test, from HDFS to the local file system as foo_from_hdfs.test:

    login1$ hdfs dfs -copyToLocal foo_in_hdfs.test foo_from_hdfs.test
  • Remove a file named foo.test from the current HDFS working directory:

    login1$ hdfs dfs -rm foo.test

Please see the Hadoop Commands Manual for more information.

Monitoring Hadoop Jobs

A web interface is available for checking cluster status: visit https://rustler.tacc.utexas.edu/about/ and log in with your TACC user ID.

Application Support

Additional analysis software and libraries can be installed upon request. Currently, the following software and libraries are available on Rustler:

  • Hadoop Ecosystem
  • Mahout
  • Python

Hadoop Streaming

Although the Hadoop framework is implemented in Java, MapReduce applications need not be written in Java. Hadoop streaming is a utility that comes with the Hadoop distribution; it allows users to create and run jobs with any executables (e.g., shell utilities) as the mapper and/or the reducer. See http://hadoop.apache.org/docs/r1.2.1/streaming.html for details.
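
As an illustrative sketch, a streaming job that uses standard shell utilities as the mapper and reducer could be submitted as below; the input_dir and output_dir names are placeholder HDFS paths, and the streaming jar path is the same one referenced in the RHadoop section that follows:

login1$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming-2.3.0-cdh5.1.0.jar \
    -input input_dir -output output_dir \
    -mapper /bin/cat -reducer /usr/bin/wc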

RHadoop

For Hadoop users, writing MapReduce programs in R can be easier, more productive, and more elegant than writing them in Java, with far less code and simpler deployment, which makes it well suited to prototyping and research. For R users, it opens the door to MapReduce programming and large-scale data analysis.

RHadoop is a bridge between R, a language and environment to statistically explore data sets, and Hadoop, a framework that allows for the distributed processing of large data sets across clusters of computers.

Set the complete path to the "hadoop" executable:

login1$ export HADOOP_CMD=/usr/bin/hadoop

Set the complete path to the Hadoop streaming jar file:

login1$ export HADOOP_STREAMING=/usr/lib/hadoop-mapreduce/hadoop-streaming-2.3.0-cdh5.1.0.jar
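
The streaming jar file name embeds the CDH version, so the exact path may change when the installation is upgraded; if in doubt, confirm the current file name before exporting the variable:

login1$ ls /usr/lib/hadoop-mapreduce/hadoop-streaming*.jar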

RHadoop example

Run the RHadoop example code, which randomly generates 500 numbers from a binomial distribution with size=100 and prob=0.5, then uses a MapReduce job to sum up the frequency of each number.

login1$ Rscript /share/doc/example_RHadoop.R

Expected output should be similar to the following:

15/02/15 13:57:01 INFO mapreduce.Job: map 0% reduce 0%
15/02/15 13:57:09 INFO mapreduce.Job: map 100% reduce 0%
15/02/15 13:57:16 INFO mapreduce.Job: map 100% reduce 100%
15/02/15 13:57:17 INFO mapreduce.Job: Job job_1422482982071_0276 completed successfully
15/02/15 13:57:17 INFO mapreduce.Job: Counters: 50
	File System Counters
		FILE: Number of bytes read=8175
		FILE: Number of bytes written=305917
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=1755
		HDFS: Number of bytes written=4890
		HDFS: Number of read operations=13
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters 
		Launched map tasks=2
		Launched reduce tasks=1
		Rack-local map tasks=2
		Total time spent by all maps in occupied slots (ms)=24810
		Total time spent by all reduces in occupied slots (ms)=10158
		Total time spent by all map tasks (ms)=12405
		Total time spent by all reduce tasks (ms)=5079
		Total vcore-seconds taken by all map tasks=12405
		Total vcore-seconds taken by all reduce tasks=5079
		Total megabyte-seconds taken by all map tasks=100430880
		Total megabyte-seconds taken by all reduce tasks=60948000
	Map-Reduce Framework
		Map input records=3
		Map output records=57
		Map output bytes=8036
		Map output materialized bytes=8181
		Input split bytes=226
		Combine input records=0
		Combine output records=0
		Reduce input groups=28
		Reduce shuffle bytes=8181
		Reduce input records=57
		Reduce output records=58
		Spilled Records=114
		Shuffled Maps =2
		Failed Shuffles=0
		Merged Map outputs=2
		GC time elapsed (ms)=641
		CPU time spent (ms)=6930
		Physical memory (bytes) snapshot=4078620672
		Virtual memory (bytes) snapshot=31807492096
		Total committed heap usage (bytes)=6315245568
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=1529
	File Output Format Counters 
		Bytes Written=4890
	

SparkR

SparkR is an R package that provides a light-weight frontend to use Apache Spark from R. SparkR exposes the Spark API through the RDD (Resilient Distributed Datasets) class and allows users to interactively run jobs from the R shell on a cluster.

Run the SparkR example code, pi.R, which uses the Monte Carlo method (http://en.wikipedia.org/wiki/Monte_Carlo_method) to estimate the value of Pi. In the pi.R example code, random points placed on a square are split into 10 RDD slices (100,000 points per slice, for 1,000,000 points in total).

login1$ YARN_CONF_DIR=/etc/hadoop/conf MASTER=yarn-client \
	SparkR-pkg-master/sparkR SparkR-pkg-master/examples/pi.R yarn-client 10

Expected output should be similar to the following:

Loading required package: methods
[SparkR] Initializing with classpath /home/rhuang/SparkR-pkg-master/lib/SparkR/sparkr-assembly-0.1.jar
...
15/02/15 13:35:06 INFO YarnClientImpl: Submitted application application_1422482982071_0275
15/02/15 13:35:20 INFO RackResolver: Resolved c252-113.rustler.tacc.utexas.edu to /default-rack
15/02/15 13:35:20 INFO RackResolver: Resolved c253-102.rustler.tacc.utexas.edu to /default-rack
Pi is roughly 3.1419 
Num elements in RDD 1000000 
Stopping SparkR
15/02/15 13:35:25 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/02/15 13:35:25 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down;... 

Apache Spark

Apache Spark is a popular, general-purpose cluster computing framework. While it can run in a variety of configurations, on Rustler we support running it on top of the YARN cluster manager so users can leverage the entire cluster for their computational tasks. In this configuration, Spark jobs also use HDFS to access their datasets in a distributed fashion. Spark creates RDDs (Resilient Distributed Datasets), which are in-memory, fault-tolerant transformations of datasets in disk-based stores (like HDFS) or of intermediate data generated by computations. By virtue of keeping these objects in memory, Spark is a very fast distributed computing framework.

Reference: https://spark.apache.org/docs/latest/index.html

spark-submit command

Spark applications are typically submitted using the "spark-submit" command, which internally sets up various Spark and Hadoop environment variables prior to launching the application.

Usage: spark-submit [Spark Options] Application Jar or Python File [Application Options]

Run "spark-submit" with no options to see the Spark Options supported. Also see examples below.

Spark Example: Estimate Pi

Let's examine running some of the examples shipped with the Spark installation (as /usr/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.3.jar). Various other examples may be run using the same procedure; please refer to the Spark examples section of the Spark source code for details (https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples).

This example estimates the value of PI using the Monte Carlo method. The application takes one argument, the number of iterations to run (we chose 100 here).

login1$ spark-submit --class org.apache.spark.examples.SparkPi --deploy-mode cluster \
    --master yarn /usr/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.3.jar 100

Please note:

  • "--master" flag that specifies Spark Master as YARN since it is the resource manager of the cluster.
  • "--deploy-mode" flag used is "cluster". This is important as it ensures that nothing runs on the login node.

The "spark-submit" command outputs to screen periodic application report stanzas for the job that looks as follows for a successful run:

15/02/12 14:32:17 INFO yarn.Client: Application report from ASM: 
	application identifier: application_1422482982071_0154
	appId: 154
	clientToAMToken: null
	appDiagnostics: 
	appMasterHost: N/A
	appQueue: root.agupta
	appMasterRpcPort: -1
	appStartTime: 1423773105544
	yarnAppState: ACCEPTED
	distributedFinalState: UNDEFINED
	appTrackingUrl: http://name.rustler.tacc.utexas.edu:8088/proxy/application_1422482982071_0154/
	appUser: agupta
...
15/02/12 14:32:22 INFO yarn.Client: Application report from ASM: 
	application identifier: application_1422482982071_0154
	appId: 154
	clientToAMToken: null
	appDiagnostics: 
	appMasterHost: c251-114.rustler.tacc.utexas.edu
	appQueue: root.agupta
	appMasterRpcPort: 0
	appStartTime: 1423773105544
	yarnAppState: RUNNING
	distributedFinalState: UNDEFINED
	appTrackingUrl: http://name.rustler.tacc.utexas.edu:8088/proxy/application_1422482982071_0154/
	appUser: agupta
...
15/02/12 14:32:29 INFO yarn.Client: Application report from ASM: 
	application identifier: application_1422482982071_0154
	appId: 154
	clientToAMToken: null
	appDiagnostics: 
	appMasterHost: c251-114.rustler.tacc.utexas.edu
	appQueue: root.agupta
	appMasterRpcPort: 0
	appStartTime: 1423773105544
	yarnAppState: FINISHED
	distributedFinalState: SUCCEEDED
	appTrackingUrl: http://name.rustler.tacc.utexas.edu:8088/proxy/application_1422482982071_0154/
	appUser: agupta
	

The application identifier identifies a specific job (in this example, application_1422482982071_0154). It is important to remember that since we are using YARN as our cluster manager, this is a YARN application ID. While the application is running, it may also be viewed with the YARN command-line interface using: yarn application -list.

If the application ended with failures, the last stanza of the application report would reflect that: the distributedFinalState field would show "FAILED".

Use Ctrl-C to cancel the spark-submit output above and get the shell prompt back.

More information on an application (its output, failures, etc.) can be found in the application-specific log files placed in HDFS under the following path:

/var/log/hadoop-yarn/apps/username/logs/ApplicationIdentifier/

These may be examined using the dfs commands of HDFS.

For this specific example:

login1$ hdfs dfs -ls /var/log/hadoop-yarn/apps/agupta/logs/application_1422482982071_0154/
Found 3 items
...

The last few lines of the application master's log file (c251-114.rustler.tacc.utexas.edu_56010) contain the result in its stdout section:

...
Pi is roughly 3.1411232
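
Because these logs are aggregated into HDFS, the YARN log tool can usually retrieve them in readable form as well; assuming log aggregation remains enabled on the cluster, the logs for this example could also be fetched with:

login1$ yarn logs -applicationId application_1422482982071_0154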

Example: PageRank

This example runs a simple version of the PageRank algorithm on an input graph. The format of the input graph needs to be (with no blank lines):

FromURL	ToURL
FromURL	ToURL
...
FromURL	ToURL

This example takes 2 arguments:

  1. The input graph file with the above format
    • If the file is in HDFS, provide the HDFS path relative to your HDFS home folder.
    • If the file is on the local file system, provide the absolute path prefixed by "file://"
  2. The number of iterations to run

login1$ spark-submit --class org.apache.spark.examples.JavaPageRank --deploy-mode cluster --master yarn \
	/usr/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.3.jar \
	file:///home/agupta/edge_list.txt 50
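
If the graph file were staged into HDFS instead, a minimal sketch (reusing the hypothetical edge_list.txt above) is to copy it in with the HDFS commands shown earlier and then pass the path relative to your HDFS home directory:

login1$ hdfs dfs -copyFromLocal edge_list.txt edge_list.txt
login1$ spark-submit --class org.apache.spark.examples.JavaPageRank --deploy-mode cluster --master yarn \
	/usr/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.3.jar \
	edge_list.txt 50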

In this example, the input graph (in "edge_list.txt") is a graph of 1000 nodes and 10000 randomly created edges and the algorithm is run for 50 iterations.

The output is similar to the previous example:

15/02/12 16:06:21 INFO yarn.Client: Application report from ASM: 
	application identifier: application_1422482982071_0160
	appId: 160
	clientToAMToken: null
	appDiagnostics: 
	appMasterHost: N/A
	appQueue: root.agupta
	appMasterRpcPort: -1
	appStartTime: 1423778749382
	yarnAppState: ACCEPTED
	distributedFinalState: UNDEFINED
	appTrackingUrl: 
...
15/02/12 16:06:26 INFO yarn.Client: Application report from ASM: 
	application identifier: application_1422482982071_0160
	appId: 160
	clientToAMToken: null
	appDiagnostics: 
	appMasterHost: c253-121.rustler.tacc.utexas.edu
	appQueue: root.agupta
	appMasterRpcPort: 0
	appStartTime: 1423778749382
	yarnAppState: RUNNING
	distributedFinalState: UNDEFINED
	appTrackingUrl: http://name.rustler.tacc.utexas.edu:8088/proxy/application_1422482982071_0160/
	appUser: agupta
...
15/02/12 16:06:50 INFO yarn.Client: Application report from ASM: 
	application identifier: application_1422482982071_0160
	appId: 160
	clientToAMToken: null
	appDiagnostics: 
	appMasterHost: c253-121.rustler.tacc.utexas.edu
	appQueue: root.agupta
	appMasterRpcPort: 0
	appStartTime: 1423778749382
	yarnAppState: FINISHED
	distributedFinalState: SUCCEEDED
	appTrackingUrl: http://name.rustler.tacc.utexas.edu:8088/proxy/application_1422482982071_0160/
	appUser: agupta
	

The results of a run can also be found in the application master log file in HDFS:

/var/log/hadoop-yarn/apps/agupta/logs/application_1422482982071_0160/c253-121.rustler.tacc.utexas.edu_36803

A section of the results looks like:

501 has rank: 1.103978228715302.
978 has rank: 0.8360883884213044.
34 has rank: 1.5498260479298391.
336 has rank: 0.8305731590891683.
716 has rank: 1.2556948018386884.
341 has rank: 0.6837823117718801.
680 has rank: 0.8281788633623367.
440 has rank: 0.7826825522156411.
542 has rank: 0.8771669240674593.
565 has rank: 0.6657704280226431.
109 has rank: 1.1040866397827505.
143 has rank: 0.6229546869395034.
132 has rank: 0.9529671056821962.
682 has rank: 1.0320869820757694.
430 has rank: 1.3435371064188004.
557 has rank: 0.4198054934772092.
169 has rank: 1.5631867069302998.
481 has rank: 1.4025914187834019.
744 has rank: 1.6301768850924943.
698 has rank: 1.207357785843826.
248 has rank: 1.1890010743342392.
695 has rank: 1.1707258115083423.
688 has rank: 0.9079755027702306.
607 has rank: 0.8205664753174244.

Policies

TACC resources are deployed, configured, and operated to serve a large, diverse user community. It is important that all users are aware of and abide by TACC Usage Policies. Failure to do so may result in suspension or cancellation of the project and associated allocation and closure of all associated logins. Illegal transgressions will be addressed through UT and/or legal authorities. The Usage Policies are documented here: http://www.tacc.utexas.edu/user-services/usage-policies.

Help

Submit a helpdesk ticket via the TACC User Portal.

Last update: July 22, 2015