diff --git a/examples/sinter_example.py b/examples/sinter_example.py index e5513c3..b773033 100644 --- a/examples/sinter_example.py +++ b/examples/sinter_example.py @@ -1,8 +1,8 @@ +import numpy as np import sinter -from ldpc.sinter_decoders import SinterBeliefFindDecoder, SinterBpOsdDecoder import stim +from ldpc.sinter_decoders import SinterBeliefFindDecoder, SinterBpOsdDecoder from matplotlib import pyplot as plt -import numpy as np def generate_example_tasks(): @@ -28,22 +28,31 @@ def generate_example_tasks(): def main(): + from src_python.ldpc.sinter_decoders.sinter_lsd_decoder import SinterLsdDecoder samples = sinter.collect( num_workers=10, max_shots=20_000, max_errors=100, tasks=generate_example_tasks(), - decoders=['bposd', 'belief_find'], - custom_decoders={'bposd': SinterBpOsdDecoder( - max_iter=5, - bp_method="ms", - ms_scaling_factor=0.625, - schedule="parallel", - osd_method="osd0"), - "belief_find": SinterBeliefFindDecoder(max_iter=5, + decoders=['bposd', 'belief_find', 'bplsd'], + custom_decoders={ + 'bposd': SinterBpOsdDecoder( + max_iter=10, + bp_method="ms", + ms_scaling_factor=0.625, + schedule="parallel", + osd_method="osd0"), + "belief_find": SinterBeliefFindDecoder(max_iter=10, bp_method="ms", ms_scaling_factor=0.625, - schedule="parallel")}, + schedule="parallel"), + 'bplsd': SinterLsdDecoder( + max_iter=2, + bp_method="ms", + ms_scaling_factor=0.625, + schedule="parallel", + lsd_order=0), + }, print_progress=True, save_resume_filepath=f'bposd_surface_code.csv', @@ -55,7 +64,7 @@ def main(): print(sample.to_csv_line()) # Render a matplotlib plot of the data. - fig, axis = plt.subplots(1, 2, sharey=True, figsize=(10, 5)) + fig, axis = plt.subplots(1, 3, sharey=True, figsize=(10, 5)) sinter.plot_error_rate( ax=axis[0], stats=samples, @@ -71,9 +80,17 @@ def main(): filter_func=lambda stat: stat.decoder == 'belief_find', x_func=lambda stat: stat.json_metadata['p'], ) + sinter.plot_error_rate( + ax=axis[2], + stats=samples, + group_func=lambda stat: f"Rotated Surface Code d={stat.json_metadata['d']}", + filter_func=lambda stat: stat.decoder == 'lsd', + x_func=lambda stat: stat.json_metadata['p'], + ) axis[0].set_ylabel('Logical Error Rate') axis[0].set_title('BPOSD') axis[1].set_title('Belief Find') + axis[2].set_title('LSD') for ax in axis: ax.loglog() ax.grid() diff --git a/src_python/ldpc/bplsd_decoder/_bplsd_decoder.pyx b/src_python/ldpc/bplsd_decoder/_bplsd_decoder.pyx index d2b14ac..d43eddb 100644 --- a/src_python/ldpc/bplsd_decoder/_bplsd_decoder.pyx +++ b/src_python/ldpc/bplsd_decoder/_bplsd_decoder.pyx @@ -47,7 +47,8 @@ cdef class BpLsdDecoder(BpDecoderBase): def __cinit__(self, pcm: Union[np.ndarray, scipy.sparse.spmatrix], error_rate: Optional[float] = None, error_channel: Optional[List[float]] = None, max_iter: Optional[int] = 0, bp_method: Optional[str] = 'minimum_sum', ms_scaling_factor: Optional[float] = 1.0, schedule: Optional[str] = 'parallel', omp_thread_count: Optional[int] = 1, - random_schedule_seed: Optional[int] = 0, serial_schedule_order: Optional[List[int]] = None, bits_per_step:int = 1, input_vector_type: str = "syndrome", lsd_order: int = 0): + random_schedule_seed: Optional[int] = 0, serial_schedule_order: Optional[List[int]] = None, + bits_per_step:int = 1, input_vector_type: str = "syndrome", lsd_order: int = 0): self.MEMORY_ALLOCATED=False self.lsd = new lsd_decoder_cpp(self.pcm[0]) self.bplsd_decoding.resize(self.n) #C vector for the bf decoding @@ -133,6 +134,4 @@ cdef class BpLsdDecoder(BpDecoderBase): out = 
np.zeros(clss.size(),dtype=int) for i in range(clss.size()): out[i] = clss[i] - return out - - return self.lsd.cluster_size_stats \ No newline at end of file + return out \ No newline at end of file diff --git a/src_python/ldpc/monte_carlo_simulation/data_utils.py b/src_python/ldpc/monte_carlo_simulation/data_utils.py new file mode 100644 index 0000000..99a4af6 --- /dev/null +++ b/src_python/ldpc/monte_carlo_simulation/data_utils.py @@ -0,0 +1,621 @@ +# from Timo and Lucas' paper https://github.com/cda-tum/mqt-qecc/tree/analog-information-decoding/src/mqt/qecc/analog_information_decoding +from __future__ import annotations + +import itertools +import json +import os +from dataclasses import fields, dataclass +from pathlib import Path +from typing import Any, Dict, List, Optional, Union + +import numpy as np + + +@dataclass +class BpParams: + """ + Class to store parameters for BP decoding. + """ + + bp_method: str = "msl" + max_bp_iter: int = 30 + osd_order: int = 10 + osd_method: str = "osd_cs" + ms_scaling_factor: float = 0.75 + schedule: str = "parallel" + omp_thread_count: int = 1 + random_serial_schedule: int = 0 + serial_schedule_order: Optional[List[int]] = None + cutoff: float = np.inf + + @classmethod + def from_dict(cls, dict_): + class_fields = {f.name for f in fields(cls)} + return BpParams(**{k: v for k, v in dict_.items() if k in class_fields}) + + +def extract_settings(filename): + """ + Extracts all settings from a parameter file and returns them as a dictionary. + """ + + keyword_lists = {} + + with open(filename, 'r') as file: + for line in file: + json_data = json.loads(line.strip()) + for keyword, value in json_data.items(): + if keyword not in keyword_lists: + keyword_lists[keyword] = [] + if value not in keyword_lists[keyword]: + keyword_lists[keyword].append(value) + + return keyword_lists + + +def load_data( + input_filenames: list[str], +) -> list[dict]: + """ + Loads data from a list of JSON files and returns it as a list of dictionaries. + """ + + data = [] + for file in input_filenames: + path = Path(file) + + try: + ldata = json.load(path.open()) + data.append(ldata) + except: + merge_json_files(path.with_suffix("")) + ldata = json.load(path.open()) + data.append(ldata) + return data + + +def calculate_error_rates(success_cnt, runs, code_params): + """ + Calculates logical error rate, logical error rate error bar, word error rate, + and word error rate error bar. 
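    A minimal usage sketch (illustrative numbers; code_params only needs to
    supply the number of logical qubits "k"):

        ler, ler_eb, wer, wer_eb = calculate_error_rates(
            success_cnt=980, runs=1000, code_params={"k": 12}
        )
        # ler = 1 - success_cnt / runs
        # wer = 1 - (1 - ler) ** (1 / k), i.e. the per-logical-qubit rate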
+ """ + logical_err_rate = 1.0 - (success_cnt / runs) + logical_err_rate_eb = np.sqrt( + (1 - logical_err_rate) * logical_err_rate / runs + ) + word_error_rate = 1.0 - (1 - logical_err_rate) ** (1 / code_params["k"]) + word_error_rate_eb = ( + logical_err_rate_eb + * ((1 - logical_err_rate_eb) ** (1 / code_params["k"] - 1)) + / code_params["k"] + ) + return ( + logical_err_rate, + logical_err_rate_eb, + word_error_rate, + word_error_rate_eb, + ) + + +def is_converged(x_success, z_success, runs, code_params, precission): + x_cond = _check_convergence( + x_success, runs, code_params, precission_cutoff=precission + ) + z_cond = _check_convergence( + z_success, runs, code_params, precission_cutoff=precission + ) + return x_cond == z_cond == True + + +def _check_convergence(success_cnt, runs, code_params, precission_cutoff): + _, _, ler, ler_eb = calculate_error_rates(success_cnt, runs, code_params) + if ler_eb != 0.0: + if ler_eb / ler < precission_cutoff: + return True + else: + return False + + +def create_outpath( + x_meta: bool = False, + z_meta: bool = False, + bias: List[float] = None, + codename: str = None, + single_stage: bool = True, + sus_th_depth: int = None, + rounds: int = None, + id: int = 0, + overwrite: bool = False, + analog_info: bool = False, + analog_tg: bool = False, + repetitions: int = None, + experiment: str = "wer_per_round", + **kwargs, +) -> str: + path = f"results/{experiment:s}/" + if analog_info: + path += "analog_info/" # ranvendraan et al analog info decoder + elif analog_tg: + path += "analog_tg/" # analog tannergraph bp + else: + path += "hard_syndrome/" # standard hard syndrome decoding only + if bias is not None: + path += f"single_stage={single_stage}/bias={bias[0]}_{bias[1]}_{bias[2]}/" + + if sus_th_depth: + path += f"sus_th_depth={sus_th_depth}/" + elif rounds: + path += f"rounds={rounds}/" + # else: + # raise ValueError( + # "Either sus_th_depth or window_count_per_run must be specified." + # ) + + if repetitions: + path += f"repetitions={repetitions}/" + + if x_meta: + path += "x-meta=true/" + else: + path += "x-meta=false/" + + if z_meta: + path += "z-meta=true/" + else: + path += "z-meta=false/" + + path += f"{codename:s}/" + + if "syndr_err_rate" not in kwargs or kwargs['syndr_err_rate'] is None: + if "sigma" in kwargs: + path += ( + f"per_{kwargs['data_err_rate']:.3e}_sigma_{kwargs['sigma']:.3e}/" + ) + if "z_sigma" in kwargs: + path += ( + f"per_{kwargs['data_err_rate']:.3e}_x_sigma_{kwargs['x_sigma']:.3e}_z_sigma_{kwargs['z_sigma']:.3e}" + ) + else: + path += ( + f"per_{kwargs['data_err_rate']:.3e}_ser_{kwargs['syndr_err_rate']:.3e}/" + ) + + if not os.path.exists(path): + os.makedirs(path, exist_ok=True) + + if overwrite == False: + f_loc = path + f"id_{id}.json" + while os.path.exists(f_loc): + id += 1 + f_loc = path + f"id_{id}.json" + + while not os.path.exists(f_loc): + open(f_loc, "w").close() + + return f_loc + + +def replace_inf(lst: list): + new_lst = [] + for item in lst: + if np.isinf(item): + new_lst.append("i") + else: + new_lst.append(item) + return new_lst + + +def product_dict(**kwargs): + """Generate a iterator of dictionaries where each dictionary is a cartesian product + of the values associated with each key in the input dictionary. 
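    Example (illustrative):

        list(product_dict(p=[0.01, 0.02], d=[3, 5]))
        # -> [{'p': 0.01, 'd': 3}, {'p': 0.01, 'd': 5},
        #     {'p': 0.02, 'd': 3}, {'p': 0.02, 'd': 5}]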
+ """ + keys = kwargs.keys() + vals = kwargs.values() + for instance in itertools.product(*vals): + yield dict(zip(keys, instance)) + + +def zip_dict(**kwargs): + """Create a iterator of dictionaries where each dictionary contains the zip() of the + values associated with each key in the input dictionary. + """ + return (dict(zip(kwargs.keys(), values)) for values in zip(*kwargs.values())) + + +# def merge_json_files(directory_path): +# # Get a list of all files in the directory +# file_list = os.listdir(directory_path) + +# # Filter the list to only include JSON files +# json_files = [f for f in file_list if f.startswith('per')] + +# # Open the output file for writing +# with open(directory_path + 'merged_results.json', 'w') as outfile: +# # Loop through the JSON files and write their contents to the output file +# for file in json_files: +# with open(os.path.join(directory_path, file), 'r') as infile: +# data = json.load(infile) +# json.dump(data, outfile, ensure_ascii=False, indent=4) +# outfile.write(",") + +# print(f"All JSON files in {directory_path} have been merged into merged_file.json") + + +def _update_error_rates(success_cnt, runs, code_K): + """ + Calculates logical error rate, logical error rate error bar, word error rate, + and word error rate error bar. + """ + logical_err_rate = 1.0 - (success_cnt / runs) + logical_err_rate_eb = np.sqrt( + (1 - logical_err_rate) * logical_err_rate / runs + ) + word_error_rate = 1.0 - (1 - logical_err_rate) ** (1 / code_K) + word_error_rate_eb = ( + logical_err_rate_eb + * ((1 - logical_err_rate_eb) ** (1 / code_K - 1)) + / code_K + ) + return ( + logical_err_rate, + logical_err_rate_eb, + word_error_rate, + word_error_rate_eb, + ) + + +def merge_datasets(datasets: List[Dict[str, Any]]) -> Dict[str, Any]: + """ + Merges a list of dictionaries into a single dictionary. The values for the fields "nr_runs", + "x_success_cnt" and "z_success_cnt" are extracted from each dictionary and added together. + + Args: + datasets (List[Dict[str, Any]]): A list of dictionaries to be merged. + + Returns: + Dict[str, Any]: A dictionary containing the merged data. + """ + if not datasets: + return {} + + # Start with a copy of the first dictionary in the list + merged_data = dict(datasets[0]) + + # Extract and add up the values for "nr_runs", "x_success_cnt", and "z_success_cnt" + for data in datasets[1:]: + merged_data["nr_runs"] += data.get("nr_runs", 0) + merged_data["x_success_cnt"] += data.get("x_success_cnt", 0) + merged_data["z_success_cnt"] += data.get("z_success_cnt", 0) + + # Update logical and word error rates based on accumulated data. + runs = merged_data["nr_runs"] + x_success_cnt = merged_data["x_success_cnt"] + z_success_cnt = merged_data["z_success_cnt"] + + x_ler, x_ler_eb, x_wer, x_wer_eb = _update_error_rates( + x_success_cnt, runs, merged_data["code_K"] + ) + z_ler, z_ler_eb, z_wer, z_wer_eb = _update_error_rates( + z_success_cnt, runs, merged_data["code_K"] + ) + + update_dict = { + "x_ler": x_ler, + "x_ler_eb": x_ler_eb, + "x_wer": x_wer, + "x_wer_eb": x_wer_eb, + "x_success_cnt": x_success_cnt, + "z_ler": z_ler, + "z_ler_eb": z_ler_eb, + "z_wer": z_wer, + "z_wer_eb": z_wer_eb, + "z_success_cnt": z_success_cnt, + } + merged_data.update(update_dict) + return merged_data + + +def _merge_datasets_x(_datasets: List[Dict[str, Any]]) -> Dict[str, Any]: + """ + Merges a list of dictionaries into a single dictionary. 
The values for the fields "nr_runs", + "x_success_cnt" and "z_success_cnt" are extracted from each dictionary and added together. + + Args: + datasets (List[Dict[str, Any]]): A list of dictionaries to be merged. + + Returns: + Dict[str, Any]: A dictionary containing the merged data. + """ + + datasets = _datasets.copy() + + if not datasets: + return {} + + # remove datasets that do not contain x_success_cnt + for i, data in enumerate(datasets): + if "x_success_cnt" not in data: + datasets.pop(i) + + # Start with a copy of the first dictionary in the list that contains z_success_cnt + # and remove that dict from the list + for i, data in enumerate(datasets): + if "x_success_cnt" in data: + merged_data = dict(datasets.pop(i)) + break + + # Extract and add up the values for "nr_runs", "x_success_cnt", and "z_success_cnt" + for data in datasets: + try: + merged_data["nr_runs"] += data.get("nr_runs", 0) + merged_data["x_success_cnt"] += data.get("x_success_cnt", 0) + # merged_data["z_success_cnt"] += data.get("z_success_cnt", 0) + except: + pass + + # Update logical and word error rates based on accumulated data. + runs = merged_data["nr_runs"] + x_success_cnt = merged_data["x_success_cnt"] + # z_success_cnt = merged_data["z_success_cnt"] + + x_ler, x_ler_eb, x_wer, x_wer_eb = _update_error_rates( + x_success_cnt, runs, merged_data["code_K"] + ) + + update_dict = { + "x_ler": x_ler, + "x_ler_eb": x_ler_eb, + "x_wer": x_wer, + "x_wer_eb": x_wer_eb, + "x_success_cnt": x_success_cnt, + } + merged_data.update(update_dict) + return merged_data + + +def _merge_datasets_z(_datasets: List[Dict[str, Any]]) -> Dict[str, Any]: + """ + Merges a list of dictionaries into a single dictionary. The values for the fields "nr_runs", + "x_success_cnt" and "z_success_cnt" are extracted from each dictionary and added together. + + Args: + datasets (List[Dict[str, Any]]): A list of dictionaries to be merged. + + Returns: + Dict[str, Any]: A dictionary containing the merged data. + """ + + datasets = _datasets.copy() + # print("datasets", datasets) + if not datasets: + return {} + + # remove datasets that do not contain z_success_cnt + for i, data in enumerate(datasets): + if "z_success_cnt" not in data: + datasets.pop(i) + + # print(datasets) + + # Start with a copy of the first dictionary in the list that contains z_success_cnt + # and remove that dict from the list + for i, data in enumerate(datasets): + if "z_success_cnt" in data: + merged_data = dict(datasets.pop(i)) + break + + # merged_data = dict(datasets[0]) + + # Extract and add up the values for "nr_runs", "x_success_cnt", and "z_success_cnt" + for data in datasets: + try: + merged_data["nr_runs"] += data.get("nr_runs", 0) + # merged_data["z_success_cnt"] += data.get("z_success_cnt", 0) + merged_data["z_success_cnt"] += data.get("z_success_cnt", 0) + except: + pass + + # Update logical and word error rates based on accumulated data. 
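    # Illustrative sketch of the accumulation above (not part of the patch):
    # merging {"nr_runs": 1000, "z_success_cnt": 990, "code_K": 12} with
    # {"nr_runs": 500, "z_success_cnt": 497, "code_K": 12} leaves merged_data
    # with nr_runs = 1500 and z_success_cnt = 1487; the z_ler / z_wer fields
    # below are then recomputed from those totals via _update_error_rates.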
+ runs = merged_data["nr_runs"] + # x_success_cnt = merged_data["x_success_cnt"] + z_success_cnt = merged_data["z_success_cnt"] + + z_ler, z_ler_eb, z_wer, z_wer_eb = _update_error_rates( + z_success_cnt, runs, merged_data["code_K"] + ) + + update_dict = { + "z_ler": z_ler, + "z_ler_eb": z_ler_eb, + "z_wer": z_wer, + "z_wer_eb": z_wer_eb, + "z_success_cnt": z_success_cnt, + } + merged_data.update(update_dict) + return merged_data + + +from json.decoder import JSONDecodeError + + +def merge_json_files(input_path: str) -> None: + """ + Iterates through all subfolders in the input folder, loads all JSON files in each subfolder + and merges them using the `merge_datasets` function. The resulting dictionaries are + stored in a list and then dumped as a JSON file in the input folder. + + Args: + input_path (str): The path to the input folder. + + Returns: + None + """ + output_data: List[Dict[str, Any]] = [] + for folder_name in os.listdir(input_path): + folder_path = os.path.join(input_path, folder_name) + if os.path.isdir(folder_path): + data: List[Dict[str, Any]] = [] + for filename in os.listdir(folder_path): + if filename.endswith(".json"): + file_path = os.path.join(folder_path, filename) + with open(file_path, "r") as file: + try: + json_data = json.load(file) + data.append(json_data) + except JSONDecodeError: + pass + merged_data = merge_datasets(data) + if merged_data: + output_data.append(merged_data) + + # save output to parent directiory + code_name = os.path.basename(os.path.normpath(input_path)) + parent_dir = os.path.abspath(os.path.join(input_path, os.pardir)) + with open( + os.path.join(parent_dir, f"{code_name:s}.json"), "w" + ) as output_file: + json.dump(output_data, output_file, ensure_ascii=False, indent=4) + + +def merge_json_files_x(input_path: str) -> None: + """ + Iterates through all subfolders in the input folder, loads all JSON files in each subfolder + and merges them using the `merge_datasets` function. The resulting dictionaries are + stored in a list and then dumped as a JSON file in the input folder. + + Args: + input_path (str): The path to the input folder. + + Returns: + None + """ + output_data: List[Dict[str, Any]] = [] + for folder_name in os.listdir(input_path): + folder_path = os.path.join(input_path, folder_name) + if os.path.isdir(folder_path): + data: List[Dict[str, Any]] = [] + for filename in os.listdir(folder_path): + if filename.endswith(".json"): + file_path = os.path.join(folder_path, filename) + with open(file_path, "r") as file: + try: + json_data = json.load(file) + data.append(json_data) + except JSONDecodeError: + pass + merged_data = _merge_datasets_x(data) + if merged_data: + output_data.append(merged_data) + + # save output to parent directiory + code_name = os.path.basename(os.path.normpath(input_path)) + parent_dir = os.path.abspath(os.path.join(input_path, os.pardir)) + with open( + os.path.join(parent_dir, f"{code_name:s}.json"), "w" + ) as output_file: + json.dump(output_data, output_file, ensure_ascii=False, indent=4) + + +def merge_json_files_z(input_path: str) -> None: + """ + Iterates through all subfolders in the input folder, loads all JSON files in each subfolder + and merges them using the `merge_datasets` function. The resulting dictionaries are + stored in a list and then dumped as a JSON file in the input folder. + + Args: + input_path (str): The path to the input folder. 
+ + Returns: + None + """ + output_data: List[Dict[str, Any]] = [] + for folder_name in os.listdir(input_path): + folder_path = os.path.join(input_path, folder_name) + if os.path.isdir(folder_path): + data: List[Dict[str, Any]] = [] + for filename in os.listdir(folder_path): + if filename.endswith(".json"): + file_path = os.path.join(folder_path, filename) + with open(file_path, "r") as file: + try: + json_data = json.load(file) + data.append(json_data) + except JSONDecodeError: + pass + merged_data = _merge_datasets_z(data) + if merged_data: + output_data.append(merged_data) + + # save output to parent directiory + code_name = os.path.basename(os.path.normpath(input_path)) + parent_dir = os.path.abspath(os.path.join(input_path, os.pardir)) + with open( + os.path.join(parent_dir, f"{code_name:s}.json"), "w" + ) as output_file: + json.dump(output_data, output_file, ensure_ascii=False, indent=4) + + +def merge_json_files_xz(input_path: str) -> None: + """ + Iterates through all subfolders in the input folder, loads all JSON files in each subfolder + and merges them using the `merge_datasets` function. The resulting dictionaries are + stored in a list and then dumped as a JSON file in the input folder. + + Args: + input_path (str): The path to the input folder. + + Returns: + None + """ + output_data: List[Dict[str, Any]] = [] + for folder_name in os.listdir(input_path): + folder_path = os.path.join(input_path, folder_name) + if os.path.isdir(folder_path): + data: List[Dict[str, Any]] = [] + for filename in os.listdir(folder_path): + if filename.endswith(".json"): + file_path = os.path.join(folder_path, filename) + with open(file_path, "r") as file: + try: + json_data = json.load(file) + data.append(json_data) + except JSONDecodeError: + pass + # print(folder_path, filename) + # print(data) + merged_data_x = _merge_datasets_x(data) + merged_data_z = _merge_datasets_z(data) + merged_data = _combine_xz_data(merged_data_x, merged_data_z) + if merged_data: + output_data.append(merged_data) + + # save output to parent directiory + code_name = os.path.basename(os.path.normpath(input_path)) + parent_dir = os.path.abspath(os.path.join(input_path, os.pardir)) + with open( + os.path.join(parent_dir, f"{code_name:s}.json"), "w" + ) as output_file: + json.dump(output_data, output_file, ensure_ascii=False, indent=4) + + +def _combine_xz_data(xdata: Union[dict, None], zdata: Union[dict, None]) -> dict: + """ + Combine the x and z data into a single dictionary. + Before doing that, rename "runs" in each dictionary to "x_runs" and "z_runs" respectively. 
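    Example (illustrative values):

        _combine_xz_data({"nr_runs": 1000, "x_ler": 0.02},
                         {"nr_runs": 1000, "z_ler": 0.03})
        # -> {"x_ler": 0.02, "x_runs": 1000, "z_ler": 0.03, "z_runs": 1000}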
+ + """ + + if xdata and zdata: + # print(xdata) + xdata["x_runs"] = xdata.pop("nr_runs") + zdata["z_runs"] = zdata.pop("nr_runs") + xdata.update(zdata) + return xdata + elif xdata: + xdata["x_runs"] = xdata.pop("nr_runs") + return xdata + elif zdata: + zdata["z_runs"] = zdata.pop("nr_runs") + return zdata + else: + return {} diff --git a/src_python/ldpc/monte_carlo_simulation/memory_experiment_v2.py b/src_python/ldpc/monte_carlo_simulation/memory_experiment_v2.py new file mode 100644 index 0000000..0e2e30d --- /dev/null +++ b/src_python/ldpc/monte_carlo_simulation/memory_experiment_v2.py @@ -0,0 +1,171 @@ +# from Timo and Lucas' paper https://github.com/cda-tum/mqt-qecc/tree/analog-information-decoding/src/mqt/qecc/analog_information_decoding +import numpy as np +from pymatching import Matching +from scipy.sparse import csr_matrix, hstack, eye, block_diag + +from simulation_utils import ( + get_virtual_check_init_vals, +) + + +def build_multiround_pcm(pcm, repetitions, format="csr"): + """Builds the multiround parity-check matrix as described in the paper. + + Each row corresponds to a round of measurements, the matrix for r repetitions has the form + H3D := (H_diag | id_r), where H_diag is the r-diagonal matrix containing the parity-check matrix H and + id_diag is a staircase block matrix containing two m-identity matrices in each row and column except the first one. + """ + if not isinstance(pcm, csr_matrix): + pcm = csr_matrix(pcm) + + pcm_rows, pcm_cols = pcm.shape + + # Construct the block of PCMs + H_3DPCM = block_diag([pcm] * (repetitions + 1), format=format) + + # Construct the block of identity matrices + H_3DID_diag = block_diag( + [eye(pcm_rows, format=format)] * (repetitions + 1), format=format) + + # Construct the block of identity matrices + H_3DID_offdiag = eye(pcm_rows * (repetitions + 1), + k=-pcm_rows, format=format) + + # Construct the block of identity matrices + H_3DID = H_3DID_diag + H_3DID_offdiag + + # # hstack the two blocks + H_3D = hstack([H_3DPCM, H_3DID], format=format) + + return H_3D + + +def move_syndrome(syndrome, data_type=np.int32): + """Slides the window one region up, i.e., the syndrome of the first half is overwritten by the second half.""" + T = int(syndrome.shape[1] / 2) # number of rounds in each region + + # zero likes syndrome + new_syndrome = np.zeros(syndrome.shape, dtype=data_type) + + # move the syndromes from the Tth round to the 0th round + new_syndrome[:, :T] = syndrome[:, T:] + return new_syndrome + + +def get_updated_decoder(decoding_method: str, + decoder, + new_channel, + H3D=None): + """ Updates the decoder with the new channel information and returns the updated decoder object.""" + if decoding_method == "bposd" or decoding_method == "lsd": + decoder.update_channel_probs(new_channel) + return decoder + elif decoding_method == "matching": + weights = np.clip( + np.log((1 - new_channel) / new_channel), + a_min=-16777215, + a_max=16777215, + ) + return Matching(H3D, weights=weights) + else: + raise ValueError("Unknown decoding method", decoding_method) + + +def decode_multiround( + syndrome: np.ndarray, + H: np.ndarray, + decoder, + channel_probs: np.ndarray, # needed for matching decoder does not have an update weights method + repetitions: int, + last_round=False, + analog_syndr=None, + check_block_size: int = 0, + sigma: float = 0.0, + H3D: np.ndarray = None, # needed for matching decoder + decoding_method: str = "lsd" # bposd or matching +): + """Overlapping window decoding. 
+ First, we compute the difference syndrome from the recorded syndrome of each measurement round for all measurement + rounds of the current window (consisting of two regions with equal size). + Then, we apply the correction returned from the decoder on the first region (commit region). + We then propagate the syndrome through the whole window (i.e., to the end of the second region). + """ + analog_tg = analog_syndr is not None + # convert syndrome to difference syndrome + diff_syndrome = syndrome.copy() + diff_syndrome[:, 1:] = (syndrome[:, 1:] - syndrome[:, :-1]) % 2 + bp_iter = 0 + + region_size = repetitions // 2 # assumes repetitions is even + + if analog_tg: + # If we have analog information, we use it to initialize the time-like syndrome nodes, which are defined + # in the block of the H3D matrix after the diagonal H block. + analog_init_vals = get_virtual_check_init_vals( + analog_syndr.flatten("F"), sigma + ) + + new_channel = np.hstack( + (channel_probs[:check_block_size], analog_init_vals) + ) + + # in the last round, we have a perfect syndrome round to make sure we're in the codespace + if last_round: + new_channel[-H.shape[0]:] = 1e-15 + + decoder = get_updated_decoder(decoding_method, decoder, new_channel, H3D) + + else: + if last_round: + new_channel = np.copy(channel_probs) + new_channel[-H.shape[0]:] = 1e-15 + + decoder = get_updated_decoder(decoding_method, decoder, new_channel, H3D) + + decoded = decoder.decode(diff_syndrome.flatten("F")) + + if decoding_method == "bposd": + bp_iter = decoder.iter + + # extract space correction, first repetitions * n entires + space_correction = ( + decoded[: H.shape[1] * repetitions] + .reshape((repetitions, H.shape[1])) + .T + ) + # extract time correction + + if last_round == False: + # this corresponds to the decoding on the second block of the H3D matrix + time_correction = ( + decoded[H.shape[1] * repetitions:] + .reshape((repetitions, H.shape[0])) + .T + ) + + # append time correction with zeros + time_correction = np.hstack( + (time_correction, np.zeros((H.shape[0], 1), dtype=np.int32)) + ) + + # correct only in the commit region + decoded = (np.cumsum(space_correction, 1) % 2)[:, region_size - 1] + + # get the syndrome according to the correction + corr_syndrome = (H @ decoded) % 2 + + # propagate the syndrome correction through the tentative region + syndrome[:, region_size:] = ( + (syndrome[:, region_size:] + corr_syndrome[:, None]) % 2 + ).astype(np.int32) + + # apply the time correction of round region_size - 1 to the syndrome at the beginning of the tentative region + syndrome[:, region_size] = ( + (syndrome[:, region_size] + time_correction[:, region_size - 1]) % 2 + ).astype(np.int32) + + else: + # correct in the commit and tentative region as the last round stabilizer is perfect + decoded = (np.cumsum(space_correction, 1) % 2)[:, -1] + + return decoded.astype(np.int32), syndrome, analog_syndr, bp_iter diff --git a/src_python/ldpc/monte_carlo_simulation/phenomenological_noise_sim.py b/src_python/ldpc/monte_carlo_simulation/phenomenological_noise_sim.py new file mode 100644 index 0000000..1335438 --- /dev/null +++ b/src_python/ldpc/monte_carlo_simulation/phenomenological_noise_sim.py @@ -0,0 +1,76 @@ +import matplotlib.pyplot as plt +import numpy as np +import panqec.codes + +from ldpc.monte_carlo_simulation.data_utils import BpParams +from ldpc.monte_carlo_simulation.quasi_single_shot_v2 import QSS_SimulatorV2 + +if __name__ == "__main__": + ps = np.linspace(0.015, 0.035, 6) + fig, axis = plt.subplots(1, 2, sharey=True, 
figsize=(10, 5)) + nr_samples = 1000 + decoding_rds = 1 + for dist in [3, 5, 7]: + print(f"d={dist}") + lsd_lers = [] + osd_lers = [] + codename = "2DTC" + code = panqec.codes.Toric2DCode(dist) + Hz = code.Hz.toarray().astype(np.int32) + Lz = code.logicals_z[:, Hz.shape[1]:] + for p in ps: + print(f"p={p}") + sim1 = QSS_SimulatorV2( + H=Hz, + L=Lz, + per=p, + ser=p, + bias=[1.0, 0.0, 0.0], + codename=codename, + decoding_method="lsd", + check_side="Z", + analog_tg=False, + rounds=(decoding_rds + 1) * dist, # how often to decode, i.e., how often we slide window-1 + repetitions=2 * dist, # how many noisy syndromes, == window size == 2 * region_size + experiment="test", + bp_params=BpParams(max_bp_iter=5) + ) + sim2 = QSS_SimulatorV2( + H=Hz, + L=Lz, + per=p, + ser=p, + bias=[1.0, 0.0, 0.0], + codename=codename, + decoding_method="matching", + check_side="Z", + analog_tg=False, + rounds=(decoding_rds + 1) * dist, # how often to decode, i.e., how often we slide window-1 + repetitions=2 * dist, # how many noisy syndromes, == window size == 2 * region_size + experiment="test", + bp_params=BpParams(max_bp_iter=50) + ) + out1 = sim1.run(samples=nr_samples) + out2 = sim2.run(samples=nr_samples) + lsd_lers.append(out1["x_ler"]) + osd_lers.append(out2["x_ler"]) + axis[0].plot( + ps, + lsd_lers, + label=f"lsd d={dist}", + marker="o", linestyle="dashed", + ) + axis[1].plot( + ps, + osd_lers, + label=f"bposd d={dist}", + marker="x", + linestyle="solid", + ) + axis[0].legend() + axis[1].legend() + axis[0].set_xlabel("p") + axis[0].set_ylabel("LER") + axis[0].set_yscale("log") + fig.savefig(f"code-{codename}") + fig.show() diff --git a/src_python/ldpc/monte_carlo_simulation/quasi_single_shot_v2.py b/src_python/ldpc/monte_carlo_simulation/quasi_single_shot_v2.py new file mode 100644 index 0000000..3bbe793 --- /dev/null +++ b/src_python/ldpc/monte_carlo_simulation/quasi_single_shot_v2.py @@ -0,0 +1,318 @@ +# from Timo and Lucas' paper https://github.com/cda-tum/mqt-qecc/tree/analog-information-decoding/src/mqt/qecc/analog_information_decoding +from typing import List, Optional + +import numpy as np +from pymatching import Matching + +from data_utils import _check_convergence, create_outpath, BpParams +from ldpc.bplsd_decoder import BpLsdDecoder +from ldpc.bposd_decoder import BpOsdDecoder +from memory_experiment_v2 import ( + build_multiround_pcm, + move_syndrome, decode_multiround, +) +from simulation_utils import ( + is_logical_err, + save_results, + error_channel_setup, + set_seed, + generate_err, + generate_syndr_err, + get_sigma_from_syndr_er, + get_noisy_analog_syndrome, + get_binary_from_analog, +) + + +class QSS_SimulatorV2: + def __init__( + self, + H: np.ndarray, + per: float, + ser: float, + L: np.ndarray, + bias: List[float], + codename: str, + bp_params: Optional[BpParams], + decoding_method: str = "bposd", # bposd or matching + check_side: str = "X", + seed: int = 666, + analog_tg: bool = False, + repetitions: int = 0, + rounds: int = 0, + experiment: str = "qss", + **kwargs, + ) -> None: + """ + + :param H: parity-check matrix of code + :param per: physical data error rate + :param ser: syndrome error rate + :param L: logical matrix + :param bias: bias array + :param codename: name of the code + :param bp_params: BP decoder parameters + :param check_side: side of the check (X or Z) + :param seed: random seed + :param analog_tg: switch analog decoding on/off + :param repetitions: number of total syndrome measurements, i.e., total time steps. Must be even. 
+ :param rounds: number of decoding runs, i.e., number of times we slide the window - 1 + :param experiment: name of experiment, for outpath creation + :param kwargs: + """ + self.H = H + self.data_err_rate = per + self.syndr_err_rate = ser + self.check_side = check_side + self.L = L + self.bias = bias + self.codename = codename + self.bp_params = bp_params + self.decoding_method = decoding_method + self.save_interval = kwargs.get("save_interval", 50) + self.eb_precission = kwargs.get("eb_precission", 1e-2) + self.analog_tg = analog_tg + self.repetitions = repetitions + if repetitions % 2 != 0: + raise ValueError("repetitions must be even") + + if self.decoding_method not in ["bposd", "matching", "lsd"]: + raise ValueError("Decoding method must be either bposd or matching") + + if self.repetitions % 2 != 0: + raise ValueError("Repetitions must be even!") + + self.rounds = rounds + self.experiment = experiment + set_seed(seed) + # load code parameters + self.code_params = eval( + open( + f"/home/luca/Documents/codeRepos/ss-cats/single-shot-cats/codes/lifted-product/3d_ldpc/code_params.txt").read() + ) + self.input_values = self.__dict__.copy() + + self.outfile = create_outpath(**self.input_values) + + # Remove Arrays + del self.input_values["H"] + del self.input_values["L"] + + self.num_checks, self.num_qubits = self.H.shape + + self.x_bit_chnl, self.y_bit_chnl, self.z_bit_chnl = error_channel_setup( + error_rate=self.data_err_rate, + xyz_error_bias=bias, + N=self.num_qubits, + ) + self.x_syndr_err_chnl, self.y_syndr_err_chnl, self.z_syndr_err_chnl = error_channel_setup( + error_rate=self.syndr_err_rate, + xyz_error_bias=bias, + N=self.num_checks, + ) + if self.check_side == "X": + self.err_idx = 1 + # Z bit/syndrome errors + self.data_err_channel = self.y_bit_chnl + self.z_bit_chnl + self.syndr_err_channel = 1.0 * (self.z_syndr_err_chnl + self.y_syndr_err_chnl) + else: + # we have X errors on qubits + self.err_idx = 0 + # X bit/syndrome errors + self.data_err_channel = self.x_bit_chnl + self.y_bit_chnl + self.syndr_err_channel = 1.0 * (self.x_syndr_err_chnl + self.y_syndr_err_chnl) + + # initialize the multiround parity-check matrix as described in the paper + self.H3D = build_multiround_pcm( + self.H, self.repetitions - 1, + ) + + # the number of columns of the diagonal check matrix of the H3D matrix + self.check_block_size = self.num_qubits * (self.repetitions) + + channel_probs = np.zeros(self.H3D.shape[1]) + # The bits corresponding to the columns of the diagonal H-bock of H3D are initialized with the bit channel + channel_probs[: self.check_block_size] = np.array( + self.data_err_channel.tolist() * (self.repetitions) + ) + + # The remaining bits (corresponding to the identity block of H3D) are initialized with the syndrome error channel + channel_probs[self.check_block_size:] = np.array( + self.syndr_err_channel.tolist() * (self.repetitions) + ) + + # If we do ATG decoding, initialize sigma (syndrome noise strength) + if self.analog_tg: + self.sigma = get_sigma_from_syndr_er( + self.syndr_err_channel[0] # x/z + y + ) # assumes all sigmas are the same + else: + self.sigma = None + self.bp_iterations = 0 + if self.decoding_method == "bposd": + self.decoder = BpOsdDecoder( + self.H3D, + error_channel=channel_probs.tolist(), + max_iter=self.bp_params.max_bp_iter, + bp_method="minimum_sum", + osd_order=self.bp_params.osd_order, + osd_method=self.bp_params.osd_method, + ms_scaling_factor=self.bp_params.ms_scaling_factor + ) + elif self.decoding_method == "matching": + weights = np.log((1 - 
channel_probs) / channel_probs) + self.decoder = Matching(self.H3D, weights=weights) + elif self.decoding_method == 'lsd': + self.decoder = BpLsdDecoder( + self.H3D, + error_channel=channel_probs, + max_iter=5, + bp_method='ms', + ms_scaling_factor=0.6, + schedule=self.bp_params.schedule, + omp_thread_count=self.bp_params.omp_thread_count, + serial_schedule_order=self.bp_params.serial_schedule_order, + lsd_order=0 + ) + self.channel_probs = channel_probs + + def _decode_multiround( + self, + syndrome_mat: np.ndarray, + analog_syndr_mat: np.ndarray, + last_round: bool = False, + ) -> np.ndarray: + return decode_multiround( + syndrome=syndrome_mat, + H=self.H, + decoder=self.decoder, + repetitions=self.repetitions, + last_round=last_round, + analog_syndr=analog_syndr_mat, + check_block_size=self.check_block_size, + sigma=self.sigma, + H3D=self.H3D if self.decoding_method == "matching" else None, # avoid passing matrix in case not needed + channel_probs=self.channel_probs, + decoding_method=self.decoding_method, + ) + + def _single_sample(self) -> int: + # prepare fresh syndrome matrix and error vector + # each column == measurement result of a single timestep + syndrome_mat = np.zeros( + (self.num_checks, self.repetitions), dtype=np.int32 + ) + analog_syndr_mat = None + + if self.analog_tg: + analog_syndr_mat = np.zeros( + (self.num_checks, self.repetitions), dtype=np.float64 + ) + + err = np.zeros(self.num_qubits, dtype=np.int32) + cnt = 0 # counter for syndrome_mat + + for round in range(self.rounds): + residual_err = [np.copy(err), np.copy(err)] + err = generate_err( + N=self.num_qubits, + channel_probs=[ + self.x_bit_chnl, + self.y_bit_chnl, + self.z_bit_chnl, + ], + residual_err=residual_err, + )[self.err_idx] # only first or last vector needed, depending on side (X or Z) + noiseless_syndrome = (self.H @ err) % 2 + + # add syndrome error + if round != (self.rounds - 1): + if self.analog_tg: + analog_syndrome = get_noisy_analog_syndrome( + noiseless_syndrome, self.sigma + ) + syndrome = get_binary_from_analog( + analog_syndrome + ) + else: + syndrome_error = generate_syndr_err(self.syndr_err_channel) + syndrome = (noiseless_syndrome + syndrome_error) % 2 + else: # last round is perfect + syndrome = np.copy(noiseless_syndrome) + analog_syndrome = get_noisy_analog_syndrome( + noiseless_syndrome, 0.0 + ) # no noise + + # fill the corresponding column of the syndrome/analog syndrome matrix + syndrome_mat[:, cnt] += syndrome + if self.analog_tg: + analog_syndr_mat[:, cnt] += analog_syndrome + + cnt += 1 # move to next column of syndrome matrix + + if (cnt == self.repetitions): # if we have filled the syndrome matrix, decode + if round != (self.rounds - 1): # if not last round, decode and move syndrome + cnt = (self.repetitions // 2) # reset counter to start of tentative region + + # the correction is only the correction of the commit region + ( + corr, + syndrome_mat, + analog_syndr_mat, + bp_iters + ) = self._decode_multiround( + syndrome_mat, + analog_syndr_mat, + last_round=False, + ) + # we compute the average for all rounds since this equals a single sample + self.bp_iterations += bp_iters / self.rounds + err = (err + corr) % 2 + syndrome_mat = move_syndrome(syndrome_mat) + if self.analog_tg: + analog_syndr_mat = move_syndrome( + analog_syndr_mat, data_type=np.float64 + ) + + else: # if we are in the last round, decode and stop + # the correction is the correction of the commit and tentative region + ( + corr, + syndrome_mat, + analog_syndr_mat, + bp_iters + ) = self._decode_multiround( 
+ syndrome_mat, + analog_syndr_mat, + last_round=True, + ) + self.bp_iterations += bp_iters / self.rounds + err = (err + corr) % 2 + return int(not is_logical_err(self.L, err)) + + def _save_results(self, success_cnt: int, samples: int) -> dict: + return save_results( + success_cnt=success_cnt, + nr_runs=samples, + p=self.data_err_rate, + s=self.syndr_err_rate, + input_vals=self.input_values, + outfile=self.outfile, + code_params=self.code_params, + err_side="z" if self.check_side == "X" else "x", + bp_iterations=self.bp_iterations, + bp_params=self.bp_params + ) + + def run(self, samples: int = 1): + """Returns single data point""" + success_cnt = 0 + for run in range(1, samples + 1): + success_cnt += self._single_sample() + if run % self.save_interval == 1: + self._save_results(success_cnt, run) + if _check_convergence( + success_cnt, run, self.code_params, self.eb_precission): + print("Converged") + break + return self._save_results(success_cnt, run) diff --git a/src_python/ldpc/monte_carlo_simulation/simulation_utils.py b/src_python/ldpc/monte_carlo_simulation/simulation_utils.py new file mode 100644 index 0000000..cc3462a --- /dev/null +++ b/src_python/ldpc/monte_carlo_simulation/simulation_utils.py @@ -0,0 +1,297 @@ +# from Timo and Lucas' paper https://github.com/cda-tum/mqt-qecc/tree/analog-information-decoding/src/mqt/qecc/analog_information_decoding + +import json +import warnings + +import numpy as np +from numba import njit +from numba.core.errors import ( + NumbaDeprecationWarning, + NumbaPendingDeprecationWarning, +) +from scipy.special import erfcinv, erfc + +from data_utils import replace_inf, calculate_error_rates, BpParams + +warnings.simplefilter("ignore", category=NumbaDeprecationWarning) +warnings.simplefilter("ignore", category=NumbaPendingDeprecationWarning) + + +@njit +def set_seed(value): + """ + The approriate way to set seeds when numba is used. + """ + np.random.seed(value) + + +# @njit +def alist2numpy(fname): # current original implementation is buggy + alist_file = np.loadtxt(fname, delimiter=",", dtype=str) + matrix_dimensions = alist_file[0].split() + m = int(matrix_dimensions[0]) + n = int(matrix_dimensions[1]) + + mat = np.zeros((m, n), dtype=np.int32) + + for i in range(m): + columns = [] + for item in alist_file[i + 4].split(): + if item.isdigit(): + columns.append(item) + columns = np.array(columns, dtype=np.int32) + columns = columns - 1 # convert to zero indexing + mat[i, columns] = 1 + + return mat + + +# Rewrite such that call signatures of check_logical_err_h +# and check_logical_err_l are identical +@njit +def check_logical_err_h(check_matrix, original_err, decoded_estimate): + r, n = check_matrix.shape + + # compute residual err given original err + residual_err = np.zeros((n, 1), dtype=np.int32) + for i in range(n): + residual_err[i][0] = original_err[i] ^ decoded_estimate[i] + + ht = np.transpose(check_matrix) + + htr = np.append(ht, residual_err, axis=1) + + rank_ht = rank(check_matrix) # rank A = rank A.T + + rank_htr = rank(htr) + + if rank_ht < rank_htr: + return True + else: + return False + + +# L is a numpy array, residual_err is vector s.t. 
dimensions match +# residual_err is a logical iff it commutes with logicals of other side +# i.e., an X residal is a logical iff it commutes with at least one Z logical and +# an Z residual is a logical iff it commutes with at least one Z logical +# Hence, L must be of same type as H and of different type than residual_err +def is_logical_err(L, residual_err): + """checks if the residual error is a logical error + :returns: True if its logical error, False otherwise (is a stabilizer)""" + l_check = (L @ residual_err) % 2 + return l_check.any() # check all zeros + + +# adapted from https://github.com/quantumgizmos/bp_osd/blob/a179e6e86237f4b9cc2c952103fce919da2777c8/src/bposd/css_decode_sim.py#L430 +# and https://github.com/MikeVasmer/single_shot_3D_HGP/blob/master/sim_scripts/single_shot_hgp3d.cpp#L207 +# channel_probs = [x,y,z], residual_err = [x,z] +@njit +def generate_err(N, channel_probs, residual_err): + """ + Computes error vector with X and Z part given channel probabilities and residual error. + Assumes that residual error has two equally sized parts. + """ + error_x = residual_err[0] + error_z = residual_err[1] + channel_probs_x = channel_probs[0] + channel_probs_y = channel_probs[1] + channel_probs_z = channel_probs[2] + residual_err_x = residual_err[0] + residual_err_z = residual_err[1] + + for i in range(N): + rand = np.random.random() # this returns a random float in [0,1) + # e.g. if err channel is p = 0.3, then an error will be applied if rand < p + if ( + rand < channel_probs_z[i] + ): # if probability for z error high enough, rand < p, apply + # if there is a z error on the i-th bit, flip the bit but take residual error into account + # nothing on x part - probably redundant anyways + error_z[i] = (residual_err_z[i] + 1) % 2 + elif ( # if p = 0.3 then 0.3 <= rand < 0.6 is the same sized interval as rand < 0.3 + channel_probs_z[i] + <= rand + < (channel_probs_z[i] + channel_probs_x[i]) + ): + # X error + error_x[i] = (residual_err_x[i] + 1) % 2 + elif ( # 0.6 <= rand < 0.9 + (channel_probs_z[i] + channel_probs_x[i]) + <= rand + < (channel_probs_x[i] + channel_probs_y[i] + channel_probs_z[i]) + ): + # y error == both x and z error + error_z[i] = (residual_err_z[i] + 1) % 2 + error_x[i] = (residual_err_x[i] + 1) % 2 + + return error_x, error_z + + +@njit +def get_analog_llr(analog_syndrome: np.ndarray, sigma: float) -> np.ndarray: + """Computes analog LLRs given analog syndrome and sigma""" + return (2 * analog_syndrome) / (sigma ** 2) + + +def get_sigma_from_syndr_er(ser: float) -> float: + """ + For analog Cat syndrome noise we need to convert the syndrome error model as described in the paper. + :return: sigma + """ + return ( + 1 / np.sqrt(2) / (erfcinv(2 * ser)) + ) # see Eq. cref{eq:perr-to-sigma} in our paper + + +def get_error_rate_from_sigma(sigma: float) -> float: + """ + For analog Cat syndrome noise we need to convert the syndrome error model as described in the paper. + :return: sigma + """ + return 0.5 * erfc( + 1 / np.sqrt(2 * sigma ** 2) + ) # see Eq. cref{eq:perr-to-sigma} in our paper + + +# @njit +def get_virtual_check_init_vals(noisy_syndr, sigma: float): + """ + Computes a vector of values v_i from the noisy syndrome bits y_i s.t. BP initializes the LLRs l_i of the analog nodes with the + analog info values (see paper section). 
v_i := 1/(e^{y_i}+1) + """ + llrs = get_analog_llr(noisy_syndr, sigma) + return 1 / (np.exp(np.abs(llrs)) + 1) + + +@njit +def generate_syndr_err(channel_probs): + error = np.zeros_like(channel_probs, dtype=np.int32) + + for i, prob in np.ndenumerate(channel_probs): + rand = np.random.random() + + if rand < channel_probs[i]: + error[i] = 1 + + return error + + +# @njit +def get_noisy_analog_syndrome( + perfect_syndr: np.ndarray, sigma: float +) -> np.array: + """Generate noisy analog syndrome vector given the perfect syndrome and standard deviation sigma (~ noise strength) + Assumes perfect_syndr has entries in {0,1}""" + + # compute signed syndrome: 1 = check satisfied, -1 = check violated + sgns = np.where( + perfect_syndr == 0, + np.ones_like(perfect_syndr), + np.full_like(perfect_syndr, -1), + ) + + # sample from Gaussian with zero mean and sigma std. dev: ~N(0, sigma_sq) + res = np.random.normal(sgns, sigma, len(sgns)) + return res + + +@njit +def error_channel_setup(error_rate, xyz_error_bias, N): + """Set up an error_channel given the physical error rate, bias, and number of bits.""" + xyz_error_bias = np.array(xyz_error_bias) + if xyz_error_bias[0] == np.inf: + px = error_rate + py = 0 + pz = 0 + elif xyz_error_bias[1] == np.inf: + px = 0 + py = error_rate + pz = 0 + elif xyz_error_bias[2] == np.inf: + px = 0 + py = 0 + pz = error_rate + else: + px, py, pz = ( + error_rate * xyz_error_bias / np.sum(xyz_error_bias) + ) # Oscar only considers X or Z errors. For reproducability remove normalization + + channel_probs_x = np.ones(N) * px + channel_probs_z = np.ones(N) * pz + channel_probs_y = np.ones(N) * py + + return channel_probs_x, channel_probs_y, channel_probs_z + + +# @njit +def build_single_stage_pcm(H, M) -> np.ndarray: + id_r = np.identity(M.shape[1]) + zeros = np.zeros((M.shape[0], H.shape[1])) + return np.block([[H, id_r], [zeros, M]]).astype(np.int32) + + +@njit +def get_signed_from_binary(binary_syndrome: np.ndarray) -> np.ndarray: + """Maps the binary vector with {0,1} entries to a vector with {-1,1} entries""" + signed_syndr = [] + for j in range(len(binary_syndrome)): + signed_syndr.append(1 if binary_syndrome[j] == 0 else -1) + return np.array(signed_syndr) + + +def get_binary_from_analog(analog_syndrome: np.ndarray) -> np.ndarray: + """Returns the thresholded binary vector. + Since in {-1,+1} notation -1 indicates a check violation, we map values <= 0 to 1 and values > 0 to 0. 
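    Example (illustrative):

        get_binary_from_analog(np.array([0.8, -0.3, 0.0]))
        # -> array([0, 1, 1], dtype=int32)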
+ """ + return np.where(analog_syndrome <= 0.0, 1, 0).astype(np.int32) + + +def save_results( + success_cnt: int, + nr_runs: int, + p: float, + s: float, + input_vals: dict, + outfile: str, + code_params, + err_side: str = "X", + bp_iterations: int = None, + bp_params: BpParams = None, +) -> dict: + ler, ler_eb, wer, wer_eb = calculate_error_rates( + success_cnt, nr_runs, code_params + ) + + output = { + "code_K": code_params["k"], + "code_N": code_params["n"], + "nr_runs": nr_runs, + "pers": p, + "sers": s, + f"{err_side}_ler": ler, + f"{err_side}_ler_eb": ler_eb, + f"{err_side}_wer": wer, + f"{err_side}_wer_eb": wer_eb, + f"{err_side}_success_cnt": success_cnt, + "avg_bp_iterations": bp_iterations / nr_runs, + "bp_params": bp_params, + } + + output.update(input_vals) + output["bias"] = replace_inf(output["bias"]) + with open(outfile, "w") as f: + json.dump( + output, + f, + ensure_ascii=False, + indent=4, + default=lambda o: o.__dict__, + ) + return output + + +if __name__ == "__main__": + q = 0.1 + sigma = get_sigma_from_syndr_er(q) + assert np.isclose(q, get_error_rate_from_sigma(sigma)) diff --git a/src_python/ldpc/sinter_decoders/__init__.py b/src_python/ldpc/sinter_decoders/__init__.py index 0ffaa92..6b96e5d 100644 --- a/src_python/ldpc/sinter_decoders/__init__.py +++ b/src_python/ldpc/sinter_decoders/__init__.py @@ -1,2 +1,3 @@ from ldpc.sinter_decoders.sinter_bposd_decoder import SinterBpOsdDecoder from ldpc.sinter_decoders.sinter_belief_find_decoder import SinterBeliefFindDecoder +from ldpc.sinter_decoders.sinter_lsd_decoder import SinterLsdDecoder diff --git a/src_python/ldpc/sinter_decoders/sinter_lsd_decoder.py b/src_python/ldpc/sinter_decoders/sinter_lsd_decoder.py new file mode 100644 index 0000000..000c4b8 --- /dev/null +++ b/src_python/ldpc/sinter_decoders/sinter_lsd_decoder.py @@ -0,0 +1,121 @@ +import pathlib + +import numpy as np +import sinter +import stim +from beliefmatching import detector_error_model_to_check_matrices +from ldpc.bplsd_decoder._bplsd_decoder import BpLsdDecoder + + +class SinterLsdDecoder(sinter.Decoder): + """ + Initialize the SinterBPLSDDecoder object. + + Parameters + ---------- + max_iter : Optional[int], optional + The maximum number of iterations for the decoding algorithm, by default 0. + bp_method : Optional[str], optional + The belief propagation method used. Must be one of {'product_sum', 'minimum_sum'}, by default 'minimum_sum'. + ms_scaling_factor : Optional[float], optional + The scaling factor used in the minimum sum method, by default 1.0. + schedule : Optional[str], optional + The scheduling method used. Must be one of {'parallel', 'serial'}, by default 'parallel'. + omp_thread_count : Optional[int], optional + The number of OpenMP threads used for parallel decoding, by default 1. + serial_schedule_order : Optional[List[int]], optional + A list of integers that specify the serial schedule order. Must be of length equal to the block length of the code, by default None. + lsd_order : int, optional + The LSD order, by default 0. + + Notes + ----- + This class provides an interface for configuring and using the SinterBPLSDDecoder for quantum error correction. 
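    Examples
    --------
    A minimal sketch mirroring examples/sinter_example.py, where
    generate_example_tasks stands in for any iterable of sinter.Task:

        import sinter
        from ldpc.sinter_decoders import SinterLsdDecoder

        samples = sinter.collect(
            num_workers=4,
            max_shots=10_000,
            max_errors=100,
            tasks=generate_example_tasks(),
            decoders=['bplsd'],
            custom_decoders={'bplsd': SinterLsdDecoder(
                max_iter=10,
                bp_method="ms",
                ms_scaling_factor=0.625,
                schedule="parallel",
                lsd_order=0)},
        )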
+ """ + + def __init__(self, + max_iter=0, + bp_method="ms", + ms_scaling_factor=0.625, + schedule="parallel", + omp_thread_count=1, + serial_schedule_order=None, + lsd_order=0): + self.max_iter = max_iter + self.bp_method = bp_method + self.ms_scaling_factor = ms_scaling_factor + self.schedule = schedule + self.omp_thread_count = omp_thread_count + self.serial_schedule_order = serial_schedule_order + self.lsd_order = lsd_order + + def decode_via_files( + self, + *, + num_shots: int, + num_dets: int, + num_obs: int, + dem_path: pathlib.Path, + dets_b8_in_path: pathlib.Path, + obs_predictions_b8_out_path: pathlib.Path, + tmp_dir: pathlib.Path, + ) -> None: + """Performs decoding by reading problems from, and writing solutions to, file paths. + Args: + num_shots: The number of times the circuit was sampled. The number of problems + to be solved. + num_dets: The number of detectors in the circuit. The number of detection event + bits in each shot. + num_obs: The number of observables in the circuit. The number of predicted bits + in each shot. + dem_path: The file path where the detector error model should be read from, + e.g. using `stim.DetectorErrorModel.from_file`. The error mechanisms + specified by the detector error model should be used to configure the + decoder. + dets_b8_in_path: The file path that detection event data should be read from. + Note that the file may be a named pipe instead of a fixed size object. + The detection events will be in b8 format (see + https://github.com/quantumlib/Stim/blob/main/doc/result_formats.md ). The + number of detection events per shot is available via the `num_dets` + argument or via the detector error model at `dem_path`. + obs_predictions_b8_out_path: The file path that decoder predictions must be + written to. The predictions must be written in b8 format (see + https://github.com/quantumlib/Stim/blob/main/doc/result_formats.md ). The + number of observables per shot is available via the `num_obs` argument or + via the detector error model at `dem_path`. + tmp_dir: Any temporary files generated by the decoder during its operation MUST + be put into this directory. The reason for this requirement is because + sinter is allowed to kill the decoding process without warning, without + giving it time to clean up any temporary objects. All cleanup should be done + via sinter deleting this directory after killing the decoder. + """ + self.dem = stim.DetectorErrorModel.from_file(dem_path) + self.matrices = detector_error_model_to_check_matrices(self.dem) + self.lsd = BpLsdDecoder(self.matrices.check_matrix, + error_channel=list(self.matrices.priors), + max_iter=self.max_iter, + bp_method=self.bp_method, + ms_scaling_factor=self.ms_scaling_factor, + schedule=self.schedule, + omp_thread_count=self.omp_thread_count, + serial_schedule_order=self.serial_schedule_order, + lsd_order=self.lsd_order) + + shots = stim.read_shot_data_file( + path=dets_b8_in_path, format="b8", num_detectors=num_dets + ) + predictions = np.zeros( + (num_shots, num_obs), dtype=bool) + for i in range(num_shots): + predictions[i, :] = self.decode(shots[i, :]) + + stim.write_shot_data_file( + data=predictions, + path=obs_predictions_b8_out_path, + format="b8", + num_observables=num_obs, + ) + + def decode(self, syndrome: np.ndarray) -> np.ndarray: + corr = self.lsd.decode(syndrome) + return (self.matrices.observables_matrix @ corr) % 2