# barcode_decode.py
  1. import imgviz
  2. from qtpy import QtCore
  3. from qtpy import QtGui
  4. from qtpy import QtWidgets
  5. import labelme.ai
  6. import labelme.utils
  7. from labelme import QT5
  8. from labelme.logger import logger
  9. from labelme.shape import Shape
  10. import collections
  11. import threading
  12. import numpy as np
  13. import openvino as ov
  14. import os.path as osp
  15. import cv2
  16. import os
  17. import time
  18. from labelme.utils import img_qt_to_arr
  19. from labelme.utils import load_barcode_dict
  20. class CodeSet:
  21. NONE = 0
  22. A = 1
  23. B = 2
  24. C = 3
# class Normalize:
#     def __init__(self, mean=(0.45, 0.45, 0.45), std=(0.25, 0.25, 0.25)):
#         if not (isinstance(mean, (list, tuple)) and isinstance(std, (list, tuple))):
#             raise ValueError("mean and std should be of type list or tuple.")
#         self.mean = np.array(mean, dtype=np.float32)
#         self.std = np.array(std, dtype=np.float32)
#         # Reshape for broadcasting to apply mean and std across the spatial dimensions of an image
#         self.mean = self.mean.reshape((1, 1, 3))
#         self.std = self.std.reshape((1, 1, 3))
#
#     def __call__(self, img):
#         img = img.astype(np.float32) / 255.0  # Scale pixel values to [0, 1]
#         img = (img - self.mean) / self.std  # Normalize
#         return img
  38. class BarcodeDecodeModel:
  39. def __init__(self, decoding_model_path=None):
  40. self.ie = ov.Core()
  41. self.pixmap = QtGui.QPixmap()
  42. #Load Decoding model if provided
  43. self.decoding_net = None
  44. self.decoding_sess = None
  45. self._characters = load_barcode_dict()
  46. if decoding_model_path:
  47. self.decoding_net = self.ie.read_model(model=decoding_model_path)
  48. self.decoding_sess = self.ie.compile_model(model=self.decoding_net, device_name="CPU")
  49. self.decoding_input_shape = (1, 3, 32, 512)
  50. # self.normalize = Normalize() # Normalization instance
  51. self._lock = threading.Lock()
  52. self._image_embedding_cache = collections.OrderedDict()
  53. self._max_cache_size = 10
  54. self.pixmap = QtGui.QPixmap()
  55. # def set_pixmap(self, pixmap: QtGui.QPixmap):
  56. # """
  57. # Set the QPixmap object for decoding.
  58. # Args:
  59. # pixmap (QtGui.QPixmap): The QPixmap object containing the image.
  60. # """
  61. # if pixmap is None or pixmap.isNull():
  62. # raise ValueError("Invalid QPixmap provided.")
  63. # self.pixmap = pixmap
  64. # logger.debug("Pixmap set successfully in BarcodeDecodeModel.")
  65. def preprocess_image(self, img):
  66. logger.debug(f"Preprocessing image for decoding: {img.shape}")
  67. img = cv2.resize(
  68. img,
  69. (self.decoding_input_shape[3], self.decoding_input_shape[2]),
  70. interpolation=cv2.INTER_NEAREST
  71. )
  72. img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
  73. img = img.astype("float32") / 255.0
  74. mean = np.array([0.45, 0.45, 0.45], dtype=np.float32)
  75. std = np.array([0.25, 0.25, 0.25], dtype=np.float32)
  76. img = (img - mean) / std
  77. img = img.transpose(2, 0, 1)
  78. img = img.astype("float32")
  79. input_tensor = np.expand_dims(img, 0)
  80. logger.debug(f"Processed image shape: {input_tensor.shape}")
  81. return input_tensor
  82. def decode_from_points(self, points, detection_idx, original_image):
  83. """
  84. Decodes the cropped image based on points and returns the decoded text.
  85. Args:
  86. points (list): List of points defining the bounding box.
  87. pixmap (QPixmap): Original image pixmap to crop from.
  88. Returns:
  89. str: Decoded text from the decoding model.
  90. """
  91. try:
  92. # Convert scaled_points to a numpy array
  93. polygon = np.array(points, dtype=np.int32)
  94. # Create a mask of the same size as the original image
  95. # original_image = labelme.utils.img_qt_to_arr(self.pixmap.toImage())
  96. # cv2.imwrite(f"original_image{detection_idx + 1}.png", original_image)
  97. # mask = np.zeros(original_image.shape[:2], dtype=np.uint8)
  98. # cv2.fillPoly(mask, [polygon], 255) # Fill the polygon with white
  99. # Apply the mask to the original image
  100. # masked_image = cv2.bitwise_and(original_image, original_image, mask=mask)
  101. # cv2.imwrite(f"masked_image{detection_idx + 1}.png", masked_image)
  102. # Get the bounding rectangle of the polygon to crop the ROI
  103. x, y, w, h = cv2.boundingRect(polygon)
  104. # cropped_image_dec = masked_image[y:y+h, x:x+w]
  105. # cv2.imwrite(f"cropped_exact_{detection_idx + 1}.png", cropped_image_dec)
  106. # logger.debug(f"cropped_exact image saved at {detection_idx + 1}.")
  107. src_points = np.float32(points)
  108. # Calculate the width and height of the barcode based on scaled_points
  109. width = int(np.ceil(np.linalg.norm(src_points[0] - src_points[1])))
  110. print(f" width:{ width}")
  111. height = int(np.ceil(np.linalg.norm(src_points[1] - src_points[2])))
  112. print(f" height:{ height}")
  113. # Correct width/height if needed
  114. if width < height:
  115. width, height = height, width
  116. # Reorder src_points to ensure the transformation aligns the longer side to the width
  117. src_points = np.float32([
  118. src_points[1], # Top-left becomes top-right
  119. src_points[2], # Top-right becomes bottom-right
  120. src_points[3], # Bottom-right becomes bottom-left
  121. src_points[0] # Bottom-left becomes top-left
  122. ])
  123. print(f"src_points:{src_points}")
  124. # Define destination points for the flattened barcode
  125. dst_points = np.float32([
  126. [0, 0],
  127. [width, 0],
  128. [width, height],
  129. [0, height]
  130. ])
  131. # Calculate the perspective transformation matrix
  132. M = cv2.getPerspectiveTransform(src_points, dst_points)
  133. print(f" M:{ M}")
  134. # t_start = time.perf_counter()
  135. # Apply the perspective transformation
  136. aligned_barcode = cv2.warpPerspective(original_image, M, (width, height), flags=cv2.INTER_LANCZOS4)
  137. # t_end = time.perf_counter()
  138. # print(
  139. # f"[TIMING] Total crop+align time: {(t_end - t_start) * 1000:.3f} ms | "
  140. # f"aligned_size=({height},{width})"
  141. # )
  142. # Save the aligned barcode image
  143. # cv2.imwrite(f"old_aligned_barcode_decoding_barcode_{detection_idx + 1}.png", aligned_barcode)
  144. # logger.debug(f"Aligned barcode saved at {detection_idx + 1}.")
  145. # Normalize the image to scale pixel intensities to the range [0, 255]
  146. normalized_img = np.zeros(aligned_barcode.shape, aligned_barcode.dtype)
  147. cv2.normalize(aligned_barcode, normalized_img, 0, 255, cv2.NORM_MINMAX)
  148. logger.debug("Image normalized.")
  149. # # Save the cropped image
  150. cv2.imwrite(f"cropped_image__normalized{detection_idx + 1}.png",normalized_img)
  151. # logger.debug(f"Saved normalized image for decoding : {detection_idx + 1}")
  152. # Run decoding model
  153. # confidence = None
  154. # Run decoding on the original image
  155. is_valid, decoded, decoded_text, msg, avg_conf, detection_idx = self.run_decoding(normalized_img, detection_idx)
  156. print(f"Valid: {is_valid}, detection_idx:{detection_idx + 1}, decoded:{decoded}, decoded_text:{decoded_text}, msg:{msg}, avg_conf:{avg_conf}")
  157. if not is_valid:
  158. logger.warning(f"Decoding failed for detection {detection_idx + 1}: {msg}. Retrying with 180° rotation")
  159. # Rotate image 180 degrees and retry
  160. rotated_image = cv2.rotate(normalized_img, cv2.ROTATE_180)
  161. is_valid, decoded, decoded_text, msg, avg_conf, detection_idx = self.run_decoding(rotated_image, detection_idx)
  162. if is_valid:
  163. logger.debug(f"Valid: {is_valid}, detection_idx:{detection_idx + 1}, decoded:{decoded}, decoded_text:{decoded_text}, msg:{msg}, avg_conf:{avg_conf}")
  164. return decoded
  165. else:
  166. logger.warning(f"Decoding still failed after rotation for detection {detection_idx + 1}: {msg}")
  167. return ""
  168. return decoded
  169. except Exception as e:
  170. logger.error(f"Error in decode_from_points: {e}")
  171. return "Error: Decoding failed"
  172. def run_decoding(self, normalized_img, detection_idx):
  173. """Helper to run decoding on the given image."""
  174. preprocessed_img = self.preprocess_image(
  175. normalized_img
  176. )
  177. # cv2.imwrite(f"cropped_image_decoding_normalized{detection_idx + 1}.png",normalized_img)
  178. decode_result = self.decoding_sess.infer_new_request({'x': preprocessed_img})
  179. output_tensor = decode_result['save_infer_model/scale_0.tmp_0']
  180. logger.debug(f"Output tensor shape: {output_tensor.shape}")
  181. # print(f"decode_result: {decode_result}")
  182. output_indices_batch = np.argmax(output_tensor, axis=2)
  183. output_probs_batch = np.max(output_tensor, axis=2)
  184. # print(f"output_probs_batch:{output_probs_batch}")
  185. # Decode text from indices
  186. def preprocess_output_indices(output_indices_batch, output_probs_batch):
  187. # Ensure it's a proper 2D numpy array
  188. if output_indices_batch is None or len(output_indices_batch) == 0:
  189. return False, "Empty output indices batch", None
  190. # print(f"output_indices_batch: {output_indices_batch}")
  191. first_row = output_indices_batch[0]
  192. first_row_probs = output_probs_batch[0]
  193. if first_row is None or len(first_row) == 0:
  194. return False, "Empty output indices", None
  195. sequence = first_row.tolist()
  196. probs = first_row_probs.tolist()
  197. # Step 1: Trim at first 0
  198. if 0 in sequence:
  199. zero_index = sequence.index(0)
  200. cropped_indices = sequence[:zero_index]
  201. cropped_probs = probs[:zero_index]
  202. else:
  203. cropped_indices = sequence
  204. cropped_probs = probs
  205. index_prob_pairs = list(zip(cropped_indices, cropped_probs))
  206. cropped = [x - 1 for x in cropped_indices]
  207. # cropped = cropped_indices
  208. # print(f"cropped: {cropped}")
  209. # Step 2: Check for invalid trailing 107/108 after 0
  210. if not any(val in (106, 107) for val in cropped):
  211. return False, "Invalid: missin stop code (106 or 107) before first 0"
  212. # Step 3: Truncate at second 108 if two 108s appear (EAN start/stop)
  213. if cropped.count(108) >= 2:
  214. print("got here")
  215. first_108 = cropped.index(108)
  216. # print(f"first_108:{first_108}")
  217. second_108 = cropped.index(108, first_108 + 1)
  218. # print(f"second_108:{second_108}")
  219. cropped = cropped[:second_108 + 1]
  220. # print(f"cropped: {cropped}")
  221. index_prob_pairs = index_prob_pairs[:second_108 + 1]
  222. # print(f": {index_prob_pairs}")
  223. # Step 4: Check start code validity
  224. start_code = cropped[0] if cropped else None
  225. if start_code not in [103, 104, 105, 107]:
  226. return False, f"Invalid start code: {start_code}"
  227. return True, (cropped, index_prob_pairs)
  228. decoded_text = ""
  229. status, result_or_error = preprocess_output_indices(output_indices_batch, output_probs_batch)
  230. # print(f"Raw barcode: {result_or_error}, status: {status}")
  231. if status == False:
  232. print(f"msg: {result_or_error}")
  233. decoded = "Decoding failed"
  234. decoded_text = ""
  235. msg = result_or_error
  236. avg_conf = 0.0
  237. return status, decoded, decoded_text, msg, avg_conf, detection_idx
  238. else:
  239. mapped_indices, conf_pairs = result_or_error
  240. avg_conf = round(np.mean([conf for (_, conf) in conf_pairs]), 2)
  241. # print(f"✅ Average confidence: {avg_conf:.3f}")
  242. is_valid, barcode_value, msg, predicted_ean_digits_print = self.validate_checksum(mapped_indices)
  243. # print(f"barcode_value: {barcode_value}, msg: {msg}, predicted_ean_digits_print: {predicted_ean_digits_print}, is_valid: {is_valid}")
  244. if not is_valid:
  245. decoded = "Decoding failed"
  246. decoded_text = ""
  247. msg = msg
  248. avg_conf = 0.0
  249. is_valid = is_valid
  250. # logger.warning(f"{is_valid}, readable: {decoded}, raw: {decoded_text}, conf: {avg_conf}, format: {msg}")
  251. return is_valid, decoded, decoded_text, msg, avg_conf, detection_idx
  252. else:
  253. if msg == "code128":
  254. decoded_text = ''.join([self._characters[idx] for idx in barcode_value])
  255. decoded = self.decode_code128(barcode_value)
  256. logger.debug(f"✅ {is_valid}, readable: {decoded}, raw: {decoded_text}, conf: {avg_conf}, format: {msg}")
  257. else:
  258. # print(predicted_ean_digits_print)
  259. decoded_text = ''.join([self._characters[idx] for idx in predicted_ean_digits_print])
  260. decoded = ''.join(str(d) for d in barcode_value)
  261. logger.debug(f"✅ {is_valid}, readable: {decoded}, raw: {decoded_text}, conf: {avg_conf}, format: {msg}, detection_idx: {detection_idx}")
  262. return is_valid, decoded, decoded_text, msg, avg_conf, detection_idx
  263. def decode_code128(self, values):
  264. # print(f"values:{values}")
  265. values = values[:] # avoid modifying original
  266. start_code = values.pop(0)
  267. checksum = values.pop(-1)
  268. stop_code = values.pop(-1)
  269. if start_code == 103:
  270. current_set = 'A'
  271. elif start_code == 104:
  272. current_set = 'B'
  273. elif start_code == 105:
  274. current_set = 'C'
  275. else:
  276. return "Invalid start code"
  277. decoded_chars = []
  278. i = 0
  279. while i < len(values):
  280. val = values[i]
  281. # Handle switch codes
  282. if val == 99:
  283. current_set = 'C'
  284. i += 1
  285. continue
  286. elif val == 100:
  287. current_set = 'B'
  288. i += 1
  289. continue
  290. elif val == 101:
  291. current_set = 'A'
  292. i += 1
  293. continue
  294. # Decode values
  295. if current_set == 'C':
  296. decoded_chars.append(f"{val:02d}")
  297. else:
  298. if 0 <= val < len(self._characters):
  299. decoded_chars.append(self._characters[val])
  300. else:
  301. decoded_chars.append('?')
  302. i += 1
  303. return ''.join(decoded_chars)
  304. def validate_checksum(self, raw_values):
  305. # raw_values = [characters.index(char, 1) - 1 if char in characters[1:] else -1 for char in decoded_text]
  306. logger.debug(f"raw_values passed to validate_checksum: {raw_values}")
  307. if raw_values[0] == 107:
  308. logger.debug("Ean Barcode detected")
  309. code_map_predicted = {
  310. 'C1': '107', 'C2': '108', '0A': '109', '1A': '110', '2A': '111', '3A': '112', '4A': '113',
  311. '5A': '114', '6A': '115', '7A': '116', '8A': '117', '9A': '118', '0B': '119', '1B': '120',
  312. '2B': '121', '3B': '122', '4B': '123', '5B': '124', '6B': '125', '7B': '126', '8B': '127',
  313. '9B': '128', '0C': '129', '1C': '130', '2C': '131', '3C': '132', '4C': '133', '5C': '134',
  314. '6C': '135', '7C': '136', '8C': '137', '9C': '138'
  315. }
  316. LEFT_PATTERN = (
  317. "AAAAAA", "AABABB", "AABBAB", "AABBBA", "ABAABB",
  318. "ABBAAB", "ABBBAA", "ABABAB", "ABABBA", "ABBABA"
  319. )
  320. reverse_code_map = {v: k for k, v in code_map_predicted.items()}
  321. predicted_digits = [val for val in raw_values[1:-1] if val != 108]
  322. # print(f"predicted_digits: {predicted_digits}")
  323. mapped_keys = [reverse_code_map.get(str(val), f"UNK({val})") for val in predicted_digits]
  324. logger.debug(f"Mapped keys: {mapped_keys}")
  325. detected_pattern = ''.join(k[-1] if len(k) == 2 else '?' for k in mapped_keys[:6])
  326. # print(f"Detected_pattern: {detected_pattern}")
  327. if detected_pattern in LEFT_PATTERN:
  328. pattern_index = LEFT_PATTERN.index(detected_pattern)
  329. # print(f"pattern_index: {pattern_index}")
  330. else:
  331. return False, None, "wrong pattern", None
  332. predicted_ean_value = ''.join(k[0] if len(k) == 2 and k[0].isdigit() else '?' for k in mapped_keys)
  333. # print(f"predicted_ean_value:{predicted_ean_value}")
  334. ean13 = str(pattern_index) + predicted_ean_value[:12]
  335. logger.debug(f"ean13 base:{ean13}")
  336. if len(ean13) != 13 or not ean13.isdigit():
  337. logger.warning(f"Invalid Ean value needs to be 13 digits")
  338. return False, None, "Invalid EAN-13 base", None
  339. else:
  340. def calculate_ean13_checksum(ean13):
  341. digits = [int(d) for d in ean13]
  342. even_sum = sum(digits[i] for i in range(0, 12, 2))
  343. odd_sum = sum(digits[i] for i in range(1, 12, 2))
  344. total = even_sum + odd_sum * 3
  345. # print(f"total:{total}")
  346. return (10 - (total % 10)) % 10
  347. calculated_ean_checksum = calculate_ean13_checksum(ean13)
  348. # print(f"calculated_ean_checksum:{calculated_ean_checksum}")
  349. predicted_ean_checksum = predicted_ean_value[-1]
  350. # print(f"predicted_ean_checksum:{predicted_ean_checksum}")
  351. if str(predicted_ean_checksum) != str(calculated_ean_checksum):
  352. logger.warning(f"Invalid ean 13 checksum value, supposed to be {predicted_ean_checksum} but got {calculated_ean_checksum}")
  353. return False, None, f"Invalid ean 13 checksum: expected {predicted_ean_checksum}", None
  354. else:
  355. ean_list = [int(char) for char in ean13]
  356. predicted_ean_digits_print = raw_values
  357. return True, ean_list, "ean13", predicted_ean_digits_print
  358. else:
  359. logger.debug("Code128 Barcode detected")
  360. # dict_converted_to_code128_dict = [x - 1 for x in raw_values]
  361. # print("dict_converted_to_code128_dict", dict_converted_to_code128_dict)
  362. code128 = raw_values
  363. # print("code128code128", code128)
  364. start_code = code128[0]
  365. predicted_code128_checksum = code128[-2]
  366. # print("predicted_code128_checksum", predicted_code128_checksum)
  367. code128_base = code128[1:-2]
  368. # print(code128_base)
  369. weighted_sum = sum(val * (i +1) for i, val in enumerate(code128_base))
  370. calculated_128_checksum =(start_code + weighted_sum) % 103
  371. logger.debug(f"predicted_code128_checksum: {predicted_code128_checksum}, calculated_128_checksum: {calculated_128_checksum}")
  372. if predicted_code128_checksum != calculated_128_checksum:
  373. logger.warning(f"Invalid checksum value, supposed to be {calculated_128_checksum} but model predicted {predicted_code128_checksum}")
  374. return False, None, "Invalid checksum value", None
  375. return True, code128, "code128", None