信頼はずっと、挑戦はもっと。

お問い合わせ
TEL:03-3496-3888

BLOG コアテックの社員ブログ (毎週月曜~金曜更新中)

2020

30

3月

wafのサンプルログを定期的にとって、あったら自動でchatworkに通知してほしいというきもち

テクログ

どうも、相変わらずいろいろ検証をしています。


RDS Proxy はやくGAになってほしいですね。


さて、標題の気持ちになったのでLambdaを書きました。

ファイル分け?大丈夫大丈夫このくらいなら。

冗長だとしてもわかりやすさ優先なのはいつもどおりです。



弊社ではcloud9でのserverless frameworkでのデプロイも普通になってきましたね。

Lambdaがはかどりますね。


さてソースです。


こちら、参考URLです。

https://dev.classmethod.jp/articles/get-aws-waf-sample-logs/



こちらを色々変更し、指定したwaf 指定したルールを 指定した間隔で行うようにしています。

環境変数に入るだけ何個でもいけますね。

あ、これはCF版ですが、LB版は上のクライアントを変えるだけだと思います。

(作って運用しています。増えすぎるのであえてCF版とLB版を分けました)

実行はcloudwatch eventsで設定です。

slsの設定で。



環境変数はこんな感じ



CHATWORK_API_TOKEN XXあんごうかしたトークンだよXX

CHECK_LISTS [{'Name':'ばいたいめいだよ','WebAclID':'aaaa-aaaaとかだよ','WAFName':'わかりやすくするためのwafの名前です','RuleID':'aaaa-bbbbとかのルールID','RuleName':'これも基本わかりやすくするためのルール名'},]

INTERVAL 11(とか数字)

ROOM_ID 11111111(とかのルームID)



# -*- coding: utf-8 -*-
#cfのwafのサンプルログを確認し、あれば通知する
import os
import urllib
from datetime import datetime, timedelta
import boto3
import sys
from base64 import b64decode
import ast

REGION = "ap-northeast-1"

#環境変数
#Chatwork API トークン
ENCRYPTED_CHATWORK_API_TOKEN = os.environ['CHATWORK_API_TOKEN']
CHATWORK_API_TOKEN = boto3.client('kms').decrypt(CiphertextBlob=b64decode(ENCRYPTED_CHATWORK_API_TOKEN))['Plaintext']

INTERVAL = int(os.environ['INTERVAL'])
ROOM_ID = os.environ['ROOM_ID']
CHECK_LISTS = ast.literal_eval(os.environ['CHECK_LISTS'])

#client用意 CF版だよ
waf = boto3.client('waf')

####
# replace datetime to string
####
def time2str(x):
    x['Timestamp'] = x['Timestamp'].isoformat()
    return x

####
#chatworkに通知
####
def postChatwork(message, room_id):
  END_POINT_BASE = "https://api.chatwork.com/v2"
  END_POINT      = "/rooms/"
  ACTION         = "/messages"
  
  headers = { 'X-ChatWorkToken': CHATWORK_API_TOKEN }
  
  data  = { 'body': message }
  data = urllib.parse.urlencode(data)
  data = data.encode('utf-8')
  
  # リクエストの生成と送信
  post_message_url = END_POINT_BASE + END_POINT + room_id + ACTION
  request = urllib.request.Request(post_message_url, data=data, method="POST", headers=headers)
  with urllib.request.urlopen(request) as response:
    response_body = response.read().decode("utf-8")
    print(response_body)

####
#dictを文字にするだけ
####
def makeText(marge_logs):
  text = "[info][title]CF版 WAF 検知ログ 直近"+str(INTERVAL)+"分[/title]"
  
  for log in marge_logs:
    for key, value in log.items():
      text += key + "    " + value + "\n"
    text += "[hr]"
    
  text += "[/info]"
    
  return text

###
#main
###
def lambda_handler(event, context):
  #LISTごとに繰り返し
  for CHECK_LIST in CHECK_LISTS:
    WebAclId = CHECK_LIST['WebAclID']
    acl = waf.get_web_acl(WebACLId=WebAclId)
    
    WebACLName = acl['WebACL']['Name']
  
    marge_logs = []
    for rule in acl['WebACL']['Rules']:
      RuleId = rule['RuleId']
      
      if RuleId == CHECK_LIST['RuleID']:
        # get sample requests
        r = waf.get_sampled_requests(
          MaxItems=3,
          WebAclId=WebAclId,
          RuleId=RuleId,
          TimeWindow={
            'EndTime': datetime.utcnow(),
            'StartTime': datetime.utcnow() - timedelta(minutes=INTERVAL)
          }
        )
    
        # check sample count
        if 'SampledRequests' not in r or len(r['SampledRequests']) == 0:
          #ログのみ
          print("WebAclId:" + WebAclId + " RuleId:" +RuleId + " サンプルカウントなし")
        else:
          # replace datetime to string
          logs = list(map(time2str, r['SampledRequests']))
          for l in logs:
            headers_dict = l['Request']['Headers']
        
            Host = ''
            From = ''
            UserAgent = ''
            UserAgent2 = ''
            Referer = ''
            Referer2 = ''
            for hd in headers_dict:
              if hd['Name'] == 'Host':
                Host = hd['Value']
              elif hd['Name'] == 'From':
                From = hd['Value']
              elif hd['Name'] == 'User-Agent':
                UserAgent = hd['Value']
              elif hd['Name'] == 'user-agent':
                UserAgent2 = hd['Value']
              elif hd['Name'] == 'Referer':
                Referer = hd['Value']
              elif hd['Name'] == 'referer':
                Referer2 = hd['Value']
            
            simpledict = {
              '媒体名':CHECK_LIST['Name'],
              '時間':l['Timestamp'],
              'IP':'https://ipee.at/'+l['Request']['ClientIP'],
              '国':l['Request']['Country'],
              'URI':l['Request']['URI'],
              'Headers.Host':Host,
              'Headers.From':From,
              'Headers.User-Agent':UserAgent + UserAgent2,
              'Headers.Referer':Referer + Referer2,
              '対象WAF/ルール':CHECK_LIST['WAFName']+' / '+CHECK_LIST['RuleName'],
              'Method':l['Request']['Method'],
            }
            marge_logs.append(simpledict)
   
          # chatwork通知
          if marge_logs != []:
            try:
              print(marge_logs)
              cw_text = makeText(marge_logs)
              postChatwork(cw_text, ROOM_ID)
            except Exception as e:
              print(e)
          else:
            postChatwork("ログないよ", ROOM_ID)#基本ここはないですね


ではよいWAF生活を!



この記事を書いた人

画像:投稿者アイコン

tzwy

所 属:
WEBインテグレーション事業部
出身地:
saitama
仕事内容:
いろいろ検証、作成、障害対応とか品質向上とか

RELATED ARTICLE

関連記事

記事一覧へ