"""Barcode decoding for labelme: rectify a detected barcode and decode it.

The recognition network emits one class index per time step. Index 0 ends the
sequence; every other index is shifted by -1 to obtain a symbol value. Symbol
values 0-106 are handled as Code 128 symbols, 107/108 as the EAN "C1"/"C2"
symbols and 109-138 as EAN digits in sets A, B and C.
"""
import imgviz
from qtpy import QtCore
from qtpy import QtGui
from qtpy import QtWidgets

import labelme.ai
import labelme.utils
from labelme import QT5
from labelme.logger import logger
from labelme.shape import Shape
import collections
import threading
import numpy as np
import openvino as ov
import os.path as osp
import cv2
from labelme.utils import img_qt_to_arr
from labelme.utils import load_barcode_dict


# Code 128 start symbols and the code set each one selects.
_CODE128_START_SETS = {103: "A", 104: "B", 105: "C"}
# Code 128 code-set switch symbols. 99 only switches when the current set is
# A or B; inside set C it is the digit pair "99".
_CODE128_SWITCH_TO_C = 99
_CODE128_SWITCH_TO_B = 100
_CODE128_SWITCH_TO_A = 101

# EAN symbols as produced by the model ("C1" / "C2" in the training dictionary).
# NOTE(review): presumably the outer and centre guard bars - confirm against
# the model's label dictionary.
_EAN_C1 = 107
_EAN_C2 = 108
# Symbol value -> (digit character, set letter) for the EAN digit symbols:
# 109-118 are digits 0-9 in set A, 119-128 in set B, 129-138 in set C.
_EAN_SYMBOLS = {
    109 + 10 * set_index + digit: (str(digit), set_letter)
    for set_index, set_letter in enumerate("ABC")
    for digit in range(10)
}
# A/B pattern of the six left-hand digits; the position of the pattern in this
# tuple is the implicit first digit of the EAN-13 number.
_EAN_LEFT_PATTERNS = (
    "AAAAAA", "AABABB", "AABBAB", "AABBBA", "ABAABB",
    "ABBAAB", "ABBBAA", "ABABAB", "ABABBA", "ABBABA",
)


def _ean13_check_digit(ean13):
    """Return the check digit computed from the first 12 digits of *ean13*."""
    digits = [int(ch) for ch in ean13[:12]]
    total = sum(digits[0::2]) + 3 * sum(digits[1::2])
    return (10 - total % 10) % 10


def _extract_symbol_sequence(output_indices_batch, output_probs_batch):
    """Turn the raw per-step class indices into a symbol sequence.

    Only the first batch row is used.

    Returns:
        ``(True, (symbols, index_prob_pairs))`` on success, where ``symbols``
        are the class indices shifted by -1 and ``index_prob_pairs`` pairs each
        kept (unshifted) index with its probability; ``(False, message)``
        otherwise. The result is always a 2-tuple.
    """
    if output_indices_batch is None or len(output_indices_batch) == 0:
        return False, "Empty output indices batch"

    first_row = output_indices_batch[0]
    if first_row is None or len(first_row) == 0:
        return False, "Empty output indices"

    indices = np.asarray(first_row).tolist()
    probs = np.asarray(output_probs_batch[0]).tolist()

    # Step 1: class index 0 terminates the sequence.
    if 0 in indices:
        end = indices.index(0)
        indices = indices[:end]
        probs = probs[:end]

    index_prob_pairs = list(zip(indices, probs))
    symbols = [index - 1 for index in indices]

    # Step 2: a stop symbol has to be present somewhere before the terminator.
    if not any(symbol in (106, _EAN_C1) for symbol in symbols):
        return False, "Invalid: missin stop code (106 or 107) before first 0"

    # Step 3: when symbol 108 occurs twice, drop everything after the second.
    if symbols.count(_EAN_C2) >= 2:
        logger.debug("Sequence contains two 108 symbols; truncating after the second.")
        first = symbols.index(_EAN_C2)
        second = symbols.index(_EAN_C2, first + 1)
        symbols = symbols[:second + 1]
        index_prob_pairs = index_prob_pairs[:second + 1]

    # Step 4: the sequence has to begin with a known start symbol.
    start_code = symbols[0] if symbols else None
    if start_code not in (103, 104, 105, _EAN_C1):
        return False, f"Invalid start code: {start_code}"

    return True, (symbols, index_prob_pairs)


class CodeSet:
    """Integer identifiers for the Code 128 code sets."""

    NONE = 0
    A = 1
    B = 2
    C = 3


class Normalize:
    """Scale an HxWx3 image to [0, 1] and standardise it per channel."""

    def __init__(self, mean=(0.45, 0.45, 0.45), std=(0.24, 0.24, 0.24)):
        """Store the per-channel statistics.

        Args:
            mean: Three per-channel means (list or tuple).
            std: Three per-channel standard deviations (list or tuple).

        Raises:
            ValueError: If *mean* or *std* is not a list or tuple.
        """
        if not (isinstance(mean, (list, tuple)) and isinstance(std, (list, tuple))):
            raise ValueError("mean and std should be of type list or tuple.")

        # Shape (1, 1, 3) broadcasts over the two spatial axes of an HWC image.
        self.mean = np.array(mean, dtype=np.float32).reshape((1, 1, 3))
        self.std = np.array(std, dtype=np.float32).reshape((1, 1, 3))

    def __call__(self, img):
        """Return ``(img / 255 - mean) / std`` as a float32 array."""
        scaled = img.astype(np.float32) / 255.0
        return (scaled - self.mean) / self.std


class BarcodeDecodeModel:
    """Decode Code 128 and EAN-13 barcodes with an OpenVINO recognition model."""

    def __init__(self, decoding_model_path=None):
        """Create the OpenVINO core and, if a path is given, compile the model.

        Args:
            decoding_model_path: Path of the recognition model, or ``None`` to
                create the object without a compiled model.
        """
        self.ie = ov.Core()
        self.pixmap = QtGui.QPixmap()

        self.decoding_net = None
        self.decoding_sess = None
        self._characters = load_barcode_dict()
        # Network input layout (N, C, H, W). Set unconditionally so that
        # preprocess_image() also works when no model has been loaded.
        self.decoding_input_shape = (1, 3, 32, 256)

        if decoding_model_path:
            self.decoding_net = self.ie.read_model(model=decoding_model_path)
            self.decoding_sess = self.ie.compile_model(
                model=self.decoding_net, device_name="CPU"
            )

        self.normalize = Normalize()
        self._lock = threading.Lock()
        self._image_embedding_cache = collections.OrderedDict()
        self._max_cache_size = 10

    def preprocess_image(self, image):
        """Convert a BGR image into the network's input tensor.

        The image is resized to the network's width and height, converted to
        RGB, normalised, and rearranged from HWC to NCHW with a batch of one.

        Args:
            image: HxWx3 BGR image array.

        Returns:
            float32 array of shape ``(1, 3, H, W)``.
        """
        logger.debug(f"Preprocessing image for detection: {image.shape}")
        _, _, net_height, net_width = self.decoding_input_shape

        resized = cv2.resize(image, (net_width, net_height))
        rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
        normalized = self.normalize(rgb)

        input_tensor = np.expand_dims(normalized.transpose(2, 0, 1), 0)
        logger.debug(f"Processed image shape: {input_tensor.shape}")
        return input_tensor

    def decode_from_points(self, points, detection_idx, original_image):
        """Rectify the quadrilateral *points* and decode the barcode inside it.

        The region is warped to an axis-aligned rectangle whose longer side is
        horizontal, contrast-stretched to [0, 255] and decoded. If decoding
        fails, it is retried once on the image rotated by 180 degrees.

        Args:
            points: Four corner points ``[[x, y], ...]`` of the barcode, in
                order around the quadrilateral.
            detection_idx: Zero-based index of the detection; used for log
                messages and the debug image file name.
            original_image: Image array the points refer to.

        Returns:
            The decoded string; ``""`` if both decoding attempts fail; or
            ``"Error: Decoding failed"`` if any exception is raised.
        """
        try:
            src_points = np.float32(points)

            # Edge lengths of the quadrilateral give the output size.
            width = int(np.linalg.norm(src_points[0] - src_points[1]))
            height = int(np.linalg.norm(src_points[1] - src_points[2]))

            if width < height:
                # Start one corner later so the longer edge maps to the
                # horizontal axis of the output.
                width, height = height, width
                src_points = np.float32(
                    [src_points[1], src_points[2], src_points[3], src_points[0]]
                )

            dst_points = np.float32([
                [0, 0],
                [width - 1, 0],
                [width - 1, height - 1],
                [0, height - 1],
            ])

            transform = cv2.getPerspectiveTransform(src_points, dst_points)
            aligned_barcode = cv2.warpPerspective(
                original_image, transform, (width, height), flags=cv2.INTER_LINEAR
            )
            logger.debug(f"Aligned barcode for detection {detection_idx + 1}.")

            # Stretch pixel intensities to the full [0, 255] range.
            normalized_img = np.zeros(aligned_barcode.shape, aligned_barcode.dtype)
            cv2.normalize(aligned_barcode, normalized_img, 0, 255, cv2.NORM_MINMAX)
            logger.debug("Image normalized.")

            # NOTE(review): this writes a debug image into the current working
            # directory on every call; kept for backward compatibility, but
            # consider making it opt-in.
            cv2.imwrite(
                f"cropped_image_decoding_normalized{detection_idx + 1}.png",
                normalized_img,
            )
            logger.debug(f"Saved normalized image for decoding : {detection_idx + 1}")

            is_valid, decoded, decoded_text, msg, avg_conf, detection_idx = (
                self.run_decoding(normalized_img, detection_idx)
            )
            if is_valid:
                return decoded

            logger.warning(
                f"Decoding failed for detection {detection_idx + 1}: {msg}. "
                "Retrying with 180° rotation"
            )
            rotated_image = cv2.rotate(normalized_img, cv2.ROTATE_180)
            is_valid, decoded, decoded_text, msg, avg_conf, detection_idx = (
                self.run_decoding(rotated_image, detection_idx)
            )
            if not is_valid:
                logger.warning(
                    f"Decoding still failed after rotation for detection "
                    f"{detection_idx + 1}: {msg}"
                )
                return ""

            logger.debug(
                f"Valid: {is_valid}, detection_idx:{detection_idx + 1}, "
                f"decoded:{decoded}, decoded_text:{decoded_text}, msg:{msg}, "
                f"avg_conf:{avg_conf}"
            )
            return decoded
        except Exception as e:
            # Boundary handler: callers expect a string, never an exception.
            logger.exception(f"Error in decode_from_points: {e}")
            return "Error: Decoding failed"

    def run_decoding(self, image_np, detection_idx):
        """Run the recognition model on *image_np* and validate the result.

        Args:
            image_np: Rectified BGR barcode image.
            detection_idx: Index of the detection; returned unchanged.

        Returns:
            Tuple ``(is_valid, decoded, decoded_text, msg, avg_conf,
            detection_idx)``. On success ``decoded`` is the human-readable
            value, ``decoded_text`` the symbols mapped through the barcode
            dictionary, ``msg`` is ``"code128"`` or ``"ean13"`` and
            ``avg_conf`` the mean symbol confidence rounded to two decimals.
            On failure ``decoded`` is ``"Decoding failed"``, ``decoded_text``
            is ``""``, ``msg`` holds the reason and ``avg_conf`` is ``0.0``.
        """
        preprocessed_img = self.preprocess_image(image_np)
        decode_result = self.decoding_sess.infer_new_request({'x': preprocessed_img})
        output_tensor = decode_result['save_infer_model/scale_0.tmp_0']
        logger.debug(f"Output tensor shape: {output_tensor.shape}")

        # Best class and its score for every time step.
        output_indices_batch = np.argmax(output_tensor, axis=2)
        output_probs_batch = np.max(output_tensor, axis=2)

        ok, payload = _extract_symbol_sequence(output_indices_batch, output_probs_batch)
        if not ok:
            logger.debug(f"msg: {payload}")
            return False, "Decoding failed", "", payload, 0.0, detection_idx

        symbols, conf_pairs = payload
        avg_conf = round(float(np.mean([conf for _, conf in conf_pairs])), 2)

        is_valid, barcode_value, msg, ean_symbols = self.validate_checksum(symbols)
        if not is_valid:
            return is_valid, "Decoding failed", "", msg, 0.0, detection_idx

        if msg == "code128":
            decoded_text = ''.join(self._characters[idx] for idx in barcode_value)
            decoded = self.decode_code128(barcode_value)
            logger.debug(
                f"✅ {is_valid}, readable: {decoded}, raw: {decoded_text}, "
                f"conf: {avg_conf}, format: {msg}"
            )
        else:
            decoded_text = ''.join(self._characters[idx] for idx in ean_symbols)
            decoded = ''.join(str(d) for d in barcode_value)
            logger.debug(
                f"✅ {is_valid}, readable: {decoded}, raw: {decoded_text}, "
                f"conf: {avg_conf}, format: {msg}, detection_idx: {detection_idx}"
            )
        return is_valid, decoded, decoded_text, msg, avg_conf, detection_idx

    def decode_code128(self, values):
        """Convert Code 128 symbol values into readable text.

        Args:
            values: ``[start, data..., checksum, stop]`` symbol values. The
                sequence is not modified.

        Returns:
            The decoded text, or ``"Invalid start code"`` if the sequence is
            empty or does not begin with 103, 104 or 105. Data values outside
            the barcode dictionary are rendered as ``"?"``.
        """
        values = list(values)  # work on a copy
        if not values:
            return "Invalid start code"

        current_set = _CODE128_START_SETS.get(values[0])
        if current_set is None:
            return "Invalid start code"

        decoded_chars = []
        # Everything between the start symbol and the checksum + stop symbols.
        for val in values[1:-2]:
            # In set C the value 99 is the digit pair "99", not a switch.
            if val == _CODE128_SWITCH_TO_C and current_set != 'C':
                current_set = 'C'
            elif val == _CODE128_SWITCH_TO_B:
                current_set = 'B'
            elif val == _CODE128_SWITCH_TO_A:
                current_set = 'A'
            elif current_set == 'C':
                decoded_chars.append(f"{val:02d}")
            elif 0 <= val < len(self._characters):
                decoded_chars.append(self._characters[val])
            else:
                decoded_chars.append('?')

        return ''.join(decoded_chars)

    def validate_checksum(self, raw_values):
        """Check the checksum of an EAN-13 or Code 128 symbol sequence.

        A sequence starting with 107 is treated as EAN-13, anything else as
        Code 128.

        Args:
            raw_values: Symbol values (class indices already shifted by -1).

        Returns:
            Tuple ``(is_valid, value, msg, ean_symbols)``:

            * EAN-13 success: ``(True, [13 digits], "ean13", raw_values)``.
            * Code 128 success: ``(True, raw_values, "code128", None)``.
            * Failure: ``(False, None, reason, None)``.
        """
        logger.debug(f"raw_values passed to validate_checksum: {raw_values}")
        if len(raw_values) == 0:
            return False, None, "Empty barcode sequence", None

        if raw_values[0] == _EAN_C1:
            return self._validate_ean13(raw_values)
        return self._validate_code128(raw_values)

    @staticmethod
    def _validate_ean13(raw_values):
        """Validate an EAN-13 sequence; see :meth:`validate_checksum`."""
        logger.debug("Ean Barcode detected")

        # Drop the first and last symbol and every 108 in between; unknown
        # symbols are kept as None so they invalidate the result below.
        symbols = [
            _EAN_SYMBOLS.get(val) for val in raw_values[1:-1] if val != _EAN_C2
        ]

        # The A/B pattern of the first six digits encodes the leading digit.
        detected_pattern = ''.join(sym[1] if sym else '?' for sym in symbols[:6])
        if detected_pattern not in _EAN_LEFT_PATTERNS:
            return False, None, "wrong pattern", None
        first_digit = _EAN_LEFT_PATTERNS.index(detected_pattern)

        encoded_digits = ''.join(sym[0] if sym else '?' for sym in symbols)
        ean13 = str(first_digit) + encoded_digits[:12]
        if len(ean13) != 13 or not ean13.isdigit():
            logger.warning("Invalid Ean value needs to be 13 digits")
            return False, None, "Invalid EAN-13 base", None

        expected = _ean13_check_digit(ean13)
        # Validate the digit that is actually returned to the caller.
        if ean13[-1] != str(expected):
            logger.warning(f"Invalid ean 13 checksum value, supposed to be {expected}")
            return False, None, f"Invalid ean 13 checksum: expected {expected}", None

        return True, [int(ch) for ch in ean13], "ean13", raw_values

    @staticmethod
    def _validate_code128(raw_values):
        """Validate a Code 128 sequence; see :meth:`validate_checksum`."""
        logger.debug("Code128 Barcode detected")
        if len(raw_values) < 3:
            # Need at least start, checksum and stop symbols.
            logger.warning("Code128 sequence too short to contain a checksum")
            return False, None, "Invalid checksum value", None

        start_code = raw_values[0]
        predicted = raw_values[-2]
        data = raw_values[1:-2]

        # Modulo-103 checksum: start value plus position-weighted data values.
        weighted_sum = sum(val * pos for pos, val in enumerate(data, start=1))
        calculated = (start_code + weighted_sum) % 103
        logger.debug(
            f"predicted_code128_checksum: {predicted}, "
            f"calculated_128_checksum: {calculated}"
        )

        if predicted != calculated:
            logger.warning(
                f"Invalid checksum value, supposed to be {calculated} "
                f"but model predicted {predicted}"
            )
            return False, None, "Invalid checksum value", None
        return True, raw_values, "code128", None