NVSM Health
Health Check Details¶
check_blacklist_recommendations¶
Brief¶
Check DCGM for GPU blacklist recommendations
Description¶
None
Module¶
Source Code Listing¶
def run(self):
own_params = self.getParameters()
params = self.__parameters_task.getResult()
check_blacklist_recommendations = False
for param, req in own_params.items():
if param in params.keys():
check_blacklist_recommendations = params[param]
# Return if parameter is not defined
if not check_blacklist_recommendations:
return
# check if kvm mode is on
if modules.kvm.kvm_mode_on.getResult() == True:
self.addCheckMessage('KVM mode is on, skipping blacklist recommendations check.')
self.addInformational()
return
# Run the blacklist_recommendations task
args = ['./modules/blacklist_recommendations/_gpu_blacklist_recommendations', \
'--detect', '--watches']
args.extend(check_blacklist_recommendations)
collect_task = tasks.RunCommand(args=args, timeout=1000)
collect_task.run()
# For DCGM failures, GPU blacklist recommendations can
# exit with returncode 1, handle it gracefully
# Return for no response or exit code > 1
if (collect_task.getReturnCode() > 1) or not collect_task.getOutput():
self.addCheckMessage('No response or error while running GPU blacklist recommendations: {}'.format(
collect_task.getError()))
self.addUnknown()
return
healthy = True
try:
result = json.loads(collect_task.getOutput())
blacklist = result.get('blacklistedGpus', {})
# Check for GPU/NVSwitch blacklist recommendations
if len(blacklist) > 0:
healthy = False
self.addCheckMessage('Found {count} device(s) recommended for blacklist:'.format(
count=len(blacklist)))
else:
self.addCheckMessage('No devices found recommended for blacklist.')
for entity_id in sorted(blacklist.keys()):
details = blacklist[entity_id]
device_uuid = details.get('UUID')
device_bdf = details.get('BDF')
failure_explanation = details.get('Failure Explanation')
self.addCheckMessage('\t"GPU{entity_id}":\n' \
'\t"BDF": "{device_bdf}"\n' \
'\t"UUID": "{device_uuid}"\n' \
'\t"Failure Explanation": {failure_explanation}'.format(
entity_id=entity_id,
device_bdf=device_bdf,
device_uuid=device_uuid,
failure_explanation=failure_explanation))
# Check for other errors in blacklist recommendation script
error_list = result.get('errors', [])
if error_list:
nv_hostengine_running = True
self.addCheckMessage('Errors encountered:')
for e in error_list:
if 'host engine is not valid any longer' in e:
nv_hostengine_running = False
self.addCheckMessage('\t{}'.format(e))
# If nv-hostengine is not running return as unknown
if not nv_hostengine_running:
self.addUnknown()
return
healthy = False
except Exception as e:
self.addCheckMessage('Error while parsing GPU blacklist recommendations: {}'.format(e))
self.addUnknown()
return
# make sure SBE page pending retirements are caught as informational,
# as the blacklist_recommendations script ignores them as warnings
if healthy:
nvidia_smi_res = modules.nvidia_smi.parse_nvidia_smi.getResult()
if nvidia_smi_res:
for gpu, info in nvidia_smi_res.items():
gpu_dict = xmltodict.parse(info)
check_cls = modules.nvidia_smi.GpuCheckRetiredPagesPending(gpu, gpu_dict)
check_cls.setCallback(self.getCallback())
check_cls.run()
bad_count = check_cls.getResult()['unhealthy'] + check_cls.getResult()['unknown']
if bad_count:
healthy = False
self.addCheckMessage(check_cls.getTitle())
if not healthy:
self.addCheckMessage(config.gpu_total_retired_pages_pending_error)
self.addInformational()
return
if healthy:
self.addHealthy()
else:
self.addUnHealthy()
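The shape of the JSON consumed by this check is not shown here; the fragment below is a hypothetical example, with only the key names (blacklistedGpus, UUID, BDF, Failure Explanation, errors) taken from the listing above and all values invented, to illustrate how the result is parsed.

import json

# Hypothetical output of the blacklist script; only the key names are taken
# from the listing above, the values are made up for illustration.
sample_output = '''
{
  "blacklistedGpus": {
    "3": {
      "UUID": "GPU-00000000-0000-0000-0000-000000000000",
      "BDF": "00000000:3B:00.0",
      "Failure Explanation": "DCGM diagnostic reported uncorrectable errors"
    }
  },
  "errors": []
}
'''

result = json.loads(sample_output)
blacklist = result.get('blacklistedGpus', {})
for entity_id in sorted(blacklist.keys()):
    details = blacklist[entity_id]
    print(entity_id, details.get('BDF'), details.get('UUID'))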
check_bom_dimms¶
Brief¶
Check Memory DIMM devices information for consistency
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
generic_bom_check(self, self.__parameters_task, self.__parse_dmidecode_task, \
config.dimms_info_str, config.dimms_command_str)
check_bom_disk_controllers¶
Brief¶
Check Disk Controllers PCIe devices information for consistency
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
generic_bom_check(self, self.__parameters_task, self.__parse_lspci_task, \
config.disk_controllers_info_str, config.disk_controllers_command_str, \
config.disk_controllers_pci_device_missing_str, config.pci_device_changed_str)
check_bom_ethernet_controllers¶
Brief¶
Check Ethernet Controllers PCIe devices information for consistency
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
# Get parameters from own task
own_params = self.getParameters()
# Get parameters from parameter task
params = self.__parameters_task.getResult()
bom_config = None
for param, req in own_params.items():
if param in params.keys():
bom_config = params[param]
# If parameter is not found in platform parameters - Do Nothing
if bom_config == None:
return
# Print/Stream Task info and command messages
self.title(config.ethernet_controllers_info_str)
self.addCheckMessage(config.ethernet_controllers_command_str)
if self.__parse_lspci_task.getResult() != None:
# Compare task output with expected config
res = self.__parse_lspci_task.getResult()
if type(res) is dict:
out_dict = res
else:
out_dict = json.loads(res)
# dictionary compare
ddiff = DeepDiff(out_dict, bom_config)
message = ''
result = 'Healthy'
if any(key in ddiff for key in ['dictionary_item_added',
'values_changed']):
if 'dictionary_item_added' in ddiff:
for item in ddiff['dictionary_item_added']:
key = re.findall('\[\'(.*?)\'\]', item)
message += '\n'
message += config.ethernet_controllers_pci_device_missing_str.format(' -> '.join(key))
result = 'UnHealthy'
if 'values_changed' in ddiff:
for key, value in ddiff['values_changed'].items():
key = re.findall('\[\'(.*?)\'\]', key)
message += '\n'
# Best effort to add additional_message_key information
try:
message += 'For {} with '.format(ref_dict[key[0]]['device'])
except:
pass
message += config.pci_device_changed_str.format(
' -> '.join(key),
value['old_value'],
value['new_value'])
if result == 'Healthy':
result = 'Informational'
self.addCheckMessage(message)
if result == 'Healthy':
# Healthy check - No diffs found in bill-of-materials
self.addHealthy()
elif result == 'UnHealthy':
# UnHealthy check - Print/Stream diffs found in bill-of-materials
self.addUnHealthy()
elif result == 'Informational':
# Informational status - print change in config
self.addInformational()
else:
self.addUnknown()
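The comparison above relies on DeepDiff reporting 'dictionary_item_added' and 'values_changed'. A minimal, self-contained sketch with toy dictionaries (not real lspci output) showing the two result keys the check inspects:

from deepdiff import DeepDiff

# Toy dictionaries standing in for the parsed lspci output and the expected BOM.
observed = {'0000:3b:00.0': {'device': 'Ethernet controller A'}}
expected = {'0000:3b:00.0': {'device': 'Ethernet controller B'},
            '0000:5e:00.0': {'device': 'Ethernet controller C'}}

ddiff = DeepDiff(observed, expected)
# An entry present only in the expected BOM is reported as missing hardware...
if 'dictionary_item_added' in ddiff:
    print(ddiff['dictionary_item_added'])
# ...while a differing value is reported as a changed device.
if 'values_changed' in ddiff:
    print(ddiff['values_changed'])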
check_bom_gpus¶
Brief¶
Check GPUs PCIe devices information for consistency
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
generic_bom_check(self, self.__parameters_task, self.__parse_lspci_task, \
config.gpus_info_str, config.gpus_command_str, \
config.gpus_pci_device_missing_str, config.pci_device_changed_str)
check_bom_ib_controllers¶
Brief¶
Check InfiniBand controllers PCIe devices information for consistency
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
generic_bom_check(self, self.__parameters_task, self.__parse_lspci_task, \
config.ib_controllers_info_str, config.ib_controllers_command_str, \
config.ib_controllers_pci_device_missing_str, config.pci_device_changed_str)
check_bom_nvswitch¶
Brief¶
Check NVSwitch controller PCIe devices information for consistency
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
generic_bom_check(self, self.__parameters_task, self.__parse_lspci_task, \
config.nvswitch_info_str, config.nvswitch_command_str, \
config.nvswitch_pci_device_missing_str, config.pci_device_changed_str)
check_bom_pcie_switches¶
Brief¶
Check PCIe Switches PCIe devices information for consistency
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
generic_bom_check(self, self.__parameters_task, self.__parse_lspci_task, \
config.pcie_switches_info_str, config.pcie_switches_command_str, \
config.pcie_switches_pci_device_missing_str, config.pci_device_changed_str)
check_bom_vgas¶
Brief¶
Check VGA Controller PCIe devices information for consistency
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
generic_bom_check(self, self.__parameters_task, self.__parse_lspci_task, \
config.vgas_info_str, config.vgas_command_str, \
config.vgas_pci_device_missing_str, config.pci_device_changed_str)
check_dcc_can_health¶
Brief¶
Drive Constellation: Check DCC CAN Health
Description¶
None
Module¶
Depends On¶
Source Code Listing¶
def run(self):
json_output = self.__run_dcc_hardware_health.getOutput()
if json_output != None:
try:
dcc_can_health = json_output['dcc_can_health']
if dcc_can_health['test_result'] == "Healthy":
self.addHealthy()
else:
self.addUnHealthy()
self.addCheckMessage(dcc_can_health['test_information'])
except Exception as e:
logging.debug("Error fetching can health info: {}".format(e))
self.addUnknown()
return
self.addHealthy()
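All of the Drive Constellation (DCC) checks in this section read the same kind of JSON from the health API tasks. The payload below is made up; only the key names come from the listing above.

# Hypothetical payload illustrating the structure check_dcc_can_health reads;
# the values are invented for illustration.
json_output = {
    'dcc_can_health': {
        'test_result': 'Healthy',
        'test_information': 'CAN interfaces responded as expected',
    }
}
print(json_output['dcc_can_health']['test_result'])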
check_dcc_can_reachability¶
Brief¶
Drive Constellation: Check DCC CAN reachability Health
Description¶
None
Module¶
Depends On¶
Source Code Listing¶
def run(self):
json_output = self.__run_check_application_config_health.getOutput()
if json_output != None:
try:
dcc_can_reachability = json_output['can_reachability']
if dcc_can_reachability['test_result'] == "Healthy":
self.addHealthy()
else:
self.addUnHealthy()
self.addCheckMessage(dcc_can_reachability['test_information'])
#self.__output(dcc_display_configuration['test_information'])
except Exception as e:
logging.debug("Error fetching DCC CAN health info: {}".format(e))
self.addUnknown()
return
self.addHealthy()
check_dcc_display_configuration¶
Brief¶
Drive Constellation: Check DCC Display Configuration Health
Description¶
None
Module¶
Depends On¶
Source Code Listing¶
def run(self):
#self.addCheckMessage("Checking Application Config health")
#import pdb
#pdb.set_trace()
json_output = self.__run_check_application_config_health.getOutput()
if json_output != None:
try:
dcc_display_configuration = json_output['display_configuration']
if dcc_display_configuration['test_result'] == "Healthy":
self.addHealthy()
else:
self.addUnHealthy()
self.addCheckMessage(dcc_display_configuration['test_information'])
#self.__output(dcc_display_configuration['test_information'])
except Exception as e:
logging.debug("Error fetching DCC Display health info: {}".format(e))
self.addUnknown()
return
self.addHealthy()
check_dcc_display_synchronization¶
Brief¶
Drive Constellation: Check DCC Display Synchronization Health
Description¶
None
Module¶
Depends On¶
Source Code Listing¶
def run(self):
json_output = self.__run_check_application_config_health.getOutput()
if json_output != None:
try:
dcc_display_synchronization = json_output['display_synchronization']
if dcc_display_synchronization['test_result'] == "Healthy":
self.addHealthy()
else:
self.addUnHealthy()
self.addCheckMessage(dcc_display_synchronization['test_information'])
#self.__output(dcc_display_configuration['test_information'])
except Exception as e:
print("Error fetching ethernet health info: {}".format(e))
self.addUnknown()
return
self.addHealthy()
check_dcc_ecu_tegraA_health¶
Brief¶
Drive Constellation: Check DCC ECU TegraA Health
Description¶
None
Module¶
Depends On¶
Source Code Listing¶
def run(self):
self.addCheckMessage("Checking DCC ECU TegraA Hardware health")
#import pdb
#pdb.set_trace()
json_output = self.__run_dcc_hardware_health.getOutput()
if json_output != None:
try:
dcc_tegraA_health = json_output['tegraA_health']
if dcc_tegraA_health['test_result'] == "Healthy":
self.addHealthy()
else:
self.addUnHealthy()
self.addCheckMessage(dcc_tegraA_health['test_information'])
except Exception as e:
print("Error fetching ecu TegraA health info: {}".format(e))
self.addUnknown()
return
self.addHealthy()
check_dcc_ecu_tegraA_storage_health¶
Brief¶
Drive Constellation: Check DCC ECU TegraA Storage Health
Description¶
None
Module¶
Depends On¶
Source Code Listing¶
def run(self):
self.addCheckMessage("Checking DCC ECU TegraA Storage health")
#import pdb
#pdb.set_trace()
json_output = self.__run_dcc_ecu_application_health.getOutput()
if json_output != None:
try:
dcc_tegraA_health = json_output['tegraA_health']
if dcc_tegraA_health['test_result'] == "Healthy":
self.addHealthy()
else:
self.addUnHealthy()
self.addCheckMessage(dcc_tegraA_health['test_information'])
except Exception as e:
print("Error fetching ecu TegraA health info: {}".format(e))
self.addUnknown()
return
self.addHealthy()
check_dcc_ecu_tegraB_health¶
Brief¶
Drive Constellation: Check DCC ECU TegraB Health
Description¶
None
Module¶
Depends On¶
Source Code Listing¶
def run(self):
self.addCheckMessage("Checking DCC ECU TegraB Hardware health")
#import pdb
#pdb.set_trace()
json_output = self.__run_dcc_hardware_health.getOutput()
if json_output != None:
try:
dcc_tegraB_health = json_output['tegraB_health']
if dcc_tegraB_health['test_result'] == "Healthy":
self.addHealthy()
else:
self.addUnHealthy()
self.addCheckMessage(dcc_tegraB_health['test_information'])
except Exception as e:
print("Error fetching ecu TegraB health info: {}".format(e))
self.addUnknown()
return
self.addHealthy()
check_dcc_ecu_tegraB_storage_health¶
Brief¶
Drive Constellation: Check DCC ECU TegraB Storage Health
Description¶
None
Module¶
Depends On¶
Source Code Listing¶
def run(self):
self.addCheckMessage("Checking DCC ECU TegraB Storage health")
#import pdb
#pdb.set_trace()
json_output = self.__run_dcc_ecu_application_health.getOutput()
if json_output != None:
try:
dcc_tegraB_health = json_output['tegraB_health']
if dcc_tegraB_health['test_result'] == "Healthy":
self.addHealthy()
else:
self.addUnHealthy()
self.addCheckMessage(dcc_tegraB_health['test_information'])
except Exception as e:
print("Error fetching ecu TegraB health info: {}".format(e))
self.addUnknown()
return
self.addHealthy()
check_dcc_ethernet_health¶
Brief¶
Drive Constellation: Check DCC Ethernet Health
Description¶
None
Module¶
Depends On¶
Source Code Listing¶
def run(self):
self.addCheckMessage("Checking DCS Hardware health")
#import pdb
#pdb.set_trace()
json_output = self.__run_dcc_hardware_health.getOutput()
if json_output != None:
try:
dcc_ethernet_health = json_output['dcc_ethernet_health']
if dcc_ethernet_health['test_result'] == "Healthy":
self.addHealthy()
else:
self.addUnHealthy()
self.addCheckMessage(dcc_ethernet_health['test_information'])
except Exception as e:
logging.debug("Error fetching ethernet health info: {}".format(e))
self.addUnknown()
return
self.addHealthy()
check_dcc_fan_health¶
Brief¶
Drive Constellation: Check DCC Fan Health
Description¶
None
Module¶
Depends On¶
Source Code Listing¶
def run(self):
json_output = self.__run_dcc_hardware_health.getOutput()
if json_output != None:
try:
dcc_fan_health = json_output['dcc_fan_health']
if dcc_fan_health['test_result'] == "Healthy":
self.addHealthy()
else:
self.addUnHealthy()
self.addCheckMessage(dcc_fan_health['test_information'])
return
except Exception as e:
logging.debug("Error fetching ethernet health info: {}".format(e))
self.addUnknown()
return
self.addHealthy()
check_dcc_gpu_health¶
Brief¶
Drive Constellation: Check DCC GPU Health
Description¶
None
Module¶
Depends On¶
Source Code Listing¶
def run(self):
json_output = self.__run_dcc_hardware_health.getOutput()
if json_output != None:
try:
dcc_gpu_health = json_output['dcc_gpu_health']
if dcc_gpu_health['test_result'] == "Healthy":
self.addHealthy()
else:
self.addUnHealthy()
self.addCheckMessage(dcc_gpu_health['test_information'])
except Exception as e:
logging.debug("Error fetching gpu health info: {}".format(e))
self.addUnknown()
return
self.addHealthy()
check_dcc_info¶
Brief¶
Drive Constellation: Get DCC Info
Description¶
None
Module¶
Depends On¶
Source Code Listing¶
def run(self):
json_output = self.__run_dcc_health_api_task.getOutput()
if json_output != None:
try:
dcc_info = json_output['dcc_info']
self.addHealthy()
for key, value in dcc_info.items():
key = key[0:].replace('_', ' ')
key = key[0:].title()
self.send("{:20} : {:20}".format(key, value))
except Exception as e:
self.addUnHealthy()
logging.debug("Error fetching dcc info: {}".format(e))
self.__output = None
#self.addUnknown()
return
check_dcc_network_reachability¶
Brief¶
Drive Constellation: Check DCC Network Reachability
Description¶
None
Module¶
Depends On¶
Source Code Listing¶
def run(self):
json_output = self.__run_check_application_config_health.getOutput()
if json_output != None:
try:
dcc_network_reachability = json_output['network_reachability']
if dcc_network_reachability['test_result'] == "Healthy":
self.addHealthy()
else:
self.addUnHealthy()
self.addCheckMessage(dcc_network_reachability['test_information'])
#self.__output(dcc_display_configuration['test_information'])
except Exception as e:
logging.debug("Error fetching network reachability info: {}".format(e))
self.addUnknown()
return
self.addHealthy()
check_dcc_serializer_configuration¶
Brief¶
Drive Constellation: Check DCC Serializer Configuration Health
Description¶
None
Module¶
Depends On¶
Source Code Listing¶
def run(self):
json_output = self.__run_check_application_config_health.getOutput()
if json_output != None:
try:
dcc_serializer_configuration = json_output['serializer_configuration']
if dcc_serializer_configuration['test_result'] == "Healthy":
self.addHealthy()
else:
self.addUnHealthy()
self.addCheckMessage(dcc_serializer_configuration['test_information'])
#self.__output(dcc_display_configuration['test_information'])
except Exception as e:
logging.debug("Error fetching serializer_configuration info: {}".format(e))
self.addUnknown()
return
self.addHealthy()
check_dcc_usb_health¶
Brief¶
Drive Constellation: Check DCC USB Health
Description¶
None
Module¶
Depends On¶
Source Code Listing¶
def run(self):
json_output = self.__run_dcc_hardware_health.getOutput()
if json_output != None:
try:
dcc_usb_health = json_output['dcc_usb_health']
if dcc_usb_health['test_result'] == "Healthy":
self.addHealthy()
else:
self.addUnHealthy()
self.addCheckMessage(dcc_usb_health['test_information'])
except Exception as e:
logging.debug("Error fetching usb health info: {}".format(e))
self.addUnknown()
return
self.addHealthy()
check_dcc_usb_reachability¶
Brief¶
Drive Constellation: Check DCC USB reachability Health
Description¶
None
Module¶
Depends On¶
Source Code Listing¶
def run(self):
json_output = self.__run_check_application_config_health.getOutput()
if json_output != None:
try:
dcc_usb_reachability = json_output['usb_reachability']
if dcc_usb_reachability['test_result'] == "Healthy":
self.addHealthy()
else:
self.addUnHealthy()
self.addCheckMessage(dcc_usb_reachability['test_information'])
#self.__output(dcc_display_configuration['test_information'])
except Exception as e:
logging.debug("Error fetching DCC USB reachability health info: {}".format(e))
self.addUnknown()
return
self.addHealthy()
check_dcs_psu_info¶
Brief¶
Drive Constellation: Check DCC PSU Info
Description¶
None
Module¶
Depends On¶
Source Code Listing¶
def run(self):
Health = True
try:
dcs_psu_attrib_values = self.__parameters_task.getResult()['dcs_psu_attrib_values']
except:
# Could not get list of valid values
self.addCheckMessage("Could not get list of valid PSU values")
self.addUnknown()
return
# Get the parsed results
dcs_psu_results = {}
dcs_psu_results['PSU-0'] = self.__parse_psu0_task.getOutput()
dcs_psu_results['PSU-1'] = self.__parse_psu1_task.getOutput()
# Check PSU Vendor
Msg = self.check_psu_attrib('Vendor', dcs_psu_attrib_values, dcs_psu_results)
if Msg != '':
self.addCheckMessage(Msg)
Health = False
# Check PSU Model
Msg = self.check_psu_attrib('Model', dcs_psu_attrib_values, dcs_psu_results)
if Msg != '':
self.addCheckMessage(Msg)
Health = False
if Health == True:
self.addHealthy()
elif Health == False:
self.addUnHealthy()
else:
self.addUnknown()
check_dimm_part_number¶
Brief¶
Verify DIMM part number
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
# Return if parameter is not defined
try:
dimm_boms = self.__parameters_task.getResult()['dimm_bom']
part_number = self.__parameters_task.getResult()['dimm_part_number']
except:
return
# Unknown check for no result from dmidecode
res = self.__dimm_task.getResult()
if not res:
self.addCheckMessage('No result from parse dmidecode output')
self.addUnknown()
return
healthy = True
for dimm in dimm_boms:
if dimm in res.keys():
if 'part_number' in res[dimm].keys():
if res[dimm]['part_number'].strip() not in part_number:
self.addCheckMessage('Mismatch in DIMM "{}" part number, expected is "{}" found is "{}"'.format(dimm, " or ".join(part_number), res[dimm]['part_number']))
healthy = False
else:
self.addCheckMessage('DIMM "{}" part number not found'.format(dimm))
healthy = False
else:
# Must be caught on checking DIMMs
pass
if healthy:
self.addHealthy()
else:
self.addUnHealthy()
check_dimm_vendors¶
Brief¶
Verify DIMM vendors
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
try:
dimm_vendors = self.__parameters_task.getResult()['dmidecode_dimm_vendors']
except:
return
out = self.__get_dimm_vendors_task.getResult()
if not out:
self.addCheckMessage('ERROR: Could not parse dmidecode output')
self.addUnknown()
return
healthy = True
for dimm in out:
if dimm not in dimm_vendors:
self.addCheckMessage('Unknown DIMM vendor "{value}"'.format(value=dimm))
healthy = False
else:
# Found the expected dimm vendor
pass
if healthy:
self.addHealthy()
else:
self.addUnHealthy()
check_ecu_info¶
Brief¶
Drive Constellation: Get ECU Info
Description¶
None
Module¶
Depends On¶
Source Code Listing¶
def run(self):
json_output = self.__run_dcc_health_api_task.getOutput()
if json_output != None:
try:
ecu_info = json_output['ecu_info']
self.addHealthy()
for key, value in ecu_info.items():
key = key[0:].replace('_', ' ')
key = key[0:].title()
self.send("{:20} : {:20}".format(key, value))
except Exception as e:
self.addUnHealthy()
logging.debug("Error fetching dcc info: {}".format(e))
self.__output = None
#self.addUnknown()
return
check_ethernet_controller_info¶
Brief¶
None
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
try:
devices = self.__parameters_task.getResult()[self.__parameter]
except:
return
for device in deepcopy(devices):
pstate_not_found = False
if self.__is_gpu_check:
bdf_pstate = self.__bdf_pstate.getResult()
try:
pstate = bdf_pstate[device['bdf']]
device['speed'] = device['speed'][pstate]
device['width'] = device['width'][pstate]
except:
# not able to find the pstate for this gpu bdf
pstate_not_found = True
kvm_mode_disabled = True
if self.__parameter== "gpu_link_info" and modules.kvm.kvm_mode_on.getResult() == True:
kvm_mode_disabled = False
for check_type in ['speed', 'width']:
check_cls = CheckLink(device['bdf'], device[check_type], check_type, self.__parse_lspci_task, self.__parameter)
if 'name' not in device:
device["name"] = ""
check_cls.setCallback(self.getCallback())
if kvm_mode_disabled:
if pstate_not_found:
device['speed'] = "None"
device['width'] = "None"
check_cls.addCheckMessage("unknown pstate for the GPU[{}]".format(device['bdf']))
check_cls.addUnknown()
else:
check_cls.run()
else:
device['speed'] = "None"
device['width'] = "None"
check_cls.addCheckMessage('KVM mode is on, skipping check.')
check_cls.addInformational()
self.addInformational()
check_cls.title(self.__title_str[check_type].format(**device).strip())
if kvm_mode_disabled:
self.addHealthy(count=check_cls.getResult()['healthy'])
self.addUnHealthy(count=check_cls.getResult()['unhealthy'])
self.addUnknown(count=check_cls.getResult()['unknown'])
self.addInformational(count=check_cls.getResult()['informational'])
check_cls.sendComplete()
super().addMessages(check_cls.getMessages())
# clear the title as this task doesn't print anything itself
self.title('')
check_fan_bom¶
Brief¶
Verify chassis fan presence
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
from nvsmhealth.lib import DictionarySuperset
try:
fan_bom = self.__parameters_task.getResult()['fan_bom']
except:
return
output = self.__sdr_device_bom_task.getResult()
dictionary_superset = DictionarySuperset.DictionarySuperset(
missing_message="Could not detect presence of chassis fan {}")
result = dictionary_superset.compare(output, fan_bom)
self.addCheckMessage("Checking output of 'ipmitool sdr elist' for expected chassis fans")
if result:
self.addCheckMessage(result)
self.addUnHealthy()
else:
self.addHealthy()
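DictionarySuperset is an NVSM-internal helper; assuming it reports expected entries that are missing from the observed dictionary (as the missing_message suggests), a rough plain-Python approximation of the fan check would be:

# Hypothetical data: expected fan BOM vs. fans parsed from 'ipmitool sdr elist'.
fan_bom = {'FAN1_F': 'ok', 'FAN1_R': 'ok'}
observed = {'FAN1_F': 'ok'}

for name in fan_bom:
    if name not in observed:
        print('Could not detect presence of chassis fan {}'.format(name))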
check_fru_consistency¶
Brief¶
Check FRU information for consistency
Description¶
The FRU (field replaceable unit) information recorded in the BMC (baseboard management controller) includes serial numbers for various FRUs on the system. For any given system, these serial numbers should be consistent among all FRUs. However, it is possible for these serial numbers to become inconsistent as the result of normal maintenance (such as FRU replacement). This check makes sure serial numbers are consistent for all FRUs recorded in the BMC.
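As a rough sketch of the consistency rule described above (hypothetical serial numbers; the real check parses 'ipmitool fru print' and 'dmidecode' output):

# Hypothetical parsed FRU data; every FRU should carry the chassis serial
# reported by dmidecode.
fru_res = {
    'FRU_MB':  {'chassis_serial': '1234567890123'},
    'FRU_PDB': {'chassis_serial': '1234567890123'},
    'FRU_GPU': {'chassis_serial': '0000000000000'},
}
expected_serial_number = '1234567890123'  # from the dmidecode chassis information section

inconsistent = [dev for dev, info in fru_res.items()
                if info.get('chassis_serial') != expected_serial_number]
print(inconsistent)  # ['FRU_GPU']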
Depends On¶
Source Code Listing¶
def run(self):
own_params = self.getParameters()
params = self.__parameters_task.getResult()
fru_devices = None
for param, req in own_params.items():
if param in params.keys():
fru_devices = params[param]
if fru_devices == None:
return
# Unknown check for parse tasks failure
if not (self.__fru_task.getResult() and self.__dmidecode_task.getResult()):
self.addCheckMessage("No results from 'ipmitool fru print' or 'dmidecode' commands")
self.addUnknown()
return
result = "healthy"
self.addCheckMessage(config.fru_command_str)
try:
fru_res = self.__fru_task.getResult()
# Check for FRU devices
devices_not_found = [device for device in fru_devices if device not in fru_res.keys()]
if devices_not_found:
self.addCheckMessage("FRU devices not found '{}'".format(", ".join(devices_not_found)))
if len(devices_not_found) == len(fru_devices):
self.addUnHealthy()
return
result = "unhealthy"
devices_found = [device for device in fru_devices if device not in devices_not_found]
# Check for FRU devices chassis serial number
chassis_serial_not_found = [device for device in devices_found if 'chassis_serial' not in fru_res[device].keys()]
if chassis_serial_not_found:
self.addCheckMessage("Chassis serial number not found for FRU devices '{}'".format(", ".join(chassis_serial_not_found)))
result = "unhealthy"
chassis_serial_found = [device for device in devices_found if device not in chassis_serial_not_found]
# Get expected serial number
dmidecode_res = self.__dmidecode_task.getResult()
chassis_info = [v['serial_number'] for k, v in dmidecode_res.items() if 'chassis information' in k.lower() and
'serial_number' in v.keys()]
if chassis_info:
expected_serial_number = chassis_info[0]
else:
self.addCheckMessage("Failed while fetching serial number from chassis information")
self.addUnknown()
return
# Check and print the FRU devices having inconsistent chassis serial numbers
diff = [device for device in chassis_serial_found if fru_res[device]['chassis_serial'] != expected_serial_number]
for device in diff:
self.addCheckMessage("FRU device '{}' got chassis serial '{}' whereas expected is '{}'"
.format(device, fru_res[device]['chassis_serial'], expected_serial_number))
# For change in FRU chassis serial print informational status
result = "info"
if result == "unhealthy":
self.addUnHealthy()
elif result == "info":
self.addInformational()
else:
self.addHealthy()
except:
self.addCheckMessage("Failed while checking FRU serial number consistency")
self.addUnknown()
check_gpu_direct_topology¶
Brief¶
Check GPUDirect Topology information for consistency
Description¶
None
Module¶
Source Code Listing¶
def run(self):
# Get parameters from own task
own_params = self.getParameters()
# Get parameters from parameter task
params = self.__parameters_task.getResult()
# Get GPU MIG State
gpu_mig_state = self.__gpu_mig_state.getResult()
# Check if kvm mode is on then skip
if modules.kvm.kvm_mode_on.getResult() == True:
self.addCheckMessage('KVM mode is on, skipping check.')
self.addInformational()
return
# If GPU State Enabled
if gpu_mig_state != None and any(gpu_mig_state.values()):
logging.info("MIG State Detected: Modifying to MIG Topology")
_gpu_direct_topology = params['gpu_direct_topology']
num_gpus = len(gpu_mig_state.values())
# gpu_mig_state is a dict
gpus = range(num_gpus)
mig_enabled = [x for x, y in gpu_mig_state.items() if y == 1]
mig_enabled_gpus = ['GPU{}'.format(x) for x in mig_enabled]
pxb_enabled_gpus = ['GPU{}'.format(
y) for y in [x-1 if x % 2 != 0 else x+1 for x in mig_enabled]]
non_mig_gpus = ['GPU{}'.format(x)
for x in gpus if x not in mig_enabled]
# Go over all the MIG enabled GPUs first
for index, mig_gpu in enumerate(mig_enabled_gpus):
for k, v in _gpu_direct_topology[mig_gpu].items():
if k == mig_gpu:
continue # Already marked as X
elif k == pxb_enabled_gpus[index]:
_gpu_direct_topology[mig_gpu][k] = 'PXB'
else:
_gpu_direct_topology[mig_gpu][k] = 'SYS'
#Go over non-mig gpus next
for index, gpu in enumerate(non_mig_gpus):
for k, v in _gpu_direct_topology[gpu].items():
if k == gpu:
continue
elif k in mig_enabled_gpus and gpu in pxb_enabled_gpus:
if mig_enabled_gpus.index(k) == pxb_enabled_gpus.index(gpu):
_gpu_direct_topology[gpu][k] = 'PXB'
else:
_gpu_direct_topology[gpu][k] = 'SYS'
elif k in mig_enabled_gpus:
_gpu_direct_topology[gpu][k] = 'SYS'
params['gpu_direct_topology'] = _gpu_direct_topology
expected_topology = None
for param, req in own_params.items():
if param in params.keys():
expected_topology = params[param]
# If parameter is not found in platform parameters - Do Nothing
if expected_topology == None:
return
# Print/Stream Task info and command messages
self.addCheckMessage(config.gpu_direct_topology_command_str)
# Unknown check for no result from nvidia-smi topology parse task
if not self.__parse_task.getResult():
self.addCheckMessage(
'No result for GPUDirect topology information gathered from nvidia-smi tool')
self.addUnknown()
return
try:
# Compare task output with expected config
topology = json.loads(self.__parse_task.getResult())
healthy, message = genericGpuTopologyCheck(
topology, expected_topology)
if not healthy:
self.addCheckMessage(message)
self.addUnHealthy()
else:
self.addHealthy()
except:
self.addCheckMessage('Error while checking gpu direct topology')
self.addUnknown() # Unknown check
check_gpu_link_info¶
Brief¶
None
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
try:
devices = self.__parameters_task.getResult()[self.__parameter]
except:
return
for device in deepcopy(devices):
pstate_not_found = False
if self.__is_gpu_check:
bdf_pstate = self.__bdf_pstate.getResult()
try:
pstate = bdf_pstate[device['bdf']]
device['speed'] = device['speed'][pstate]
device['width'] = device['width'][pstate]
except:
# not able to find the pstate for this gpu bdf
pstate_not_found = True
kvm_mode_disabled = True
if self.__parameter== "gpu_link_info" and modules.kvm.kvm_mode_on.getResult() == True:
kvm_mode_disabled = False
for check_type in ['speed', 'width']:
check_cls = CheckLink(device['bdf'], device[check_type], check_type, self.__parse_lspci_task, self.__parameter)
if 'name' not in device:
device["name"] = ""
check_cls.setCallback(self.getCallback())
if kvm_mode_disabled:
if pstate_not_found:
device['speed'] = "None"
device['width'] = "None"
check_cls.addCheckMessage("unknown pstate for the GPU[{}]".format(device['bdf']))
check_cls.addUnknown()
else:
check_cls.run()
else:
device['speed'] = "None"
device['width'] = "None"
check_cls.addCheckMessage('KVM mode is on, skipping check.')
check_cls.addInformational()
self.addInformational()
check_cls.title(self.__title_str[check_type].format(**device).strip())
if kvm_mode_disabled:
self.addHealthy(count=check_cls.getResult()['healthy'])
self.addUnHealthy(count=check_cls.getResult()['unhealthy'])
self.addUnknown(count=check_cls.getResult()['unknown'])
self.addInformational(count=check_cls.getResult()['informational'])
check_cls.sendComplete()
super().addMessages(check_cls.getMessages())
# clear the title as this task doesn't print anything itself
self.title('')
check_gpu_p2p_topology¶
Brief¶
Check GPUs p2p Topology information for consistency
Description¶
None
Module¶
Depends On¶
Source Code Listing¶
def run(self):
# Get parameters from own task
own_params = self.getParameters()
# Get parameters from parameter task
params = self.__parameters_task.getResult()
expected_topology = None
for param, req in own_params.items():
if param in params.keys():
expected_topology = params[param]
# If parameter is not found in platform parameters - Do Nothing
if expected_topology == None:
return
if modules.kvm.kvm_mode_on.getResult() == True:
self.addCheckMessage('KVM mode is on, skipping check.')
self.addInformational()
return
# Print/Stream Task info and command messages
self.addCheckMessage(config.gpu_p2p_topology_command_str)
# Unknown check for no result from nvidia-smi topology parse task
if not self.__parse_task.getResult():
self.addCheckMessage(
'No result for GPUs p2p topology information gathered from nvidia-smi tool')
self.addUnknown()
return
try:
# Compare task output with expected config
topology = json.loads(self.__parse_task.getResult())
healthy, message = genericGpuTopologyCheck(
topology, expected_topology)
if not healthy:
self.addCheckMessage(message)
self.addUnHealthy()
else:
self.addHealthy()
except:
self.addCheckMessage('Error while checking gpu direct topology')
self.addUnknown() # Unknown check
check_gpu_vbios_version_consistency¶
Brief¶
Verify GPUs VBIOS version consistency
Description¶
None
Module¶
Depends On¶
Source Code Listing¶
def run(self):
if modules.kvm.kvm_mode_on.getResult() == True:
self.addCheckMessage('KVM mode is on, skipping check.')
self.addInformational()
return
# If nvidia-smi run got a failure, skip parsing the output
nvidia_smi_res = self.__parse_task.getResult()
if not nvidia_smi_res:
self.addCheckMessage('No result from nvidia-smi tool')
self.addUnknown()
return
inconsistent_gpus = {}
self.addCheckMessage(config.vbios_command_str)
'''
product name: vbios_version: gpu_name
'''
try:
for gpu, info in nvidia_smi_res.items():
gpu_dict = xmltodict.parse(info)
vbios_version = gpu_dict['nvidia_smi_log']['gpu']['vbios_version']
product_name = gpu_dict['nvidia_smi_log']['gpu']['pci']['pci_device_id']
if product_name not in inconsistent_gpus:
inconsistent_gpus[product_name] = {}
if vbios_version not in inconsistent_gpus[product_name]:
inconsistent_gpus[product_name][vbios_version] = []
inconsistent_gpus[product_name][vbios_version].append('GPU{}'.format(gpu))
# Unhealthy check for multiple versions
res = ""
for product_name, vbios_version in inconsistent_gpus.items():
if len(vbios_version) > 1:
for k, v in vbios_version.items():
res += "GPUs: {} has VBIOS version '{}'\n".format(
", ".join(v), k)
if res != "":
self.addCheckMessage(
f"Different VBIOS version found on GPUs\n{res}")
self.addUnHealthy()
else:
self.addHealthy()
# Unknown check
except:
self.addCheckMessage(
'Error while checking GPUs VBIOS version consistency')
self.addUnknown()
check_gpus¶
Brief¶
Check GPU health: retired page count, retired pages pending, InfoROM storage version, and VBIOS version
Description¶
None
Module¶
Depends On¶
Used By¶
Source Code Listing¶
def run(self):
own_params = self.getParameters()
params = self.__parameters_task.getResult()
gpu_total_retired_page_count = None
for param, req in own_params.items():
if param in params.keys():
gpu_total_retired_page_count = params[param]
# If nvidia-smi run got a failure, skip parsing the output
nvidia_smi_res = self.__parse_nvidia_smi_task.getResult()
if nvidia_smi_res == None:
return
gpus_info = {}
for gpu, info in nvidia_smi_res.items():
gpus_info[gpu] = {}
try:
gpu_dict = xmltodict.parse(info)
check_cls = GpuCheckVbiosVersion()
vbios_version = gpu_dict['nvidia_smi_log']['gpu']['vbios_version']
gpus_info[gpu]['vbios_version'] = vbios_version
product_name = gpu_dict['nvidia_smi_log']['gpu']['product_name']
gpus_info[gpu]['product_name'] = product_name
bdf = gpu_dict['nvidia_smi_log']['gpu']['pci']['pci_bus_id']
msg_str = config.gpu_vbios_version_str.format(
gpu_index=gpu, bdf=bdf)
check_cls.title(msg_str)
check_cls.setCallback(self.getCallback())
check_cls.send(msg=vbios_version)
super().addMessages(check_cls.getMessages())
check_cls = GpuCheckInforomStorageVersion()
inforom_version = gpu_dict['nvidia_smi_log']['gpu']['inforom_version']['img_version']
msg_str = config.gpu_inforom_version_str.format(
gpu_index=gpu, bdf=bdf)
check_cls.title(msg_str)
check_cls.setCallback(self.getCallback())
check_cls.send(msg=inforom_version)
super().addMessages(check_cls.getMessages())
# NVBug-2691112: Disable GPU total retired page count and retired pages pending health checks
# These checks are already covered in blacklist_recommendations health check
'''if gpu_total_retired_page_count != None:
check_cls = GpuCheckRetiredPagesCount(gpu, gpu_dict, gpu_total_retired_page_count)
check_cls.setCallback(self.getCallback())
check_cls.run()
self.addHealthy(count=check_cls.getResult()['healthy'])
self.addUnHealthy(count=check_cls.getResult()['unhealthy'])
self.addUnknown(count=check_cls.getResult()['unknown'])
check_cls.sendComplete()
super().addMessages(check_cls.getMessages())
check_cls = GpuCheckRetiredPagesPending(gpu, gpu_dict)
check_cls.setCallback(self.getCallback())
check_cls.run()
self.addHealthy(count=check_cls.getResult()['healthy'])
self.addUnHealthy(count=check_cls.getResult()['unhealthy'])
self.addUnknown(count=check_cls.getResult()['unknown'])
check_cls.sendComplete()
super().addMessages(check_cls.getMessages())'''
except:
logging.error(
"ERROR: Failed to perform GPU {} health check".format(gpu))
pass
self.title('')
self.__gpu_result = gpus_info
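The listing above extracts VBIOS and InfoROM versions from nvidia-smi's XML output via xmltodict. A small, self-contained illustration with a made-up XML fragment mirroring that layout:

import xmltodict

# Made-up fragment following the nvidia-smi -q -x layout referenced above.
xml_fragment = """
<nvidia_smi_log>
  <gpu>
    <product_name>Example GPU</product_name>
    <vbios_version>88.00.AA.00.00</vbios_version>
    <inforom_version><img_version>G500.0000.00.00</img_version></inforom_version>
    <pci><pci_bus_id>00000000:3B:00.0</pci_bus_id></pci>
  </gpu>
</nvidia_smi_log>
"""

gpu_dict = xmltodict.parse(xml_fragment)
print(gpu_dict['nvidia_smi_log']['gpu']['vbios_version'])
print(gpu_dict['nvidia_smi_log']['gpu']['inforom_version']['img_version'])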
check_ib_controller_link_info¶
Brief¶
None
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
try:
devices = self.__parameters_task.getResult()[self.__parameter]
except:
return
for device in deepcopy(devices):
pstate_not_found = False
if self.__is_gpu_check:
bdf_pstate = self.__bdf_pstate.getResult()
try:
pstate = bdf_pstate[device['bdf']]
device['speed'] = device['speed'][pstate]
device['width'] = device['width'][pstate]
except:
# not able to find the pstate for this gpu bdf
pstate_not_found = True
kvm_mode_disabled = True
if self.__parameter== "gpu_link_info" and modules.kvm.kvm_mode_on.getResult() == True:
kvm_mode_disabled = False
for check_type in ['speed', 'width']:
check_cls = CheckLink(device['bdf'], device[check_type], check_type, self.__parse_lspci_task, self.__parameter)
if 'name' not in device:
device["name"] = ""
check_cls.setCallback(self.getCallback())
if kvm_mode_disabled:
if pstate_not_found:
device['speed'] = "None"
device['width'] = "None"
check_cls.addCheckMessage("unknown pstate for the GPU[{}]".format(device['bdf']))
check_cls.addUnknown()
else:
check_cls.run()
else:
device['speed'] = "None"
device['width'] = "None"
check_cls.addCheckMessage('KVM mode is on, skipping check.')
check_cls.addInformational()
self.addInformational()
check_cls.title(self.__title_str[check_type].format(**device).strip())
if kvm_mode_disabled:
self.addHealthy(count=check_cls.getResult()['healthy'])
self.addUnHealthy(count=check_cls.getResult()['unhealthy'])
self.addUnknown(count=check_cls.getResult()['unknown'])
self.addInformational(count=check_cls.getResult()['informational'])
check_cls.sendComplete()
super().addMessages(check_cls.getMessages())
# clear the title as this task doesn't print anything itself
self.title('')
check_instant_blacklist_recommendations¶
Brief¶
Quick health check of GPU using DCGM
Description¶
None
Module¶
Source Code Listing¶
def run(self):
own_params = self.getParameters()
params = self.__parameters_task.getResult()
check_blacklist_recommendations = False
for param, req in own_params.items():
if param in params.keys():
check_blacklist_recommendations = params[param]
# Return if parameter is not defined
if not check_blacklist_recommendations:
return
# check if kvm mode is on
if modules.kvm.kvm_mode_on.getResult() == True:
self.addCheckMessage('KVM mode is on, skipping blacklist recommendations check.')
self.addInformational()
return
# Run the blacklist_recommendations task
args = ['./modules/blacklist_recommendations/_gpu_blacklist_recommendations', \
'--detect', '--watches']
args.extend(check_blacklist_recommendations)
collect_task = tasks.RunCommand(args=args, timeout=1000)
collect_task.run()
# For DCGM failures, GPU blacklist recommendations can
# exit with returncode 1, handle it gracefully
# Return for no response or exit code > 1
if (collect_task.getReturnCode() > 1) or not collect_task.getOutput():
self.addCheckMessage('No response or error while running GPU blacklist recommendations: {}'.format(
collect_task.getError()))
self.addUnknown()
return
healthy = True
try:
result = json.loads(collect_task.getOutput())
blacklist = result.get('blacklistedGpus', {})
# Check for GPU/NVSwitch blacklist recommendations
if len(blacklist) > 0:
healthy = False
self.addCheckMessage('Found {count} device(s) recommended for blacklist:'.format(
count=len(blacklist)))
else:
self.addCheckMessage('No devices found recommended for blacklist.')
for entity_id in sorted(blacklist.keys()):
details = blacklist[entity_id]
device_uuid = details.get('UUID')
device_bdf = details.get('BDF')
failure_explanation = details.get('Failure Explanation')
self.addCheckMessage('\t"GPU{entity_id}":\n' \
'\t"BDF": "{device_bdf}"\n' \
'\t"UUID": "{device_uuid}"\n' \
'\t"Failure Explanation": {failure_explanation}'.format(
entity_id=entity_id,
device_bdf=device_bdf,
device_uuid=device_uuid,
failure_explanation=failure_explanation))
# Check for other errors in blacklist recommendation script
error_list = result.get('errors', [])
if error_list:
nv_hostengine_running = True
self.addCheckMessage('Errors encountered:')
for e in error_list:
if 'host engine is not valid any longer' in e:
nv_hostengine_running = False
self.addCheckMessage('\t{}'.format(e))
# If nv-hostengine is not running return as unknown
if not nv_hostengine_running:
self.addUnknown()
return
healthy = False
except Exception as e:
self.addCheckMessage('Error while parsing GPU blacklist recommendations: {}'.format(e))
self.addUnknown()
return
# make sure SBE page pending retirements are caught as informational,
# as the blacklist_recommendations script ignores them as warnings
if healthy:
nvidia_smi_res = modules.nvidia_smi.parse_nvidia_smi.getResult()
if nvidia_smi_res:
for gpu, info in nvidia_smi_res.items():
gpu_dict = xmltodict.parse(info)
check_cls = modules.nvidia_smi.GpuCheckRetiredPagesPending(gpu, gpu_dict)
check_cls.setCallback(self.getCallback())
check_cls.run()
bad_count = check_cls.getResult()['unhealthy'] + check_cls.getResult()['unknown']
if bad_count:
healthy = False
self.addCheckMessage(check_cls.getTitle())
if not healthy:
self.addCheckMessage(config.gpu_total_retired_pages_pending_error)
self.addInformational()
return
if healthy:
self.addHealthy()
else:
self.addUnHealthy()
check_ipmi_sensor_thresholds¶
Brief¶
Check BMC sensor thresholds
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
threshold_check_dispatch = {
'lower_non_recoverable': lambda observed, threshold: threshold < observed,
'lower_critical': lambda observed, threshold: threshold < observed,
'upper_non_recoverable': lambda observed, threshold: observed < threshold,
'upper_critical': lambda observed, threshold: observed < threshold
}
threshold_display = {
'lower_non_recoverable': '{name}: Observed value "{observed}" ({units}) below non-recoverable lower threshold "{threshold}"',
'lower_critical': '{name}: Observed value "{observed}" ({units}) below critical lower threshold "{threshold}"',
'upper_non_recoverable': '{name}: Observed value "{observed}" ({units}) above non-recoverable upper threshold "{threshold}"',
'upper_critical': '{name}: Observed value "{observed}" ({units}) above critical upper threshold "{threshold}"'
}
# Look for any sensor values that fall outside of critical thresholds
try:
healthy = True
sensors = self.__parse_ipmi_sensor_task.getResult()
if not sensors:
return
for sensor in sensors:
name = sensor['name']
observed = sensor['current_reading']
try:
observed = float(observed)
except:
continue
units = sensor['type']
if units.lower() == 'discrete':
continue
for field in [
'lower_non_recoverable',
'lower_critical',
'upper_non_recoverable',
'upper_critical' ]:
threshold = sensor.get(field)
try:
threshold = float(threshold)
except:
continue
check = threshold_check_dispatch[field]
if check(observed, threshold):
continue # Observed value is within threshold
healthy = False
display = threshold_display[field]
self.addCheckMessage(display.format(
name=name,
observed=observed,
units=units,
threshold=threshold))
self.addCheckMessage('Checked {count} sensor values against BMC thresholds.'.format(
count=len(sensors)))
if healthy:
self.addHealthy()
else:
self.addUnHealthy()
except:
self.addCheckMessage('No sensors found in "ipmitool sensor"')
self.addUnknown()
return
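In the dispatch table above, a lambda returning True means the observed value is on the safe side of that threshold and the sensor is skipped; False triggers the corresponding message. A tiny illustration:

# True -> within threshold (skipped); False -> reported as unhealthy.
lower_critical = lambda observed, threshold: threshold < observed
print(lower_critical(observed=45.0, threshold=10.0))  # True: reading is above the lower-critical limit
print(lower_critical(observed=5.0, threshold=10.0))   # False: reading has fallen below the limit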
check_ipmitool_working¶
Brief¶
Check that the ipmitool command is working
Description¶
This checks the exit status of the “ipmitool” command. If the ipmitool command runs with successful exit status, then this is a good indication that ipmitool was able to communicate with the BMC (baseboard management controller).
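A minimal sketch of the exit-status test described here, outside the NVSM task framework (the ipmitool subcommand is illustrative):

import subprocess

# A zero exit status from a simple ipmitool query is taken as a sign that the
# BMC is reachable over the local interface.
proc = subprocess.run(['ipmitool', 'mc', 'info'],
                      stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print('BMC reachable via ipmitool:', proc.returncode == 0)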
Depends On¶
Source Code Listing¶
def run(self):
pass
check_logical_core_count¶
Brief¶
Number of logical CPU cores [{0}]
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
output = self.__parse_lscpu_task.getResult()
observed_core_count = output['CPU']
self.title(self.getTitle().format(observed_core_count))
try:
expected_core_count = self.__parameters_task.getResult()['lscpu_number_of_cores']
except:
return
if observed_core_count == expected_core_count:
self.addCheckMessage('Observed {0} logical CPU cores, matching expectations'.format(
observed_core_count))
self.addHealthy()
return
self.addCheckMessage('Observed {0} logical CPU cores when {1} cores were expected'.format(
observed_core_count, expected_core_count))
self.addUnHealthy()
if observed_core_count * 2 == expected_core_count:
# When only half of the expected logical cores are observed, we
# suspect hyperthreading might be disabled
# Look for the hyperthreading flag
hyperthreading_enabled = output['hyperthread'] == 2
if not hyperthreading_enabled:
self.addCheckMessage('It appears that Hyper-Threading is disabled.' \
' Some customers choose to disable Hyper-Threading in' \
' order to improve the performance of certain' \
' workloads. If Hyper-Threading was intentionally' \
' disabled, please ignore this message.')
check_mdadm_disks¶
Brief¶
Status of software RAID disk superblocks
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
try:
check = self.__parameters_task.getResult()['mdadm_disk_status']
if not check:
return
except:
return
disk_info = self.__mdadm_parse_examine.getResult()
if not disk_info:
self.addCheckMessage("No result from parse 'mdadm --examine' for software RAID superblock")
self.addUnknown()
return
self.addCheckMessage("Checking output of 'mdadm --examine' for each software RAID superblock")
# Check the checksum of each RAID disk superblock managed by mdadm
healthy = True
for name, disk in disk_info.items():
if 'checksum' not in disk:
self.addCheckMessage('Checksum not known for RAID disk "{0}"'.format(name))
healthy = False
continue
checksum = disk['checksum']
if 'correct' not in checksum:
self.addCheckMessage('Observed failed checksum "{0}" on RAID disk "{1}"'.format(
checksum, name))
healthy = False
# Return healthy/unhealthy status
if healthy:
self.addHealthy()
else:
self.addUnHealthy()
check_mdadm_volumes¶
Brief¶
Status of software RAID volumes
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
try:
check = self.__parameters_task.getResult()['mdadm_volume_status']
if not check:
return
except:
return
volume_info = self.__mdadm_parse_details.getResult()
if not volume_info:
self.addCheckMessage("No result from parse 'mdadm --detail' for software RAID volume")
self.addUnknown()
return
self.addCheckMessage("Checking output of 'mdadm --detail' for each software RAID volume")
good_volume_states = [ 'clean', 'active', 'write-pending', 'active-idle' ]
healthy = True
for name, volume in volume_info.items():
# Check the volume state
if 'state' not in volume:
self.addCheckMessage('State not known for RAID volume "{0}"'.format(name))
healthy = False
elif 'recovering' in volume['state'].lower():
self.addCheckMessage('It appears that the RAID volume "{0}" is currently' \
' recovering. This is normal. However, volume performance' \
' might be reduced while the volume is recovering. The' \
' recovery process should complete soon, but if it does' \
' not please contact NVIDIA support.'.format(name))
elif 'resync' in volume['state'].lower():
self.addCheckMessage('It appears that the RAID volume "{0}" is currently' \
' resyncing. This is normal. However, volume performance' \
' might be reduced while the volume is resyncing. The' \
' resync process should complete soon, but if it does' \
' not please contact NVIDIA support.'.format(name))
elif volume['state'].lower() not in good_volume_states:
self.addCheckMessage('Observed unhealthy state "{0}" for RAID volume "{1}"'.format(
volume['state'], name))
healthy = False
# Check for failed devices in the volume
if 'failed_devices' not in volume:
pass # Hmm...
elif int(volume['failed_devices']) > 0:
self.addCheckMessage('Observed {0} failed device(s) in RAID volume "{1}"'.format(
volume['failed_devices'], name))
healthy = False
# Return healthy/unhealthy status
if healthy:
self.addHealthy()
else:
self.addUnHealthy()
check_meminfo_mem_size¶
Brief¶
Installed memory capacity [{0:.2f}GB]
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
try:
threshold = self.__parameters_task.getResult()['meminfo_memory_size']
except:
return
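# The configured 'meminfo_memory_size' is assumed to be in kB (as reported by
# /proc/meminfo); multiplying by 9.537e-7 (~ 1/1024^2) converts it to GB.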
threshold = threshold * 9.537e-7
self.title(self.getTitle().format(threshold))
actual_mem = self.__collect_meminfo_task.getResult()
if not actual_mem:
self.addUnknown()
return
# check for actual mem w.r.t threshold with tolerance of 1GB
tolerance = 1
if modules.common.almost_equal(actual_mem, threshold, tolerance):
self.addHealthy()
else:
self.addCheckMessage('Amount of memory is {0:.2f} GB'.format(actual_mem))
self.addUnHealthy()
check_mlx_fw_version¶
Brief¶
Verify Mellanox devices firmware version consistency
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
own_params = self.getParameters()
params = self.__parameters_task.getResult()
firmware_versions = 0
for param, req in own_params.items():
if param in params.keys():
firmware_versions = params[param]
if firmware_versions == 0:
return
# If mlxfwmanager run got a failure, skip parsing the output
mlnx_res = self.__run_task.getOutput()
if self.__run_task.getReturnCode() or not mlnx_res:
self.addCheckMessage('No result from mellanox firmware manager')
self.addUnknown()
return
self.addCheckMessage(config.mlx_fw_ver_cmd_str)
inconsistent_devices = {}
try:
res_dict = xmltodict.parse(mlnx_res)
for res in res_dict['Devices']['Device']:
pci = res['@pciName']
fw_ver = res['Versions']['FW']['@current']
if fw_ver not in inconsistent_devices:
inconsistent_devices[fw_ver] = []
inconsistent_devices[fw_ver].append(pci)
# Unhealthy check for multiple versions
if len(inconsistent_devices) > firmware_versions:
res = ""
for k, v in inconsistent_devices.items():
res += "PCI device: '{}' has firmware version '{}'\n".format(", ".join(v), k)
self.addCheckMessage(f"Different firmware version found on Mellanox devices\n{res}")
self.addUnHealthy()
else:
self.addHealthy()
# Unknown check
except:
self.addCheckMessage('Error while checking Mellanox devices firmware version consistency')
self.addUnknown()
check_net_link¶
Brief¶
Verify Network Interfaces Link
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
try:
net_link = self.__parameters_task.getResult()['net_link']
if not net_link:
return
except:
return
from re import findall
from nvsmhealth.lib import DictionarySuperset
output = self.__parse_net_ifconfig_task.getOutput()['interface_names']
if output == None:
return
#~ TODO: Not all interfaces are currently up
#~ Using the ones passed by net_link
interfaces = list(net_link.keys())
health_list = [True for x in range(len(interfaces))]
j = {}
for i, device in enumerate(interfaces):
j[device] = {}
ethtool = tasks.RunCommand(args=['ethtool', '{}'.format(device)])
ethtool.run()
if(ethtool.getReturnCode()!=0):
self.addCheckMessage("Check network interface setup: {}".format(device))
health_list[i] = False
continue
regex = r'Link detected:+ ([A-Za-z0-9_.-]*)'
x = findall(regex, ethtool.getOutput())
try:
if(x[0] == 'yes'):
j[device]['link'] = True
else:
j[device]['link'] = False
except:
j[device]['link'] = False
dictionary_superset = DictionarySuperset.DictionarySuperset(
missing_message="Could not detect {}")
result = dictionary_superset.compare(j, net_link)
if (all(health_list) and result == None):
self.addHealthy()
else:
self.addCheckMessage(result)
self.addUnHealthy()
check_net_ping¶
Brief¶
Verify Network IP Reachability
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
from re import findall
try:
net_ping = self.__parameters_task.getResult()['net_ping']
if not net_ping:
return
except:
return
health_list = [True for x in range(len(net_ping))]
for i, (interface, ip) in enumerate(net_ping.items()):
run_ping_task = tasks.RunCommand(
args=['ping', '-c', str(config.check_net_ping_count), '-W', str(config.check_net_ping_timeout), ip]) \
.title('Run ping') \
.describe('''Check IP Reachability via Ping''')
try:
run_ping_task.run()
if(run_ping_task.getReturnCode()!=0):
raise Exception("Unable to ping {} at {}".format(ip, interface))
output = run_ping_task.getOutput()
except:
self.addCheckMessage("Unable to ping {} at {}".format(ip, interface))
health_list[i] = False
continue
regex = r'[0-9 ]*[a-z].+?(?=,).+?(?=,).+(.+?(?=%))'
packet_loss = findall(regex, output)
self.addCheckMessage("Checking Packet Loss on {} at {}: {}%".format(interface, ip, packet_loss[0]))
if(packet_loss[0]!='0'):
health_list[i] = False
elif(packet_loss[0]=='0'):
health_list[i] = True
if all(health_list):
self.addHealthy()
else:
self.addUnHealthy()
check_nvidia_grid_license¶
Brief¶
Drive Constellation: GRID License Status
Description¶
None
Module¶
Depends On¶
Source Code Listing¶
def run(self):
try:
dcs_grid = self.__parameters_task.getResult()['dcs_grid_license']
if not dcs_grid:
return
except:
return
nvidia_smi_res = self.__parse_nvidia_smi_task.getResult()
if nvidia_smi_res == None:
return
healthy = False
try:
for gpu, info in nvidia_smi_res.items():
gpu_dict = xmltodict.parse(info)
# GRID License check on GRID Products only
# NVBug-2795033: GPU cards which can be licensed will have the `License Status` field and
# those which cannot will not have this field.
if gpu_dict['nvidia_smi_log']['gpu'].get('grid_licensed_product', None) == None:
continue
elif gpu_dict['nvidia_smi_log']['gpu']['grid_licensed_product'].get('license_status', None) == None:
continue
# NVBug:3145085 - If the product name is QVDCW and status is licensed then only show healthy
elif gpu_dict['nvidia_smi_log']['gpu']['grid_licensed_product'].get('licensed_product_name', None) == None:
continue
product_name = gpu_dict['nvidia_smi_log']['gpu']['grid_licensed_product']['licensed_product_name']
product_name = product_name.strip()
if product_name == 'Quadro Virtual Data Center Workstation':
if not gpu_dict['nvidia_smi_log']['gpu']['grid_licensed_product']['license_status'] == 'Licensed':
healthy = False
break
else:
healthy = True
if healthy == True:
self.addHealthy()
else:
self.addCheckMessage("Check GRID License: Contact Nvidia")
self.addUnHealthy()
except:
self.addCheckMessage("Error while performing GRID License Check")
self.addUnknown()
check_nvidia_smi_gpu_bus_id¶
Brief¶
Verify GPUs identified using nvidia-smi
Description¶
None
Module¶
Depends On¶
Source Code Listing¶
def run(self):
# Get parameters from own task
own_params = self.getParameters()
# Get parameters from parameter task
params = self.__parameters_task.getResult()
expected_bdfs = None
for param, req in own_params.items():
if param in params.keys():
expected_bdfs = params[param]
# If parameter is not found in platform parameters - Do Nothing
if expected_bdfs == None:
return
if modules.kvm.kvm_mode_on.getResult() == True:
self.addCheckMessage('KVM mode is on, skipping check.')
self.addInformational()
return
# Unknown check for no result from nvidia-smi gpu_bus_id parse task
if not self.__parse_task.getResult():
self.addCheckMessage(
'No result for gpu_bus_id information gathered from nvidia-smi tool')
self.addUnknown()
return
# Print/Stream Task info and command messages
self.addCheckMessage(
'Checking output of "nvidia-smi --query-gpu=gpu_bus_id --format=csv,noheader" for expected GPUs')
message = ''
healthy = True
try:
bdfs = self.__parse_task.getResult()
for gpu in expected_bdfs.keys():
if gpu not in bdfs:
message += '\nGPU not identified at PCI address "{}"'.format(
gpu)
healthy = False
if not healthy:
self.addCheckMessage(message)
self.addUnHealthy()
else:
self.addHealthy()
except:
self.addCheckMessage('Error while identifying GPUs bus_id')
self.addUnknown()
check_nvidia_smi_nvlink_status¶
Brief¶
Parse NVLink status with NVIDIA System Management Interface (nvidia-smi)
Description¶
None
Module¶
Depends On¶
Source Code Listing¶
def run(self):
own_params = self.getParameters()
params = self.__parameters_task.getResult()
nvlink_speed = None
nvlink_active_count = None
for param, req in own_params.items():
if param in params.keys():
if 'nvlink_speed' in param:
nvlink_speed = params[param]
else:
nvlink_active_count = params[param]
if nvlink_speed == None:
return
# Test for older versions of nvidia-smi (via NVIDIA driver version)
# NVLink speed query requires NVIDIA driver version 384.98 or later
# Log error and return for version lesser than 384.98
nvidia_driver_version = self.__driver_version.getResult()
if nvidia_driver_version == None:
return
if not self.is_valid_version(nvidia_driver_version):
logging.error(
"ERROR: This version {} of nvidia-smi does not query NVLink speed".format(nvidia_driver_version))
logging.error(
"ERROR: NVLink speed query requires NVIDIA driver version 384.98 or later")
return
# Get GPU BDF information
nvidia_smi_res = self.__parse_nvidia_smi_task.getResult()
gpus_info = {}
try:
for gpu, info in nvidia_smi_res.items():
gpus_info[gpu] = xmltodict.parse(info)
except:
pass
# Get the nvlink status
nvlink_nvidia_smi = tasks.RunCommand(
args=['nvidia-smi', 'nvlink', '-s'])
nvlink_nvidia_smi.run()
if nvlink_nvidia_smi.getReturnCode() != 0:
return
nvlink_nvidia_smi_output = nvlink_nvidia_smi.getOutput()
if len(nvlink_nvidia_smi_output) == 0:
logging.debug(
"nvidia-smi nvlink -s returned code 0 but produced no output. Check if MIG is enabled.")
return
link_regex = re.compile(
"\\s*Link (?P<link_index>\\d+):\\s*(?P<link_status>.*)$")
gpu_regex = re.compile("GPU (?P<gpu_index>\\d+)")
nvlink_lines = nvlink_nvidia_smi_output.split('\n')
nvlinks = {}
for line in nvlink_lines:
gpu_match = gpu_regex.match(line)
if gpu_match:
gpu = gpu_match.group('gpu_index').strip()
nvlinks[gpu] = []
link_match = link_regex.match(line)
if link_match:
index = link_match.group('link_index').strip()
status = link_match.group('link_status').strip()
nvlinks[gpu].append({'link': index, 'status': status})
for gpu, nvlink in nvlinks.items():
try:
gpu_dict = gpus_info[int(gpu)]
bdf = gpu_dict['nvidia_smi_log']['gpu']['pci']['pci_bus_id']
active_count = nvlink_active_count
except:
bdf = ""
active_count = nvlink_active_count
for link in nvlink:
check_cls = NvlinkSpeed(nvlink_speed, active_count, link)
check_cls.title(config.nvlink_speed_info_str.format(
gpu, bdf, link['link'], link['status']))
check_cls.setCallback(self.getCallback())
check_cls.run()
if nvlink_active_count != None:
active_count = check_cls.get_activeCount()
self.addHealthy(count=check_cls.getResult()['healthy'])
self.addUnHealthy(count=check_cls.getResult()['unhealthy'])
self.addUnknown(count=check_cls.getResult()['unknown'])
check_cls.sendComplete()
super().addMessages(check_cls.getMessages())
self.title('')
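The two regular expressions in the listing turn the flat "nvidia-smi nvlink -s" text into a per-GPU map of link states. The sketch below runs the same patterns against a hand-written sample of that output.
# Sketch: parse a hypothetical `nvidia-smi nvlink -s` output with the same
# regular expressions used by the check.
import re

sample = """GPU 0: Tesla V100-SXM2-32GB (UUID: GPU-0000)
\t Link 0: 25.781 GB/s
\t Link 1: 25.781 GB/s
GPU 1: Tesla V100-SXM2-32GB (UUID: GPU-0001)
\t Link 0: <inactive>
"""

link_regex = re.compile(r"\s*Link (?P<link_index>\d+):\s*(?P<link_status>.*)$")
gpu_regex = re.compile(r"GPU (?P<gpu_index>\d+)")

nvlinks = {}
gpu = None
for line in sample.split('\n'):
    gpu_match = gpu_regex.match(line)
    if gpu_match:
        gpu = gpu_match.group('gpu_index')
        nvlinks[gpu] = []
        continue
    link_match = link_regex.match(line)
    if link_match and gpu is not None:
        nvlinks[gpu].append({'link': link_match.group('link_index'),
                             'status': link_match.group('link_status').strip()})

print(nvlinks)
# {'0': [{'link': '0', 'status': '25.781 GB/s'}, {'link': '1', 'status': '25.781 GB/s'}],
#  '1': [{'link': '0', 'status': '<inactive>'}]}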
check_nvme_devices¶
Brief¶
Verify installed NVMe devices
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
import json
own_params = self.getParameters()
params = self.__parameters_task.getResult()
nvme_config = None
for param, req in own_params.items():
if param in params.keys():
nvme_config = params[param]
if not nvme_config:
return
nvsm_config = config.read_nvsm_config_file()
if nvsm_config != None:
if not nvsm_config["use_standard_config_storage"]:
return
if self.__parse_nvme_devices.getResult():
self.addCheckMessage(config.nvme_command_str)
devices = json.loads(self.__parse_nvme_devices.getResult())
if not [conf for conf in nvme_config if devices.items() == conf.items()]:
conf_index = 0
res = 'Supported NVMe device(s) configuration:\n'
for conf in nvme_config:
for size, count in conf.items():
res += '"{}" NVMe device(s) with capacity "{}"\n'.format(
count, size)
conf_index += 1
if conf_index < len(nvme_config):
res += 'or \n'
res += 'Found NVMe device(s) configuration:'
for size, count in devices.items():
res += '\n"{}" NVMe device(s) with capacity "{}"'.format(
count, size)
self.addCheckMessage(res)
self.addUnHealthy()
else:
self.addHealthy()
else:
self.addCheckMessage("No results from parse nvme devices")
self.addUnknown()
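The comparison above passes only when the observed capacity-to-count map exactly equals one of the supported configurations defined in the platform parameters. A short sketch with made-up capacities shows the shape of data the check appears to expect (inferred from how the listing iterates the configurations).
# Sketch with hypothetical capacities: nvme_config is a list of supported
# configurations, each mapping drive capacity to expected drive count; the
# parsed device map must exactly match one of them.
nvme_config = [
    {'3.84 TB': 4},
    {'3.84 TB': 8},
]
devices = {'3.84 TB': 4}   # shape assumed for the parse_nvme_devices result

matching = [conf for conf in nvme_config if devices.items() == conf.items()]
print("healthy" if matching else "unhealthy")   # -> healthy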
check_nvme_link_info¶
Brief¶
None
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
try:
devices = self.__parameters_task.getResult()[self.__parameter]
except:
return
for device in deepcopy(devices):
pstate_not_found = False
if self.__is_gpu_check:
bdf_pstate = self.__bdf_pstate.getResult()
try:
pstate = bdf_pstate[device['bdf']]
device['speed'] = device['speed'][pstate]
device['width'] = device['width'][pstate]
except:
# not able to find the pstate for this gpu bdf
pstate_not_found = True
kvm_mode_disabled = True
if self.__parameter == "gpu_link_info" and modules.kvm.kvm_mode_on.getResult() == True:
kvm_mode_disabled = False
for check_type in ['speed', 'width']:
check_cls = CheckLink(device['bdf'], device[check_type], check_type, self.__parse_lspci_task, self.__parameter)
if 'name' not in device:
device["name"] = ""
check_cls.setCallback(self.getCallback())
if kvm_mode_disabled:
if pstate_not_found:
device['speed'] = "None"
device['width'] = "None"
check_cls.addCheckMessage("unknown pstate for the GPU[{}]".format(device['bdf']))
check_cls.addUnknown()
else:
check_cls.run()
else:
device['speed'] = "None"
device['width'] = "None"
check_cls.addCheckMessage('KVM mode is on, skipping check.')
check_cls.addInformational()
self.addInformational()
check_cls.title(self.__title_str[check_type].format(**device).strip())
if kvm_mode_disabled:
self.addHealthy(count=check_cls.getResult()['healthy'])
self.addUnHealthy(count=check_cls.getResult()['unhealthy'])
self.addUnknown(count=check_cls.getResult()['unknown'])
self.addInformational(count=check_cls.getResult()['informational'])
check_cls.sendComplete()
super().addMessages(check_cls.getMessages())
# clear message as this task doesn't print anything
self.title('')
check_nvme_smart_log¶
Brief¶
Check SMART status of NVMe devices
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
try:
check = self.__parameters_task.getResult()['nvme_check_smart_log']
if not check:
return
except:
return
# NVBUG2794792: Toshiba/Kioxia CM5: medium error observed in SMART log when I/O sent to the locked drive
skipDriveModel = None
try:
skipDriveModel = self.__parameters_task.getResult()[
'skip_nvme_drive_model']
except:
pass
import json
# Return if nvme run throws error or no results
nvme_drive_map = {}
if not self.__nvme_list.getReturnCode() and self.__nvme_list.getOutput():
try:
nvme_stream = json.loads(self.__nvme_list.getOutput())
devices = nvme_stream['Devices']
for device in devices:
d = device['DevicePath']
deviceName = d.split(os.sep)[-1]
nvme_drive_map[deviceName] = device['ModelNumber']
except Exception as e:
logging.debug("Error while parsing nvme devices: {}".format(e))
pass
self.addCheckMessage(
"Checking output of 'nvme smart-log' for each NVMe device")
nvme_smart_log = self.__nvme_smart_log_task.getResult()
if not nvme_smart_log:
self.addUnknown()
return
healthy = True
for name, device in nvme_smart_log.items():
# Check for critical warnings, which indicate drive is in error state
critical_warnings = device.get('critical_warning', 0)
if critical_warnings != '0':
self.addCheckMessage('Found {0} critical warning(s) on NVMe drive "{1}".'.format(
critical_warnings, name))
healthy = False
# Check that remaining spare capacity is above the threshold
available_spare = device.get('available_spare', 1.0)
available_spare_threshold = device.get(
'available_spare_threshold', 0.1)
if available_spare < available_spare_threshold:
self.addCheckMessage('Remaining spare capacity of {remaining}% on NVMe drive "{drive}" fails to meet threshold of {threshold}%.'.format(
drive=name,
remaining=int(available_spare * 1e2),
threshold=int(available_spare_threshold * 1e2)))
healthy = False
# Check that vendor estimate of percentage used is below 90%
used = device.get('percentage_used', 0.0)
if used > 0.9:
self.addCheckMessage('Over {used}% expected life used on NVMe drive "{drive}".'.format(
used=int(used * 1e2),
drive=name))
healthy = False
# Skip media error check for locked devices
if nvme_drive_map.get(name) == skipDriveModel:
continue
# Check for media errors, which occur when the controller detects
# unrecovered data integrity errors
media_errors = device.get('media_errors', 0)
if media_errors != '0':
self.addCheckMessage('Found {0} media error(s) on NVMe drive "{1}".'.format(
media_errors, name))
healthy = False
if healthy:
self.addHealthy()
else:
self.addUnHealthy()
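Each smart-log record is judged against a handful of fixed rules: any critical warning, spare capacity below its reported threshold, more than 90% of rated life used, or any media error marks the drive unhealthy. A self-contained sketch of that decision, on a hand-written record, is below; the field names follow the listing, and the fractional spare/used values mirror the listing's use of int(value * 1e2) for display.
# Sketch: evaluate one hypothetical smart-log record with the same rules the
# check applies.
record = {
    'critical_warning': '0',
    'available_spare': 0.98,
    'available_spare_threshold': 0.10,
    'percentage_used': 0.03,
    'media_errors': '0',
}

problems = []
if record.get('critical_warning', '0') != '0':
    problems.append('critical warning present')
if record.get('available_spare', 1.0) < record.get('available_spare_threshold', 0.1):
    problems.append('spare capacity below threshold')
if record.get('percentage_used', 0.0) > 0.9:
    problems.append('over 90% of expected life used')
if record.get('media_errors', '0') != '0':
    problems.append('media errors present')

print("unhealthy: " + ", ".join(problems) if problems else "healthy")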
check_psu_bom¶
Brief¶
Verify chassis power supply presence
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
from nvsmhealth.lib import DictionarySuperset
try:
psu_bom = self.__parameters_task.getResult()['psu_bom']
except:
return
output = self.__sdr_device_bom_task.getResult()
dictionary_superset = DictionarySuperset.DictionarySuperset(
missing_message="Could not detect presence of chassis power supply {}")
result = dictionary_superset.compare(output, psu_bom)
self.addCheckMessage("Checking output of 'ipmitool sdr elist' for expected chassis PSUs")
if result:
self.addCheckMessage(result)
self.addUnHealthy()
else:
healthy = True
# NVBUG-200528273: Check for Power supply lost
# Even if PSU status is ok, readings might have power supply ac lost message
psu_res = self.__parse_ipmi_sdr_elist_task.getResult()
# Filter PSU status keys for readings
psu_status_keys = [k for k in psu_bom.keys() if 'status' in k.lower()]
for s in psu_res:
if s['name'] in psu_status_keys:
reading = s['reading']
if 'power supply ac lost' in reading.lower():
self.addCheckMessage("AC input is lost, {} has reading:\n{}".format(s['name'], s['reading']))
healthy = False
if healthy:
self.addHealthy()
else:
self.addUnHealthy()
check_psu_info¶
Brief¶
Check PSU Info (Vendor, Model) for Consistency
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
# print(self.__collect_psu_info_task.getReturnCode(), self.__collect_psu_info_task.getResult())
if self.__collect_psu_info_task.getReturnCode():
self.addCheckMessage("Unable to collect PSU (Vendor, Model) Information.")
self.addUnknown()
return
psu_info = self.__collect_psu_info_task.getResult()
for item in psu_info:
s = set()
for i in item.keys():
k = i.split("_")[0]
s.add(item[i])
if len(s) > 1:
self.addCheckMessage("Multiple PSU {}s found. {}".format(k,[i for i in s]))
self.addUnHealthy()
return
self.addHealthy()
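The consistency test simply collapses each attribute group into a set and flags the PSUs when more than one distinct value remains. A small sketch with hypothetical vendor and model strings illustrates it; the field names are placeholders, since the real data comes from the collect_psu_info task.
# Sketch: flag inconsistent PSU attributes (hypothetical field names/values).
psu_info = [
    {'Vendor_PSU0': 'ACME', 'Vendor_PSU1': 'ACME'},
    {'Model_PSU0': 'PS-3000', 'Model_PSU1': 'PS-3200'},
]

for item in psu_info:
    attribute = next(iter(item)).split('_')[0]      # e.g. 'Vendor' or 'Model'
    values = set(item.values())
    if len(values) > 1:
        print("Multiple PSU {}s found: {}".format(attribute, sorted(values)))
    else:
        print("PSU {}s consistent".format(attribute))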
check_smartctl_disk_count¶
Brief¶
Verify installed disks
Description¶
Verify that all of the expected disks are installed
Depends On¶
Source Code Listing¶
def run(self):
try:
exp_disk_count = self.__parameters_task.getResult()[list(self.getParameters())[0]]
except:
return
self.addCheckMessage("Checking output of 'smartctl' for expected disks")
if self.__is_raid:
disk_count = self.__get_disk_count_task.getResult()
else:
disk_count = self.__get_disk_count_task.getDisksBySize()
if not disk_count:
self.addCheckMessage("No disk(s) found")
self.addUnknown()
return
dictionary_superset = DictionarySuperset.DictionarySuperset(
missing_message="No disks of capacity '{}' were found",
changed_message="Disks of capacity '{}' were found '{}' when '{}' disk(s) were expected")
result = dictionary_superset.compare(disk_count, exp_disk_count)
if result == None:
# Healthy check - No diffs found
self.addHealthy()
else:
# UnHealthy check - Print/Stream diffs found
self.addUnHealthy()
self.addCheckMessage(result)
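Judging by how the listing uses it, DictionarySuperset.compare returns None when the observed counts cover the expected ones and a formatted diff message otherwise, which is how the check decides healthy versus unhealthy. The sketch below approximates that comparison with plain dicts; the capacities are invented and compare_counts() is a stand-in, not the nvsmhealth library class. The same comparison is used by check_smartctl_megaraid_disk_count below.
# Approximation of the disk-count comparison (hypothetical capacities).
# compare_counts() is a stand-in for nvsmhealth.lib.DictionarySuperset.
def compare_counts(observed, expected):
    messages = []
    for capacity, count in expected.items():
        if capacity not in observed:
            messages.append("No disks of capacity '{}' were found".format(capacity))
        elif observed[capacity] != count:
            messages.append("Disks of capacity '{}' were found '{}' when '{}' disk(s) were expected".format(
                capacity, observed[capacity], count))
    return '\n'.join(messages) or None

expected_disk_count = {'1.92 TB': 2, '3.84 TB': 8}
observed_disk_count = {'1.92 TB': 2, '3.84 TB': 7}

result = compare_counts(observed_disk_count, expected_disk_count)
print("healthy" if result is None else result)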
check_smartctl_megaraid_disk_count¶
Brief¶
Verify installed MegaRAID disks
Description¶
Count the disks attached to the MegaRAID controller using the smartctl command
Depends On¶
Source Code Listing¶
def run(self):
try:
exp_disk_count = self.__parameters_task.getResult()[list(self.getParameters())[0]]
except:
return
self.addCheckMessage("Checking output of 'smartctl' for expected disks")
if self.__is_raid:
disk_count = self.__get_disk_count_task.getResult()
else:
disk_count = self.__get_disk_count_task.getDisksBySize()
if not disk_count:
self.addCheckMessage("No disk(s) found")
self.addUnknown()
return
dictionary_superset = DictionarySuperset.DictionarySuperset(
missing_message="No disks of capacity '{}' were found",
changed_message="Disks of capacity '{}' were found '{}' when '{}' disk(s) were expected")
result = dictionary_superset.compare(disk_count, exp_disk_count)
if result == None:
# Healthy check - No diffs found
self.addHealthy()
else:
# UnHealthy check - Print/Stream diffs found
self.addUnHealthy()
self.addCheckMessage(result)
check_smartctl_ssd_brick¶
Brief¶
Check for SSD health
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
try:
check = self.__parameters_task.getResult()['smartctl_check_ssd_brick']
if not check:
return
except:
return
errormod_pattern = re.compile(r'ERRORMOD', flags=re.IGNORECASE)
# Look for disks with bricked firmware conditions
brick = False
try:
disk_bom = self.__smartctl_info_task.getBomList()
if disk_bom is None:
return
for key, value in disk_bom.items():
# Check for "ERRORMOD" in firmware version
if not 'firmware_version' in value:
continue
firmware_version = value['firmware_version']
m = errormod_pattern.match(firmware_version)
if not m:
continue
# NOTE: It is likely that the disk capacity is also incorrect, but
# we do not check for this here. Disk capacity is checked elsewhere
# as part of the "smartctl-disk-count" check.
brick = key
break
except Exception as e:
self.addCheckMessage(str(e))
self.addUnknown()
return
# Print details message
if brick:
self.addUnHealthy()
self.addCheckMessage('Possible firmware bug on disk "{0}"'.format(brick))
else:
self.addHealthy()
self.addCheckMessage('No disks with firmware bug found')
check_storcli_disk_state¶
Brief¶
None
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
if self.__run_command.getReturnCode():
# command failed to execute
return
try:
devices = self.__parameters_task.getResult()['storcli_disk_stats']
except:
return
output = json.loads(self.__run_command.getOutput().strip())
for name, key, idx, exp_val in devices:
check_cls = CheckDisk(output, name, key, idx, exp_val)
check_cls.setCallback(self.getCallback())
check_cls.run()
self.addHealthy(count=check_cls.getResult()['healthy'])
self.addUnHealthy(count=check_cls.getResult()['unhealthy'])
self.addUnknown(count=check_cls.getResult()['unknown'])
check_cls.sendComplete()
super().addMessages(check_cls.getMessages())
# clear message as this task doesn't print anything
self.title('')
check_storcli_phy_links¶
Brief¶
None
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
if self.__run_command.getReturnCode():
# command failed to execute
return
try:
devices = self.__parameters_task.getResult()['storcli_phy_links']
except:
return
output = json.loads(self.__run_command.getOutput().strip())
for check in ["LINK_SPEED", "SAS_ADDRESS", "SAS_PORT"]:
for device in devices:
check_cls = CheckPhy(output, device, check)
check_cls.setCallback(self.getCallback())
check_cls.run()
self.addHealthy(count=check_cls.getResult()['healthy'])
self.addUnHealthy(count=check_cls.getResult()['unhealthy'])
self.addUnknown(count=check_cls.getResult()['unknown'])
check_cls.sendComplete()
super().addMessages(check_cls.getMessages())
# clear message as this task doesn't print anything
self.title('')
check_storcli_sanity_installed¶
Brief¶
[sanity] MegaRAID storcli utility installed
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
try:
storcli_platform_string = self.__parameters_task.getResult()['storcli_platform_string']
except:
# parameter not found
return
if self.__run_command.getReturnCode():
self.addUnHealthy()
self.addCheckMessage('The storcli utility does not appear to be installed')
self.addCheckMessage('Please ensure storcli64 is installed in the /opt/MegaRAID/storcli/ directory')
else:
self.addHealthy()
check_storcli_sanity_supported¶
Brief¶
[sanity] {} BaseOS support for storcli utility
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
try:
storcli_platform_string = self.__parameters_task.getResult()['storcli_platform_string']
except:
# parameter not found
return
self.title(self.getTitle().format(storcli_platform_string))
baseos_version = self.__base_os_version_task.getResult()
if not baseos_version or 'sw_version' not in baseos_version:
self.addCheckMessage("Error checking {} BaseOS version".format( storcli_platform_string))
self.addUnknown()
return
# Check DGX BaseOS version for storcli support
message = 'Installed {} BaseOS version "{}" '.format(storcli_platform_string, baseos_version['sw_version'])
if Version(baseos_version['sw_version']) >= Version('3.1.6'):
# DGX BaseOS 3.1.6 introduces support for the storcli64 utility
message += 'should support storcli'
self.addHealthy()
else:
message += 'does not support storcli'
self.addUnHealthy()
self.addCheckMessage(message)
check_superuser_privileges¶
Brief¶
Check for superuser privileges
Description¶
This checks that NVSM Health is running with an effective user ID of 0, which indicates superuser or “root” privileges. Many NVSM Health checks require superuser privileges in order to run certain privileged commands or access privileged log files.
Used By¶
- check_blacklist_recommendations
- check_instant_blacklist_recommendations
- collect_fru
- collect_nvme_smart_log
- collect_psu_info
- dcc_passgen
- dcv_bmc_run_ipmi_info
- dcv_run_ipmi_fru
- dcv_run_ipmi_getenables
- dcv_run_ipmi_sdr_elist
- dcv_run_ipmi_sensor
- nvme_list
- parse_smartctl_device_info
- parse_smartctl_system_disk_info
- run_bmc_boot_slot_task
- run_cec_boot_status
- run_cec_version
- run_dmidecode
- run_dmidecode_memory
- run_gpu_monitor_status
- run_ipmi_info
- run_ipmi_sdr_elist
- run_ipmi_sensor
- run_ipmitool
- run_mdadm_detail
- run_mdadm_examine
- run_psu0_fw_version
- run_psu0_model
- run_psu0_serial_number
- run_psu0_vendor
- run_psu1_fw_version
- run_psu1_model
- run_psu1_serial_number
- run_psu1_vendor
- run_smartctl_scan
- run_storcli_pall
- run_storcli_vall
- run_xl_info
- show_dcs_psu_info
Source Code Listing¶
def run(self):
import os
# TODO: How should classes implementing ICheck communicate
# health check results?
if os.geteuid() != 0:
pass
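The listing leaves result reporting as a TODO. The following is a hedged sketch of how the effective-UID test could feed the same healthy/unhealthy pattern used by the other checks; report() is a hypothetical stand-in for the ICheck calls.
# Hypothetical sketch only: how the effective-UID test could report a result
# in the style of the other checks. report() stands in for
# self.addHealthy()/self.addUnHealthy()/self.addCheckMessage().
import os

def report(healthy, message=None):
    print("healthy" if healthy else "unhealthy: {}".format(message))

if os.geteuid() == 0:
    report(True)
else:
    report(False, "NVSM Health is not running with superuser privileges; "
                  "many checks require root to run privileged commands")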
check_xenserver_logical_core_count¶
Brief¶
Number of logical CPU cores [{0}]
Description¶
None
Depends On¶
Source Code Listing¶
def run(self):
try:
expected_core_count = modules.parameter.parameters.getResult()['xenserver_number_of_cores']
except:
return
xlinfo_result = parse_xl_info.getResult()
if not xlinfo_result:
self.addUnknown()
return
observed_core_count = xlinfo_result['nr_cpus']
self.title(self.getTitle().format(observed_core_count))
if observed_core_count == expected_core_count:
self.addCheckMessage('Observed {0} logical CPU cores, matching expectations'.format(
observed_core_count))
self.addHealthy()
return
if observed_core_count * 2 == expected_core_count:
# When only half of the expected logical cores are observed, we
# suspect hyperthreading might be disabled
# Look for the hyperthreading flag
hyperthreading_enabled = xlinfo_result['threads_per_core'] == 2
if not hyperthreading_enabled:
self.addCheckMessage('It appears that Hyper-Threading is disabled.' \
' Some customers choose to disable Hyper-Threading in' \
' order to improve the performance of certain' \
' workloads. If Hyper-Threading was intentionally' \
' disabled, please ignore this message.')
self.addUnHealthy()
return
self.addCheckMessage('Observed {0} logical CPU cores when {1} cores were expected'.format(
observed_core_count, expected_core_count))
self.addUnHealthy()
dcv_check_fan_bom¶
Brief¶
Drive Constellation: Verify chassis fan presence for DCC
Description¶
None
Module¶
Depends On¶
Source Code Listing¶
def run(self):
from nvsmhealth.lib import DictionarySuperset
try:
dcv_fan_bom = self.__parameters_task.getResult()['dcv_fan_bom']
except:
return
output = self.__dcv_sdr_device_bom_task.getResult()
dictionary_superset = DictionarySuperset.DictionarySuperset(
missing_message="Could not detect presence of DCC chassis fan: {}")
result = dictionary_superset.compare(output, dcv_fan_bom)
self.addCheckMessage("Checking output of 'ipmitool sdr elist' for expected chassis fans on DCC")
if result:
self.addCheckMessage(result)
self.addUnHealthy()
else:
self.addHealthy()
dcv_check_fru_consistency¶
Brief¶
Drive Constellation: Check FRU information for consistency
Description¶
The FRU (field replaceable unit) information recorded in the BMC (baseboard management controller) includes serial numbers for various FRUs on the system. For any given system, these serial numbers should be consistent among all FRUs. However, it is possible for these serial numbers to become inconsistent as the result of normal maintenance (such as FRU replacement). This check makes sure serial numbers are consistent for all FRUs recorded in the DCC BMC.
Module¶
Depends On¶
Source Code Listing¶
def run(self):
# TODO:[Kenzen-499] Check if FRU in self.__fru_task is consistent and report health using ICheck interface
# TODO:Similar approach needs to be used by DCC BMC
pass
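The shipped check is still a stub (the TODO above), so the following is purely a hypothetical sketch of the consistency idea described in this check: gather the serial number recorded for each FRU and verify they all agree. The FRU names, field name, and serial values are invented.
# Hypothetical sketch only: verify that the serial number recorded for each
# FRU is the same everywhere.
fru_records = {
    'DCC Baseboard': {'Chassis Serial': 'SN12345'},
    'DCC Midplane':  {'Chassis Serial': 'SN12345'},
    'DCC PDB':       {'Chassis Serial': 'SN99999'},
}

serials = {name: rec.get('Chassis Serial') for name, rec in fru_records.items()}
if len(set(serials.values())) == 1:
    print("FRU serial numbers are consistent -> healthy")
else:
    print("Inconsistent FRU serial numbers -> unhealthy: {}".format(serials))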
dcv_check_ipmi_sensor_thresholds¶
Brief¶
Drive Constellation: Check DCC BMC sensor thresholds
Description¶
None
Module¶
Depends On¶
Source Code Listing¶
def run(self):
try:
dcs_dcv_sensor_threshold = self.__parameters_task.getResult()['dcs_dcv_sensor_threshold']
if not dcs_dcv_sensor_threshold:
return
except:
return
threshold_check_dispatch = {
'lower_non_recoverable': lambda observed, threshold: threshold < observed,
'lower_critical': lambda observed, threshold: threshold < observed,
'upper_non_recoverable': lambda observed, threshold: observed < threshold,
'upper_critical': lambda observed, threshold: observed < threshold
}
threshold_display = {
'lower_non_recoverable': '{name}: Observed value "{observed}" ({units}) below non-recoverable lower threshold "{threshold}"',
'lower_critical': '{name}: Observed value "{observed}" ({units}) below critical lower threshold "{threshold}"',
'upper_non_recoverable': '{name}: Observed value "{observed}" ({units}) above non-recoverable upper threshold "{threshold}"',
'upper_critical': '{name}: Observed value "{observed}" ({units}) above critical upper threshold "{threshold}"'
}
# Look for any sensor values that fall outside of critical thresholds
healthy = True
try:
sensors = self.__dcv_parse_ipmi_sensor_task.getResult()
for sensor in sensors:
name = sensor['name']
observed = sensor['current_reading']
try:
observed = float(observed)
except:
continue
units = sensor['type']
if units.lower() == 'discrete':
continue
for field in [
'lower_non_recoverable',
'lower_critical',
'upper_non_recoverable',
'upper_critical' ]:
threshold = sensor.get(field)
try:
threshold = float(threshold)
except:
continue
check = threshold_check_dispatch[field]
if check(observed, threshold):
continue # Observed value is within threshold
healthy = False
display = threshold_display[field]
self.addCheckMessage(display.format(
name=name,
observed=observed,
units=units,
threshold=threshold))
self.addCheckMessage('Checked {count} sensor values against DCC BMC thresholds.'.format(
count=len(sensors)))
except:
self.addUnknown()
return
if healthy:
self.addHealthy()
else:
self.addUnHealthy()
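The dispatch table expresses each threshold as a predicate that is true while the reading is still inside the limit, so a sensor is flagged only when its predicate fails. A worked example on one hypothetical sensor record is below.
# Worked example (hypothetical sensor record): apply the same threshold
# predicates as the listing; a predicate returning False means the reading
# has crossed that limit.
threshold_check_dispatch = {
    'lower_non_recoverable': lambda observed, threshold: threshold < observed,
    'lower_critical':        lambda observed, threshold: threshold < observed,
    'upper_non_recoverable': lambda observed, threshold: observed < threshold,
    'upper_critical':        lambda observed, threshold: observed < threshold,
}

sensor = {
    'name': 'DCC Inlet Temp',
    'current_reading': 41.0,
    'type': 'degrees C',
    'lower_critical': 5.0,
    'upper_critical': 40.0,
    'upper_non_recoverable': 45.0,
}

for field, within_limit in threshold_check_dispatch.items():
    threshold = sensor.get(field)
    if threshold is None:
        continue
    if not within_limit(sensor['current_reading'], threshold):
        print("{}: reading {} crosses {} threshold {}".format(
            sensor['name'], sensor['current_reading'], field, threshold))
# -> DCC Inlet Temp: reading 41.0 crosses upper_critical threshold 40.0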
dcv_check_psu_bom¶
Brief¶
Drive Constellation: Verify chassis power supply presence on DCC
Description¶
None
Module¶
Depends On¶
Source Code Listing¶
def run(self):
from nvsmhealth.lib import DictionarySuperset
try:
dcv_psu_bom = self.__parameters_task.getResult()['dcv_psu_bom']
except:
return
output = self.__dcv_sdr_device_bom_task.getResult()
dictionary_superset = DictionarySuperset.DictionarySuperset(
missing_message="Could not detect presence of DCC chassis power supply: {}")
result = dictionary_superset.compare(output, dcv_psu_bom)
self.addCheckMessage("Checking output of 'ipmitool sdr elist' for expected chassis PSUs on DCC")
if result:
self.addCheckMessage(result)
self.addUnHealthy()
else:
healthy = True
# NVBUG-200554527: Check for Power supply lost for DCC
# Even if PSU status is ok, readings might have power supply ac lost message
psu_res = self.__dcv_parse_ipmi_sdr_elist_task.getResult()
# Filter PSU status keys for readings
psu_status_keys = [k for k in dcv_psu_bom.keys() if 'status' in k.lower()]
for s in psu_res:
if s['name'] in psu_status_keys:
reading = s['reading']
if 'power supply ac lost' in reading.lower():
self.addCheckMessage("AC input is lost, {} has reading:\n{}".format(s['name'], s['reading']))
healthy = False
if healthy:
self.addHealthy()
else:
self.addUnHealthy()