Source code for mbtrack2.tracking.monitors.tools

# -*- coding: utf-8 -*-
"""
This module defines utilities functions, helping to deals with tracking output 
and hdf5 files.
"""

import os

import h5py as hp
import numpy as np


[docs] def merge_files(files_prefix: str, files_number: int, start_idx: int = 0, file_name: str | None = None): """ Merge several hdf5 files into one. The function assumes that the files to merge have names in the follwing format: - "files_prefix_0.hdf5" - "files_prefix_1.hdf5" ... - "files_prefix_files_number.hdf5" Parameters ---------- files_prefix : str Name of the files to merge. files_number : int Number of files to merge. start_idx : int, optional Start index of the hdf5 files. file_name : str, optional Name of the file with the merged data. If None, files_prefix without number is used. """ if file_name == None: file_name = files_prefix if os.path.exists(file_name): return None with hp.File(file_name + ".hdf5", "a") as f: ## Create file architecture with hp.File(files_prefix + "_" + str(start_idx) + ".hdf5", "r") as f0: for group in list(f0): f.require_group(group) for dataset_name in list(f0[group]): if dataset_name == "freq": f0[group].copy(dataset_name, f[group]) continue shape = f0[group][dataset_name].shape dtype = f0[group][dataset_name].dtype shape_needed = list(shape) shape_needed[-1] = shape_needed[-1] * files_number shape_needed = tuple(shape_needed) f[group].create_dataset(dataset_name, shape_needed, dtype) ## Copy data for i, file_num in enumerate(range(start_idx, start_idx + files_number)): with hp.File(files_prefix + "_" + str(file_num) + ".hdf5", "r") as fi: for group in list(fi): for dataset_name in list(fi[group]): shape = fi[group][dataset_name].shape n_slice = int(len(shape) - 1) length = shape[-1] slice_list = [] for n in range(n_slice): slice_list.append(slice(None)) slice_list.append(slice(length * i, length * (i+1))) if (dataset_name == "freq"): continue if (dataset_name == "time") and (file_num != start_idx): f[group][dataset_name][tuple(slice_list)] = np.max( f[group][dataset_name] [:]) + fi[group][dataset_name] else: f[group][dataset_name][tuple( slice_list)] = fi[group][dataset_name]
[docs] def copy_files(source: str, copy: str, version: str = None): """ Copy a source hdf5 file into another hdf5 file using a different HDF5 version. The function assumes that the source file has only a single group layer. Parameters ---------- source : str Name of the source file. copy : str Name of the copy file. version : str, optional Version number of the copy file. """ if version == None: version = 'v108' with hp.File(source + ".hdf5", "r") as f: with hp.File(copy + ".hdf5", "a", libver=('earliest', version)) as h: ## Copy file for group in list(f): h.require_group(group) for dataset_name in list(f[group]): h[group][dataset_name] = f[group][dataset_name][()]