OpenAdaptor

 

The Open Adaptor Framework

https://openadaptor.openadaptor.org/pg/the_openadaptor_framework.htm

 

DataObjects: 基礎的なデータの型

 

- Source,Pipe,Sinkの間をDataObjectsの配列が行きかう。

- DataObject XML: 特別なXMLスキーマ

- 通常、SourceSinkDataObject XMLを読み込みもしくは書き込みすることに用いられる。

- DataObjectsは「名前-値」のペアのコレクションと考えることができ、それらは属性(attribute)と呼ばれる。

- たとえばPerson型のDataObject2つの属性、NameAgeをもちそれらの値はそれぞれFred21かもしれない。

- 属性の値はかならずしも原始的な型の値でなく、Javaのオブジェクトや、さらにDataObjectであってもよい。

 

DataObject String Readers & Writers: DataObjectと文字列を相互に変換するために使われる

DOStringReaderもしくはDOStringWriterSourceもしくはSink内で使うことにより、たとえばSourceDataObjectへの変換をReaderに委譲することができる。

これらの作業は通常chop()メソッドもしくはstick()メソッドを実装することにより実現できる。

chop: chop up the record into field

stick: stick the field strings together to form a record

 

Standard Sources, Sinks and Pipes

https://openadaptor.openadaptor.org/pg/standard_sources_sinks_pipes.htm

大概使えそうなものがありそうなのでまずはどのSourceもしくはSinkを使うかはここをチェックしてからにしよう。

また、DataObjectReaderWriterもいろいろあるようだ。(後ほど詳説する)

BenchMark用のコンポーネントも存在するようだ。また、ダミーのSourceDoNothingSource)や強制的に例外を発生させるPipeBenchmarkPipe)などもあるようなので特別なテストを試みたい場合は重宝するかもしれない。

 

Data Formats

https://openadaptor.openadaptor.org/pg/data_formats.htm

実際にデータを送信する際にopenadaptorはデータをDOXMLというメッセージ形式に変換してから送信する。

注意!見た目はXMLそっくりなDOXMLだがこれをopenadaptor以外のアプリケーションからXMLメッセージと思って処理をしてはいけない。そういう場合はDOStringWriterなどのユーティリティクラスを用いて正式なXMLメッセージに変換してから使うようにしてください!

 

Message Format:

V1V2V33種類あるが、デフォルトではV2が用いられる。

 

openadaptor and Unicode characters

https://openadaptor.openadaptor.org/pg/openadaptor_and_unicode.htm

openadaptorUnicode文字を正しく操作したい場合はこちら。

 

Getting Started

https://openadaptor.openadaptor.org/pg/getting_started.htm

(典型的なスタートアップコマンド)

$ java org.openadaptor.adaptor.RunAdaptor test.props A

 

File Components

https://openadaptor.openadaptor.org/pg/file_components.htm

 

Reading Delimited File Formats

デリミタで区切られたファイル → DataObject XML

A.Component1.Name = C1
A.Component2.Name = C2
A.C1.LinkTo1 = C2
A.C1.ClassName = org.openadaptor.adaptor.standard.FileSource                 
# C1の実装クラスはFileSource
A.C1.DOStringReader = org.openadaptor.dostrings.DelimitedStringReader     
# DOStringReader
DelimitedStringReaderを指定
A.C1.InputFileName = in.txt                                                                       
# 入力ファイルはin.txt
A.C1.NumAttributes = 6                                                                                          
# 6
要素を期待
A.C1.FieldDelimiter = 44                                                                                          
# デリミタとしてカンマ(ASCII ‘44’)を指定
A.C2.ClassName = org.openadaptor.adaptor.standard.FileSink                                   
# C2の実装クラスはFileSinkFileSinkのデフォルトの振る舞いはDataObject XMLを標準出力に出力)

 

StringReaderはこの場合、DataObjectの型名をAnyとし、属性の名前を”Att1”, ”Att2”… ”Att6”とし、型はデフォルトのString型。

 

警告!開発者はDataObject XMLを直接読み込むようなアプリケーションを開発してはなりません。これはあくまでopenadaptor内部のフォーマットであって予告なしに変更されることがあるからです。XMLへの読み込みもしくは書き込みが必要な場合は、XMLStreamReaderもしくはXMLStreamWriterを使ってください。

 

Reading “DataObject XML”

DataObject XML → デリミタで区切られたファイル

A.Component1.Name = C1
A.Component2.Name = C2
A.C1.LinkTo1 = C2
A.C1.ClassName = org.openadaptor.adaptor.standard.FileSource                 
# C1の実装クラスはFileSource
A.C1.InputFileName = out.xml                                                                                 
#
入力ファイルはout.xml
A.C2.ClassName = org.openadaptor.adaptor.standard.FileSink                                   
# C2
の実装クラスはFileSink
A.C2.DOStringWriter = org.openadaptor.dostrings.DelimitedStringWriter                      
# DOStringWriter
DelimitedStringWriterを指定
A.C2.RecordDelimiter = \n                                                                                      
# レコードの区切り文字は改行文字(\n)

 

C1DOStringReaderを指定していないので、デフォルトの入力フォーマットはDataObject XMLと仮定。

 

Display Components

https://openadaptor.openadaptor.org/pg/display_components.htm

TODO:

 

FilterPipe

file:///C:/Program%20Files/DealBus/2_9_2/javadocs/index.html

FilterTypeDataObjectsをタイプと属性の値によってフィルタリングすることができる。これにより興味のないDataObjectsをふるい分けすることができる。

 

A.C2.FilterEntireArray       = true                                                                          # 全体をふるいわけの対象とするか?falseの場合は配列の先頭のみを調査の対象とする

A.C2.Filter1                 = pass                                                                     # Filter1の特性としてpassを指定

A.C2.Filter1.Type            = EquityTrade                                                            # DataObjectのタイプ名としてEquityTradeを指定

A.C2.Filter1.AttName1        = Counterparty

A.C2.Filter1.AttValue1       = MEGACORP                                                            # フィルタリング用文字列(否定の場合は!を文字列前につけることができる)

# A.C2.Filter1.AttValueRegExp1もある

A.C2.Filter2                 = block

A.C2.Filter2.AttName1        = TradingDesk

A.C2.Filter2.AttValueRegExp1 = [A-Z ]*London[A-Z ]*

 

AbstractSimplePipe

file:///C:/Program%20Files/DealBus/2_9_2/javadocs/index.html

PipeSourceからDataObjectを受け取りそれをSinkに渡すコンポーネント。

通常カスタムPipeを実装する場合、開発者はこのAbstractSimplePipeを実装する。そのさい、ほかの面倒なことはこの抽象クラスの実装に任せ、transformDataObjectsメソッドの実装に集中させるというのがこのクラスのもっとも大きな意義である。

 

LocalSource

file:///C:/Program%20Files/DealBus/2_9_2/javadocs/index.html

アダプタをVMで動かす際に、データをコードから直接おくるためのSourceである。同様のことがLocalPipeでも実現できる。

以下の2メソッドにて、データの送信が可能。

              Vector process(DataObject[] dobs, boolean)

または

Vector process(String text, boolean all)

 

ちなみに、RunAdaptorを直接ソースコードから起動することでアダプタをVMで動かすことができる。

(実装例)

package com.aks;

 

import java.util.Properties;

import org.openadaptor.util.SuperProperties;

import org.openadaptor.adaptor.RunAdaptor;

import org.openadaptor.adaptor.IbafException;

import org.apache.log4j.Logger;

 

public class StartMain {

      

       private static Logger log = Logger.getLogger(StartMain.class.getName());

 

       public static void main(String[] args) {

             

              if (args.length != 2) {

                     log.error("usage: AdaptorWithLocalSource <props file> <adaptor name>");

                     System.exit(1);

              }

              System.out.println(args[0]);

              System.out.println(args[1]);

             

              SuperProperties props = null;

              try {

                     props = new SuperProperties(System.getProperties ());

                     props.load (args[0]);

              } catch (Exception e) {

                     log.error("failed to load properties from [" + args[0] + "]");

                     System.exit(1);

              }

             

              (new StartMain()).test(args[1], props);

       }

      

       public void test(String name, Properties props) {      

              RunAdaptor adaptor = null;

                    

              try {

                     adaptor = new RunAdaptor(name, props);

              } catch (IbafException ibe) {

                     log.error("failed to create adaptor, " + ibe.toString());

                     System.exit(1);

              }

 

adaptor.getController().setExitAllowed(false);

              Thread t = new Thread(adaptor);

              t.start();

              try {

                     t.join();

              } catch (InterruptedException ie) {

                     log.error(ie.getMessage());

              }

              System.exit(0);

       }

}

 

JMS Components

https://openadaptor.openadaptor.org/pg/jms_components.htm

JBossJMSサーバとするときは、以下のJarをクラスパスに通しておく必要がある。

jbossmq-client.jar, jboss-common-client.jar, jnp-client.jar, jboss-j2ee.jar, concurrent.jar, log4j.jarJBOSS_DIST配下のclientディレクトリ下に存在する)

注意!!log4j.jarは独自にクラスパスに通す必要がある。自前のlog4j-1-2-*.jarなどが先にクラスパスに通ってしまっている場合以下のエラーが発生し、ハマる。

Exception in thread "JBossSink.Topic" java.lang.NoSuchFieldError: TRACE

        at org.jboss.logging.Log4jLoggerPlugin.isTraceEnabled(Log4jLoggerPlugin.java:85)

 

また、JMS認証のためのユーザ登録が必要である。

%JBOSS_DIST%\server\default\deploy\jms\ hsqldb-jdbc-state-service.xmlに使用したいユーザのユーザ名とパスワードおよび適当なロール(Windowsの場合、自分のユーザアカウントと適当なパスワード、ロールはとりあえずguest)を追加すればよいと思われる。

例)

      POPULATE.TABLES.nn = INSERT INTO JMS_USERS (USERID, PASSWD) VALUES ('doikota', 'xxxxxxx')

      POPULATE.TABLES.nn = INSERT INTO JMS_ROLES (ROLEID, USERID) VALUES ('guest','doikota')

 

それから、テスト用のトピックを設定しておく必要がある。

%JBOSS_DIST%\server\default\deploy\jms\jbossmq-destinations-service.xmlに使用するQueueもしくはTopicを設定しよう。

Queue/Topicともにすでに登録があるので、ここでは最も簡単な設定のTopicを新たに追加する。

  <!-- Added by Kotaro Doi -->

  <mbean code="org.jboss.mq.server.jmx.Topic"

               name="jboss.mq.destination:service=Topic,name=A">

    <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends>

  </mbean>

これによりTopictopic/Aが使えるようになる。

 

(具体例)

A.JBossSink.ClassName                                   = org.openadaptor.adaptor.jms.JMSSink

A.JBossSink.Topic.JndiProvider                          = localhost:1099

A.JBossSink.Topic.JndiFactory                           = org.jnp.interfaces.NamingContextFactory

A.JBossSink.Topic.Factory                               = ConnectionFactory

A.JBossSink.Topic.Subject                               = topic/A

A.JBossSink.Topic.UserName                              = doikota

A.JBossSink.Topic.Password                              = xxxxxxx

 

(参考)JBossをサービスとしてインストール

http://www.redhat.com/docs/manuals/jboss/jboss-eap-4.2/ja_JP/html/Getting_Started/Starting_and_Stopping_the_Server-Running_as_a_Service_under_Microsoft_Windows.html

 

 

Specific Database Components

https://openadaptor.openadaptor.org/pg/database_components.htm

openadaptorは以下のデータベースに対して直接のインタフェースを提供する。

  • Sybase
  • Microsoft SQL Server
  • Oracle
  • Informix
  • MySQL*
  • MS-Access
  • MS-Excel

Autocommit Mode

デフォルトではオートコミットはオフになっている。

A.C1.AutoCommitMode = true | false | ignore

 

データベースへの書き込み

JdbcSinkコンポーネントはストアドプロシージャを呼ぶことを念頭に設計されている。

 

(ストアドプロシージャ)

CREATE PROCEDURE dbo.spFamilyInsert (@relation varchar(10), @name varchar(20), @age int)
AS
   
BEGIN
       
INSERT rel_family (relation, name, age)
       
VALUES (@relation, @name, @age)
   
END

 

必要!

sp_procxmode spFamilyInsert, "anymode"

これをやっておかないと、

[07/11/12 14:32:04.472] ERROR: SQL Exception: Code = [7713]

[07/11/12 14:32:05.472] ERROR: JBossSource caught PipelineException [com.sybase.

jdbc2.jdbc.SybSQLException: Stored procedure 'spFamilyInsert' may be run only in

 unchained transaction mode. The 'SET CHAINED OFF' command will cause the curren

t session to use unchained transaction mode.

といったエラーが起こる。

http://publib.boulder.ibm.com/infocenter/wasinfo/v6r0/index.jsp?topic=/com.ibm.websphere.express.doc/info/exp/ae/rtrb_dsaccess6.html

 

 

(プロパティ)
A.FamilySink.AutoCommitMode                     = true

A.FamilySink.ClassName                          = org.openadaptor.adaptor.jdbc.JdbcSink

A.FamilySink.JdbcUrl                            = jdbc:sybase:Tds:tk1d0626syb.tokyo.dresdnerkb.com:2050

A.FamilySink.JdbcDriver                                 = com.sybase.jdbc2.jdbc.SybDriver

A.FamilySink.UserName                           = appsgm_ddl

A.FamilySink.Password                           = notdbo12

A.FamilySink.Database                           = stp_apps

A.FamilySink.Type1                              = Relevant

A.FamilySink.Relevant.StoredProcedure           = spFamilyInsert

A.FamilySink.Relevant.Att1                      = Relation

A.FamilySink.Relevant.Att2                      = Name

A.FamilySink.Relevant.Att3                      = Age

A.FamilySink.Relevant.Param1                    = relation

A.FamilySink.Relevant.Param2                    = name

A.FamilySink.Relevant.Param3                    = age

A.FamilySink.Relevant.ParamType1                        = CHAR

A.FamilySink.Relevant.ParamType2                        = CHAR

A.FamilySink.Relevant.ParamType3                        = NUM

 

Writing your own Source

SourceRunnableインタフェースを実装しているので、別のスレッドで走らせることができる。通常AbstractSimpleSourceをサブクラス化することで独自のSourceを実装する。

run()メソッドはAbstractSimpleSource内に実装済みなので、独自の振る舞いをさせたい場合を除き自分で実装する必要はない。

3つの別のモードが選択可能である。

 

1.Polling

定期的にポーリングを行う。データベース、ファイル、キューベースのミドルウェアなどで適用可能。

2.Litening

接続を待つような場合に適している。ソケットなど。

3.Callback

イベントを登録し、コールバックを待つような場合。

init()メソッドにて、以下のいずれかの登録が必要である。

 

  setSourceType(POLL_SOURCE);
  setSourceType(LISTEN_SOURCE);
  setSourceType(CALLBACK_SOURCE);

 

Polling Source:

Listen Source:

Callback Source:

サードパーティのコールバックメソッド内で、以下のメソッドをコールする。

        sourceProcess(DataObject[] dobs)

 

"Start up" and "Clean up"

開始時もしくは終了などに以下のメソッドを適切に実装する必要がある。

 

public void sourceStartUp()
    throws IbafException

 

public void sourceCleanUp()
    throws IbafException

 

openadaptor™ Message Hospitals

HospitalPipeをコンポーネントの一部として加えることで、メッセージをHospitalに送信することができる。

 

#                                  |--------------------|

#                                  v                    |

#             MyBISSource   -> MissingDataPipe   -> DummyAccidentPipe -> DOBXMLSink

#                                    |

#                                    -> MissingDataHospital

#

A.Component1.Name                         = MyBISSource

A.Component2.Name                         = MissingDataPipe

A.Component3.Name                         = DummyAccidentPipe

A.Component4.Name                         = DOBXMLSink

#

#      MyBISSource

#

A.MyBISSource.ClassName                   = com.aks.MyBISSource

A.MyBISSource.LinkTo1                     = MissingDataPipe

#

#      MissingDataPipe

#

A.MissingDataPipe.ClassName                      = org.openadaptor.adaptor.hospital.HospitalPipe

A.MissingDataPipe.HospitalName            = MissingDataHospital

A.MissingDataPipe.DefaultApplication      = BISSubscriber

A.MissingDataPipe.DefaultSubject          = BISRateMessage

A.MissingDataPipe.LinkTo1                 = DummyAccidentPipe

#

#      DOBXMLSink

#

A.DOBXMLSink.ClassName                    = org.openadaptor.adaptor.standard.FileSink

A.DOBXMLSink.OutputFileName                      = data/bisrate.xml

A.DOBXMLSink.MoveExistingOutputFile       = false

A.DOBXMLSink.CreateOutputFile             = true

#

#      MissingDataHospital

#

A.MissingDataHospital.ClassName           = org.openadaptor.adaptor.jdbc.sybase.HospitalSybaseImpl

A.MissingDataHospital.JdbcUrl             = jdbc:sybase:Tds:tk1d0626syb.tokyo.dresdnerkb.com:2050

A.MissingDataHospital.JdbcDriver          = com.sybase.jdbc.SybDriver

A.MissingDataHospital.UserName            = appsgm_ddl

A.MissingDataHospital.Password            = notdbo12

A.MissingDataHospital.Database            = stp_apps

 

#

#      DummyAccidentPipe

#

A.DummyAccidentPipe.ClassName             = com.aks.DummyAccidentPipe

A.DummyAccidentPipe.LinkTo1               = DOBXMLSink

 

といった設定で、MissingDataPipe以降でPipeLineExceptionが発生するとその時点で取り扱われているメッセージがHospital(病院)にPatient(患者)としてadmit(入院)させられる。

 

使用上の注意)

1.なぜかDealBus2.9.2のこのご時世でもopenadaptorはいまだにsybaseJDBCとして4.5を採用している。よって、jconn2.jarではなく、jConnect.jarもしくはjConnect.4.5対応のクラスファイルをCLASSPATHに通す必要がある。

2.入院先のデータベースには患者を格納するテーブルとストアドプロシージャをあらかじめ作っておく必要がある。

$ isql -STK_GDCL_D01_DS -Uappsgm_ddl -Pnotdbo12 -Dapps_gm -i HospitalSchema.sql

$ isql -STK_GDCL_D01_DS -Uappsgm_ddl -Pnotdbo12 -Dapps_gm -i HospitalProcs.sql

(これらの定義は%DEALBUS_HOME%\sql\sybaseに保存されている)

3.HospitalPipe(この場合、MissingDataPipeがそれにあたる)はコンポーネントのひとつとして定義される必要がある。

 

Patients

Patientsを作成する再に、”Index Parameters”と呼ばれるものを指定することができる。”Index Parameters”とは、名前-値のペアで、patientsをタグ付けする目的で使うことができる。

例)

A.MissingDataHospital.IndexMapping1                            = Relevants

A.MissingDataHospital.Relevants.IndexParamAlias1               = Code

A.MissingDataHospital.Relevants.IndexParamAttName1      = Name

結果はこのようになる。

PatientId

PatientStatus

DestAppName

Subject

RejectReason

SourceAppName

PubUniqueId

IndexParam1

1

NEW

BISSubscriber

BISRateMessage

Dummy Accident happend. Severity: HOSPITAL

 

 

Code=004850076

 

Receiving Discharged Messages from a Hospital

HospitalSourceHospitalの実装につなげて、discharge(退院)されたpatient(患者)を処理することもできる。

 

-          HospitalSourceSybaseHospitalを定期的にPollし、退院の準備が整った患者を待ち受ける。

-          HospitalSourceは退院した患者、を下流のコンポーネントに渡す。

 

Sinkがまたも退院したメッセージを拒絶した場合、HospitalSourceがメッセージを受け取り、Hospitalに再入院させられる。

Hospitalの実装のふるまいは、すでに存在する患者のステータスを変更することであり、新たな患者を作り出すことではない。

SybaseHospitalの実装はこのとおりのことを行う。

 

The Hospital Command Line Interface

以下の機能を提供する。

-          指定されたアプリケーションと項目上の患者の待ち行列を表示

-          患者を表示

-          患者の保有するDataObjectの配列を表示

-          患者の状態を更新(検査済み、退院待ち、死亡など)

-          患者の検索

 

以下のコマンドで実行が可能

$ java org.openadaptor.adaptor.hospital.HospitalCmdLineInterface <properties file> <adaptor prefix>

 

ただし、プロパティファイル内に以下の登録が必要。

A.HospitalName = <Hospital Name>

 

The Adaptor Framework Editor

TODO: