{EMR}チュートリアル: Amazon EMR の開始方法

 

https://docs.aws.amazon.com/ja_jp/emr/latest/ManagementGuide/emr-gs.html
https://blog.serverworks.co.jp/amazonemr-tutorial

-- 1. コマンド等のインストール

-- 1.1 aws cli version 2 インストール

curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
sudo ./aws/install
aws --version

 

-- 1.2 jqインストール
sudo yum -y install jq


-- 2. S3 バケットを作成する
※Amazon EMR で使用するバケット名の末尾を数字にすることはできません。

aws s3 ls

aws s3 mb s3://bucket123emr


-- 3. アプリケーションを準備する

vim health_violations.py

import argparse

from pyspark.sql import SparkSession

def calculate_red_violations(data_source, output_uri):
    """
    Find the ten establishments with the most 'RED' violations and save them as CSV.

    Reads the CSV at ``data_source`` (first row is used as the header), exposes it to
    Spark SQL as the temporary view ``restaurant_violations``, counts the rows whose
    ``violation_type`` is ``'RED'`` per ``name``, keeps the ten largest counts and
    writes them (with a header row, overwriting existing output) to ``output_uri``.

    :param data_source: The URI of your food establishment data CSV, such as 's3://bucket123emr/food_establishment_data.csv'.
    :param output_uri: The URI where output is written, such as 's3://bucket123emr/output'.
    :raises ValueError: If ``data_source`` or ``output_uri`` is None or empty.
    """
    # Validate before creating the SparkSession: previously a missing data_source
    # skipped the read and crashed later on an unbound ``restaurants_df``.
    if not data_source:
        raise ValueError("data_source is required: pass the URI of the input CSV")
    if not output_uri:
        raise ValueError("output_uri is required: pass the URI where results are written")

    # The session is used as a context manager so it is stopped when the job ends.
    with SparkSession.builder.appName("Calculate Red Health Violations").getOrCreate() as spark:
        # Load the restaurant violation CSV data
        restaurants_df = spark.read.option("header", "true").csv(data_source)

        # Register the DataFrame as a temporary view so it can be queried with SQL
        restaurants_df.createOrReplaceTempView("restaurant_violations")

        # Create a DataFrame of the top 10 restaurants with the most Red violations
        top_red_violation_restaurants = spark.sql("""SELECT name, count(*) AS total_red_violations
          FROM restaurant_violations
          WHERE violation_type = 'RED'
          GROUP BY name
          ORDER BY total_red_violations DESC LIMIT 10""")

        # Write the results to the specified output URI
        top_red_violation_restaurants.write.option("header", "true").mode("overwrite").csv(output_uri)

if __name__ == "__main__":
    # Command-line entry point: parse the input/output URIs and run the job.
    # Both options are required so that a forgotten argument is rejected by
    # argparse with a usage message instead of reaching the Spark job as None.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--data_source', required=True,
        help="The URI for your CSV restaurant data, like an S3 bucket location.")
    parser.add_argument(
        '--output_uri', required=True,
        help="The URI where output is saved, like an S3 bucket location.")
    args = parser.parse_args()

    calculate_red_violations(args.data_source, args.output_uri)


aws s3 cp health_violations.py s3://bucket123emr
aws s3 ls s3://bucket123emr --recursive


-- 4. 入力データを準備する


wget https://docs.aws.amazon.com/ja_jp/emr/latest/ManagementGuide/samples/food_establishment_data.zip

unzip food_establishment_data.zip

aws s3 cp food_establishment_data.csv s3://bucket123emr
aws s3 ls s3://bucket123emr --recursive

 

 

-- 5. Amazon EMR クラスターを起動する

aws emr create-default-roles

aws emr create-cluster \
--name emr01 \
--release-label emr-5.36.0 \
--applications Name=Spark \
--ec2-attributes KeyName=key1 \
--instance-type m5.xlarge \
--instance-count 2 \
--use-default-roles \
--no-termination-protected


aws emr list-clusters

aws emr describe-cluster \
--cluster-id j-1111111111111


-- 6. Amazon EMR に作業を送信する


aws emr add-steps \
--cluster-id j-1111111111111 \
--steps Type=Spark,Name="step01",ActionOnFailure=CONTINUE,\
Args=[s3://bucket123emr/health_violations.py,\
--data_source,s3://bucket123emr/food_establishment_data.csv,\
--output_uri,s3://bucket123emr/output]


aws emr describe-step \
--cluster-id j-1111111111111 \
--step-id s-2222222222222

 

-- 7. 結果を表示する

aws s3 ls s3://bucket123emr --recursive

aws s3 cp s3://bucket123emr/output/part-00000-33333333-3333-3333-3333-333333333333-c000.csv -


-- 8. クリーンアップ

-- クラスターの削除
aws emr terminate-clusters \
--cluster-ids j-1111111111111

aws emr list-clusters

 

-- バケットの削除
aws s3 ls

aws s3 rb s3://bucket123emr --force