123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341 |
- import imgviz
- from qtpy import QtCore
- from qtpy import QtGui
- from qtpy import QtWidgets
- import labelme.ai
- import labelme.utils
- from labelme import QT5
- from labelme.logger import logger
- from labelme.shape import Shape
- import collections
- import threading
- import numpy as np
- import openvino as ov
- import os.path as osp
- import cv2
- from labelme.utils import img_qt_to_arr
- from labelme.utils import load_barcode_dict
class CodeSet:
    """Identifiers for the Code 128 code set currently in effect while decoding.

    These are plain integer constants (not an ``enum``) so that existing
    comparisons such as ``current == CodeSet.B`` keep working unchanged.
    """

    NONE = 0  # no code set selected
    A = 1  # Code Set A (selected by start code 103)
    B = 2  # Code Set B (selected by start code 104)
    C = 3  # Code Set C (selected by start code 105)
class Normalize:
    """Scale an image to [0, 1] and standardize it per channel.

    The callable computes ``(img / 255 - mean) / std`` in ``float32``, with
    ``mean`` and ``std`` broadcast over the two leading (spatial) axes of a
    channel-last image.
    """

    def __init__(
        self,
        mean=(0.15525904, 0.15525904, 0.15525904),
        std=(0.12552188, 0.12552188, 0.12552188),
    ):
        """Store the per-channel statistics.

        Args:
            mean: Per-channel mean as a list or tuple, one entry per channel.
            std: Per-channel standard deviation as a list or tuple, same
                length as ``mean``.

        Raises:
            ValueError: If ``mean`` or ``std`` is not a list/tuple, if they
                are empty, or if their lengths differ.
        """
        if not (isinstance(mean, (list, tuple)) and isinstance(std, (list, tuple))):
            raise ValueError("mean and std should be of type list or tuple.")
        if not mean or len(mean) != len(std):
            raise ValueError("mean and std must be non-empty and have the same length.")
        # Shape (1, 1, C) so the statistics broadcast across height and width.
        # Using -1 instead of a hard-coded 3 lets grayscale or 4-channel
        # statistics be used as well.
        self.mean = np.array(mean, dtype=np.float32).reshape((1, 1, -1))
        self.std = np.array(std, dtype=np.float32).reshape((1, 1, -1))

    def __call__(self, img):
        """Return the normalized ``float32`` copy of ``img``.

        Args:
            img: Channel-last image array whose last axis matches the number
                of channels given at construction; pixel values are divided
                by 255 before standardization.

        Returns:
            ``float32`` array of the same shape as ``img``.
        """
        scaled = img.astype(np.float32) / 255.0  # scale pixel values to [0, 1]
        return (scaled - self.mean) / self.std
class BarcodeDecodeModel:
    """Recognize and validate Code 128 barcodes from quadrilateral image regions.

    A region is rectified with a perspective warp, contrast-stretched, run
    through an OpenVINO recognition model, and the recognized symbol sequence
    is checked against the Code 128 checksum before being converted to text.
    """

    # Normalization statistics applied to the recognition model input.
    _DECODE_MEAN = (0.44948044, 0.44948044, 0.44948044)
    _DECODE_STD = (0.22099442, 0.22099442, 0.22099442)

    # Code 128 symbol values with a fixed role.
    _CODE_C = 99  # latch to Code Set C (in sets A/B); digit pair "99" in set C
    _CODE_B = 100  # latch to Code Set B
    _CODE_A = 101  # latch to Code Set A (in sets B/C)
    _FNC1 = 102
    _STOP = 106

    # When True (default, original behaviour) intermediate images are written
    # as PNG files into the current working directory on every decode call.
    save_debug_images = True

    def __init__(self, decoding_model_path=None):
        """Create the OpenVINO runtime and optionally load the recognition model.

        Args:
            decoding_model_path: Path of the model to read and compile for
                CPU. When falsy, no model is loaded and ``run_decoding``
                raises ``RuntimeError``.
        """
        self.ie = ov.Core()
        self.pixmap = QtGui.QPixmap()
        self.decoding_net = None
        self.decoding_sess = None
        self._characters = load_barcode_dict()
        if decoding_model_path:
            self.decoding_net = self.ie.read_model(model=decoding_model_path)
            self.decoding_sess = self.ie.compile_model(
                model=self.decoding_net, device_name="CPU"
            )
        # (batch, channels, height, width) of the model input tensor.
        self.decoding_input_shape = (1, 3, 32, 256)
        # Kept for backward compatibility; preprocessing uses the statistics
        # in _DECODE_MEAN/_DECODE_STD via _decode_normalize below.
        self.normalize = Normalize()
        self._decode_normalize = Normalize(mean=self._DECODE_MEAN, std=self._DECODE_STD)
        self._lock = threading.Lock()
        self._image_embedding_cache = collections.OrderedDict()
        self._max_cache_size = 10

    def preprocess_image(self, image):
        """Convert an image into the recognition model's input tensor.

        The image is resized to the model's width/height, converted from BGR
        to RGB, normalized, and rearranged to a batched channel-first array.

        Args:
            image: Image array accepted by ``cv2.resize`` and
                ``cv2.cvtColor(..., COLOR_BGR2RGB)``.

        Returns:
            ``float32`` array of shape ``(1, C, H, W)``.
        """
        _, _, target_height, target_width = self.decoding_input_shape
        # cv2.resize takes the target size as (width, height).
        resized = cv2.resize(image, (target_width, target_height))
        rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
        normalized = self._decode_normalize(rgb)
        logger.debug(f"Preprocessing image for detection: {image.shape}")
        chw = normalized.transpose(2, 0, 1)  # HWC -> CHW
        input_tensor = np.expand_dims(chw, 0)  # add batch dimension
        logger.debug(f"Processed image shape: {input_tensor.shape}")
        return input_tensor

    def decode_from_points(self, points, detection_idx, original_image):
        """Decode the barcode enclosed by ``points`` in ``original_image``.

        The region is rectified and decoded; if the Code 128 checksum fails,
        the rectified image is rotated by 180 degrees and decoded once more.

        Args:
            points: Four corner points of the barcode region, in order around
                the quadrilateral.
            detection_idx: Zero-based index of the detection; used (plus one)
                in log messages and debug file names.
            original_image: Image array to crop the barcode from.

        Returns:
            The validated barcode text on success. ``"Decoding failed"`` when
            the model produced no text, ``"Error: Decoding failed"`` when an
            exception occurred, and ``None`` when the checksum failed both
            before and after rotation (kept as in the original
            implementation).
        """
        try:
            tag = detection_idx + 1

            if self.save_debug_images:
                cv2.imwrite(f"original_image{tag}.png", original_image)
                cv2.imwrite(
                    f"cropped_exact_{tag}.png",
                    self._masked_crop(points, original_image),
                )
                logger.debug(f"cropped_exact image saved at {tag}.")

            aligned_barcode = self._rectify_barcode(points, original_image)
            if self.save_debug_images:
                cv2.imwrite(f"decoding_barcode_{tag}.png", aligned_barcode)
                logger.debug(f"Aligned barcode saved at {tag}.")

            # Min-max stretch of pixel intensities to the range [0, 255].
            normalized_img = np.zeros_like(aligned_barcode)
            cv2.normalize(aligned_barcode, normalized_img, 0, 255, cv2.NORM_MINMAX)
            logger.debug("Image normalized.")
            if self.save_debug_images:
                cv2.imwrite(
                    f"cropped_image_decoding_normalized{tag}.png", normalized_img
                )
                logger.debug(f"Saved normalized image for decoding : {tag}")

            # First attempt: image as rectified.
            decoded_text, confidence = self.run_decoding(
                normalized_img, detection_idx, None
            )
            if not decoded_text:
                return "Decoding failed"
            checksum_valid, validated_result = self.validate_code128_checksum(
                decoded_text, detection_idx
            )
            if checksum_valid:
                logger.debug(
                    f"Validated result for detection {tag}: {validated_result}"
                )
                return validated_result

            # Second attempt: the barcode may have been rectified upside down.
            logger.error(
                f"Checksum validation failed for detection {tag}. Retrying with 180° rotation."
            )
            rotated_image = cv2.rotate(normalized_img, cv2.ROTATE_180)
            decoded_text, confidence = self.run_decoding(
                rotated_image, detection_idx, confidence
            )
            if not decoded_text:
                return "Decoding failed"
            checksum_valid, validated_result = self.validate_code128_checksum(
                decoded_text, detection_idx
            )
            if checksum_valid:
                logger.debug(
                    f"Validated result after rotation for detection {tag}: {validated_result}"
                )
                return validated_result
            logger.error(
                f"Checksum validation failed after rotation for detection {tag}. Error: {validated_result}"
            )
            # NOTE(review): None here differs from the string sentinels used
            # on the other failure paths; preserved for existing callers.
            return None
        except Exception as e:
            logger.error(f"Error in decode_from_points: {e}")
            return "Error: Decoding failed"

    def _masked_crop(self, points, image):
        """Return the bounding-box crop of ``image`` with pixels outside the polygon zeroed."""
        polygon = np.array(points, dtype=np.int32)
        mask = np.zeros(image.shape[:2], dtype=np.uint8)
        cv2.fillPoly(mask, [polygon], 255)
        masked = cv2.bitwise_and(image, image, mask=mask)
        x, y, w, h = cv2.boundingRect(polygon)
        return masked[y:y + h, x:x + w]

    def _rectify_barcode(self, points, image):
        """Warp the quadrilateral ``points`` of ``image`` to an upright rectangle.

        The output is always at least as wide as it is tall: when the first
        edge is the shorter one, the corners are rotated by one position so
        the longer edge maps onto the horizontal axis.
        """
        src_points = np.float32(points)
        width = int(np.linalg.norm(src_points[0] - src_points[1]))
        height = int(np.linalg.norm(src_points[1] - src_points[2]))
        if width < height:
            width, height = height, width
            src_points = src_points[[1, 2, 3, 0]]
        dst_points = np.float32(
            [
                [0, 0],
                [width - 1, 0],
                [width - 1, height - 1],
                [0, height - 1],
            ]
        )
        transform = cv2.getPerspectiveTransform(src_points, dst_points)
        return cv2.warpPerspective(
            image, transform, (width, height), flags=cv2.INTER_LINEAR
        )

    def run_decoding(self, image_np, detection_idx, confidence):
        """Run the recognition model on ``image_np`` and decode its output.

        Args:
            image_np: Image array to preprocess and feed to the model.
            detection_idx: Zero-based detection index, forwarded to
                ``decode_text``.
            confidence: Ignored; kept so existing call sites keep working.

        Returns:
            Tuple ``(decoded_text, confidence)`` as produced by
            ``decode_text``.

        Raises:
            RuntimeError: If no model was loaded at construction time.
        """
        if self.decoding_sess is None:
            raise RuntimeError(
                "Decoding model is not loaded; pass decoding_model_path to BarcodeDecodeModel."
            )
        input_tensor = self.preprocess_image(image_np)
        decode_result = self.decoding_sess.infer_new_request({"x": input_tensor})
        output_tensor = decode_result["save_infer_model/scale_0.tmp_0"]
        logger.debug(f"Output tensor shape: {output_tensor.shape}")
        # Best class and its score along the last (class) axis for each step.
        output_indices = np.argmax(output_tensor, axis=2)
        output_probs = np.max(output_tensor, axis=2)

        decoded_text, confidence = self.decode_text(
            output_indices, output_probs, detection_idx
        )
        logger.debug(f"Raw barcode: {decoded_text}, Confidence: {confidence:.2f}")
        return decoded_text, confidence

    def decode_text(self, text_indices, text_probs, detection_idx):
        """Convert model output indices into text using the character dictionary.

        Only the first batch row is decoded. Decoding stops at the first
        ``"</s>"`` entry; indices outside the dictionary are skipped with a
        warning.

        Args:
            text_indices: 2-D array of dictionary indices, ``(batch, steps)``.
            text_probs: 2-D array of scores matching ``text_indices``.
            detection_idx: Unused; kept for interface compatibility.

        Returns:
            Tuple ``(text, confidence)`` where ``confidence`` is the mean
            score of the kept characters (``0.0`` when there are none). On
            any exception ``("Error: Decoding failed", 0.0)`` is returned.
        """
        try:
            characters = self._characters
            max_index = len(characters) - 1
            logger.debug(f"Loaded barcode dictionary with {len(characters)} characters.")

            if text_indices.shape[0] == 0:
                return "", 0.0

            kept_chars = []
            kept_scores = []
            for step_idx in range(text_indices.shape[1]):
                char_idx = int(text_indices[0, step_idx])
                if not 0 <= char_idx <= max_index:
                    logger.warning(
                        f"Index {char_idx} is out of bounds for dictionary size {len(characters)}"
                    )
                    continue
                char = characters[char_idx]
                if char == "</s>":  # end-of-sequence token
                    break
                kept_chars.append(char)
                kept_scores.append(text_probs[0, step_idx])

            confidence = np.mean(kept_scores) if kept_scores else 0.0
            return "".join(kept_chars), confidence
        except Exception as e:
            logger.error(f"Error in decode_text: {e}")
            return "Error: Decoding failed", 0.0

    def validate_code128_checksum(self, decoded_text, detection_idx):
        """Validate a recognized Code 128 symbol sequence and convert it to text.

        Each character of ``decoded_text`` is mapped to its Code 128 value,
        defined as its position in the character dictionary (searching from
        index 1) minus one; characters not found map to ``-1``. The sequence
        must be ``start, data..., checksum, stop``.

        Args:
            decoded_text: Recognized symbols, one character per symbol.
            detection_idx: Unused; kept for interface compatibility.

        Returns:
            ``(True, text)`` when start code, checksum and stop code are
            valid and every data symbol could be converted, otherwise
            ``(False, error_message)``.
        """
        # Value lookup built once instead of a list scan per character; the
        # first occurrence wins, as with list.index.
        value_of = {}
        for value, symbol in enumerate(self._characters[1:]):
            value_of.setdefault(symbol, value)
        values = [value_of.get(char, -1) for char in decoded_text]
        logger.debug(f"code128Values:{values}")

        if not values:
            return False, "Empty barcode text, nothing to validate."

        start_codes = {103: CodeSet.A, 104: CodeSet.B, 105: CodeSet.C}
        current_set = start_codes.get(values[0])
        if current_set is None:
            err_msg = f"No start code detected, first code is {values[0]}"
            return False, err_msg

        if len(values) < 3:
            err_msg = "Sequence too short: expected start code, checksum and stop code."
            return False, err_msg

        data_values = values[1:-2]  # between start code and checksum
        checksum_expected = values[-2]
        # Start value plus each data value weighted by its 1-based position.
        checksum_calculated = values[0] + sum(
            value * position for position, value in enumerate(data_values, start=1)
        )
        checksum_calculated %= 103
        logger.debug(f"Final Calculated Checksum (mod 103): {checksum_calculated}")
        if checksum_calculated != checksum_expected:
            err_msg = f"Invalid checksum value, supposed to be {checksum_calculated} but got {checksum_expected}"
            return False, err_msg

        if values[-1] != self._STOP:
            err_msg = "No valid stop code detected at the end of the sequence."
            return False, err_msg

        pieces = []
        for position, value in enumerate(data_values, start=1):
            if value == self._FNC1:  # FNC1 for GS1-128
                logger.debug(f"Detected FNC1 at position {position}, treated as AI separator.")
                continue
            # 99 latches to Code Set C only from sets A/B; inside set C it is
            # the ordinary digit pair "99" and must be kept as data.
            if value == self._CODE_C and current_set != CodeSet.C:
                current_set = CodeSet.C
                logger.debug(f"Switched to Code Set C at position {position}")
                continue
            # NOTE(review): inside Code Set B the value 100 is FNC4 rather
            # than a latch; it is skipped here as before.
            if value == self._CODE_B:
                current_set = CodeSet.B
                logger.debug(f"Switched to Code Set B at position {position}")
                continue
            # 101 latches to Code Set A from sets B/C (it is FNC4 inside A).
            if value == self._CODE_A and current_set != CodeSet.A:
                current_set = CodeSet.A
                logger.debug(f"Switched to Code Set A at position {position}")
                continue

            if current_set == CodeSet.C:
                if not 0 <= value <= 99:
                    err_msg = f"Invalid Code Set C value: {value}"
                    logger.error(err_msg)
                    return False, err_msg
                pieces.append(f"{value:02}")  # each symbol is two digits
            else:
                set_name = "B" if current_set == CodeSet.B else "A"
                if not 0 <= value <= 95:
                    err_msg = f"Invalid Code Set {set_name} value: {value}"
                    logger.error(err_msg)
                    return False, err_msg
                # NOTE(review): sets A and B share one dictionary here; in
                # the Code 128 spec set A values 64-95 are ASCII control
                # characters - confirm the dictionary accounts for that.
                pieces.append(self._characters[value + 1])
        return True, "".join(pieces)
|