これから始めるAWS SQS!メッセージキューと上手に付き合うワンランク上の実装を解説😎

これから始めるAWS SQS!メッセージキューと上手に付き合うワンランク上の実装を解説😎

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは!

AWS SQS って具体的に何ができるのか、よくわからない方が多いのでは?と思います。

本記事では、メッセージキューを使用したユースケースと、そのプログラミング方法を紹介します。

サービス開発の中で AWS SQS を一切使用しないという選択も可能ですが、ユースケースによっては実装コスト・リスクを大きく軽減できる可能性があります、是非覚えていってください。

想定する読者

  • AWS サーバーレスで開発を行っているヒト
  • メッセージキューを扱ったことがないヒト
  • AWS SQS の概要から実装方法を知りたいヒト

はじめに

AWS SQS でできること

非同期での処理実行

例えば、財務システムの開発で帳票を PDF 出力できる機能を考えてみましょう。

PDF 出力には、財務システムの持つ様々な計算ロジックを起動させる必要があり、PDF 出力完了までに10秒以上の時間を要する可能性があるとします。

PDF 出力ボタンをクリックした際に、処理が完了するまでそのまま待機させるのは UX 悪いですよね。そういった時に活躍するのが SQS です。SQS を使用すると、下記のようなワークフローを実現することができます。

  1. PDF 出力ボタンのクリックイベントで DynamoDB へ Item 作成と SQS へメッセージキュー発行
  2. キュー発行完了すると PDF 一覧画面に「PDF作成中…」というItemを表示
  3. SQS でメッセージキュー受信をトリガーに Lambda を起動
  4. Lambda で帳票作成処理を実行し完了すると S3 へ生成した PDF データを PUT
  5. PUT した S3 のファイル名を1で生成した DynamoDB の Item へ PUT
  6. PDF 一覧画面にはダウンロードボタンが表示され S3 から PDF をダウンロードできる

財務システムの参考

非同期での処理実行には、上記以外にも様々な方法があります。ビジネス要件に合わせて適切なアーキテクチャーを検討しましょう。

  • CloudWatch イベントを発行し Lambda を実行
  • AWS SNS へメッセージトピックを発行しサブスクリプション
  • 自前でバッチ受付処理を構築

AWS SNS との違いは?

SNS はメッセージトピックを作成しサブスクリプションへ通知します。

対して SQS は通知を受け取り処理する側です。(SNS のサブスクリプションへ SQS を登録することも可能です)

1つの通知処理で同時に複数の通知先をへ通知を行いたケースには、SNS を活用しましょう。

AWS SQS 自体はロジックを持てないの?

持てません。

SQS はキューへメッセージを溜め込み、任意のタイミングで取り出せるサービスです。

キューからメッセージを取り出す方法には、SQS のメッセージキュー受信をトリガーに Lambda を起動させる方法や、任意のタイミングで Lambda を発火させて SQS のキューを取り出す方法などがあります。

では本題の Serverless Framework での SQS 構築及びリトライテクニックを紹介します

基本的には CloudFormation の文法で構築を行いますので、CloudFormation のプロジェクトの方も安心してください。

本サンプルを利用する場合は、事前に ServerlessFramework を使用する環境をローカルに構築しておいてください。

serverless.yml

Lambda で SQS の発行を行いますので、IAM ロールを忘れずに設定しましょう。Resource には本来は SQS の ARN を取得し設定するのが望ましいですね。STG と PRD が同居する AWS アカウントであった時に、誤って STG から PRD のキューを操作したら怖いですね。

service: SQS-Serverless
provider:
  name: aws
  runtime: nodejs12.x
  stage: ${opt:stage,"dev"}
  region: ap-northeast-1
  profile: ragate
  iamRoleStatements:
    - Effect: 'Allow'
      Action:
        - 'sqs:*'
      Resource:
        - '*'

plugins:
  - serverless-webpack

custom:
  webpack:
    includeModules: true
    packager: 'npm'

functions: ${file(./resources/functions.yml)}

resources:
  - ${file(./resources/sqs.yml)}

package:
  individually: true

/resources/sqs.yml

SQS が任意の回数キュー処理に失敗した場合はデッドレターキューにキューが移行されます。(これを応用してリトライ処理を行うことが可能です)

下記のサンプルでは、5秒間隔で最大2回リトライ処理を行うことが可能です。

Resources:
  SQSQueue:
    Type: AWS::SQS::Queue
    Properties:
      # ContentBasedDeduplication: true # FifoQueue指定時のみ設定、先入れ先出し (FIFO) キューに対して、コンテンツに基づく重複排除を有効にするかどうかを指定
      DelaySeconds: 0 # メッセージの配信を遅延させたい場合。
      # FifoQueue: false # 指定しないと標準キューで作成される
      # KmsDataKeyReusePeriodSeconds: 300 # AWS KMS を呼び出す前にメッセージを暗号化または復号できる時間の長さ
      # KmsMasterKeyId: String KMSを利用する場合は必須な様子
      MaximumMessageSize: 262144 # メッセージのバイト数の制限(超過するとSQSが処理を拒否、最大値で262144まで設定が可能)
      MessageRetentionPeriod: 345600 # Amazon SQS がメッセージを保持する秒数(デフォルトは345,600(4日))
      QueueName: "SampleQueue" # FIFO キューを作成するには、FIFO キューの名前は .fifo サフィックスで終わる必要があります
      ReceiveMessageWaitTimeSeconds: 5 # キューが処理中の際の、ポーリング待機時間(0を指定するとショートポーリングとなる)
      RedrivePolicy:
        deadLetterTargetArn: { Fn::GetAtt: [DeadLetterSQSQueue, Arn] }
        maxReceiveCount: 2
      VisibilityTimeout: 5 # キュー受信時から、他Componentがキューを見れない秒数

  DeadLetterSQSQueue: # デッドレターキューにキューが入ったらLambdaを起動し、メール通知などを検討したい
    Type: AWS::SQS::Queue
    Properties:
      DelaySeconds: 0
      MaximumMessageSize: 262144
      MessageRetentionPeriod: 200
      QueueName: "DeadSampleQueue"
      ReceiveMessageWaitTimeSeconds: 5
      VisibilityTimeout: 10

/resources/functions.yml

ファンクションの中身は後述です。

createSQSQueue:
  handler: src/functions/createSQSQueue.handler
  environment:
    QUEUE_NAME: { Fn::GetAtt: [SQSQueue, QueueName] }
    QUEUE_ARN: { Fn::GetAtt: [SQSQueue, Arn] }
    QUEUE_END_POINT: !Ref SQSQueue

createEventSQSQueue:
  handler: src/functions/createEventSQSQueue.handler
  timeout: 4 # VisibilityTimeoutよりも低く設定する必要があるので一旦4秒に設定しておく
  events:
    - sqs:
        arn: { Fn::GetAtt: [SQSQueue, Arn] }
        batchSize: 1

/src/functions/createSQSQueue.handler

キュー発行処理を行います。この Lambda は AppSync からの呼び出しを想定していますが、パラメーターを受け取る箇所を修正すれば API Gateway やその他のサービスから呼び出すことも可能です。

const AWS = require("aws-sdk")
const SQS = new AWS.SQS({region: "ap-northeast-1"})
const QueueUrl = process.env.QUEUE_END_POINT

export async function handler(event, context) {
  try {    
    const MessageBody = JSON.stringify({
      id: event.id,
      text: event.input.text
    })
    const result = await SQS.sendMessage({MessageBody, QueueUrl}).promise()
    return {
      id: event.id,
      text: event.input.text,
      queueName: process.env.QUEUE_NAME,
      queueEndpoint: QueueUrl,
      messageId: result['MessageId']
    }
  } catch (e) {
    console.error(e)
  }
}

/src/functions/createEventSQSQueue.handler

ここではキューを受信した後の処理を記述します。本サンプルでは SQS のキュー受信時に Lambda をキックする仕様としているので、event オブジェクトには SQS が受信したメッセージが含まれます。

私たちがよく使用するワークフローは下記です。

  1. DynamoDB の Item の ID を受信
  2. その ID に関する処理を実施
  3. 処理が正常終了したら DynamoDB の Item のステータスを PUT
  4. もしどこかの処理で失敗すれば throw()

下記の例では、リトライ処理を意図的に起こしているものとなります。

例えばここの処理に try{}catch{} を設置し、catch に入ってきたら throw するように実装してあげれば、任意のタイミングで処理を失敗させて最初からやり直すことが可能です。

export async function handler(event, context) {
  throw('リトライ処理のためのThrow!');
}

一つ、大きな注意点があります!

実は SQS では、キューのポーリング(取得)が重複して実行される危険性があるんです。

つまり、同じ Lambda 処理が2回実行されてしまう可能性があります。

これは標準キューだけでなく、FIFO キューでも同じ現象が発生する可能性があります。(FIFO キューでも発生する可能性があるということに注意しましょう)

対策として、SQS ではキューに任意の ID を付与しているので、その ID を処理の前に一度 DynamoDB へ PUT し、次回以降の処理ではすでに DynamoDB に Item が存在していたら処理を実行しない(即リターン)ような対策が必要です。ここはかなりのハマりどころなのでしっかり意識しつつ実装を行います。

Lambda 関数には、冪等性を持たせるような実装を行う意識が必要です。

まとめ

浅い理解で実装してしまうと重複実行で事故になる可能性があります。特に重複実行しないように Lambda に冪等性のあるプログラミングさせましょう。

SQSの開発相談はお気軽にご相談ください。