class tabular_dataloader(dataloader):
    """
    Loader for comma-separated tabular datasets.

    The expected text layout, as parsed by ``process_data``:

    * the first row is a header whose comma-separated cells look like
      ``<column name>_<value type>`` (for example ``age_int``);
    * every following row is one instance with comma-separated values.

    ``load_complete_data`` treats the first column as an identifier that is
    dropped, the last column as the label, and everything in between as the
    feature matrix.
    """

    # Accepted spellings of the value-type suffix found in the header row.
    _INT_TYPE_NAMES = frozenset({'int', 'integer', 'Int', 'Integer', 'INT', 'INTEGER'})
    _FLOAT_TYPE_NAMES = frozenset({'float', 'Float', 'double', 'Double', 'FLOAT', 'DOUBLE'})
    _STR_TYPE_NAMES = frozenset({
        'str', 'string', 'Str', 'String', 'strings', 'Strings', 'STR', 'STRING', 'STRINGS',
    })

    def __init__(self, name='tabular_dataloader', data_contents: list | tuple = None,
                 train_batch_size=64, test_batch_size=64, *args, **kwargs):
        """
        Initialize the loader.

        Args:
            name: Name forwarded to the base-class constructor.
                NOTE(review): ``process_data`` reads ``self.name``, so the base
                class is presumably expected to store it - confirm.
            data_contents: Optional in-memory rows (header row first). When
                given, they are used instead of reading a file.
            train_batch_size: Batch size of the training ``DataLoader``.
            test_batch_size: Batch size of the testing ``DataLoader``.
            *args: Extra positional arguments forwarded to the base class.
            **kwargs: Extra keyword arguments forwarded to the base class.
        """
        super().__init__(name=name, *args, **kwargs)
        self.train_batch_size = train_batch_size
        self.test_batch_size = test_batch_size
        self.data_contents = data_contents

    def load_file(self, cache_dir: str = None, filename: str = None):
        """
        Read a text file and return its lines.

        Args:
            cache_dir: Directory containing the file.
            filename: Name of the file inside ``cache_dir``.

        Returns:
            The list of lines as produced by ``readlines()``.

        Raises:
            ValueError: If ``cache_dir`` or ``filename`` is None, or the
                resulting path does not exist.
        """
        if cache_dir is None or filename is None:
            raise ValueError('Both cache_dir and filename must be provided to load a file...')
        # os.path.join works whether or not cache_dir ends with a separator.
        file_path = os.path.join(cache_dir, filename)
        if not os.path.exists(file_path):
            raise ValueError('The provided file path {} doesn\'t exist...'.format(file_path))
        # The context manager closes the handle even if reading fails.
        with open(file_path, 'r') as f:
            return f.readlines()

    def load_raw_data(self, data_contents=None, cache_dir: str = None, filename: str = None):
        """
        Return the raw rows from memory or from a file.

        Precedence: the ``data_contents`` argument, then
        ``self.data_contents``, then the file at ``cache_dir``/``filename``.

        Returns:
            The raw rows, or None when no source is available.
        """
        data_contents = data_contents if data_contents is not None else self.data_contents
        if data_contents is not None:
            return data_contents
        if cache_dir is not None and filename is not None:
            return self.load_file(cache_dir=cache_dir, filename=filename)
        return None

    def process_data(self, data_contents=None, cache_dir: str = None, filename: str = None,
                     str_converted_to_numerical: bool = True, *args, **kwargs):
        """
        Parse raw rows into typed values plus a profile describing them.

        Args:
            data_contents: Rows to parse (header row first). When None the
                rows are read with ``load_file``.
            cache_dir: Directory of the file, used when ``data_contents`` is None.
            filename: File name, used when ``data_contents`` is None.
            str_converted_to_numerical: When True, each string column is
                encoded as integer codes assigned in order of first appearance.
            *args: Unused.
            **kwargs: Unused.

        Returns:
            A dict with two keys:

            * ``'profile'``: name, ``str_converted_to_numerical``,
              ``column_names`` (also exposed as ``feature_names``),
              ``column_value_types``, ``column_codings`` (one entry per
              column: a ``{raw value: code}`` dict for encoded string columns,
              otherwise None), ``row_number`` and ``column_number``.
            * ``'contents'``: one list of parsed values per data row.

        Raises:
            ValueError: If there are no rows, or a header cell is not of the
                form ``<name>_<type>``.
        """
        rows = data_contents if data_contents is not None else self.load_file(cache_dir=cache_dir, filename=filename)
        if len(rows) == 0:
            raise ValueError('The dataset is empty, a header row is required...')

        # Header cells are "<name>_<type>". Splitting on the LAST underscore
        # keeps column names that themselves contain underscores intact.
        column_names, column_value_types = [], []
        for header_cell in rows[0].strip('\r\n').split(','):
            name, separator, value_type = header_cell.rpartition('_')
            if not separator:
                raise ValueError(
                    'The header cell "{}" is not of the form <name>_<type>...'.format(header_cell))
            column_names.append(name)
            column_value_types.append(value_type)

        column_codings = [None] * len(column_names)
        contents = []
        # Indices of the columns where at least one value failed to parse.
        nan_columns = {}
        for row in rows[1:]:
            # Strip '\r' as well so CRLF rows do not corrupt the last value.
            line = row.strip('\r\n')
            if not line.strip():
                # Blank lines carry no instance and would make contents ragged.
                continue
            x = []
            for index, feature in enumerate(line.split(',')):
                value_type = column_value_types[index]
                if value_type in self._INT_TYPE_NAMES:
                    try:
                        feature = int(feature)
                    except ValueError:
                        nan_columns[index] = 1
                        feature = np.nan
                elif value_type in self._FLOAT_TYPE_NAMES:
                    try:
                        feature = float(feature)
                    except ValueError:
                        nan_columns[index] = 1
                        feature = np.nan
                elif value_type in self._STR_TYPE_NAMES and str_converted_to_numerical:
                    if column_codings[index] is None:
                        column_codings[index] = {}
                    codes = column_codings[index]
                    # New raw values receive the next free integer code.
                    feature = codes.setdefault(feature, len(codes))
                # Any other value type is kept as the raw string.
                x.append(feature)
            contents.append(x)

        data_dict = {
            'profile': {
                'name': self.name,
                'str_converted_to_numerical': str_converted_to_numerical,
                'column_names': column_names,
                # Same list under the key read by load_complete_data.
                'feature_names': column_names,
                'column_value_types': column_value_types,
                'column_codings': column_codings,
                'row_number': len(contents),
                'column_number': len(column_names),
            },
            'contents': contents,
        }
        if nan_columns:
            warnings.warn('The loaded dataset may contain NaN elements, and the columns containing nan are listed as follows', UserWarning)
            print(nan_columns.keys())
        return data_dict

    def load_complete_data(self, data_contents: list | tuple = None, cache_dir: str = None,
                           filename: str = None, *args, **kwargs):
        """
        Parse the dataset and convert it into feature and label tensors.

        Column 0 is dropped, the last column becomes ``y`` and the columns in
        between become ``X``. String columns are always encoded numerically.

        Args:
            data_contents: In-memory rows; falls back to ``self.data_contents``.
            cache_dir: Directory of the file, used when no rows are in memory.
            filename: File name, used when no rows are in memory.
            *args: Ignored. Positional extras cannot be forwarded because
                every ``process_data`` parameter is already passed by keyword.
            **kwargs: Forwarded to ``process_data``.

        Returns:
            A dict with ``'X'`` and ``'y'`` tensors and a ``'profile'`` dict
            describing the instances, features and label.
        """
        data_contents = data_contents if data_contents is not None else self.data_contents
        raw_data = self.process_data(data_contents=data_contents, cache_dir=cache_dir, filename=filename,
                                     str_converted_to_numerical=True, **kwargs)
        raw_profile = raw_data['profile']
        contents = np.array(raw_data['contents'])
        column_names = raw_profile['feature_names']
        column_value_types = raw_profile['column_value_types']
        column_codings = raw_profile['column_codings']

        X = torch.Tensor(contents[:, 1:-1])
        y = torch.Tensor(contents[:, -1])
        return {
            'X': X,
            'y': y,
            'profile': {
                'name': raw_profile['name'],
                'instance_number': raw_profile['row_number'],
                # The first and the last columns are not features.
                'feature_number': raw_profile['column_number'] - 2,
                'feature_codings': column_codings[1:-1],
                'label_codings': column_codings[-1],
                'feature_names': column_names[1:-1],
                'label_names': column_names[-1],
                'feature_value_types': column_value_types[1:-1],
                'label_value_types': column_value_types[-1]
            }
        }

    @staticmethod
    def normalize(input: torch.Tensor, normalize_type=None, normalize_range: list | tuple = None):
        """
        Normalize a tensor along its first dimension.

        Args:
            input: Tensor to normalize; statistics are computed over dim 0.
            normalize_type: ``'min_max'`` rescales into ``normalize_range``,
                ``'mean_std'`` subtracts the mean and divides by the standard
                deviation (``torch.std`` defaults); anything else returns
                ``input`` unchanged.
            normalize_range: ``(min, max)`` target range for ``'min_max'``;
                defaults to ``(0, 1)`` when None.

        Returns:
            The normalized tensor.
        """
        if normalize_type == 'min_max':
            min_value, max_value = (0, 1) if normalize_range is None else normalize_range
            lowest = input.min(dim=0, keepdim=True).values
            highest = input.max(dim=0, keepdim=True).values
            span = highest - lowest
            # A constant column has zero span; dividing by 1 instead maps it
            # to min_value rather than producing NaN.
            span = torch.where(span == 0, torch.ones_like(span), span)
            return (input - lowest) / span * (max_value - min_value) + min_value
        if normalize_type == 'mean_std':
            X_mean = torch.mean(input, dim=0, keepdim=True)
            X_std = torch.std(input, dim=0, keepdim=True)
            # Same guard for constant columns (zero standard deviation).
            X_std = torch.where(X_std == 0, torch.ones_like(X_std), X_std)
            return (input - X_mean) / X_std
        return input

    def _build_loaders(self, X_train, y_train, X_test, y_test):
        """Wrap one train/test partition into a (train, test) DataLoader pair."""
        train_dataset = dataset(X_train, torch.unsqueeze(y_train, 1))
        test_dataset = dataset(X_test, torch.unsqueeze(y_test, 1))
        train_loader = DataLoader(train_dataset, batch_size=self.train_batch_size, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=self.test_batch_size, shuffle=False)
        return train_loader, test_loader

    def split(self, X, y, split_type='train_test_split', train_percentage=0.9, fold=10, random_state=1234, shuffle=True):
        """
        Partition the data and wrap the partitions into DataLoaders.

        Args:
            X: Feature tensor.
            y: Label tensor; a trailing dimension is added before wrapping.
            split_type: ``'train_test_split'`` for a single split, or
                ``'KFold'`` / ``'cross_validation'`` for k-fold splitting.
            train_percentage: Fraction of instances used for training in a
                single split.
            fold: Number of folds for k-fold splitting.
            random_state: Seed used by the splitter.
            shuffle: Whether the splitter shuffles before splitting.

        Returns:
            ``(train_loader, test_loader)``. For a single split both are
            DataLoaders; for k-fold both are dicts keyed by fold index; for
            any other ``split_type`` both are None.
        """
        train_loader, test_loader = None, None
        if split_type == 'train_test_split':
            X_train, X_test, y_train, y_test = train_test_split(
                X, y,
                train_size=int(train_percentage * len(X)),
                random_state=random_state, shuffle=shuffle
            )
            train_loader, test_loader = self._build_loaders(X_train, y_train, X_test, y_test)
        elif split_type in ['KFold', 'cross_validation']:
            # KFold rejects a random_state when shuffle is False, so only
            # pass the seed when it is actually used.
            kf = KFold(n_splits=fold, random_state=random_state if shuffle else None, shuffle=shuffle)
            train_loader, test_loader = {}, {}
            for i, (train_index, test_index) in enumerate(kf.split(X)):
                train_loader[i], test_loader[i] = self._build_loaders(
                    X[train_index], y[train_index], X[test_index], y[test_index])
        return train_loader, test_loader

    def load(self, cache_dir: str = './data/', filename: str = 'classic_dataset_filename',
             split_type: str = 'train_test_split', train_percentage: float = 0.9, fold: int = 10,
             random_state: int = 123, shuffle: bool = True,
             normalize_X: bool = False, normalize_y: bool = False,
             normalize_type: str = 'min_max', normalize_range: list | tuple = (0, 1), *args, **kwargs):
        """
        Load, optionally normalize, and split the dataset.

        Args:
            cache_dir: Directory of the dataset file (used when
                ``self.data_contents`` is None).
            filename: Name of the dataset file.
            split_type: Splitting strategy, see ``split``.
            train_percentage: Training fraction for a single split.
            fold: Number of folds for k-fold splitting.
            random_state: Seed used by the splitter.
            shuffle: Whether the splitter shuffles before splitting.
            normalize_X: Whether to normalize the features.
            normalize_y: Whether to normalize the labels.
            normalize_type: Normalization method, see ``normalize``.
            normalize_range: Target range for ``'min_max'`` normalization.
            *args: Unused.
            **kwargs: Unused.

        Returns:
            A dict with ``'train_loader'``, ``'test_loader'`` and the
            ``'profile'`` produced by ``load_complete_data``.
        """
        complete_data = self.load_complete_data(data_contents=self.data_contents, cache_dir=cache_dir, filename=filename)
        X, y = complete_data['X'], complete_data['y']
        if normalize_X:
            X = self.normalize(input=X, normalize_type=normalize_type, normalize_range=normalize_range)
        if normalize_y:
            y = self.normalize(input=y, normalize_type=normalize_type, normalize_range=normalize_range)
        train_loader, test_loader = self.split(X=X, y=y, split_type=split_type, train_percentage=train_percentage,
                                               fold=fold, random_state=random_state, shuffle=shuffle)
        return {
            'train_loader': train_loader,
            'test_loader': test_loader,
            'profile': complete_data['profile']
        }