LAC WATCH

セキュリティとITの最新情報

RSS

株式会社ラック

メールマガジン

サイバーセキュリティや
ラックに関する情報をお届けします。

サービス・製品 | 

Prisma® Cloudの運用負荷を軽減する実践的アプローチ

クラウドセキュリティの需要は急速に拡大しており、米国の調査会社によると2024年第2四半期には市場が42%成長したという報告もあります。これは海外の話にとどまらず、国内にいる私たちも、クラウド上のシステム開発と運用時のセキュリティ監視を行う「Prisma® Cloud」を導入済みのお客様からの問い合わせが増えていると感じています。

導入フェーズを終えた今、運用負荷を軽減する手段を模索している方も多いのではないでしょうか。この記事では、Prisma® Cloudの運用負荷を軽減するための具体的な方法をご紹介します。

なお、今回はPrisma® Cloud APIを活用するための基盤コードを作成し、現在実行中の定型作業のフローチャート化と効率化を行う流れで、Prisma® Cloudの運用業務をコード化する手法をご説明します。また、ChatGPTを活用したコード生成とテストの事例もありますので、ぜひ最後までご覧ください。

事前準備

この記事では、Prisma® Cloud RESTAPIを活用した処理をご紹介するので、事前に実行環境をご用意ください。

準備するもの

  • Python3.11が実行可能な環境
  • Prisma® CloudのテナントID
  • ログイン用のIDとパスワード

上記3つの準備が完了したら、初期ログインのコードを用意します。内容はほぼ定型文で下記のような内容です。prismacloud.apiを利用して、RESTAPIを実行するトークンを取得するための処理です。

import requests
import urllib.parse
from prismacloud.api import pc_api, pc_utility
 
# --ユーザカスタムコマンドライン引数-- #
parser = pc_utility.get_arg_parser()
args = parser.parse_args()
 
# --初期化&ログイン-- #
settings = pc_utility.get_settings(args)
pc_api.configure(settings)
 
def alt_login():
    url = urllib.parse.urljoin('https://' + pc_api.api, 'login')
    headers = { 'Content-Type': 'application/json' }
    body_params = {
        "prismaId": pc_api.name,
        "username": pc_api.identity,
        "password":pc_api.secret
    }
 
    response = requests.post(url, headers=headers, json=body_params, verify=pc_api.verify)
 
    token = response.json()['token']
    pc_api.token = token
    print(token)
    print()
 
alt_login()

トークンの取得には対象テナントの情報が必要なので、Pythonのホームディレクトリ直下に以下の内容でcredentials.jsonも用意してください。なお、右側はコメントなので、実行時には削除してください。記載フォーマットを変更することでアクセスキー利用とすることも可能です。

{
    "api": "https://api.xx.prismacloud.io/",//Prisma Cloudのテナントリージョンを指定
    "name": "xxxxxxxx",        //Prisma CloudのテナントID
    "username": "xxx@example.com",  //処理を実行させたいユーザのID
    "password": "xxx"         //処理を実行させたいユーザのパスワード
}

なお、上記で指定するのはPaloalto Hubではなく、Prisma® Cloud本体のログインIDとパスワードである点に注意してください。以下の図で示した、レガシーログインコンソールにログインするIDとパスワードになります。

レガシーログインコンソールにログインする様子

設定が正しければ、このようにコマンド発行用のトークン情報を取得できます。

コマンド発行用のトークン情報

定型作業のフローチャート化と効率化

さて、ここまで準備できれば、あとはどの業務を効率化するかを検討しましょう。この記事では、IAM系アラートで検知されたEC2インスタンスへのタグ情報付加を自動化します。

アラート一覧画面で同一の資源で複数のアラートが検知されているのに、アセットのタグでフィルタリングをかけるとなぜか表示されるアラートの数が激減することを、不思議に思ったことはありませんか?これはなぜなのでしょうか。

アラート一覧画面のアラート検知状態。アセットのタグでフィルタリングをかけると表示されるアラートの数が減少する。

その理由は、IAM系のアラートは資源のタグ情報を拾わない仕様だからです(2024年12月現在)。ですが、特に大量に出力されがちなIAM系の権限アラートを、タグでフィルタリングできないと不便ですよね。ベンダーに機能改善のリクエストをしても、すぐに実装されるわけではなく、オペレーション作業が待ってくれるわけでもありません。

そこで、自前でインベントリ上のタグ情報をIAMアラートに付加するコードを作ってみましょう。これが実現できれば、過検知を起こしがちな高権限アラートをタグで分類でき、優先順位の切り分け作業が大幅に楽になるはずです。

では、具体的にどうすればアラートにタグを付与できるのか説明します。まずは、インベントリ上にあるタグ情報を自分で拾いに行く必要がありますね。どのアセットのタグを拾うかは、IAMアラート上のインスタンスIDがあれば大丈夫でしょう。つまり、下記のフローで実現します。

解決策のフロー

  1. IAMアラートからインスタンスIDを抽出
  2. インスタンスIDを元に、インベントリからインスタンスのユニークキーを取得
  3. ユニークキーを元に、インベントリからタグ情報を個別取得
  4. 上記情報を突合し、アラートごとにタグ情報を付加

ChatGPTを活用したコード生成とテストの事例

続いて、コードを生成します。まず、どのRESTAPIを組み合わせればよいかを考えましょう。下記ページから使えそうなAPIを検討します。

Welcome to the Prisma Cloud APIs | Develop with Palo Alto Networks

今回は、以下のAPIを用いて実装します。

利用するAPI

それぞれのAPIのページを開き、リクエスト構造を確認します。アラート一覧の収集であれば、構造は下記の通りです。

アラート一覧を収集するリクエスト構造

フィルタ条件はpolicy.type=iam、alert.status=open、resource.type=EC2 Instanceで良さそうです。では、早速試してみましょう。下記のように、アラート情報の一覧が取得できました。

取得されたアラート情報の一覧

JSONは便利ですが、どうしても読みにくいと感じることがありますよね。そこで、必要な情報だけをCSVに出力しなおしていきます。これまでは、ここの情報整理に多くの時間がかかり大変だったのですが、その部分はChatGPTにお願いしてしまいましょう。ここでは、CSVに出力して、その後必要な情報を取得するように再修正をお願いしてみました。

ChatGPTに必要な情報だけをCSVに出力するようリクエストした様子

すると、以下のようなコードができました。

# --IAM系アラート取得処理
# --実行日から30日前までに発行されたIAMアラート:オープン中:EC2が対象のものを取得
# --alerts_output.csvに出力
import requests
import urllib.parse
from datetime import datetime, timedelta
from prismacloud.api import pc_api, pc_utility
import json
import pandas as pd
 
# --ユーザカスタムコマンドライン引数-- #
parser = pc_utility.get_arg_parser()
args = parser.parse_args()
 
# --初期化&ログイン-- #
settings = pc_utility.get_settings(args)
pc_api.configure(settings)
 
def alt_login():
    url = urllib.parse.urljoin('https://' + pc_api.api, 'login')
    headers = {'Content-Type': 'application/json'}
    body_params = {
        "prismaId": pc_api.name,
        "username": pc_api.identity,
        "password": pc_api.secret
    }
 
    response = requests.post(url, headers=headers, json=body_params, verify=pc_api.verify)
 
    if response.status_code == 200:
        token = response.json()['token']
        pc_api.token = token
        print("Token acquired:")
        return token
    else:
        print("Error acquiring token:", response.status_code, response.text)
        raise Exception("ログイン失敗")
 
def get_alerts(api_token):
    api_url = urllib.parse.urljoin('https://' + pc_api.api, '/v2/alert')
    headers = {
        'Content-Type': 'application/json',
        'x-redlock-auth': api_token
    }
    body_params = {
        "detailed": True,
        "fields": [
            "alert.id",
            "policy.name",
            "cloud.account",
            "resource.id",
            "resource.name",
            "policy.type"
        ],
        "filters": [
            {
                "name": "policy.type",
                "operator": "=",
                "value": "iam"
            },
            {
                "name": "alert.status",
                "operator": "=",
                "value": "open"
            },
            {
                "name": "resource.type",
                "operator": "=",
                "value": "EC2 Instance"
            }
        ],
        "groupBy": [
            "resource.type"
        ],
        "limit": 10000,
        "offset": 0,
        "pageToken": "",
        "sortBy": [
            "id:asc"
        ],
        "timeRange": {
            "relativeTimeType": "BACKWARD",
            "type": "relative",
            "value": {
                "amount": 30,
                "unit": "day"
            }
        }
    }
    print(body_params)
    response = requests.post(api_url, headers=headers, json=body_params, verify=pc_api.verify)
    if response.status_code == 200:
        print("Alerts retrieved successfully:")      
        data = response.json()
        # JSONデータを整理して必要な項目を抽出
        alerts = data.get('items', [])  # 'items'キーにデータが含まれていると仮定
        processed_alerts = []
        for alert in alerts:
            alert_id = alert.get('id', '')
            policy_name = alert.get('policy', {}).get('name', '')
            resource = alert.get('resource', {})
            resource_id = resource.get('id', '')
            resource_name = resource.get('name', '')
            account = resource.get('account', '')
            processed_alerts.append({
                'id': alert_id,
                'policy_name': policy_name,
                'resource_id': resource_id,
                'resource_name': resource_name,
                'account': account
            })
 
        # CSVに変換して保存
        if processed_alerts:
            df = pd.DataFrame(processed_alerts)
            output_file = 'alerts_output.csv'
            df.to_csv(output_file, index=False)
            print(f"Alerts have been saved to {output_file}")
        else:
            print("No alerts found in the response.")
    else:
        print("Error retrieving alerts:", response.status_code)
        print(response.text)
 
# メイン処理
try:
    api_token = alt_login()  # トークンを取得
    get_alerts(api_token)
except Exception as e:
    print("An error occurred:", str(e))

この勢いで、アセット情報一覧の取得もしてみましょう。リクエスト構造が以下のような内容なら、フィルタはresource.type=EC2 Instanceだけで良さそうです。早速実行してみます。

アセット情報一覧を取得するリクエスト構造

なかなかいい感じに取得できました。同じようにChatGPTにデータの整形を依頼してみます。

取得されたアセット情報一覧

下記のようなコードができました。

# --インスタンスID取得処理
# --インベントリ上のEC2インスタンス情報を取得→resource_scan_info_raw.jsonに出力
# -- resource_scan_info_raw.jsonを整理→resource_scan_info.csvに出力
import requests
import urllib.parse
from datetime import datetime, timedelta
from prismacloud.api import pc_api, pc_utility
import json
import pandas as pd
 
# --ユーザカスタムコマンドライン引数-- #
parser = pc_utility.get_arg_parser()
args = parser.parse_args()
 
# --初期化&ログイン-- #
settings = pc_utility.get_settings(args)
pc_api.configure(settings)
 
def alt_login():
    url = urllib.parse.urljoin('https://' + pc_api.api, 'login')
    headers = {'Content-Type': 'application/json'}
    body_params = {
        "prismaId": pc_api.name,
        "username": pc_api.identity,
        "password": pc_api.secret
    }
 
    response = requests.post(url, headers=headers, json=body_params, verify=pc_api.verify)
 
    if response.status_code == 200:
        token = response.json()['token']
        pc_api.token = token
        print("トークン取得成功:")
        return token
    else:
        print("Error acquiring token:", response.status_code, response.text)
        raise Exception("ログイン失敗")
 
def get_alerts_and_save(api_token):
    api_url = urllib.parse.urljoin('https://' + pc_api.api, 'v2/resource/scan_info')
    headers = {
        'Content-Type': 'application/json; charset=UTF-8',
        'Accept': '*/*',
        'x-redlock-auth': api_token
    }
    body_params = {
        "detailed": True,
        "fields": [
            "resource.name"
        ],
        "filters": [
            {
                "name": "resource.type",
                "operator": "=",
                "value": "EC2 Instance"
            }
        ],
        "groupBy": [
            "cloud.service"
        ],
        "limit": 10000,
        "offset": 0,
        "sortBy": [
            "id:asc"
        ]
    }
 
    response = requests.request("POST", api_url, headers=headers, json=body_params)
 
    if response.status_code == 200:
        print("Alerts retrieved successfully:")
        data = response.json()
 
        # JSON全体を保存
        raw_output_file = 'resource_scan_info_raw.json'
        with open(raw_output_file, 'w', encoding='utf-8') as raw_file:
            json.dump(data, raw_file, ensure_ascii=False, indent=4)
        print(f"Raw data has been saved to {raw_output_file}")
 
        # 保存したJSONを読み込み
        with open(raw_output_file, 'r', encoding='utf-8') as raw_file:
            loaded_data = json.load(raw_file)
 
        # 必要なフィールドのみを抽出
        resources = loaded_data.get('resources', [])  # 'resources'キーに合わせる
        if not resources:
            print("No resources found in the data. Check the structure of the JSON.")
            return
 
        processed_data = []
        for item in resources:
            processed_data.append({
                'id': item.get('id', ''),
                'name': item.get('name', ''),
                'unifiedAssetId': item.get('unifiedAssetId', ''),
                'accountName': item.get('accountName', '')
            })
 
        # 抽出データをCSVに保存
        output_file = 'resource_scan_info.csv'
        if processed_data:
            df = pd.DataFrame(processed_data)
            df.to_csv(output_file, index=False, encoding='utf-8')
            print(f"Processed resource scan info has been saved to {output_file}")
 
            # 再度保存したデータを読み込み
            read_df = pd.read_csv(output_file, encoding='utf-8')
            print("Loaded processed data:")
            print(read_df.head())
        else:
            print("No data was processed to save to CSV.")
    else:
        print("Error retrieving resources:", response.status_code)
        print(response.text)
 
# メイン処理
try:
    api_token = alt_login()  # トークンを取得
    get_alerts_and_save(api_token)
except Exception as e:
    print("An error occurred:", str(e))

次は、突合処理に進みます。これはシンプルなCSVファイル処理なので、まるっとAIに投げてしまいましょう。IAMアラート一覧上のインスタンスIDとアセット一覧上のインスタンスIDを突合させ、unifiedAssetId(Prisma® Cloud上一意なアセットID)だけを抜き出します。

# --突合処理
# -- alerts_output.csv とresource_scan_info.csvを突合
# -- 重複インスタンスIDを対象としたunifiedAssetIdを抽出→alerts_output_with_unifiedAssetId.csvに出力
import pandas as pd
 
def merge_unified_asset_id(alerts_file, resource_file, output_file):
    # Load the CSV files
    alerts_df = pd.read_csv(alerts_file)
    resource_df = pd.read_csv(resource_file)
 
    # Ensure that only the first column (ID) and unifiedAssetId are used from the resource file
    resource_df = resource_df[['id', 'unifiedAssetId']]
 
    # Perform a merge on the specified condition
    merged_df = alerts_df.merge(resource_df, 
                                left_on='resource_id', 
                                right_on='id', 
                                how='left')
 
    # Add unifiedAssetId to the original alerts_output_統合用 DataFrame
    alerts_df['unifiedAssetId'] = merged_df['unifiedAssetId']
 
    # Save the result to a new CSV file
    alerts_df.to_csv(output_file, index=False)
    print(f"Merged file saved to {output_file}")
 
# File paths
alerts_file_path = 'alerts_output.csv'
resource_file_path = 'resource_scan_info.csv'
output_file_path = 'alerts_output_with_unifiedAssetId.csv'
 
# Run the function
merge_unified_asset_id(alerts_file_path, resource_file_path, output_file_path)

続いて、タグ情報の収集APIに取得したunifiedAssetIdを投げてテストしてみます。良い感じに取れましたね。ちょうど、Prisma® Cloudテストのために作ったProxyサーバが引っかかりました。

タグ情報の収集APIに取得したunifiedAssetIdを投げてテストをする

では、またAIに働いてもらいましょう。上記テストに使ったコードに、下記処理を追加してもらいます。

  • unifiedAssetIdの値を変数に、一覧ファイルからキーにして全件の問い合わせをかけてもらう
  • 出力されたデータからasset_idとasset_data_tags情報だけを抜き取る
  • 一覧ファイルと結果を突合しマージ

でき上がったコードはこちらです。

# --タグ取得処理
# --alerts_output_with_unifiedAssetId.csv上のunifiedAssetIdごとにリクエスト発行
# --タグ付加したアラート情報をenriched_alerts_output.csvに出力
import requests
import urllib.parse
import json
import pandas as pd
from prismacloud.api import pc_api, pc_utility
 
# --ユーザカスタムコマンドライン引数-- #
parser = pc_utility.get_arg_parser()
args = parser.parse_args()
 
# --初期化&ログイン-- #
settings = pc_utility.get_settings(args)
pc_api.configure(settings)
 
def alt_login():
    url = urllib.parse.urljoin('https://' + pc_api.api, 'login')
    headers = {'Content-Type': 'application/json'}
    body_params = {
        "prismaId": pc_api.name,
        "username": pc_api.identity,
        "password": pc_api.secret
    }
 
    response = requests.post(url, headers=headers, json=body_params, verify=pc_api.verify)
 
    if response.status_code == 200:
        token = response.json()['token']
        pc_api.token = token
        print("トークン取得成功:")
        return token
    else:
        print("Error acquiring token:", response.status_code, response.text)
        raise Exception("ログイン失敗")
 
def process_unified_asset_ids(api_token, csv_file, output_csv, enriched_csv):
    # CSVファイルを読み込む
    df = pd.read_csv(csv_file)
 
    # unifiedAssetIdの重複を排除
    unique_asset_ids = df['unifiedAssetId'].drop_duplicates()
 
    all_tags = []
 
    for asset_id in unique_asset_ids:
        api_url = urllib.parse.urljoin('https://' + pc_api.api, 'uai/v1/asset')
        headers = {
            'Content-Type': 'application/json; charset=UTF-8',
            'Accept': '*/*',
            'x-redlock-auth': api_token
        }
        body_params = {
            "assetId": asset_id,
            "type": "asset",
        }
 
        response = requests.request("POST", api_url, headers=headers, json=body_params)
 
        if response.status_code == 200:
            print(f"Alerts retrieved successfully for assetId: {asset_id}")
            data = response.json()
            tags = data.get("data", {}).get("asset", {}).get("data", {}).get("tags", [])
            for tag in tags:
                all_tags.append({
                    "assetId": asset_id,
                    "key": tag.get("key"),
                    "value": tag.get("value")
                })
        else:
            print(f"Error retrieving resources for assetId {asset_id}: {response.status_code}")
            print(response.text)
 
    # 結果をタグごとに保存
    tags_df = pd.DataFrame(all_tags)
 
    # 入力データとタグデータを統合
    enriched_df = df.merge(tags_df, left_on='unifiedAssetId', right_on='assetId', how='left')
 
    # 統合データを保存
    enriched_df.to_csv(enriched_csv, index=False, encoding='utf-8')
    print(f"Enriched data saved to {enriched_csv}")
 
# メイン処理
try:
    api_token = alt_login()  # トークンを取得
    csv_file = 'alerts_output_with_unifiedAssetId.csv'  # 入力CSVファイル名
    output_csv = 'tags_output.csv'  # タグの中間出力ファイル名
    enriched_csv = 'enriched_alerts_output.csv'  # 統合出力ファイル名
    process_unified_asset_ids(api_token, csv_file, output_csv, enriched_csv)
except Exception as e:
    print("An error occurred:", str(e))

こちらも良い感じに取れました。Keyがタグ情報、Valueが実際に設定されている値です。

必要な情報だけを出力したCSVファイル

これで、IAM系のアラートでもタグのフィルタリングができるようになりました。ここまでくれば、タグを元にアラートを分類するのはExcel上のフィルタ機能で十分です。分類後は、下記APIを用いて一括Dismissする処理を追加開発しても良いかもしれません。

Dismiss Alerts | Develop with Palo Alto Networks

実際に実装したものが下記の通りです。これで、手動での切り分けからアラートDismissまで自動化できました。

APIを用いて一括Dismissする処理を追加した結果

このように、リクエストによる機能拡張が追いつかない場合でも、APIを活用すれば自前で課題を解決できます。これまでは、取得したデータの整形や整理、コード化に手間がかかりましたが、生成AIの活用で効率化できる時代になりました。

とはいえ、今回のコードにはまだ改善の余地があります。例えば、これまで取得したタグ情報の一覧をローカルに保存しておけば、毎回すべてのデータを問い合わせる必要がなくなります。また、特定のクラウドアカウントで生成されたアラートのみを対象とする機能を追加するのも有効でしょう。さらに、収集するアラートを「重要度が高以上」に制限することで、より実務に即した運用が可能になります。

このような改良を加えながら、実際に作成したコードを元にPDCAサイクルを回すことで、業務に最適化された自動化が実現できるはずです。

さいごに

この記事では、RESTAPIを実際に動かせるコードと、処理が自動化される流れをご覧いただきました。自動化を進めることで、作業効率が大幅に向上し、業務の精度も高まります。

ラックでは、自動化だけでなくシステム全体の最適化を目指した手厚いサポートを提供しています。Pythonだけでなく、Power Automateを活用した処理の自動化も可能です。ご興味がある方は、ぜひラックのクラウドセキュリティ統制支援サービスをご検討ください。

この記事は役に立ちましたか?

はい いいえ

関連記事

LAC WATCH

関連記事をご紹介します

  • Prisma Cloud APIを使って運用効率化しよう

  • サイバー攻撃からクラウドを守る、Prisma® Cloudで実現する一歩進んだ監視対策

  • Amazon EC2 IMDSv1に起因した情報流出を防ぐ - Prisma® Cloudから考えるセキュリティ(1)