123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407 |
- import imgviz
- from qtpy import QtCore
- from qtpy import QtGui
- from qtpy import QtWidgets
- import labelme.ai
- import labelme.utils
- from labelme import QT5
- from labelme.logger import logger
- from labelme.shape import Shape
- import collections
- import threading
- import numpy as np
- import openvino as ov
- import os.path as osp
- import cv2
- from labelme.utils import img_qt_to_arr
- from labelme.utils import load_barcode_dict
class CodeSet:
    """Integer tags for code sets A, B and C; ``NONE`` (0) means no set.

    NOTE(review): presumably these name the Code 128 code sets - confirm
    against the callers, none of which are visible in this file.
    """

    NONE, A, B, C = range(4)
class Normalize:
    """Rescale an 8-bit image to [0, 1] and standardise each of its 3 channels."""

    def __init__(self, mean=(0.45, 0.45, 0.45), std=(0.24, 0.24, 0.24)):
        """Store the per-channel statistics as float32 arrays.

        Args:
            mean: list or tuple of three per-channel means.
            std: list or tuple of three per-channel standard deviations.

        Raises:
            ValueError: if ``mean`` or ``std`` is not a list or tuple.
        """
        for stats in (mean, std):
            if not isinstance(stats, (list, tuple)):
                raise ValueError("mean and std should be of type list or tuple.")
        # Shape (1, 1, 3) broadcasts over the spatial axes of an H x W x 3 image.
        self.mean = np.asarray(mean, dtype=np.float32).reshape(1, 1, 3)
        self.std = np.asarray(std, dtype=np.float32).reshape(1, 1, 3)

    def __call__(self, img):
        """Return ``(img / 255 - mean) / std`` computed in float32."""
        unit_range = img.astype(np.float32) / 255.0
        return (unit_range - self.mean) / self.std
class BarcodeDecodeModel:
    """Decode Code 128 and EAN-13 barcodes from quadrilateral image regions.

    A region is rectified with a perspective warp, fed to an OpenVINO
    recognition network, and the predicted symbol sequence is checksum
    validated before it is converted to text.
    """

    # All symbol values below are model class indices shifted by -1
    # (see ``_trim_prediction``).
    _CODE128_SETS = {103: 'A', 104: 'B', 105: 'C'}
    _VALID_START_CODES = (103, 104, 105, 107)
    _EAN_START = 107
    _EAN_GUARD = 108
    # EAN-13 left-half parity patterns; the index of the matching pattern is
    # the implicit first digit of the number.
    _EAN_LEFT_PARITY = (
        "AAAAAA", "AABABB", "AABBAB", "AABBBA", "ABAABB",
        "ABBAAB", "ABBBAA", "ABABAB", "ABABBA", "ABBABA",
    )
    # Symbol value -> two-character key: "<digit><set>" for 109..138
    # (sets A, B, C in blocks of ten) plus the guard markers C1 / C2.
    _EAN_SYMBOLS = {
        107: 'C1',
        108: 'C2',
        **{109 + i: f"{i % 10}{'ABC'[i // 10]}" for i in range(30)},
    }

    def __init__(self, decoding_model_path=None):
        """Create the OpenVINO core and, if a path is given, compile the model for CPU.

        Args:
            decoding_model_path: path of the recognition model; when falsy no
                model is loaded and ``run_decoding`` raises ``RuntimeError``.
        """
        self.ie = ov.Core()
        self.pixmap = QtGui.QPixmap()

        self._characters = load_barcode_dict()
        self.decoding_input_shape = (1, 3, 32, 256)
        self.normalize = Normalize()

        self.decoding_net = None
        self.decoding_sess = None
        if decoding_model_path:
            self.decoding_net = self.ie.read_model(model=decoding_model_path)
            self.decoding_sess = self.ie.compile_model(model=self.decoding_net, device_name="CPU")

        self._lock = threading.Lock()
        self._image_embedding_cache = collections.OrderedDict()
        self._max_cache_size = 10

    def preprocess_image(self, image):
        """Turn an image into the network input tensor.

        The image is resized to the width/height stored in
        ``decoding_input_shape``, converted BGR -> RGB, normalised with
        ``self.normalize`` and laid out as a 1 x C x H x W batch.
        """
        _, _, target_h, target_w = self.decoding_input_shape
        resized = cv2.resize(image, (target_w, target_h))
        rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
        normalized = self.normalize(rgb)
        logger.debug(f"Preprocessing image for detection: {image.shape}")
        # HWC -> CHW, then add the batch axis.
        input_tensor = np.expand_dims(normalized.transpose(2, 0, 1), 0)
        logger.debug(f"Processed image shape: {input_tensor.shape}")
        return input_tensor

    def decode_from_points(self, points, detection_idx, original_image):
        """Rectify the quadrilateral ``points`` and decode the barcode inside it.

        Args:
            points: four corner points of the region; consecutive points are
                treated as adjacent corners.
            detection_idx: zero-based index of the detection, used for logging
                and for the name of the dumped image.
            original_image: image array the points refer to.

        Returns:
            The decoded text; ``""`` if validation fails both for the upright
            and the 180-degree rotated image; ``"Error: Decoding failed"`` if
            any exception is raised along the way.
        """
        try:
            src_points = np.float32(points)
            width = int(np.linalg.norm(src_points[0] - src_points[1]))
            height = int(np.linalg.norm(src_points[1] - src_points[2]))
            if width < height:
                # Rotate the corner order so the longer side maps to the width.
                width, height = height, width
                src_points = np.float32([
                    src_points[1],
                    src_points[2],
                    src_points[3],
                    src_points[0],
                ])
            dst_points = np.float32([
                [0, 0],
                [width - 1, 0],
                [width - 1, height - 1],
                [0, height - 1],
            ])
            transform = cv2.getPerspectiveTransform(src_points, dst_points)
            aligned_barcode = cv2.warpPerspective(
                original_image, transform, (width, height), flags=cv2.INTER_LINEAR
            )
            logger.debug(f"Aligned barcode {detection_idx + 1}: {width}x{height}")

            # Stretch pixel intensities to the full [0, 255] range.
            normalized_img = np.zeros(aligned_barcode.shape, aligned_barcode.dtype)
            cv2.normalize(aligned_barcode, normalized_img, 0, 255, cv2.NORM_MINMAX)
            logger.debug("Image normalized.")

            # NOTE(review): this writes a PNG into the current working directory
            # on every call and looks like leftover debugging - kept because
            # callers are not visible here; confirm and remove or gate it.
            cv2.imwrite(f"cropped_image_decoding_normalized{detection_idx + 1}.png", normalized_img)
            logger.debug(f"Saved normalized image for decoding : {detection_idx + 1}")

            is_valid, decoded, decoded_text, msg, avg_conf, detection_idx = self.run_decoding(
                normalized_img, detection_idx
            )
            if is_valid:
                return decoded

            logger.warning(f"Decoding failed for detection {detection_idx + 1}: {msg}. Retrying with 180° rotation")
            rotated_image = cv2.rotate(normalized_img, cv2.ROTATE_180)
            is_valid, decoded, decoded_text, msg, avg_conf, detection_idx = self.run_decoding(
                rotated_image, detection_idx
            )
            if is_valid:
                logger.debug(f"Valid: {is_valid}, detection_idx:{detection_idx + 1}, decoded:{decoded}, decoded_text:{decoded_text}, msg:{msg}, avg_conf:{avg_conf}")
                return decoded

            logger.warning(f"Decoding still failed after rotation for detection {detection_idx + 1}: {msg}")
            return ""
        except Exception as e:
            logger.error(f"Error in decode_from_points: {e}")
            return "Error: Decoding failed"

    def run_decoding(self, image_np, detection_idx):
        """Run the recognition network on ``image_np`` and validate its output.

        Returns:
            ``(is_valid, decoded, decoded_text, msg, avg_conf, detection_idx)``.
            On failure ``decoded`` is ``"Decoding failed"``, ``decoded_text`` is
            ``""``, ``avg_conf`` is ``0.0`` and ``msg`` holds the reason; on
            success ``msg`` is ``"code128"`` or ``"ean13"``.

        Raises:
            RuntimeError: if no decoding model was loaded.
        """
        if self.decoding_sess is None:
            raise RuntimeError("No decoding model loaded; pass decoding_model_path to BarcodeDecodeModel")

        preprocessed_img = self.preprocess_image(image_np)
        decode_result = self.decoding_sess.infer_new_request({'x': preprocessed_img})
        output_tensor = decode_result['save_infer_model/scale_0.tmp_0']
        logger.debug(f"Output tensor shape: {output_tensor.shape}")

        # Best class and its score for every sequence position.
        output_indices_batch = np.argmax(output_tensor, axis=2)
        output_probs_batch = np.max(output_tensor, axis=2)

        status, result_or_error = self._trim_prediction(output_indices_batch, output_probs_batch)
        if not status:
            logger.debug(f"msg: {result_or_error}")
            return status, "Decoding failed", "", result_or_error, 0.0, detection_idx

        mapped_indices, conf_pairs = result_or_error
        avg_conf = round(np.mean([conf for (_, conf) in conf_pairs]), 2)
        is_valid, barcode_value, msg, ean_symbols = self.validate_checksum(mapped_indices)
        if not is_valid:
            return is_valid, "Decoding failed", "", msg, 0.0, detection_idx

        if msg == "code128":
            decoded_text = ''.join(self._characters[idx] for idx in barcode_value)
            decoded = self.decode_code128(barcode_value)
            logger.debug(f"✅ {is_valid}, readable: {decoded}, raw: {decoded_text}, conf: {avg_conf}, format: {msg}")
        else:
            decoded_text = ''.join(self._characters[idx] for idx in ean_symbols)
            decoded = ''.join(str(d) for d in barcode_value)
            logger.debug(f"✅ {is_valid}, readable: {decoded}, raw: {decoded_text}, conf: {avg_conf}, format: {msg}, detection_idx: {detection_idx}")
        return is_valid, decoded, decoded_text, msg, avg_conf, detection_idx

    @staticmethod
    def _trim_prediction(output_indices_batch, output_probs_batch):
        """Cut the first predicted row down to one barcode symbol sequence.

        Returns:
            ``(True, (symbols, index_prob_pairs))`` on success, where
            ``symbols`` are the class indices minus one and the pairs keep the
            unshifted index with its score; ``(False, reason)`` otherwise.
            Both outcomes are 2-tuples so callers can always unpack two values.
        """
        if output_indices_batch is None or len(output_indices_batch) == 0:
            return False, "Empty output indices batch"
        first_row = output_indices_batch[0]
        if first_row is None or len(first_row) == 0:
            return False, "Empty output indices"

        sequence = np.asarray(first_row).tolist()
        probs = np.asarray(output_probs_batch[0]).tolist()

        # Class 0 ends the sequence: keep only what precedes the first 0.
        if 0 in sequence:
            end = sequence.index(0)
            sequence = sequence[:end]
            probs = probs[:end]
        index_prob_pairs = list(zip(sequence, probs))
        symbols = [idx - 1 for idx in sequence]

        # A Code 128 stop (106) or an EAN start (107) must be present.
        if not any(val in (106, 107) for val in symbols):
            return False, "Invalid: missin stop code (106 or 107) before first 0"

        # With two or more 108 markers, drop everything after the second one.
        if symbols.count(108) >= 2:
            second_108 = symbols.index(108, symbols.index(108) + 1)
            symbols = symbols[:second_108 + 1]
            index_prob_pairs = index_prob_pairs[:second_108 + 1]

        start_code = symbols[0] if symbols else None
        if start_code not in BarcodeDecodeModel._VALID_START_CODES:
            return False, f"Invalid start code: {start_code}"
        return True, (symbols, index_prob_pairs)

    def decode_code128(self, values):
        """Convert validated Code 128 symbol values to text.

        Args:
            values: ``[start, data..., checksum, stop]``; not modified.

        Returns:
            The decoded string, or ``"Invalid start code"`` if the first value
            is not 103, 104 or 105.

        Raises:
            IndexError: if ``values`` has fewer than three entries.
        """
        if len(values) < 3:
            raise IndexError("Code 128 sequence needs start, checksum and stop symbols")

        current_set = self._CODE128_SETS.get(values[0])
        if current_set is None:
            return "Invalid start code"

        decoded_chars = []
        # Everything between the start symbol and the trailing checksum + stop.
        for val in values[1:-2]:
            if val == 99 and current_set != 'C':
                # In sets A and B 99 switches to set C; inside set C it is the
                # digit pair "99" and falls through to the set C branch below.
                current_set = 'C'
            elif val == 100:
                current_set = 'B'
            elif val == 101:
                current_set = 'A'
            elif current_set == 'C':
                decoded_chars.append(f"{val:02d}")
            elif 0 <= val < len(self._characters):
                decoded_chars.append(self._characters[val])
            else:
                decoded_chars.append('?')
        return ''.join(decoded_chars)

    @staticmethod
    def _code128_checksum(values):
        """Return the Code 128 check value for ``[start, data..., checksum, stop]``."""
        weighted_sum = sum(val * (pos + 1) for pos, val in enumerate(values[1:-2]))
        return (values[0] + weighted_sum) % 103

    @staticmethod
    def _ean13_check_digit(ean13):
        """Return the EAN-13 check digit computed from the first 12 digits of ``ean13``."""
        digits = [int(d) for d in ean13[:12]]
        total = sum(digits[0::2]) + 3 * sum(digits[1::2])
        return (10 - total % 10) % 10

    @classmethod
    def _parse_ean13(cls, raw_values):
        """Translate EAN symbol values into the 13 digits of the number.

        Returns:
            ``(digits, "ean13")`` on success, ``(None, reason)`` otherwise.
        """
        # Drop the leading start marker and the trailing guard, skip inner guards.
        keys = [
            cls._EAN_SYMBOLS.get(val, "")
            for val in raw_values[1:-1]
            if val != cls._EAN_GUARD
        ]
        parity = ''.join(k[-1] if len(k) == 2 else '?' for k in keys[:6])
        if parity not in cls._EAN_LEFT_PARITY:
            return None, "wrong pattern"

        first_digit = cls._EAN_LEFT_PARITY.index(parity)
        digit_text = ''.join(k[0] if len(k) == 2 and k[0].isdigit() else '?' for k in keys)
        ean13 = str(first_digit) + digit_text[:12]
        if len(ean13) != 13 or not ean13.isdigit():
            return None, "Invalid EAN-13 base"

        expected = cls._ean13_check_digit(ean13)
        # Compare against the 13th digit of the number that is returned, so an
        # accepted result is always self-consistent even with surplus symbols.
        if ean13[-1] != str(expected):
            return None, f"Invalid ean 13 checksum: expected {expected}"
        return [int(char) for char in ean13], "ean13"

    def validate_checksum(self, raw_values):
        """Validate a symbol sequence as EAN-13 (start 107) or Code 128.

        Returns:
            ``(is_valid, value, msg, ean_symbols)``. For EAN-13 ``value`` is the
            list of 13 digits, ``msg`` is ``"ean13"`` and ``ean_symbols`` is
            ``raw_values``. For Code 128 ``value`` is ``raw_values``, ``msg`` is
            ``"code128"`` and ``ean_symbols`` is ``None``. On failure ``value``
            and ``ean_symbols`` are ``None`` and ``msg`` holds the reason.
        """
        logger.debug(f"raw_values passed to validate_checksum: {raw_values}")
        if len(raw_values) == 0:
            return False, None, "Empty barcode values", None

        if raw_values[0] == self._EAN_START:
            logger.debug("Ean Barcode detected")
            digits, msg = self._parse_ean13(raw_values)
            if digits is None:
                logger.warning(f"EAN-13 validation failed: {msg}")
                return False, None, msg, None
            return True, digits, "ean13", raw_values

        logger.debug("Code128 Barcode detected")
        if len(raw_values) < 3:
            # Start, checksum and stop are the minimum; decode_code128 needs them too.
            logger.warning("Code 128 sequence too short")
            return False, None, "Code 128 sequence too short", None

        predicted_code128_checksum = raw_values[-2]
        calculated_128_checksum = self._code128_checksum(raw_values)
        logger.debug(f"predicted_code128_checksum: {predicted_code128_checksum}, calculated_128_checksum: {calculated_128_checksum}")
        if predicted_code128_checksum != calculated_128_checksum:
            logger.warning(f"Invalid checksum value, supposed to be {calculated_128_checksum} but model predicted {predicted_code128_checksum}")
            return False, None, "Invalid checksum value", None
        return True, raw_values, "code128", None
|