How do I update my Panther alerts in bulk? For example, if I have a large number of false positives, how do I resolve all of those alerts?
You can do this programmatically via the Panther API. See the Panther API Alerts & Errors documentation for an example.
If you are updating fewer than 25 alerts at once, you can do this directly in the Panther Console. See the documentation for more information: Bulk updating alerts in Panther.
To see a walk-through of this process: in the Panther Console, use keyboard shortcut ⌘+K to pull up the search bar, then search for "bulk update alerts."
Run the script below in Terminal to bulk-update Panther alerts. You will need to run pip install gql aiohttp before the script can run.
Before you run the script, be sure to make the following changes to it:
In the url argument of AIOHTTPTransport, replace YOUR_PANTHER_INSTANCE with your Panther instance URL.
Generate an API token in the Panther Console with the Manage Alerts permission, and insert it into the X-API-Key header, replacing YOUR_API_KEY.
Update the query_* variables in the "CHANGE THESE VALUES" block at the top of the script with values as needed for your query.
You can query with a date range, alert status, severities, log sources, or detection ID. Use some combination of these to get the alerts you're looking for.
Statuses, Severities, and Log Sources must be in the form of a list.
Detection ID is a string, and can be found on the "view detection" page in the Panther Console (see the commented example next to query_detection_id).
If a variable is not needed, set it to None. Don't comment the variable out, or the script will throw an exception.
The script will seek confirmation from you before setting the alerts to resolved. Please double check the results of the query before committing to it, as this action is not reversible.
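The filter rules above can be sketched as a small helper that checks the types and drops any filter that is unset. This is only an illustration: the function name build_alert_filters and its parameter names are hypothetical and are not part of the script, although the dictionary keys are the same ones the script sends to the API.

```python
from typing import Any, Dict, List, Optional


def build_alert_filters(
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    statuses: Optional[List[str]] = None,
    severities: Optional[List[str]] = None,
    log_sources: Optional[List[str]] = None,
    detection_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: validate filter types and omit unset filters."""
    # Statuses, severities, and log sources must be lists, not bare strings.
    list_filters = (
        ("statuses", statuses),
        ("severities", severities),
        ("log_sources", log_sources),
    )
    for name, value in list_filters:
        if value is not None and not isinstance(value, list):
            raise TypeError(f"{name} must be a list or None, got {type(value).__name__}")

    # The detection ID is a single string.
    if detection_id is not None and not isinstance(detection_id, str):
        raise TypeError("detection_id must be a string or None")

    # Keys match the field names the script passes in its query input.
    candidates = {
        "createdAtAfter": start_date,
        "createdAtBefore": end_date,
        "detectionId": detection_id,
        "statuses": statuses,
        "severities": severities,
        "logTypes": log_sources,
    }
    # Like the script, skip anything that is None or empty.
    return {key: value for key, value in candidates.items() if value}


filters = build_alert_filters(statuses=["OPEN"], severities=["HIGH"])
```

Passing a bare string such as statuses="OPEN" raises a TypeError here, which surfaces the "must be a list" rule before any request is sent.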
Script:
# pip install gql aiohttp

from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport

# ----------------- CHANGE THESE VALUES TO ALTER YOUR QUERY -----------------
# Do not remove any variables
query_start_date = "2022-01-01T00:00:00.000Z"
query_end_date = "2023-03-13T00:00:00.000Z"
query_statuses = ["OPEN"]
query_severities = ["HIGH"]
# Set values to None to update all sources and detections
query_log_sources = None  # ["Okta.SystemLog"]
query_detection_id = None  # "Okta.GeographicallyImprobableAccess"
# ----------------------------------------------------------------------------


transport = AIOHTTPTransport(
    url="https://api.YOUR_PANTHER_INSTANCE.runpanther.net/public/graphql",
    headers={"X-API-Key": "YOUR_API_KEY"}
)

client = Client(transport=transport, fetch_schema_from_transport=True)

# `FindAlerts` is a nickname for the query. You can fully omit it.
find_alerts = gql(
    """
    query FindAlerts($input: AlertsInput!) {
      alerts(input: $input) {
        edges {
          node {
            id
          }
        }
        pageInfo {
          hasNextPage
          endCursor
        }
      }
    }
    """
)

# `UpdateAlerts` is a nickname for the mutation. You can fully omit it.
update_alerts = gql(
    """
    mutation UpdateAlerts($input: UpdateAlertStatusByIdInput!) {
      updateAlertStatusById(input: $input) {
        alerts {
          id
        }
      }
    }
    """
)

# an accumulator that holds the alerts fetched across all pages
all_alerts = []
# a helper to know when to exit the loop
has_more = True
# the pagination cursor
cursor = None

# build the query input, skipping any filter that is set to None
input_values = {}
if query_start_date:
    input_values['createdAtAfter'] = query_start_date
if query_end_date:
    input_values['createdAtBefore'] = query_end_date
if query_detection_id:
    input_values['detectionId'] = query_detection_id
if query_statuses:
    input_values['statuses'] = query_statuses
if query_severities:
    input_values['severities'] = query_severities
if query_log_sources:
    input_values['logTypes'] = query_log_sources

# Keep fetching pages until there are no more left
while has_more:
    input_constructed = {'input': input_values.copy()}
    if cursor:
        input_constructed['input']['cursor'] = cursor
    query_data = client.execute(
        find_alerts,
        variable_values=input_constructed
    )
    all_alerts.extend([edge["node"] for edge in query_data["alerts"]["edges"]])
    if all_alerts:
        print(f"Successfully found {len(all_alerts)} alerts!")
    has_more = bool(query_data["alerts"]["pageInfo"]["hasNextPage"])
    cursor = query_data["alerts"]["pageInfo"]["endCursor"]

if all_alerts:
    user_input = ''
    print(f"\nFound {len(all_alerts)} alerts with the following criteria:\n" + " - " + "\n - ".join(
        [f"{key}: {value}" for key, value in input_values.items()]))
    while user_input.lower() not in ['n', 'no', 'y', 'yes']:
        # Keep prompting until the user gives a yes/no answer
        user_input = input("Proceed to set all of these to RESOLVED? Y/N: ")
    if user_input.lower() in ['yes', 'y']:
        print("Confirmed. These alerts are being resolved. This can take some time...")
        mutation_data = client.execute(
            update_alerts,
            variable_values={
                "input": {
                    "ids": [alert["id"] for alert in all_alerts],
                    "status": "RESOLVED"
                }
            }
        )
        print(f'Resolved {len(mutation_data["updateAlertStatusById"]["alerts"])} alert(s)!')
    elif user_input.lower() in ['no', 'n']:
        print('Exiting...')
else:
    # nothing matched: echo the criteria so the query can be adjusted
    input_str = "Used the following criteria:\n"
    for key, value in input_values.items():
        input_str += f" - {key}: {value}\n"
    input_str = input_str.rstrip()
    print(f"No alerts found.\n\n{input_str}")
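
The script sends every alert ID in a single mutation. If a very large result set causes that request to time out or be rejected, one option is to send the IDs in smaller batches. The sketch below is an optional variation, not part of the script: the names chunked and resolve_in_batches are hypothetical, and the default batch size of 100 is an arbitrary assumption rather than a documented Panther limit. It expects the same client and update_alerts objects the script creates.

```python
from typing import Any, Iterator, List, Sequence


def chunked(ids: Sequence[str], size: int) -> Iterator[List[str]]:
    """Yield consecutive slices of `ids`, each at most `size` items long."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(ids), size):
        yield list(ids[start:start + size])


def resolve_in_batches(
    client: Any,
    mutation: Any,
    alert_ids: Sequence[str],
    batch_size: int = 100,  # assumption: arbitrary value, tune as needed
) -> int:
    """Hypothetical helper: run the update mutation once per batch of IDs.

    Returns the total number of alerts the API reports as updated.
    """
    resolved = 0
    for batch in chunked(alert_ids, batch_size):
        data = client.execute(
            mutation,
            variable_values={"input": {"ids": batch, "status": "RESOLVED"}},
        )
        resolved += len(data["updateAlertStatusById"]["alerts"])
    return resolved
```

In the script, this would replace the single client.execute(update_alerts, ...) call, for example: resolve_in_batches(client, update_alerts, [alert["id"] for alert in all_alerts]). Because each batch is committed separately, a failure partway through leaves the earlier batches resolved.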