• <xmp id="om0om">
  • <table id="om0om"><noscript id="om0om"></noscript></table>
  • 數據科學

    使用 Dask 加速 GPU 集群上的 XGBoost

    在 xgboost1 . 0 中,我們引入了 新的官方 Dask 接口 來支持高效的分布式訓練。 快速轉發到 XGBoost1 . 4 ,接口現在功能齊全。如果您對 xgboostdask 接口還不熟悉,請參閱第一篇文章,以獲得一個溫和的介紹。在本文中,我們將看一些簡單的代碼示例,展示如何最大化 GPU 加速的好處。

    我們的例子集中在希格斯數據集上,這是一個來自 機器學習庫 的中等規模的分類問題。 在下面的章節中,我們從基本數據加載和預處理開始,使用 GPU 加速的 Dask 和 Dask-ml 。然后,針對不同配置的返回數據訓練 XGBoost 模型。同時,分享一些新特性。之后,我們將展示如何在 GPU 集群上計算 SHAP 值以及可以獲得的加速比。最后,我們分享了一些優化技術與推理。

    以下示例需要在至少有一個 NVIDIA GPU 的機器上運行, GPU 可以是筆記本電腦或云實例。 Dask 的優點之一是它的靈活性,用戶可以在筆記本電腦上測試他們的代碼。它們還可以將計算擴展到具有最小代碼更改量的集群。 另外,要設置環境,我們需要 xgboost==1.4daskdask-mldask-cuda達斯克 – cuDF python 包,可從 RAPIDS 康達頻道: 獲得

    conda install -c rapidsai -c conda-forge dask dask-ml dask-cuda dask-cudf xgboost=1.4.2

    在 GPU 集群上用 Dask 加載數據

    首先,我們將數據集下載到 data 目錄中。

    mkdir data
    curl http://archive.ics.uci.edu/ml/machine-learning-databases/00280/HIGGS.csv.gz --output ./data/HIGGS.csv.gz

    然后使用 dask-cuda 設置 GPU 集群:

    import os
    from time import time
    from typing import Tuple
    
    from dask import dataframe as dd
    from dask_cuda import LocalCUDACluster
    from distributed import Client, wait
    import dask_cudf
    from dask_ml.model_selection import train_test_split
    
    import xgboost as xgb
    from xgboost import dask as dxgb
    import numpy as np
    import argparse
    
    # … main content to be inserted here in the following sections
    
    if __name__ == "__main__":
       parser = argparse.ArgumentParser()
       parser.add_argument("--n_workers", type=int, required=True)
       args = parser.parse_args()
       with LocalCUDACluster(args.n_workers) as cluster:
           print("dashboard:", cluster.dashboard_link)
           with Client(cluster) as client:
               main(client)

    給定一個集群,我們開始將數據加載到 gpu 中。 由于在參數調整期間多次加載數據,因此我們將 CSV 文件轉換為 Parquet 格式以獲得更好的性能。 這可以使用 dask_cudf 輕松完成:

    def to_parquet() -> str:
       """Convert the HIGGS.csv file to parquet files."""
       dirpath = "./data"
       parquet_path = os.path.join(dirpath, "HIGGS.parquet")
       if os.path.exists(parquet_path):
           return parquet_path
       csv_path = os.path.join(dirpath, "HIGGS.csv")
       colnames = ["label"] + ["feature-%02d" % i for i in range(1, 29)]
       df = dask_cudf.read_csv(csv_path, header=None, names=colnames, dtype=np.float32)
       df.to_parquet(parquet_path)
       return parquet_path

    數據加載后,我們準備培訓/驗證拆分:

    def load_higgs(
        path,
    ) -> Tuple[
        dask_cudf.DataFrame, dask_cudf.Series, dask_cudf.DataFrame, dask_cudf.Series
    ]:
        df = dask_cudf.read_parquet(path)
    
        y = df["label"]
        X = df[df.columns.difference(["label"])]
    
        X_train, X_valid, y_train, y_valid = train_test_split(
            X, y, test_size=0.33, random_state=42
        )
        X_train, X_valid, y_train, y_valid = client.persist(
            [X_train, X_valid, y_train, y_valid]
        )
        wait([X_train, X_valid, y_train, y_valid])
    
        return X_train, X_valid, y_train, y_valid

    在前面的示例中,我們使用 dask-cudf 從磁盤加載數據,使用 dask-ml 中的 火車測試分裂了 函數拆分數據集。 大多數時候, dask 的 GPU 后端與 dask-ml 中的實用程序無縫地工作,我們可以加速整個 ML 管道。

    提前停止訓練

    最常請求的特性之一是提前停止對 Dask 接口的支持。 在 XGBoost1 . 4 版本中,我們不僅可以指定停止輪的數量,還可以開發定制的提前停止策略。 對于最簡單的情況,向 train 函數提供停止回合可以實現提前停止:

    def fit_model_es(client, X, y, X_valid, y_valid) -> xgb.Booster:
       early_stopping_rounds = 5
       Xy = dxgb.DaskDeviceQuantileDMatrix(client, X, y)
       Xy_valid = dxgb.DaskDMatrix(client, X_valid, y_valid)
       # train the model
       booster = dxgb.train(
           client,
           {
               "objective": "binary:logistic",
               "eval_metric": "error",
               "tree_method": "gpu_hist",
           },
           Xy,
           evals=[(Xy_valid, "Valid")],
           num_boost_round=1000,
           early_stopping_rounds=early_stopping_rounds,
       )["booster"]
       return booster


    在前面的片段中有兩件事需要注意。 首先,我們指定觸發提前停止訓練的輪數。 XGBoost 將在連續 X 輪驗證指標未能改善時停止培訓過程,其中 X 是指定提前停止的輪數。 其次,我們使用名為 DaskDeviceQuantileDMatrix 的數據類型進行訓練,但使用 DaskDMatrix 進行驗證。 DaskDeviceQuantileDMatrixDaskDMatrix 的替代品,用于基于 GPU 的訓練輸入,避免了額外的數據拷貝。


    gpu_hist 一起使用時, DaskDeviceQuantileDMatrix 可以節省大量內存,并且輸入數據已經在 GPU 上。圖 1 描述了 DaskDeviceQuantileDMatrix. 的結構 數據分區不再需要復制和連接,取而代之的是,由草圖算法生成的摘要被用作真實數據的代理。

    The figure depicts the steps to construct the DaskDeviceQuantileDMatrix. First, a summary is generated from different data partitions by sketching algorithm, then we use the summary to construct a compressed gradient histogram index.

    圖 1 : DaskDeviceQuantileDMatrix 的構造 .

    在 XGBoost 中,提前停止作為回調函數實現。 新的回調接口可以用來實現更高級的提前停止策略。下面的代碼顯示了提前停止的另一種實現,其中有一個附加參數要求 XGBoost 僅返回最佳模型,而不是完整模型:

      def fit_model_customized_es(client, X, y, X_valid, y_valid):
    ????early_stopping_rounds = 5
    ????es = xgb.callback.EarlyStopping(rounds=early_stopping_rounds, save_best=True)
    ????Xy = dxgb.DaskDeviceQuantileDMatrix(client, X, y)
    ????Xy_valid = dxgb.DaskDMatrix(client, X_valid, y_valid)
    ????# train the model
    ????booster = xgb.dask.train(
    ????????client,
    ????????{
    ????????????"objective": "binary:logistic",
    ????????????"eval_metric": "error",
    ????????????"tree_method": "gpu_hist",
    ????????},
    ????????Xy,
    ????????evals=[(Xy_valid, "Valid")],
    ????????num_boost_round=1000,
    ????????callbacks=[es],
    ????)["booster"]
    ????return booster
    

    在前面的示例中, EarlyStopping 回調作為參數提供給 train ,而不是使用 early_stopping_rounds 參數。為了提供一個定制的提前停止策略,探索 EarlyStopping 的其他參數或子類化這個回調是一個很好的起點。

    定制目標和評估指標

    XGBoost 被設計成可以通過定制的目標函數和度量進行擴展。在 1 . 4 中,這個特性被引入 dask 接口。要求與單節點接口完全相同:

    def fit_model_customized_objective(client, X, y, X_valid, y_valid) -> dxgb.Booster:
        def logit(predt: np.ndarray, Xy: xgb.DMatrix) -> Tuple[np.ndarray, np.ndarray]:
            predt = 1.0 / (1.0 + np.exp(-predt))
            labels = Xy.get_label()
            grad = predt - labels
            hess = predt * (1.0 - predt)
            return grad, hess
    
        def error(predt: np.ndarray, Xy: xgb.DMatrix) -> Tuple[str, float]:
            label = Xy.get_label()
            r = np.zeros(predt.shape)
            predt = 1.0 / (1.0 + np.exp(-predt))
            gt = predt > 0.5
            r[gt] = 1 - label[gt]
            le = predt <= 0.5
            r[le] = label[le]
            return "CustomErr", float(np.average(r))
    
        # Use early stopping with custom objective and metric.
        early_stopping_rounds = 5
        # Specify the metric we want to use for early stopping.
        es = xgb.callback.EarlyStopping(
        rounds=early_stopping_rounds, save_best=True, metric_name="CustomErr"
        )
    
        Xy = dxgb.DaskDeviceQuantileDMatrix(client, X, y)
        Xy_valid = dxgb.DaskDMatrix(client, X_valid, y_valid)
        booster = dxgb.train(
            client,
            {"eval_metric": "error", "tree_method": "gpu_hist"},
            Xy,
            evals=[(Xy_valid, "Valid")],
            num_boost_round=1000,
            obj=logit,  # pass the custom objective
            feval=error,  # pass the custom metric
            callbacks=[es],
        )["booster"]
        return booster

    在前面的函數中,我們使用定制的目標函數和度量來實現一個 logistic 回歸模型以及提前停止。請注意,該函數同時返回 gradient 和 hessian , XGBoost 使用它們來優化模型。 另外,需要在回調中指定名為 metric_name 的參數。它用于通知 XGBoost 應該使用自定義錯誤函數來評估早期停止標準。

    解釋模型

    在得到我們的第一個模型之后,我們 MIG ht 想用 SHAP 來解釋預測。 SHapley 加法解釋( SHapley Additive explainstructions , SHapley Additive explainstructions )是一種基于 SHapley 值解釋機器學習模型輸出的博弈論方法。 有關算法的詳細信息,請參閱 papers 。 由于 XGBoost 現在支持 GPU 加速的 Shapley 值,因此我們將此功能擴展到 Dask 接口。現在,用戶可以在分布式 GPU 集群上計算 shap 值。這是由顯著改進的預測函數和 GPUTreeShap 庫 實現的:

    def explain(client, model, X):
       # Use array instead of dataframe in case of output dim is greater than 2.
       X_array = X.values
       contribs = dxgb.predict(
           client, model, X_array, pred_contribs=True, validate_features=False
       )
       # Use the result for further analysis
       return contribs

    XGBoost 使用多個 GPU 計算 shap 值的性能如圖 2 所示。

    The chart shows a significant speedup using GPU cluster to compute SHAP value.

    圖 2 : Shap 推斷時間。

    基準測試是在一臺 NVIDIA DGX-1 服務器上進行的,該服務器有 8 個 V100 gpu 和兩個 20 核的 Xeon E5-2698 v4 cpu ,并進行了一輪訓練、 shap 值計算和推理。

    得到的 SHAP 值可用于可視化、使用特征權重調整列采樣或用于其他數據工程目的。

    運行推理

    經過一些調整,我們得到了對新數據進行推理的最終模型。 XGBoost Dask 接口的預測沒有舊版本那么有效,而且內存不足。在 1 . 4 中,我們修改了預測函數并增加了對就地預測的支持。 對于正態預測,它使用與 shap 值計算相同的接口:

    def predict(client, model, X):
        predt = dxgb.predict(client, model, X)
        assert isinstance(predt, dd.Series)
        return predt


    標準的 predict 函數提供了一個通用接口,可同時接受 DaskDMatrix 和 dask 集合(數據幀或數組),但沒有針對內存使用進行優化。在這里,我們將其替換為就地預測,它支持基本的推理任務,并且不需要將數據復制到 XGBoost 的內部數據結構中:

    def inplace_predict(client, model, X):
        # Use inplace_predict instead of standard predict.
        predt = dxgb.inplace_predict(client, model, X)
        assert isinstance(predt, dd.Series)
        return predt

    內存節省取決于每個塊的大小和輸入類型。當使用同一模型多次運行推理時,另一個潛在的優化是對模型進行預格式化。默認情況下,每次調用 predict 時, XGBoost 都會將模型傳輸給 worker ,從而產生大量開銷。好消息是 Dask 函數接受 future 對象作為完成模型的代理。然后我們可以傳輸數據,這些數據可以與其他計算和持久化數據重疊。

     def inplace_predict_multi_parts(client, model, X_train, X_valid):
        """Simulate the scenario that we need to run prediction on multiple datasets using train
    	and valid. In real world the number of datasets is unlimited
    
        """
        # prescatter the model onto workers
        model_f = client.scatter(model)
        predictions = []
        for X in [X_train, X_valid]:
            # Use inplace_predict instead of standard predict.
            predt = dxgb.inplace_predict(client, model_f, X)
            assert isinstance(predt, dd.Series)
            predictions.append(predt)
            return predictions

    在前面的代碼片段中,我們將未來的模型傳遞給 XGBoost ,而不是真正的模型。 這樣我們就避免了在預測過程中的重復傳輸,或者我們可以將模型傳輸與其他操作(如加載數據)并行,如注釋中所建議的那樣。

    把它們放在一起

    在前面的部分中,我們將演示早期停止、形狀值計算、自定義目標以及最終推斷。下表顯示了具有不同工作線程數的 GPU 集群的端到端加速。

    The chart shows the time of running the pipeline with different number of GPU workers.

    圖 3 : GPU 集群端到端時間。

    與之前一樣,基準測試是在一臺 NVIDIA DGX-1 服務器上執行的,該服務器有 8 個 V100 gpu 和兩個 20 核的 Xeon E5 – 2698 v4 cpu ,并進行一輪訓練、 shap 值計算和推理。此外,我們還共享了兩種內存使用優化,圖 4 描述了總體內存使用比較。

    The chart shows the memory usage of standard pipeline and optimized pipeline. With 64-bit floating point input, the peak GPU memory usage is close to 10000 MiB while optimized pipeline uses about 6000MiB.
    圖 4 :內存使用情況。

    左兩列是 64 位數據類型訓練的內存使用情況,右兩列是 32 位數據類型訓練的內存使用情況。標準是指使用正常的數據矩陣和預測函數進行訓練。有效的方法是使用 DaskDeviceQuantileDMatrixinplace_predict.

    Scikit 學習包裝器

    前面的章節考慮了“功能”接口的基本模型訓練,但是,還有一個類似 scikit 學習估計器的接口。它更容易使用,但有更多的限制。在 XGBoost1 . 4 中,此接口與單節點實現具有相同的特性。用戶可以選擇不同的估計器,如 DaskXGBClassifier 用于分類,而 DaskXGBRanker 用于排名。查看參考資料以獲得可用估算器的完整列表: https://xgboost.readthedocs.io/en/latest/python/python_api.html#module-xgboost.dask

    概括

    我們已經介紹了一個在 GPU 集群上使用 RAPIDS 庫加速 XGBoost 的示例,它顯示了使 XGBoost 代碼現代化可以幫助最大限度地提高培訓效率。通過 XGBoost Dask 接口和 RAPIDS ,用戶可以通過一個易于使用的 API 實現顯著的加速。盡管 XGBoost-Dask 接口已經達到了與單節點 API 的功能對等,但仍在繼續開發,以便更好地與其他庫集成,實現超參數調優等新功能。對于與 dask 接口相關的新功能請求,您可以在 XGBoost 的 GitHub 存儲庫 上打開一個問題。

    要了解有關同時使用 Dask 和 RAPIDS 的更多信息,請查看 NVIDIA 2021 年達斯克分布式峰會上的演講 。有關 RAPIDS 和 Dask 的概述,請收聽 GPU 加速數據科學研討會 。要深入了解基于代碼的示例,請查看 輔導的

    ?

    ?

    0

    標簽

    人人超碰97caoporen国产