# barcode_decode.py
  1. import imgviz
  2. from qtpy import QtCore
  3. from qtpy import QtGui
  4. from qtpy import QtWidgets
  5. import labelme.ai
  6. import labelme.utils
  7. from labelme import QT5
  8. from labelme.logger import logger
  9. from labelme.shape import Shape
  10. import collections
  11. import threading
  12. import numpy as np
  13. import openvino as ov
  14. import os.path as osp
  15. import cv2
  16. from labelme.utils import img_qt_to_arr
  17. from labelme.utils import load_barcode_dict
  18. class CodeSet:
  19. NONE = 0
  20. A = 1
  21. B = 2
  22. C = 3
  23. class Normalize:
  24. def __init__(self, mean=(0.15525904, 0.15525904, 0.15525904), std=(0.12552188, 0.12552188, 0.12552188)):
  25. if not (isinstance(mean, (list, tuple)) and isinstance(std, (list, tuple))):
  26. raise ValueError("mean and std should be of type list or tuple.")
  27. self.mean = np.array(mean, dtype=np.float32)
  28. self.std = np.array(std, dtype=np.float32)
  29. # Reshape for broadcasting to apply mean and std across the spatial dimensions of an image
  30. self.mean = self.mean.reshape((1, 1, 3))
  31. self.std = self.std.reshape((1, 1, 3))
  32. def __call__(self, img):
  33. img = img.astype(np.float32) / 255.0 # Scale pixel values to [0, 1]
  34. img = (img - self.mean) / self.std # Normalize
  35. return img
  36. class BarcodeDecodeModel:
  37. def __init__(self, decoding_model_path=None):
  38. self.ie = ov.Core()
  39. self.pixmap = QtGui.QPixmap()
  40. #Load Decoding model if provided
  41. self.decoding_net = None
  42. self.decoding_sess = None
  43. self._characters = load_barcode_dict()
  44. if decoding_model_path:
  45. self.decoding_net = self.ie.read_model(model=decoding_model_path)
  46. self.decoding_sess = self.ie.compile_model(model=self.decoding_net, device_name="CPU")
  47. self.decoding_input_shape = (1, 3, 32, 256)
  48. self.normalize = Normalize() # Normalization instance
  49. self._lock = threading.Lock()
  50. self._image_embedding_cache = collections.OrderedDict()
  51. self._max_cache_size = 10
  52. self.pixmap = QtGui.QPixmap()
  53. # def set_pixmap(self, pixmap: QtGui.QPixmap):
  54. # """
  55. # Set the QPixmap object for decoding.
  56. # Args:
  57. # pixmap (QtGui.QPixmap): The QPixmap object containing the image.
  58. # """
  59. # if pixmap is None or pixmap.isNull():
  60. # raise ValueError("Invalid QPixmap provided.")
  61. # self.pixmap = pixmap
  62. # logger.debug("Pixmap set successfully in BarcodeDecodeModel.")
  63. def preprocess_image(self, image):
  64. norm = Normalize(mean=(0.44948044,0.44948044,0.44948044), std=(0.22099442,0.22099442,0.22099442))
  65. resized_image = cv2.resize(image, (self.decoding_input_shape[3], self.decoding_input_shape[2]))
  66. resized_image = cv2.cvtColor(resized_image, cv2.COLOR_BGR2RGB)
  67. resized_image = norm(resized_image)
  68. # Resize image for detection model input size
  69. logger.debug(f"Preprocessing image for detection: {image.shape}")
  70. # resized_image = resized_image.astype('float32') / 255.0
  71. input_tensor = resized_image.transpose(2, 0, 1) # Convert HWC to CHW
  72. input_tensor = np.expand_dims(input_tensor, 0) # Add batch dimension
  73. logger.debug(f"Processed image shape: {input_tensor.shape}")
  74. return input_tensor
  75. def decode_from_points(self, points, detection_idx, original_image):
  76. """
  77. Decodes the cropped image based on points and returns the decoded text.
  78. Args:
  79. points (list): List of points defining the bounding box.
  80. pixmap (QPixmap): Original image pixmap to crop from.
  81. Returns:
  82. str: Decoded text from the decoding model.
  83. """
  84. try:
  85. # Convert scaled_points to a numpy array
  86. polygon = np.array(points, dtype=np.int32)
  87. # Create a mask of the same size as the original image
  88. # original_image = labelme.utils.img_qt_to_arr(self.pixmap.toImage())
  89. cv2.imwrite(f"original_image{detection_idx + 1}.png", original_image)
  90. mask = np.zeros(original_image.shape[:2], dtype=np.uint8)
  91. cv2.fillPoly(mask, [polygon], 255) # Fill the polygon with white
  92. # Apply the mask to the original image
  93. masked_image = cv2.bitwise_and(original_image, original_image, mask=mask)
  94. # Get the bounding rectangle of the polygon to crop the ROI
  95. x, y, w, h = cv2.boundingRect(polygon)
  96. cropped_image_dec = masked_image[y:y+h, x:x+w]
  97. cv2.imwrite(f"cropped_exact_{detection_idx + 1}.png", cropped_image_dec)
  98. logger.debug(f"cropped_exact image saved at {detection_idx + 1}.")
  99. src_points = np.float32(points)
  100. # Calculate the width and height of the barcode based on scaled_points
  101. width = int(np.linalg.norm(src_points[0] - src_points[1]))
  102. # print(width)
  103. height = int(np.linalg.norm(src_points[1] - src_points[2]))
  104. # print(height)
  105. # Correct width/height if needed
  106. if width < height:
  107. width, height = height, width
  108. # Reorder src_points to ensure the transformation aligns the longer side to the width
  109. src_points = np.float32([
  110. src_points[1], # Top-left becomes top-right
  111. src_points[2], # Top-right becomes bottom-right
  112. src_points[3], # Bottom-right becomes bottom-left
  113. src_points[0] # Bottom-left becomes top-left
  114. ])
  115. # Define destination points for the flattened barcode
  116. dst_points = np.float32([
  117. [0, 0],
  118. [width - 1, 0],
  119. [width - 1, height - 1],
  120. [0, height - 1]
  121. ])
  122. # Calculate the perspective transformation matrix
  123. M = cv2.getPerspectiveTransform(src_points, dst_points)
  124. # Apply the perspective transformation
  125. aligned_barcode = cv2.warpPerspective(original_image, M, (width, height), flags=cv2.INTER_LINEAR)
  126. # Save the aligned barcode image
  127. cv2.imwrite(f"decoding_barcode_{detection_idx + 1}.png", aligned_barcode)
  128. logger.debug(f"Aligned barcode saved at {detection_idx + 1}.")
  129. # Normalize the image to scale pixel intensities to the range [0, 255]
  130. normalized_img = np.zeros(aligned_barcode.shape, aligned_barcode.dtype)
  131. cv2.normalize(aligned_barcode, normalized_img, 0, 255, cv2.NORM_MINMAX)
  132. logger.debug("Image normalized.")
  133. # Save the cropped image
  134. cv2.imwrite(f"cropped_image_decoding_normalized{detection_idx + 1}.png",normalized_img)
  135. logger.debug(f"Saved normalized image for decoding : {detection_idx + 1}")
  136. # Run decoding model
  137. confidence = None
  138. # Run decoding on the original image
  139. decoded_text, confidence = self.run_decoding(normalized_img, detection_idx, confidence)
  140. # Validate checksum
  141. if decoded_text:
  142. checksum_valid, validated_result = self.validate_code128_checksum(decoded_text, detection_idx)
  143. if checksum_valid:
  144. logger.debug(f"Validated result for detection {detection_idx + 1}: {validated_result}")
  145. return validated_result # Return validated result
  146. else:
  147. logger.error(f"Checksum validation failed for detection {detection_idx + 1}. Retrying with 180° rotation.")
  148. # Rotate image 180 degrees and retry
  149. rotated_image = cv2.rotate(normalized_img, cv2.ROTATE_180)
  150. decoded_text, confidence = self.run_decoding(rotated_image, detection_idx, confidence)
  151. # Validate checksum again
  152. if decoded_text:
  153. checksum_valid, validated_result = self.validate_code128_checksum(decoded_text, detection_idx)
  154. if checksum_valid:
  155. logger.debug(f"Validated result after rotation for detection {detection_idx + 1}: {validated_result}")
  156. return validated_result
  157. else:
  158. logger.error(f"Checksum validation failed after rotation for detection {detection_idx + 1}. Error: {validated_result}")
  159. return
  160. return "Decoding failed"
  161. except Exception as e:
  162. logger.error(f"Error in decode_from_points: {e}")
  163. return "Error: Decoding failed"
  164. def run_decoding(self, image_np, detection_idx, confidence):
  165. """Helper to run decoding on the given image."""
  166. preprocessed_img = self.preprocess_image(
  167. image_np
  168. )
  169. decode_result = self.decoding_sess.infer_new_request({'x': preprocessed_img})
  170. output_tensor = decode_result['save_infer_model/scale_0.tmp_0']
  171. logger.debug(f"Output tensor shape: {output_tensor.shape}")
  172. output_indices = np.argmax(output_tensor, axis=2)
  173. output_probs = np.max(output_tensor, axis=2)
  174. # Decode text from indices
  175. decoded_text, confidence = self.decode_text(output_indices, output_probs, detection_idx)
  176. logger.debug(f"Raw barcode: {decoded_text}, Confidence: {confidence:.2f}")
  177. return decoded_text, confidence
  178. def decode_text(self, text_indices, text_probs, detection_idx):
  179. """
  180. Converts model output indices into text using the character dictionary.
  181. Args:
  182. text_indices (np.ndarray): Output indices from the decoding model.
  183. text_probs (np.ndarray): Probabilities corresponding to the indices.
  184. Returns:
  185. tuple: Decoded text and its confidence score.
  186. """
  187. try:
  188. max_index = len(self._characters) - 1
  189. logger.debug(f"Loaded barcode dictionary with {len(self._characters)} characters.")
  190. result_list = []
  191. for batch_idx in range(text_indices.shape[0]): # Loop through batches
  192. char_list = []
  193. conf_list = []
  194. for step_idx in range(text_indices.shape[1]): # Loop through sequence length
  195. char_idx = int(text_indices[batch_idx, step_idx])
  196. if char_idx > max_index:
  197. logger.warning(f"Index {char_idx} is out of bounds for dictionary size {len(self._characters)}")
  198. continue # Skip invalid indices
  199. char = self._characters[char_idx]
  200. # print("char",char)
  201. if char == "</s>": # End token
  202. break
  203. char_list.append(char)
  204. conf_list.append(text_probs[batch_idx, step_idx])
  205. text = ''.join(char_list)
  206. confidence = np.mean(conf_list) if conf_list else 0.0
  207. result_list.append((text, confidence))
  208. # Return the first result (assuming batch size of 1 for now)
  209. return result_list[0] if result_list else ("", 0.0)
  210. except Exception as e:
  211. logger.error(f"Error in decode_text: {e}")
  212. return "Error: Decoding failed", 0.0
  213. def validate_code128_checksum(self, decoded_text, detection_idx):
  214. # Convert characters to their corresponding Code 128 values using the index in _characters
  215. # print(self._characters)
  216. code128Values = [self._characters.index(char, 1) - 1 if char in self._characters[1:] else -1 for char in decoded_text]
  217. logger.debug(f"code128Values:{code128Values}")
  218. result = ""
  219. err_msg = ""
  220. currentCodeSet = CodeSet.B # Default to Code Set B, assuming start code is included in decoded_text
  221. if code128Values[0] in [103, 104, 105]:
  222. start_codes = {103: CodeSet.A, 104: CodeSet.B, 105: CodeSet.C}
  223. currentCodeSet = start_codes[code128Values[0]]
  224. # print("currentCodeSet",currentCodeSet)
  225. else:
  226. err_msg = f"No start code detected, first code is {code128Values[0]}"
  227. return False, err_msg
  228. checksum_expected = code128Values[-2]
  229. # print("Expected checksum:", checksum_expected)
  230. # Calculate the checksum using the formula
  231. checksum_calculated = code128Values[0] # Start with the start code value
  232. for i, value in enumerate(code128Values[1:-2], start=1): # Exclude stop code
  233. weighted_value = value * i
  234. checksum_calculated += weighted_value
  235. # logger.debug(f"Position {i}, Value {value}, Weighted Value {weighted_value}, Running Checksum {checksum_calculated}")
  236. checksum_calculated %= 103
  237. logger.debug(f"Final Calculated Checksum (mod 103): {checksum_calculated}")
  238. if checksum_calculated != checksum_expected:
  239. err_msg = f"Invalid checksum value, supposed to be {checksum_calculated} but got {checksum_expected}"
  240. return False, err_msg
  241. # Verify the stop code
  242. if code128Values[-1] != 106:
  243. err_msg = "No valid stop code detected at the end of the sequence."
  244. return False, err_msg
  245. result = ""
  246. i = 1 # Start after the start code
  247. while i < len(code128Values) - 2: # Exclude checksum and stop code
  248. value = code128Values[i]
  249. # Handle special functions and code set shifts
  250. if value == 102: # FNC1 for GS1-128
  251. logger.debug(f"Detected FNC1 at position {i}, treated as AI separator.")
  252. # result += "|" # Optional: Add a delimiter for AI parsing
  253. i += 1
  254. continue
  255. elif value == 99: # Switch to Code Set C
  256. currentCodeSet = CodeSet.C
  257. logger.debug(f"Switched to Code Set C at position {i}")
  258. i += 1
  259. continue
  260. elif value == 100: # Switch to Code Set B
  261. currentCodeSet = CodeSet.B
  262. logger.debug(f"Switched to Code Set B at position {i}")
  263. i += 1
  264. continue
  265. # Decode based on the current Code Set
  266. if currentCodeSet == CodeSet.C:
  267. result += f"{value:02}"
  268. # logger.debug(f"Added Code Set C value {value:02} at position {i}")
  269. i += 1
  270. elif currentCodeSet == CodeSet.B:
  271. if 0 <= value <= 95:
  272. char = self._characters[value + 1] # Map using the single dictionary
  273. result += char
  274. # logger.debug(f"Added Code Set B char {char} at position {i}")
  275. i += 1
  276. else:
  277. err_msg = f"Invalid Code Set B value: {value}"
  278. logger.error(err_msg)
  279. return False, err_msg
  280. elif currentCodeSet == CodeSet.A:
  281. if 0 <= value <= 95:
  282. char = self._characters[value + 1] # Map using the single dictionary
  283. result += char
  284. # logger.debug(f"Added Code Set A char {char} at position {i}")
  285. i += 1
  286. else:
  287. err_msg = f"Invalid Code Set A value: {value}"
  288. logger.error(err_msg)
  289. return False, err_msg
  290. # logger.debug(f"Decoded result after processing: {result}")
  291. # logger.debug(f"Result indices for {detection_idx + 1}: {result_indices}")
  292. return True, result