
tabular_dataloader

Bases: dataloader

Source code in tinybig/data/tabular_dataloader.py
import os
import warnings

import numpy as np
import torch
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split, KFold

from tinybig.data.base_data import dataloader, dataset  # assumed import path for the base classes

class tabular_dataloader(dataloader):

    def __init__(self, name='tabular_dataloader', data_contents: list | tuple = None,
                 train_batch_size=64, test_batch_size=64, *args, **kwargs):
        super().__init__(name=name, train_batch_size=train_batch_size, test_batch_size=test_batch_size, *args, **kwargs)
        self.data_contents = data_contents

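    # Read the raw text rows from the file at cache_dir/filename.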
    def load_file(self, cache_dir: str = None, filename: str = None):
        file_path = os.path.join(cache_dir, filename)
        if not os.path.exists(file_path):
            raise ValueError('The provided file path {} does not exist...'.format(file_path))
        with open(file_path, 'r') as f:
            rows = f.readlines()
        return rows

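    # Return in-memory data_contents when available; otherwise read rows from the cached file.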
    def load_raw_data(self, data_contents=None, cache_dir: str = None, filename: str = None):
        data_contents = data_contents if data_contents is not None else self.data_contents
        if data_contents is not None:
            return data_contents
        elif cache_dir is not None and filename is not None:
            return self.load_file(cache_dir=cache_dir, filename=filename)
        else:
            raise ValueError('Either data_contents or both cache_dir and filename must be provided...')

    def process_data(self, data_contents=None, cache_dir: str = None, filename: str = None,
                     str_converted_to_numerical: bool = True, *args, **kwargs):

        rows = data_contents if data_contents is not None else self.load_file(cache_dir=cache_dir, filename=filename)

        data_dict = {
            'profile': {
                'name': self.name,
                'str_converted_to_numerical': str_converted_to_numerical,
                'column_names': [],  # list of column names
                'column_value_types': [],  # list of column value types, e.g., float, int, str, etc.
                'column_codings': [],  # per-column coding dict {raw_value: coded_value}, or None
            },
            'contents': []  # [[vector of instance 1], [vector of instance 2], ...]
        }


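        # The header row encodes each column as 'name_type', e.g., 'age_int' or 'city_str'.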
        headline = rows[0]
        column_names, column_value_types = [], []
        feature_name_value_type_list = headline.strip('\n').split(',')
        for feature_name_value_type in feature_name_value_type_list:
            name, value_type = feature_name_value_type.rsplit('_', 1)  # rsplit keeps underscores inside column names intact
            column_names.append(name)
            column_value_types.append(value_type)

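        # Parse each data row: cast numeric columns, and (when str_converted_to_numerical is True)
        # map string values to integer codes, recording the per-column coding dictionaries.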
        column_codings = [None]*len(column_names)
        contents = []
        nan_detect = False
        nan_columns = {}
        for row_index in range(1, len(rows)):
            row = rows[row_index]
            x = []
            features = row.strip('\n').split(',')
            for index in range(len(features)):
                feature = features[index]
                feature_value_type = column_value_types[index].lower()
                if feature_value_type in ('int', 'integer'):
                    try:
                        feature = int(feature)
                    except ValueError:  # non-numeric entry, e.g., a missing value
                        nan_detect = True
                        nan_columns[index] = 1
                        feature = np.nan
                elif feature_value_type in ('float', 'double'):
                    try:
                        feature = float(feature)
                    except ValueError:  # non-numeric entry, e.g., a missing value
                        nan_detect = True
                        nan_columns[index] = 1
                        feature = np.nan
                elif feature_value_type in ('str', 'string', 'strings'):
                    if str_converted_to_numerical:
                        if column_codings[index] is None:
                            column_codings[index] = {}
                        if feature not in column_codings[index]:
                            column_codings[index][feature] = len(column_codings[index])
                        feature = column_codings[index][feature]
                x.append(feature)
            contents.append(x)
        data_dict['profile']['column_names'] = column_names
        data_dict['profile']['column_value_types'] = column_value_types
        data_dict['profile']['column_codings'] = column_codings
        data_dict['profile']['row_number'] = len(contents)
        data_dict['profile']['column_number'] = len(column_names)
        data_dict['contents'] = contents

        if nan_detect:
            warnings.warn('The loaded dataset may contain NaN elements; the columns containing NaN are: {}'.format(list(nan_columns.keys())), UserWarning)
        return data_dict

    def load_complete_data(self, data_contents: list | tuple = None, cache_dir: str = None,
                           filename: str = None, *args, **kwargs):
        data_contents = data_contents if data_contents is not None else self.data_contents
        raw_data = self.process_data(*args, data_contents=data_contents, cache_dir=cache_dir,
                                     filename=filename, str_converted_to_numerical=True, **kwargs)

        contents = np.array(raw_data['contents'])
        row_number = raw_data['profile']['row_number']
        column_number = raw_data['profile']['column_number']
        column_names = raw_data['profile']['column_names']
        column_value_types = raw_data['profile']['column_value_types']
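        # Assumed column layout: column 0 is an instance id, columns 1 through -2 are features,
        # and the last column is the label.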
        ids = contents[:, 0]
        X = torch.tensor(contents[:, 1:-1], dtype=torch.float32)
        y = torch.tensor(contents[:, -1], dtype=torch.float32)

        return {
            'X': X,
            'y': y,
            'profile': {
                'name': raw_data['profile']['name'],
                'instance_number': row_number,
                'feature_number': column_number - 2,
                'feature_codings': raw_data['profile']['column_codings'][1:-1],
                'label_codings': raw_data['profile']['column_codings'][-1],
                'feature_names': column_names[1:-1],
                'label_names': column_names[-1],
                'feature_value_types': column_value_types[1:-1],
                'label_value_types': column_value_types[-1]
            }
        }

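    # Column-wise normalization: 'min_max' rescales each column into normalize_range
    # (default (0, 1)); 'mean_std' standardizes to zero mean and unit variance;
    # any other normalize_type returns the input unchanged.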
    @staticmethod
    def normalize(input: torch.Tensor, normalize_type=None, normalize_range: list | tuple=None):
        if normalize_type == 'min_max':
            if normalize_range is None:
                min_value, max_value = (0, 1)
            else:
                min_value, max_value = normalize_range
            X_min = input.min(dim=0, keepdim=True).values
            X_max = input.max(dim=0, keepdim=True).values
            X_std = (input - X_min) / (X_max - X_min)
            X_scaled = X_std * (max_value - min_value) + min_value
        elif normalize_type == 'mean_std':
            X_mean = torch.mean(input, dim=0, keepdim=True)
            X_std = torch.std(input, dim=0, keepdim=True)
            X_scaled = (input - X_mean) / X_std
        else:
            X_scaled = input
        return X_scaled

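    # Split X and y into batched loaders: 'train_test_split' returns one (train, test) pair of
    # DataLoaders; 'KFold'/'cross_validation' returns two dicts of DataLoaders keyed by fold index.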
    def split(self, X, y, split_type='train_test_split', train_percentage=0.9, fold=10, random_state=1234, shuffle=True):
        train_loader, test_loader = None, None
        if split_type == 'train_test_split':
            X_train, X_test, y_train, y_test = train_test_split(
                X, y,
                train_size=int(train_percentage * len(X)),
                random_state=random_state, shuffle=shuffle
            )
            train_dataset = dataset(X_train, torch.unsqueeze(y_train, 1))
            test_dataset = dataset(X_test, torch.unsqueeze(y_test, 1))
            train_loader = DataLoader(train_dataset, batch_size=self.train_batch_size, shuffle=True)
            test_loader = DataLoader(test_dataset, batch_size=self.test_batch_size, shuffle=False)
        elif split_type in ['KFold', 'cross_validation']:
            kf = KFold(n_splits=fold, random_state=random_state if shuffle else None, shuffle=shuffle)
            train_loader, test_loader = {}, {}
            for i, (train_index, test_index) in enumerate(kf.split(X)):
                X_train, y_train = X[train_index], y[train_index]
                X_test, y_test = X[test_index], y[test_index]
                train_dataset = dataset(X_train, torch.unsqueeze(y_train, 1))
                test_dataset = dataset(X_test, torch.unsqueeze(y_test, 1))
                train_loader[i] = DataLoader(train_dataset, batch_size=self.train_batch_size, shuffle=True)
                test_loader[i] = DataLoader(test_dataset, batch_size=self.test_batch_size, shuffle=False)
        return train_loader, test_loader

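    # End-to-end entry point: read and parse the file, optionally normalize X and/or y,
    # then split into train and test loaders.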
    def load(self, cache_dir: str = './data/', filename: str = 'classic_dataset_filename',
             split_type: str = 'train_test_split', train_percentage: float = 0.9, fold: int = 10,
             random_state: int = 123, shuffle: bool = True,
             normalize_X: bool = False, normalize_y: bool = False,
             normalize_type: str = 'min_max', normalize_range: list | tuple = (0, 1), *args, **kwargs):

        complete_data = self.load_complete_data(data_contents=self.data_contents, cache_dir=cache_dir, filename=filename)
        X, y = complete_data['X'], complete_data['y']

        if normalize_X:
            X = self.normalize(input=X, normalize_type=normalize_type, normalize_range=normalize_range)
        if normalize_y:
            y = self.normalize(input=y, normalize_type=normalize_type, normalize_range=normalize_range)

        train_loader, test_loader = self.split(X=X, y=y, split_type=split_type, train_percentage=train_percentage,
                                               fold=fold, random_state=random_state, shuffle=shuffle)

        return {
            'train_loader': train_loader,
            'test_loader': test_loader,
            'profile': complete_data['profile']
        }
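
Example

A minimal usage sketch. The directory './data/', the file 'demo.csv', and its contents are hypothetical, and the snippet assumes tinybig's dataset yields (feature, label) pairs when iterated; the loader expects a comma-separated file whose header labels each column as name_type, with the first column an instance id and the last column the label.

# Hypothetical file './data/demo.csv':
#   id_int,age_int,height_float,city_str,label_int
#   1,34,1.75,Paris,0
#   2,28,1.62,Tokyo,1
loader = tabular_dataloader(name='demo_tabular', train_batch_size=32, test_batch_size=32)
result = loader.load(
    cache_dir='./data/', filename='demo.csv',
    split_type='train_test_split', train_percentage=0.9,
    normalize_X=True, normalize_type='min_max', normalize_range=(0, 1),
)
for X_batch, y_batch in result['train_loader']:
    print(X_batch.shape, y_batch.shape)  # e.g., torch.Size([32, 3]) torch.Size([32, 1])
    break

With split_type='KFold', the returned 'train_loader' and 'test_loader' are instead dicts of DataLoaders keyed by fold index, e.g., result['train_loader'][0] for the first fold.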