Our customer service engineers wanted a script to list unique asset tags, along with metadata about the assets. Currently, there is no straightforward way, such as invoking a single API, to list asset tags; you can only list the asset tags for a specific asset. This first implementation discovers all the unique asset tags and counts their usage, placing the result in a CSV file. The outline is as follows:
- Collect all the assets via an asset export.
- Run through the asset tags of all the collected assets and check for uniqueness (see the sketch after this list).
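Here is a minimal sketch of how those two steps hang together. The helper functions are the ones walked through in the rest of this post, though the wrapper function itself and the exact signatures are illustrative assumptions, not the script's literal structure.

```python
# Illustrative wrapper only; the real script adds logging and
# command-line handling around these steps.
def build_uniq_asset_tags(base_url, headers):
    (search_id, num_assets) = request_asset_exports(base_url, headers)  # start the export
    check_export_status(base_url, headers, search_id, num_assets)       # wait until ready
    jsonl_file = retrieve_asset_data(base_url, headers, search_id)      # download and unzip
    asset_tags = {}                                                     # tag name -> Asset_Tag_Info
    process_jsonl_file(jsonl_file, asset_tags)                          # find and count unique tags
    write_csv_file(asset_tags)                                          # write the CSV report
```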
The code for this blog is at blog1_uniq_asset_tags.py. As in some previous code, logging is used; the log file is uniq_asset_tags.log. And again, it is not managed, so you will have to clean it up as you see fit. The script produces a two-column CSV file, uniq_asset_tags.csv.
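The excerpts below use print_info() and print_error() helpers that are not shown. They presumably print to the console and mirror the message to the log file; here is a minimal sketch of that assumption (the script's actual versions may format things differently).

```python
import logging

# Assumed helpers: echo a message to the console and to the log file.
logging.basicConfig(filename="uniq_asset_tags.log", level=logging.DEBUG)

def print_info(msg):
    print(msg)
    logging.info(msg)

def print_error(msg):
    print(msg)
    logging.error(msg)
```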
Collecting Assets
Asset collection is done via the “Data Exports” APIs. The code was adapted from a previous post; and by the way, there is a newer stand-alone export_assets.py. Let’s go over the code in blog1_uniq_asset_tags.py.
First, the code checks if there is a command-line parameter, a search ID. In most cases, this option should not be used; it is there in case the asset export takes too long. On the normal path, id will be zero, which means the code will request an asset export and then check its status. The number 50,000 is a guess, used to calculate the wait time.
```python
# If ID is not defined then request an asset export, else verify.
if id == 0:
    (id, num_assets) = request_asset_exports(base_url, headers)
    check_export_status(base_url, headers, id, num_assets)
else:
    print_info(f"Using search ID: {id}")
    check_export_status(base_url, headers, id, 50000)
```
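The excerpt assumes id has already been set from the command line. A minimal way to do that (the script's actual argument handling may differ) is:

```python
import sys

# Hypothetical argument handling: an optional search ID as the first
# command-line parameter, defaulting to 0 to request a new export.
id = int(sys.argv[1]) if len(sys.argv) > 1 else 0
```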
request_asset_exports()
Let’s delve into request_asset_exports(). The filter_params in the request body specifies an active asset export in the JSONL format. As you may recall, JSONL takes fewer resources to process than JSON; for more details, check out the “Exporting Asset Data” section in “Acquiring Vulnerabilities Per Asset“. The “Request Data Export” API is invoked. A difference in the code is that the except block has now been removed and error checking is done with a direct if statement.
```python
filter_params = {
    'status' : ['active'],
    'export_settings': {
        'format': 'jsonl',
        'model': 'asset'
    }
}

response = requests.post(request_export_url, headers=headers, data=json.dumps(filter_params))
if response.status_code != 200:
    process_http_error(f"Request Data Export API Error", response, request_export_url)
    sys.exit(1)
```
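process_http_error() is not shown in the excerpts. Judging from how it is called, it is a small reporting helper; here is a sketch of one plausible version, not the script's exact code:

```python
import logging

def process_http_error(msg, response, url):
    # Report an HTTP failure with enough context to debug it.
    print(f"{msg}: HTTP {response.status_code} from {url}")
    logging.error(f"{msg}: HTTP {response.status_code} from {url}: {response.text}")
```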
The search_id and record_count from the API response are returned.
```python
resp = response.json()
search_id = str(resp['search_id'])
num_assets = resp['record_count']
print_info(f"New search ID: {search_id} with {num_assets} assets")
return (search_id, num_assets)
```
check_export_status()
The search_id in check_export_status() is used to call get_export_status(), which invokes the “Check Data Export Status” API to discover if the export is ready. An HTTP 206 response means the export is still in progress, so the function returns False. When message is “Export ready for download”, we’re good to go.
```python
def get_export_status(base_url, headers, search_id):
    check_status_url = f"{base_url}data_exports/status?search_id={search_id}"

    response = requests.get(check_status_url, headers=headers)
    if response.status_code == 206:
        return False
    if response.status_code != 200:
        process_http_error(f"Get Export Status API Error", response, check_status_url)
        sys.exit(1)

    resp_json = response.json()
    return resp_json['message'] == "Export ready for download"
```
Back in check_export_status(), the export status is checked immediately, so that if the export is already done, there is no wait time. The rest of the code sleeps and checks the export status in a loop. A maximum export time is calculated; if it is exceeded, the script ends with instructions to re-run with the search ID. Any mathematician should relate to 2718 (the leading digits of Euler's number, e ≈ 2.71828). For example, 50,000 assets gives a wait limit of ceil(50,000 / 16) = 3,125 seconds, a bit under an hour. Hopefully, the function returns as successful. If not, the script is ended.
```python
# Estimate the export time in case we're waiting.
# Calculate the wait interval between checks for the export being ready.
wait_interval_secs = 5 if num_assets < 2718 else 10
wait_limit_secs = math.ceil(num_assets / 16)

# Loop to check status for up to wait_limit_secs seconds.
secs = 0
ready = False
while not ready and secs < wait_limit_secs:
    print(f"Sleeping for {wait_interval_secs} seconds. ({secs})\r", end='')
    time.sleep(wait_interval_secs)
    ready = get_export_status(base_url, headers, search_id)
    secs += wait_interval_secs

print("")
if not ready:  # Timed out before the export became ready.
    print_info(f"Waited for {wait_limit_secs} seconds.")
    print(f"Consider re-running with search ID")
    sys.exit(1)
```
retrieve_asset_data()
Once the asset export is ready, we retrieve the data into a .gz file. The file is named asset_<search ID>.gz and is unzipped into a file named asset_<search ID>.jsonl. For example, asset_14967.gz is unzipped into asset_14967.jsonl. Looking at the details, the code first checks if there is an existing JSONL file. If there is, the function returns the file name.
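That file-name handling and existing-file check are not in the excerpts below, but based on the description they presumably look something like this sketch (the function signature is an assumption):

```python
import os

def retrieve_asset_data(base_url, base_headers, search_id):
    # Hypothetical reconstruction of the file-name logic described above.
    gz_asset_file_name = f"asset_{search_id}.gz"
    jsonl_asset_file_name = f"asset_{search_id}.jsonl"

    # If the JSONL file already exists, skip the download and reuse it.
    if os.path.exists(jsonl_asset_file_name):
        print_info(f"Using existing file {jsonl_asset_file_name}")
        return jsonl_asset_file_name

    # ...otherwise fall through to the "Retrieve Data Export" call below.
```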
If the JSONL file doesn’t exist, it is fetched using the “Retrieve Data Export” API. Before the API is invoked, the HTTP Accept header is modified to accept gzip. Also note that the stream parameter is set to True, so the response body can be written to disk in chunks rather than loaded into memory all at once.
```python
headers = base_headers.copy()
headers['Accept'] = "application/gzip; charset=utf-8"

response = requests.get(get_data_url, headers=headers, stream=True)
if response.status_code != 200:
    process_http_error(f"Retrieve Data Export API Error", response, get_data_url)
    sys.exit(1)
```
Next, the response stream is written to the gzip file. The gzip file is then read, unzipped using Python’s gzip library, and written to a JSONL file, whose name is returned. I used the “Extracting only one file” section of “Unzip a file in Python: 5 Scenarios You Should Know” as a guide.
```python
try:
    with open(gz_asset_file_name, 'wb') as file_gz:
        for block in response.iter_content(8192):
            file_gz.write(block)

except Exception as exp:
    print(f"Retrieve asset data error: {str(exp)}")
    logging.error(f"Retrieve asset data error: {str(exp)}")
    sys.exit(1)

# Gunzip the file into another file.
print_info(f"Unzipping file {gz_asset_file_name} to {jsonl_asset_file_name}")
with gzip.open(gz_asset_file_name, 'rb') as f_in:
    with open(jsonl_asset_file_name, 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)

print_info(f"File {gz_asset_file_name} unzipped to {jsonl_asset_file_name}")
return jsonl_asset_file_name
```
Processing the JSONL File
Now that the assets have been collected, it is time to process them. First, the JSONL file's lines, or records, are counted. If the line count is equal to one, then either it is a very short JSONL file or it is not a JSONL file at all. I took the stance that it is a JSON file, not JSONL.
```python
# Count and report.
num_assets = count_lines(jsonl_asset_file_name)
if num_assets == 1:
    print_error(f"The format of file {jsonl_asset_file_name} is probably JSON, not JSONL")
    sys.exit(1)
print(f"File: {jsonl_asset_file_name} with {num_assets} assets.")
```
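count_lines() is not shown in the excerpts; a simple version would be:

```python
def count_lines(file_name):
    # Count the lines (JSONL records) in the file.
    with open(file_name, 'r') as f:
        return sum(1 for _ in f)
```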
process_jsonl_file()
Here is where the JSONL file is processed. The function parameters are the JSONL file name and the asset_tags dictionary. (Since asset_tags is a dictionary, it can be modified in place.)
The JSONL file is read one line at a time, and each line is converted to a dictionary by the function convert_to_json().
```python
def convert_to_json(asset_line):
    try:
        asset = json.loads(asset_line.strip())
    except json.JSONDecodeError:
        print_error("The file's format is probably not JSONL, but XML or CSV")
        sys.exit(1)

    return asset
```
I decided to wrap a function around json.loads() because of the exception handling. If a line can’t be converted, it is assumed that the file is not JSONL, but possibly XML or CSV.
After the line is converted to a dictionary, the code checks for “locator”. Why? As far as I know, every exported asset has a “locator”, so a missing one suggests the file is not an asset export. If the asset has “tags”, the asset tags are processed.
```python
with open(jsonl_asset_file_name, 'r') as jsonl_f:
    for line_num, asset_line in enumerate(jsonl_f):
        asset = convert_to_json(asset_line)

        if "locator" not in asset:
            print_error(f"Can't find 'locator' field. Is this an asset import?")
            print_error(f"Line read: {asset}")
            sys.exit(1)

        if "tags" in asset:
            logging.debug(f"{asset['id']}: {asset['tags']}")
            process_tags(asset['id'], asset['tags'], asset_tags)
        asset_count += 1
```
process_tags()
The asset tag uniqueness checking and counting are done in this function. The parameters are:
- asset_id – The unique ID of the asset.
- tags_to_process – An array of asset tags to process for uniqueness.
- asset_tags – The asset tag dictionary, mapping the asset tag name to an Asset_Tag_Info object.
Each asset tag in the tags_to_process array is checked to see if it is already a key in the asset_tags dictionary. If the asset tag is a key, it is counted via the incr() method; if it is not, a new Asset_Tag_Info object is created and attached to the key with a count of one.
```python
def process_tags(asset_id, tags_to_process, asset_tags):
    for tag in tags_to_process:
        if tag in asset_tags:
            asset_tags[tag].incr()
            logging.debug(f"Existing asset tag: {tag} ({asset_id})")
        else:
            tag_info = Asset_Tag_Info()
            asset_tags[tag] = tag_info
            logging.info(f"New asset tag: {tag} ({asset_id})")
```
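The Asset_Tag_Info class is not shown in the excerpts. From the incr() and get_count() calls and the count-of-one behavior described above, it is presumably a small counter class along these lines:

```python
class Asset_Tag_Info:
    # Minimal sketch: a usage counter that starts at one for a newly seen tag.
    def __init__(self):
        self.count = 1

    def incr(self):
        self.count += 1

    def get_count(self):
        return self.count
```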
write_csv_file()
After all the tags in all the assets are processed, the asset_tags dictionary is written to a CSV file using the Python csv library. Finally, the CSV file is made available.
```python
def write_csv_file(asset_tags):
    # Open up the CSV file and write the header row.
    csv_file_name = "uniq_asset_tags.csv"
    uniq_asset_tags_fp = open(csv_file_name, 'w', newline='')
    uniq_tag_writer = csv.writer(uniq_asset_tags_fp)
    uniq_tag_writer.writerow(["Asset Tag Name", "Asset Tag Count"])

    # Write one row per unique asset tag.
    for asset_tag in asset_tags:
        asset_tag_info = asset_tags[asset_tag]
        uniq_tag_writer.writerow([asset_tag, asset_tag_info.get_count()])

    # Close the file so the buffered rows are flushed to disk.
    uniq_asset_tags_fp.close()

    print_info(f"{csv_file_name} is now available.")
```
Conclusion
The results are in the CSV file, uniq_asset_tags.csv, so you can sort by either asset tag name or usage count. Once you know all your asset tag names, you might find some that are underutilized or even over-utilized. As always, this code is in Kenna Security’s GitHub repository.
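If you want the most-used tags first without opening a spreadsheet, a few lines of Python will sort the CSV by count. This snippet is illustrative and not part of the script:

```python
import csv

# Read the report and sort by usage count, highest first.
with open("uniq_asset_tags.csv", newline='') as f:
    reader = csv.reader(f)
    next(reader)  # skip the header row
    rows = sorted(reader, key=lambda row: int(row[1]), reverse=True)

for name, count in rows:
    print(f"{count:>8}  {name}")
```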
Until next time,