Java Thread

 

1. Javaでサブスレッドを生成するには、

 

1.     implements Runnableもしくはextends Threadにてサブスレッド用のクラスを用意する。(もしくはこの後「2.Executorインタフェースを使うには」で説明する方法を用いる)

2.     サブスレッド用クラスにて、public void run()メソッドを実装する。このrun()メソッドこそがスレッド処理の実体になる。

3.     メインルーチンもしくはサブルーチン内にて、Threadクラスを以下の方法で作成し、start()メソッドを実行。

i)                 Foo implements RunnableにてRunnable実装クラスFooを用意した場合

Foo f1 = new Foo();

Thread t1 = new Thread(f1);     // Runnable実装クラスをThreadクラスの

// コンストラクタに渡す

f1.start();

ii)              Bar extends ThreadにてThread継承クラスBarを用意した場合

Bar b1 = new Bar();

b1.start();

4.     サブスレッド用クラスのrun()メソッドが呼び出される。

 

Javaで同期を取るには、

 

synchronizedブロックもしくはメソッドで排他制御を行う。ブロックもしくはメソッドがsynchronizedとして宣言された場合、Javaはスレッドがその部分を実行している最中にオブジェクトにロックをかける。このオブジェクトとは、synchronizedブロックではブロックの開始時点で指定されたオブジェクト、synchronizedメソッドではそのメソッドをもつクラスのインスタンスである。

synchronizedブロックの場合:

synchronized (object) {                 // objectオブジェクトにロックをかける。

        <<処理>>

}

 

synchronizedメソッドの場合:

private synchronized void foo() {               // thisオブジェクトにロックをかける。

        <<処理>>

}

オブジェクトにロックをかけるとは、

「オブジェクトを操作する排他的実行可能権を取得する」

といっても良いかも知れない。

注)排他的実行はsynchronizedブロック/メソッドに対してのみ有効である。

 

Javaでスレッド同士を協調させるには、

 

生産者-消費者問題(有限バッファ問題)

とよばれる問題を解決する。これは生産者と消費者の間にバッファ(緩衝材、生産者の生産物を消費者が消費し始めるまでに置いておくところ)と呼ばれるものを設置し、生産物の有無をみて、生産者もしくは消費者が待機状態に入るか、もしくは相手に状態の遷移を通知するかして解決する方法である。以下の4通りの状態および状態遷移が考えられる。


●生産者は、バッファがいっぱいなら待つ
●消費者は、バッファがいっぱいでなくなったら生産者に通知する
●消費者は、バッファが空なら待つ
●生産者は、バッファが空でなくなったら消費者に通知する

 

待機はwait()、通知はnotify()もしくはnotifyAll()でおこなう。これらのメソッドはObjectクラスで実装されており、あらゆるオブジェクトでの実行が可能であるが、実行するにはそのオブジェクトをロックしている必要がある。いいかえれば、synchronizedメソッドもしくはブロックの中でのみ、これらのメソッドを呼び出すことが出来る。

 

あるスレッドt1があるオブジェクトoに対して、wait()メソッドを呼び出す場合を考えよう。まずスレッドt1synchronized(o)でオブジェクoのロックを獲得する。その後t1は、o.wait()を実行して待機状態に入る(正確には、オブジェクトoの待機集合に入る)。この際、t1oのロックを開放する。さもないと、その後いかなるスレッドもoのロックを獲得できなくなり、notify()はおろか何も出来なくなるからである。

 

Threadクラスのメソッド(抜粋)

 

currentThread() 現在実行中のスレッドの参照を返す

activeCount()  現行スレッドグループ内のアクティブなスレッドの数を返す

enumerate()    現行スレッドグループ内のアクティブなスレッドを指定された配列にコピーする

interrupt()     このスレッドに割り込む

isAlive()       このスレッドが生存しているかどうか判定する

 

2. Executorインタフェースを使うには

JDK5.0からjava.util.concurrentパッケージが追加された。マルチスレッド処理に対し有用なクラスが含まれている。

 

2.1 Executorインタフェース

その中でも最も基本的なインタフェースがExecutorインタフェースである。スレッドの再利用やスケジューリングに活躍する。

ExecutorRunnableを引数に持つexecute()メソッドのみを持つインタフェースである。

 

例えば以下のように使う。

Executor ex = Executors.newSingleThreadExecutor();

Runnable r1 = ....

Runnable r2 = ....

ex.execute(r1);

ex.execute(r2);

 

java.util.concurrent.ExecutorsExecutorインタフェースなどの実装インスタンスを返すメソッドを提供するユーティリティクラスである。

Executorexecute()メソッドでRunnableを実装したタスク(run()メソッドのこと)を送信することにより、タスクを実行することが出来る。

 

Executorsに定義されている実装インスタンスを返すメソッドは、newSingleThreadExecutor()を含め、以下のものがある。

    * newSingleThreadExecutor()

         一つのスレッドでタスクの処理を行う

    * newFixedThreadPool()

         指定した数のスレッドを作成し、タスクの処理を行う

    * newCachedThreadPool()

         必要に応じて自動的にスレッドを作成し、タスクの処理を行う

 

2.2 ExecutorServiceインタフェース

ExecutorServiceインタフェースはExecutorインタフェースを拡張し、状態追跡やタスク処理の中断など便利なメソッドを提供するためのインタフェースである。(Executorsexecute()メソッドのみの提供)主なメソッドは以下。

execute()       Executorより継承。タスクを送信する。

submit()        タスクの計算結果や状態を取得するための、Futureオブジェクトを返すメソッド。

shutdown()      シャットダウンを実行します。以前に送信したタスクは実行しますが、新規タスクは受け入れません。

shutdownNow()   現在実行中のタスクの中断を試み、待機中のタスクの処理を停止します。

 

2.3       スレッドプール

スレッドプールとはタスクをキューに追加し、プールしておいたスレッドにより、順次タスクを処理していくマルチスレッドの形式の一つ。必要以上にスレッドを作成しないために、リソースの無駄な消費を抑えるメリットがある。 Executorsクラスの中で、スレッドプールを可能にしているクラスを返すメソッドについて解説する。

 

2.4       スケジューリング

JDK5.0より指定した時間後に処理を実行したり、指定した周期で処理を実行するスケジューリングが可能になった。

スケジューリングを可能にするクラスは、java.util.concurrent.ScheduledExecutorServiceインタフェースを実装している。

 

2.5       Callableインタフェース

JDK5.0より、新たにjava.util.concurrent.Callableインタフェースが追加された。 Runnableとは異なり、サブスレッドからメインスレッドに対して値を返したり、例外を投げたりすることが出来る。Runnableインタフェース同様、実装すべきメソッドはcall()のみである。ただし、シグネチャがrun()とはやや異なる。

        public V call() throws Exception

このcall()メソッド内に適宜戻り値(型はV)、および例外(Exceptionクラス)投入のコードを書くことにより、サブスレッドからメインスレッドに値を返したり、例外を投入することが出来る。メインスレッドではそれらの例外はFutureクラスのget()メソッドにて受け取るようにする。逆に言えば、メインスレッドでこのget()メソッドを呼ばない限り、サブスレッドで投入された例外は捕捉されることはない。

 

2.6       Futureインタフェース

Futureオブジェクトは、サブスレッド内でのタスクの状態や計算の結果をメインスレッドからサブスレッドに問い合わせたり、サブスレッドからメインスレッドに渡したりする際の橋渡し役を担う。先程説明したget()メソッド以外にも、タスクのキャンセルを試みるcancel()メソッド、タスクの状態を検査するisCancelled()メソッド、isDone()メソッドがある。

 

例)

import java.util.*;

import java.util.concurrent.*;

 

public class MyCallableTask implements Callable<Date> {

    public Date call() throws Exception {

        int i = 0;

        while (i < 3) {

               System.out.println("i = " + i);

               Thread.sleep(1000);

               i++;

        }

        if (true) {

               String errmsg = new String("Intensive Exception throw.");

               System.out.println(errmsg);

               throw new Exception(errmsg);

        }

        return new Date();

    }

   

    public static void main(String[] args){

        ExecutorService ex = Executors.newSingleThreadExecutor();

        Future<Date> future = ex.submit(new MyCallableTask());

        try {

               Thread.sleep(1000);

               //future.cancel(false);

               System.out.println("Is cancelled ? " + (future.isCancelled() ? "Yes" : "No"));

               System.out.println("Is done ? " + (future.isDone() ? "Yes" : "No"));

              Date date = future.get();

              System.out.println(date);

        } catch (InterruptedException e) {

            e.printStackTrace();

        } catch (ExecutionException e) {

            e.printStackTrace();

        }

        ex.shutdown();

    }

}

 

2.7       スレッドプール実装例

package com.aksys.pool;

 

import java.text.SimpleDateFormat;

import java.util.ArrayList;

import java.util.Date;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Set;

import java.util.concurrent.Callable;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Future;

import java.util.concurrent.TimeUnit;

 

/**

 * 各種スレッドプール作成、呼び出しサンプル

 *

 */

public class MyPoolMain {

      

       public static void main(String[] args) {

              System.out.println("***** process1 *****");

              process1();

              System.out.println("***** process2 *****");

              process2();

              System.out.println("***** process3 *****");

              process3();

              System.out.println("***** process4 *****");

              process4();

       }

      

       /**

        * 通常のスレッド呼び出し

        * Runnable実装クラスとThreadサブクラスの通常の使い方。

        * また、Threadサブクラスのコンストラクタで、superクラスのコンストラクタにRunnable実装クラスを渡したとしても、

        * Threadサブクラスのrun()メソッドが呼び出されることを確認。

        */

       private static void process1() {

              MyRunnable rb = new MyRunnable(1);

              MySubThread st = new MySubThread(rb);

              st.start();

             

              Thread t1 = new Thread(rb);

              t1.start();

             

              try {

                     st.join();

                     t1.join();

              } catch (InterruptedException e) {

                     e.printStackTrace();

              }

       }

      

       /**

        * Executor.executeを用いた、Runnableクラスの非同期呼び出し

        * ExecutorService.shutdownを呼び出し、全タスクの終了を促し、

        * ExecutorService.awaitTerminationにより、全タスクが完了したかどうかを確認。

        */

       private static void process2() {

              ExecutorService es = Executors.newFixedThreadPool(5);

 

              for (int i = 1; i <= 5; i++) {

                     MyRunnable mr = new MyRunnable(100 + i);

                     es.execute(mr);

              }

             

              es.shutdown();

              try {

                     if (es.awaitTermination(1, TimeUnit.MILLISECONDS)) {

                            System.out.println("await succeeded.");

                     } else {

                            System.out.println("await failed.");

                     }

              } catch (InterruptedException e) {

                     e.printStackTrace();

              }

       }

      

       /**

        * Executor.submitを用いた、Callableクラスの非同期呼び出し

       * ExecutorService.shutdownExecutorService.awaitTerminationの使い方は先ほどの例と同じだが、

        * ExecutorService.submitCallableタスク予約時に将来結果Future<MyResult>により、

        * 結果入手用オブジェクトを 入手しておき、全タスクを非同期にて実行し終了した後、

        * それら結果に同期的にアクセスしている。

        */

       private static void process3() {

              ExecutorService es = Executors.newFixedThreadPool(5);

              Map<Integer, Future<MyResult>> mrm

                     = new HashMap<Integer, Future<MyResult>>();

 

              for (int i = 1; i <= 5; i++) {

                     MyCallable mc = new MyCallable(1000 + i);

                     Future<MyResult> fmr = es.submit(mc);

                     mrm.put(i, fmr);

              }

              es.shutdown();

              try {

                     if (es.awaitTermination(1, TimeUnit.MILLISECONDS)) {

                            System.out.println("await succeeded.");

                            Set<Map.Entry<Integer, Future<MyResult>>> set = mrm.entrySet();

                            for (Map.Entry<Integer, Future<MyResult>> e : set) {

                                   try {

                                          System.out.println(e.getKey() + ": " + e.getValue().get());

                                   } catch (ExecutionException e1) {

                                          e1.printStackTrace();

                                   }

                            }

                     } else {

                            System.out.println("await failed.");

                     }

              } catch (InterruptedException e) {

                     e.printStackTrace();

              }

       }

      

       /**

        * Executor.invokeAllを用いた、Callableクラスの非同期呼び出し

        * 先ほどの例と同様にCallableタスクを予約し、非同期タスク実行後にFutureにてタスクの実行結果に

        * 動機的にアクセスしているが、タスクの起動を全タスクを予め登録しておいたCollection<? extends Callable>

        * によりinvokeAllメソッドに渡し、その結果をList<Future<T>>により受け取れるため、より簡易に複数タスク

        * の実行が可能となっている。

        */

       private static void process4() {

              ExecutorService es = Executors.newFixedThreadPool(5);

              List<MyCallable> mcl = new ArrayList<MyCallable>();

              List<Future<MyResult>> lfmr = null;

             

              for (int i = 1; i <= 5; i++) {

                     MyCallable mc = new MyCallable(1000 + i);

                     mcl.add(mc);

              }

             

              try {

                     lfmr = es.invokeAll(mcl);

              } catch (InterruptedException e) {

                     e.printStackTrace();

              }

             

              es.shutdown();

              try {

                     if (es.awaitTermination(1, TimeUnit.MILLISECONDS)) {

                            System.out.println("await succeeded.");

                            if (lfmr != null) {

                                   for (Future<MyResult> fmr : lfmr) {

                                          try {

                                                 System.out.println(fmr.get());

                                          } catch (ExecutionException e) {

                                                 e.printStackTrace();

                                          }

                                   }

                            } else {

                                   System.err.println("lfmr is not supplied.");

                            }

                     } else {

                            System.out.println("await failed.");

                     }

              } catch (InterruptedException e) {

                     e.printStackTrace();

              }

             

       }

}

 

/**

 * Threadサブクラス

 *

 */

class MySubThread extends Thread {

 

       @Override

       public void run() {

              System.out.println("Hello Sub-thread!");

       }

 

       public MySubThread(Runnable target) {

              super(target);

       }

}

 

/**

 * Runnable実装クラス

 *

 */

class MyRunnable implements Runnable {

      

       private final int i;

       private static SimpleDateFormat sdf

              = new SimpleDateFormat("HH:mm:ss.SSS");

 

       public MyRunnable(int i) {

              this.i = i;

       }

 

       @Override

       public void run() {

              log("Hello Runnable" + i + "!");

       }

 

       private void log(String msg) {

              Date d = new Date();

              System.out.println("[" + sdf.format(d) + "] " + msg);

       }

      

}

 

/**

 * Callable実装クラス

 *

 */

class MyCallable implements Callable<MyResult> {

 

       private final int i;

       private static SimpleDateFormat sdf

              = new SimpleDateFormat("HH:mm:ss.SSS");

      

       public MyCallable(int i) {

              super();

              this.i = i;

       }

 

       @Override

       public MyResult call() throws Exception {

              Date d = new Date();

              System.out.println("[" + sdf.format(d) + "] MyResult.call() for + " + this.i);

              return new MyResult(this.i, d);

       }

      

}

 

/**

 * Futureを介して返される結果クラス

 *

 */

class MyResult {

      

       private final int i;

       private final Date d;

       private static SimpleDateFormat sdf

              = new SimpleDateFormat("HH:mm:ss.SSS");

 

       public MyResult(int i, Date d) {

              this.i = i;

              this.d = d;

       }

 

       @Override

       public String toString() {

              return "[" + sdf.format(d) + "] MyResult i=" + i;

       }

      

}