import warnings
from typing import Union

import numpy as np
import torch
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split

# Package-internal helpers used below (dataloader, dataset, check_file_existence,
# create_directory_if_not_exists, download_file_from_github, unzip_file,
# mean_std_based_normalize_matrix) are assumed to be importable from the toolkit's own modules.


class time_series_dataloader(dataloader):
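    # Loads a (timestamps x instances) time series matrix from remote CSV files, slices it
    # into (input, target) windows of length x_len / y_len separated by xy_gap steps, and
    # wraps the chronological train/test split into torch DataLoaders.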
def __init__(
self,
data_profile: dict,
x_len: int, y_len: int,
xy_gap: int = 1,
name: str = 'time_series_dataloader',
time_granularity: str = 'daily',
target_attributes: str = 'All',
coverage_year_range: int = 1,
        instance_ids: Union[list, int, str] = None,
train_batch_size: int = 64,
test_batch_size: int = 64,
):
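        # x_len / y_len: lengths of the input and target windows; xy_gap: number of time
        # steps skipped between the end of the input window and the start of the target window.
        # time_granularity, target_attributes, coverage_year_range and instance_ids are stored
        # as defaults and can be overridden when calling load().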
super().__init__(name=name, train_batch_size=train_batch_size, test_batch_size=test_batch_size)
if data_profile is None or data_profile == {}:
raise ValueError('data_profile must be provided')
self.data_profile = data_profile
self.x_len = x_len
self.y_len = y_len
self.xy_gap = xy_gap
self.time_granularity = time_granularity
self.target_attributes = target_attributes
self.coverage_year_range = coverage_year_range
self.instance_ids = instance_ids
def get_data_profile(self):
return self.data_profile
def get_name(self):
return self.name
def get_attribute_list(self):
return self.data_profile['target_attributes']
def get_time_granularity_list(self):
return self.data_profile['time_granularity']
def download_data(self, cache_dir: str, file_name: str, time_granularity: str):
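        # Fetch the raw file for the requested granularity from the remote location given in
        # the data profile, creating the local cache directory first and unzipping the file
        # if it is distributed as a zip archive.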
if cache_dir is None or file_name is None or time_granularity is None:
raise ValueError("The cache directory, file name and time_granularity must be specified.")
if 'zipped_files' in self.data_profile and file_name in self.data_profile['zipped_files']:
postfix = '.zip'
else:
postfix = ''
complete_file_path = f'{cache_dir}/{time_granularity}/{file_name}{postfix}'
        url = f"{self.data_profile['url']['url_prefix']}/{time_granularity}/{file_name}{postfix}"
create_directory_if_not_exists(complete_file_path=complete_file_path)
download_file_from_github(url_link=url, destination_path=complete_file_path)
if postfix == '.zip':
unzip_file(complete_file_path=complete_file_path)
def load_raw(self, cache_dir: str, file_name: str, time_granularity: str, device: str = 'cpu'):
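        # Read the CSV into memory: the first row holds the instance ids, the first column
        # holds the timestamps, and the remaining cells form the (timestamps x instances)
        # value matrix returned as a float tensor.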
if cache_dir is None or file_name is None or time_granularity is None:
raise ValueError("The cache directory, file name and time_granularity must be specified.")
if not check_file_existence(f'{cache_dir}/{time_granularity}/{file_name}'):
self.download_data(cache_dir=cache_dir, file_name=file_name, time_granularity=time_granularity)
data = np.loadtxt(f'{cache_dir}/{time_granularity}/{file_name}', delimiter=',', dtype='str')
instance_ids = data[0, 1:]
timestamps = data[1:, 0]
time_series_data = data[1:, 1:].astype(float)
time_series_data = torch.tensor(time_series_data, dtype=torch.float, device=device)
return instance_ids.tolist(), timestamps.tolist(), time_series_data
def partition_data(self, data_batch: torch.Tensor, x_len: int, y_len: int, xy_gap: int):
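        # Slide a window over the time axis: each sample takes x_len consecutive steps as the
        # input and, after skipping xy_gap steps, the following y_len steps as the target.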
t, n = data_batch.shape
if t < x_len + y_len + xy_gap:
raise ValueError("The data batch size must be larger than the number of data points.")
X, Y = [], []
for start_idx in range(0, t - x_len - y_len - xy_gap + 1):
x_segment = data_batch[start_idx:start_idx+x_len, :]
y_segment = data_batch[start_idx+x_len+xy_gap:start_idx+x_len+xy_gap+y_len, :]
X.append(x_segment)
Y.append(y_segment)
        X = torch.stack(X)
        Y = torch.stack(Y)
return X, Y
def load(
self,
# directory to load the data
cache_dir: str = None,
# parameters to locate files
time_granularity: str = None,
target_attributes: str = None,
coverage_year_range: int = None,
# data partition parameters
        instance_ids: Union[list, int, str] = None,
train_percentage: float = 0.8,
normalize: bool = True,
normalization_mode: str = 'instance_time',
# other parameters
device: str = 'cpu',
*args, **kwargs
):
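        # Resolve parameter defaults, locate and load the raw file, optionally filter the
        # requested instances, normalize, window the series into (x, y) pairs, and wrap the
        # chronological train/test split into DataLoaders.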
        cache_dir = f"{cache_dir}/{self.data_profile['name']}" if cache_dir is not None else f"./data/{self.data_profile['name']}"
target_attributes = target_attributes if target_attributes is not None else self.target_attributes
time_granularity = time_granularity if time_granularity is not None else self.time_granularity
        target_instance_ids = instance_ids if instance_ids is not None else self.instance_ids
        if target_instance_ids is not None and not isinstance(target_instance_ids, (list, tuple)):
            target_instance_ids = [target_instance_ids]
        if target_attributes not in self.data_profile['target_attributes']:
            raise ValueError(f"The target attribute '{target_attributes}' is not in the data profile; please choose one from the attribute list {self.data_profile['target_attributes']}.")
        if time_granularity not in self.data_profile['time_granularity']:
            raise ValueError(f"The time granularity '{time_granularity}' is not in the data profile; please choose one from the granularity list {self.data_profile['time_granularity']}.")
if 'coverage_year_range' in self.data_profile:
coverage_year_range = coverage_year_range if coverage_year_range is not None else self.coverage_year_range
if coverage_year_range not in self.data_profile['coverage_year_range']:
raise ValueError(f"coverage_year_range {coverage_year_range} deosn't exist in the dataset... please select from the year range list {self.data_profile['coverage_year_range']}")
file_name = f'{coverage_year_range}_year_{time_granularity}_{target_attributes}.csv'
else:
file_name = f'{time_granularity}_{target_attributes}.csv'
complete_instance_ids, timestamps, time_series_data = self.load_raw(cache_dir=cache_dir, time_granularity=time_granularity, file_name=file_name)
        if target_instance_ids is not None:
            target_instance_ids = [element for element in target_instance_ids if element in complete_instance_ids]
            column_indices = [complete_instance_ids.index(instance_id) for instance_id in target_instance_ids]
            if not column_indices:
                raise ValueError(f"None of the instances in the target instance list exists in the dataset. Leave instance_ids as None to load all instances, or select specific instances from the instance id list {complete_instance_ids}.")
            time_series_data = time_series_data[:, column_indices]
        else:
            target_instance_ids = complete_instance_ids
if normalize:
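            # Map the user-facing aliases ('instance', 'time', 'global', ...) onto the
            # row/column normalization modes accepted by mean_std_based_normalize_matrix.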
mode_dict = {
'instance': 'column', 'column': 'column',
'time': 'row', 'row': 'row',
'instance_time': 'row_column', 'time_instance': 'row_column', 'row_column': 'row_column', 'column_row': 'column_row',
'global': 'row_column', 'all': 'row_column', 'both': 'row_column'
}
if normalization_mode not in mode_dict:
raise ValueError(f"normalization_mode {normalization_mode} is not supported, please choose the model from the supported list: {mode_dict.keys()}")
            if normalization_mode in ['row', 'column']:
                warnings.warn("In the loaded time series data, rows correspond to timestamps and columns correspond to instances.")
            if normalization_mode in ['time', 'row'] and time_series_data.shape[1] == 1:
                warnings.warn("Only a single instance is loaded, so normalization over the time dimension is not supported; normalization is changed to 'instance' instead.")
                normalization_mode = 'instance'
time_series_data = mean_std_based_normalize_matrix(mx=time_series_data, mode=mode_dict[normalization_mode])
X, y = self.partition_data(data_batch=time_series_data, x_len=self.x_len, y_len=self.y_len, xy_gap=self.xy_gap)
X_train, X_test, y_train, y_test = train_test_split(
X, y,
train_size=int(train_percentage * len(X)),
shuffle=False
)
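        # X has shape (num_windows, x_len, num_instances) and y has shape
        # (num_windows, y_len, num_instances); flatten them so that each instance's window
        # becomes its own sample of shape (x_len,) / (y_len,).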
        X_train = X_train.to(device).permute(0, 2, 1).reshape(-1, self.x_len)
        X_test = X_test.to(device).permute(0, 2, 1).reshape(-1, self.x_len)
        y_train = y_train.to(device).permute(0, 2, 1).reshape(-1, self.y_len)
        y_test = y_test.to(device).permute(0, 2, 1).reshape(-1, self.y_len)
train_dataset = dataset(X_train, y_train)
test_dataset = dataset(X_test, y_test)
        if self.train_batch_size <= 0 or self.train_batch_size == np.inf:
train_loader = DataLoader(train_dataset, batch_size=len(X_train), shuffle=True)
else:
train_loader = DataLoader(train_dataset, batch_size=self.train_batch_size, shuffle=True)
        if self.test_batch_size <= 0 or self.test_batch_size == np.inf:
test_loader = DataLoader(test_dataset, batch_size=len(X_test), shuffle=False)
else:
test_loader = DataLoader(test_dataset, batch_size=self.test_batch_size, shuffle=False)
return {'train_loader': train_loader, 'test_loader': test_loader, 'loaded_instance_ids': target_instance_ids}
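

# A minimal usage sketch. The data_profile below is hypothetical: a real profile must
# provide the keys this class reads ('name', 'url' -> 'url_prefix', 'target_attributes',
# 'time_granularity', and optionally 'coverage_year_range' and 'zipped_files'), and the
# url_prefix must point at actual CSV files.
if __name__ == '__main__':
    example_profile = {
        'name': 'example_dataset',
        'url': {'url_prefix': 'https://raw.githubusercontent.com/example/repo/main/data'},
        'target_attributes': ['All', 'open', 'close'],
        'time_granularity': ['daily', 'weekly'],
    }
    loader = time_series_dataloader(
        data_profile=example_profile,
        x_len=10, y_len=1, xy_gap=1,
        time_granularity='daily', target_attributes='All',
    )
    result = loader.load(cache_dir='./data', train_percentage=0.8, normalize=True)
    for X_batch, y_batch in result['train_loader']:
        # each batch holds inputs of shape (batch_size, x_len) and targets of shape (batch_size, y_len)
        print(X_batch.shape, y_batch.shape)
        break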