barcode_decode.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416
  1. import imgviz
  2. from qtpy import QtCore
  3. from qtpy import QtGui
  4. from qtpy import QtWidgets
  5. import labelme.ai
  6. import labelme.utils
  7. from labelme import QT5
  8. from labelme.logger import logger
  9. from labelme.shape import Shape
  10. import collections
  11. import threading
  12. import numpy as np
  13. import openvino as ov
  14. import os.path as osp
  15. import cv2
  16. import os
  17. import time
  18. from labelme.utils import img_qt_to_arr
  19. from labelme.utils import load_barcode_dict
  20. class CodeSet:
  21. NONE = 0
  22. A = 1
  23. B = 2
  24. C = 3
  25. class Normalize:
  26. def __init__(self, mean=(0.45, 0.45, 0.45), std=(0.24, 0.24, 0.24)):
  27. if not (isinstance(mean, (list, tuple)) and isinstance(std, (list, tuple))):
  28. raise ValueError("mean and std should be of type list or tuple.")
  29. self.mean = np.array(mean, dtype=np.float32)
  30. self.std = np.array(std, dtype=np.float32)
  31. # Reshape for broadcasting to apply mean and std across the spatial dimensions of an image
  32. self.mean = self.mean.reshape((1, 1, 3))
  33. self.std = self.std.reshape((1, 1, 3))
  34. def __call__(self, img):
  35. img = img.astype(np.float32) / 255.0 # Scale pixel values to [0, 1]
  36. img = (img - self.mean) / self.std # Normalize
  37. return img
  38. class BarcodeDecodeModel:
  39. def __init__(self, decoding_model_path=None):
  40. self.ie = ov.Core()
  41. self.pixmap = QtGui.QPixmap()
  42. #Load Decoding model if provided
  43. self.decoding_net = None
  44. self.decoding_sess = None
  45. self._characters = load_barcode_dict()
  46. if decoding_model_path:
  47. self.decoding_net = self.ie.read_model(model=decoding_model_path)
  48. self.decoding_sess = self.ie.compile_model(model=self.decoding_net, device_name="CPU")
  49. self.decoding_input_shape = (1, 3, 32, 256)
  50. self.normalize = Normalize() # Normalization instance
  51. self._lock = threading.Lock()
  52. self._image_embedding_cache = collections.OrderedDict()
  53. self._max_cache_size = 10
  54. self.pixmap = QtGui.QPixmap()
  55. # def set_pixmap(self, pixmap: QtGui.QPixmap):
  56. # """
  57. # Set the QPixmap object for decoding.
  58. # Args:
  59. # pixmap (QtGui.QPixmap): The QPixmap object containing the image.
  60. # """
  61. # if pixmap is None or pixmap.isNull():
  62. # raise ValueError("Invalid QPixmap provided.")
  63. # self.pixmap = pixmap
  64. # logger.debug("Pixmap set successfully in BarcodeDecodeModel.")
  65. def preprocess_image(self, normalized_img):
  66. norm = Normalize(mean=(0.45, 0.45, 0.45), std=(0.25, 0.25, 0.25))
  67. resized_image = cv2.resize(normalized_img, (self.decoding_input_shape[3], self.decoding_input_shape[2]))
  68. resized_image = cv2.cvtColor(resized_image, cv2.COLOR_BGR2RGB)
  69. resized_image = norm(resized_image)
  70. # Resize image for detection model input size
  71. logger.debug(f"Preprocessing image for detection: {normalized_img.shape}")
  72. # resized_image = resized_image.astype('float32') / 255.0
  73. input_tensor = resized_image.transpose(2, 0, 1) # Convert HWC to CHW
  74. input_tensor = np.expand_dims(input_tensor, 0) # Add batch dimension
  75. logger.debug(f"Processed image shape: {input_tensor.shape}")
  76. return input_tensor
  77. def decode_from_points(self, points, detection_idx, original_image):
  78. """
  79. Decodes the cropped image based on points and returns the decoded text.
  80. Args:
  81. points (list): List of points defining the bounding box.
  82. pixmap (QPixmap): Original image pixmap to crop from.
  83. Returns:
  84. str: Decoded text from the decoding model.
  85. """
  86. try:
  87. # Convert scaled_points to a numpy array
  88. polygon = np.array(points, dtype=np.int32)
  89. # Create a mask of the same size as the original image
  90. # original_image = labelme.utils.img_qt_to_arr(self.pixmap.toImage())
  91. # cv2.imwrite(f"original_image{detection_idx + 1}.png", original_image)
  92. mask = np.zeros(original_image.shape[:2], dtype=np.uint8)
  93. cv2.fillPoly(mask, [polygon], 255) # Fill the polygon with white
  94. # Apply the mask to the original image
  95. masked_image = cv2.bitwise_and(original_image, original_image, mask=mask)
  96. # Get the bounding rectangle of the polygon to crop the ROI
  97. x, y, w, h = cv2.boundingRect(polygon)
  98. cropped_image_dec = masked_image[y:y+h, x:x+w]
  99. # cv2.imwrite(f"cropped_exact_{detection_idx + 1}.png", cropped_image_dec)
  100. logger.debug(f"cropped_exact image saved at {detection_idx + 1}.")
  101. src_points = np.float32(points)
  102. # Calculate the width and height of the barcode based on scaled_points
  103. width = int(np.linalg.norm(src_points[0] - src_points[1]))
  104. # print(width)
  105. height = int(np.linalg.norm(src_points[1] - src_points[2]))
  106. # print(height)
  107. # Correct width/height if needed
  108. if width < height:
  109. width, height = height, width
  110. # Reorder src_points to ensure the transformation aligns the longer side to the width
  111. src_points = np.float32([
  112. src_points[1], # Top-left becomes top-right
  113. src_points[2], # Top-right becomes bottom-right
  114. src_points[3], # Bottom-right becomes bottom-left
  115. src_points[0] # Bottom-left becomes top-left
  116. ])
  117. # Define destination points for the flattened barcode
  118. dst_points = np.float32([
  119. [0, 0],
  120. [width - 1, 0],
  121. [width - 1, height - 1],
  122. [0, height - 1]
  123. ])
  124. # Calculate the perspective transformation matrix
  125. M = cv2.getPerspectiveTransform(src_points, dst_points)
  126. # Apply the perspective transformation
  127. aligned_barcode = cv2.warpPerspective(original_image, M, (width, height), flags=cv2.INTER_LINEAR)
  128. # Save the aligned barcode image
  129. # cv2.imwrite(f"decoding_barcode_{detection_idx + 1}.png", aligned_barcode)
  130. # logger.debug(f"Aligned barcode saved at {detection_idx + 1}.")
  131. # Normalize the image to scale pixel intensities to the range [0, 255]
  132. normalized_img = np.zeros(aligned_barcode.shape, aligned_barcode.dtype)
  133. cv2.normalize(aligned_barcode, normalized_img, 0, 255, cv2.NORM_MINMAX)
  134. logger.debug("Image normalized.")
  135. # Save the cropped image
  136. cv2.imwrite(f"cropped_image_decoding_normalized{detection_idx + 1}.png",normalized_img)
  137. logger.debug(f"Saved normalized image for decoding : {detection_idx + 1}")
  138. # Run decoding model
  139. # confidence = None
  140. # Run decoding on the original image
  141. is_valid, decoded, decoded_text, msg, avg_conf, detection_idx = self.run_decoding(normalized_img, detection_idx)
  142. print(f"Valid: {is_valid}, detection_idx:{detection_idx + 1}, decoded:{decoded}, decoded_text:{decoded_text}, msg:{msg}, avg_conf:{avg_conf}")
  143. if not is_valid:
  144. logger.warning(f"Decoding failed for detection {detection_idx + 1}: {msg}. Retrying with 180° rotation")
  145. # Rotate image 180 degrees and retry
  146. rotated_image = cv2.rotate(normalized_img, cv2.ROTATE_180)
  147. is_valid, decoded, decoded_text, msg, avg_conf, detection_idx = self.run_decoding(rotated_image, detection_idx)
  148. if is_valid:
  149. logger.debug(f"Valid: {is_valid}, detection_idx:{detection_idx + 1}, decoded:{decoded}, decoded_text:{decoded_text}, msg:{msg}, avg_conf:{avg_conf}")
  150. return decoded
  151. else:
  152. logger.warning(f"Decoding still failed after rotation for detection {detection_idx + 1}: {msg}")
  153. return ""
  154. return decoded
  155. except Exception as e:
  156. logger.error(f"Error in decode_from_points: {e}")
  157. return "Error: Decoding failed"
  158. def run_decoding(self, normalized_img, detection_idx):
  159. """Helper to run decoding on the given image."""
  160. preprocessed_img = self.preprocess_image(
  161. normalized_img
  162. )
  163. # cv2.imwrite(f"cropped_image_decoding_normalized{detection_idx + 1}.png",normalized_img)
  164. decode_result = self.decoding_sess.infer_new_request({'x': preprocessed_img})
  165. output_tensor = decode_result['save_infer_model/scale_0.tmp_0']
  166. logger.debug(f"Output tensor shape: {output_tensor.shape}")
  167. print(f"decode_result: {decode_result}")
  168. output_indices_batch = np.argmax(output_tensor, axis=2)
  169. output_probs_batch = np.max(output_tensor, axis=2)
  170. print(f"output_probs_batch:{output_probs_batch}")
  171. # Decode text from indices
  172. def preprocess_output_indices(output_indices_batch, output_probs_batch):
  173. # Ensure it's a proper 2D numpy array
  174. if output_indices_batch is None or len(output_indices_batch) == 0:
  175. return False, "Empty output indices batch", None
  176. print(f"output_indices_batch: {output_indices_batch}")
  177. first_row = output_indices_batch[0]
  178. first_row_probs = output_probs_batch[0]
  179. if first_row is None or len(first_row) == 0:
  180. return False, "Empty output indices", None
  181. sequence = first_row.tolist()
  182. probs = first_row_probs.tolist()
  183. # Step 1: Trim at first 0
  184. if 0 in sequence:
  185. zero_index = sequence.index(0)
  186. cropped_indices = sequence[:zero_index]
  187. cropped_probs = probs[:zero_index]
  188. else:
  189. cropped_indices = sequence
  190. cropped_probs = probs
  191. index_prob_pairs = list(zip(cropped_indices, cropped_probs))
  192. cropped = [x - 1 for x in cropped_indices]
  193. # cropped = cropped_indices
  194. # print(f"cropped: {cropped}")
  195. # Step 2: Check for invalid trailing 107/108 after 0
  196. if not any(val in (106, 107) for val in cropped):
  197. return False, "Invalid: missin stop code (106 or 107) before first 0"
  198. # Step 3: Truncate at second 108 if two 108s appear (EAN start/stop)
  199. if cropped.count(108) >= 2:
  200. print("got here")
  201. first_108 = cropped.index(108)
  202. # print(f"first_108:{first_108}")
  203. second_108 = cropped.index(108, first_108 + 1)
  204. # print(f"second_108:{second_108}")
  205. cropped = cropped[:second_108 + 1]
  206. # print(f"cropped: {cropped}")
  207. index_prob_pairs = index_prob_pairs[:second_108 + 1]
  208. # print(f": {index_prob_pairs}")
  209. # Step 4: Check start code validity
  210. start_code = cropped[0] if cropped else None
  211. if start_code not in [103, 104, 105, 107]:
  212. return False, f"Invalid start code: {start_code}"
  213. return True, (cropped, index_prob_pairs)
  214. decoded_text = ""
  215. status, result_or_error = preprocess_output_indices(output_indices_batch, output_probs_batch)
  216. # print(f"Raw barcode: {result_or_error}, status: {status}")
  217. if status == False:
  218. print(f"msg: {result_or_error}")
  219. decoded = "Decoding failed"
  220. decoded_text = ""
  221. msg = result_or_error
  222. avg_conf = 0.0
  223. return status, decoded, decoded_text, msg, avg_conf, detection_idx
  224. else:
  225. mapped_indices, conf_pairs = result_or_error
  226. avg_conf = round(np.mean([conf for (_, conf) in conf_pairs]), 2)
  227. # print(f"✅ Average confidence: {avg_conf:.3f}")
  228. is_valid, barcode_value, msg, predicted_ean_digits_print = self.validate_checksum(mapped_indices)
  229. # print(f"barcode_value: {barcode_value}, msg: {msg}, predicted_ean_digits_print: {predicted_ean_digits_print}, is_valid: {is_valid}")
  230. if not is_valid:
  231. decoded = "Decoding failed"
  232. decoded_text = ""
  233. msg = msg
  234. avg_conf = 0.0
  235. is_valid = is_valid
  236. # logger.warning(f"{is_valid}, readable: {decoded}, raw: {decoded_text}, conf: {avg_conf}, format: {msg}")
  237. return is_valid, decoded, decoded_text, msg, avg_conf, detection_idx
  238. else:
  239. if msg == "code128":
  240. decoded_text = ''.join([self._characters[idx] for idx in barcode_value])
  241. decoded = self.decode_code128(barcode_value)
  242. logger.debug(f"✅ {is_valid}, readable: {decoded}, raw: {decoded_text}, conf: {avg_conf}, format: {msg}")
  243. else:
  244. # print(predicted_ean_digits_print)
  245. decoded_text = ''.join([self._characters[idx] for idx in predicted_ean_digits_print])
  246. decoded = ''.join(str(d) for d in barcode_value)
  247. logger.debug(f"✅ {is_valid}, readable: {decoded}, raw: {decoded_text}, conf: {avg_conf}, format: {msg}, detection_idx: {detection_idx}")
  248. return is_valid, decoded, decoded_text, msg, avg_conf, detection_idx
  249. def decode_code128(self, values):
  250. # print(f"values:{values}")
  251. values = values[:] # avoid modifying original
  252. start_code = values.pop(0)
  253. checksum = values.pop(-1)
  254. stop_code = values.pop(-1)
  255. if start_code == 103:
  256. current_set = 'A'
  257. elif start_code == 104:
  258. current_set = 'B'
  259. elif start_code == 105:
  260. current_set = 'C'
  261. else:
  262. return "Invalid start code"
  263. decoded_chars = []
  264. i = 0
  265. while i < len(values):
  266. val = values[i]
  267. # Handle switch codes
  268. if val == 99:
  269. current_set = 'C'
  270. i += 1
  271. continue
  272. elif val == 100:
  273. current_set = 'B'
  274. i += 1
  275. continue
  276. elif val == 101:
  277. current_set = 'A'
  278. i += 1
  279. continue
  280. # Decode values
  281. if current_set == 'C':
  282. decoded_chars.append(f"{val:02d}")
  283. else:
  284. if 0 <= val < len(self._characters):
  285. decoded_chars.append(self._characters[val])
  286. else:
  287. decoded_chars.append('?')
  288. i += 1
  289. return ''.join(decoded_chars)
  290. def validate_checksum(self, raw_values):
  291. # raw_values = [characters.index(char, 1) - 1 if char in characters[1:] else -1 for char in decoded_text]
  292. logger.debug(f"raw_values passed to validate_checksum: {raw_values}")
  293. if raw_values[0] == 107:
  294. logger.debug("Ean Barcode detected")
  295. code_map_predicted = {
  296. 'C1': '107', 'C2': '108', '0A': '109', '1A': '110', '2A': '111', '3A': '112', '4A': '113',
  297. '5A': '114', '6A': '115', '7A': '116', '8A': '117', '9A': '118', '0B': '119', '1B': '120',
  298. '2B': '121', '3B': '122', '4B': '123', '5B': '124', '6B': '125', '7B': '126', '8B': '127',
  299. '9B': '128', '0C': '129', '1C': '130', '2C': '131', '3C': '132', '4C': '133', '5C': '134',
  300. '6C': '135', '7C': '136', '8C': '137', '9C': '138'
  301. }
  302. LEFT_PATTERN = (
  303. "AAAAAA", "AABABB", "AABBAB", "AABBBA", "ABAABB",
  304. "ABBAAB", "ABBBAA", "ABABAB", "ABABBA", "ABBABA"
  305. )
  306. reverse_code_map = {v: k for k, v in code_map_predicted.items()}
  307. predicted_digits = [val for val in raw_values[1:-1] if val != 108]
  308. # print(f"predicted_digits: {predicted_digits}")
  309. mapped_keys = [reverse_code_map.get(str(val), f"UNK({val})") for val in predicted_digits]
  310. logger.debug(f"Mapped keys: {mapped_keys}")
  311. detected_pattern = ''.join(k[-1] if len(k) == 2 else '?' for k in mapped_keys[:6])
  312. # print(f"Detected_pattern: {detected_pattern}")
  313. if detected_pattern in LEFT_PATTERN:
  314. pattern_index = LEFT_PATTERN.index(detected_pattern)
  315. # print(f"pattern_index: {pattern_index}")
  316. else:
  317. return False, None, "wrong pattern", None
  318. predicted_ean_value = ''.join(k[0] if len(k) == 2 and k[0].isdigit() else '?' for k in mapped_keys)
  319. # print(f"predicted_ean_value:{predicted_ean_value}")
  320. ean13 = str(pattern_index) + predicted_ean_value[:12]
  321. logger.debug(f"ean13 base:{ean13}")
  322. if len(ean13) != 13 or not ean13.isdigit():
  323. logger.warning(f"Invalid Ean value needs to be 13 digits")
  324. return False, None, "Invalid EAN-13 base", None
  325. else:
  326. def calculate_ean13_checksum(ean13):
  327. digits = [int(d) for d in ean13]
  328. even_sum = sum(digits[i] for i in range(0, 12, 2))
  329. odd_sum = sum(digits[i] for i in range(1, 12, 2))
  330. total = even_sum + odd_sum * 3
  331. # print(f"total:{total}")
  332. return (10 - (total % 10)) % 10
  333. calculated_ean_checksum = calculate_ean13_checksum(ean13)
  334. # print(f"calculated_ean_checksum:{calculated_ean_checksum}")
  335. predicted_ean_checksum = predicted_ean_value[-1]
  336. # print(f"predicted_ean_checksum:{predicted_ean_checksum}")
  337. if str(predicted_ean_checksum) != str(calculated_ean_checksum):
  338. logger.warning(f"Invalid ean 13 checksum value, supposed to be {predicted_ean_checksum} but got {calculated_ean_checksum}")
  339. return False, None, f"Invalid ean 13 checksum: expected {predicted_ean_checksum}", None
  340. else:
  341. ean_list = [int(char) for char in ean13]
  342. predicted_ean_digits_print = raw_values
  343. return True, ean_list, "ean13", predicted_ean_digits_print
  344. else:
  345. logger.debug("Code128 Barcode detected")
  346. # dict_converted_to_code128_dict = [x - 1 for x in raw_values]
  347. # print("dict_converted_to_code128_dict", dict_converted_to_code128_dict)
  348. code128 = raw_values
  349. # print("code128code128", code128)
  350. start_code = code128[0]
  351. predicted_code128_checksum = code128[-2]
  352. # print("predicted_code128_checksum", predicted_code128_checksum)
  353. code128_base = code128[1:-2]
  354. # print(code128_base)
  355. weighted_sum = sum(val * (i +1) for i, val in enumerate(code128_base))
  356. calculated_128_checksum =(start_code + weighted_sum) % 103
  357. logger.debug(f"predicted_code128_checksum: {predicted_code128_checksum}, calculated_128_checksum: {calculated_128_checksum}")
  358. if predicted_code128_checksum != calculated_128_checksum:
  359. logger.warning(f"Invalid checksum value, supposed to be {calculated_128_checksum} but model predicted {predicted_code128_checksum}")
  360. return False, None, "Invalid checksum value", None
  361. return True, code128, "code128", None