std::threadとstd::vectorでboost::thread_groupのようにスレッドをまとめて起動

最近C++でのスレッド処理の勉強をしていて,C++11(C++0x)からboostを入れなくてもthreadクラスが使えるようになったようなのでそちらを使ってみる.んで,同じような処理をするスレッドをまとめてcreate/joinできるようにしたいなーと考えて色々調べていたところ,boost::thread_groupなるものがあるという事を知った.

C++, boost::thread : スレッドグループの生成と実行 | Yukun's Blog

しかしthread_groupはboostにしかないクラスのようだ.んーでも似たような処理がしたいなーと更に調べていたところ,次のサイトにある方法を使えばできそうだという事がわかった.

thread

要は,std::threadクラスを格納するvectorコンテナを作り,そこにcreateしたスレッドをpush_backしていく.終了時は,コンテナの中にあるstd::thread一つ一つに対してjoin操作を行なっていく.

以下,利用例
やたら長いが,引数にとったxzファイルの中身をただ表示するだけのプログラム.一応-nオプションでスレッド数を変更できるようになっている.

/*
 * readline_using_threads.cc
 */

#include <iostream>
#include <unordered_map>
#include <ext/stdio_filebuf.h>
#include <thread>
#include <mutex>
#include <list>
#include <vector>
#include <cstring>
#include <unistd.h>

namespace Original
{
  class Counter{
    private:
      std::list<std::string> files;
      std::mutex locker;

    public:
      Counter(std::list<std::string> input_files){
        files = input_files;
      }

      void counter(std::string filename);

      void file_allocator(){
        while(!files.empty()){
          locker.lock();
          std::string temp_filename(files.front());
          files.pop_front();
          locker.unlock();

          counter(temp_filename);
        }
        return;
      }
  };
}

void Original::Counter::counter(std::string target_file){
  // popenでコマンド実行後の出力をfdで受け取る
  std::string command("xzcat " + target_file);
  FILE *fd = popen(command.c_str(), "r");

  // streambufを作成し,istreamのコンストラクタに渡す
  __gnu_cxx::stdio_filebuf<char> *p_fb = new __gnu_cxx::stdio_filebuf<char>(fd, std::ios_base::in);
  std::istream input(static_cast<std::streambuf *>(p_fb));

  // getlineでストリームからコマンド出力を受け取る
  std::string buffer;
  while(getline(input, buffer)){
    std::cout << "id:" << std::this_thread::get_id() << " out > " << " " << buffer << std::endl;
    usleep(100);
  }
  return;
}

int main(int argc, char** argv)
{
  // デフォルトのスレッド数
  int num_of_threads = 4;

  int option;
  while((option = getopt(argc, argv, "n:")) != -1){
    switch(option){
      case 'n':{
        int num = atoi(optarg);
        if(num < 1 || num > 16){
          std::cout << "Number of threads should be 1 to 16!" << std::endl;
          exit(1);
        }
        num_of_threads = num;
        break;
      }

      case ':':
        std::cout << "option needs value!" << std::endl;
        break;

      case '?':
        std::cout << "unknown option!" << std::endl;
        break;
    }
  }
  argc -= optind - 1;
  argv += optind - 1;


  std::list<std::string> input_files;
  for(int i = 1; i < argc; ++i){
    input_files.push_back(argv[i]);
  }

  // スレッド数と処理対象ファイルの記述
  std::cout << "Number of threads: " << num_of_threads << std::endl;
  std::cout << "Processing files" << std::endl;
  for(auto iter = input_files.begin(); iter != input_files.end(); ++iter){
    std::cout << *iter << " ";
  }
  std::cout << "\n" <<std::endl;

  // threadをまとめて起動する ============================================
  Original::Counter count_object(input_files);
  std::vector<std::thread> count_threads;
  // スレッドの起動
  for(int i = 0; i < num_of_threads; ++i){
    count_threads.push_back(std::thread(std::bind(&Original::Counter::file_allocator, &count_object)));
  }
  // スレッドのjoin
  for(auto iter = count_threads.begin(); iter != count_threads.end(); ++iter){
    iter->join();
  }
  // ====================================================================
  return 0;
}


4つのスレッドで実行した結果.
ファイルの中身は,それぞれ「あいうえおあいうえおあいうえお」みたいなのが間に改行をはさんで書いてあるだけ.

$ ./readline_using_threads -n 4 test1.txt.xz test2.txt.xz test3.txt.xz test4.txt.xz
Number of threads: 4
Processing files
test1.txt.xz test2.txt.xz test3.txt.xz test4.txt.xz

id:1146394944 out > か
id:1114925376 out > あ
id:1135905088 out > ア
id:1125415232 out > さ
id:1114925376 out > い
id:1135905088 out > イ
id:1125415232 out > し
id:1146394944 out > き
id:1114925376 out > う
id:1135905088 out > ウ
id:1125415232 out > す
id:1146394944 out > く
id:1114925376 out > え
id:1135905088 out > エ
id:1125415232 out > せ
id:1146394944 out > け
id:1114925376 out > お
id:1135905088 out > オ
id:1125415232 out > そ
id:1146394944 out > こ
id:1114925376 out > あ
id:1135905088 out > ア
id:1125415232 out > さ
id:1146394944 out > か
id:1114925376 out > い
id:1135905088 out > イ
id:1125415232 out > し
id:1146394944 out > き
id:1114925376 out > う
id:1135905088 out > ウ
id:1125415232 out > す
id:1146394944 out > く
id:1114925376 out > え
id:1135905088 out > エ
id:1125415232 out > せ
id:1146394944 out > け
id:1114925376 out > お
id:1135905088 out > オ
id:1125415232 out > そ
id:1146394944 out > こ
id:1114925376 out > あ
id:1135905088 out > ア
id:1125415232 out > さ
id:1146394944 out > か
id:1114925376 out > い
id:1135905088 out > イ
id:1125415232 out > し
id:1146394944 out > き
id:1114925376 out > う
id:1135905088 out > ウ
id:1125415232 out > す
id:1146394944 out > く
id:1114925376 out > え
id:1135905088 out > エ
id:1125415232 out > せ
id:1146394944 out > け
id:1114925376 out > お
id:1135905088 out > オ
id:1125415232 out > そ
id:1146394944 out > こ

これでthread周りが少し簡潔にコーディングできるかな...