barcode_decode.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407
  1. import imgviz
  2. from qtpy import QtCore
  3. from qtpy import QtGui
  4. from qtpy import QtWidgets
  5. import labelme.ai
  6. import labelme.utils
  7. from labelme import QT5
  8. from labelme.logger import logger
  9. from labelme.shape import Shape
  10. import collections
  11. import threading
  12. import numpy as np
  13. import openvino as ov
  14. import os.path as osp
  15. import cv2
  16. from labelme.utils import img_qt_to_arr
  17. from labelme.utils import load_barcode_dict
  18. class CodeSet:
  19. NONE = 0
  20. A = 1
  21. B = 2
  22. C = 3
  23. class Normalize:
  24. def __init__(self, mean=(0.45, 0.45, 0.45), std=(0.24, 0.24, 0.24)):
  25. if not (isinstance(mean, (list, tuple)) and isinstance(std, (list, tuple))):
  26. raise ValueError("mean and std should be of type list or tuple.")
  27. self.mean = np.array(mean, dtype=np.float32)
  28. self.std = np.array(std, dtype=np.float32)
  29. # Reshape for broadcasting to apply mean and std across the spatial dimensions of an image
  30. self.mean = self.mean.reshape((1, 1, 3))
  31. self.std = self.std.reshape((1, 1, 3))
  32. def __call__(self, img):
  33. img = img.astype(np.float32) / 255.0 # Scale pixel values to [0, 1]
  34. img = (img - self.mean) / self.std # Normalize
  35. return img
  36. class BarcodeDecodeModel:
  37. def __init__(self, decoding_model_path=None):
  38. self.ie = ov.Core()
  39. self.pixmap = QtGui.QPixmap()
  40. #Load Decoding model if provided
  41. self.decoding_net = None
  42. self.decoding_sess = None
  43. self._characters = load_barcode_dict()
  44. if decoding_model_path:
  45. self.decoding_net = self.ie.read_model(model=decoding_model_path)
  46. self.decoding_sess = self.ie.compile_model(model=self.decoding_net, device_name="CPU")
  47. self.decoding_input_shape = (1, 3, 32, 256)
  48. self.normalize = Normalize() # Normalization instance
  49. self._lock = threading.Lock()
  50. self._image_embedding_cache = collections.OrderedDict()
  51. self._max_cache_size = 10
  52. self.pixmap = QtGui.QPixmap()
  53. # def set_pixmap(self, pixmap: QtGui.QPixmap):
  54. # """
  55. # Set the QPixmap object for decoding.
  56. # Args:
  57. # pixmap (QtGui.QPixmap): The QPixmap object containing the image.
  58. # """
  59. # if pixmap is None or pixmap.isNull():
  60. # raise ValueError("Invalid QPixmap provided.")
  61. # self.pixmap = pixmap
  62. # logger.debug("Pixmap set successfully in BarcodeDecodeModel.")
  63. def preprocess_image(self, image):
  64. norm = Normalize(mean=(0.45, 0.45, 0.45), std=(0.24, 0.24, 0.24))
  65. resized_image = cv2.resize(image, (self.decoding_input_shape[3], self.decoding_input_shape[2]))
  66. resized_image = cv2.cvtColor(resized_image, cv2.COLOR_BGR2RGB)
  67. resized_image = norm(resized_image)
  68. # Resize image for detection model input size
  69. logger.debug(f"Preprocessing image for detection: {image.shape}")
  70. # resized_image = resized_image.astype('float32') / 255.0
  71. input_tensor = resized_image.transpose(2, 0, 1) # Convert HWC to CHW
  72. input_tensor = np.expand_dims(input_tensor, 0) # Add batch dimension
  73. logger.debug(f"Processed image shape: {input_tensor.shape}")
  74. return input_tensor
  75. def decode_from_points(self, points, detection_idx, original_image):
  76. """
  77. Decodes the cropped image based on points and returns the decoded text.
  78. Args:
  79. points (list): List of points defining the bounding box.
  80. pixmap (QPixmap): Original image pixmap to crop from.
  81. Returns:
  82. str: Decoded text from the decoding model.
  83. """
  84. try:
  85. # Convert scaled_points to a numpy array
  86. polygon = np.array(points, dtype=np.int32)
  87. # Create a mask of the same size as the original image
  88. # original_image = labelme.utils.img_qt_to_arr(self.pixmap.toImage())
  89. # cv2.imwrite(f"original_image{detection_idx + 1}.png", original_image)
  90. mask = np.zeros(original_image.shape[:2], dtype=np.uint8)
  91. cv2.fillPoly(mask, [polygon], 255) # Fill the polygon with white
  92. # Apply the mask to the original image
  93. masked_image = cv2.bitwise_and(original_image, original_image, mask=mask)
  94. # Get the bounding rectangle of the polygon to crop the ROI
  95. x, y, w, h = cv2.boundingRect(polygon)
  96. cropped_image_dec = masked_image[y:y+h, x:x+w]
  97. # cv2.imwrite(f"cropped_exact_{detection_idx + 1}.png", cropped_image_dec)
  98. logger.debug(f"cropped_exact image saved at {detection_idx + 1}.")
  99. src_points = np.float32(points)
  100. # Calculate the width and height of the barcode based on scaled_points
  101. width = int(np.linalg.norm(src_points[0] - src_points[1]))
  102. # print(width)
  103. height = int(np.linalg.norm(src_points[1] - src_points[2]))
  104. # print(height)
  105. # Correct width/height if needed
  106. if width < height:
  107. width, height = height, width
  108. # Reorder src_points to ensure the transformation aligns the longer side to the width
  109. src_points = np.float32([
  110. src_points[1], # Top-left becomes top-right
  111. src_points[2], # Top-right becomes bottom-right
  112. src_points[3], # Bottom-right becomes bottom-left
  113. src_points[0] # Bottom-left becomes top-left
  114. ])
  115. # Define destination points for the flattened barcode
  116. dst_points = np.float32([
  117. [0, 0],
  118. [width - 1, 0],
  119. [width - 1, height - 1],
  120. [0, height - 1]
  121. ])
  122. # Calculate the perspective transformation matrix
  123. M = cv2.getPerspectiveTransform(src_points, dst_points)
  124. # Apply the perspective transformation
  125. aligned_barcode = cv2.warpPerspective(original_image, M, (width, height), flags=cv2.INTER_LINEAR)
  126. # Save the aligned barcode image
  127. # cv2.imwrite(f"decoding_barcode_{detection_idx + 1}.png", aligned_barcode)
  128. logger.debug(f"Aligned barcode saved at {detection_idx + 1}.")
  129. # Normalize the image to scale pixel intensities to the range [0, 255]
  130. normalized_img = np.zeros(aligned_barcode.shape, aligned_barcode.dtype)
  131. cv2.normalize(aligned_barcode, normalized_img, 0, 255, cv2.NORM_MINMAX)
  132. logger.debug("Image normalized.")
  133. # Save the cropped image
  134. cv2.imwrite(f"cropped_image_decoding_normalized{detection_idx + 1}.png",normalized_img)
  135. logger.debug(f"Saved normalized image for decoding : {detection_idx + 1}")
  136. # Run decoding model
  137. # confidence = None
  138. # Run decoding on the original image
  139. is_valid, decoded, decoded_text, msg, avg_conf, detection_idx = self.run_decoding(normalized_img, detection_idx)
  140. # print(f"Valid: {is_valid}, detection_idx:{detection_idx + 1}, decoded:{decoded}, decoded_text:{decoded_text}, msg:{msg}, avg_conf:{avg_conf}")
  141. if not is_valid:
  142. logger.warning(f"Decoding failed for detection {detection_idx + 1}: {msg}. Retrying with 180° rotation")
  143. # Rotate image 180 degrees and retry
  144. rotated_image = cv2.rotate(normalized_img, cv2.ROTATE_180)
  145. is_valid, decoded, decoded_text, msg, avg_conf, detection_idx = self.run_decoding(rotated_image, detection_idx)
  146. if is_valid:
  147. logger.debug(f"Valid: {is_valid}, detection_idx:{detection_idx + 1}, decoded:{decoded}, decoded_text:{decoded_text}, msg:{msg}, avg_conf:{avg_conf}")
  148. return decoded
  149. else:
  150. logger.warning(f"Decoding still failed after rotation for detection {detection_idx + 1}: {msg}")
  151. return ""
  152. return decoded
  153. except Exception as e:
  154. logger.error(f"Error in decode_from_points: {e}")
  155. return "Error: Decoding failed"
  156. def run_decoding(self, image_np, detection_idx):
  157. """Helper to run decoding on the given image."""
  158. preprocessed_img = self.preprocess_image(
  159. image_np
  160. )
  161. decode_result = self.decoding_sess.infer_new_request({'x': preprocessed_img})
  162. output_tensor = decode_result['save_infer_model/scale_0.tmp_0']
  163. logger.debug(f"Output tensor shape: {output_tensor.shape}")
  164. output_indices_batch = np.argmax(output_tensor, axis=2)
  165. output_probs_batch = np.max(output_tensor, axis=2)
  166. # print(f"output_indices:{output_indices}")
  167. # Decode text from indices
  168. def preprocess_output_indices(output_indices_batch, output_probs_batch):
  169. # Ensure it's a proper 2D numpy array
  170. if output_indices_batch is None or len(output_indices_batch) == 0:
  171. return False, "Empty output indices batch", None
  172. first_row = output_indices_batch[0]
  173. first_row_probs = output_probs_batch[0]
  174. if first_row is None or len(first_row) == 0:
  175. return False, "Empty output indices", None
  176. sequence = first_row.tolist()
  177. probs = first_row_probs.tolist()
  178. # Step 1: Trim at first 0
  179. if 0 in sequence:
  180. zero_index = sequence.index(0)
  181. cropped_indices = sequence[:zero_index]
  182. cropped_probs = probs[:zero_index]
  183. else:
  184. cropped_indices = sequence
  185. cropped_probs = probs
  186. index_prob_pairs = list(zip(cropped_indices, cropped_probs))
  187. cropped = [x - 1 for x in cropped_indices]
  188. # cropped = cropped_indices
  189. # print(f"cropped: {cropped}")
  190. # Step 2: Check for invalid trailing 107/108 after 0
  191. if not any(val in (106, 107) for val in cropped):
  192. return False, "Invalid: missin stop code (106 or 107) before first 0"
  193. # Step 3: Truncate at second 108 if two 108s appear (EAN start/stop)
  194. if cropped.count(108) >= 2:
  195. print("got here")
  196. first_108 = cropped.index(108)
  197. # print(f"first_108:{first_108}")
  198. second_108 = cropped.index(108, first_108 + 1)
  199. # print(f"second_108:{second_108}")
  200. cropped = cropped[:second_108 + 1]
  201. # print(f"cropped: {cropped}")
  202. index_prob_pairs = index_prob_pairs[:second_108 + 1]
  203. # print(f": {index_prob_pairs}")
  204. # Step 4: Check start code validity
  205. start_code = cropped[0] if cropped else None
  206. if start_code not in [103, 104, 105, 107]:
  207. return False, f"Invalid start code: {start_code}"
  208. return True, (cropped, index_prob_pairs)
  209. decoded_text = ""
  210. status, result_or_error = preprocess_output_indices(output_indices_batch, output_probs_batch)
  211. # print(f"Raw barcode: {result_or_error}, status: {status}")
  212. if status == False:
  213. print(f"msg: {result_or_error}")
  214. decoded = "Decoding failed"
  215. decoded_text = ""
  216. msg = result_or_error
  217. avg_conf = 0.0
  218. return status, decoded, decoded_text, msg, avg_conf, detection_idx
  219. else:
  220. mapped_indices, conf_pairs = result_or_error
  221. avg_conf = round(np.mean([conf for (_, conf) in conf_pairs]), 2)
  222. # print(f"✅ Average confidence: {avg_conf:.3f}")
  223. is_valid, barcode_value, msg, predicted_ean_digits_print = self.validate_checksum(mapped_indices)
  224. # print(f"barcode_value: {barcode_value}, msg: {msg}, predicted_ean_digits_print: {predicted_ean_digits_print}, is_valid: {is_valid}")
  225. if not is_valid:
  226. decoded = "Decoding failed"
  227. decoded_text = ""
  228. msg = msg
  229. avg_conf = 0.0
  230. is_valid = is_valid
  231. # logger.warning(f"{is_valid}, readable: {decoded}, raw: {decoded_text}, conf: {avg_conf}, format: {msg}")
  232. return is_valid, decoded, decoded_text, msg, avg_conf, detection_idx
  233. else:
  234. if msg == "code128":
  235. decoded_text = ''.join([self._characters[idx] for idx in barcode_value])
  236. decoded = self.decode_code128(barcode_value)
  237. logger.debug(f"✅ {is_valid}, readable: {decoded}, raw: {decoded_text}, conf: {avg_conf}, format: {msg}")
  238. else:
  239. # print(predicted_ean_digits_print)
  240. decoded_text = ''.join([self._characters[idx] for idx in predicted_ean_digits_print])
  241. decoded = ''.join(str(d) for d in barcode_value)
  242. logger.debug(f"✅ {is_valid}, readable: {decoded}, raw: {decoded_text}, conf: {avg_conf}, format: {msg}, detection_idx: {detection_idx}")
  243. return is_valid, decoded, decoded_text, msg, avg_conf, detection_idx
  244. def decode_code128(self, values):
  245. # print(f"values:{values}")
  246. values = values[:] # avoid modifying original
  247. start_code = values.pop(0)
  248. checksum = values.pop(-1)
  249. stop_code = values.pop(-1)
  250. if start_code == 103:
  251. current_set = 'A'
  252. elif start_code == 104:
  253. current_set = 'B'
  254. elif start_code == 105:
  255. current_set = 'C'
  256. else:
  257. return "Invalid start code"
  258. decoded_chars = []
  259. i = 0
  260. while i < len(values):
  261. val = values[i]
  262. # Handle switch codes
  263. if val == 99:
  264. current_set = 'C'
  265. i += 1
  266. continue
  267. elif val == 100:
  268. current_set = 'B'
  269. i += 1
  270. continue
  271. elif val == 101:
  272. current_set = 'A'
  273. i += 1
  274. continue
  275. # Decode values
  276. if current_set == 'C':
  277. decoded_chars.append(f"{val:02d}")
  278. else:
  279. if 0 <= val < len(self._characters):
  280. decoded_chars.append(self._characters[val])
  281. else:
  282. decoded_chars.append('?')
  283. i += 1
  284. return ''.join(decoded_chars)
  285. def validate_checksum(self, raw_values):
  286. # raw_values = [characters.index(char, 1) - 1 if char in characters[1:] else -1 for char in decoded_text]
  287. logger.debug(f"raw_values passed to validate_checksum: {raw_values}")
  288. if raw_values[0] == 107:
  289. logger.debug("Ean Barcode detected")
  290. code_map_predicted = {
  291. 'C1': '107', 'C2': '108', '0A': '109', '1A': '110', '2A': '111', '3A': '112', '4A': '113',
  292. '5A': '114', '6A': '115', '7A': '116', '8A': '117', '9A': '118', '0B': '119', '1B': '120',
  293. '2B': '121', '3B': '122', '4B': '123', '5B': '124', '6B': '125', '7B': '126', '8B': '127',
  294. '9B': '128', '0C': '129', '1C': '130', '2C': '131', '3C': '132', '4C': '133', '5C': '134',
  295. '6C': '135', '7C': '136', '8C': '137', '9C': '138'
  296. }
  297. LEFT_PATTERN = (
  298. "AAAAAA", "AABABB", "AABBAB", "AABBBA", "ABAABB",
  299. "ABBAAB", "ABBBAA", "ABABAB", "ABABBA", "ABBABA"
  300. )
  301. reverse_code_map = {v: k for k, v in code_map_predicted.items()}
  302. predicted_digits = [val for val in raw_values[1:-1] if val != 108]
  303. # print(f"predicted_digits: {predicted_digits}")
  304. mapped_keys = [reverse_code_map.get(str(val), f"UNK({val})") for val in predicted_digits]
  305. # print(f"Mapped keys: {mapped_keys}")
  306. detected_pattern = ''.join(k[-1] if len(k) == 2 else '?' for k in mapped_keys[:6])
  307. # print(f"Detected_pattern: {detected_pattern}")
  308. if detected_pattern in LEFT_PATTERN:
  309. pattern_index = LEFT_PATTERN.index(detected_pattern)
  310. # print(f"pattern_index: {pattern_index}")
  311. else:
  312. return False, None, "wrong pattern", None
  313. predicted_ean_value = ''.join(k[0] if len(k) == 2 and k[0].isdigit() else '?' for k in mapped_keys)
  314. # print(f"predicted_ean_value:{predicted_ean_value}")
  315. ean13 = str(pattern_index) + predicted_ean_value[:12]
  316. # print(f"ean13 base:{ean13}")
  317. if len(ean13) != 13 or not ean13.isdigit():
  318. logger.warning(f"Invalid Ean value needs to be 13 digits")
  319. return False, None, "Invalid EAN-13 base", None
  320. else:
  321. def calculate_ean13_checksum(ean13):
  322. digits = [int(d) for d in ean13]
  323. even_sum = sum(digits[i] for i in range(0, 12, 2))
  324. odd_sum = sum(digits[i] for i in range(1, 12, 2))
  325. total = even_sum + odd_sum * 3
  326. # print(f"total:{total}")
  327. return (10 - (total % 10)) % 10
  328. calculated_ean_checksum = calculate_ean13_checksum(ean13)
  329. # print(f"calculated_ean_checksum:{calculated_ean_checksum}")
  330. predicted_ean_checksum = predicted_ean_value[-1]
  331. # print(f"predicted_ean_checksum:{predicted_ean_checksum}")
  332. if str(predicted_ean_checksum) != str(calculated_ean_checksum):
  333. logger.warning(f"Invalid ean 13 checksum value, supposed to be {calculated_ean_checksum}")
  334. return False, None, f"Invalid ean 13 checksum: expected {calculated_ean_checksum}", None
  335. else:
  336. ean_list = [int(char) for char in ean13]
  337. predicted_ean_digits_print = raw_values
  338. return True, ean_list, "ean13", predicted_ean_digits_print
  339. else:
  340. logger.debug("Code128 Barcode detected")
  341. # dict_converted_to_code128_dict = [x - 1 for x in raw_values]
  342. # print("dict_converted_to_code128_dict", dict_converted_to_code128_dict)
  343. code128 = raw_values
  344. # print("code128code128", code128)
  345. start_code = code128[0]
  346. predicted_code128_checksum = code128[-2]
  347. # print("predicted_code128_checksum", predicted_code128_checksum)
  348. code128_base = code128[1:-2]
  349. # print(code128_base)
  350. weighted_sum = sum(val * (i +1) for i, val in enumerate(code128_base))
  351. calculated_128_checksum =(start_code + weighted_sum) % 103
  352. logger.debug(f"predicted_code128_checksum: {predicted_code128_checksum}, calculated_128_checksum: {calculated_128_checksum}")
  353. if predicted_code128_checksum != calculated_128_checksum:
  354. logger.warning(f"Invalid checksum value, supposed to be {calculated_128_checksum} but model predicted {predicted_code128_checksum}")
  355. return False, None, "Invalid checksum value", None
  356. return True, code128, "code128", None