こんにちは!
AWS SQS って具体的に何ができるのか、よくわからない方が多いのでは?と思います。
本記事では、メッセージキューを使用したユースケースと、そのプログラミング方法を紹介します。
サービス開発の中で AWS SQS を一切使用しないという選択も可能ですが、ユースケースによっては実装コスト・リスクを大きく軽減できる可能性があります、是非覚えていってください。
例えば、財務システムの開発で帳票を PDF 出力できる機能を考えてみましょう。
PDF 出力には、財務システムの持つ様々な計算ロジックを起動させる必要があり、PDF 出力完了までに10秒以上の時間を要する可能性があるとします。
PDF 出力ボタンをクリックした際に、処理が完了するまでそのまま待機させるのは UX 悪いですよね。そういった時に活躍するのが SQS です。SQS を使用すると、下記のようなワークフローを実現することができます。
非同期での処理実行には、上記以外にも様々な方法があります。ビジネス要件に合わせて適切なアーキテクチャーを検討しましょう。
SNS はメッセージトピックを作成しサブスクリプションへ通知します。
対して SQS は通知を受け取り処理する側です。(SNS のサブスクリプションへ SQS を登録することも可能です)
1つの通知処理で同時に複数の通知先をへ通知を行いたケースには、SNS を活用しましょう。
持てません。
SQS はキューへメッセージを溜め込み、任意のタイミングで取り出せるサービスです。
キューからメッセージを取り出す方法には、SQS のメッセージキュー受信をトリガーに Lambda を起動させる方法や、任意のタイミングで Lambda を発火させて SQS のキューを取り出す方法などがあります。
基本的には CloudFormation の文法で構築を行いますので、CloudFormation のプロジェクトの方も安心してください。
本サンプルを利用する場合は、事前に ServerlessFramework を使用する環境をローカルに構築しておいてください。
Lambda で SQS の発行を行いますので、IAM ロールを忘れずに設定しましょう。Resource には本来は SQS の ARN を取得し設定するのが望ましいですね。STG と PRD が同居する AWS アカウントであった時に、誤って STG から PRD のキューを操作したら怖いですね。
service: SQS-Serverless
provider:
name: aws
runtime: nodejs12.x
stage: ${opt:stage,"dev"}
region: ap-northeast-1
profile: ragate
iamRoleStatements:
- Effect: 'Allow'
Action:
- 'sqs:*'
Resource:
- '*'
plugins:
- serverless-webpack
custom:
webpack:
includeModules: true
packager: 'npm'
functions: ${file(./resources/functions.yml)}
resources:
- ${file(./resources/sqs.yml)}
package:
individually: true
SQS が任意の回数キュー処理に失敗した場合はデッドレターキューにキューが移行されます。(これを応用してリトライ処理を行うことが可能です)
下記のサンプルでは、5秒間隔で最大2回リトライ処理を行うことが可能です。
Resources:
SQSQueue:
Type: AWS::SQS::Queue
Properties:
# ContentBasedDeduplication: true # FifoQueue指定時のみ設定、先入れ先出し (FIFO) キューに対して、コンテンツに基づく重複排除を有効にするかどうかを指定
DelaySeconds: 0 # メッセージの配信を遅延させたい場合。
# FifoQueue: false # 指定しないと標準キューで作成される
# KmsDataKeyReusePeriodSeconds: 300 # AWS KMS を呼び出す前にメッセージを暗号化または復号できる時間の長さ
# KmsMasterKeyId: String KMSを利用する場合は必須な様子
MaximumMessageSize: 262144 # メッセージのバイト数の制限(超過するとSQSが処理を拒否、最大値で262144まで設定が可能)
MessageRetentionPeriod: 345600 # Amazon SQS がメッセージを保持する秒数(デフォルトは345,600(4日))
QueueName: "SampleQueue" # FIFO キューを作成するには、FIFO キューの名前は .fifo サフィックスで終わる必要があります
ReceiveMessageWaitTimeSeconds: 5 # キューが処理中の際の、ポーリング待機時間(0を指定するとショートポーリングとなる)
RedrivePolicy:
deadLetterTargetArn: { Fn::GetAtt: [DeadLetterSQSQueue, Arn] }
maxReceiveCount: 2
VisibilityTimeout: 5 # キュー受信時から、他Componentがキューを見れない秒数
DeadLetterSQSQueue: # デッドレターキューにキューが入ったらLambdaを起動し、メール通知などを検討したい
Type: AWS::SQS::Queue
Properties:
DelaySeconds: 0
MaximumMessageSize: 262144
MessageRetentionPeriod: 200
QueueName: "DeadSampleQueue"
ReceiveMessageWaitTimeSeconds: 5
VisibilityTimeout: 10
ファンクションの中身は後述です。
createSQSQueue:
handler: src/functions/createSQSQueue.handler
environment:
QUEUE_NAME: { Fn::GetAtt: [SQSQueue, QueueName] }
QUEUE_ARN: { Fn::GetAtt: [SQSQueue, Arn] }
QUEUE_END_POINT: !Ref SQSQueue
createEventSQSQueue:
handler: src/functions/createEventSQSQueue.handler
timeout: 4 # VisibilityTimeoutよりも低く設定する必要があるので一旦4秒に設定しておく
events:
- sqs:
arn: { Fn::GetAtt: [SQSQueue, Arn] }
batchSize: 1
キュー発行処理を行います。この Lambda は AppSync からの呼び出しを想定していますが、パラメーターを受け取る箇所を修正すれば API Gateway やその他のサービスから呼び出すことも可能です。
const AWS = require("aws-sdk")
const SQS = new AWS.SQS({region: "ap-northeast-1"})
const QueueUrl = process.env.QUEUE_END_POINT
export async function handler(event, context) {
try {
const MessageBody = JSON.stringify({
id: event.id,
text: event.input.text
})
const result = await SQS.sendMessage({MessageBody, QueueUrl}).promise()
return {
id: event.id,
text: event.input.text,
queueName: process.env.QUEUE_NAME,
queueEndpoint: QueueUrl,
messageId: result['MessageId']
}
} catch (e) {
console.error(e)
}
}
ここではキューを受信した後の処理を記述します。本サンプルでは SQS のキュー受信時に Lambda をキックする仕様としているので、event オブジェクトには SQS が受信したメッセージが含まれます。
私たちがよく使用するワークフローは下記です。
下記の例では、リトライ処理を意図的に起こしているものとなります。
例えばここの処理に try{}catch{} を設置し、catch に入ってきたら throw するように実装してあげれば、任意のタイミングで処理を失敗させて最初からやり直すことが可能です。
export async function handler(event, context) {
throw('リトライ処理のためのThrow!');
}
実は SQS では、キューのポーリング(取得)が重複して実行される危険性があるんです。
つまり、同じ Lambda 処理が2回実行されてしまう可能性があります。
これは標準キューだけでなく、FIFO キューでも同じ現象が発生する可能性があります。(FIFO キューでも発生する可能性があるということに注意しましょう)
対策として、SQS ではキューに任意の ID を付与しているので、その ID を処理の前に一度 DynamoDB へ PUT し、次回以降の処理ではすでに DynamoDB に Item が存在していたら処理を実行しない(即リターン)ような対策が必要です。ここはかなりのハマりどころなのでしっかり意識しつつ実装を行います。
Lambda 関数には、冪等性を持たせるような実装を行う意識が必要です。
浅い理解で実装してしまうと重複実行で事故になる可能性があります。特に重複実行しないように Lambda に冪等性のあるプログラミングさせましょう。
SQSの開発相談はお気軽にご相談ください。
スモールスタート開発支援、サーバーレス・NoSQLのことなら
ラーゲイトまでご相談ください
低コスト、サーバーレスの
モダナイズ開発をご検討なら
下請け対応可能
Sler企業様からの依頼も歓迎