NVSM Health

Health Check Details

check_blacklist_recommendations

Brief

Check DCGM for GPU blacklist recommendations

Description

None

Source Code Listing

    def run(self):

        own_params = self.getParameters()
        params = self.__parameters_task.getResult()

        check_blacklist_recommendations = False
        for param, req in own_params.items():
            if param in params.keys():
                check_blacklist_recommendations = params[param]

        # Return if parameter is not defined
        if not check_blacklist_recommendations:
            return

        # check if kvm mode is on
        if modules.kvm.kvm_mode_on.getResult() == True:
            self.addCheckMessage('KVM mode is on, skipping blacklist recommendations check.')
            self.addInformational()
            return

        # Run the blacklist_recommendations task
        args = ['./modules/blacklist_recommendations/_gpu_blacklist_recommendations', \
            '--detect', '--watches']
        args.extend(check_blacklist_recommendations)
        collect_task = tasks.RunCommand(args=args, timeout=1000)
        collect_task.run()

        # For DCGM failures, GPU blacklist recommendations can
        # exit with return code 1; handle it gracefully.
        # Return unknown for no response or exit code > 1.
        if (collect_task.getReturnCode() > 1) or not collect_task.getOutput():
            self.addCheckMessage('No response or error while running GPU blacklist recommendations: {}'.format(
                                    collect_task.getError()))
            self.addUnknown()
            return

        healthy = True

        try:
            result = json.loads(collect_task.getOutput())
            blacklist = result.get('blacklistedGpus', {})
            # Check for GPU/NVSwitch blacklist recommendations
            if len(blacklist) > 0:
                healthy = False
                self.addCheckMessage('Found {count} device(s) recommended for blacklist:'.format(
                    count=len(blacklist)))
            else:
                self.addCheckMessage('No devices found recommended for blacklist.')

            for entity_id in sorted(blacklist.keys()):
                details = blacklist[entity_id]
                device_uuid = details.get('UUID')
                device_bdf = details.get('BDF')
                failure_explanation = details.get('Failure Explanation')
                self.addCheckMessage('\t"GPU{entity_id}":\n' \
                                     '\t"BDF": "{device_bdf}"\n' \
                                     '\t"UUID": "{device_uuid}"\n' \
                                     '\t"Failure Explanation": {failure_explanation}'.format(
                                     entity_id=entity_id,
                                     device_bdf=device_bdf,
                                     device_uuid=device_uuid,
                                     failure_explanation=failure_explanation))
            # Check for other errors in blacklist recommendation script
            error_list = result.get('errors', [])
            if error_list:
                nv_hostengine_running = True
                self.addCheckMessage('Errors encountered:')
                for e in error_list:
                    if 'host engine is not valid any longer' in e:
                        nv_hostengine_running = False
                    self.addCheckMessage('\t{}'.format(e))

                # If nv-hostengine is not running return as unknown
                if not nv_hostengine_running:
                    self.addUnknown()
                    return

                healthy = False

        except Exception as e:
            self.addCheckMessage('Error while parsing GPU blacklist recommendations: {}'.format(e))
            self.addUnknown()
            return

        # make sure SBE page pending retirements are caught as informational,
        # as the blacklist_recommendations script ignores them as warnings
        if healthy:
            nvidia_smi_res = modules.nvidia_smi.parse_nvidia_smi.getResult()
            if nvidia_smi_res:
                for gpu, info in nvidia_smi_res.items():
                    gpu_dict = xmltodict.parse(info)
                    check_cls = modules.nvidia_smi.GpuCheckRetiredPagesPending(gpu, gpu_dict)
                    check_cls.setCallback(self.getCallback())
                    check_cls.run()
                    bad_count = check_cls.getResult()['unhealthy'] + check_cls.getResult()['unknown']
                    if bad_count:
                        healthy = False
                        self.addCheckMessage(check_cls.getTitle())
                if not healthy:
                    self.addCheckMessage(config.gpu_total_retired_pages_pending_error)
                    self.addInformational()
                    return

        if healthy:
            self.addHealthy()
        else:
            self.addUnHealthy()
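
A note on the expected script output: the parsing logic above implies a particular JSON shape from the _gpu_blacklist_recommendations script. The sketch below shows that shape as a Python literal; only the key names are taken from the code, and every value is invented.

    # Illustrative only: the output shape consumed by the parsing code above.
    # Key names come from the check; all values here are invented.
    example_output = {
        "blacklistedGpus": {
            "3": {
                "UUID": "GPU-00000000-0000-0000-0000-000000000000",
                "BDF": "00000000:3b:00.0",
                "Failure Explanation": "illustrative placeholder",
            }
        },
        "errors": [],
    }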

check_bom_dimms

Brief

Check memory DIMM device information for consistency

Description

None

Module

bom

Source Code Listing

    def run(self):
        generic_bom_check(self, self.__parameters_task, self.__parse_dmidecode_task, \
            config.dimms_info_str, config.dimms_command_str)
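
Note: generic_bom_check is a shared helper used by all of the check_bom_* tasks in this module. Judging from the parallel structure, it performs the same compare-and-report pattern that check_bom_ethernet_controllers (below) spells out inline: a DeepDiff of the parsed command output against the expected BOM configuration, with missing devices reported as UnHealthy and changed values as Informational.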

check_bom_disk_controllers

Brief

Check disk controller PCIe device information for consistency

Description

None

Module

bom

Source Code Listing

    def run(self):
        generic_bom_check(self, self.__parameters_task, self.__parse_lspci_task, \
            config.disk_controllers_info_str, config.disk_controllers_command_str, \
            config.disk_controllers_pci_device_missing_str, config.pci_device_changed_str)

check_bom_ethernet_controllers

Brief

Check Ethernet controller PCIe device information for consistency

Description

None

Module

bom

Source Code Listing

    def run(self):
        # Get parameters from own task
        own_params = self.getParameters()

        # Get parameters from parameter task
        params = self.__parameters_task.getResult()

        bom_config = None
        for param, req in own_params.items():
            if param in params.keys():
                bom_config = params[param]
        # If parameter is not found in platform parameters - Do Nothing
        if bom_config == None:
            return
        # Print/Stream Task info and command messages
        self.title(config.ethernet_controllers_info_str)
        self.addCheckMessage(config.ethernet_controllers_command_str)

        if self.__parse_lspci_task.getResult() != None:
            # Compare task output with expected config
            res = self.__parse_lspci_task.getResult()
            if type(res) is dict:
                out_dict = res
            else:
                out_dict = json.loads(res)

            # dictionary compare
            ddiff = DeepDiff(out_dict, bom_config)

            message = ''
            result = 'Healthy'
            if any(key in ddiff for key in ['dictionary_item_added',
                                            'values_changed']):
                if 'dictionary_item_added' in ddiff:
                    for item in ddiff['dictionary_item_added']:
                        key = re.findall(r"\['(.*?)'\]", item)
                        message += '\n'
                        message += config.ethernet_controllers_pci_device_missing_str.format(' -> '.join(key))
                    result = 'UnHealthy'

                if 'values_changed' in ddiff:
                    for key, value in ddiff['values_changed'].items():
                        key = re.findall(r"\['(.*?)'\]", key)
                        message += '\n'
                        # Best effort to add additional_message_key information
                        try:
                            message += 'For {} with '.format(bom_config[key[0]]['device'])
                        except:
                            pass
                        message += config.pci_device_changed_str.format(
                                ' -> '.join(key),
                                value['old_value'],
                                value['new_value'])
                    if result == 'Healthy':
                        result = 'Informational'
            self.addCheckMessage(message)

            if result == 'Healthy':
                # Healthy check - No diffs found in bill-of-materials
                self.addHealthy()
            elif result == 'UnHealthy':
                # UnHealthy check - Print/Stream diffs found in bill-of-materials
                self.addUnHealthy()
            elif result == 'Informational':
                # Informational status - print change in config
                self.addInformational()
            else:
                self.addUnknown()
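
For reference, here is a standalone illustration of how DeepDiff reports the two cases the check above inspects. The device names and addresses are made up; only the DeepDiff result keys ('dictionary_item_added', 'values_changed') match the code.

    # Requires the third-party deepdiff package (pip install deepdiff).
    from deepdiff import DeepDiff

    # Hypothetical parsed lspci output (observed) vs. expected BOM config.
    observed = {'0000:3b:00.0': {'device': 'Ethernet Controller X',
                                 'speed': '8GT/s'}}
    expected = {'0000:3b:00.0': {'device': 'Ethernet Controller X',
                                 'speed': '16GT/s'},
                '0000:5e:00.0': {'device': 'Ethernet Controller Y'}}

    ddiff = DeepDiff(observed, expected)
    # 'dictionary_item_added' -> a device in the expected BOM is missing
    # 'values_changed'        -> a device is present but an attribute differs
    print(ddiff)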

check_bom_gpus

Brief

Check GPU PCIe device information for consistency

Description

None

Module

bom

Source Code Listing

    def run(self):
        generic_bom_check(self, self.__parameters_task, self.__parse_lspci_task, \
            config.gpus_info_str, config.gpus_command_str, \
            config.gpus_pci_device_missing_str, config.pci_device_changed_str)

check_bom_ib_controllers

Brief

Check InfiniBand controller PCIe device information for consistency

Description

None

Module

bom

Source Code Listing

    def run(self):
        generic_bom_check(self, self.__parameters_task, self.__parse_lspci_task, \
            config.ib_controllers_info_str, config.ib_controllers_command_str, \
            config.ib_controllers_pci_device_missing_str, config.pci_device_changed_str)

check_bom_nvswitch

Brief

Check NVSwitch controller PCIe device information for consistency

Description

None

Module

bom

Source Code Listing

    def run(self):
        generic_bom_check(self, self.__parameters_task, self.__parse_lspci_task, \
            config.nvswitch_info_str, config.nvswitch_command_str, \
            config.nvswitch_pci_device_missing_str, config.pci_device_changed_str)

check_bom_pcie_switches

Brief

Check PCIe switch device information for consistency

Description

None

Module

bom

Source Code Listing

    def run(self):
        generic_bom_check(self, self.__parameters_task, self.__parse_lspci_task, \
            config.pcie_switches_info_str, config.pcie_switches_command_str, \
            config.pcie_switches_pci_device_missing_str, config.pci_device_changed_str)

check_bom_vgas

Brief

Check VGA controller PCIe device information for consistency

Description

None

Module

bom

Source Code Listing

    def run(self):
        generic_bom_check(self, self.__parameters_task, self.__parse_lspci_task, \
            config.vgas_info_str, config.vgas_command_str, \
            config.vgas_pci_device_missing_str, config.pci_device_changed_str)

check_dcc_can_health

Brief

Drive Constellation: Check DCC CAN Health

Description

None

Module

dcs_modules

Source Code Listing

    def run(self):
        json_output = self.__run_dcc_hardware_health.getOutput()
        if json_output is not None:
            try:
                dcc_can_health = json_output['dcc_can_health']
                if dcc_can_health['test_result'] == "Healthy":
                    self.addHealthy()
                else:
                    self.addUnHealthy()
                    self.addCheckMessage(dcc_can_health['test_information'])
                return
            except Exception as e:
                logging.debug("Error fetching CAN health info: {}".format(e))
                self.addUnknown()
                return
        self.addHealthy()

check_dcc_can_reachability

Brief

Drive Constellation: Check DCC CAN Reachability Health

Description

None

Module

dcs_modules

Source Code Listing

    def run(self):
        json_output = self.__run_check_application_config_health.getOutput()
        if json_output is not None:
            try:
                dcc_can_reachability = json_output['can_reachability']
                if dcc_can_reachability['test_result'] == "Healthy":
                    self.addHealthy()
                else:
                    self.addUnHealthy()
                    self.addCheckMessage(dcc_can_reachability['test_information'])
                return
            except Exception as e:
                logging.debug("Error fetching DCC CAN reachability info: {}".format(e))
                self.addUnknown()
                return
        self.addHealthy()

check_dcc_display_configuration

Brief

Drive Constellation: Check DCC Display Configuration Health

Description

None

Module

dcs_modules

Source Code Listing

    def run(self):
        json_output = self.__run_check_application_config_health.getOutput()
        if json_output is not None:
            try:
                dcc_display_configuration = json_output['display_configuration']
                if dcc_display_configuration['test_result'] == "Healthy":
                    self.addHealthy()
                else:
                    self.addUnHealthy()
                    self.addCheckMessage(dcc_display_configuration['test_information'])
                return
            except Exception as e:
                logging.debug("Error fetching DCC Display health info: {}".format(e))
                self.addUnknown()
                return
        self.addHealthy()

check_dcc_display_synchronization

Brief

Drive Constellation: Check DCC Display Synchronization Health

Description

None

Module

dcs_modules

Source Code Listing

    def run(self):
        json_output = self.__run_check_application_config_health.getOutput()
        if json_output is not None:
            try:
                dcc_display_synchronization = json_output['display_synchronization']
                if dcc_display_synchronization['test_result'] == "Healthy":
                    self.addHealthy()
                else:
                    self.addUnHealthy()
                    self.addCheckMessage(dcc_display_synchronization['test_information'])
                return
            except Exception as e:
                logging.debug("Error fetching display synchronization health info: {}".format(e))
                self.addUnknown()
                return
        self.addHealthy()

check_dcc_ecu_tegraA_health

Brief

Drive Constellation: Check DCC ECU TegraA Health

Description

None

Module

dcs_modules

Source Code Listing

    def run(self):
        self.addCheckMessage("Checking DCC ECU TegraA Hardware health")
        json_output = self.__run_dcc_hardware_health.getOutput()
        if json_output is not None:
            try:
                dcc_tegraA_health = json_output['tegraA_health']
                if dcc_tegraA_health['test_result'] == "Healthy":
                    self.addHealthy()
                else:
                    self.addUnHealthy()
                    self.addCheckMessage(dcc_tegraA_health['test_information'])
                return
            except Exception as e:
                logging.debug("Error fetching ECU TegraA health info: {}".format(e))
                self.addUnknown()
                return
        self.addHealthy()

check_dcc_ecu_tegraA_storage_health

Brief

Drive Constellation: Check DCC ECU TegraA Storage Health

Description

None

Module

dcs_modules

Source Code Listing

    def run(self):
        self.addCheckMessage("Checking DCC ECU TegraA Storage health")
        json_output = self.__run_dcc_ecu_application_health.getOutput()
        if json_output is not None:
            try:
                dcc_tegraA_health = json_output['tegraA_health']
                if dcc_tegraA_health['test_result'] == "Healthy":
                    self.addHealthy()
                else:
                    self.addUnHealthy()
                    self.addCheckMessage(dcc_tegraA_health['test_information'])
                return
            except Exception as e:
                logging.debug("Error fetching ECU TegraA storage health info: {}".format(e))
                self.addUnknown()
                return
        self.addHealthy()

check_dcc_ecu_tegraB_health

Brief

Drive Constellation: Check DCC ECU TegraB Health

Description

None

Module

dcs_modules

Source Code Listing

    def run(self):
        self.addCheckMessage("Checking DCC ECU TegraB Hardware health")
        json_output = self.__run_dcc_hardware_health.getOutput()
        if json_output is not None:
            try:
                dcc_tegraB_health = json_output['tegraB_health']
                if dcc_tegraB_health['test_result'] == "Healthy":
                    self.addHealthy()
                else:
                    self.addUnHealthy()
                    self.addCheckMessage(dcc_tegraB_health['test_information'])
                return
            except Exception as e:
                logging.debug("Error fetching ECU TegraB health info: {}".format(e))
                self.addUnknown()
                return
        self.addHealthy()

check_dcc_ecu_tegraB_storage_health

Brief

Drive Constellation: Check DCC ECU TegraB Storage Health

Description

None

Module

dcs_modules

Source Code Listing

    def run(self):
        self.addCheckMessage("Checking DCC ECU TegraB Storage health")
        json_output = self.__run_dcc_ecu_application_health.getOutput()
        if json_output is not None:
            try:
                dcc_tegraB_health = json_output['tegraB_health']
                if dcc_tegraB_health['test_result'] == "Healthy":
                    self.addHealthy()
                else:
                    self.addUnHealthy()
                    self.addCheckMessage(dcc_tegraB_health['test_information'])
                return
            except Exception as e:
                logging.debug("Error fetching ECU TegraB storage health info: {}".format(e))
                self.addUnknown()
                return
        self.addHealthy()

check_dcc_ethernet_health

Brief

Drive Constellation: Check DCC Ethernet Health

Description

None

Module

dcs_modules

Source Code Listing

    def run(self):
        self.addCheckMessage("Checking DCC Ethernet health")
        json_output = self.__run_dcc_hardware_health.getOutput()
        if json_output is not None:
            try:
                dcc_ethernet_health = json_output['dcc_ethernet_health']
                if dcc_ethernet_health['test_result'] == "Healthy":
                    self.addHealthy()
                else:
                    self.addUnHealthy()
                    self.addCheckMessage(dcc_ethernet_health['test_information'])
                return
            except Exception as e:
                logging.debug("Error fetching ethernet health info: {}".format(e))
                self.addUnknown()
                return
        self.addHealthy()

check_dcc_fan_health

Brief

Drive Constellation: Check DCC Fan Health

Description

None

Module

dcs_modules

Source Code Listing

    def run(self):
        json_output = self.__run_dcc_hardware_health.getOutput()
        if json_output is not None:
            try:
                dcc_fan_health = json_output['dcc_fan_health']
                if dcc_fan_health['test_result'] == "Healthy":
                    self.addHealthy()
                else:
                    self.addUnHealthy()
                    self.addCheckMessage(dcc_fan_health['test_information'])
                return
            except Exception as e:
                logging.debug("Error fetching fan health info: {}".format(e))
                self.addUnknown()
                return
        self.addHealthy()

check_dcc_gpu_health

Brief

Drive Constellation: Check DCC GPU Health

Description

None

Module

dcs_modules

Source Code Listing

    def run(self):
        json_output = self.__run_dcc_hardware_health.getOutput()
        if json_output is not None:
            try:
                dcc_gpu_health = json_output['dcc_gpu_health']
                if dcc_gpu_health['test_result'] == "Healthy":
                    self.addHealthy()
                else:
                    self.addUnHealthy()
                    self.addCheckMessage(dcc_gpu_health['test_information'])
                return
            except Exception as e:
                logging.debug("Error fetching gpu health info: {}".format(e))
                self.addUnknown()
                return
        self.addHealthy()

check_dcc_info

Brief

Drive Constellation: Get DCC Info

Description

None

Module

dcs_modules

Source Code Listing

    def run(self):
        json_output = self.__run_dcc_health_api_task.getOutput()
        if json_output is not None:
            try:
                dcc_info = json_output['dcc_info']
                self.addHealthy()
                for key, value in dcc_info.items():
                    key = key.replace('_', ' ').title()
                    self.send("{:20} : {:20}".format(key, value))
            except Exception as e:
                self.addUnHealthy()
                logging.debug("Error fetching dcc info: {}".format(e))
                self.__output = None
                return

check_dcc_network_reachability

Brief

Drive Constellation: Check DCC Network Reachability

Description

None

Module

dcs_modules

Source Code Listing

    def run(self):
        json_output = self.__run_check_application_config_health.getOutput()
        if json_output is not None:
            try:
                dcc_network_reachability = json_output['network_reachability']
                if dcc_network_reachability['test_result'] == "Healthy":
                    self.addHealthy()
                else:
                    self.addUnHealthy()
                    self.addCheckMessage(dcc_network_reachability['test_information'])
                return
            except Exception as e:
                logging.debug("Error fetching network reachability info: {}".format(e))
                self.addUnknown()
                return
        self.addHealthy()

check_dcc_serializer_configuration

Brief

Drive Constellation: Check DCC Serializer Configuration Health

Description

None

Module

dcs_modules

Source Code Listing

    def run(self):
        json_output = self.__run_check_application_config_health.getOutput()
        if json_output is not None:
            try:
                dcc_serializer_configuration = json_output['serializer_configuration']
                if dcc_serializer_configuration['test_result'] == "Healthy":
                    self.addHealthy()
                else:
                    self.addUnHealthy()
                    self.addCheckMessage(dcc_serializer_configuration['test_information'])
                return
            except Exception as e:
                logging.debug("Error fetching serializer configuration info: {}".format(e))
                self.addUnknown()
                return
        self.addHealthy()

check_dcc_usb_health

Brief

Drive Constellation: Check DCC USB Health

Description

None

Module

dcs_modules

Source Code Listing

    def run(self):
        json_output = self.__run_dcc_hardware_health.getOutput()
        if json_output is not None:
            try:
                dcc_usb_health = json_output['dcc_usb_health']
                if dcc_usb_health['test_result'] == "Healthy":
                    self.addHealthy()
                else:
                    self.addUnHealthy()
                    self.addCheckMessage(dcc_usb_health['test_information'])
                return
            except Exception as e:
                logging.debug("Error fetching usb health info: {}".format(e))
                self.addUnknown()
                return
        self.addHealthy()

check_dcc_usb_reachability

Brief

Drive Constellation: Check DCC USB Reachability Health

Description

None

Module

dcs_modules

Source Code Listing

    def run(self):
        json_output = self.__run_check_application_config_health.getOutput()
        if json_output is not None:
            try:
                dcc_usb_reachability = json_output['usb_reachability']
                if dcc_usb_reachability['test_result'] == "Healthy":
                    self.addHealthy()
                else:
                    self.addUnHealthy()
                    self.addCheckMessage(dcc_usb_reachability['test_information'])
                return
            except Exception as e:
                logging.debug("Error fetching DCC USB reachability health info: {}".format(e))
                self.addUnknown()
                return
        self.addHealthy()

check_dcs_psu_info

Brief

Drive Constellation: Check DCC PSU Info

Description

None

Module

dcs_modules

Used By

Source Code Listing

    def run(self):
        Health = True
        try:
            dcs_psu_attrib_values = self.__parameters_task.getResult()['dcs_psu_attrib_values']
        except:
            # Could not get list of valid values
            self.addCheckMessage("Could not get list of valid PSU values")
            self.addUnknown()
            return

        # Get the parsed results
        dcs_psu_results = {}
        dcs_psu_results['PSU-0'] = self.__parse_psu0_task.getOutput()
        dcs_psu_results['PSU-1'] = self.__parse_psu1_task.getOutput()

        # Check PSU Vendor
        Msg = self.check_psu_attrib('Vendor', dcs_psu_attrib_values, dcs_psu_results)
        if Msg != '':
            self.addCheckMessage(Msg)
            Health = False

        # Check PSU Model
        Msg = self.check_psu_attrib('Model', dcs_psu_attrib_values, dcs_psu_results)
        if Msg != '':
            self.addCheckMessage(Msg)
            Health = False

        if Health == True:
            self.addHealthy()
        elif Health == False:
            self.addUnHealthy()
        else:
            self.addUnknown()

check_dimm_part_number

Brief

Verify DIMM part number

Description

None

Module

dimm

Source Code Listing

    def run(self):
        # Return if parameter is not defined
        try:
            dimm_boms = self.__parameters_task.getResult()['dimm_bom']
            part_number = self.__parameters_task.getResult()['dimm_part_number']
        except:
            return

        # Unknown check for no result from dmidecode
        res = self.__dimm_task.getResult()
        if not res:
            self.addCheckMessage('No result from parse dmidecode output')
            self.addUnknown()
            return

        healthy = True
        for dimm in dimm_boms:
            if dimm in res.keys():
                if 'part_number' in res[dimm].keys():
                    if res[dimm]['part_number'].strip() not in part_number:
                        self.addCheckMessage('Mismatch in DIMM "{}" part number, expected is "{}" found is "{}"'.format(dimm, " or ".join(part_number), res[dimm]['part_number']))
                        healthy = False
                else:
                    self.addCheckMessage('DIMM "{}" part number not found'.format(dimm))
                    healthy = False
            else:
                # Must be caught on checking DIMMs
                pass

        if healthy:
            self.addHealthy()
        else:
            self.addUnHealthy()

check_dimm_vendors

Brief

Verify DIMM vendors

Description

None

Module

dimm

Source Code Listing

    def run(self):

        try:
            dimm_vendors = self.__parameters_task.getResult()['dmidecode_dimm_vendors']
        except:
            return

        out = self.__get_dimm_vendors_task.getResult()
        if not out:
            self.addCheckMessage('ERROR: Could not parse dmidecode output')
            self.addUnknown()
            return

        healthy = True
        for dimm in out:
            if dimm not in dimm_vendors:
                self.addCheckMessage('Unknown DIMM vendor "{value}"'.format(value=dimm))
                healthy = False
            else:
                # Found the expected dimm vendor
                pass

        if healthy:
            self.addHealthy()
        else:
            self.addUnHealthy()

check_ecu_info

Brief

Drive Constellation: Get ECU Info

Description

None

Module

dcs_modules

Source Code Listing

    def run(self):
        json_output = self.__run_dcc_health_api_task.getOutput()
        if json_output is not None:
            try:
                ecu_info = json_output['ecu_info']
                self.addHealthy()
                for key, value in ecu_info.items():
                    key = key.replace('_', ' ').title()
                    self.send("{:20} : {:20}".format(key, value))
            except Exception as e:
                self.addUnHealthy()
                logging.debug("Error fetching ecu info: {}".format(e))
                self.__output = None
                return

check_ethernet_controller_info

Brief

None

Description

None

Module

bom

Source Code Listing

    def run(self):
        try:
            devices = self.__parameters_task.getResult()[self.__parameter]
        except:
            return
        for device in deepcopy(devices):
            pstate_not_found = False
            if self.__is_gpu_check:
                bdf_pstate = self.__bdf_pstate.getResult()
                try:
                    pstate = bdf_pstate[device['bdf']]
                    device['speed'] = device['speed'][pstate]
                    device['width'] = device['width'][pstate]
                except:
                    # not able to find the pstate for this gpu bdf
                    pstate_not_found = True
            kvm_mode_disabled = True
            if self.__parameter == "gpu_link_info" and modules.kvm.kvm_mode_on.getResult() == True:
                kvm_mode_disabled = False
            for check_type in ['speed', 'width']:

                check_cls = CheckLink(device['bdf'], device[check_type], check_type, self.__parse_lspci_task, self.__parameter)
                if 'name' not in device:
                    device["name"] = ""
                check_cls.setCallback(self.getCallback())
                if kvm_mode_disabled:
                    if pstate_not_found:
                        device['speed'] = "None"
                        device['width'] = "None"
                        check_cls.addCheckMessage("unknown pstate for the GPU[{}]".format(device['bdf']))
                        check_cls.addUnknown()
                    else:
                        check_cls.run()
                else:
                    device['speed'] = "None"
                    device['width'] = "None"
                    check_cls.addCheckMessage('KVM mode is on, skipping check.')
                    check_cls.addInformational()
                    self.addInformational()

                check_cls.title(self.__title_str[check_type].format(**device).strip())
                if kvm_mode_disabled:
                    self.addHealthy(count=check_cls.getResult()['healthy'])
                    self.addUnHealthy(count=check_cls.getResult()['unhealthy'])
                    self.addUnknown(count=check_cls.getResult()['unknown'])
                    self.addInformational(count=check_cls.getResult()['informational'])
                check_cls.sendComplete()
                super().addMessages(check_cls.getMessages())

        # clear message as this task doesn't print anything
        self.title('')
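
The device['speed'][pstate] indexing above implies that, for GPU link checks, each configured device keys its expected link speed and width by GPU performance state. An illustrative entry follows; the field names are inferred from the code and every value is invented.

    # Illustrative shape of one gpu_link_info device entry, inferred from
    # the indexing above; all values are invented.
    device = {
        'bdf': '0000:3b:00.0',
        'name': 'GPU',
        'speed': {'P0': '16GT/s', 'P8': '2.5GT/s'},
        'width': {'P0': 'x16', 'P8': 'x16'},
    }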

check_fan_bom

Brief

Verify chassis fan presence

Description

None

Module

ipmitool

Source Code Listing

    def run(self):
        from nvsmhealth.lib import DictionarySuperset
        try:
            fan_bom = self.__parameters_task.getResult()['fan_bom']
        except:
            return

        output = self.__sdr_device_bom_task.getResult()
        dictionary_superset = DictionarySuperset.DictionarySuperset(
                    missing_message="Could not detect presence of chassis fan {}")
        result = dictionary_superset.compare(output, fan_bom)
        self.addCheckMessage("Checking output of 'ipmitool sdr elist' for expected chassis fans")
        if result:
            self.addCheckMessage(result)
            self.addUnHealthy()
        else:
            self.addHealthy()

check_fru_consistency

Brief

Check FRU information for consistency

Description

The FRU (field replaceable unit) information recorded in the BMC (baseboard management controller) includes serial numbers for various FRUs on the system. For any given system, these serial numbers should be consistent among all FRUs. However, it is possible for these serial numbers to become inconsistent as the result of normal maintenance (such as FRU replacement). This check makes sure serial numbers are consistent for all FRUs recorded in the BMC.
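
As a quick standalone illustration of the rule (serial numbers invented; the real check below takes the expected serial from the dmidecode chassis information):

    # Sketch of the consistency rule described above.
    fru_res = {
        'FRU_1': {'chassis_serial': 'SN12345'},
        'FRU_2': {'chassis_serial': 'SN99999'},  # inconsistent
    }
    expected_serial_number = 'SN12345'
    diff = [d for d in fru_res
            if fru_res[d]['chassis_serial'] != expected_serial_number]
    print(diff)  # ['FRU_2']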

Module

ipmitool

Source Code Listing

    def run(self):
        own_params = self.getParameters()
        params = self.__parameters_task.getResult()

        fru_devices = None
        for param, req in own_params.items():
            if param in params.keys():
                fru_devices = params[param]

        if fru_devices == None:
            return

        # Unknown check for parse tasks failure
        if not (self.__fru_task.getResult() and self.__dmidecode_task.getResult()):
            self.addCheckMessage("No results from 'ipmitool fru print' or 'dmidecode' commands")
            self.addUnknown()
            return

        result = "healthy"
        self.addCheckMessage(config.fru_command_str)

        try:
            fru_res = self.__fru_task.getResult()
            # Check for FRU devices
            devices_not_found = [device for device in fru_devices if device not in fru_res.keys()]
            if devices_not_found:
                self.addCheckMessage("FRU devices not found '{}'".format(", ".join(devices_not_found)))
                if len(devices_not_found) == len(fru_devices):
                    self.addUnHealthy()
                    return
                result = "unhealthy"

            devices_found = [device for device in fru_devices if device not in devices_not_found]
            # Check for FRU devices chassis serial number
            chassis_serial_not_found = [device for device in devices_found if 'chassis_serial' not in fru_res[device].keys()]
            if chassis_serial_not_found:
                self.addCheckMessage("Chassis serial number not found for FRU devices '{}'".format(", ".join(chassis_serial_not_found)))
                result = "unhealthy"

            chassis_serial_found = [device for device in devices_found if device not in chassis_serial_not_found]

            # Get expected serial number
            dmidecode_res = self.__dmidecode_task.getResult()
            chassis_info = [v['serial_number'] for k, v in dmidecode_res.items() if 'chassis information' in k.lower() and
                            'serial_number' in v.keys()]
            if chassis_info:
                expected_serial_number = chassis_info[0]
            else:
                self.addCheckMessage("Failed while fetching serial number from chassis information")
                self.addUnknown()
                return

            # Check and print the FRU devices having inconsistent chassis serial numbers
            diff = [device for device in chassis_serial_found if fru_res[device]['chassis_serial'] != expected_serial_number]
            for device in diff:
                self.addCheckMessage("FRU device '{}' got chassis serial '{}' whereas expected is '{}'"
                                        .format(device, fru_res[device]['chassis_serial'], expected_serial_number))
                # For change in FRU chassis serial print informational status
                result = "info"

            if result == "unhealthy":
                self.addUnHealthy()
            elif result == "info":
                self.addInformational()
            else:
                self.addHealthy()

        except:
            self.addCheckMessage("Failed while checking FRU serial number consistency")
            self.addUnknown()

check_gpu_direct_topology

Brief

Check GPUDirect Topology information for consistency

Description

None

Module

nvidia_smi

Source Code Listing

    def run(self):
        # Get parameters from own task
        own_params = self.getParameters()

        # Get parameters from parameter task
        params = self.__parameters_task.getResult()

        # Get GPU MIG State
        gpu_mig_state = self.__gpu_mig_state.getResult()

        # Check if kvm mode is on then skip
        if modules.kvm.kvm_mode_on.getResult() == True:
            self.addCheckMessage('KVM mode is on, skipping check.')
            self.addInformational()
            return

        # If GPU State Enabled
        if gpu_mig_state != None and any(gpu_mig_state.values()):
            logging.info("MIG State Detected: Modifying to MIG Topology")
            _gpu_direct_topology = params['gpu_direct_topology']
            num_gpus = len(gpu_mig_state.values())
            # gpu_mig_state is a dict
            gpus = range(num_gpus)
            mig_enabled = [x for x, y in gpu_mig_state.items() if y == 1]
            mig_enabled_gpus = ['GPU{}'.format(x) for x in mig_enabled]
            pxb_enabled_gpus = ['GPU{}'.format(y) for y in
                                [x - 1 if x % 2 != 0 else x + 1 for x in mig_enabled]]
            non_mig_gpus = ['GPU{}'.format(x)
                            for x in gpus if x not in mig_enabled]

            # Go over all the MIG enabled GPUs first
            for index, mig_gpu in enumerate(mig_enabled_gpus):
                for k, v in _gpu_direct_topology[mig_gpu].items():
                    if k == mig_gpu:
                        continue  # Already marked as X
                    elif k == pxb_enabled_gpus[index]:
                        _gpu_direct_topology[mig_gpu][k] = 'PXB'
                    else:
                        _gpu_direct_topology[mig_gpu][k] = 'SYS'

            #Go over non-mig gpus next
            for index, gpu in enumerate(non_mig_gpus):
                for k, v in _gpu_direct_topology[gpu].items():
                    if k == gpu:
                        continue
                    elif k in mig_enabled_gpus and gpu in pxb_enabled_gpus:
                        if mig_enabled_gpus.index(k) == pxb_enabled_gpus.index(gpu):
                            _gpu_direct_topology[gpu][k] = 'PXB'
                        else:
                            _gpu_direct_topology[gpu][k] = 'SYS'
                    elif k in mig_enabled_gpus:
                        _gpu_direct_topology[gpu][k] = 'SYS'

            params['gpu_direct_topology'] = _gpu_direct_topology

        expected_topology = None
        for param, req in own_params.items():
            if param in params.keys():
                expected_topology = params[param]
        # If parameter is not found in platform parameters - Do Nothing
        if expected_topology == None:
            return

        # Print/Stream Task info and command messages
        self.addCheckMessage(config.gpu_direct_topology_command_str)

        # Unknown check for no result from nvidia-smi topology parse task
        if not self.__parse_task.getResult():
            self.addCheckMessage(
                'No result for GPUDirect topology information gathered from nvidia-smi tool')
            self.addUnknown()
            return

        try:
            # Compare task output with expected config
            topology = json.loads(self.__parse_task.getResult())
            healthy, message = genericGpuTopologyCheck(
                topology, expected_topology)
            if not healthy:
                self.addCheckMessage(message)
                self.addUnHealthy()
            else:
                self.addHealthy()
        except:
            self.addCheckMessage('Error while checking gpu direct topology')
            self.addUnknown()  # Unknown check

check_gpu_p2p_topology

Brief

Check GPU P2P topology information for consistency

Description

None

Module

nvidia_smi

Source Code Listing

    def run(self):
        # Get parameters from own task
        own_params = self.getParameters()

        # Get parameters from parameter task
        params = self.__parameters_task.getResult()

        expected_topology = None
        for param, req in own_params.items():
            if param in params.keys():
                expected_topology = params[param]
        # If parameter is not found in platform parameters - Do Nothing
        if expected_topology == None:
            return
        if modules.kvm.kvm_mode_on.getResult() == True:
            self.addCheckMessage('KVM mode is on, skipping check.')
            self.addInformational()
            return
        # Print/Stream Task info and command messages
        self.addCheckMessage(config.gpu_p2p_topology_command_str)

        # Unknown check for no result from nvidia-smi topology parse task
        if not self.__parse_task.getResult():
            self.addCheckMessage(
                'No result for GPUs p2p topology information gathered from nvidia-smi tool')
            self.addUnknown()
            return

        try:
            # Compare task output with expected config
            topology = json.loads(self.__parse_task.getResult())
            healthy, message = genericGpuTopologyCheck(
                topology, expected_topology)
            if not healthy:
                self.addCheckMessage(message)
                self.addUnHealthy()
            else:
                self.addHealthy()
        except:
            self.addCheckMessage('Error while checking gpu p2p topology')
            self.addUnknown()  # Unknown check

check_gpu_vbios_version_consistency

Brief

Verify GPU VBIOS version consistency

Description

None

Module

nvidia_smi

Source Code Listing

    def run(self):
        if modules.kvm.kvm_mode_on.getResult() == True:
            self.addCheckMessage('KVM mode is on, skipping check.')
            self.addInformational()
            return
        # If nvidia-smi run got a failure, skip parsing the output

        nvidia_smi_res = self.__parse_task.getResult()
        if not nvidia_smi_res:
            self.addCheckMessage('No result from nvidia-smi tool')
            self.addUnknown()
            return

        inconsistent_gpus = {}
        self.addCheckMessage(config.vbios_command_str)
        # inconsistent_gpus maps: product name -> vbios_version -> [GPU names]
        try:
            for gpu, info in nvidia_smi_res.items():
                gpu_dict = xmltodict.parse(info)
                vbios_version = gpu_dict['nvidia_smi_log']['gpu']['vbios_version']
                product_name = gpu_dict['nvidia_smi_log']['gpu']['pci']['pci_device_id']
                if product_name not in inconsistent_gpus:
                    inconsistent_gpus[product_name] = {}
                if vbios_version not in inconsistent_gpus[product_name]:
                    inconsistent_gpus[product_name][vbios_version] = []
                inconsistent_gpus[product_name][vbios_version].append('GPU{}'.format(gpu))
            # Unhealthy check for multiple versions
            res = ""
            for product_name, vbios_version in inconsistent_gpus.items():
                if len(vbios_version) > 1:
                    for k, v in vbios_version.items():
                        res += "GPUs: {} has VBIOS version '{}'\n".format(
                            ", ".join(v), k)
            if res != "":
                self.addCheckMessage(
                    f"Different VBIOS version found on GPUs\n{res}")
                self.addUnHealthy()
            else:
                self.addHealthy()
        # Unknown check
        except:
            self.addCheckMessage(
                'Error while checking GPUs VBIOS version consistency')
            self.addUnknown()
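
The nested structure built above groups GPUs first by PCI device id and then by VBIOS version; a device id carrying more than one version is reported as UnHealthy. An illustrative inconsistency (ids and versions invented):

    # Illustrative contents of inconsistent_gpus for a mixed-VBIOS system.
    inconsistent_gpus = {
        '0x20B010DE': {
            '92.00.19.00.01': ['GPU0', 'GPU1'],
            '92.00.19.00.02': ['GPU2'],  # second version -> UnHealthy
        }
    }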

check_gpus

Brief

Check GPU health: retired page count, retired pages pending, InfoROM storage version, and VBIOS version

Description

None

Module

nvidia_smi

Source Code Listing

    def run(self):
        own_params = self.getParameters()
        params = self.__parameters_task.getResult()

        gpu_total_retired_page_count = None
        for param, req in own_params.items():
            if param in params.keys():
                gpu_total_retired_page_count = params[param]

        # If nvidia-smi run got a failure, skip parsing the output
        nvidia_smi_res = self.__parse_nvidia_smi_task.getResult()
        if nvidia_smi_res == None:
            return

        gpus_info = {}

        for gpu, info in nvidia_smi_res.items():
            gpus_info[gpu] = {}
            try:
                gpu_dict = xmltodict.parse(info)
                check_cls = GpuCheckVbiosVersion()
                vbios_version = gpu_dict['nvidia_smi_log']['gpu']['vbios_version']
                gpus_info[gpu]['vbios_version'] = vbios_version
                product_name = gpu_dict['nvidia_smi_log']['gpu']['product_name']
                gpus_info[gpu]['product_name'] = product_name
                bdf = gpu_dict['nvidia_smi_log']['gpu']['pci']['pci_bus_id']
                msg_str = config.gpu_vbios_version_str.format(
                    gpu_index=gpu, bdf=bdf)
                check_cls.title(msg_str)
                check_cls.setCallback(self.getCallback())
                check_cls.send(msg=vbios_version)
                super().addMessages(check_cls.getMessages())

                check_cls = GpuCheckInforomStorageVersion()
                inforom_version = gpu_dict['nvidia_smi_log']['gpu']['inforom_version']['img_version']
                msg_str = config.gpu_inforom_version_str.format(
                    gpu_index=gpu, bdf=bdf)
                check_cls.title(msg_str)
                check_cls.setCallback(self.getCallback())
                check_cls.send(msg=inforom_version)
                super().addMessages(check_cls.getMessages())

                # NVBug-2691112: Disable GPU total retired page count and retired pages pending health checks
                # These checks are already covered in blacklist_recommendations health check
                '''if gpu_total_retired_page_count != None:
                    check_cls = GpuCheckRetiredPagesCount(gpu, gpu_dict, gpu_total_retired_page_count)
                    check_cls.setCallback(self.getCallback())
                    check_cls.run()
                    self.addHealthy(count=check_cls.getResult()['healthy'])
                    self.addUnHealthy(count=check_cls.getResult()['unhealthy'])
                    self.addUnknown(count=check_cls.getResult()['unknown'])
                    check_cls.sendComplete()
                    super().addMessages(check_cls.getMessages())

                    check_cls = GpuCheckRetiredPagesPending(gpu, gpu_dict)
                    check_cls.setCallback(self.getCallback())
                    check_cls.run()
                    self.addHealthy(count=check_cls.getResult()['healthy'])
                    self.addUnHealthy(count=check_cls.getResult()['unhealthy'])
                    self.addUnknown(count=check_cls.getResult()['unknown'])
                    check_cls.sendComplete()
                    super().addMessages(check_cls.getMessages())'''
            except:
                logging.error(
                    "ERROR: Failed to perform GPU {} health check".format(gpu))
                pass
        self.title('')

        self.__gpu_result = gpus_info

check_instant_blacklist_recommendations

Brief

Quick health check of GPU using DCGM

Description

None

Source Code Listing

    def run(self):

        own_params = self.getParameters()
        params = self.__parameters_task.getResult()

        check_blacklist_recommendations = False
        for param, req in own_params.items():
            if param in params.keys():
                check_blacklist_recommendations = params[param]

        # Return if parameter is not defined
        if not check_blacklist_recommendations:
            return

        # check if kvm mode is on
        if modules.kvm.kvm_mode_on.getResult() == True:
            self.addCheckMessage('KVM mode is on, skipping blacklist recommendations check.')
            self.addInformational()
            return

        # Run the blacklist_recommendations task
        args = ['./modules/blacklist_recommendations/_gpu_blacklist_recommendations', \
            '--detect', '--watches']
        args.extend(check_blacklist_recommendations)
        collect_task = tasks.RunCommand(args=args, timeout=1000)
        collect_task.run()

        # For DCGM failures, GPU blacklist recommendations can
        # exit with return code 1; handle it gracefully.
        # Return unknown for no response or exit code > 1.
        if (collect_task.getReturnCode() > 1) or not collect_task.getOutput():
            self.addCheckMessage('No response or error while running GPU blacklist recommendations: {}'.format(
                                    collect_task.getError()))
            self.addUnknown()
            return

        healthy = True

        try:
            result = json.loads(collect_task.getOutput())
            blacklist = result.get('blacklistedGpus', {})
            # Check for GPU/NVSwitch blacklist recommendations
            if len(blacklist) > 0:
                healthy = False
                self.addCheckMessage('Found {count} device(s) recommended for blacklist:'.format(
                    count=len(blacklist)))
            else:
                self.addCheckMessage('No devices found recommended for blacklist.')

            for entity_id in sorted(blacklist.keys()):
                details = blacklist[entity_id]
                device_uuid = details.get('UUID')
                device_bdf = details.get('BDF')
                failure_explanation = details.get('Failure Explanation')
                self.addCheckMessage('\t"GPU{entity_id}":\n' \
                                     '\t"BDF": "{device_bdf}"\n' \
                                     '\t"UUID": "{device_uuid}"\n' \
                                     '\t"Failure Explanation": {failure_explanation}'.format(
                                     entity_id=entity_id,
                                     device_bdf=device_bdf,
                                     device_uuid=device_uuid,
                                     failure_explanation=failure_explanation))
            # Check for other errors in blacklist recommendation script
            error_list = result.get('errors', [])
            if error_list:
                nv_hostengine_running = True
                self.addCheckMessage('Errors encountered:')
                for e in error_list:
                    if 'host engine is not valid any longer' in e:
                        nv_hostengine_running = False
                    self.addCheckMessage('\t{}'.format(e))

                # If nv-hostengine is not running return as unknown
                if not nv_hostengine_running:
                    self.addUnknown()
                    return

                healthy = False

        except Exception as e:
            self.addCheckMessage('Error while parsing GPU blacklist recommendations: {}'.format(e))
            self.addUnknown()
            return

        # make sure SBE page pending retirements are caught as informational,
        # as the blacklist_recommendations script ignores them as warnings
        if healthy:
            nvidia_smi_res = modules.nvidia_smi.parse_nvidia_smi.getResult()
            if nvidia_smi_res:
                for gpu, info in nvidia_smi_res.items():
                    gpu_dict = xmltodict.parse(info)
                    check_cls = modules.nvidia_smi.GpuCheckRetiredPagesPending(gpu, gpu_dict)
                    check_cls.setCallback(self.getCallback())
                    check_cls.run()
                    bad_count = check_cls.getResult()['unhealthy'] + check_cls.getResult()['unknown']
                    if bad_count:
                        healthy = False
                        self.addCheckMessage(check_cls.getTitle())
                if not healthy:
                    self.addCheckMessage(config.gpu_total_retired_pages_pending_error)
                    self.addInformational()
                    return

        if healthy:
            self.addHealthy()
        else:
            self.addUnHealthy()

check_ipmi_sensor_thresholds

Brief

Check BMC sensor thresholds

Description

None

Module

ipmitool

Depends On

Source Code Listing

    def run(self):
        threshold_check_dispatch = {
            'lower_non_recoverable': lambda observed, threshold: threshold < observed,
            'lower_critical': lambda observed, threshold: threshold < observed,
            'upper_non_recoverable': lambda observed, threshold: observed < threshold,
            'upper_critical': lambda observed, threshold: observed < threshold
        }

        threshold_display = {
            'lower_non_recoverable': '{name}: Observed value "{observed}" ({units}) below non-recoverable lower threshold "{threshold}"',
            'lower_critical': '{name}: Observed value "{observed}" ({units}) below critical lower threshold "{threshold}"',
            'upper_non_recoverable': '{name}: Observed value "{observed}" ({units}) above non-recoverable upper threshold "{threshold}"',
            'upper_critical': '{name}: Observed value "{observed}" ({units}) above critical upper threshold "{threshold}"'
        }

        # Look for any sensor values that fall outside of critical thresholds
        try:
            healthy = True
            sensors = self.__parse_ipmi_sensor_task.getResult()
            if not sensors:
                return
            for sensor in sensors:
                name = sensor['name']
                observed = sensor['current_reading']
                try:
                    observed = float(observed)
                except:
                    continue
                units = sensor['type']
                if units.lower() == 'discrete':
                    continue
                for field in [
                        'lower_non_recoverable',
                        'lower_critical',
                        'upper_non_recoverable',
                        'upper_critical' ]:
                    threshold = sensor.get(field)
                    try:
                        threshold = float(threshold)
                    except:
                        continue
                    check = threshold_check_dispatch[field]
                    if check(observed, threshold):
                        continue  # Observed value is within threshold
                    healthy = False
                    display = threshold_display[field]
                    self.addCheckMessage(display.format(
                            name=name,
                            observed=observed,
                            units=units,
                            threshold=threshold))
            self.addCheckMessage('Checked {count} sensor values against BMC thresholds.'.format(
                 count=len(sensors)))

            if healthy:
                self.addHealthy()
            else:
                self.addUnHealthy()

        except:
            self.addCheckMessage('No sensors found in "ipmitool sensor"')
            self.addUnknown()
            return
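
A worked example of the dispatch table above: each lambda returns True while the reading is inside its threshold, so a False result flags a violation.

    # An 85.0 reading against an upper_critical threshold of 80.0 fails the
    # 'observed < threshold' test, so the sensor is reported unhealthy.
    upper_critical = lambda observed, threshold: observed < threshold
    print(upper_critical(85.0, 80.0))  # False -> outside threshold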

check_ipmitool_working

Brief

Check that the ipmitool command is working

Description

This checks the exit status of the “ipmitool” command. If the ipmitool command runs with successful exit status, then this is a good indication that ipmitool was able to communicate with the BMC (baseboard management controller).

Module

bmc

Depends On

Source Code Listing

    def run(self):
        pass
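
The run() body is empty because the pass/fail status presumably comes from the dependent ipmitool command task rather than from this method. The following is a standalone sketch of the idea in the description, not NVSM's implementation; the exact subcommand is an assumption.

    # Sketch: a zero exit status from ipmitool is taken as evidence that
    # the BMC responded. 'mc info' is an assumed subcommand.
    import subprocess

    proc = subprocess.run(['ipmitool', 'mc', 'info'],
                          capture_output=True, text=True)
    print('ipmitool working:', proc.returncode == 0)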

check_logical_core_count

Brief

Number of logical CPU cores [{0}]

Description

None

Module

lscpu

Source Code Listing

    def run(self):
        output = self.__parse_lscpu_task.getResult()

        observed_core_count = output['CPU']
        self.title(self.getTitle().format(observed_core_count))

        try:
            expected_core_count = self.__parameters_task.getResult()['lscpu_number_of_cores']
        except:
            return

        if observed_core_count == expected_core_count:
            self.addCheckMessage('Observed {0} logical CPU cores, matching expectations'.format(
                        observed_core_count))
            self.addHealthy()
            return

        self.addCheckMessage('Observed {0} logical CPU cores when {1} cores were expected'.format(
                    observed_core_count, expected_core_count))
        self.addUnHealthy()

        if observed_core_count * 2 == expected_core_count:
            # When only half of the expected logical cores are observed, we
            # suspect hyperthreading might be disabled

            # Look for the hyperthreading flag
            hyperthreading_enabled = output['hyperthread'] == 2

            if not hyperthreading_enabled:
                self.addCheckMessage('It appears that Hyper-Threading is disabled.' \
                      ' Some customers choose to disable Hyper-Threading in' \
                      ' order to improve the performance of certain' \
                      ' workloads. If Hyper-Threading was intentionally' \
                      ' disabled, please ignore this message.')

check_mdadm_disks

Brief

Status of software RAID disk superblocks

Description

None

Module

mdadm

Source Code Listing

    def run(self):
        try:
            check = self.__parameters_task.getResult()['mdadm_disk_status']
            if not check:
                return
        except:
            return

        disk_info = self.__mdadm_parse_examine.getResult()
        if not disk_info:
            self.addCheckMessage("No result from parse 'mdadm --examine' for software RAID superblock")
            self.addUnknown()
            return

        self.addCheckMessage("Checking output of 'mdadm --examine' for each software RAID superblock")

        # Check the checksum of each RAID disk superblock managed by mdadm
        healthy = True
        for name, disk in disk_info.items():
            if 'checksum' not in disk:
                self.addCheckMessage('Checksum not known for RAID disk "{0}"'.format(name))
                healthy = False
                continue
            checksum = disk['checksum']
            if 'correct' not in checksum:
                self.addCheckMessage('Observed failed checksum "{0}" on RAID disk "{1}"'.format(
                            checksum, name))
                healthy = False

        # Return healthy/unhealthy status
        if healthy:
            self.addHealthy()
        else:
            self.addUnHealthy()

check_mdadm_volumes

Brief

Status of software RAID volumes

Description

None

Module

mdadm

Source Code Listing

    def run(self):
        try:
            check = self.__parameters_task.getResult()['mdadm_volume_status']
            if not check:
                return
        except:
            return

        volume_info = self.__mdadm_parse_details.getResult()
        if not volume_info:
            self.addCheckMessage("No result from parse 'mdadm --detail' for software RAID volume")
            self.addUnknown()
            return

        self.addCheckMessage("Checking output of 'mdadm --detail' for each software RAID volume")

        good_volume_states = [ 'clean', 'active', 'write-pending', 'active-idle' ]

        healthy = True
        for name, volume in volume_info.items():
            # Check the volume state
            if 'state' not in volume:
                self.addCheckMessage('State not known for RAID volume "{0}"'.format(name))
                healthy = False
            elif 'recovering' in volume['state'].lower():
                self.addCheckMessage('It appears that the RAID volume "{0}" is currently' \
                      ' recovering. This is normal. However, volume performance' \
                      ' might be reduced while the volume is recovering. The' \
                      ' recovery process should complete soon, but if it does' \
                      ' not please contact NVIDIA support.'.format(name))
            elif 'resync' in volume['state'].lower():
                self.addCheckMessage('It appears that the RAID volume "{0}" is currently' \
                      ' resyncing. This is normal. However, volume performance' \
                      ' might be reduced while the volume is resyncing. The' \
                      ' resync process should complete soon, but if it does' \
                      ' not please contact NVIDIA support.'.format(name))
            elif volume['state'].lower() not in good_volume_states:
                self.addCheckMessage('Observed unhealthy state "{0}" for RAID volume "{1}"'.format(
                            volume['state'], name))
                healthy = False
            # Check for failed devices in the volume
            if 'failed_devices' not in volume:
                pass  # Failed device count not reported for this volume
            elif int(volume['failed_devices']) > 0:
                self.addCheckMessage('Observed {0} failed device(s) in RAID volume "{1}"'.format(
                            volume['failed_devices'], name))
                healthy = False

        # Return healthy/unhealthy status
        if healthy:
            self.addHealthy()
        else:
            self.addUnHealthy()

check_meminfo_mem_size

Brief

Installed memory capacity [{0:.2f}GB]

Description

None

Module

meminfo

Source Code Listing

    def run(self):
        try:
            threshold = self.__parameters_task.getResult()['meminfo_memory_size']
        except:
            return

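        # 9.537e-7 is approximately 1/2**20, converting the configured
        # size from kB (the unit used by /proc/meminfo) to GB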
        threshold = threshold * 9.537e-7
        self.title(self.getTitle().format(threshold))

        actual_mem = self.__collect_meminfo_task.getResult()
        if not actual_mem:
            self.addUnknown()
            return

        # check for actual mem w.r.t threshold with tolerance of 1GB
        tolerance = 1
        if modules.common.almost_equal(actual_mem, threshold, tolerance):
            self.addHealthy()
        else:
            self.addCheckMessage('Amount of memory is {0:.2f} GB'.format(actual_mem))
            self.addUnHealthy()
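
`modules.common.almost_equal` is not listed here; judging from the call above, it is a tolerance comparison along these lines (only the name and signature come from the listing, the body is an assumption):

    def almost_equal(a, b, tolerance):
        # True when the two values differ by no more than the tolerance
        # (here: observed vs. expected memory size in GB, within 1 GB)
        return abs(a - b) <= tolerance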

check_mlx_fw_version

Brief

Verify Mellanox devices firmware version consistency

Description

None

Module

mlnx

Source Code Listing

    def run(self):

        own_params = self.getParameters()
        params = self.__parameters_task.getResult()

        firmware_versions = 0
        for param, req in own_params.items():
            if param in params.keys():
                firmware_versions = params[param]

        if firmware_versions == 0:
            return

        # If mlxfwmanager run got a failure, skip parsing the output
        mlnx_res = self.__run_task.getOutput()
        if self.__run_task.getReturnCode() or not mlnx_res:
            self.addCheckMessage('No result from Mellanox firmware manager')
            self.addUnknown()
            return

        self.addCheckMessage(config.mlx_fw_ver_cmd_str)
        inconsistent_devices = {}

        try:
            res_dict = xmltodict.parse(mlnx_res)
            for res in res_dict['Devices']['Device']:
                pci = res['@pciName']
                fw_ver = res['Versions']['FW']['@current']
                if fw_ver not in inconsistent_devices:
                    inconsistent_devices[fw_ver] = []
                inconsistent_devices[fw_ver].append(pci)

            # Unhealthy check for multiple versions
            if len(inconsistent_devices) > firmware_versions:
                res = ""
                for k, v in inconsistent_devices.items():
                    res += "PCI device: '{}' has firmware version '{}'\n".format(", ".join(v), k)
                self.addCheckMessage(f"Different firmware version found on Mellanox devices\n{res}")
                self.addUnHealthy()
            else:
                self.addHealthy()
        # Unknown check
        except:
            self.addCheckMessage('Error while checking Mellanox devices firmware version consistency')
            self.addUnknown()
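
The XML shape assumed by the parse above can be illustrated with a small, hypothetical mlxfwmanager-style document (device addresses and versions are made up):

    import xmltodict

    sample = '''
    <Devices>
      <Device pciName="0000:0c:00.0">
        <Versions><FW current="20.28.1002"/></Versions>
      </Device>
      <Device pciName="0000:12:00.0">
        <Versions><FW current="20.28.2006"/></Versions>
      </Device>
    </Devices>'''

    doc = xmltodict.parse(sample)
    for dev in doc['Devices']['Device']:
        # '@' prefixes denote XML attributes in xmltodict's mapping
        print(dev['@pciName'], dev['Versions']['FW']['@current'])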

check_net_ping

Brief

Verify Network IP Reachability

Description

None

Module

net

Depends On

Source Code Listing

    def run(self):
        from re import findall
        try:
            net_ping = self.__parameters_task.getResult()['net_ping']
            if not net_ping:
                return
        except:
            return
        
        health_list = [True] * len(net_ping)

        for i, (interface, ip) in enumerate(net_ping.items()):
            run_ping_task = tasks.RunCommand(
                    args=['ping', '-c', str(config.check_net_ping_count), '-W', str(config.check_net_ping_timeout), ip]) \
                    .title('Run ping') \
                    .describe('''Check IP Reachability via Ping''')
            try:
                run_ping_task.run()
                if run_ping_task.getReturnCode() != 0:
                    raise Exception("Unable to ping {} at {}".format(ip, interface))

                output = run_ping_task.getOutput()
            except:
                self.addCheckMessage("Unable to ping {} at {}".format(ip, interface))
                health_list[i] = False
                continue

            # Extract the packet loss percentage from the ping summary line
            packet_loss = findall(r'([\d.]+)% packet loss', output)
            if not packet_loss:
                self.addCheckMessage("Unable to parse packet loss on {} at {}".format(interface, ip))
                health_list[i] = False
                continue

            self.addCheckMessage("Checking Packet Loss on {} at {}: {}%".format(interface, ip, packet_loss[0]))

            health_list[i] = (float(packet_loss[0]) == 0)

        if all(health_list):
            self.addHealthy()
        else: 
            self.addUnHealthy()
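
The packet-loss extraction is easiest to read against a sample iputils ping summary line:

    from re import findall

    summary = '5 packets transmitted, 4 received, 20% packet loss, time 4004ms'
    # The capture group grabs the full percentage before the '%' sign
    print(findall(r'([\d.]+)% packet loss', summary))  # ['20']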

check_nvidia_grid_license

Brief

Drive Constellation: GRID License Status

Description

None

Module

dcs_modules

Source Code Listing

    def run(self):
        try:
            dcs_grid = self.__parameters_task.getResult()['dcs_grid_license']
            if not dcs_grid:
                return
        except:
            return
        
        nvidia_smi_res = self.__parse_nvidia_smi_task.getResult()
        if nvidia_smi_res == None:
            return

        healthy = False

        try:
            for gpu, info in nvidia_smi_res.items():
                gpu_dict = xmltodict.parse(info)
                # GRID License check on GRID Products only
                # NVBug-2795033: GPU cards which can be licensed will have the `License Status` field and
                # those which cannot will not have this field.
                if gpu_dict['nvidia_smi_log']['gpu'].get('grid_licensed_product', None) == None:
                    continue
                elif gpu_dict['nvidia_smi_log']['gpu']['grid_licensed_product'].get('license_status', None) == None:
                    continue
                # NVBug:3145085 - Report healthy only if the product name is QVDCW and the status is Licensed
                elif gpu_dict['nvidia_smi_log']['gpu']['grid_licensed_product'].get('licensed_product_name', None) == None:
                    continue

                product_name = gpu_dict['nvidia_smi_log']['gpu']['grid_licensed_product']['licensed_product_name']
                product_name = product_name.strip()
                
                if product_name == 'Quadro Virtual Data Center Workstation':
                    if not gpu_dict['nvidia_smi_log']['gpu']['grid_licensed_product']['license_status'] == 'Licensed':
                        healthy = False
                        break
                    else:
                        healthy = True


            if healthy == True:
                self.addHealthy()
            else:
                self.addCheckMessage("Check GRID License: Contact Nvidia")
                self.addUnHealthy()

        except:
            self.addCheckMessage("Error while performing GRID License Check")
            self.addUnknown()

check_nvidia_smi_gpu_bus_id

Brief

Verify GPUs identified using nvidia-smi

Description

None

Module

nvidia_smi

Source Code Listing

    def run(self):
        # Get parameters from own task
        own_params = self.getParameters()

        # Get parameters from parameter task
        params = self.__parameters_task.getResult()

        expected_bdfs = None
        for param, req in own_params.items():
            if param in params.keys():
                expected_bdfs = params[param]
        # If parameter is not found in platform parameters - Do Nothing
        if expected_bdfs == None:
            return
        if modules.kvm.kvm_mode_on.getResult() == True:
            self.addCheckMessage('KVM mode is on, skipping check.')
            self.addInformational()
            return
        # Unknown check for no result from nvidia-smi gpu_bus_id parse task
        if not self.__parse_task.getResult():
            self.addCheckMessage(
                'No result for gpu_bus_id information gathered from nvidia-smi tool')
            self.addUnknown()
            return

        # Print/Stream Task info and command messages
        self.addCheckMessage(
            'Checking output of "nvidia-smi --query-gpu=gpu_bus_id --format=csv,noheader" for expected GPUs')

        message = ''
        healthy = True

        try:
            bdfs = self.__parse_task.getResult()
            for gpu in expected_bdfs.keys():
                if gpu not in bdfs:
                    message += '\nGPU not identified at PCI address "{}"'.format(
                        gpu)
                    healthy = False

            if not healthy:
                self.addCheckMessage(message)
                self.addUnHealthy()
            else:
                self.addHealthy()

        except:
            self.addCheckMessage('Error while identifying GPU bus_id')
            self.addUnknown()

check_nvme_devices

Brief

Verify installed NVMe devices

Description

None

Module

nvme

Source Code Listing

    def run(self):
        import json

        own_params = self.getParameters()
        params = self.__parameters_task.getResult()

        nvme_config = None
        for param, req in own_params.items():
            if param in params.keys():
                nvme_config = params[param]

        if not nvme_config:
            return

        nvsm_config = config.read_nvsm_config_file()

        if nvsm_config != None:
            if not nvsm_config["use_standard_config_storage"]:
                return

        if self.__parse_nvme_devices.getResult():
            self.addCheckMessage(config.nvme_command_str)
            devices = json.loads(self.__parse_nvme_devices.getResult())
            if not [conf for conf in nvme_config if devices.items() == conf.items()]:
                res = 'Supported NVMe device(s) configuration:\n'
                for conf_index, conf in enumerate(nvme_config):
                    for size, count in conf.items():
                        res += '"{}" NVMe device(s) with capacity "{}"\n'.format(
                            count, size)
                    if conf_index < len(nvme_config) - 1:
                        res += 'or \n'

                res += 'Found NVMe device(s) configuration:'
                for size, count in devices.items():
                    res += '\n"{}" NVMe device(s) with capacity "{}"'.format(
                        count, size)
                self.addCheckMessage(res)
                self.addUnHealthy()

            else:
                self.addHealthy()
        else:
            self.addCheckMessage("No results from parse nvme devices")
            self.addUnknown()
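
The comparison above treats the parsed result as a mapping of drive capacity to drive count, matched exactly against one of the supported configurations. For illustration (capacities and counts are hypothetical):

    nvme_config = [{'1.92 TB': 2}, {'3.84 TB': 4}]  # supported configurations
    devices = {'3.84 TB': 4}                        # observed capacity -> count

    matches = [conf for conf in nvme_config if devices.items() == conf.items()]
    print(bool(matches))  # True -- the observed configuration is supported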

check_nvme_smart_log

Brief

Check SMART status of NVMe devices

Description

None

Module

nvme

Source Code Listing

    def run(self):
        try:
            check = self.__parameters_task.getResult()['nvme_check_smart_log']
            if not check:
                return
        except:
            return

        # NVBUG2794792: Toshiba/Kioxia CM5: medium error observed in SMART log when I/O sent to the locked drive
        skipDriveModel = None
        try:
            skipDriveModel = self.__parameters_task.getResult()[
                'skip_nvme_drive_model']
        except:
            pass

        import json
        # Return if nvme run throws error or no results
        nvme_drive_map = {}
        if not self.__nvme_list.getReturnCode() and self.__nvme_list.getOutput():
            try:
                nvme_stream = json.loads(self.__nvme_list.getOutput())
                devices = nvme_stream['Devices']
                for device in devices:
                    d = device['DevicePath']
                    deviceName = d.split(os.sep)[-1]
                    nvme_drive_map[deviceName] = device['ModelNumber']
            except Exception as e:
                logging.debug("Error while parsing nvme devices: {}".format(e))

        self.addCheckMessage(
            "Checking output of 'nvme smart-log' for each NVMe device")
        nvme_smart_log = self.__nvme_smart_log_task.getResult()
        if not nvme_smart_log:
            self.addUnknown()
            return

        healthy = True
        for name, device in nvme_smart_log.items():
            # Check for critical warnings, which indicate drive is in error state
            critical_warnings = device.get('critical_warning', '0')
            if critical_warnings != '0':
                self.addCheckMessage('Found {0} critical warning(s) on NVMe drive "{1}".'.format(
                    critical_warnings, name))
                healthy = False

            # Check that remaining spare capacity is above the threshold
            available_spare = device.get('available_spare', 1.0)
            available_spare_threshold = device.get(
                'available_spare_threshold', 0.1)
            if available_spare < available_spare_threshold:
                self.addCheckMessage('Remaining spare capacity of {remaining}% on NVMe drive "{drive}" fails to meet threshold of {threshold}%.'.format(
                    drive=name,
                    remaining=int(available_spare * 1e2),
                    threshold=int(available_spare_threshold * 1e2)))
                healthy = False

            # Check that vendor estimate of percentage used is below 90%
            used = device.get('percentage_used', 0.0)
            if used > 0.9:
                self.addCheckMessage('Over {used}% expected life used on NVMe drive "{drive}".'.format(
                    used=int(used * 1e2),
                    drive=name))
                healthy = False

            # Skip media error check for locked devices
            if skipDriveModel and nvme_drive_map.get(name) == skipDriveModel:
                continue
            # Check for media errors, which occur when the controller detects
            # unrecovered data integrity errors
            media_errors = device.get('media_errors', '0')
            if media_errors != '0':
                self.addCheckMessage('Found {0} media error(s) on NVMe drive "{1}".'.format(
                    media_errors, name))
                healthy = False

        if healthy:
            self.addHealthy()
        else:
            self.addUnHealthy()
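
The `nvme smart-log` parse task is not listed here; a minimal sketch of turning the tool's text output into the field map used above, assuming nvme-cli's usual 'field : value' layout:

    def parse_smart_log(text):
        # Turn lines such as 'critical_warning : 0' into a field map,
        # keeping counter values as strings (as the checks above expect)
        fields = {}
        for line in text.splitlines():
            if ':' not in line:
                continue
            key, _, value = line.partition(':')
            fields[key.strip()] = value.strip()
        return fields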

check_psu_bom

Brief

Verify chassis power supply presence

Description

None

Module

ipmitool

Source Code Listing

    def run(self):
        from nvsmhealth.lib import DictionarySuperset
        try:
            psu_bom = self.__parameters_task.getResult()['psu_bom']
        except:
            return

        output = self.__sdr_device_bom_task.getResult()
        dictionary_superset = DictionarySuperset.DictionarySuperset(
                    missing_message="Could not detect presence of chassis power supply {}")
        result = dictionary_superset.compare(output, psu_bom)
        self.addCheckMessage("Checking output of 'ipmitool sdr elist' for expected chassis PSUs")
        if result:
            self.addCheckMessage(result)
            self.addUnHealthy()
        else:
            healthy = True
            # NVBUG-200528273: Check for Power supply lost
            # Even if PSU status is ok, readings might have power supply ac lost message
            psu_res = self.__parse_ipmi_sdr_elist_task.getResult()
            # Filter PSU status keys for readings
            psu_status_keys = [k for k in psu_bom.keys() if 'status' in k.lower()]
            for s in psu_res:
                if s['name'] in psu_status_keys:
                    reading = s['reading']
                    if 'power supply ac lost' in reading.lower():
                        self.addCheckMessage("AC input is lost, {} has reading:\n{}".format(s['name'], s['reading']))
                        healthy = False

            if healthy:
                self.addHealthy()
            else:
                self.addUnHealthy()
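
The AC-lost scan above works on parsed 'ipmitool sdr elist' rows; for example, given hypothetical parsed rows:

    psu_res = [
        {'name': 'PSU1 Status', 'reading': 'Presence detected'},
        {'name': 'PSU2 Status', 'reading': 'Presence detected, Power Supply AC lost'},
    ]
    psu_bom = {'PSU1 Status': 'ok', 'PSU2 Status': 'ok'}

    # Only the "...Status" sensors carry the AC-lost reading
    psu_status_keys = [k for k in psu_bom.keys() if 'status' in k.lower()]
    for s in psu_res:
        if s['name'] in psu_status_keys and 'power supply ac lost' in s['reading'].lower():
            print('AC input lost on', s['name'])  # flags PSU2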

check_psu_info

Brief

Check PSU Info (Vendor, Model) for Consistency

Description

None

Module

psu

Depends On

Source Code Listing

    def run(self):
        # print(self.__collect_psu_info_task.getReturnCode(), self.__collect_psu_info_task.getResult())

        if self.__collect_psu_info_task.getReturnCode():
            self.addCheckMessage("Unable to collect PSU (Vendor, Model) Information.")
            self.addUnknown()
            return
    
        psu_info = self.__collect_psu_info_task.getResult()

        for item in psu_info:
            s = set()
            for i in item.keys():
                k = i.split("_")[0]
                s.add(item[i])
            if len(s) > 1:
                self.addCheckMessage("Multiple PSU {}s found. {}".format(k,[i for i in s]))
                self.addUnHealthy()
                return
                
        self.addHealthy()

check_smartctl_disk_count

Brief

Verify installed disks

Description

Verify that all of the expected disks are installed

Module

disk

Source Code Listing

    def run(self):
        try:
            exp_disk_count = self.__parameters_task.getResult()[list(self.getParameters())[0]]
        except:
            return

        self.addCheckMessage("Checking output of 'smartctl' for expected disks")
        if self.__is_raid:
            disk_count = self.__get_disk_count_task.getResult()
        else:
            disk_count = self.__get_disk_count_task.getDisksBySize()
        if not disk_count:
            self.addCheckMessage("No disk(s) found")
            self.addUnknown()
            return

        dictionary_superset = DictionarySuperset.DictionarySuperset(
                missing_message="No disks of capacity '{}' were found",
                changed_message="Disks of capacity '{}' were found '{}' when '{}' disk(s) were expected")
        result = dictionary_superset.compare(disk_count, exp_disk_count)
        if result == None:
            # Healthy check - No diffs found
            self.addHealthy()
        else:
            # UnHealthy check - Print/Stream diffs found
            self.addUnHealthy()
            self.addCheckMessage(result)
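
`DictionarySuperset` lives in `nvsmhealth.lib` and is not listed here. Judging from its use above, compare() returns None on a match and a formatted message otherwise; a rough sketch of that contract (an assumption, not the library's actual implementation):

    class DictionarySuperset:
        def __init__(self, missing_message, changed_message=None):
            self.missing_message = missing_message
            self.changed_message = changed_message

        def compare(self, observed, expected):
            # None on a match; otherwise a message describing the first
            # missing or mismatching expected entry
            for key, value in expected.items():
                if key not in observed:
                    return self.missing_message.format(key)
                if observed[key] != value:
                    return self.changed_message.format(key, observed[key], value)
            return None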

check_smartctl_megaraid_disk_count

Brief

Verify installed MegaRAID disks

Description

Count the disks attached to the MegaRAID controller using the smartctl command

Module

disk

Source Code Listing

    def run(self):
        try:
            exp_disk_count = self.__parameters_task.getResult()[list(self.getParameters())[0]]
        except:
            return

        self.addCheckMessage("Checking output of 'smartctl' for expected disks")
        if self.__is_raid:
            disk_count = self.__get_disk_count_task.getResult()
        else:
            disk_count = self.__get_disk_count_task.getDisksBySize()
        if not disk_count:
            self.addCheckMessage("No disk(s) found")
            self.addUnknown()
            return

        dictionary_superset = DictionarySuperset.DictionarySuperset(
                missing_message="No disks of capacity '{}' were found",
                changed_message="Disks of capacity '{}' were found '{}' when '{}' disk(s) were expected")
        result = dictionary_superset.compare(disk_count, exp_disk_count)
        if result == None:
            # Healthy check - No diffs found
            self.addHealthy()
        else:
            # UnHealthy check - Print/Stream diffs found
            self.addUnHealthy()
            self.addCheckMessage(result)

check_smartctl_ssd_brick

Brief

Check for SSD health

Description

None

Module

disk

Source Code Listing

    def run(self):
        try:
            check = self.__parameters_task.getResult()['smartctl_check_ssd_brick']
            if not check:
                return
        except:
            return

        errormod_pattern = re.compile(r'ERRORMOD', flags=re.IGNORECASE)
        # Look for disks with bricked firmware conditions
        brick = False
        try:
            disk_bom = self.__smartctl_info_task.getBomList()

            if disk_bom is None:
                return

            for key, value in disk_bom.items():
                # Check for "ERRORMOD" in firmware version
                if 'firmware_version' not in value:
                    continue
                firmware_version = value['firmware_version']
                m = errormod_pattern.search(firmware_version)
                if not m:
                    continue
                # NOTE: It is likely that the disk capacity is also incorrect, but
                # we do not check for this here. Disk capacity is checked elsewhere
                # as part of the "smartctl-disk-count" check.
                brick = key
                break
        except Exception as e:
            self.addCheckMessage(str(e))
            self.addUnknown()
            return

        # Print details message
        if brick:
            self.addUnHealthy()
            self.addCheckMessage('Possible firmware bug on disk "{0}"'.format(brick))
        else:
            self.addHealthy()
            self.addCheckMessage('No disks with firmware bug found')

check_storcli_disk_state

Brief

None

Description

None

Module

storcli

Source Code Listing

    def run(self):
        if self.__run_command.getReturnCode():
            # command failed to execute
            return
        try:
            devices = self.__parameters_task.getResult()['storcli_disk_stats']
        except:
            return
        output = json.loads(self.__run_command.getOutput().strip())

        for name, key, idx, exp_val in devices:
            check_cls = CheckDisk(output, name, key, idx, exp_val)
            check_cls.setCallback(self.getCallback())
            check_cls.run()
            self.addHealthy(count=check_cls.getResult()['healthy'])
            self.addUnHealthy(count=check_cls.getResult()['unhealthy'])
            self.addUnknown(count=check_cls.getResult()['unknown'])
            check_cls.sendComplete()
            super().addMessages(check_cls.getMessages())

        # Clear the title as this task doesn't print anything
        self.title('')

check_storcli_sanity_installed

Brief

[sanity] MegaRAID storcli utility installed

Description

None

Module

storcli

Source Code Listing

    def run(self):
        try:
            storcli_platform_string = self.__parameters_task.getResult()['storcli_platform_string']
        except:
            # parameter not found
            return
        if self.__run_command.getReturnCode():
            self.addUnHealthy()
            self.addCheckMessage('The storcli utility does not appear to be installed')
            self.addCheckMessage('Please ensure storcli64 is installed in the /opt/MegaRAID/storcli/ directory')
        else:
            self.addHealthy()

check_storcli_sanity_supported

Brief

[sanity] {} BaseOS support for storcli utility

Description

None

Module

storcli

Source Code Listing

    def run(self):
        try:
            storcli_platform_string = self.__parameters_task.getResult()['storcli_platform_string']
        except:
            # parameter not found
            return

        self.title(self.getTitle().format(storcli_platform_string))

        baseos_version = self.__base_os_version_task.getResult()
        if not baseos_version or 'sw_version' not in baseos_version:
            self.addCheckMessage("Error checking {} BaseOS version".format( storcli_platform_string))
            self.addUnknown()
            return

        # Check DGX BaseOS version for storcli support
        message = 'Installed {} BaseOS version "{}" '.format(storcli_platform_string, baseos_version['sw_version'])
        if Version(baseos_version['sw_version']) >= Version('3.1.6'):
            # DGX BaseOS 3.1.6 introduces support for the storcli64 utility
            message += 'should support storcli'
            self.addHealthy()
        else:
            message += 'does not support storcli'
            self.addUnHealthy()
        self.addCheckMessage(message)
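
The version gate relies on a PEP 440 style comparison; `Version` here presumably comes from `packaging.version` (an assumption based on usage):

    from packaging.version import Version

    print(Version('4.0.1') >= Version('3.1.6'))  # True  -- storcli supported
    print(Version('3.1.4') >= Version('3.1.6'))  # False -- BaseOS too old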

check_superuser_privileges

Brief

Check for superuser privileges

Description

This checks that NVSM Health is running with an effective user ID of 0, which indicates superuser or “root” privileges. Many NVSM Health checks require superuser privileges in order to run certain privileged commands or access privileged log files.

Module

common

Source Code Listing

    def run(self):
        import os
        # TODO: How should classes implementing ICheck communicate
        #       health check results?
        if os.geteuid() != 0:
            pass
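
The listing leaves result reporting as a TODO; a sketch of how the completed check might report through the same helpers used by the other checks (an assumption about how the TODO would be resolved):

    import os

    def run(self):
        if os.geteuid() == 0:
            self.addHealthy()
        else:
            self.addCheckMessage('NVSM Health is not running with superuser'
                                 ' privileges; some checks may be skipped.')
            self.addUnHealthy()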

check_xenserver_logical_core_count

Brief

Number of logical CPU cores [{0}]

Description

None

Module

xenserver

Source Code Listing

    def run(self):
        try:
            expected_core_count = modules.parameter.parameters.getResult()['xenserver_number_of_cores']
        except:
            return

        xlinfo_result = parse_xl_info.getResult()
        if not xlinfo_result:
            self.addUnknown()
            return

        observed_core_count = xlinfo_result['nr_cpus']
        self.title(self.getTitle().format(observed_core_count))

        if observed_core_count == expected_core_count:
            self.addCheckMessage('Observed {0} logical CPU cores, matching expectations'.format(
                        observed_core_count))
            self.addHealthy()
            return

        if observed_core_count * 2 == expected_core_count:
            # When only half of the expected logical cores are observed, we
            # suspect hyperthreading might be disabled

            # Look for the hyperthreading flag
            hyperthreading_enabled = xlinfo_result['threads_per_core'] == 2

            if not hyperthreading_enabled:
                self.addCheckMessage('It appears that Hyper-Threading is disabled.' \
                      ' Some customers choose to disable Hyper-Threading in' \
                      ' order to improve the performance of certain' \
                      ' workloads. If Hyper-Threading was intentionally' \
                      ' disabled, please ignore this message.')
                self.addUnHealthy()
                return

        self.addCheckMessage('Observed {0} logical CPU cores when {1} cores were expected'.format(
                    observed_core_count, expected_core_count))
        self.addUnHealthy()

dcv_check_fan_bom

Brief

Drive Constellation: Verify chassis fan presence for DCC

Description

None

Module

dcs_modules

Source Code Listing

    def run(self):
        from nvsmhealth.lib import DictionarySuperset
        try:
            dcv_fan_bom = self.__parameters_task.getResult()['dcv_fan_bom']
        except:
            return

        output = self.__dcv_sdr_device_bom_task.getResult()
        dictionary_superset = DictionarySuperset.DictionarySuperset(
                    missing_message="Could not detect presence of DCC chassis fan: {}")
        result = dictionary_superset.compare(output, dcv_fan_bom)
        self.addCheckMessage("Checking output of 'ipmitool sdr elist' for expected chassis fans on DCC")
        if result:
            self.addCheckMessage(result)
            self.addUnHealthy()
        else:
            self.addHealthy()

dcv_check_fru_consistency

Brief

Drive Constellation: Check FRU information for consistency

Description

The FRU (field replaceable unit) information recorded in the BMC (baseboard management controller) includes serial numbers for various FRUs on the system. For any given system, these serial numbers should be consistent among all FRUs. However, it is possible for these serial numbers to become inconsistent as the result of normal maintenance (such as FRU replacement). This check makes sure serial numbers are consistent for all FRUs recorded in the DCC BMC.

Module

dcs_modules

Source Code Listing

    def run(self):
        # TODO:[Kenzen-499] Check if FRU in self.__fru_task is consistent and report health using ICheck interface
        # TODO:Similar approach needs to be used by DCC BMC
        pass
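
The listing is currently a stub; based on the description, the completed check would compare serial numbers across FRUs. A minimal sketch of that comparison (the data shape is hypothetical):

    def fru_serials_consistent(fru_info):
        # fru_info: mapping of FRU name -> serial number read from the BMC
        serials = {serial for serial in fru_info.values() if serial}
        # Consistent when every FRU reports the same serial number
        return len(serials) <= 1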

dcv_check_ipmi_sensor_thresholds

Brief

Drive Constellation: Check DCC BMC sensor thresholds

Description

None

Module

dcs_modules

Source Code Listing

    def run(self):

        try:
            dcs_dcv_sensor_threshold = self.__parameters_task.getResult()['dcs_dcv_sensor_threshold']
            if not dcs_dcv_sensor_threshold:
                return
        except:
            return

        threshold_check_dispatch = {
            'lower_non_recoverable': lambda observed, threshold: threshold < observed,
            'lower_critical': lambda observed, threshold: threshold < observed,
            'upper_non_recoverable': lambda observed, threshold: observed < threshold,
            'upper_critical': lambda observed, threshold: observed < threshold
        }

        threshold_display = {
            'lower_non_recoverable': '{name}: Observed value "{observed}" ({units}) below non-recoverable lower threshold "{threshold}"',
            'lower_critical': '{name}: Observed value "{observed}" ({units}) below critical lower threshold "{threshold}"',
            'upper_non_recoverable': '{name}: Observed value "{observed}" ({units}) above non-recoverable upper threshold "{threshold}"',
            'upper_critical': '{name}: Observed value "{observed}" ({units}) above critical upper threshold "{threshold}"'
        }

        # Look for any sensor values that fall outside of critical thresholds
        healthy = True
        try:
            sensors = self.__dcv_parse_ipmi_sensor_task.getResult()
            for sensor in sensors:
                name = sensor['name']
                observed = sensor['current_reading']
                try:
                    observed = float(observed)
                except:
                    continue
                units = sensor['type']
                if units.lower() == 'discrete':
                    continue
                for field in [
                        'lower_non_recoverable',
                        'lower_critical',
                        'upper_non_recoverable',
                        'upper_critical' ]:
                    threshold = sensor.get(field)
                    try:
                        threshold = float(threshold)
                    except:
                        continue
                    check = threshold_check_dispatch[field]
                    if check(observed, threshold):
                        continue  # Observed value is within threshold
                    healthy = False
                    display = threshold_display[field]
                    self.addCheckMessage(display.format(
                            name=name,
                            observed=observed,
                            units=units,
                            threshold=threshold))
            self.addCheckMessage('Checked {count} sensor values against DCC BMC thresholds.'.format(
                 count=len(sensors)))
        except:
            self.addCheckMessage('No sensors found in "ipmitool sensor"')
            self.addUnknown()
            return

        if healthy:
            self.addHealthy()
        else:
            self.addUnHealthy()

dcv_check_psu_bom

Brief

Drive Constellation: Verify chassis power supply presence on DCC

Description

None

Module

dcs_modules

Source Code Listing

    def run(self):
        from nvsmhealth.lib import DictionarySuperset
        try:
            dcv_psu_bom = self.__parameters_task.getResult()['dcv_psu_bom']
        except:
            return

        output = self.__dcv_sdr_device_bom_task.getResult()
        dictionary_superset = DictionarySuperset.DictionarySuperset(
                    missing_message="Could not detect presence of DCC chassis power supply: {}")
        result = dictionary_superset.compare(output, dcv_psu_bom)
        self.addCheckMessage("Checking output of 'ipmitool sdr elist' for expected chassis PSUs on DCC")
        if result:
            self.addCheckMessage(result)
            self.addUnHealthy()
        else:
            healthy = True
            # NVBUG-200554527: Check for Power supply lost for DCC
            # Even if PSU status is ok, readings might have power supply ac lost message
            psu_res = self.__dcv_parse_ipmi_sdr_elist_task.getResult()
            # Filter PSU status keys for readings
            psu_status_keys = [k for k in dcv_psu_bom.keys() if 'status' in k.lower()]
            for s in psu_res:
                if s['name'] in psu_status_keys:
                    reading = s['reading']
                    if 'power supply ac lost' in reading.lower():
                        self.addCheckMessage("AC input is lost, {} has reading:\n{}".format(s['name'], s['reading']))
                        healthy = False

            if healthy:
                self.addHealthy()
            else:
                self.addUnHealthy()