読者です 読者をやめる 読者になる 読者になる

「Data Connector for Amazon S3」でELBのログを取り込む

こんにちは。matsです。

 

トレジャーデータからこんなサービスがリリースされました。

http://blog-jp.treasuredata.com/entry/2015/06/22/125518

たまたま社内でELBのログを調査する機会があったので、「Data Connector」でELBのアクセスログをTDに送るのに使用してみました。

今回は「Data Connector」を使う時の手順について書こうかと思います。

 

TreasureData ToolBeltのインストール

公式サイトから落としてインストールします。

私は自分のMacbookにインストールしました。

 

TreasureData ToolBeltの設定

アカウント認証

下記コマンドでアカウント認証をします。(対話形式です)

$ td -e https://api.treasuredata.com account -f

Enter your Treasure Data credentials.
Email: kazuki.matsuda@intimatemerger.com
Password (typing will be hidden):
Authenticated successfully.
Use 'td -e https://api.treasuredata.com db:create <db_name>' to create a database.

 

APIエンドポイントの登録

下記コマンドでコールするAPIを設定します。(切り替えるとYBIでもいけるのかな?)

$ td server:endpoint https://api.treasuredata.com

 

設定ファイルの作成

基本的にはembulkの設定と同じようです。

ELBのログファイルを読み込む設定はこんな感じになりました。

elb.yml

config:
  in:
    type: s3
    bucket: <bucket_name>
    path_prefix: [prefix]/AWSLogs/[account_id]/elasticloadbalancing/ap-northeast-1/[YYYY]/[MM]/[DD]/
    endpoint: s3-ap-northeast-1.amazonaws.com
    access_key_id: XXXXXXXXXXXXXXXXXXXXXX
    secret_access_key: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
    parser:
      charset: UTF-8
      newline: CRLF
      type: csv
      delimiter: ' '
      skip_header_lines: 1
      allow_extra_columns: true
      allow_optional_columns: true
      columns:
        - name: time
          type: timestamp
          format: "%Y-%m-%dT%H:%M:%S"
        - name: elb_name
          type: string
        - name: client:port
          type: string
        - name: backend:port
          type: string
        - name: request_processing_time
          type: double
        - name: backend_processing_time
          type: double
        - name: response_processing_time
          type: double
        - name: elb_status_code
          type: string
        - name: backend_status_code
          type: string
        - name: received_bytes
          type: long
        - name: sent_bytes
          type: long
        - name: request
          type: string
        - name: user_agent
          type: string
        - name: ssl_cipher
          type: string
        - name: ssl_protocol
          type: string
  out:
    mode: append

 

ログフォーマットはこちらを参考にしました。が、実際にはかれているログをよく見ると

  • user_agent
  • ssl_cipher
  • ssl_protocol

といった項目があるので、注意が必要です。(これ公式のドキュメントに書いてほしいな。。)

embulk使用時とちがうのは、outの書き方ぐらいかと思います。デフォルトの出力先がTDになっているので、アウトプットプラグインの指定が必要ない感じです。

 

今回は特定の日のログが目当てだったので、path_prefixで日にちまで指定して取り込みました。

階層を上げれば月別とかで取り込むことができるので便利ですね。

 

ジョブの登録

次のコマンドでTDにジョブを登録します。

$ td connector:issue elb.yml --database embulk --table elb_accesslog --time-column time

Job 27708303 is queued.
Use 'td job:show 27708303' to show the status.

databaseとtableをあらかじめ作っていないと、ジョブを登録できないようなので注意が必要です。

 

実行するとTDの画面上からはこんな感じで「RUNNING」のジョブに積まれます。

 

td_bulk

 

ジョブの定期実行

ジョブをタスクスケジューラーで定期的に動かす場合は、下記コマンドで登録することができます。

 

$ td connector:create 
  daily_import 
  "10 0 * * *" 
  embulk 
  elb_accesslog 
  elb.yml 
  --time-column time

 

が、定期的に出力されるログを取り込む場合は、取り込む際に対象ファイルの削除か、path_prefixの動的な変更が出来ないと正直使いにくいです。。

 

定期実行とは相性が悪いみたいなので、Lambdaを絡めてファイルの生成と同時にTDに取り込むというのをそのうちやってみようかと思います。

 

おしまい

 

 

AWSでAerospikeをインストールする

こんにちは。matsです。

 

ちょうどAerospikeを増強する機会があったので、インストールについてまとめてみました。

基本的にはこちらのリファレンスを参考にしています。

 

サーバの起動

下記の構成で立ち上げます。

起動したら次のコマンドで拡張ネットワーキングが有効になっていることを確認します。

「driver: ixgbevf」とあれば大丈夫です。

# ethtool -i eth0

driver: ixgbevf
version: 2.14.2+amzn
firmware-version: N/A
bus-info: 0000:00:03.0
supports-statistics: yes
supports-test: yes
supports-eeprom-access: no
supports-register-dump: yes
supports-priv-flags: no

 

Aerospikeのインストール

rpmパッケージを落としてきてインストールするだけです。楽ちん。

(ここまで楽ならyumリポジトリあってもいいのに。。)

# wget --output-document=aerospike.tgz http://www.aerospike.com/download/server/3.5.14/artifact/el6
# tar -xzvf aerospike.tgz
# cd aerospike-server-community-*-el6
# ./asinstall

 

ネットワーク(NIC)の最適化

Amazon Linux (HVM)の各NICは、毎秒約250Kパケットを処理することができますが、より高いパフォーマンスを求める場合は、以下のどちらかの対応が必要な様です。

(これは一つのNICから受けるパケットは一つのCPUで受けて処理する仕様に起因しているようです)

 

ENI(仮想NIC)を追加する

NICを追加し、それぞれに対応するCPUを分けることで対応します。

が、結構設定が面倒くさいので、私のオススメは次のやつです。

 

RPSを使う

Linux kernelの2.6.35以降で利用できる機能にRPSというものがあります。

詳細な仕様は私も分かっていないのですが、簡単な設定をすることで特定のNICのパケットを処理するCPUの個数を指定することができる様です。

設定方法は下記コマンドになります。

echo ffffffff > /sys/class/net/eth0/queues/rx-0/rps_cpus

AMIに固めるときなどは /etc/rc.local の最後に書いています。

 

Ephemeral Diskのアンマウント

Aerospikeはファイルシステムを介さず、直接ブロックデバイスに書き込むのでマウントは不要です。

EC2ではデフォルトでマウントされてしまうので外しておきます。

# umount /media/ephemeral0
# sed -i -e '/ephemeral0/d' /etc/fstab

 

Aerospikeの起動

あとは起動するだけです。

ちなみに、自動起動は設定していません。(サーバが落ちた時などは予期しないことが起こっている事が多いので)

# service aerospike start

 

 

おわり

Airbnbの「Airflow」を試す

こんにちは。最近pythonと戯れているmatsです。

 

6月の頭にこんなリリースがありました。

http://japan.cnet.com/news/service/35065498/

今回はこの「Airflow」を試してみようかと思います。

 

Airflowとは

「Airflow」はAirbnbオープンソースとして公開したワークフロー管理プラットフォームです。

データパイプラインを規模に応じてタイムリーにオーサリング、スケジューリング、モニタリングすることができ、過去数年間で爆発的に成長したAirbnbにとっては不可欠な存在とのこと。

 

ソースコード: https://github.com/airbnb/airflow/

ドキュメント: http://pythonhosted.org/airflow/

 

Airflowのインストール

Airflowはpython製で2.7系のみの対応です。今回はAmazon Linuxにインストールを行いました。

 

準備

コンパイルに必要なパッケージをインストールします。

$ sudo yum -y groupinstall "Development Tools"

 

Airflow本体のインストール

pythonのライブラリとして公開されているので、pipコマンドで導入可能です。(結構時間がかかります)

$ pip install airflow

 

データベースの初期化

次のコマンドでデータベースの初期化を行います。

デフォルトがSQLiteになっていてサクッとはじめられるのはいいですね(^^)

$ airflow initdb

DB: sqlite:////home/ec2-user/airflow/airflow.db
Done.

 

Airflowの起動

webserverを起動します。

$ airflow webserver

 ____________ _____________
 ____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ _ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
 _/_/ |_/_/ /_/ /_/ /_/ ____/____/|__/

2015-06-28 16:34:20,376 - root - INFO - Filling up the DagBag from /home/ec2-user/airflow/dags
2015-06-28 16:34:20,376 - root - INFO - Importing /usr/local/lib/python2.7/site-packages/airflow/example_dags/example_python_operator.py
2015-06-28 16:34:20,377 - root - INFO - Loaded DAG <DAG: example_python_operator>
2015-06-28 16:34:20,378 - root - INFO - Importing /usr/local/lib/python2.7/site-packages/airflow/example_dags/example_bash_operator.py
2015-06-28 16:34:20,378 - root - INFO - Loaded DAG <DAG: example_bash_operator>
2015-06-28 16:34:20,378 - root - INFO - Importing /usr/local/lib/python2.7/site-packages/airflow/example_dags/tutorial.py
2015-06-28 16:34:20,379 - root - INFO - Loaded DAG <DAG: tutorial>
Running Tornado server on host 0.0.0.0 and port 8080...

 

worker等を起動する場合は別途MySQL等が必要です。(初期設定ではceleryのバックエンドにMySQLを使用することになっています)

$ airflow worker

 -------------- celery@ip-192-168-4-67 v3.1.18 (Cipater)
---- **** -----
--- * *** * -- Linux-3.14.44-32.39.amzn1.x86_64-x86_64-with-glibc2.2.5
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: airflow.executors.celery_executor:0x7f270bd6dd90
- ** ---------- .> transport: sqla+mysql://airflow:airflow@localhost:3306/airflow
- ** ---------- .> results: db+mysql://airflow:airflow@localhost:3306/airflow
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> default exchange=celery(direct) key=celery


[2015-06-28 15:09:19,033: WARNING/MainProcess] celery@ip-192-168-4-67 ready.
Starting flask
 * Running on http://0.0.0.0:8793/ (Press CTRL+C to quit)

 

$ airflow flower

[I 150628 15:09:45 command:114] Visit me at http://localhost:5555
[I 150628 15:09:45 command:116] Broker: sqla+mysql://airflow:airflow@localhost:3306/airflow
[I 150628 15:09:45 command:119] Registered tasks:
 ['celery.backend_cleanup',
 'celery.chain',
 'celery.chord',
 'celery.chord_unlock',
 'celery.chunks',
 'celery.group',
 'celery.map',
 'celery.starmap']
[I 150628 15:09:45 mixins:225] Connected to sqla+mysql://airflow:airflow@localhost:3306/airflow

 

ブラウザから http://:8080 にアクセスすると・・・

Airflow_-_DAGs

お、でた!(使い方がwからない。。。)

 

所感

Q. で、結局のところ使えるの?

所定のルールでpythonスクリプト書き、登録するとこんな感じで表示されるようです。(for文でクルクル回している処理が、小ジョブとしてそれぞれ表示されていたりするのはおもしろいですね)

Airflow_DAGs

ワークフロー全体で1つのコードのようなので、動的にジョブを組み合わせるとかは難しいのかもしれません。

IMには、用意されたジョブを動的に連結させて動かす仕組みが既にあるので、同様のことをする場合はジョブを選択していくとAirflow登録用のpythonコードが生成出来る仕組みが別途必要そうです。

ある程度固まっているワークフローをガンガン回すのには向いている感じがします。

 

Q. アーキテクチャー的には?

キューの管理やデータストアやワーカーは独立させる事ができるみたいなので、比較的障害等には強い気がします。

 

Q. オートスケールとかAWSとの相性は?

まだわかりません。ワーカーとしてスポットインスタンスを使えると最高なのですが。。

キュー管理に使われているceleryはバックエンドにSQSを使えたりするみたいです。

 

 

似たような仕組みを既に社内に持っているので実際に使うかは微妙ですが、引き続き動向は追っていこうかと思います。(試すとか言って起動しかしていないのはご愛嬌)

 

おわり

Aerospikeの勉強会に登壇してきました

こにちは、matsです。

 

昨日、行われていた「Aerospike Deep Dive」に登壇してきました。

aerospike_deep_dive

 

Deepと付いているのですが、どちらかと言うと導入事例やTips的なものがメインだったので全くDeepではありませんでした。。

内容的には

  • IMでの使われ方(導入事例) ← メイン
  • 導入時に検討した他のKVS
  • 導入して気づいたこと

こんな感じで、後半2つは参考程度のお話になります。

海外で大規模に運用しているところでは、BlueKaiやAppNexusなどのDMPで弊社と同様の使い方をしているようなので、Aerospikeの用途的には比較的王道をいっているのかと思います。なので、事例としては結構いいんじゃないかなと思い、お話させて頂いた感じです。

最後に質問で、SSD領域を使ってやっている属性データの加工の際のレスポンスについて聞かれましたが、Aerospike的にはほぼ全て1ms以下で応答していて、それ以上の精度で計測するのは正直辛いですね。。ほんと早すぎて限界が全くわからないです。

 

※追記

Youtubeに動画が上がりましたので追加しておきます。(他の登壇者の動画はこちら

 

 

絶賛エンジニアを募集中ですので、ElasticsearchやAerospikeなどを用いた大規模なデータ処理に興味のある方いらっしゃいましたらご連絡頂ければと思います〜。

採用ページはこちら

 

AWS運用における3種の神器

こんにちは。matsです。

 

IMは膨大な量のデータを扱うビッグデータカンパニーでありますが、それと同時にAWSをゴリゴリつかうオートスケールカンパニー(?)でもあります。

ほぼ全てのサーバをオートスケールによって運用しており、高い対障害性と運用の効率化を実現しております。今回はAWSのオートスケールを使って数百台規模のサーバを運用する際に、非常に便利な3つのサービスをご紹介したいと思います。

 

1. Mackerel

mackerel

Mackerelはクラウド型のモニタリングサービスです。最近はサーバのリソースだけでなく、スクリプトによる監視も行えるようになっています。

各サーバで稼働するエージェントにより監視を行うプッシュ型のサービスのため、オートスケールとの相性が非常によく、台数の増加に伴う運用負荷の増加が殆どありません。同様のサービスではDataDogがありますが、SearviceとRoleでサービスを分類・管理出来るという点と、ボリュームディスカウントが効く点からMackerelを採用しております。

また、SearviceとRoleでサービスを分類するというやり方は、残りの2つのサービスでも採用しているため弊社においては非常に重要な概念になります。

 

 

2. Papertrail

papertrail

日本ではあまり聞かないサービスですが、PapertrailはGitHubなどの採用実績のあるログの集約・監視を行うサービスになります。

リモートSyslog形式で投げ込むことにより、複数のサーバのログをまとめてブラウザ上からほぼリアルタイムに見ることが出来ます。(まとめる単位はMackerelのSearviceとRoleに合わせています。)サーバ台数が多いと、各サーバに入ってログを確認するということだけでも大変になってくるので重宝しています。

特定文字列の監視を設定し、Slack等にアラートを飛ばすことも可能です。

また、集めたログはS3に日別で自動アーカイブすることが可能なので、IMではほぼ全てのログをPapertrailに投げ込んでいます。(PT側の保存期間を短くして、S3にアーカイブするといった使い方も可能)

これだけ至れり尽せりな仕様でありながら、数十ドル/月で利用可能です。(検索可能な期間と月間に受信するログの量で変動します)

正直、何でコレが流行っていないのか理解に苦しむレベルのネ申サービスです。

 

 

3. Rundeck

 

rundeck-logotype-512

RundeckはOSSのタスクスケジューラーになります。ブログなどでcronの代替として紹介されていることが多いですが、AWSとの連携により真の力を発揮することが出来ます。

Rundeckはプラグインにより、EC2のインスタンスの一覧を取得可能で、その際に付与されているタグも識別することが出来ます。

Mackerelなどで利用しているSearviceとRoleを適切にタグに設定していれば、そのグルーピング単位でジョブの実行やソースコードのデプロイを行うことが可能です。これにより、ホスト名やIPで個々のサーバを管理する必要がなくなります。

また、アドホックにコマンドを発行することも可能なので、全台のOpenSSLのバージョンを調べるといった使い方も出来たりします。

 

 

PapertrailとRundeckは国内の事例が少ないようなので、別途記事を書こうかと思います。

mackerelのスクリプトによる監視を使ってみる

こんにちはmatsです。

 

以前、こんな記事を投稿しました。

http://tech.im-dmp.net/archives/632

 

強引かつ(´・ω・`)ショボイ方法で監視をしていたのですが、はてなさんが神アップデートをしてくれたので、試したいと思います。

http://help-ja.mackerel.io/entry/custom-checks

nagios-pluginを使ったプロセス監視

 

今回はnagios-pluginsの中にある、「check_procs」モジュールを使ってelasticsearchのプロセス監視を行ってみたいと思います。

 

nagios-pluginのインストール

 

AWS環境(AmazonLinux)なので、yumでいれます。(CentOS等はepelだったかと)

$ sudo yum install -y nagios-plugins-all

 

これで「/usr/lib64/nagios/plugins/」配下にモジュールがインストールされます。

 

mackerel-agentの設定

設定は次のように mackerel-agent.conf に記載するだけです。

 

/etc/mackerel-agent/mackerel-agent.conf

apikey = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX="
roles = [ "service:role" ]

[plugin.checks.elasticsearch]
command = "/usr/lib64/nagios/plugins/check_procs -w 1:1 -c 1:1 -u elasticsearch -C java"

 

アラートの判定ロジックはnagiosを踏襲しているので、コマンド自体はそのままでいけます。

監視状況はmackerel上でこんな感じに確認できます。

mackerel_proc

 

 

気になる点としては、一回エラーを検知すると即発報してしまうのでnagiosのような柔軟な監視設計ができないところくらいでしょうか。

特に問題はないのですが、EC2のAutoScalingでサーバがterminateされる時にプロセスの終了とmackerelの退役処理の間に監視が差し込まれたりして、アラートがあがったりしてしまうことがありました。(nagios等だと即発報とかにはしないので気にならないのですが)

コレに関しては、停止順を精査すればいいだけのことではありますが。

 

mackerelスゲーというお話でした。

PythonでTreasureDataにBulkImportした話

こんにちは、g0eです。

mats氏によって「中の人」の紹介記事が勝手に書かれていることに最近気づきました。javaはもう忘れました。

 

弊社では各種ローデータの収集や一次加工にTreasureData(以下:TD)を活用しています。

tresuredata

そして、アプリケーションは主にpythonで書いていまして、tdコマンドの中を覗く(lessする)とruby派っぽい雰囲気が伝わってくるんですが、pythonでも結構戦えたので紹介したいと思います。

今回のお題は、pythonでTDにtsvとかcsvのファイルをbulk_importした、という話です。 男らしくtdコマンドをsubprocessで叩けばいいんじゃないか、と思わなくもないですが、ちょっとややこしい環境で動かそうとしてうまくいかなかったので、td-client-pythonを使うことにしました。

このライブラリ、コードとコメントを読むと実は結構色々なことが出来るみたいで、そもそも最初はbulk_importの機能も実装されている事に気づきませんでした。

 

TDの環境設定とかtdコマンドの設定とかは既に終わっている前提で、まずはさくっとpipでインストールします。

(今回の記事は td-client==0.2.0 をベースに書いています)

$ pip install td-client

 

サンプルコード

pythonのコード的には↓みたいな感じで動くはず (既に動いているコードから切り貼りしてきただけなんで、エラー出たら教えて下さい)

# -*- coding: utf-8 -*-
import tdclient
import time

TD_APIKEY = "XXXX" # TDのAPIキー
TD_DB = "hoge" # TDのデータベース名
TD_TB = "piyo" # TDのテーブル名
TSV_FILE ="/path/to/hoge.tsv" # bulk_importしたいtsvファイル

cl = tdclient.Client(TD_APIKEY)
ts = int(time.time()) # timestamp

# bulk_importするtsvファイル(tdコマンドでimport:auto出来るフォーマットで)
f = open(TSV_FILE,"r")

# bulk_import用のセッション名
# ユニークになればOKっぽいのでファイル名とtimestampを使ってみる
# ただ、/か.かを含んでいるとエラーになったっぽいので除去
session_name = f.name.split("/")[-1].replace(".","") + "_" + str(ts)

# bulk_import用のsessionを生成
bulk = cl.create_bulk_import(session_name,TD_DB,TD_TB)

# ファイルをアップロード
bulk.upload_file(session_name,"tsv",f)

# ファイルの追加・変更がないことを通知するっぽい
# freezeしないと先に進めないのでおまじない的に
bulk.freeze()

# TDのUIからJobsを確認するとこの段階でMap-Reduceが走っている様子
# wait=Trueにして処理が完了するまで待たないとcommitでエラーが出ます
bulk.perform(wait=True)

# この処理が終わって、やっとUI上で追加されたデータを確認できる
bulk.commit(wait=True)

# 処理に成功した件数とエラーになった件数はこんな感じで取得できる
print("valid_records:" + str(bulk.valid_records))
print("error_records:" + str(bulk.error_records))

# 後処理
bulk.delete()
f.close()

 

freezeとかperformとかcommitとかの概念はBulk Import Internalsに書いているみたいなんで、気になる方は見てみて下さい。

以上