2015年4月15日水曜日

Amazon Simple Queue ServiceでFIFOを実装してみる ~前編~


データアナリティクス・グループの宮本です。

今回はAmazon Simple Queue Serviceについて書かせて頂きたいと思います。

まずはAmazon Simple Queue Serviceとはどのようなものかというと、Amazon Web Service(以下AWS)曰く
Amazon Simple Queue Service(SQS)は、高速で、信頼性が高く、スケーラビリティに優れ、十分に管理されたメッセージキューサービスです。
とのことです。

このサービスの使い道としては、時間の掛かりそうな処理を非同期で行う時などが考えられ、AWSでは動画をエンコードするようなサイトで利用者のリクエストをエンコードするサーバーが受ける際にサーバー側でエンコード処理を行っている場合は、SQSにリクエストのメッセージをキューイングしておいて、サーバーのエンコード処理が終わった後にSQSからリクエストメッセージを取得して次のエンコード処理に移るような仕組みを作る際に使えます。

参考URL:http://aws.amazon.com/jp/sqs/

導入が非常に容易で、かつ安価などメリットが多いSQSですが、デメリットの1つとしてFIFO(First In First Out)が保証されないことがあります。
SQSのFAQにも
Q: Amazon SQS はメッセージへの「先入れ先出し」(FIFO)アクセスを提供していますか?
いいえ。Amazon SQS は Amazon SQS キュー内のメッセージへの FIFO アクセスを保証していません。その理由は主に Amazon SQS の分散的性質のためです。特定のメッセージ順序付けが必要な場合、それを扱うようアプリケーションを設計する必要があります。
というように書かれています。

つまり複数のメッセージがSQSに入っている場合、メッセージ取得時には最初にSQSへ入ったメッセージではなくランダムな順番でメッセージが返却されることになります。
なのでアプリケーションの制御がない場合は、上記のようなエンコードの例だと、エンコード処理をリクエストした利用者の順番に処理されるわけではなく、SQSがサーバーへ返すランダムな順番でエンコード処理が行われていくことになります。

ということで、まず今回の記事ではSQSへのメッセージ送信と、受信したメッセージが送信した順番がFIFOになっていない事を確認したいと思います。

実装は送信側と受信側で分けますが、それぞれ大きなポイントと実装コードは以下のとおりです。

メッセージ送信時

ポイント
  1. SQSへメッセージ送信時に、messageAttributeでFIFO処理用に順番をカウントする属性(ここではFifoCounterとします)を付けて送信する。
  2. 送信後に上記のカウント属性をカウントアップする。
ソースコード

private static void sendMessages(int numOfMessages){ for(int i = 0;i <= numOfMessages; i++){ Map<String, MessageAttributeValue> messageAttributes = new HashMap<String, MessageAttributeValue>(); messageAttributes.put("FifoCounter", new MessageAttributeValue() .withDataType("Number") .withStringValue(String.valueOf(fifoCounter))); sqs.sendMessage(new SendMessageRequest() .withQueueUrl(myQueueUrl).withMessageBody("message " + fifoCounter) .withMessageAttributes(messageAttributes)); fifoCounter++; } }
上記ソースコードでは、送信用メソッドとしてパラメータで渡された件数のSQSメッセージを送信する処理をしています。送信メッセージは"message x"のようになり、xには0を最初とした処理の順番が数値では入ります。
ソースコード内の"sqs"や"myQueueUrl","fifoCounter"はstatic変数としてクラス内に別途定義してあります。

メッセージ受信時

ポイント
  1. メッセージ取得時には複数メッセージ(最大値の10件)を一括で取得する。
  2. メッセージ取得時に上記送信時で送信したFIFO処理用の属性(処理の順番)を取得する。
  3. 取得したメッセージのFIFO処理用の属性をコンソールに表示する。
ソースコード

private static void reciveMessages(){ ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl); //メッセージを10件一括で取得 receiveMessageRequest.setMaxNumberOfMessages(10); //取得時にFifoCounterの属性を取得 Collection<String> messageAttributeNames = new ArrayList<String>(); messageAttributeNames.add("FifoCounter"); receiveMessageRequest.setMessageAttributeNames(messageAttributeNames); //メッセージの取得 List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages(); for(Message m :messages){ String fifoCounterString = m.getMessageAttributes().get("FifoCounter").getStringValue(); int fifoCounter = Integer.parseInt(fifoCounterString); System.out.println(fifoCounter); } }
送信側のソースコードを実行後に受信側を実行すると、下記のように処理順番がランダムになって結果がコンソールに表示されます。

7
9
25
28
31
36
以下省略

0から順に表示されていれば、送信されたメッセージの順番で処理されていることになりますが、上記のようにランダムな順番で処理されています。
当然の事ながらSQSのFAQでの宣言通り、受信時には送信されたメッセージの順番が保証されていませんでした。

次回の後編で受信側のプログラムを修正してメッセージを送信された順番に処理していく実装をしてみたいと思います。

0 件のコメント:

コメントを投稿