これから始めるAWS SQS!メッセージキューと上手に付き合うワンランク上の実装を解説😎

これから始めるAWS SQS!メッセージキューと上手に付き合うワンランク上の実装を解説😎

こんにちは!

AWS SQSって具体的に何ができるのか、よくわからない方が多いのでは?と思います。

本記事では、メッセージキューを使用したユースケースと、そのプログラミング方法を紹介します。

サービス開発の中でAWS SQSを一切使用しないという選択も可能ですが、ユースケースによっては実装コスト・リスクを大きく軽減できる可能性があります、是非覚えていってください。

想定する読者

  • AWSサーバーレスで開発を行っているヒト
  • メッセージキューを扱ったことがないヒト
  • AWS SQSの概要から実装方法を知りたいヒト

はじめに

AWS SQSでできること

非同期での処理実行

例えば、財務システムの開発で帳票をPDF出力できる機能を考えてみましょう。

PDF出力には、財務システムの持つ様々な計算ロジックを起動させる必要があり、PDF出力完了までに10秒以上の時間を要する可能性があるとします。

PDF出力ボタンをクリックした際に、処理が完了するまでそのまま待機させるのはUX悪いですよね。そういった時に活躍するのがSQSです。SQSを使用すると、下記のようなワークフローを実現することができます。

  1. PDF出力ボタンのクリックイベントでDynamoDBへItem作成とSQSへメッセージキュー発行
  2. キュー発行完了するとPDF一覧画面に「PDF作成中…」というItemを表示
  3. SQSでメッセージキュー受信をトリガーにLambdaを起動
  4. Lambdaで帳票作成処理を実行し完了するとS3へ生成したPDFデータをPUT
  5. PUTしたS3のファイル名を1で生成したDynamoDBのItemへPUT
  6. PDF一覧画面にはダウンロードボタンが表示されS3からPDFをダウンロードできる

財務システムの参考

非同期での処理実行には、上記以外にも様々な方法があります。ビジネス要件に合わせて適切なアーキテクチャーを検討しましょう。

  • CloudWatchイベントを発行しLambdaを実行
  • AWS SNSへメッセージトピックを発行しサブスクリプション
  • 自前でバッチ受付処理を構築

AWS SNSとの違いは?

SNSはメッセージトピックを作成しサブスクリプションへ通知します。

対してSQSは通知を受け取り処理する側です。(SNSのサブスクリプションへSQSを登録することも可能です)

1つの通知処理で同時に複数の通知先をへ通知を行いたケースには、SNSを活用しましょう。

AWS SQS自体はロジックを持てないの?

持てません。

SQSはキューへメッセージを溜め込み、任意のタイミングで取り出せるサービスです。

キューからメッセージを取り出す方法には、SQSのメッセージキュー受信をトリガーにLambdaを起動させる方法や、任意のタイミングでLambdaを発火させてSQSのキューを取り出す方法などがあります。

では本題のServerless FrameworkでのSQS構築及びリトライテクニックを紹介します

基本的にはCloudFormationの文法で構築を行いますので、CloudFormationのプロジェクトの方も安心してください。

本サンプルを利用する場合は、事前にServerlessFrameworkを使用する環境をローカルに構築しておいてください。

serverless.yml

LambdaでSQSの発行を行いますので、IAMロールを忘れずに設定しましょう。Resourceには本来はSQSのARNを取得し設定するのが望ましいですね。STGとPRDが同居するAWSアカウントであった時に、誤ってSTGからPRDのキューを操作したら怖いですね。

service: SQS-Serverless
provider:
  name: aws
  runtime: nodejs12.x
  stage: ${opt:stage,"dev"}
  region: ap-northeast-1
  profile: ragate
  iamRoleStatements:
    - Effect: 'Allow'
      Action:
        - 'sqs:*'
      Resource:
        - '*'

plugins:
  - serverless-webpack

custom:
  webpack:
    includeModules: true
    packager: 'npm'

functions: ${file(./resources/functions.yml)}

resources:
  - ${file(./resources/sqs.yml)}

package:
  individually: true

/resources/sqs.yml

SQSが任意の回数キュー処理に失敗した場合はデッドレターキューにキューが移行されます。(これを応用してリトライ処理を行うことが可能です)

下記のサンプルでは、5秒間隔で最大2回リトライ処理を行うことが可能です。

Resources:
  SQSQueue:
    Type: AWS::SQS::Queue
    Properties:
      # ContentBasedDeduplication: true # FifoQueue指定時のみ設定、先入れ先出し (FIFO) キューに対して、コンテンツに基づく重複排除を有効にするかどうかを指定
      DelaySeconds: 0 # メッセージの配信を遅延させたい場合。
      # FifoQueue: false # 指定しないと標準キューで作成される
      # KmsDataKeyReusePeriodSeconds: 300 # AWS KMS を呼び出す前にメッセージを暗号化または復号できる時間の長さ
      # KmsMasterKeyId: String KMSを利用する場合は必須な様子
      MaximumMessageSize: 262144 # メッセージのバイト数の制限(超過するとSQSが処理を拒否、最大値で262144まで設定が可能)
      MessageRetentionPeriod: 345600 # Amazon SQS がメッセージを保持する秒数(デフォルトは345,600(4日))
      QueueName: "SampleQueue" # FIFO キューを作成するには、FIFO キューの名前は .fifo サフィックスで終わる必要があります
      ReceiveMessageWaitTimeSeconds: 5 # キューが処理中の際の、ポーリング待機時間(0を指定するとショートポーリングとなる)
      RedrivePolicy:
        deadLetterTargetArn: { Fn::GetAtt: [DeadLetterSQSQueue, Arn] }
        maxReceiveCount: 2
      VisibilityTimeout: 5 # キュー受信時から、他Componentがキューを見れない秒数

  DeadLetterSQSQueue: # デッドレターキューにキューが入ったらLambdaを起動し、メール通知などを検討したい
    Type: AWS::SQS::Queue
    Properties:
      DelaySeconds: 0
      MaximumMessageSize: 262144
      MessageRetentionPeriod: 200
      QueueName: "DeadSampleQueue"
      ReceiveMessageWaitTimeSeconds: 5
      VisibilityTimeout: 10

/resources/functions.yml

ファンクションの中身は後述です。

createSQSQueue:
  handler: src/functions/createSQSQueue.handler
  environment:
    QUEUE_NAME: { Fn::GetAtt: [SQSQueue, QueueName] }
    QUEUE_ARN: { Fn::GetAtt: [SQSQueue, Arn] }
    QUEUE_END_POINT: !Ref SQSQueue

createEventSQSQueue:
  handler: src/functions/createEventSQSQueue.handler
  timeout: 4 # VisibilityTimeoutよりも低く設定する必要があるので一旦4秒に設定しておく
  events:
    - sqs:
        arn: { Fn::GetAtt: [SQSQueue, Arn] }
        batchSize: 1

/src/functions/createSQSQueue.handler

キュー発行処理を行います。このLambdaはAppSyncからの呼び出しを想定していますが、パラメーターを受け取る箇所を修正すればAPI Gatewayやその他のサービスから呼び出すことも可能です。

const AWS = require("aws-sdk")
const SQS = new AWS.SQS({region: "ap-northeast-1"})
const QueueUrl = process.env.QUEUE_END_POINT

export async function handler(event, context) {
  try {    
    const MessageBody = JSON.stringify({
      id: event.id,
      text: event.input.text
    })
    const result = await SQS.sendMessage({MessageBody, QueueUrl}).promise()
    return {
      id: event.id,
      text: event.input.text,
      queueName: process.env.QUEUE_NAME,
      queueEndpoint: QueueUrl,
      messageId: result['MessageId']
    }
  } catch (e) {
    console.error(e)
  }
}

/src/functions/createEventSQSQueue.handler

ここではキューを受信した後の処理を記述します。本サンプルではSQSのキュー受信時にLambdaをキックする仕様としているので、eventオブジェクトにはSQSが受信したメッセージが含まれます。

私たちがよく使用するワークフローは下記です。

  1. DynamoDBのItemのIDを受信
  2. そのIDに関する処理を実施
  3. 処理が正常終了したらDynamoDBのItemのステータスをPUT
  4. もしどこかの処理で失敗すればthrow()

下記の例では、リトライ処理を意図的に起こしているものとなります。

例えばここの処理にtry{}catch{}を設置し、catchに入ってきたらthrowするように実装してあげれば、任意のタイミングで処理を失敗させて最初からやり直すことが可能です。

export async function handler(event, context) {
  throw('リトライ処理のためのThrow!');
}

一つ、大きな注意点があります!

実はSQSでは、キューのポーリング(取得)が重複して実行される危険性があるんです。

つまり、同じLambda処理が2回実行されてしまう可能性があります。

これは標準キューだけでなく、FIFOキューでも同じ現象が発生する可能性があります。(FIFOキューでも発生する可能性があるということに注意しましょう)

対策として、SQSではキューに任意のIDを付与しているので、そのIDを処理の前に一度DynamoDBへPUTし、次回以降の処理ではすでにDynamoDBにItemが存在していたら処理を実行しない(即リターン)ような対策が必要です。ここはかなりのハマりどころなのでしっかり意識しつつ実装を行います。

Lambda関数には、冪等性を持たせるような実装を行う意識が必要です。

まとめ

浅い理解で実装してしまうと重複実行で事故になる可能性があります。特に重複実行しないようにLambdaに冪等性のあるプログラミングさせましょう。

SQSの開発相談はお気軽にご相談ください。