Massaging Queue serviceのサンプルの話 -ハローワールド!編-

今回はMessaging Queueの簡単なプログラム、ハローワールドを書いてみました。

Messaging Queueとは

要するにユーザ要求を受けたプロセスと処理するプロセスを分けれる。

ユーザ要求を受けたプロセスは処理依頼をサーバに投げて、処理終了。
それを監視しているプロセスが処理する。

ってイメージでOKです。

前書いた記事 : Massaging Queueとはの話

RabbitMQサーバの構築

今回MQを実現するのに利用したサーバはRabbitMQを使いました!

ま、AMQPのミドルウェア使えばだいたいOK

RabbitMQサーバ構築メモ : QNAP TS-351上にRabbitMQを立ち上げる話

RabbitMQのライブラリ

amqpなのでspringでも良いですがRabbitMQ公式を利用しました。

POM.xmlで参照はこちら…

    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.14.0</version>
        </dependency>

送受信プログラムの作成- Hello World –

超シンプルで全てのメッセージの送信するプログラムを書いてみます。

公式のチュートリアルではこの部分ですね。(https://www.rabbitmq.com/tutorials/tutorial-one-python.html )

送信者プログラムが「ハローワールド!」とRabbitMQに送るだけです。

受信側プログラムで「ハローワールド!」が表示されれば成功です。
メッセージが日本語なのはUTF-8がちゃんと働いているか?文字化けしないか?って程度のことです。

送信側のプログラム

ハローワールドの送受信するにあたって、送信部分を始めに実装してみました。
※ べつに受信側を先でも良いですが、送信側のほうがシンプルなので…

プログラムを抜粋するとこんな感じです。
(フルソースはこちら : https://github.com/wataru775/example.rabbitmq.queue/blob/main/src/main/java/org/mmpp/example/rabbitmq/queue/hello/PostQueueApplication.java)

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(MQ_HOST);
        factory.setUsername(MQ_USER);
        factory.setPassword(MQ_PASSWORD);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 疎通確認
        String message = "ハローワールド !";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

        channel.close();
        connection.close();

実際にメッセージを送信しているのはこの部分です

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    String message = "ハローワールド !";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

定義部分をまとめると

ConnectionFactoryの部分でサーバの情報(ホスト、ユーザ、パスワード)いてコネクションを作成しています。

コネクションからチャネルを生成して

チャネル情報をqueueDeclareにて定義しています。
(https://javadoc.io/doc/com.rabbitmq/amqp-client/latest/com/rabbitmq/client/Channel.html)

メソッドの情報は…

AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) throws IOException
queue : キュー名
durable : 耐久性のあるキューを宣言 (キューは、サーバの再起動後も存続します)
exclusive : 排他的キュー(この接続に限定)を宣言
autoDelete : 自動削除キューを宣言 (サーバは、使われなくなったキューを削除します)

パケットを投げているbasicPublishは

フルソースにはありますが、QUEUE_NAMEにはhelloと言う名称を指定しています。
この名称は送信側受信側と一致する必要があります。Queueの名前なので…

メッセージの送信はbasicPublishで文字列のbyteを投げているだけです。

void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) throws IOException
exchange : メッセージを公開するエクスチェンジ
routingKey : ルーティングキー
props : メッセージの他のプロパティ – ルーティングヘッダなど
body : the message body

らしいです!w
動作は今後書いていきます。

動かしたら… なにも発生しませんw
WebUIとかで確認するとQueueのカウントが増えます。

受信側のプログラム

メッセージを投げたので次に受信側のプログラムを作成します。

先程と同じQueue名のチャネルを確認します。

受信プログラムは…

フルソース) https://github.com/wataru775/example.rabbitmq.queue/blob/main/src/main/java/org/mmpp/example/rabbitmq/queue/hello/ReceiveQueueWorker.java

抜粋 )

        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(MQ_HOST);
        factory.setUsername(MQ_USER);
        factory.setPassword(MQ_PASSWORD);

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        DeliverCallback deliverCallback = (consumerTag,delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            logger.info( " Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

送信側とChannel生成までは同じです。

実際の受信動作が書いているのはこの部分です

        DeliverCallback deliverCallback = (consumerTag,delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            logger.info( " Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

チャネルのbasicConsumeにて受けたときにDeliverCallbackに割り当てます

basicConsumeの内容は…

String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException
queue – the name of the queue
autoAck : サーバーはメッセージが配送されたら承認されたとみなす。
サーバが明示的な肯定応答を期待する場合はfalse
consumerTag – コンテキストを確立するためにクライアントが生成したコンシューマタグ
callback – コンシューマオブジェクトへのインタフェース

DeliverCallbackでの受信動作は…

            String message = new String(delivery.getBody(), "UTF-8");
            logger.info( " Received '" + message + "'");

delivery.getBody()で送信されたメッセージが取得できますのでメッセージを取り出してログに吐いています

実際の動作

メッセージがすると「ハローワールド!」が表示されればOKです…

[pool-1-thread-6] INFO org.mmpp.ex.r.q.hello.ReceiveQueueWorker -  Received 'ハローワールド !'

受信側プログラムは強制終了(^c)するまで動きます。
その折、送信側のプログラムを動作すると受信ログが出力されます。

ポイントは、

受信プログラムが動く前にQueueに投げているメッセージは受信側プログラムの実行時に全て出力されます

はい、Queueなので、入ったメッセージは全て処理できます。

これで簡単にメッセージを投げる側、受ける側を作ることができます。

超クール

応用編

実際にメッセージを投げて受けるだけのシンプルなものではなくもう少し応用を考えたいと思います

メッセージをたくさん投げる

先程はメッセージ(ハローワールド!)を1メッセージ投げましたが、いっぱい投げてみます。

受信側プログラム

ここでの重要な部分は先程の受信プログラムだと想定した動きをしません…

発生はランダムですが、メッセージの順番が変わったりします。

理由は… 受信プログラムがスレッドで動いているのでログ出力がそれぞれ行われるので同時状態になってしまうので、前後します

それを発生させないために1秒ほど処理時間を稼ぎます。

ソース : https://github.com/wataru775/example.rabbitmq.queue/tree/main/src/main/java/org/mmpp/example/rabbitmq/queue/a_lot

        DeliverCallback deliverCallback = (consumerTag,delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            logger.info( " Received '" + message + "'");
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

ただ単に、Thread.sleep(1000L);を増やしただけです…

送信側プログラム

ハローワールドを10メッセージ送信するのを並べての良いですが、
送信するメッセージに番号を付けて投げるプログラムを作ります。
これで、処理順番を確認できます

ソース )
https://github.com/wataru775/example.rabbitmq.queue/blob/main/src/main/java/org/mmpp/example/rabbitmq/queue/a_lot/PostQueueApplication.java

抜粋 )

        for(int i = 1 ; i <= 10 ; i ++) {
            String message = "ハローワールド ! No." + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            logger.info(" Sent '" + message + "'");
        }

動作

実際に受信すると… 以下の様な10メッセージが表示されます。

[pool-1-thread-16] INFO org.mmpp.ex.r.q.a_lot.ReceiveQueueWorker -  Received 'ハローワールド ! No.1'
[pool-1-thread-17] INFO org.mmpp.ex.r.q.a_lot.ReceiveQueueWorker -  Received 'ハローワールド ! No.2'
[pool-1-thread-17] INFO org.mmpp.ex.r.q.a_lot.ReceiveQueueWorker -  Received 'ハローワールド ! No.3'
[pool-1-thread-17] INFO org.mmpp.ex.r.q.a_lot.ReceiveQueueWorker -  Received 'ハローワールド ! No.4'
[pool-1-thread-17] INFO org.mmpp.ex.r.q.a_lot.ReceiveQueueWorker -  Received 'ハローワールド ! No.5'
[pool-1-thread-17] INFO org.mmpp.ex.r.q.a_lot.ReceiveQueueWorker -  Received 'ハローワールド ! No.6'
[pool-1-thread-17] INFO org.mmpp.ex.r.q.a_lot.ReceiveQueueWorker -  Received 'ハローワールド ! No.7'
[pool-1-thread-17] INFO org.mmpp.ex.r.q.a_lot.ReceiveQueueWorker -  Received 'ハローワールド ! No.8'
[pool-1-thread-17] INFO org.mmpp.ex.r.q.a_lot.ReceiveQueueWorker -  Received 'ハローワールド ! No.9'
[pool-1-thread-17] INFO org.mmpp.ex.r.q.a_lot.ReceiveQueueWorker -  Received 'ハローワールド ! No.10'

当たり前ですねw

複数送信側プロセスがある場合

例えば、複数送信側プロセスがある場合はどうでしょうか?

この部分がポイントとしては大きいと思います。

というのはオークションなどの早いものがちの場合に送信側プロセスですべて処理させると排他ロックとか…
そもそも別サービスならとかややこしくなります。
MQを使うとシンプルにできます、送信側プロセスは「投げる!」ただそれだけです。
MQサーバ上ではQueueなので順番に並べてくれます(同時はありえません)、
受信側プロセスは最初のメッセージだけを処理すればOK
残りは無視するなりすれば処理は軽量になります
今回は説明しませんがQoSの指定もできますので、優先するとかも可能にできます。
MQサーバ内の動きなのでプログラム側では優先順位を付ける程度の変更でOKです

受信側プロセスが複数ある場合

では、複数の受信があればどうなるでしょうか?

答えを書くと、図のようにそれぞれが別のメッセージを処理されます

複数で受信してみる

受信側プロセスを複数実行するために実行可能jarを生成します。( 関連 : 受信側プログラムjarの作成 (下に記事あり))

OSで端末を複数立ち上げてそれぞれで受信側プログラムを実行してみます。
今回は3端末でそれぞれ受信側プログラムを実行しました。
実行方法はOSの端末にて先のjarコマンドを実行してください

受信状態を確認したら、送信側プログラムを実行します。

画面では送信側プログラムを「PostQueueApplication.jar」で実行していますが、
開発環境の実行でも問題ありません。

受信側プログラムの結果はこのようになります。(異なる場合もあります)

1つ目のプロセス…

[pool-1-thread-7] INFO org.mmpp.ex.r.workers.ReceiveQueueWorker -  Received 'ハローワールド ! No.2'
[pool-1-thread-7] INFO org.mmpp.ex.r.workers.ReceiveQueueWorker -  Received 'ハローワールド ! No.5'
[pool-1-thread-8] INFO org.mmpp.ex.r.workers.ReceiveQueueWorker -  Received 'ハローワールド ! No.8'

2つ目のプロセス…

[pool-1-thread-7] INFO org.mmpp.ex.r.workers.ReceiveQueueWorker -  Received 'ハローワールド ! No.3'
[pool-1-thread-7] INFO org.mmpp.ex.r.workers.ReceiveQueueWorker -  Received 'ハローワールド ! No.6'
[pool-1-thread-8] INFO org.mmpp.ex.r.workers.ReceiveQueueWorker -  Received 'ハローワールド ! No.9'

3つ目のプロセス…

[pool-1-thread-7] INFO org.mmpp.ex.r.workers.ReceiveQueueWorker -  Received 'ハローワールド ! No.1'
[pool-1-thread-8] INFO org.mmpp.ex.r.workers.ReceiveQueueWorker -  Received 'ハローワールド ! No.4'
[pool-1-thread-8] INFO org.mmpp.ex.r.workers.ReceiveQueueWorker -  Received 'ハローワールド ! No.7'
[pool-1-thread-8] INFO org.mmpp.ex.r.workers.ReceiveQueueWorker -  Received 'ハローワールド ! No.10'

この様にプロセスごとに違うメッセージを表示されます。
さらに、同じメッセージは処理されていません。

終いに

今回はMessage Queue Serviceのハローワールド程度を紹介しました

不具合は発生しない前提のプログラムです。

もし、プロセスが落ちた場合など不具合が発生した場合にはその受信側プロセスが掴んでいるメッセージがLostする不具合を含んでいます

そういった場合にはどうすればよいのか?次回はそのあたりをまとめておきます(Ack)

関連

受信側プログラムjarの作成

複数の受信側プロセスを複数作成するために実行可能jarの生成方法をまとめます

実行可能jarの作成方法は…
このあたりを参考ください : http://35.78.51.61/archives/2668

ソース :
https://github.com/wataru775/example.rabbitmq.queue/blob/main/pom.xml

抜粋 )

                    <execution>
                        <id>executable-alot-lost-receivequeue-jar</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <finalName>AlotLostReceiveQueue</finalName>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.mmpp.example.rabbitmq.queue.a_lot.ReceiveLostQueueWorker</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>

ここで、複数受信の受信側実行jar(AlotReceiveQueue.jar)の作成を定義しています

jarの生成はmvnコマンドを実行すればOKです。

mvn package

targetに指定した実行jar(AlotReceiveQueue.jar)が作成されます。

実行方法 )

java -jar target/AlotReceiveQueue.jar