import imgviz from qtpy import QtCore from qtpy import QtGui from qtpy import QtWidgets import labelme.ai import labelme.utils from labelme import QT5 from labelme.logger import logger from labelme.shape import Shape import collections import threading import numpy as np import openvino as ov import os.path as osp import cv2 from labelme.utils import img_qt_to_arr from labelme.utils import load_barcode_dict class CodeSet: NONE = 0 A = 1 B = 2 C = 3 class Normalize: def __init__(self, mean=(0.15525904, 0.15525904, 0.15525904), std=(0.12552188, 0.12552188, 0.12552188)): if not (isinstance(mean, (list, tuple)) and isinstance(std, (list, tuple))): raise ValueError("mean and std should be of type list or tuple.") self.mean = np.array(mean, dtype=np.float32) self.std = np.array(std, dtype=np.float32) # Reshape for broadcasting to apply mean and std across the spatial dimensions of an image self.mean = self.mean.reshape((1, 1, 3)) self.std = self.std.reshape((1, 1, 3)) def __call__(self, img): img = img.astype(np.float32) / 255.0 # Scale pixel values to [0, 1] img = (img - self.mean) / self.std # Normalize return img class BarcodeDecodeModel: def __init__(self, decoding_model_path=None): self.ie = ov.Core() self.pixmap = QtGui.QPixmap() #Load Decoding model if provided self.decoding_net = None self.decoding_sess = None self._characters = load_barcode_dict() if decoding_model_path: self.decoding_net = self.ie.read_model(model=decoding_model_path) self.decoding_sess = self.ie.compile_model(model=self.decoding_net, device_name="CPU") self.decoding_input_shape = (1, 3, 32, 256) self.normalize = Normalize() # Normalization instance self._lock = threading.Lock() self._image_embedding_cache = collections.OrderedDict() self._max_cache_size = 10 self.pixmap = QtGui.QPixmap() # def set_pixmap(self, pixmap: QtGui.QPixmap): # """ # Set the QPixmap object for decoding. # Args: # pixmap (QtGui.QPixmap): The QPixmap object containing the image. # """ # if pixmap is None or pixmap.isNull(): # raise ValueError("Invalid QPixmap provided.") # self.pixmap = pixmap # logger.debug("Pixmap set successfully in BarcodeDecodeModel.") def preprocess_image(self, image): norm = Normalize(mean=(0.44948044,0.44948044,0.44948044), std=(0.22099442,0.22099442,0.22099442)) resized_image = cv2.resize(image, (self.decoding_input_shape[3], self.decoding_input_shape[2])) resized_image = cv2.cvtColor(resized_image, cv2.COLOR_BGR2RGB) resized_image = norm(resized_image) # Resize image for detection model input size logger.debug(f"Preprocessing image for detection: {image.shape}") # resized_image = resized_image.astype('float32') / 255.0 input_tensor = resized_image.transpose(2, 0, 1) # Convert HWC to CHW input_tensor = np.expand_dims(input_tensor, 0) # Add batch dimension logger.debug(f"Processed image shape: {input_tensor.shape}") return input_tensor def decode_from_points(self, points, detection_idx, original_image): """ Decodes the cropped image based on points and returns the decoded text. Args: points (list): List of points defining the bounding box. pixmap (QPixmap): Original image pixmap to crop from. Returns: str: Decoded text from the decoding model. """ try: # Convert scaled_points to a numpy array polygon = np.array(points, dtype=np.int32) # Create a mask of the same size as the original image # original_image = labelme.utils.img_qt_to_arr(self.pixmap.toImage()) cv2.imwrite(f"original_image{detection_idx + 1}.png", original_image) mask = np.zeros(original_image.shape[:2], dtype=np.uint8) cv2.fillPoly(mask, [polygon], 255) # Fill the polygon with white # Apply the mask to the original image masked_image = cv2.bitwise_and(original_image, original_image, mask=mask) # Get the bounding rectangle of the polygon to crop the ROI x, y, w, h = cv2.boundingRect(polygon) cropped_image_dec = masked_image[y:y+h, x:x+w] cv2.imwrite(f"cropped_exact_{detection_idx + 1}.png", cropped_image_dec) logger.debug(f"cropped_exact image saved at {detection_idx + 1}.") src_points = np.float32(points) # Calculate the width and height of the barcode based on scaled_points width = int(np.linalg.norm(src_points[0] - src_points[1])) # print(width) height = int(np.linalg.norm(src_points[1] - src_points[2])) # print(height) # Correct width/height if needed if width < height: width, height = height, width # Reorder src_points to ensure the transformation aligns the longer side to the width src_points = np.float32([ src_points[1], # Top-left becomes top-right src_points[2], # Top-right becomes bottom-right src_points[3], # Bottom-right becomes bottom-left src_points[0] # Bottom-left becomes top-left ]) # Define destination points for the flattened barcode dst_points = np.float32([ [0, 0], [width - 1, 0], [width - 1, height - 1], [0, height - 1] ]) # Calculate the perspective transformation matrix M = cv2.getPerspectiveTransform(src_points, dst_points) # Apply the perspective transformation aligned_barcode = cv2.warpPerspective(original_image, M, (width, height), flags=cv2.INTER_LINEAR) # Save the aligned barcode image cv2.imwrite(f"decoding_barcode_{detection_idx + 1}.png", aligned_barcode) logger.debug(f"Aligned barcode saved at {detection_idx + 1}.") # Normalize the image to scale pixel intensities to the range [0, 255] normalized_img = np.zeros(aligned_barcode.shape, aligned_barcode.dtype) cv2.normalize(aligned_barcode, normalized_img, 0, 255, cv2.NORM_MINMAX) logger.debug("Image normalized.") # Save the cropped image cv2.imwrite(f"cropped_image_decoding_normalized{detection_idx + 1}.png",normalized_img) logger.debug(f"Saved normalized image for decoding : {detection_idx + 1}") # Run decoding model confidence = None # Run decoding on the original image decoded_text, confidence = self.run_decoding(normalized_img, detection_idx, confidence) # Validate checksum if decoded_text: checksum_valid, validated_result = self.validate_code128_checksum(decoded_text, detection_idx) if checksum_valid: logger.debug(f"Validated result for detection {detection_idx + 1}: {validated_result}") return validated_result # Return validated result else: logger.error(f"Checksum validation failed for detection {detection_idx + 1}. Retrying with 180° rotation.") # Rotate image 180 degrees and retry rotated_image = cv2.rotate(normalized_img, cv2.ROTATE_180) decoded_text, confidence = self.run_decoding(rotated_image, detection_idx, confidence) # Validate checksum again if decoded_text: checksum_valid, validated_result = self.validate_code128_checksum(decoded_text, detection_idx) if checksum_valid: logger.debug(f"Validated result after rotation for detection {detection_idx + 1}: {validated_result}") return validated_result else: logger.error(f"Checksum validation failed after rotation for detection {detection_idx + 1}. Error: {validated_result}") return return "Decoding failed" except Exception as e: logger.error(f"Error in decode_from_points: {e}") return "Error: Decoding failed" def run_decoding(self, image_np, detection_idx, confidence): """Helper to run decoding on the given image.""" preprocessed_img = self.preprocess_image( image_np ) decode_result = self.decoding_sess.infer_new_request({'x': preprocessed_img}) output_tensor = decode_result['save_infer_model/scale_0.tmp_0'] logger.debug(f"Output tensor shape: {output_tensor.shape}") output_indices = np.argmax(output_tensor, axis=2) output_probs = np.max(output_tensor, axis=2) # Decode text from indices decoded_text, confidence = self.decode_text(output_indices, output_probs, detection_idx) logger.debug(f"Raw barcode: {decoded_text}, Confidence: {confidence:.2f}") return decoded_text, confidence def decode_text(self, text_indices, text_probs, detection_idx): """ Converts model output indices into text using the character dictionary. Args: text_indices (np.ndarray): Output indices from the decoding model. text_probs (np.ndarray): Probabilities corresponding to the indices. Returns: tuple: Decoded text and its confidence score. """ try: max_index = len(self._characters) - 1 logger.debug(f"Loaded barcode dictionary with {len(self._characters)} characters.") result_list = [] for batch_idx in range(text_indices.shape[0]): # Loop through batches char_list = [] conf_list = [] for step_idx in range(text_indices.shape[1]): # Loop through sequence length char_idx = int(text_indices[batch_idx, step_idx]) if char_idx > max_index: logger.warning(f"Index {char_idx} is out of bounds for dictionary size {len(self._characters)}") continue # Skip invalid indices char = self._characters[char_idx] # print("char",char) if char == "": # End token break char_list.append(char) conf_list.append(text_probs[batch_idx, step_idx]) text = ''.join(char_list) confidence = np.mean(conf_list) if conf_list else 0.0 result_list.append((text, confidence)) # Return the first result (assuming batch size of 1 for now) return result_list[0] if result_list else ("", 0.0) except Exception as e: logger.error(f"Error in decode_text: {e}") return "Error: Decoding failed", 0.0 def validate_code128_checksum(self, decoded_text, detection_idx): # Convert characters to their corresponding Code 128 values using the index in _characters # print(self._characters) code128Values = [self._characters.index(char, 1) - 1 if char in self._characters[1:] else -1 for char in decoded_text] logger.debug(f"code128Values:{code128Values}") result = "" err_msg = "" currentCodeSet = CodeSet.B # Default to Code Set B, assuming start code is included in decoded_text if code128Values[0] in [103, 104, 105]: start_codes = {103: CodeSet.A, 104: CodeSet.B, 105: CodeSet.C} currentCodeSet = start_codes[code128Values[0]] # print("currentCodeSet",currentCodeSet) else: err_msg = f"No start code detected, first code is {code128Values[0]}" return False, err_msg checksum_expected = code128Values[-2] # print("Expected checksum:", checksum_expected) # Calculate the checksum using the formula checksum_calculated = code128Values[0] # Start with the start code value for i, value in enumerate(code128Values[1:-2], start=1): # Exclude stop code weighted_value = value * i checksum_calculated += weighted_value # logger.debug(f"Position {i}, Value {value}, Weighted Value {weighted_value}, Running Checksum {checksum_calculated}") checksum_calculated %= 103 logger.debug(f"Final Calculated Checksum (mod 103): {checksum_calculated}") if checksum_calculated != checksum_expected: err_msg = f"Invalid checksum value, supposed to be {checksum_calculated} but got {checksum_expected}" return False, err_msg # Verify the stop code if code128Values[-1] != 106: err_msg = "No valid stop code detected at the end of the sequence." return False, err_msg result = "" i = 1 # Start after the start code while i < len(code128Values) - 2: # Exclude checksum and stop code value = code128Values[i] # Handle special functions and code set shifts if value == 102: # FNC1 for GS1-128 logger.debug(f"Detected FNC1 at position {i}, treated as AI separator.") # result += "|" # Optional: Add a delimiter for AI parsing i += 1 continue elif value == 99: # Switch to Code Set C currentCodeSet = CodeSet.C logger.debug(f"Switched to Code Set C at position {i}") i += 1 continue elif value == 100: # Switch to Code Set B currentCodeSet = CodeSet.B logger.debug(f"Switched to Code Set B at position {i}") i += 1 continue # Decode based on the current Code Set if currentCodeSet == CodeSet.C: result += f"{value:02}" # logger.debug(f"Added Code Set C value {value:02} at position {i}") i += 1 elif currentCodeSet == CodeSet.B: if 0 <= value <= 95: char = self._characters[value + 1] # Map using the single dictionary result += char # logger.debug(f"Added Code Set B char {char} at position {i}") i += 1 else: err_msg = f"Invalid Code Set B value: {value}" logger.error(err_msg) return False, err_msg elif currentCodeSet == CodeSet.A: if 0 <= value <= 95: char = self._characters[value + 1] # Map using the single dictionary result += char # logger.debug(f"Added Code Set A char {char} at position {i}") i += 1 else: err_msg = f"Invalid Code Set A value: {value}" logger.error(err_msg) return False, err_msg # logger.debug(f"Decoded result after processing: {result}") # logger.debug(f"Result indices for {detection_idx + 1}: {result_indices}") return True, result