Building Dynamic Sub-Queries with the API

Oftentimes I find myself looking at a field segmentation widget and think how awesome it would be if I could make queries for each field segment. This can be extremely useful for things like defining data scopes, creating enforcements, or generating reports on a subset of results from a parent query.

Doing this in the UI is straightforward when the segmented field yields only a handful of different values, but if we have 10, 20, or even 100 different fields, things can start to get out of hand very quickly.

When I run into these situations, I find that the API can be extremely helpful to quickly build these sub-queries. I’ll walk you through writing a Python script to build dynamic sub-queries given a base query and a field from which we’ll segment. 

Let’s get started.

Some Prerequisites 

We’ll be using Python, so get your virtual environment ready with the axonius-api-client library

pip install axonius_api_client

I’m assuming a basic understanding of the API client and scripting with Python. If you are unfamiliar with the API client, consider reviewing Nate's post reviewing the API client

Unfortunately, there are some API endpoints that have been updated slightly with 4.6. Therefore, the axonius_api_client does not offer all features required to accomplish the goals laid out in this post. Once the API client is updated, I’ll update this post accordingly.

Overview

Get Our Saved Query

We’ll be using the axonius-api-client to quickly retrieve the details of our saved query. In particular, we are interested in the columns specified in the saved query.  If the field that we want to segment on is not already a column in our saved query, we will explicitly ask for it to be included when we get all devices in that saved query.

sq_details = axonapi.devices.saved_query.get_by_name(BASE_QUERY)
#  Get devices in saved query
if SEGMENT_FIELD not in sq_details["view"]["fields"]:
  manual_fields = sq_details["view"]["fields"] + [SEGMENT_FIELD]
else:
  manual_fields = sq_details["view"]["fields"]
devices = axonapi.devices.get_by_saved_query(
  BASE_QUERY, manual_fields=manual_fields
)

Bucket Each Device by Field X

Now that we have all the devices, we need to bucket them based on the field that we want to segment on. We’ll be using a dictionary to accomplish this. The keys in our dictionary are the different values of the segmented field. The corresponding value for each key will be the list of devices with that segmented field value.

subquery_dict = defaultdict(list)
for device in devices:
  # Some devices might not have that value
  if device_field_val := device.get(SEGMENT_FIELD):
      subquery_dict[device_field_val].append(device)

The above uses the walrus operator, which is unique to Python >=3.8. The full script below will not feature the walrus operator for anyone using an earlier verion.

Create a Saved Query for Each Bucket

You might have noticed by now that the diagram above features a folder for the saved queries. This is a new feature introduced in Axonius 4.6 to make it easier to organize your saved queries. When we create these new saved queries based on all of our buckets, we’ll be creating the saved queries in the same folder as our base query. Let’s go ahead and do this now. 

The first step is getting the folder ID for our base query. We will be using the REST API directly to get this information (The axonius-api-client should be updated soon to include this new information coming in 4.6).

response = requests.get(
   f'{os.getenv("AX_URL")}/api/queries/specific/{sq_details["id"]}',
   headers=HEADERS,
)
response.raise_for_status()
full_query_details = response.json()["data"]
# folder ID found at full_query_details["attributes"]["folder_id"]

Now that we have the full details of our query, including its folder id, we can go ahead and make our child queries in the same folder as the parent. However, there will be a few things that we need to update from the parent query.

  1. Updating the name
    1. We’ll take the name of our bucket and append it to the name of the base query
    2. “AWS Subnet Exists” becomes "AWS Subnet Exists - My Subnet Name”
  2. Updating the description
    1. We’ll take the name of our bucket and append it to the description
      1. “Query to get all devices with AWS Subnet” Becomes - “Query to get all devices with AWS Subnet - Filtered by My Subnet Name”
  3. Updating columns
    1. We will be overwriting the columns in case the segmentation field is not already present
  4. Adding new search to the filter
    1. We will be including the value of the segmentation field as a filter
  5. Removing query wizard view - Because this can vary so much and for the sake of simplicity, we will be removing this, although we might consider how to maintain the query wizard in the future
for key, value in subquery_dict.items():
   # Let's copy the base query and make some small changes
   query_copy = copy.deepcopy(full_query_details)
   query_copy['type'] = "views_schema"
   search_to_add = f' and ({SEGMENT_FIELD} == "{key}")'
   query_copy['attributes']['view']['query']['expressions'] = []
   query_copy['attributes']['view']['query']['onlyExpressionsFilter'] = None
   query_copy['attributes']['view']['query']['filter'] += search_to_add
   query_copy['attributes']['name'] += f' - {key}'
   query_copy['attributes']["view"]["fields"] = manual_fields
   # Description could be None!
   new_desc = query_copy['attributes'].get('description', "") + f' - Filtered by {key}'
   query_copy['attributes']['description'] = new_desc
   payload = {"data": query_copy}

LASTLY, our API call will return a 400 if the query already exists. Let’s try to gracefully catch that exception.

try:
   response = requests.post(
       f'{os.getenv("AX_URL")}/api/queries/devices',
       headers=HEADERS,
       json=payload
  )
   response.raise_for_status()
except requests.exceptions.HTTPError as err:
   if errors := response.json().get("errors"):
       for error in errors:
           if error.get("detail") != ALREADY_USED_ERROR:
               raise f"Something went wrong: {err}"
   else:
       raise f"Something went wrong: {err}"

And when everything is said and done, our folder should be populated with all of our subqueries dynamically! 

So What's Next?

Now, these queries are just the jumping-off point. They should be able to power the dynamic use-cases that you have defined. The enforcement center, and most other areas in the platform, have API capabilities, so you could programmatically assign these sub-queries to specific enforcements center actions as well.

If anyone would find it beneficial to review how to leverage these dynamic sub-queries more specifically, let us know in the comments below.

TL;DR

Final Script

import copy
import os
import requests
from clients.axonapi import create_client
from collections import defaultdict


# CHANGE THESE AS NEEDED
SEGMENT_FIELD = "adapters_data.aws_adapter.subnet_name"
BASE_QUERY = "AWS Subnet Exists"


HEADERS = {
"api-key": os.getenv("AX_KEY"),
"api-secret": os.getenv("AX_SECRET"),
}
ALREADY_USED_ERROR = "Unable to create the saved query - name is in use"


def main():

axonapi = create_client()

# Get our saved query details
sq_details = axonapi.devices.saved_query.get_by_name(BASE_QUERY)

# Get devices in saved query
if SEGMENT_FIELD not in sq_details["view"]["fields"]:
manual_fields = sq_details["view"]["fields"] + [SEGMENT_FIELD]
else:
manual_fields = sq_details["view"]["fields"]
devices = axonapi.devices.get_by_saved_query(
BASE_QUERY, manual_fields=manual_fields
)

subquery_dict = defaultdict(list)
for device in devices:
# Some devices might not have that value
if device.get(SEGMENT_FIELD):
subquery_dict[device.get(SEGMENT_FIELD)].append(device)

# Get our saved query's folder id
response = requests.get(
f'{os.getenv("AX_URL")}/api/queries/specific/{sq_details["id"]}',
headers=HEADERS
)
response.raise_for_status()
full_query_details = response.json()["data"]

# Now for each bucket, create the appropriate subquery
for key, value in subquery_dict.items():
# Let's copy the base query and make some small changes
query_copy = copy.deepcopy(full_query_details)
query_copy["type"] = "views_schema"
search_to_add = f' and ({SEGMENT_FIELD} == "{key}")'
# We'll be updating the name, description, columns, and filters while removing expressions
query_copy["attributes"]["name"] += f" - {key}"
query_copy["attributes"]["view"]["fields"] = manual_fields
query_copy["attributes"]["view"]["query"]["filter"] += search_to_add
query_copy["attributes"]["view"]["query"]["expressions"] = []
query_copy["attributes"]["view"]["query"]["onlyExpressionsFilter"] = None
# Description is optional could be None!
new_desc = (
query_copy["attributes"].get("description", "") + f" - Filtered by {key}"
)
query_copy["attributes"]["description"] = new_desc
payload = {"data": query_copy}

try:
response = requests.post(
f'{os.getenv("AX_URL")}/api/queries/devices',
headers=HEADERS,
json=payload
)
response.raise_for_status()
except requests.exceptions.HTTPError as err:
if response.json().get("errors"):
for error in response.json().get("errors"):
if error.get("detail") != ALREADY_USED_ERROR:
raise f"Something went wrong: {err}"
else:
raise f"Something went wrong: {err}"


if __name__ == "__main__":
main()
1

Comments

0 comments

Please sign in to leave a comment.

Didn't find what you were looking for?

New post