Listing Unique Asset Tags: Part One

May 3, 2022
Rick Ehrhart
API Evangelist

Our customer service engineers wanted a script to list unique asset tags and metadata about assets.  Currently, there is no straightforward way, such as invoking a single API, to list all asset tags; you can only list the asset tags for a specific asset.  This first implementation discovers all the unique asset tags and counts their usage.  The result is placed in a CSV file.  The outline is as follows:

  • Collect all the assets via an asset export
  • Run through the asset tags of every collected asset and check each one for uniqueness.
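
As a rough sketch of the second step, assume each exported asset has already been parsed into a dictionary with an optional "tags" list.  The sample assets and the helper name count_unique_tags are made up for illustration, and collections.Counter is swapped in here for brevity; the script itself keeps its own dictionary of Asset_Tag_Info objects.

```python
from collections import Counter


def count_unique_tags(assets):
    """Return a Counter mapping each tag name to the number of times it appears."""
    tag_counts = Counter()
    for asset in assets:
        # Assets without a "tags" field simply contribute nothing.
        tag_counts.update(asset.get("tags", []))
    return tag_counts


# Hypothetical sample data, not real export output.
sample_assets = [
    {"id": 1, "tags": ["prod", "linux"]},
    {"id": 2, "tags": ["prod"]},
    {"id": 3},
]
tag_counts = count_unique_tags(sample_assets)
print(sorted(tag_counts.items()))
```

The keys of the result are the unique tag names, and the values are the usage counts that end up in the second CSV column.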

The code for this blog is at blog1_uniq_asset_tags.py.  As in some previous code, logging is used.  The log file is uniq_asset_tags.log.  And again, it is not managed, so you will have to clean it up as you see fit.  The script produces the two-column CSV file, uniq_asset_tags.csv.

Collecting Assets

Asset collection is done via the “Data Exports” APIs.  The code was modified from earlier export code; by the way, there is also a newer stand-alone export_assets.py.  Let’s go over the code in blog1_uniq_asset_tags.py.

First, the code checks whether a command-line parameter, the search ID, was supplied.  In most cases, this option should not be used; it is there in case the asset export takes too long.  On the normal path, id is zero, which means the code requests an asset export and then checks its status.  When a search ID is supplied, the asset count is not known, so 50,000 is used as a guess to calculate the wait time.

    # If ID is not defined then request an asset export, else verify.
    if id == 0:
        (id, num_assets) = request_asset_exports(base_url, headers)
        check_export_status(base_url, headers, id, num_assets)
    else:
        print_info(f"Using search ID: {id}")
        check_export_status(base_url, headers, id, 50000)
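
The argument handling itself isn't shown above, so here is a minimal sketch of how the optional search ID could be read.  The helper name get_search_id and the argument position are assumptions, not necessarily what the script does.

```python
import sys


def get_search_id(argv):
    """Return the search ID from argv, or 0 when none was supplied."""
    # argv[0] is the script name; argv[1], if present, is assumed to be the search ID.
    if len(argv) > 1:
        return argv[1]
    return 0


print(get_search_id(["blog1_uniq_asset_tags.py"]))
print(get_search_id(["blog1_uniq_asset_tags.py", "14967"]))
```

Returning 0 when nothing is supplied lines up with the id == 0 test, which selects the normal path of requesting a fresh export.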

request_asset_exports()

Let’s delve into request_asset_exports().  The filter_params in the request body specify an export of active assets in the JSONL format.  As you may recall, JSONL takes fewer resources to process than JSON.  For more details, check out the “Exporting Asset Data” section in “Acquiring Vulnerabilities Per Asset”.  The “Request Data Export” API is invoked.  One difference from the earlier code is that the try/except has been removed and error checking is done by a direct if statement.

    filter_params = {
        'status' : ['active'],
        'export_settings': {
            'format': 'jsonl',
            'model': 'asset'
        }
    }

    response = requests.post(request_export_url, headers=headers, data=json.dumps(filter_params))
    if response.status_code != 200:
        process_http_error("Request Data Export API Error", response, request_export_url)
        sys.exit(1)

The search_id and record_count from the API response are returned.

    resp = response.json()
    search_id = str(resp['search_id'])
    num_assets = resp['record_count']
    print_info(f"New search ID: {search_id} with {num_assets} assets")
    return (search_id, num_assets)
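
If you want a friendlier failure when the response body is not what you expect, the extraction could be wrapped as in the sketch below.  The helper name parse_export_response and the sample body are hypothetical; only the two field names come from the code above.

```python
def parse_export_response(resp):
    """Extract (search_id, num_assets) from a decoded export response body."""
    missing = [key for key in ("search_id", "record_count") if key not in resp]
    if missing:
        raise KeyError(f"Export response is missing: {', '.join(missing)}")
    # The search ID is kept as a string, as in the script.
    return (str(resp["search_id"]), int(resp["record_count"]))


# Hypothetical response body for illustration only.
sample_body = {"search_id": 14967, "record_count": 120}
print(parse_export_response(sample_body))
```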

check_export_status()

The search_id in check_export_status() is used to call get_export_status(), which invokes the “Check Data Export Status” API to discover if the export is ready.  When message is “Export ready for download”, we’re good to go.

def get_export_status(base_url, headers, search_id):
    check_status_url = f"{base_url}data_exports/status?search_id={search_id}"

    response = requests.get(check_status_url, headers=headers)
    if response.status_code == 206:
        return False
    if response.status_code != 200:
        process_http_error("Get Export Status API Error", response, check_status_url)
        sys.exit(1)

    resp_json = response.json()
    return resp_json['message'] == "Export ready for download"
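
The decision logic can be separated from the HTTP call so it can be exercised without a network.  This is only a sketch: is_export_ready is a made-up name, and raising an exception instead of exiting is my choice here, not what the script does.

```python
READY_MESSAGE = "Export ready for download"


def is_export_ready(status_code, body):
    """Map a status response to True (ready) or False (not ready), or raise."""
    if status_code == 206:
        # The script treats 206 as "not ready yet".
        return False
    if status_code != 200:
        raise RuntimeError(f"Unexpected HTTP status: {status_code}")
    return body.get("message") == READY_MESSAGE


print(is_export_ready(206, {}))
print(is_export_ready(200, {"message": READY_MESSAGE}))
```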

Back in check_export_status(), the export status is checked immediately, so that if the export is already done, there is no wait time.  The rest of the code sleeps and checks the export status in a loop.  A maximum export time is calculated; if it is exceeded, the script ends with instructions.  Any mathematician should relate to 2718.  Hopefully, the function returns successfully.  If not, the script ends.

    # Estimate export time for if we're waiting.
    # Calculate wait interval between checking if the export file is ready.
    wait_interval_secs = 5 if num_assets < 2718 else 10
    wait_limit_secs = math.ceil(num_assets / 16)

    # Loop to check status for wait_limit_secs seconds.
    secs = 0
    ready = False
    while not ready and secs < wait_limit_secs:
        print(f"Sleeping for {wait_interval_secs} seconds. ({secs})\r", end='')
        time.sleep(wait_interval_secs)
        ready = get_export_status(base_url, headers, search_id)
        secs += wait_interval_secs

    print("")
    # Test ready rather than secs, so an export that becomes ready on the
    # final check is not reported as a timeout.
    if not ready:
        print_info(f"Waited for {wait_limit_secs} seconds.")
        print("Consider re-running with search ID")
        sys.exit(1)
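
To see how the numbers play out: with the 50,000 guess, the interval is 10 seconds and the limit is ceil(50000 / 16) = 3125 seconds, or roughly 52 minutes.  The sketch below restates the loop with the status check and the sleep passed in as functions, so it can be tried without calling the API.  The name wait_for_export and the injected parameters are my own, not part of the script.

```python
import math


def wait_for_export(num_assets, is_ready, sleep):
    """Poll is_ready() until it returns True or the wait limit is reached.

    Returns (ready, seconds_waited).
    """
    wait_interval_secs = 5 if num_assets < 2718 else 10
    wait_limit_secs = math.ceil(num_assets / 16)

    secs = 0
    ready = is_ready()  # immediate check, so a finished export costs no wait
    while not ready and secs < wait_limit_secs:
        sleep(wait_interval_secs)
        ready = is_ready()
        secs += wait_interval_secs
    return (ready, secs)


# Fake status source: not ready twice, then ready.
answers = iter([False, False, True])
slept = []
result = wait_for_export(1600, lambda: next(answers), slept.append)
print(result, slept)
```

In real use, is_ready would wrap get_export_status() and sleep would be time.sleep.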

retrieve_asset_data()

Once the asset export is ready, we retrieve the data into a .gz file.  The file is named asset_.gz and is unzipped into a file named asset_.jsonl.  For example, asset_14967.gz is unzipped into asset_14967.jsonl.  Looking at the details, the code first checks whether the JSONL file already exists.  If it does, the function returns the file name.

If the JSONL file doesn’t exist, it is fetched using the “Retrieve Data Export” API.  Before the API is invoked, the HTTP Accept header is modified to accept gzip.  Also note that the stream parameter is set to True.

    headers = base_headers.copy()
    headers['Accept'] = "application/gzip; charset=utf-8"

    response = requests.get(get_data_url, headers=headers, stream=True)
    if response.status_code != 200:
        process_http_error("Retrieve Data Export API Error", response, get_data_url)
        sys.exit(1)

Next, the response is written to the gzip file, which is then decompressed using Python’s gzip library and written to a JSONL file.  The JSONL file name is returned.  I used the “Extracting only one file” section of “Unzip a file in Python: 5 Scenarios You Should Know” as a guide.

    try:
        with open(gz_asset_file_name, 'wb') as file_gz:
            for block in response.iter_content(8192):
                file_gz.write(block)

    except Exception as exp:
        print(f"Retrieve asset data error: {str(exp)}")
        logging.error(f"Retrieve asset data error: {str(exp)}")
        sys.exit(1)

    # Gunzip the file into another file.
    print_info(f"Unzipping file {gz_asset_file_name} to {jsonl_asset_file_name}")
    with gzip.open(gz_asset_file_name, 'rb') as f_in:
        with open(jsonl_asset_file_name, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)

    print_info(f"File {gz_asset_file_name} unzipped to {jsonl_asset_file_name}")
    return jsonl_asset_file_name
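
The gunzip step can be tried on its own with a small round trip.  This is a sketch with made-up file names and sample records; gunzip_file is a hypothetical helper that uses the same gzip.open() and shutil.copyfileobj() calls as above.

```python
import gzip
import os
import shutil
import tempfile


def gunzip_file(gz_path, out_path):
    """Decompress gz_path into out_path, copying in chunks."""
    with gzip.open(gz_path, "rb") as f_in, open(out_path, "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)
    return out_path


with tempfile.TemporaryDirectory() as tmp_dir:
    gz_path = os.path.join(tmp_dir, "sample.gz")
    jsonl_path = os.path.join(tmp_dir, "sample.jsonl")
    # Hypothetical two-record JSONL payload.
    original = b'{"id": 1, "tags": ["prod"]}\n{"id": 2}\n'
    with gzip.open(gz_path, "wb") as f_gz:
        f_gz.write(original)
    gunzip_file(gz_path, jsonl_path)
    with open(jsonl_path, "rb") as f_check:
        round_trip = f_check.read()

print(round_trip == original)
```

Because copyfileobj() copies in chunks, the whole export never has to fit in memory at once.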

Processing the JSONL File

Now that the assets have been collected, it is time to process them.  The lines, or records, of the JSONL file are counted.  If the line count is equal to one, then either it is a very short JSONL file or it is not a JSONL file at all.  I took the stance that it is not a JSONL file.

    # Count and report.
    num_assets = count_lines(jsonl_asset_file_name)
    if num_assets == 1:
        print_error(f"The format of file {jsonl_asset_file_name} is probably JSON, not JSONL")
        sys.exit(1)
    print(f"File: {jsonl_asset_file_name} with {num_assets} assets.")
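
The body of count_lines() isn't shown here.  One plausible way to write it, offered as a guess rather than the script's actual implementation, is to iterate over the file so that only one line is held in memory at a time:

```python
import os
import tempfile


def count_lines(file_name):
    """Count the lines in a text file without loading it all into memory."""
    with open(file_name, "r") as text_f:
        return sum(1 for _ in text_f)


with tempfile.TemporaryDirectory() as tmp_dir:
    sample_path = os.path.join(tmp_dir, "sample.jsonl")
    with open(sample_path, "w") as sample_f:
        # Three hypothetical one-line records.
        sample_f.write('{"id": 1}\n{"id": 2}\n{"id": 3}\n')
    line_count = count_lines(sample_path)

print(line_count)
```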

process_jsonl_file()

Here is where the JSONL file is processed.  The function parameters are the JSONL file name and the asset_tags dictionary.  (Since asset_tags is a dictionary, which is mutable, the function can modify it in place.)

The JSONL file is read one line at a time, and each line is converted to a dictionary in the function convert_to_json().

def convert_to_json(asset_line):
    try:
        asset = json.loads(asset_line.strip())
    except json.JSONDecodeError:
        print_error("The file's format is probably not JSONL, but XML or CSV")
        sys.exit(1)

    return asset

I decided to wrap a function around json.loads(), because of the exception handling. If the line can’t be converted, it is assumed that the file is not in JSONL, but possibly XML or CSV.
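
For comparison, here is a sketch of the same idea written as a generator that reports the offending line number and raises instead of exiting.  The name parse_jsonl and the sample lines are hypothetical.

```python
import json


def parse_jsonl(lines):
    """Yield one decoded object per non-empty line; raise ValueError on bad JSON."""
    for line_num, line in enumerate(lines, start=1):
        line = line.strip()
        if not line:
            continue
        try:
            yield json.loads(line)
        except json.JSONDecodeError as exc:
            raise ValueError(f"Line {line_num} is not valid JSON: {exc.msg}") from exc


# Hypothetical sample lines; any iterable of strings works, including an open file.
sample_lines = ['{"id": 1, "tags": ["prod"]}\n', '\n', '{"id": 2}\n']
parsed_assets = list(parse_jsonl(sample_lines))
print(parsed_assets)
```

Raising leaves the decision to exit with the caller, which can make the function easier to reuse; exiting on the spot, as the script does, is simpler for a one-off tool.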

After the line is converted to a dictionary, the code checks for “locator.”  Why?  As far as I know, every asset has a “locator,” so its absence suggests the file is not an asset export.  If the asset has “tags,” the asset tags are processed.

    with open(jsonl_asset_file_name, 'r') as jsonl_f:
        for line_num, asset_line in enumerate(jsonl_f):
            asset = convert_to_json(asset_line)

            if "locator" not in asset:
                print_error("Can't find 'locator' field.  Is this an asset import?")
                print_error(f"Line read: {asset}")
                sys.exit(1)

            if "tags" in asset:
                logging.debug(f"{asset['id']}: {asset['tags']}")
                process_tags(asset['id'], asset['tags'], asset_tags)
            asset_count += 1

process_tags()

The asset tag uniqueness and counting are done in this function. The parameters are:

  • asset_id – The unique ID of the asset.
  • tags_to_process – An array of asset tags to process for uniqueness.
  • asset_tags – The dictionary that maps each asset tag name to its Asset_Tag_Info object.

Each asset tag in the tags_to_process array is checked to see whether it is a key in the asset_tags dictionary.  If it is, it is counted by the incr() method; if it is not, an Asset_Tag_Info object with a count of one is created and stored under that key.

def process_tags(asset_id, tags_to_process, asset_tags):
    for tag in tags_to_process:
        if tag in asset_tags:
            asset_tags[tag].incr()
            logging.debug(f"Existing asset tag: {tag} ({asset_id})")
        else:
            tag_info = Asset_Tag_Info()
            asset_tags[tag] = tag_info
            logging.info(f"New asset tag: {tag} ({asset_id})")
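
The Asset_Tag_Info class isn't listed in this post.  Based only on how it is used (created with a count of one, incr(), get_count()), a stand-in might look like the sketch below; the real class may hold more.  The logging calls are left out and the sample tags are made up.

```python
class Asset_Tag_Info:
    """Hypothetical stand-in: tracks how many times a tag has been seen."""

    def __init__(self):
        self.count = 1  # created on first sighting, so start at one

    def incr(self):
        self.count += 1

    def get_count(self):
        return self.count


def process_tags(asset_id, tags_to_process, asset_tags):
    for tag in tags_to_process:
        if tag in asset_tags:
            asset_tags[tag].incr()
        else:
            asset_tags[tag] = Asset_Tag_Info()


# The dictionary is modified in place, so nothing needs to be returned.
asset_tags = {}
process_tags(1, ["prod", "linux"], asset_tags)
process_tags(2, ["prod"], asset_tags)
counts = {tag: info.get_count() for tag, info in asset_tags.items()}
print(counts)
```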

write_csv_file()

After all the tags in all the assets are processed, the asset_tags dictionary is written to a CSV file. The Python csv library is used.  Finally, the CSV file is made available.

def write_csv_file(asset_tags):
    # Open up the CSV file and write the header row.
    # The with block closes the file once all rows are written.
    csv_file_name = "uniq_asset_tags.csv"
    with open(csv_file_name, 'w', newline='') as uniq_asset_tags_fp:
        uniq_tag_writer = csv.writer(uniq_asset_tags_fp)
        uniq_tag_writer.writerow(["Asset Tag Name", "Asset Tag Count"])

        # Write one row per unique asset tag.
        for asset_tag in asset_tags:
            asset_tag_info = asset_tags[asset_tag]
            uniq_tag_writer.writerow([asset_tag, asset_tag_info.get_count()])

    print_info(f"{csv_file_name} is now available.")
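
If you would rather have the rows sorted before they reach a spreadsheet, the sort can be done while writing.  The sketch below is a variation, not the script's behavior: write_sorted_csv is a made-up helper that takes plain tag-to-count pairs and writes to any file-like object (an in-memory buffer here).

```python
import csv
import io


def write_sorted_csv(tag_counts, out_f):
    """Write tag/count rows sorted by count (descending), then by tag name."""
    writer = csv.writer(out_f)
    writer.writerow(["Asset Tag Name", "Asset Tag Count"])
    for tag, count in sorted(tag_counts.items(), key=lambda item: (-item[1], item[0])):
        writer.writerow([tag, count])


# Hypothetical counts for illustration.
buffer = io.StringIO(newline="")
write_sorted_csv({"linux": 1, "prod": 2, "dmz": 1}, buffer)
csv_text = buffer.getvalue()
print(csv_text)
```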

Conclusion

The results are in the CSV file, uniq_asset_tags.csv, so you can sort by either asset tag name or usage count.  Once you know all your asset tag names, you might find some that are underutilized or even overutilized.  As always, this code is in Kenna Security’s GitHub repository.

Until next time,

Rick Ehrhart

API Evangelist

© 2022 Kenna Security. All Rights Reserved. Privacy Policy.