Optimizing the End-to-End Training Pipeline on Apache Spark Clusters


Large-Scale Machine Learning in the Cloud

The success of the XGBoost machine learning library is due to its ability to quickly train statistical models that perform well in practice. (The original research article, XGBoost: A Scalable Tree Boosting System, is definitely worth a read.) Because of its popularity, competing vendors invest in optimizing XGBoost for their platforms. Previous results demonstrated that XGBoost training on Intel Xeon processors is cheaper and up to 4.5x faster than on NVIDIA V100 GPUs.

This previous work focused on single-node performance. Here I will focus on distributed XGBoost training performance on Apache Spark clusters.

Distributed training on Spark can be divided into two stages:

The first stage, data conversion, converts an in-memory Spark RDD (resilient distributed dataset) to the native XGBoost DMatrix data structure that holds the training input. The second stage, training, takes advantage of the XGBoost algorithmic advances to distribute the DMatrix among OpenMP threads. Data conversion mostly takes place within Spark using a single thread per executor, while training uses multiple threads to take advantage of all the processor resources.

The data is loaded by the Java Virtual Machine (JVM) in Spark’s internal row format. The XGBoost library, however, is implemented in C++, so the data must be converted to the LabeledPoint format and moved to off-heap memory before it can be converted to a DMatrix. This adds significant overhead to the data conversion stage of the end-to-end computation.

Optimizing the End-to-End Workflow

Two optimizations help reduce this overhead. The first improves performance by loading data with as many Spark tasks as there are physical cores, each of which creates a small DMatrix instance. Tasks collocated on the same worker then combine their DMatrix instances into one. One of these tasks carries the combined DMatrix into the training stage and spawns OpenMP threads for multithreaded execution. The other tasks simply exit.
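The combine-and-elect step can be pictured with a small, pure-Python simulation. This is only a sketch of the idea, not the actual implementation: `TaskChunk`, `combine_per_worker`, and the "lowest task id becomes the leader" rule are hypothetical names and choices made for illustration, and plain lists of rows stand in for DMatrix instances.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class TaskChunk:
    """Rows converted by one single-threaded task (stand-in for a small DMatrix)."""
    task_id: int
    worker: str
    rows: List[List[float]]


def combine_per_worker(chunks: List[TaskChunk]) -> Dict[str, TaskChunk]:
    """Merge the chunks of collocated tasks into one chunk per worker.

    The task with the lowest id on each worker is treated as the leader
    (an assumed election rule); every other task would simply exit.
    """
    by_worker: Dict[str, List[TaskChunk]] = defaultdict(list)
    for chunk in chunks:
        by_worker[chunk.worker].append(chunk)

    combined: Dict[str, TaskChunk] = {}
    for worker, group in by_worker.items():
        group.sort(key=lambda c: c.task_id)
        leader = group[0]
        merged_rows = [row for c in group for row in c.rows]
        combined[worker] = TaskChunk(leader.task_id, worker, merged_rows)
    return combined


# Four loading tasks spread over two workers (made-up data).
chunks = [
    TaskChunk(0, "worker-a", [[1.0, 2.0]]),
    TaskChunk(1, "worker-a", [[3.0, 4.0]]),
    TaskChunk(2, "worker-b", [[5.0, 6.0]]),
    TaskChunk(3, "worker-b", [[7.0, 8.0]]),
]
combined = combine_per_worker(chunks)
leaders = sorted(c.task_id for c in combined.values())
print(leaders)
```

In this sketch only the two leader tasks would go on to the training stage, each holding all the rows loaded on its worker.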

Note that this optimization is designed for Spark v2.4 or earlier, but Spark v3.0 added a new feature to dynamically adjust the spark.task.cpus configuration. This is a more elegant solution, so it will be adopted soon.

The second optimization creates DMatrix using the Apache Arrow columnar data format, which supports fast data access without serialization overhead. Our solution relies on the Arrow Data Source implementation in the open-source Optimized Analytics Package for Spark Platform project, which supports reading data from various file formats into the in-memory Arrow format. This is implemented in C++, so it avoids the JVM and memory copy overheads. This also allows finer control over memory allocation and deallocation. While the JVM garbage collector may hold memory for some time, denying it to XGBoost, the C++ implementation frees memory in a timelier fashion. In some cases, memory utilization decreased from 85% to 55%.
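To see why a columnar layout helps, the following sketch contrasts row-oriented records with one contiguous typed buffer per feature. It uses only Python's standard `array` module as a stand-in for Arrow, so it illustrates the general idea rather than the Arrow Data Source itself; the feature names and values are made up.

```python
from array import array

# Row-oriented records: every row has to be unpacked field by field.
rows = [(0.5, 1.0, 0), (1.5, 2.0, 1), (2.5, 3.0, 0)]

# Column-oriented layout: one contiguous, typed buffer per feature.
columns = {
    "f0": array("d", (r[0] for r in rows)),
    "f1": array("d", (r[1] for r in rows)),
    "label": array("b", (r[2] for r in rows)),
}

# A memoryview exposes a column's underlying buffer without copying it,
# which is the kind of access native code can use directly.
f0_view = memoryview(columns["f0"])
print(f0_view.nbytes, f0_view[1])
```

Each column is a single block of same-typed values, so a consumer can read it in place instead of converting one row object at a time.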

Benchmarking the Optimizations

Two workloads are used to show the impact of these optimizations:

  1. Mortgage: A subset of Fannie Mae’s Single-Family Loan Performance Data with about 600 million samples and 46 features, stored as Parquet files.
  2. HiBench: Synthetic datasets generated using the HiBench Suite with 600 million, 900 million, or 1.2 billion samples and 50 features, stored as Parquet files.

Benchmarking is done on CPU and GPU instances in the Amazon Elastic Compute Cloud (Amazon EC2) (Table 1). The latest-generation NVIDIA GPU instance is compared to equivalently priced or cheaper Intel CPU instances. The benchmark consists of training classification models on Mortgage and HiBench and measuring the total time to convert the data and to complete 100 training rounds. Three XGBoost implementations are compared: the one in RAPIDS v0.17, which is specifically optimized to run on NVIDIA GPUs; stock XGBoost v1.3; and XGBoost v1.3 with the optimizations discussed above. The latter two implementations are run on the CPU instances.

Table 1. Amazon EC2 instances used for benchmarking (accessed January 2021)

The optimized XGBoost significantly outperforms the stock version in the HiBench benchmark on the same Spark cluster (eight nodes, four executors per node with 24 OpenMP threads per executor) (Figure 1).
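The cluster layout above (eight nodes, four executors per node, 24 OpenMP threads per executor) can be turned into Spark settings with a small helper. This is a sketch under stated assumptions, not the configuration used for the benchmark: `executor_layout` is a hypothetical helper, the figure of 96 vCPUs per c5.metal node is an assumption, and giving each task all of an executor's cores is just one possible mapping for the training stage.

```python
from typing import Dict


def executor_layout(nodes: int, executors_per_node: int,
                    threads_per_executor: int, vcpus_per_node: int) -> Dict[str, str]:
    """Build Spark settings for a given executor layout (hypothetical helper)."""
    used = executors_per_node * threads_per_executor
    if used > vcpus_per_node:
        raise ValueError(
            f"{used} threads per node exceed the {vcpus_per_node} vCPUs available"
        )
    return {
        # Total number of executors across the cluster.
        "spark.executor.instances": str(nodes * executors_per_node),
        # Cores owned by each executor.
        "spark.executor.cores": str(threads_per_executor),
        # Assumption: one training task per executor, using all of its cores.
        "spark.task.cpus": str(threads_per_executor),
    }


# Assumed: 96 vCPUs per c5.metal node.
conf = executor_layout(nodes=8, executors_per_node=4,
                       threads_per_executor=24, vcpus_per_node=96)
for key, value in conf.items():
    print(f"--conf {key}={value}")
```

The printed lines have the form of `spark-submit` options; the helper raises an error if a layout would oversubscribe a node.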

Figure 1. End-to-end training time on a Spark cluster of eight c5.metal nodes

Figure 2 shows a scenario where the optimized XGBoost running on a Spark cluster of eight c5.metal instances (Intel Xeon Platinum 8275CL processors) outperforms an equivalently priced p4d.24xlarge instance (NVIDIA A100 GPUs). In addition to being slower in training time, the GPU instance also incurs significant initialization overhead every time the Spark context starts (indicated by the orange parts of Figure 2). This cost is incurred once per Spark context, so it is not included in the speedup calculation, but it does make the GPU instances less appealing for Spark clusters.

Figure 2. Comparison of end-to-end training time for one p4d.24xlarge node and an equivalently priced cluster of eight c5.metal nodes


A lot has been done to improve XGBoost performance, but most work focuses on optimizing just the training stage. However, the time-consuming data conversion stage also offers optimization opportunities. This is especially true when running XGBoost on Spark, where data conversion accounts for a significant portion of the end-to-end run time. XGBoost training is much faster and cheaper on a Spark cluster when a better resource utilization strategy and data format are used during the data conversion stage (Figures 1–3). Future optimizations will take advantage of Spark v3.0’s stage-level resource management to dynamically control spark.task.cpus, which can address the single-threaded execution of the data conversion stage.

Getting Started

Example code can be found in the walkthrough of basic usage: https://github.com/oap-project/solution-navigator/tree/master/xgboost. Source code for the optimizations is available in the GitHub repo: https://github.com/intel-bigdata/xgboost/tree/arrow-to-dmatrix.

Performance Tips for Doing XGBoost Training on Amazon EC2 Instances

  • Try to give an equal amount of work to all Spark executors. For example, the number of partitions of input data should be divisible by the total number of cores configured in the Spark cluster.
  • Non-uniform memory access (NUMA) can have a big impact on performance, so try to avoid cross-NUMA cost by binding the executors to a processor socket after the Spark context starts. Alternatively, simply use two c5.12xlarge instances instead of one c5.24xlarge instance.
  • Configure as few executors or ranks as possible. The number of executors may impact training performance. In our experience, two or four ranks per node typically give the best performance.
  • The more executors created on a node, the more memory is needed to support them, so fewer executors can lower memory utilization. With the stock XGBoost, however, fewer executors will take much longer to load and convert the training data.
  • Keep an eye on node scaling. While adding more nodes can improve speedup, the benefit diminishes at some point because scaling is not perfect.
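The first tip, keeping the partition count divisible by the total number of cores, can be checked with a few lines of Python. This is a minimal sketch: `balanced_partition_count` is a hypothetical helper, and the figures of 32 executors with 24 cores each and 1,000 existing partitions are assumptions chosen for illustration.

```python
def balanced_partition_count(current_partitions: int, total_cores: int) -> int:
    """Round a partition count up to the next multiple of the total core count."""
    if current_partitions < 1 or total_cores < 1:
        raise ValueError("both arguments must be positive")
    waves = -(-current_partitions // total_cores)  # ceiling division
    return waves * total_cores


# Assumed cluster: 32 executors with 24 cores each.
total_cores = 32 * 24
target = balanced_partition_count(1000, total_cores)
print(target)
```

The result could then be passed to a repartitioning call such as PySpark's `DataFrame.repartition(target)` before training, so that every core receives the same number of partitions.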

