123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341 |
- import imgviz
- from qtpy import QtCore
- from qtpy import QtGui
- from qtpy import QtWidgets
- import labelme.ai
- import labelme.utils
- from labelme import QT5
- from labelme.logger import logger
- from labelme.shape import Shape
- import collections
- import threading
- import numpy as np
- import openvino as ov
- import os.path as osp
- import cv2
- from labelme.utils import img_qt_to_arr
- from labelme.utils import load_barcode_dict
- class CodeSet:
- NONE = 0
- A = 1
- B = 2
- C = 3
- class Normalize:
- def __init__(self, mean=(0.15525904, 0.15525904, 0.15525904), std=(0.12552188, 0.12552188, 0.12552188)):
- if not (isinstance(mean, (list, tuple)) and isinstance(std, (list, tuple))):
- raise ValueError("mean and std should be of type list or tuple.")
- self.mean = np.array(mean, dtype=np.float32)
- self.std = np.array(std, dtype=np.float32)
-
- self.mean = self.mean.reshape((1, 1, 3))
- self.std = self.std.reshape((1, 1, 3))
- def __call__(self, img):
- img = img.astype(np.float32) / 255.0
- img = (img - self.mean) / self.std
- return img
- class BarcodeDecodeModel:
- def __init__(self, decoding_model_path=None):
- self.ie = ov.Core()
- self.pixmap = QtGui.QPixmap()
-
- self.decoding_net = None
- self.decoding_sess = None
- self._characters = load_barcode_dict()
- if decoding_model_path:
- self.decoding_net = self.ie.read_model(model=decoding_model_path)
- self.decoding_sess = self.ie.compile_model(model=self.decoding_net, device_name="CPU")
- self.decoding_input_shape = (1, 3, 32, 256)
- self.normalize = Normalize()
- self._lock = threading.Lock()
- self._image_embedding_cache = collections.OrderedDict()
- self._max_cache_size = 10
- self.pixmap = QtGui.QPixmap()
-
-
-
-
-
-
-
-
-
-
- def preprocess_image(self, image):
- norm = Normalize(mean=(0.44948044,0.44948044,0.44948044), std=(0.22099442,0.22099442,0.22099442))
- resized_image = cv2.resize(image, (self.decoding_input_shape[3], self.decoding_input_shape[2]))
- resized_image = cv2.cvtColor(resized_image, cv2.COLOR_BGR2RGB)
- resized_image = norm(resized_image)
-
- logger.debug(f"Preprocessing image for detection: {image.shape}")
-
- input_tensor = resized_image.transpose(2, 0, 1)
- input_tensor = np.expand_dims(input_tensor, 0)
- logger.debug(f"Processed image shape: {input_tensor.shape}")
- return input_tensor
-
- def decode_from_points(self, points, detection_idx, original_image):
- """
- Decodes the cropped image based on points and returns the decoded text.
- Args:
- points (list): List of points defining the bounding box.
- pixmap (QPixmap): Original image pixmap to crop from.
- Returns:
- str: Decoded text from the decoding model.
- """
- try:
-
- polygon = np.array(points, dtype=np.int32)
-
-
- cv2.imwrite(f"original_image{detection_idx + 1}.png", original_image)
- mask = np.zeros(original_image.shape[:2], dtype=np.uint8)
- cv2.fillPoly(mask, [polygon], 255)
-
- masked_image = cv2.bitwise_and(original_image, original_image, mask=mask)
-
- x, y, w, h = cv2.boundingRect(polygon)
- cropped_image_dec = masked_image[y:y+h, x:x+w]
- cv2.imwrite(f"cropped_exact_{detection_idx + 1}.png", cropped_image_dec)
- logger.debug(f"cropped_exact image saved at {detection_idx + 1}.")
-
- src_points = np.float32(points)
-
- width = int(np.linalg.norm(src_points[0] - src_points[1]))
-
- height = int(np.linalg.norm(src_points[1] - src_points[2]))
-
-
- if width < height:
- width, height = height, width
-
- src_points = np.float32([
- src_points[1],
- src_points[2],
- src_points[3],
- src_points[0]
- ])
-
- dst_points = np.float32([
- [0, 0],
- [width - 1, 0],
- [width - 1, height - 1],
- [0, height - 1]
- ])
-
- M = cv2.getPerspectiveTransform(src_points, dst_points)
-
- aligned_barcode = cv2.warpPerspective(original_image, M, (width, height), flags=cv2.INTER_LINEAR)
-
- cv2.imwrite(f"decoding_barcode_{detection_idx + 1}.png", aligned_barcode)
- logger.debug(f"Aligned barcode saved at {detection_idx + 1}.")
-
- normalized_img = np.zeros(aligned_barcode.shape, aligned_barcode.dtype)
- cv2.normalize(aligned_barcode, normalized_img, 0, 255, cv2.NORM_MINMAX)
- logger.debug("Image normalized.")
-
- cv2.imwrite(f"cropped_image_decoding_normalized{detection_idx + 1}.png",normalized_img)
- logger.debug(f"Saved normalized image for decoding : {detection_idx + 1}")
-
- confidence = None
-
- decoded_text, confidence = self.run_decoding(normalized_img, detection_idx, confidence)
-
- if decoded_text:
- checksum_valid, validated_result = self.validate_code128_checksum(decoded_text, detection_idx)
- if checksum_valid:
- logger.debug(f"Validated result for detection {detection_idx + 1}: {validated_result}")
- return validated_result
- else:
- logger.error(f"Checksum validation failed for detection {detection_idx + 1}. Retrying with 180° rotation.")
-
- rotated_image = cv2.rotate(normalized_img, cv2.ROTATE_180)
- decoded_text, confidence = self.run_decoding(rotated_image, detection_idx, confidence)
-
- if decoded_text:
- checksum_valid, validated_result = self.validate_code128_checksum(decoded_text, detection_idx)
- if checksum_valid:
- logger.debug(f"Validated result after rotation for detection {detection_idx + 1}: {validated_result}")
- return validated_result
- else:
- logger.error(f"Checksum validation failed after rotation for detection {detection_idx + 1}. Error: {validated_result}")
- return
- return "Decoding failed"
- except Exception as e:
- logger.error(f"Error in decode_from_points: {e}")
- return "Error: Decoding failed"
-
- def run_decoding(self, image_np, detection_idx, confidence):
- """Helper to run decoding on the given image."""
- preprocessed_img = self.preprocess_image(
- image_np
- )
- decode_result = self.decoding_sess.infer_new_request({'x': preprocessed_img})
- output_tensor = decode_result['save_infer_model/scale_0.tmp_0']
- logger.debug(f"Output tensor shape: {output_tensor.shape}")
- output_indices = np.argmax(output_tensor, axis=2)
- output_probs = np.max(output_tensor, axis=2)
-
-
- decoded_text, confidence = self.decode_text(output_indices, output_probs, detection_idx)
- logger.debug(f"Raw barcode: {decoded_text}, Confidence: {confidence:.2f}")
- return decoded_text, confidence
-
- def decode_text(self, text_indices, text_probs, detection_idx):
- """
- Converts model output indices into text using the character dictionary.
- Args:
- text_indices (np.ndarray): Output indices from the decoding model.
- text_probs (np.ndarray): Probabilities corresponding to the indices.
- Returns:
- tuple: Decoded text and its confidence score.
- """
- try:
- max_index = len(self._characters) - 1
- logger.debug(f"Loaded barcode dictionary with {len(self._characters)} characters.")
- result_list = []
-
- for batch_idx in range(text_indices.shape[0]):
- char_list = []
- conf_list = []
- for step_idx in range(text_indices.shape[1]):
- char_idx = int(text_indices[batch_idx, step_idx])
- if char_idx > max_index:
- logger.warning(f"Index {char_idx} is out of bounds for dictionary size {len(self._characters)}")
- continue
- char = self._characters[char_idx]
-
- if char == "</s>":
- break
- char_list.append(char)
- conf_list.append(text_probs[batch_idx, step_idx])
- text = ''.join(char_list)
- confidence = np.mean(conf_list) if conf_list else 0.0
- result_list.append((text, confidence))
-
- return result_list[0] if result_list else ("", 0.0)
- except Exception as e:
- logger.error(f"Error in decode_text: {e}")
- return "Error: Decoding failed", 0.0
- def validate_code128_checksum(self, decoded_text, detection_idx):
-
-
- code128Values = [self._characters.index(char, 1) - 1 if char in self._characters[1:] else -1 for char in decoded_text]
- logger.debug(f"code128Values:{code128Values}")
- result = ""
- err_msg = ""
- currentCodeSet = CodeSet.B
- if code128Values[0] in [103, 104, 105]:
- start_codes = {103: CodeSet.A, 104: CodeSet.B, 105: CodeSet.C}
- currentCodeSet = start_codes[code128Values[0]]
-
- else:
- err_msg = f"No start code detected, first code is {code128Values[0]}"
- return False, err_msg
- checksum_expected = code128Values[-2]
-
-
- checksum_calculated = code128Values[0]
- for i, value in enumerate(code128Values[1:-2], start=1):
- weighted_value = value * i
- checksum_calculated += weighted_value
-
- checksum_calculated %= 103
- logger.debug(f"Final Calculated Checksum (mod 103): {checksum_calculated}")
- if checksum_calculated != checksum_expected:
- err_msg = f"Invalid checksum value, supposed to be {checksum_calculated} but got {checksum_expected}"
- return False, err_msg
-
- if code128Values[-1] != 106:
- err_msg = "No valid stop code detected at the end of the sequence."
- return False, err_msg
-
- result = ""
- i = 1
- while i < len(code128Values) - 2:
- value = code128Values[i]
-
- if value == 102:
- logger.debug(f"Detected FNC1 at position {i}, treated as AI separator.")
-
- i += 1
- continue
- elif value == 99:
- currentCodeSet = CodeSet.C
- logger.debug(f"Switched to Code Set C at position {i}")
- i += 1
- continue
- elif value == 100:
- currentCodeSet = CodeSet.B
- logger.debug(f"Switched to Code Set B at position {i}")
- i += 1
- continue
-
- if currentCodeSet == CodeSet.C:
- result += f"{value:02}"
-
- i += 1
- elif currentCodeSet == CodeSet.B:
- if 0 <= value <= 95:
- char = self._characters[value + 1]
- result += char
-
- i += 1
- else:
- err_msg = f"Invalid Code Set B value: {value}"
- logger.error(err_msg)
- return False, err_msg
- elif currentCodeSet == CodeSet.A:
- if 0 <= value <= 95:
- char = self._characters[value + 1]
- result += char
-
- i += 1
- else:
- err_msg = f"Invalid Code Set A value: {value}"
- logger.error(err_msg)
- return False, err_msg
-
-
- return True, result
|