Rustler User Guide
This document is in progress. Please send any suggestions and comments to email@example.com.
TACC's new Hadoop cluster, Rustler, is specifically architected to facilitate data-driven research. Using the Hadoop framework and YARN (MapReduce 2.0), Rustler is designed for analyzing and storing huge amounts of data, structured or unstructured, in minimal time. Rustler differs from traditional high performance computing resources, where storage and computation are located in physically separated racks. Instead, Rustler offers 1PB of distributed storage across its 64 data nodes, supporting Big Data analytics paradigms such as MapReduce and applications such as Hadoop and Spark. Rustler offers support for a variety of Big Data analysis applications and platforms including Hadoop, Spark, and R, as well as programming support in languages such as C/C++, Java, Scala and Python.
Rustler is ideal for researchers whose work requires repeated access to, and transformation and analysis of, multiple terabytes of data. Please email firstname.lastname@example.org for more information.
Rustler is a 66-node computing cluster for data-intensive computing. Each node has dual eight-core Ivy Bridge CPUs (Intel(R) Xeon(R) CPU E5-2650) running at 2.60GHz, with 128 GB DDR3 memory and 16 1TB SATA drives providing the backing for the HDFS file system. All nodes are connected via a 10 Gbps network. Of the 66 nodes available on Rustler, 64 are configured as data nodes, 1 as the name node, and 1 as the login node. The login node has 13TB of storage for users to stage data and is also connected to the TACC-wide shared work file system, allowing for easy transfer of results to a POSIX file system available to all of the other clusters at TACC.
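The storage figures above can be cross-checked with a little arithmetic. The following is a rough sketch only: the node and drive counts come from the description above, while the replication factor of 3 is HDFS's stock default and is an assumption here, since the value configured on Rustler is not stated in this guide.

```python
# Figures taken from the hardware description above.
DATA_NODES = 64
DRIVES_PER_NODE = 16
DRIVE_TB = 1

# Assumption: HDFS's stock default replication factor (dfs.replication = 3).
# The value actually configured on Rustler is not stated in this guide.
ASSUMED_REPLICATION = 3


def raw_capacity_tb(nodes=DATA_NODES, drives=DRIVES_PER_NODE, drive_tb=DRIVE_TB):
    """Total raw disk capacity behind HDFS, in TB."""
    return nodes * drives * drive_tb


def usable_capacity_tb(raw_tb, replication=ASSUMED_REPLICATION):
    """Approximate room for unique data when every block is stored `replication` times."""
    return raw_tb / float(replication)


raw = raw_capacity_tb()
print("raw capacity: %d TB" % raw)
print("unique data at replication %d: about %.0f TB"
      % (ASSUMED_REPLICATION, usable_capacity_tb(raw)))
```

The raw figure of 1024 TB corresponds to the 1PB of distributed storage quoted earlier; the room for unique data is smaller by whatever replication factor is in effect.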
The Rustler login node is used both for submitting jobs and for ingesting and retrieving data to and from the HDFS file system. Rustler is now in Friendly User Mode. Please email email@example.com to request access to Rustler.
To create a login session from a local machine it is necessary to have an SSH client. Wikipedia is a good source of information on SSH, and provides information on the various clients available for your particular operating system. To ensure a secure login session, users must connect to machines using the secure shell ssh command. Data movement must be done using the secure shell commands
Do not run the optional ssh-keygen command to set up Public-key authentication. This command sets up a passphrase that will interfere with the execution of job scripts in the batch system. If you have already done this, remove the .ssh directory (and the files under it) from your home directory. Log out and log back in to regenerate the keys.
To initiate an ssh connection to a Rustler login node from your local system, use an SSH client that supports the SSH-2 protocol, such as OpenSSH, PuTTY, or SecureCRT. Then execute the following command:
localhost$ ssh firstname.lastname@example.org
Note: the TACC userid is needed if the user name on the local machine and the TACC machine differ.
Login passwords (which are identical to TACC portal passwords, not XUP passwords) can be changed in the TACC User Portal. Select "Change Password" under the "HOME" tab after you log in. If you've forgotten your password, go to the TACC User Portal and select the "? Forgot Password" button.
Cloudera's open source distribution of Apache Hadoop (Hadoop 2.3.0-cdh5.1.0) has been installed on Rustler. The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage. Rather than rely on hardware to deliver high availability, the library itself is designed to detect and handle failures at the application layer, thereby delivering a highly available service on top of a cluster of computers, each of which may be prone to failures.
The project includes these modules:
- Hadoop Common: The common utilities that support the other Hadoop modules.
- Hadoop Distributed File System (HDFS): A distributed file system that provides high-throughput access to application data.
- Hadoop YARN: A framework for job scheduling and cluster resource management.
- Hadoop MapReduce: A YARN-based system for parallel processing of large data sets.
For backward compatibility, CDH 5 continues to support the original MapReduce framework (i.e. the JobTracker and TaskTrackers), but you should begin migrating to MRv2.
All Java libraries related to CDH 5 are available at /usr/lib/hadoop-mapreduce/. Users should add /usr/lib/hadoop-mapreduce/ to their Java classpath and make sure the required Java library files can be found when compiling their own code.
Running Hadoop MapReduce Jobs
Users can invoke Hadoop commands directly from the login node.
Usage: hadoop [--config confdir] [COMMAND] [GENERIC_OPTIONS] [COMMAND_OPTIONS]
- To run a user-developed Java application:
login1$ hadoop jar MyJarFile.jar MainClass args
- In addition to running a jar file directly with a hadoop command, a user can also submit a job specified by a job file with the "mapred" command:
Usage: mapred [--config confdir] COMMAND
To submit a job specified by MyMR.job, a user can run the following:
login1$ mapred job -submit MyMR.job
For example, to list the jobs currently running in the Hadoop cluster:
login1$ mapred job -list
The mapred job command also enables users to interact with an active job by its job-id.
To check the status of a job:
login1$ mapred job -status job-id
To kill an active MapReduce job:
login1$ mapred job -kill job-id
Each user's default working directory in HDFS is the user's HDFS home directory, /user/username. This directory should already have been created for each user; please contact us if it is not available. A user can interact with HDFS using the hdfs command:
Usage: hdfs [--config confdir] [COMMAND] [GENERIC_OPTIONS] [COMMAND_OPTIONS]
Here are a few common actions:
List the contents of the current working directory:
login1$ hdfs dfs -ls
Copy a file, foo.test, from the local file system to HDFS, naming it foo_in_hdfs.test:
login1$ hdfs dfs -copyFromLocal foo.test foo_in_hdfs.test
Copy a file, foo_in_hdfs.test, from HDFS to the local file system as foo_from_hdfs.test:
login1$ hdfs dfs -copyToLocal foo_in_hdfs.test foo_from_hdfs.test
Remove a file named foo.test from the current working directory in HDFS:
login1$ hdfs dfs -rm foo.test
Please see the Hadoop Commands Manual for more information.
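When the same HDFS actions have to be repeated for many files, they can be scripted. The following is a minimal sketch, assuming Python is available on the login node; the helper names (hdfs_dfs_command, run_hdfs_dfs, stage_in) are hypothetical and simply wrap the hdfs dfs commands shown above.

```python
import subprocess


def hdfs_dfs_command(action, *paths):
    """Build the argument list for `hdfs dfs -<action> <paths...>`."""
    return ["hdfs", "dfs", "-" + action] + list(paths)


def run_hdfs_dfs(action, *paths):
    """Run the command and return its standard output.

    Raises subprocess.CalledProcessError if hdfs exits with an error.
    """
    return subprocess.check_output(hdfs_dfs_command(action, *paths)).decode()


def stage_in(local_path, hdfs_path):
    """Copy a local file into HDFS, then list it to confirm it arrived."""
    run_hdfs_dfs("copyFromLocal", local_path, hdfs_path)
    return run_hdfs_dfs("ls", hdfs_path)


if __name__ == "__main__":
    # Only print the commands here, so the sketch is safe to run anywhere.
    print(" ".join(hdfs_dfs_command("copyFromLocal", "foo.test", "foo_in_hdfs.test")))
    print(" ".join(hdfs_dfs_command("ls", "foo_in_hdfs.test")))
```

Calling stage_in("foo.test", "foo_in_hdfs.test") on the login node would perform the copy for real; building the argument list separately makes it easy to check what will be run beforehand.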
A web interface is available for checking cluster status. Please visit https://rustler.tacc.utexas.edu/about/ and log in with your TACC user id.
Additional analysis software and libraries can be installed upon request. Currently the following software/libraries are available on Rustler
- Hadoop Ecosystem
Although the Hadoop framework is implemented in Java, MapReduce applications need not be written in Java. Hadoop streaming is a utility that comes with the Hadoop distribution. The utility allows users to create and run jobs with any executables (e.g. shell utilities) as the mapper and/or the reducer. See details in http://hadoop.apache.org/docs/r1.2.1/streaming.html.
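As an illustration of the streaming model described above, here is a minimal word-count sketch in Python. It is not part of the Rustler installation: the file name wc_stream.py and the function names are hypothetical, and it assumes a Python interpreter is available on the data nodes. The mapper and reducer only read standard input and write tab-separated records to standard output, which is all Hadoop streaming requires.

```python
import sys
from itertools import groupby


def map_lines(lines):
    """Mapper: emit one "word<TAB>1" record per word."""
    for line in lines:
        for word in line.split():
            yield "%s\t1" % word


def reduce_lines(lines):
    """Reducer: sum the counts of consecutive records sharing a key.

    Hadoop streaming sorts mapper output by key before the reducer reads it,
    so all records for one word arrive next to each other.
    """
    pairs = (line.rstrip("\n").split("\t", 1) for line in lines)
    for word, group in groupby(pairs, key=lambda pair: pair[0]):
        yield "%s\t%d" % (word, sum(int(count) for _, count in group))


# Run as "python wc_stream.py map" or "python wc_stream.py reduce".
if __name__ == "__main__" and len(sys.argv) > 1:
    step = map_lines if sys.argv[1] == "map" else reduce_lines
    for record in step(sys.stdin):
        print(record)
```

The pair can be tried locally with a pipeline such as: cat input.txt | python wc_stream.py map | sort | python wc_stream.py reduce. On the cluster, a submission along the lines of hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming-2.3.0-cdh5.1.0.jar -files wc_stream.py -mapper "python wc_stream.py map" -reducer "python wc_stream.py reduce" -input <HDFS input> -output <HDFS output> should work, although this exact command has not been verified on Rustler.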
For Hadoop users, writing MapReduce programs in R can be easier and more productive than writing them in Java: programs typically need much less code and are simpler to deploy, which makes R well suited to prototyping and research. For R users, it opens the door to MapReduce programming and big data analysis.
RHadoop is a bridge between R, a language and environment to statistically explore data sets, and Hadoop, a framework that allows for the distributed processing of large data sets across clusters of computers.
Set the complete path to the "hadoop" command:
login1$ export HADOOP_CMD=/usr/bin/hadoop
Set the complete path to the hadoop streaming jar file
login1$ export HADOOP_STREAMING=/usr/lib/hadoop-mapreduce/hadoop-streaming-2.3.0-cdh5.1.0.jar
Run the RHadoop example code, which randomly generates 500 numbers from a binomial distribution with size=100 and prob=0.5 and uses a MapReduce procedure to sum up the frequency of each number.
login1$ Rscript /share/doc/example_RHadoop.R
Expected output should be similar to the following:
15/02/15 13:57:01 INFO mapreduce.Job: map 0% reduce 0%
15/02/15 13:57:09 INFO mapreduce.Job: map 100% reduce 0%
15/02/15 13:57:16 INFO mapreduce.Job: map 100% reduce 100%
15/02/15 13:57:17 INFO mapreduce.Job: Job job_1422482982071_0276 completed successfully
15/02/15 13:57:17 INFO mapreduce.Job: Counters: 50
    File System Counters
        FILE: Number of bytes read=8175
        FILE: Number of bytes written=305917
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=1755
        HDFS: Number of bytes written=4890
        HDFS: Number of read operations=13
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=2
        Launched reduce tasks=1
        Rack-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=24810
        Total time spent by all reduces in occupied slots (ms)=10158
        Total time spent by all map tasks (ms)=12405
        Total time spent by all reduce tasks (ms)=5079
        Total vcore-seconds taken by all map tasks=12405
        Total vcore-seconds taken by all reduce tasks=5079
        Total megabyte-seconds taken by all map tasks=100430880
        Total megabyte-seconds taken by all reduce tasks=60948000
    Map-Reduce Framework
        Map input records=3
        Map output records=57
        Map output bytes=8036
        Map output materialized bytes=8181
        Input split bytes=226
        Combine input records=0
        Combine output records=0
        Reduce input groups=28
        Reduce shuffle bytes=8181
        Reduce input records=57
        Reduce output records=58
        Spilled Records=114
        Shuffled Maps =2
        Failed Shuffles=0
        Merged Map outputs=2
        GC time elapsed (ms)=641
        CPU time spent (ms)=6930
        Physical memory (bytes) snapshot=4078620672
        Virtual memory (bytes) snapshot=31807492096
        Total committed heap usage (bytes)=6315245568
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=1529
    File Output Format Counters
        Bytes Written=4890
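For readers who do not use R, the computation performed by the RHadoop example can be pictured with the following plain-Python sketch. It is not the contents of example_RHadoop.R; it only imitates the described steps (500 binomial draws with size=100 and prob=0.5, then a map step and a reduce step that sum the frequency of each number). The function names and the fixed random seed are illustrative assumptions.

```python
import random
from collections import defaultdict


def draw_binomial(rng, size, prob):
    """One binomial draw: the number of successes in `size` Bernoulli trials."""
    return sum(1 for _ in range(size) if rng.random() < prob)


def map_phase(values):
    """Map: emit a (value, 1) pair for every drawn number."""
    return [(value, 1) for value in values]


def reduce_phase(pairs):
    """Reduce: group the pairs by key and sum the ones, giving a frequency per number."""
    counts = defaultdict(int)
    for key, one in pairs:
        counts[key] += one
    return dict(counts)


rng = random.Random(0)  # fixed seed (an arbitrary choice) so reruns match
draws = [draw_binomial(rng, size=100, prob=0.5) for _ in range(500)]
frequencies = reduce_phase(map_phase(draws))
for number in sorted(frequencies):
    print(number, frequencies[number])
```

On the cluster the map and reduce steps run as separate tasks on the data nodes; here they run in a single process purely to show the data flow.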
SparkR is an R package that provides a light-weight frontend to use Apache Spark from R. SparkR exposes the Spark API through the RDD (Resilient Distributed Datasets) class and allows users to interactively run jobs from the R shell on a cluster.
Run the SparkR example code, pi.R, which uses the Monte Carlo method to estimate the value of Pi (http://en.wikipedia.org/wiki/Monte_Carlo_method). In the pi.R example, random points placed on a square are split into 10 slices of an RDD, with 100,000 points per slice.
login1$ YARN_CONF_DIR=/etc/hadoop/conf MASTER=yarn-client \
    SparkR-pkg-master/sparkR SparkR-pkg-master/examples/pi.R yarn-client 10
Expected output should be similar to the following
Loading required package: methods
[SparkR] Initializing with classpath /home/rhuang/SparkR-pkg-master/lib/SparkR/sparkr-assembly-0.1.jar
...
15/02/15 13:35:06 INFO YarnClientImpl: Submitted application application_1422482982071_0275
15/02/15 13:35:20 INFO RackResolver: Resolved c252-113.rustler.tacc.utexas.edu to /default-rack
15/02/15 13:35:20 INFO RackResolver: Resolved c253-102.rustler.tacc.utexas.edu to /default-rack
Pi is roughly 3.1419
Num elements in RDD 1000000
Stopping SparkR
15/02/15 13:35:25 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/02/15 13:35:25 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down;...
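The Monte Carlo estimate itself is simple enough to sketch without Spark. The following plain-Python version is an illustration under stated assumptions, not the pi.R source: the function names, the seed handling, and the smaller sample count are choices made here. It throws random points at a square, counts those that land inside the inscribed circle, and treats each slice as an independent unit of work.

```python
import random


def count_hits(samples, rng):
    """Count random points in the square [-1, 1] x [-1, 1] that fall inside the unit circle."""
    hits = 0
    for _ in range(samples):
        x = rng.uniform(-1.0, 1.0)
        y = rng.uniform(-1.0, 1.0)
        if x * x + y * y < 1.0:
            hits += 1
    return hits


def estimate_pi(slices, samples_per_slice, seed=0):
    """Estimate Pi as 4 * (points inside the circle) / (all points)."""
    # Each slice is independent work. On the cluster the slices run in parallel;
    # here they simply run one after another.
    hits = sum(
        count_hits(samples_per_slice, random.Random(seed + index))
        for index in range(slices)
    )
    return 4.0 * hits / (slices * samples_per_slice)


print("Pi is roughly %s" % estimate_pi(slices=10, samples_per_slice=10000))
```

Because the circle covers Pi/4 of the square's area, the fraction of hits multiplied by 4 approaches Pi as the number of points grows.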
Apache Spark is a popular, general-purpose cluster computing framework. While it can run in a variety of configurations, on Rustler we support running it on top of the YARN cluster manager so users can leverage the entire cluster for their computational tasks. In this configuration, these jobs will also use HDFS to access their datasets in a distributed fashion. Spark creates RDDs (Resilient Distributed Datasets), which are in-memory, fault-tolerant transformations of datasets in disk-based stores (like HDFS) or of intermediate data generated by computations. By virtue of keeping these objects in memory, Spark is a very fast distributed computing framework.
Spark applications are typically submitted using the "spark-submit" command, which internally sets up various SPARK and HADOOP environment variables prior to launching the application.
Usage: spark-submit [Spark Options] Application Jar or Python File [Application Options]
Run "spark-submit" with no options to see the Spark Options supported. Also see examples below.
Let's run some of the examples that ship with the Spark installation (as /usr/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.3.jar). Various other examples may be run using this procedure. Please refer to the Spark examples section of the Spark source code for details. (https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples)
This example estimates the value of Pi using the Monte Carlo method. The application takes one argument, the number of slices (parallel tasks) over which the random sampling is spread (we chose 100 here).
login1$ spark-submit --class org.apache.spark.examples.SparkPi --deploy-mode cluster \
    --master yarn /usr/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.3.jar 100
Note the "--master" flag, which specifies the Spark Master as YARN, since it is the resource manager of the cluster.
The "--deploy-mode" flag used is "cluster". This is important as it ensures that nothing runs on the login node.
The "spark-submit" command periodically prints application report stanzas for the job to the screen. For a successful run they look as follows:
15/02/12 14:32:17 INFO yarn.Client: Application report from ASM:
    application identifier: application_1422482982071_0154
    appId: 154
    clientToAMToken: null
    appDiagnostics:
    appMasterHost: N/A
    appQueue: root.agupta
    appMasterRpcPort: -1
    appStartTime: 1423773105544
    yarnAppState: ACCEPTED
    distributedFinalState: UNDEFINED
    appTrackingUrl: http://name.rustler.tacc.utexas.edu:8088/proxy/application_1422482982071_0154/
    appUser: agupta
...
15/02/12 14:32:22 INFO yarn.Client: Application report from ASM:
    application identifier: application_1422482982071_0154
    appId: 154
    clientToAMToken: null
    appDiagnostics:
    appMasterHost: c251-114.rustler.tacc.utexas.edu
    appQueue: root.agupta
    appMasterRpcPort: 0
    appStartTime: 1423773105544
    yarnAppState: RUNNING
    distributedFinalState: UNDEFINED
    appTrackingUrl: http://name.rustler.tacc.utexas.edu:8088/proxy/application_1422482982071_0154/
    appUser: agupta
...
15/02/12 14:32:29 INFO yarn.Client: Application report from ASM:
    application identifier: application_1422482982071_0154
    appId: 154
    clientToAMToken: null
    appDiagnostics:
    appMasterHost: c251-114.rustler.tacc.utexas.edu
    appQueue: root.agupta
    appMasterRpcPort: 0
    appStartTime: 1423773105544
    yarnAppState: FINISHED
    distributedFinalState: SUCCEEDED
    appTrackingUrl: http://name.rustler.tacc.utexas.edu:8088/proxy/application_1422482982071_0154/A
    appUser: agupta
The application identifier identifies a specific job (in this example, application_1422482982071_0154). It is important to remember that, since we are using YARN as our cluster manager, this is a YARN application ID. While the application is running, it may also be viewed with the YARN command interface using yarn application -list.
If the application ended with failures, the last stanza of the application report reflects that: the distributedFinalState field shows the state "FAILED".
The above output of spark-submit may be cancelled using Ctrl-C in order to get the shell prompt back.
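If the report output has been saved to a file, the final state can be checked with a short script rather than by eye. The sketch below is a hedged illustration: it assumes each report field sits on its own line as "name: value", the helper names (last_report, succeeded) are hypothetical, and the sample text is abridged from the report shown above.

```python
import re

# Only the three fields needed here are captured.
FIELD = re.compile(
    r"^\s*(application identifier|yarnAppState|distributedFinalState):\s*(\S+)\s*$"
)


def last_report(lines):
    """Return the captured fields of the most recent report stanza as a dict."""
    report = {}
    for line in lines:
        if "Application report from ASM" in line:
            report = {}  # a new stanza starts; forget the previous one
            continue
        match = FIELD.match(line)
        if match:
            report[match.group(1)] = match.group(2)
    return report


def succeeded(report):
    """True only when YARN reports the application as finished and successful."""
    return (report.get("yarnAppState") == "FINISHED"
            and report.get("distributedFinalState") == "SUCCEEDED")


# Abridged from the report above; assumes one field per line.
sample = """\
15/02/12 14:32:22 INFO yarn.Client: Application report from ASM:
    application identifier: application_1422482982071_0154
    yarnAppState: RUNNING
    distributedFinalState: UNDEFINED
15/02/12 14:32:29 INFO yarn.Client: Application report from ASM:
    application identifier: application_1422482982071_0154
    yarnAppState: FINISHED
    distributedFinalState: SUCCEEDED
"""
report = last_report(sample.splitlines())
verdict = "succeeded" if succeeded(report) else "did not succeed"
print("%s %s" % (report["application identifier"], verdict))
```

Keeping only the last stanza matters because earlier stanzas legitimately show ACCEPTED or RUNNING with an UNDEFINED final state.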
More information on an application (its output, failures etc) can be found by looking at the application specific log files that are placed in the HDFS partition at the following path:
These may be examined using the dfs commands of HDFS.
For this specific example:
login1$ hdfs dfs -ls /var/log/hadoop-yarn/apps/agupta/logs/application_1422482982071_0154/
Found 3 items
...get shorter output
The last few lines of the application master's log contain the results:
(c251-114.rustler.tacc.utexas.edu_56010): ... stdout24Pi is roughly 3.1411232
This example runs a simple version of the PageRank algorithm on an input graph. The format of the input graph needs to be (with no blank lines):
FromURL ToURL
FromURL ToURL
...
FromURL ToURL
This example takes 2 arguments:
- The input graph file with the above format
  - If the file is in HDFS, provide the HDFS path relative to your HDFS home folder.
  - If the file is on the local file system, provide the absolute path prefixed by "file://"
- The number of iterations to run
login1$ spark-submit --class org.apache.spark.examples.JavaPageRank --deploy-mode cluster --master yarn \
    /usr/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.3.jar \
    file:///home/agupta/edge_list.txt 50
In this example, the input graph (in "edge_list.txt") is a graph of 1000 nodes and 10000 randomly created edges, and the algorithm is run for 50 iterations.
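A test graph of the kind described above can be produced with a few lines of Python. This is a sketch, not the script used to create the original edge_list.txt: the function names are hypothetical, nodes are labelled with the integers 1 to 1000, and the choice to skip self-loops and duplicate edges is an assumption made here.

```python
import random
import sys


def random_edges(num_nodes, num_edges, seed=0):
    """Return `num_edges` distinct directed edges (no self-loops) between nodes 1..num_nodes."""
    rng = random.Random(seed)
    edges = set()
    while len(edges) < num_edges:
        source = rng.randint(1, num_nodes)
        target = rng.randint(1, num_nodes)
        if source != target:
            edges.add((source, target))
    return sorted(edges)


def write_edge_list(path, edges):
    """Write one "FromURL ToURL" pair per line, with no blank lines."""
    with open(path, "w") as handle:
        for source, target in edges:
            handle.write("%d %d\n" % (source, target))


if __name__ == "__main__":
    edges = random_edges(num_nodes=1000, num_edges=10000)
    if len(sys.argv) > 1:
        write_edge_list(sys.argv[1], edges)  # e.g. python make_edges.py edge_list.txt
    else:
        print("%d edges generated; pass a file name to save them" % len(edges))
```

The resulting file can be passed to the example with a "file://" prefix, or copied into HDFS first with hdfs dfs -copyFromLocal.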
The output is similar to the previous example:
15/02/12 16:06:21 INFO yarn.Client: Application report from ASM:
    application identifier: application_1422482982071_0160
    appId: 160
    clientToAMToken: null
    appDiagnostics:
    appMasterHost: N/A
    appQueue: root.agupta
    appMasterRpcPort: -1
    appStartTime: 1423778749382
    yarnAppState: ACCEPTED
    distributedFinalState: UNDEFINED
    appTrackingUrl: ...
15/02/12 16:06:26 INFO yarn.Client: Application report from ASM:
    application identifier: application_1422482982071_0160
    appId: 160
    clientToAMToken: null
    appDiagnostics:
    appMasterHost: c253-121.rustler.tacc.utexas.edu
    appQueue: root.agupta
    appMasterRpcPort: 0
    appStartTime: 1423778749382
    yarnAppState: RUNNING
    distributedFinalState: UNDEFINED
    appTrackingUrl: http://name.rustler.tacc.utexas.edu:8088/proxy/application_1422482982071_0160/
    appUser: agupta
...
15/02/12 16:06:50 INFO yarn.Client: Application report from ASM:
    application identifier: application_1422482982071_0160
    appId: 160
    clientToAMToken: null
    appDiagnostics:
    appMasterHost: c253-121.rustler.tacc.utexas.edu
    appQueue: root.agupta
    appMasterRpcPort: 0
    appStartTime: 1423778749382
    yarnAppState: FINISHED
    distributedFinalState: SUCCEEDED
    appTrackingUrl: http://name.rustler.tacc.utexas.edu:8088/proxy/application_1422482982071_0160/A
    appUser: agupta
The results of a run can also be found in the application master log file in HDFS:
A section of the results looks like:
501 has rank: 1.103978228715302.
978 has rank: 0.8360883884213044.
34 has rank: 1.5498260479298391.
336 has rank: 0.8305731590891683.
716 has rank: 1.2556948018386884.
341 has rank: 0.6837823117718801.
680 has rank: 0.8281788633623367.
440 has rank: 0.7826825522156411.
542 has rank: 0.8771669240674593.
565 has rank: 0.6657704280226431.
109 has rank: 1.1040866397827505.
143 has rank: 0.6229546869395034.
132 has rank: 0.9529671056821962.
682 has rank: 1.0320869820757694.
430 has rank: 1.3435371064188004.
557 has rank: 0.4198054934772092.
169 has rank: 1.5631867069302998.
481 has rank: 1.4025914187834019.
744 has rank: 1.6301768850924943.
698 has rank: 1.207357785843826.
248 has rank: 1.1890010743342392.
695 has rank: 1.1707258115083423.
688 has rank: 0.9079755027702306.
607 has rank: 0.8205664753174244.
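To see where such rank values come from, the simple PageRank iteration can be sketched in plain Python. This is an illustration under stated assumptions rather than the JavaPageRank source: it assumes the common simplified update rule (every page starts at rank 1.0, each page splits its rank evenly among its outgoing links, and the new rank is 0.15 + 0.85 times the contributions received), and the four-edge graph is made up for the demonstration.

```python
from collections import defaultdict


def page_rank(edges, iterations):
    """Iterate rank = 0.15 + 0.85 * (sum of contributions from linking pages)."""
    links = defaultdict(set)
    for source, target in edges:
        links[source].add(target)

    # Every page with outgoing links starts at rank 1.0.
    ranks = dict((source, 1.0) for source in links)
    for _ in range(iterations):
        contributions = defaultdict(float)
        for source, targets in links.items():
            # A page that received no contribution in the previous round has no rank to share.
            if source in ranks:
                share = ranks[source] / len(targets)
                for target in targets:
                    contributions[target] += share
        ranks = dict((page, 0.15 + 0.85 * total) for page, total in contributions.items())
    return ranks


# A made-up four-edge graph, in the same (FromURL, ToURL) form as the input file.
edges = [("a", "b"), ("b", "c"), ("c", "a"), ("a", "c")]
for page, rank in sorted(page_rank(edges, iterations=50).items()):
    print("%s has rank: %s." % (page, rank))
```

In this simplified form the ranks are not normalized to sum to 1, which is why values above 1.0 appear in the results above.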
TACC resources are deployed, configured, and operated to serve a large, diverse user community. It is important that all users are aware of and abide by TACC Usage Policies. Failure to do so may result in suspension or cancellation of the project and associated allocation and closure of all associated logins. Illegal transgressions will be addressed through UT and/or legal authorities. The Usage Policies are documented here: http://www.tacc.utexas.edu/user-services/usage-policies.
Last update: July 22, 2015