• <xmp id="om0om">
  • <table id="om0om"><noscript id="om0om"></noscript></table>
  • Generative AI

    NeMo Curator を使った日本語データのキュレーション

    Reading Time: 7 minutes

    本記事では、NeMo Curator を使用して、日本語データセットを作成する方法を説明します。

    データ キュレーションとは

    データ キュレーションとは、データのダウンロードやテキストの抽出、クリーニング、重複排除、フィルタリングなどを通じて、機械學習モデルの開発に必要なデータセットを構築するプロセスです。

    図 1. NeMo Curator チームが、Common Crawl データセットで 357M パラメーターの GPT スタイルのモデルを學習するアブレーション(一部の要素だけを追加、削除して比較)実験を実行し、それぞれのキュレーションステップがモデルのパフォーマンスに與える影響を評価。

    データ キュレーションは、大規模言語モデル (LLM) の事前學習、カスタマイズにおける最初の、そしておそらく最も重要なステップでもあります。しかし、この重要性にもかかわらず、LLM を學習するための大規模なデータセットを作成するために開発されたソフトウェアやツールのほとんどは、公開されておらず、拡張性もありません。そのため、LLM 開発者は、大規模な言語データセットをキュレートするための獨自ツールを構築する必要があります。

    このニーズの高まりに応えるため、NVIDIA は生成 AI モデルのための大規模で高品質なデータセットを準備するデータ キュレーション フレームワークである NeMo Curator をオープン ソース でリリースしました。

    NeMo Curator とは

    NeMo Curator は、基盤モデルの事前學習、ドメイン適応型事前學習 (DAPT)、教師ありファインチューニング (SFT)、Parameter-Efficient Fine-Tuning (PEFT) などの大規模言語モデル (LLM) のユース ケース向けに、高速でスケーラブルなデータセットの準備とキュレーションを行うために設計された Python ライブラリです。

    DaskRAPIDS で GPU を活用することでデータ キュレーションを大幅に高速化し、大幅な時間の短縮を実現します。このライブラリはカスタマイズ可能なモジュール式インターフェイスを提供し、パイプラインの拡張を簡素化し、高品質のトークンを準備することでモデルの収束を加速します。

    NVIDIA NeMo の一部である NeMo Curator は、Common Crawl、Wikipedia、arXiv などのパブリック ソースからすぐにデータをダウンロードしてキュレートするためのワークフローを提供します。また、開発者の獨自の要件にカスタマイズされたデータ キュレーション パイプラインにより、簡単にカスタム データセットを作成できる柔軟性も提供します。

    現在、NeMo Curator はすぐに使用できる機能として以下を提供しています。

    • データのダウンロードとテキストの抽出
    • 言語の識別と分離
    • テキストの再フォーマット化とクリーニング
    • 品質フィルタリング
    • ドキュメントレベルの重複排除
    • 多言語ダウンストリーム タスクの除染
    • 分散データ分類 (ドメイン、品質分類等)
    • 個人識別情報 (PII) の削除

    「図 2. NeMo Curator が提供する機能」には、それぞれのステップとそこで使われている主なテクノロジが記載されています。緑で塗りつぶされたステップでは、GPU を使って処理を大幅に高速化することが可能です。

    図 2. NeMo Curator が提供する機能

    NeMo Curator は GitHub 上にあるリポジトリ から、または NeMo Framework のコンテナーからすぐに始めることができます。

    NeMo Curator チュートリアル

    本記事では、日本語 Wikipedia からデータをダウンロードして、いくつかのキュレーション ステップについて、その使用方法を紹介します。

    本チュートリアルでの手順は以下の通りです。

    • NeMo Framework コンテナー を起動
    • データのダウンロードとテキストの抽出
    • 言語の検出と分離、テキストの再フォーマット化
    • ID の付與
    • ドキュメント レベルの重複排除
    • 合成データの生成

    チュートアルを開始する前に、NeMo Curator の核となっているデータセット クラス DocumentDataset に関する Working with DocumentDataset と CPU、GPU 上で処理を大規模にスケーリングする Dask に関する CPU and GPU Modules with Dask を一読することをお勧めします。

    また、今回のチュートリアルの検証環境は以下の條件で行っております。こちらの構成は一例であり、NeMo Curator は Volta 以降の任意の NVIDIA GPU、任意の計算ノードでジョブを実行することが可能です。Kubernetes クラスター上で NeMo Curator を実行する際のサンプルはこちらにあります。

    • ハードウェア
      • DGX Cloud A100
      • GPU: 1 x NVIDIA A100 80 GB GPUs (driver version: 535.183.08)
      • CPU: AMD EPYC 7V12 64-Core Processor
      • システム メモリ: 128 GB
    • ソフトウェア
      • OS: Ubuntu 22.04.4 LTS
      • Container: nvcr.io/nvidia/nemo:24.07

    事前準備

    以下のコマンドで作業用のディレクトリを作成し、移動します。

    mkdir curator-example
    cd curator-example

    Docker コンテナーの起動

    以下のコマンドでコンテナーを起動します。

    sudo docker run --rm -it --gpus all --ulimit memlock=-1 --network=host -v ${PWD}:/workspace -w /workspace  nvcr.io/nvidia/nemo:24.07 bash

    データのダウンロードとテキストの抽出

    前述したように、NeMo Curator では、Common Crawl、Wikipedia、arXiv について、それぞれ専用の関數が用意されており、引數を與えて実行することですぐにダウンロードおよびテキストの抽出を開始できます。

    (オプション): 獨自のデータ ソースをダウンロード、テキスト抽出することも可能です。この処理には、Common Crawl 用などの専用関數でも使われている NeMo Curator の DocumentDownloader を継承したダウンロード用のクラス、DocumentIterator を継承した反復処理用のクラス、DocumentExtractor を継承したテキスト抽出用のクラスを定義する必要があります。これらの例は Curating Custom Datasets for LLM Parameter-Efficient Fine-Tuning with NVIDIA NeMo CuratorCurating Custom Datasets for LLM Training with NVIDIA NeMo Curator に記載がありますのでぜひ參考にしてください。

    このチュートリアルでは、日本語 Wikipedia のデータをダウンロードして、そこからテキストを抽出します。

    日本語 Wikipedia を対象にする際は、download_wikipedia() というあらかじめ用意された関數を使用し、language='ja' によって日本語を対象に指定、dump_date にいつの時點のスナップショットをダウンロードするか指定します。ここでは時間およびリソースを節約するため、ダウンロードするファイル數を制限する url_limit を設定します (全てのファイルを対象にしたい場合はコメントアウトしてください)。

    LocalCluster の引數にあるワーカー數やメモリ制限は実行する環境に合わせて変更してください。今回の検証環境と以下の設定ではこの処理に 2 時間ほど要しました(このステップは本チュートリアルで最も時間がかかるパートになります)。

    以下、コンテナー起動後のスクリプトは Jupyter Notebook 上でのセル実行を想定しています。また、それぞれのステップは入力に使用するデータセットがすでに存在する狀況であればパスを変更することで個々に実行することが可能です。

    import os
    from nemo_curator.download import download_wikipedia
    from dask.distributed import Client, LocalCluster
    
    
    cur_dir = os.getcwd()
    print(cur_dir)
    data_dir = f"{cur_dir}/"
    
    cluster = LocalCluster(n_workers=48, processes=True, memory_limit='24GB')
    client = Client(cluster)
    
    # Output
    download_base_directory= os.path.join(data_dir, "wiki_downloads")
    download_output_directory = os.path.join(download_base_directory, "data")
    
    # Relevant parameters
    dump_date = "20240801"
    language = 'ja'
    url_limit = 1  # 1 file (jawiki-20240801-pages-articles-multistream1.xml-p1p114794.bz2)
    
    res = download_wikipedia(
        download_output_directory,
        language=language, 
        dump_date=dump_date,
        url_limit=url_limit
    ).df.compute()
    
    #client.cluster.close()
    #client.shutdown()

    処理が完了すると wiki_downloads/data/ というディレクトリに jawiki-20240801-pages-articles-multistream1.xml-p1p114794.bz2.jsonl というファイルが出力されます。このファイルのドキュメント數は 59,652 です。

    言語の検出と分離、テキストの再フォーマット化

    このセクションでは、先ほど抽出したドキュメントを fasttext の言語識別モデルを使用して、言語ごとに分類します。これによって、ドキュメントが言語ごとに作成されるサブフォルダーへ振り分けられます。

    import os
    import time
    from dask.distributed import Client, LocalCluster
    
    from nemo_curator import ScoreFilter,Modify
    from nemo_curator.datasets import DocumentDataset
    from nemo_curator.filters import FastTextLangId
    from nemo_curator.modifiers import UnicodeReformatter
    from nemo_curator.utils.file_utils import separate_by_metadata
    
    
    cur_dir = os.getcwd()
    print(cur_dir)
    data_dir = f"{cur_dir}/"
    
    # 前の処理でclusterを落としている場合は以下をアンコメントして再度起動してください
    #cluster = LocalCluster(n_workers=48, processes=True, memory_limit='24GB')
    #client = Client(cluster)
    
    # Input path
    multilingual_data_path = "./wiki_downloads/data/jawiki-20240801-pages-articles-multistream1.xml-p1p114794.bz2.jsonl"
    
    # Output path
    language_base_output_path = os.path.join(data_dir,"language_sep")
    language_data_output_path = os.path.join(language_base_output_path,"data")
    language_separated_output_path = os.path.join(language_data_output_path,"language")
    
    # Fasttext model path
    model_path = language_base_output_path
    
    # Define key in output .jsonl files to store the language information
    language_field = "language"
    
    !wget https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin -P {model_path}
    t0 = time.time()
    
    # Load dataset 
    multilingual_dataset = DocumentDataset.read_json(multilingual_data_path, add_filename=True)
    
    # Define Language separation pipeline
    lang_filter = FastTextLangId(os.path.join(model_path,'lid.176.bin'))
    language_id_pipeline = ScoreFilter(lang_filter, score_field=language_field, score_type='object')
    filtered_dataset = language_id_pipeline(multilingual_dataset)
    
    # The language separation pipeline will produce a result looks like ['JA',0.96873], we only want to keep the 'JA' label and drop the detailed classifier score
    filtered_dataset.df[language_field] = filtered_dataset.df[language_field].apply(lambda score: score[1],meta = (language_field, 'object'))
    
    # Split the dataset to corresponding language sub-folders
    language_stats = separate_by_metadata(filtered_dataset.df, language_separated_output_path, metadata_field=language_field).compute()
    
    print(f"Time taken for splitting language:{time.time()-t0}")

    この処理が完了 (検証環境では 1-2 分) すると language_sep/data/language/ の下に分類された言語ごとにドキュメントが保存されています。

    次に、先ほど言語ごとに振り分けられたドキュメントの日本語を対象に、UnicodeReformatter (內部で ftfy を実行) を使用して、ドキュメント內のユニコードを再フォーマットします。

    t0 = time.time()
    
    # Define desired language
    target_language = "JA"
    
    # Output path
    lang_sep_cleaned_data_output_path = os.path.join(language_data_output_path, "cleaned")
    
    # Read the language specific data and fix the unicode in it
    lang_data_path = os.path.join(language_separated_output_path, target_language)
    lang_data = DocumentDataset.read_json(lang_data_path,add_filename=True)
    
    cleaner = Modify(UnicodeReformatter())
    cleaned_data = cleaner(lang_data)
    
    # Write the cleaned_data
    cleaned_data.to_json(lang_sep_cleaned_data_output_path, write_to_filename=True)
    
    print(f"Time taken for fixing unicode:{time.time()-t0}")

    この処理が完了 (検証環境では 7 分ほど) すると language_sep/data/cleaned/jawiki-20240801-pages-articles-multistream1.xml-p1p114794.bz2.jsonl というファイルが出力されます。このファイルのドキュメント數は 59,603 です。

    ID の付與

    日本語 Wikipedia データにはすでに id がありますが、<prefix>_<id> フォーマットのように統一した id を付與すると複數のデータセットを扱う際に、どのデータセットのどのドキュメントが削除されたのかがわかりやすくなります。これは重複排除やフィルタリングなどを行う際に役に立ちます。

    この処理を実行する関數が AddID() です。この関數の引數は以下の通りです。

    • id_field: フィールドが入力 json ファイルに追加されます。キーが jsonl にすでに存在する場合は、その値が置き換えられます。
    • id_prefix: ID で使用される接頭辭。デフォルトは「doc_id」です。
    • start_index: ID の開始インデックス。デフォルトは「None」です。「None」に設定すると、高速な計算のために順序のない ID スキームが使用されます。ここでは、參照を容易にするために「0」に設定しています。
    import os
    import time
    
    from nemo_curator import AddId
    from nemo_curator.datasets import DocumentDataset
    
    
    cur_dir = os.getcwd()
    data_dir = f"{cur_dir}/"
    
    # Input
    add_id_input_data_dir = "./language_sep/data/cleaned"
    
    # Output
    added_id_output_path = os.path.join(data_dir, "add_id/cleaned")
    
    # Format of output ID will be <prefix>_<id>, Define prefix here
    add_ID_id_prefix="JA_wiki"
    
    t0 = time.time()
    # Read input files
    dataset = DocumentDataset.read_json(add_id_input_data_dir,add_filename=True)
    
    # Run AddID() on the input dataset
    add_id = AddId(id_field='id',id_prefix=add_ID_id_prefix,start_index=0)
    id_dataset = add_id(dataset)
    
    # Output files
    id_dataset.to_json(added_id_output_path, write_to_filename=True)
    
    print(f"Time taken for add ID:{time.time()-t0}")

    この処理が完了 (検証環境では數十秒ほど) すると、add_id/cleaned/ jawiki-20240801-pages-articles-multistream1.xml-p1p114794.bz2.jsonl というファイルが出力されます。結果を確認すると、id の value が “JA_wiki-0000000000” といった形式に変更されています。

    もし、ここまでで Dask クラスター が起動しているようであれば、次のセクションでは、GPUの Dask クラスター を起動するため、以下のコマンドで閉じてください (何かエラーが出た際は再度、実行してください)。

    client.cluster.close()
    client.shutdown()

    重複排除

    重複排除では、Exact Deduplication, Fuzzy Deduplication, 埋め込みを使用した Semantic Deduplication がサポートされています。ここでは、Exact Deduplication と Fuzzy Deduplication を扱います。Semantic Deduplication については、こちらを參照してください。

    Note: これらの処理を実行するには、前セクションの AddID() などを使用して、コーパス內の id が一意の狀態になっている必要があります。

    Exact Deduplication

    Exact Deduplication では、ドキュメントのテキストは「md5」などの特定のハッシュ アルゴリズムを使用して一意の文字列にハッシュ化されます。厳密に同一ハッシュ値を持つドキュメントは、同一のテキストを持っています。

    ここで使用する関數は ExactDuplicates() です。この関數の引數には、以下のものがあります。

    • id_field: ドキュメント ID を識別するための入力ファイル內のキー
    • text_field: ドキュメントのテキストを含む入力ファイル內のキー
    • hash_method: 使用されるハッシュ アルゴリズム。デフォルトは md5
    • cache_dir: 指定された場合、重複したドキュメント ID は cache_dir に出力されます。指定されていない場合は、ID は保存されません

    また、GPU の dask クラスターを使用して、重複排除の計算を高速化します。

    %env DASK_DATAFRAME__QUERY_PLANNING=False
    
    import os
    import time
    import pandas as pd
    
    from nemo_curator.datasets import DocumentDataset
    from nemo_curator.modules import ExactDuplicates
    from nemo_curator.utils.distributed_utils import get_client, get_num_workers
    
    def pre_imports():
        import cudf 
    
    
    cur_dir = os.getcwd()
    data_dir = f"{cur_dir}/"
    
    
    client = get_client(cluster_type='gpu', set_torch_to_use_rmm=False)
    print(f"Number of dask worker:{get_num_workers(client)}")
    client.run(pre_imports)
    
    # Input
    exact_dedup_input_dataset_dir = "./add_id/cleaned"
    
    # Output
    exact_dedup_base_output_path = os.path.join(data_dir, "exact_dedup")
    exact_dedup_log_dir = os.path.join(exact_dedup_base_output_path, 'log')
    exact_dedup_output_dir = os.path.join(exact_dedup_base_output_path, 'data')
    
    # Parameters for ExactDuplicates()
    exact_dedup_dataset_id_field = "id"
    exact_dedup_dataset_text_field = "text"
    
    !mkdir -p {exact_dedup_log_dir}
    !mkdir -p {exact_dedup_output_dir}
    
    t0 = time.time()
    # Read input dataset
    input_dataset = DocumentDataset.read_json(exact_dedup_input_dataset_dir, backend='cudf')
    
    # Run exact deduplication to the input
    exact_dup = ExactDuplicates(
        logger=exact_dedup_log_dir,
        id_field=exact_dedup_dataset_id_field,
        text_field=exact_dedup_dataset_text_field,
        hash_method="md5",
        cache_dir=exact_dedup_output_dir  # Duplicated document ID list is output to the cache_dir
    )
    duplicates = exact_dup(dataset=input_dataset)
    
    print(f"Number of exact duplicated file:{len(duplicates)}")
    print(f"Time taken for exact duplicate:{time.time()-t0}")
    
    exact_dedup_res = pd.read_parquet(os.path.join(exact_dedup_output_dir, "_exact_duplicates.parquet"))
    print(f"Number of exact duplicated document:{len(exact_dedup_res)}")
    exact_dedup_res.head()
    
    exact_dedup_res.groupby('_hashes')['id'].agg(lambda x: ' '.join(x)).reset_index().head()

    この処理が完了すると、exact_dedup/data/ に重複のあったドキュメントの id が保存されます。重複したドキュメントは 2 件ありました。検証環境では CPU を使用した場合に 11.2 秒ほど処理に時間がかかりましたが、GPU を使用した場合、3.8 秒と約 3 倍処理が高速化しました。

    Fuzzy Deduplication

    Fuzzy Deduplication は、Exact Deduplication とは異なり、厳密な重複を見つけ出すのではなく、GPU 実裝された MinhashLSH アルゴリズムによって、テキストの統計に基づき類似テキストを抽出します (意味的な類似性とは異なります)。この重複を抽出するには、複數の中間ステップがありますが、詳細はこちらを參照してください。

    Fuzzy Deduplication は複數の手順を個々にステップバイステップで実行することも可能ですが、FuzzyDuplicates() を使用すると簡単に実行できます。FuzzyDuplicatesConfig() 內の引數として、n-gram の長さ、バケットやバケット內の hash の數、重複と判斷する Jaccard 類似度の閾値などを設定し、FuzzyDuplicates() に渡します。

    Note: Fuzzy Deduplication は、AddID() を使用した id 形式、もしくは整數 id の形式でのみ機能します。

    %env DASK_DATAFRAME__QUERY_PLANNING=False
    
    import os
    import time
    import pandas as pd
    
    from nemo_curator import FuzzyDuplicates, FuzzyDuplicatesConfig
    from nemo_curator.datasets import DocumentDataset
    from nemo_curator.utils.distributed_utils import get_client, get_num_workers
    
    import dask
    
    def pre_imports():
        import cudf 
    
        
    cur_dir = os.getcwd()
    data_dir = f"{cur_dir}/"
    
    client = get_client(cluster_type='gpu', set_torch_to_use_rmm=False)
    print(f"Number of dask worker:{get_num_workers(client)}")
    client.run(pre_imports)
    
    # Input
    fuzzy_dedup_data_path = "./add_id/cleaned"
    
    # Output
    fuzzy_dedup_base_output_path = os.path.join(data_dir, "fuzzy_wrapper")
    fuzzy_dedup_log_dir = os.path.join(fuzzy_dedup_base_output_path, 'log')
    fuzzy_dedup_cache_dir = os.path.join(fuzzy_dedup_base_output_path, 'cache')
    fuzzy_dedup_output_dir = os.path.join(fuzzy_dedup_base_output_path, 'data')
    
    # Relevant parameters
    id_field = 'id'
    text_field = 'text'
    filetype = "parquet"
    
    !mkdir -p {fuzzy_dedup_base_output_path}
    !mkdir -p {fuzzy_dedup_log_dir}
    !mkdir -p {fuzzy_dedup_cache_dir}
    !mkdir -p {fuzzy_dedup_output_dir}
    
    #!rm -r {fuzzy_dedup_cache_dir}
    
    with dask.config.set({"dataframe.backend": 'cudf'}):
            
        t0 = time.time()
            
        input_dataset = DocumentDataset.read_json(fuzzy_dedup_data_path, backend='cudf')
        fuzzy_dedup_config = FuzzyDuplicatesConfig(
            cache_dir=fuzzy_dedup_cache_dir,
            id_field=id_field,
            text_field=text_field,
            seed=42,  # Use the seed set in Minhash section for consistency
            char_ngrams=5,
            num_buckets=20,
            hashes_per_bucket=13,
            use_64_bit_hash=False,
            buckets_per_shuffle=5,
            false_positive_check=True,
            num_anchors=2,
            jaccard_threshold=0.8,
        )
        fuzzy_dup = FuzzyDuplicates(logger=fuzzy_dedup_log_dir, config=fuzzy_dedup_config)
        duplicates = fuzzy_dup(dataset=input_dataset)
            
        duplicates.to_parquet(fuzzy_dedup_output_dir, write_to_filename=False)
           
        print(f"Time taken for Connected Component: {time.time()-t0} s")    
            
    fuzzy_dedup_res = pd.read_parquet(fuzzy_dedup_output_dir)
    fuzzy_dedup_res.head()
    

    この処理が完了 (検証環境では數十秒ほど) すると、fuzzy_wrapper/data/ にパラメーターで指定した重複條件に合致したドキュメントの id が保存されます。

    Exact Deduplication と Fuzzy Deduplication によって、重複ドキュメントの id が出力されました。ここで重複しているドキュメントをデータセットから削除します。

    import os
    import pandas as pd
    
    from nemo_curator.datasets import DocumentDataset
    
    
    cur_dir = os.getcwd()
    data_dir = f"{cur_dir}/"
    
    # Input
    dataset_dir = "./add_id/cleaned"
    exact_dedup_output_dir="./exact_dedup/data"
    fuzzy_dedup_output_dir="./fuzzy_wrapper/data"
    
    # Output
    dudped_output_dir = os.path.join(data_dir, "remove_duplicate/result.parquet")
    
    # Relevant parameters
    input_id_field = 'id'
    id_prefix = "JA_wiki"
    
    !mkdir -p {dudped_output_dir}
    
    #Load .jsonl dataset (GPUメモリが足りない場合はbackend='pandas'へ変更してください)
    input_dataset = DocumentDataset.read_json(dataset_dir, backend='cudf')
    
    # Load exact deduplicate result and extract list of duplicated document ID (GPUメモリが足りない場合はbackend='pandas'へ変更してください)
    exact_duplicates = DocumentDataset.read_parquet(os.path.join(exact_dedup_output_dir, "_exact_duplicates.parquet"), backend='cudf')
    exact_docs_to_remove = exact_duplicates.df.map_partitions(
        lambda x: x[x._hashes.duplicated(keep="first")]
    )
    
    # Remove the duplicated document from input dataset
    result = input_dataset.df[
        ~input_dataset.df[input_id_field].isin(exact_docs_to_remove[input_id_field].compute())
    ]
    
    # Loads result from fuzzy dedup wrapper
    fuzzy_duplicates = pd.read_parquet(fuzzy_dedup_output_dir)
    
    # Generate list of near duplicate document ID
    fuzzy_docs_to_remove = fuzzy_duplicates.drop_duplicates(subset=['group'], keep='first')
    
    # Remove near duplicates
    result = result[~result[input_id_field].isin(fuzzy_docs_to_remove[input_id_field])]
    
    # Save final result to local (backend='pandas'の場合は、write_to_filename=Trueをコメントアウトしてください)
    result.to_parquet(dudped_output_dir, write_to_filename=True)
    
    res = pd.read_parquet(dudped_output_dir)
    print(f"Length of duplicate removed dataset:{len(res)}")

    この処理が完了 (検証環境では數秒) すると、remove_duplicate/result.parquet/ に重複が排除されたデータセットが保存されます。処理前のドキュメント數は 59,603 でしたが、Exact Deduplication と Fuzzy Deduplication で重複を排除した結果、59,508 のドキュメントが保存されました。

    合成データの生成

    LLM を活用して、事前學習やカスタマイズに使用するデータセットを生成することもできます。合成データは、LLM を低リソース言語/ドメインに適応させたり、他のモデルから知識を抽出したりする際などに有用です。ここでは、build.nvidia.com の API エンドポイント にアクセスして合成データを生成するパイプラインを実行してみましょう (このセクションを始めるには build.nvidia.com のアカウントを作成し、APIキーを取得する必要があります)。

    このセクションはこれまでのチュートリアルとは依存関係なく実行できます。

    まず、以下を実行してAPI エンドポイント へアクセスするための準備と相互の処理に必要なパラメーター、トピック、テンプレートを用意します。

    import asyncio
    import nest_asyncio
    from openai import AsyncOpenAI
    
    client = AsyncOpenAI(
        base_url="https://integrate.api.nvidia.com/v1",
        api_key="nvapi-"  # 取得したAPIキーを入力してください
    )
    
    n_subtopics = 2
    n_questions = 2
    topic = "機械學習"
    
    TOPIC_GENERATION_PROMPT_TEMPLATE = """\
    トピックが與えられた場合、そのトピックに関連する {n_subtopics} のサブトピックのリストを生成してください。
    トピックは:{topic}
    リストは番號なしで、サブトピックの説明なしでなければなりません。サブトピックはコンマで區切られる必要があります。リスト以外のテキストは存在してはなりません。
    """
    
    QUESTION_PROMPT_TEMPLATE = """\
    トピックが與えられた場合、そのトピックに関して{n_questions}個の質問を生成してください。
    トピックは:{sub_topic}
    リスト形式で、質問は改行文字で區切られる必要があります。リスト以外のテキストは存在してはなりません。
    """
    
    RESPONSE_PROMPT_TEMPLATE = """\
    質問が與えられた場合、その質問に対して考えられる2つの回答を生成してください。
    質問は:{question}
    リスト形式は以下の形式である必要があります:
    
    RESPONSE A: ここに回答Aのテキストを入力
    RESPONSE B: ここに回答Bのテキストを入力
    """
    

    次に、meta/llama-3.1-405b-instruct を呼び出して先ほど指定した「機械學習」というトピックから、より小さなサブトピックを生成します。

    # generate sub topics
    async def generate_subtopics(client, topic, n_subtopics):
        prompt = TOPIC_GENERATION_PROMPT_TEMPLATE.format(topic=topic, n_subtopics=n_subtopics)
        response = await client.chat.completions.create(
            model="meta/llama-3.1-405b-instruct",
            messages=[
                {"role" : "user",
                 "content" : prompt}
            ],
            temperature=0.2,
            top_p=0.7,
            max_tokens=1024,
        )
        return response
    subtopics = await generate_subtopics(client, topic, n_subtopics)
    subtopic_list = subtopics.choices[0].message.content.split(",")
    print(subtopic_list)

    結果として、以下のようなサブトピックが生成できました (亂數の影響で異なる結果が得られることもあります)。

    [‘ディープラーニング、強化學習’]

    次にサブトピックから質問文を生成してみましょう。

    # generate questions of sub topics
    async def generate_questions(client, sub_topic, n_questions):
        prompt = QUESTION_PROMPT_TEMPLATE.format(sub_topic=sub_topic, n_questions=n_questions)
        response = await client.chat.completions.create(
            model="meta/llama-3.1-405b-instruct",
            messages=[
                {"role" : "user",
                 "content" : prompt}
            ],
            temperature=0.2,
            top_p=0.7,
            max_tokens=1024,
        )
        if hasattr(response, 'choices') and response.choices:
            return response.choices[0].message.content
        else:
            print(f"Unexpected response structure: {response}")
            return None
    
    async def question_generator(client, subtopic_list, n_question):
        tasks = [generate_questions(client, subtopic, n_question) for subtopic in subtopic_list]
        question_list = await asyncio.gather(*tasks)
        return question_list
    
    nest_asyncio.apply()
    question_list = asyncio.run(question_generator(client, subtopic_list, n_questions))
    print(question_list)
    
    # format questions
    question_list_formatted = []
    for question_set in question_list:
        question_list_formatted += question_set.split("\n\n")

    実行が完了すると以下のような質問文が生成されます。

    [‘ディープラーニングと強化學習の違いは何か?\n\nディープラーニングと強化學習はどのように組み合わせて使用できるのか?’]

    次に先ほど生成した質問文から応答文を生成してみましょう。

    # generate response of each question
    async def generate_responses(client, question):
        prompt = RESPONSE_PROMPT_TEMPLATE.format(question=question)
        response = await client.chat.completions.create(
            model="meta/llama-3.1-405b-instruct",
            messages=[
                {"role" : "user",
                 "content" : prompt}
            ],
            temperature=0.2,
            top_p=0.7,
            max_tokens=1024,
        )
        if hasattr(response, 'choices') and response.choices:
            return response.choices[0].message.content
        else:
            print(f"Unexpected response structure: {response}")
            return None
    
    async def response_generator(client, question_list):
        tasks = [generate_responses(client, question) for question in question_list]
        response_list = await asyncio.gather(*tasks)
        return response_list
    
    question_response_list = asyncio.run(response_generator(client, question_list_formatted))
    print(question_response_list)

    生成された応答文の例は以下のようなものになります。各質問に対し、応答候補が 2 つずつ生成されます。これは DPO (Direct Preference Optimization) の學習にも利用できるフォーマットです。

    [‘RESPONSE A: ディープラーニングは、主に大規模なデータセットを使用してパターンを學習し、畫像や音聲の認識などのタスクに適用される一方、強化學習は、エージェントが環境とやり取りすることで、報酬を最大化する行動を學習する手法です。\n\nRESPONSE B: ディープラーニングは、ニューラルネットワークを使用してデータから特徴を抽出することに重點を置いており、強化學習は、エージェントが環境からのフィードバックに基づいて決定を下す方法を學習することに重點を置いています。’,

     ‘RESPONSE A: ディープラーニングと強化學習は、ディープラーニングを使用して強化學習のエージェントの狀態と行動を表現することで組み合わせることができます。ディープラーニングのネットワークは、強化學習のエージェントが環境を理解し、最適な行動を選択するのに役立ちます。\n\nRESPONSE B: 強化學習のエージェントは、ディープラーニングのネットワークを使用して、環境から得られた経験を學習し、最適な行動を選択することができます。ディープラーニングのネットワークは、強化學習のエージェントが環境を理解し、最適な行動を選択するのに役立ちます。また、強化學習のエージェントは、ディープラーニングのネットワークを使用して、環境から得られた経験を學習し、最適な行動を選択することができます。’]

    以下は先ほど生成したデータを jsonl 形式で出力するためのスクリプトです。

    # prepare question:response pair set list
    question_response_pair_list = []
    for question, response_set in zip(question_list_formatted, question_response_list):
        question_response_pair_list.append(
            {
                "question" : question, 
                "responses" : {
                    "response_a" : {"response" : response_set.split("RESPONSE B:")[0].replace("RESPONSE A:", "").strip().split("\n\n")[-1].strip()},
                    "response_b" : {"response" : response_set.split("RESPONSE B:")[-1].split("\n\n")[0].strip()}
                },
            }
        )
    
    import json
    
    # export to jsonl file
    with open('synthetic_data.jsonl', 'w') as f:
        for item in question_response_pair_list:
            f.write(json.dumps(item, ensure_ascii=False))
            f.write('\n')

    (オプション): 合成データの検証

    ここでは先ほど生成したデータを報酬モデル (Nemotron-4-340B-Reward) を使用して、品質ベースのフィルタリングを行う手順を紹介します。

    Nemotron-4-340B-Reward」は、研究者や開発者が獨自の LLM を構築するためのトレーニング データを生成する合成データ生成パイプラインの一部として利用できる多次元報酬モデルです。Nemotron-4-340B-Reward は、Nemotron-4-340B-Baseモデルと応答の最終トークンの表現を5つの HelpSteer2 屬性に対応するスカラー値に変換する線形レイヤーで構成されています。

    このモデルは、最大 4,096 トークンのコンテキストをサポートしており、以下の屬性を (通常 0 から 4 の範囲で) 評価します。

    • 有用性 (Helpfulness): プロンプトに対する応答の全體的な有用性。
    • 正確性 (Correctness): 関連する事実がエラーなく含まれているかどうか。
    • 一貫性 (Coherence): 表現の一貫性と明瞭さ。
    • 複雑性 (Complexity): 応答を書くために必要な知的深さ (基本的な言語能力を持つ誰でも書けるものか、深い専門知識が必要かどうか)。
    • 冗長性 (Verbosity): プロンプトで要求される詳細の量に対して、応答に含まれる詳細の量。

    以下で、先ほど生成したデータの応答文に対して、報酬モデルでスコアリングを実行してみましょう。

    # running reward scoring model to evaluate the responses 
    def get_scores_from_response(openai_response_template):
        logprobs = openai_response_template.choices[0].logprobs.content
        score_dict = {}
        for score in logprobs:
            score_dict[score.token] = score.logprob
        return score_dict
    
    async def get_response_and_scores(client, question, response_content):
        messages = [
            {"role": "user","content": question},
            {"role": "assistant","content": response_content},]
        response = await client.chat.completions.create(
            model="nvidia/nemotron-4-340b-reward",
            messages=messages,
        )
        scores = get_scores_from_response(response)
        return scores
    
    # scoring for question:response pair set
    async def process_question_response_pairs(client,question_response_score_list):
        tasks = []
        for question_response_pair in question_response_score_list:
            question = question_response_pair["question"]
            
            task_a = get_response_and_scores(client, question, question_response_pair["responses"]["response_a"]["response"])
            task_b = get_response_and_scores(client, question, question_response_pair["responses"]["response_b"]["response"])
            
            tasks.append((task_a, question_response_pair, "response_a"))
            tasks.append((task_b, question_response_pair, "response_b"))
        results = await asyncio.gather(*[task[0] for task in tasks])
        
        for i, (result, task_info) in enumerate(zip(results, tasks)):
            _, question_response_pair, response_key = task_info
            question_response_pair["responses"][response_key].update(result)
    question_response_score_list = question_response_pair_list.copy()
    await process_question_response_pairs(client, question_response_score_list)
    print(question_response_score_list)

    出力は以下のようになります。

    [{'question': 'ディープラーニングと強化學習の違いは何か?',
      'responses': {'response_a': {'response': 'ディープラーニングは、主に大規模なデータセットを使用してパターンを學習し、畫像や音聲の認識などのタスクに適用される一方、強化學習は、エージェントが環境とやり取りすることで、報酬を最大化する行動を學習する手法です。',
        'helpfulness': 2.796875,
        'correctness': 2.875,
        'coherence': 3.59375,
        'complexity': 1.53125,
        'verbosity': 1.3359375,
        'question': 'ディープラーニングと強化學習の違いは何か?'},
       'response_b': {'response': 'ディープラーニングは、ニューラルネットワークを使用してデータから特徴を抽出することに重點を置いており、強化學習は、エージェントが環境からのフィードバックに基づいて決定を下す方法を學習することに重點を置いています。',
        'helpfulness': 2.671875,
        'correctness': 2.796875,
        'coherence': 3.65625,
        'complexity': 1.46875,
        'verbosity': 1.3515625,
        'question': 'ディープラーニングと強化學習の違いは何か?'}}},
     {'question': 'ディープラーニングと強化學習はどのように組み合わせて使用できるのか?',
      'responses': {'response_a': {'response': 'ディープラーニングと強化學習は、ディープラーニングを使用して強化學習のエージェントの狀態と行動を表現することで組み合わせることができます。ディープラーニングのネットワークは、強化學習のエージェントが環境を理解し、最適な行動を選択するのに役立ちます。',
        'helpfulness': 2.609375,
        'correctness': 2.84375,
        'coherence': 3.65625,
        'complexity': 1.6953125,
        'verbosity': 1.3828125,
        'question': 'ディープラーニングと強化學習はどのように組み合わせて使用できるのか?'},
       'response_b': {'response': '強化學習のエージェントは、ディープラーニングのネットワークを使用して、環境から得られた経験を學習し、最適な行動を選択することができます。ディープラーニングのネットワークは、強化學習のエージェントが環境を理解し、最適な行動を選択するのに役立ちます。また、強化學習のエージェントは、ディープラーニングのネットワークを使用して、環境から得られた経験を學習し、最適な行動を選択することができます。',
        'helpfulness': 2.671875,
        'correctness': 2.703125,
        'coherence': 3.390625,
        'complexity': 1.7109375,
        'verbosity': 1.6640625,
        'question': 'ディープラーニングと強化學習はどのように組み合わせて使用できるのか?'}}}]

    「有用性」のスコアを元にサンプルをフィルタリングし、データセットの品質を高めることや、各サンプルの「有用性」が高い方の応答を「選ばれた応答」とすることで、DPO のデータセットとしても活用できます。

    本記事で紹介できなかった機能

    • ルールベースもしくは分類器ベースの品質フィルタリング
      大規模なデータセット內には、多くの場合、低品質とみなされるドキュメントが多數含まれています。品質を定義する指標はさまざまありますが、ドキュメントに含まれる句読點の數、長さ、繰り返しの頻度などのシンプルな統計情報で測定してフィルタリングするアプローチや高品質なデータが存在する場合はそれを使用して単純な分類器を學習し、フィルタリングするアプローチなどがあります。NeMo Curator ではこれらのアプローチに対応したモジュールを提供しています。詳細はこちらを參照してください。
    • PyTorch モデルを使ったデータ分類
      RAPIDS AI の CrossFit を活用して、マルチノード、マルチ GPU へスケーリング可能な PyTorch のモデルを使用したドキュメントの分類 (例: ドメインや品質) を実行可能です。NVIDIA では、ドキュメントのドメインを分類する nvidia/domain-classifier、品質を分類する nvidia/quality-classifier-deberta をオープン ソースでリリースしています。実行の手順はこちらを參照してください。
    • 個人識別情報 (PII) の削除
      データセットには意図せずに個人情報 (氏名、電話番號、メール アドレス、住所など) が含まれてしまっている可能性があります。PII の削除、匿名化によって、データセットから機密データを除去することが望ましいです。実行の手順はこちらを參照してください。
    • 指示プロンプトの追加
      指示チューニング用にデータセット內の各レコードへ指示プロンプトを追加することも可能です。実行の手順はこちらを參照してください。
    • ダウンストリーム タスクの除染、重複排除
      大規模なデータセット內には、ダウンストリーム タスクで LLM を評価するために使用するテスト データが紛れてしまっている可能性があります。NeMo Curator は OpenAI GPT-3Microsoft Turing NLG 530B のアプローチに従い 、ダウンストリーム タスクに存在するデータを大規模データセットから削除する機能を提供しています。詳細はこちらを參照してください。

    テキストの品質に関する言及が複數ありましたが、NeMo Curator はテキストの品質を判斷する複數の方法を提供しています。ドキュメントには、ベスト プラクティスとして紹介した各手法の長所、短所がまとめられています。

    まとめ

    本記事では、NeMo Curator を使用したデータセットの構築方法を紹介しました。NeMo Curator を使用して、開発者の方のデータセット構築がより効率的に進むと嬉しいです。


    関連情報

    +5

    Tags

    人人超碰97caoporen国产