WebアプリケーションでSpring Integrationを使用する場合のスレッド

今回のブログは、Spring Framework Advent Calendar 201220日目の記事として投稿します。

前回のブログで、WebアプリケーションでSpring Integrationを使用すると、Spring Integrationが生成するスレッドをAPサーバが管理できないので危険だというお話をしました。

この件について、SpringSource CommunityのForumに、「Spring IntegrationをWebアプリケーションで使用するのは、スレッドの観点から安全なのか?」質問したところ、APサーバが管理するスレッドをSpring Integrationが使用する方法を紹介してもらいました。

今回のブログでは、APサーバが管理するスレッドをSpring Integrationが利用する方法を紹介します。

Spring Integrationには、CommonJと呼ばれるAPIを介して並列処理を実施する仕組みが提供されています。CommonJは、「JSR 237: Work Manager for Application Servers」と「JSR 236: Concurrency Utilities for JavaTM EE」を合わせた呼び名で、EJBServletがコンテナ内で並列処理をするためのAPIです。ちなみにWebLogicやWebSphereは対応しているようですが、Tomcatは対応していません。

CommonJでは、非同期処理用のcommonj.work.WorkManagerインタフェースと、スケジューリング用のcommonj.timers.TimerManagerインタフェースが提供されています。Spring Integrationはそれぞれに対応したorg.springframework.scheduling.commonj.WorkManagerTaskExecutorクラスとorg.springframework.jms.listener.DefaultMessageListenerContainerクラスが提供されており、Bean定義ファイルでBeanを定義して利用します。

TimerManagerTaskSchedulerのBean定義は以下になります。

    <bean id="taskScheduler" class="org.springframework.scheduling.commonj.TimerManagerTaskScheduler">
      <property name="timerManagerName" value="java:comp/env/tm/MyTimerManager" />
    </bean>

propertyで指定しているのは、APサーバが提供しているTimerManagerのJNDI名です。この設定により、ポーリングするようなEndpointでは、このBeanが使われるようです(動作確認は、タグとタグで作成したAdapterでのみ実施)。idは、"taskScheduler"にする必要があるようです(idの値を変えると動作しませんでした)。

WorkManagerTaskExecutorのBean定義は以下になります。

    <bean id="taskExecutor" class="org.springframework.scheduling.commonj.WorkManagerTaskExecutor">
      <property name="workManagerName" value="java:comp/env/wm/MyWorkManager" />
    </bean>

動作確認のため、タグでJMSのメッセージに応じて非同期処理を行うEndpointを作成しようとしましたが、タグだけでは、taskExecuterを利用できませんでした。Forumに問い合わせたところ、以下のように、DefaultMessageListenerContainerクラスのBeanを仲介する必要があると教わりました。

    <bean id="sampleQueueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="taskExecutor" ref="taskExecutor"/>
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destinationName" value="sampleQueue"/>
    </bean>
    <jms:message-driven-channel-adapter
        id="inboundAdapter" 
        channel="channel2" 
        container="sampleQueueListenerContainer"/>

DefaultMessageListenerContainerのpropertyでtaskExecutorを指定し、タグのcontainer属性で、DefaultMessageListenerContainerのBeanを指定します。

さいごに

タグを使うときはDefaultMessageListenerContainerを使用しなければAPサーバのWorkManagerが使用されないなど、癖があるようです。実際の開発プロジェクトで、APサーバが管理するスレッドをSpring Integrationで使う場合は、作成するEndpointの種類ごとにWorkManagerやTimerManagerがきちんと利用されるか?を検証したほうがよさそうです。

Spring Integrationの概要

今回のブログは、Spring Framework Advent Calendar 2012の13日目の記事として投稿します。

前回のブログでSpring Integrationを話題にしたので、今回は、Spring Integrationの概要と、どのようなコーディングになるかを紹介したいと思います。ただ、独自の解釈が入っており、表現がマニュアル等と違う部分があると思いますので、ご了承ください。

主要なキーワード

まず、Spring Integrationを使用する上で主要なキーワード4つを以下に記載します。

  • Message

Spring Integrationでやりとりするデータのことです。

  • Channel

一般的な説明では、データの送信側と受信側をつなぐものになりますが、個人的には、「Messageが格納される場所」「Messageをどのように提供するか?を定義する場所」と解釈しています。「どのように提供するか?」というのは、たとえば、Point-to-Pointで提供するのか、Publishre-Subscriberで提供するか、特定のMessageを優先して先に提供するかなどを指します。

  • Endpoint

一般的な説明では、Messageを受信したり送信したりする部分になりますが、個人的には、「Channelを仲介するもの」と解釈しています。

Endpointには様々な種類が存在しますが、大きくは以下の分類になります。
・Messageの流れを制御するもの
たとえば、Messageの内容に応じてChannelを振り分けたり、条件にマッチしたMessageだけ送信したり、複数のChannelから受け取ったMessageを集約して1つのChannelに送信するものなどがあります。


データ形式を変換するもの(Transformer)
Messageを、XML形式ににしたりJSON形式にしたりすることが出来ます。
 
・Applicationコードを呼び出すもの
Applicationのコード(具体的には、SpringのBeanのメソッド)を呼び出すことができます。呼び出した後、戻り値をMessageとしてChannelに格納することもできますが、以下の図のように、処理を完結させても構いません。


  • Adapter

Channelと外部システムをつなぐものです。JMS、HTTP、Mailなど様々なAdapterが存在します。Messageが外部システムに出て行く場合はOutbound Adapter、Messageが外部システムから入ってくる場合はInbound Adapterといいます。

サンプル

次のようなアプリケーションのサンプルを考えます。

SampleAppが送信したOrder(注文)データが、外部システム(メッセージキュー)を通って最終的にOrderServiceが受信するというものです。Channelが2つと、Adapter(OutboundとInbound)が2つ、Endpoint(Applicationコードを呼び出すもの)が1つ出てきます。

以下は、SpringのBean定義ファイルです。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:jms="http://www.springframework.org/schema/integration/jms"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.1.xsd
    http://www.springframework.org/schema/integration/jms
    http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.1.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.1.xsd">

  <context:component-scan base-package="com.genba"/>

  <!-- Channel -->
  <int:channel id="channel1"/>
  <int:channel id="channel2"/>
  <!-- ActiveMQ -->
  <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="vm://localhost"/>
  </bean>
  <!-- Outbound Adapter -->
  <jms:outbound-channel-adapter id="outboundAdapter" channel="channel1" destination-name="sampleQueue"/>
  <!-- Inbound Adapter -->
  <jms:message-driven-channel-adapter id="inboundAdapter" channel="channel2" destination-name="sampleQueue"/>
  <!-- Endpoint -->
  <int:service-activator id="saveOrder" input-channel="channel2" ref="orderService" method="saveOrder"/>

</beans>

以下は、Endpointが呼び出すApplicationのコードとなるOrderServiceクラスです。

@Component
public class OrderService {
    public void saveOrder(Order order) {
        System.out.println("OrderID: " + order.getOrderId());
        System.out.println("OrderDate: " + order.getOrderDate());
    }
}

以下は、実行用のmainメソッドです。

public class SampleApp {
	public static void main(String[] args) {
		// DIコンテナの起動
		ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
				"classpath:app-context.xml");
		context.start();

		// channel1を取得
		MessageChannel channel1 = (MessageChannel) context.getBean("channel1");
		
		// 送信するOrderオブジェクトを生成
		Order order = new Order();
		order.setOrderId("order001");
		order.setOrderDate(new Date());
		
		// Orderオブジェクトを送信
		channel1.send(MessageBuilder.withPayload(order).build());
	}
}

Order(注文)クラスは、属性とアクセサメソッドだけのクラスなので、ソースコードは省略します。

実行すると、以下がコンソールに表示されます。

OrderID: order001
OrderDate: Thu Dec 13 11:39:33 JST 2012

Spring IntegrationとWebアプリケーション

Spring Integrationを組み込むアプリケーションとして、Webアプリケーションをイメージする方が多いと思いますが、恐らく、Spring Integrationが内部で独自のスレッドを作るはずなので、Webアプリケーションに組み込むのは危険だと思います(APサーバがスレッドを管理できないため)。Webアプリケーションに組み込む検討をする際は、コミュニティのForumなどで詳しい人たちに意見を聞くのがよいでしょう。

Spring IntegrationとMuleの棲み分け

今回のブログは、Spring Framework Advent Calendar 2012の6日目の記事として投稿します。

Springに関係する内容として何を書こうか考えたところ、最近、私の周りでよく話題になるSpring IntegrationとMuleについて、妄想レベルですが、書きたいと思います。

Spring IntegrationとMuleは、どちらもEAIのプロダクトとして知られており、EIP(Enterprise Integration Patterns)に従って作られています。ですので、プロダクトの選定時の候補として、どちらを採用すべきか比較されることが多いのですが、実は、用途が結構違うのではと考えています。なぜかと言うと、サービスバス(データの流れを制御する部分という意味で使っています)によって疎結合にしようとする対象が違う(と思う)からです。

Muleの場合、サービスバスは、スタンドアロンのサーバの上に存在します。

ですので、システム間を疎結合にする場合に向いていると言えます。

これに対し、Spring Integrationの場合は、サービスバスが、各システム(Javaアプリ)の中に存在します。

ですので、アプリ内のプログラム間を疎結合にする場合に向いていると言えます。

こう考えると、2つのプロダクトは棲み分けが出来ているので、以下の図のように併用しても良いかもしれません。

Artifact Repository周りの作業の自動化

前回のブログでArtifact Repositoryの概要を記載しました。今回は、CI(継続的ビルト)ツールと連携して、Artifact Repository周りの作業を自動化したフローの例を紹介します。CIツールはJenkins、Artifact RepositoryはNexus(開発元はSonatype)、ビルドツールはMaven、SCMツールはGit、APサーバはTomcatを例として使います。また、JavaのWebアプリを想定し、Artifact Repositoryにはwarを格納することにします。

全体のフローを以下に示します。

Artifact RepositoryのNexusは、Maven Repositoryなので、バージョン番号はpom.xmlに指定します。ビルド・リリース担当者は、バージョン番号をpom.xmlに記載した後、SCMツールにチェックインします(1)。そして、Jenkinsのジョブ(ビルド用)を実行します(2)。Jenkinsのジョブの設定でシェルを記述しておき、Gitのチェックアウトのコマンド(3)とMavenのビルドのコマンド(4)を実行します(JenkinsのプラグインでGitやMavenを操作することもできますが、シェルを記述したほうが柔軟性が高いと思います)。Mavenのビルドでは、コンパイルだけでなく、warの作成とMaven Repository(ここではNexus)への格納も行うことができます(5、6)。

次に、リリースを行います。リリースする環境は、テスト環境や本番環境などさまざまな環境があると思いますが、ここでは特に特定していません。ビルド・リリース担当者は、リリースするwarのバージョン番号を指定して、Jenkinsのジョブ(リリース用)を実行します(7)。Jenkinsのジョブの設定で記述したシェルにより、Nexusからwarを取得します(8)。HTTPで簡単に取得できるので、wgetなどのコマンドでwarをダウンロードします。その後、Tomcatにwarをデプロイします(9)。Tomcatが提供するTCD(Tomcat Client Deployer)というツールによりコマンドでデプロイが可能です。なお、7でジョブを実行する際はバージョン番号を任意に指定できるため、切り戻しも簡単にできます。

Artifact Repository周りの作業が手作業になると、バージョン番号の指定間違い(取得したwarとデプロイするwarのバージョンが違ってしまうなど)がでてくる恐れがあるので、極力作業を自動化するのがよいでしょう。

Artifact Repository

リリースの流れに沿って複数の環境(テスト環境・本番環境など)にバイナリ(Javaの場合はjarやwarなど)をデプロイする場合は、環境ごとにバイナリをビルドするのではなく、1度ビルドして生成したバイナリを各環境で使用する方が良いと思います。環境ごとにビルドしてしまうと、何らかのミスにより、作成されたバイナリが変わってしまいリリースミスに繋がる可能性があるからです。

このときのバイナリを保管する場所のことを、継続的デリバリの用語ではArtifact Repositoryと呼びます。

Artifact Repositoryは、ファイルシステムやSCMを使用して実現してもよいのですが、専用のツールも多数存在します。Open Sourceで有名なツールとしては、Nexus(SonaType)、archiva(Apache Software Foundation)、Artifactory(JFrog)があります。これらはいずれも、Maven Repogitoryなので、Maven Central Repositoryがライブラリを保管するのと同じように、依存ライブラリやビルドしたライブラリを保管することができます。これらのツールをArtifact Repositoryとして導入する場合は、まずはMaven Central RepositoryのProxyとして活用してみるのがよいと思います。その中で、ツールの特性を理解し、プロセスへの絡め方を明確にするのが良いでしょう。

Vagrantを利用したローカルPCの環境整備

テスト環境や本番環境と比べ、開発者のローカルPCのサーバ環境(APサーバ、DBサーバ、メッセージングサーバなど)は開発者個人の裁量で構築することが多く、十分に整備されてない傾向があります(例えば、データベースにオラクルを利用する場合、ローカルPCにオラクルをインストールする人は少ないと思います)。その場合、ローカルPCできちんとした検証ができず、品質や生産性を落とすことになります(例えば、オラクルじゃないとシステムが動かないのでローカルPCで検証できないなど)。

この問題を解決する手段として、仮想マシンで構築したサーバ環境を開発者に配布するという方法があります。そして、Vagrantというツールを用いると、この方法をより効果的に行うことができます。

Vagrantは、仮想化製品のVirtualBoxをラップして、仮想マシンの配布や更新、複数の仮想マシンの起動/停止を簡単に行うツールです。

例えば、Oracle(DB)とWebSphere(APサーバ)で構成されたサーバ環境を開発メンバに配布するとしましょう。サーバ環境として、Oralceがインストールされた仮想マシンと、WebSphereがインストールされた仮想マシンを配布します。


開発リーダの作業

最初に、開発リーダは、OracleがインストールされたBoxと、WebSphereがインストールされたBoxをそれぞれ用意します。Boxとは、Vagrant仮想マシンを作成するためのテンプレートのようなものです(Boxの用意の仕方の説明は省略します。このブログでは、Boxを用意した後に、開発メンバがいかに簡単にサーバ環境を入手できるかの説明を目的にしています)。開発リーダは、用意したBoxのファイルをHTTPで公開します。ここでは、ファイル名をそれぞれ「db.box」「ap.box」とします。

次に、開発リーダは、Vagrantの設定ファイルであるVagrantfile(ファイル名もVagrantfile)を作成し公開します(公開の仕方は何でも良いですが、バージョン管理したい場合はSCMで公開するのが良いです)。Vagrantfileでは、仮想マシンIPアドレスやホスト名など、仮想マシンの基本的な設定が行えます(システム自動管理ツールと連携しさまざまな環境設定を行うことも可能ですが、このブログでは説明を省略します)。ここでは、2つの仮想マシンを使用する例を示します。

Vagrant::Config.run do |config|
  config.vm.define :ap do |ap_config|
    ap_config.vm.box = "ap"
    ap_config.vm.network :hostonly, "192.168.1.10"
  end

  config.vm.define :db do |db_config|
    db_config.vm.box = "db"
    db_config.vm.network :hostonly, "192.168.1.11"
  end
end

詳細な説明は省略しますが、「ap」と「db」の2つのBoxを使って仮想マシンを作成し、それぞれIPアドレスを割り振る設定を行っています。

開発メンバの作業

開発メンバは、以下のVagrantのコマンドを使って、2つのBoxを入手します(事前に、VirtualBoxVagrantをインストールする必要があります)。

vagrant box add ap http://ホスト名/ap.box
vagrant box add db http://ホスト名/db.box

※「ホスト名」は、Boxを公開しているコンピュータのホスト名です

2つのBoxをダウンロードし、それぞれ「ap」「db」の名前で保管します。

次に、開発メンバは、開発リーダが作成したVagrantfileを入手し、任意のディレクトリに格納します。格納するディレクトリは、開発プロジェクトごとに作成するのが良いでしょう(複数の開発プロジェクトに携わる場合は、複数のディレクトリでそれぞれVagrantfileを配置する)。

その後、Varantfileが配置されたディレクトリ上で、以下のコマンドを実行します。

vagrant up

このコマンドで、それぞれOracleとWebShpereがインストールされた2つの仮想マシンがBoxから作成され、起動します。開発メンバは、以上の作業だけで、サーバ環境をローカルPCに構築することができるのです。

仮想マシンを停止する場合は、以下のコマンドを実行します。

vagrant halt

このコマンドで、2つの仮想マシンが停止します。

また、仮想マシン上のファイルを間違って消してしまったりして環境が壊れた場合、以下のコマンドで、もとの状態に戻せます。

vagrant destroy
vagrant up

最後に

Vagrantを利用することで、開発メンバは、自分のローカルPCに、非常に簡単にサーバ環境を構築することができます。また、ローカルPC上のサーバ環境を壊しても、他の開発者に迷惑がかかるわけではないので、自由にいろいろなことが試せます。このことは、開発の生産性を大きく向上すると思います。

システム自動管理ツールとSCMの活用(前回の続き)

前回のブログでは、システム自動管理ツールとSCMを利用して、環境設定のバージョン管理が行えることを紹介しました。今回は、任意の環境(開発・テスト・本番など)を、任意のバージョンに更新するときの作業(例えば、開発環境をバージョン2にするなど)の自動化を紹介します。

自動化に使用するツールとその使い方はいろいろあると思いますが、ここでは、CIツール:Jenkins、自動管理ツール:Puppet、SCM:Gitを使用した例を紹介します。

以下に、全体像と処理の流れを示します。

上記の例では、開発環境、テスト環境、本番環境の3つの環境があり、それぞれの環境ごとにJenkinsのSlave(Masterから指示されたジョブを実行する役割)と、PuppetのMaster(Agentに環境の定義情報を配信する役割)を用意しています。JenkinsのMaster(Slaveに指示を出す役割)とGitは、全環境で共有しています。

図中の番号に沿って処理の流れを説明します。運用者は、環境ごとに用意されたJenkinsのジョブを実行します(1)。その際、環境の定義情報のバージョンを指定します。指定されたバージョン情報はJenkinsのMasterに送信され(2)、環境ごとのJenkinsのSlaveにジョブが指示されます(3)。JenkinsのSlaveは、Gitから、指定されたバージョンの定義情報をチェックアウトし(4)、PuppetのMasterに処理を指示します(5)。PuppetのMasterは、PuppetのAgentに定義情報を配信し(6)、PuppetのAgentは定義情報に沿って各サーバの環境を更新します。
※5と6の部分は、実際にはMCollectiveというツール(複数サーバにRPCを実行できるツール。Puppetを開発しているPuppet labsが推奨している)を仲介しています

図の例では、開発環境の更新の流れを示していますが、テスト環境や本番環境も同様です。自動化によって、作業負荷やケアレスミスの軽減が期待できます。

導入時の課題

サーバやソフトウェア間の連携が複雑になるため、ソフトウェアを始めOSやネットワークの知識がある程度必要になります。運用していて何か問題が起きたときに、問題個所を特定できなければなりません。
また、更新が適切に反映されたかどうかの判別を見誤りやすくなるのも課題です。例えば、JenkinsからPuppetに処理を指示する際、仲介役のMCollectiveは、指示ができた時点で正常終了となり、指示の後、実際に更新が反映されたかどうかは関与しません。そのため、Jenkinsの画面上は正常終了になっていても、実際には更新が失敗している可能性があります。更新の成否を判別する手段(Puppetのログを見るなど)を確立する必要があるでしょう。