2012年10月12日 星期五

Java SE 7 - Thread - ForkJoinPool 2013-02-23修正

將工作從一個切割成多個,而每個產生的工作又可以再切割成多個工作並行處理,例如:

A工作分成A1及A2,A1及A2又分成A11、A12及A21、A22,這時A、A1及A2會一直等待,當A11及A12都完成工作,將結果傳給A1後,A1就停止等待,將結果進行處理並傳回給A,而A21及A22也都完成工作,將結果傳給A2後,A2就停止等待,並將結果處理並傳回給A,最後A都收到A1及A2的結果後,將最後結果整合並傳回最終結果。

每個工作會分配Thread來執行,如果不需要工作在執行完成後傳回結果,可以使用RecursiveAction,如果需要在工作重後傳回結果,則可以使用RecursiveTask。

例如:以下是將MapReduce計算某一篇文章出現的字母次數的範例進行重寫:

 /*
 * Powered By Samuel - 林靖傑2013/02/23
 * 轉貼請說明出處!
 *
 * */
public class WordCountFork {
   
 public static void main(String[] args) {
     ForkJoinPool mainPool = new ForkJoinPool();
     //取出文字檔內容
     String Content = "";
     char[] TextData = new char[1024];
     int length = -1;

     try (Reader input = new BufferedReader(new FileReader("WordFile.txt"));
            Writer SW = new StringWriter()){
            while ((length = input.read(TextData)) != -1){
                      SW.write(TextData, 0, length);
            }
            Content = SW.toString();
     } catch (FileNotFoundException e) {
             e.printStackTrace();
     } catch (IOException e) {
             e.printStackTrace();
     }

     //Step 1: 執行
     int[] WordAry = mainPool.invoke(new LineAnaylze(Content));

     //取出結果
     for (int i = 0 ; i < WordAry.length; i ++){
            String Alphabet = Character.toString ((char) (65 + i));
            System.out.println(
                Alphabet + ":" + String.valueOf(WordAry[i]));
     }
  }
}

//Step 2: 建立RecursiveTask來傳回Task結果
//如果不需帶入值, 可以使用RecursiveAction
class LineAnaylze extends RecursiveTask<int[]> {
    private String Content = "";
    public LineAnaylze(String content){
             Content = content;
    }

    @Override
    protected int[] compute() {
         //Step 2: 如果字數大於50, 則各切割成一半成小工作執行
         if (Content.length() > 50){
              int half = Content.length() / 2;
              //先建立Task, 每一個工作都是一個Task, 所以會一直分化執行
              LineAnaylze half1 = new LineAnaylze(Content.substring(0, (half - 1)));
              LineAnaylze half2 = new LineAnaylze(Content.substring(half));
              //讓ForkJoinPool分配Thread執行Task
              half1.fork();
              half2.fork();
              //等待兩個工作完成
              int[] WordPair1 = half1.join();
              int[] WordPair2 = half2.join();

              //將兩個Task結果整合傳回
              for (int i = 0; i < WordPair1.length; i ++){
                     WordPair1[i] = WordPair1[i] + WordPair2[i];
              }
               return WordPair1;
         } else {
              //不足50個字直接執行後傳回
              return countWord(Content);
         }
    }

    //計算字數函式
    private int[] countWord(String line){
          int[] WordAry = new int[26];
          for (int i = 0 ;i < line.length(); i ++){
                 String Alphabet = String.valueOf(line.charAt(i)).toUpperCase().trim();
                 if (!Alphabet.equals("")){
                      int WordCount = 0;
                      int Ascii = (int)Alphabet.charAt(0);
                      if (Ascii >= 65 && Ascii <= 90){
                           WordCount = WordAry[Ascii - 65];
                           WordAry[Ascii - 65] = ++WordCount;
                      }
                 }
          }

          return WordAry;
    }
}

轉貼請註明出處,最好直接使用聯結轉貼!Thanks~
作者: Samuel
日期:2013/02/23

沒有留言:

張貼留言