Utils

Page under construction...

geometric_verification

geometric_verification(kpts0=None, kpts1=None, method=GeometricVerification.PYDEGENSAC, threshold=1, confidence=0.9999, max_iters=10000, quiet=False, **kwargs)

Computes the fundamental matrix and inliers between the two images using geometric verification.

Parameters:
  • kpts0 (ndarray) –

    Keypoint coordinates in the first image (Nx2 array), matched to kpts1.

  • kpts1 (ndarray) –

    Keypoint coordinates in the second image (Nx2 array), matched to kpts0.

  • method (GeometricVerification, default: PYDEGENSAC ) –

    The method used for geometric verification. Must be a GeometricVerification enum value, e.g. PYDEGENSAC, MAGSAC, RANSAC or one of the other OpenCV USAC methods.

  • threshold (float, default: 1 ) –

    Pixel error threshold for considering a correspondence an inlier.

  • confidence (float, default: 0.9999 ) –

    The required confidence level in the results.

  • max_iters (int, default: 10000 ) –

    The maximum number of iterations for estimating the fundamental matrix.

  • quiet (bool, default: False ) –

    If True, disables logging.

  • **kwargs

    Additional parameters for the selected method. Check the documentation of the selected method for more information. For pydegensac: https://github.com/ducha-aiki/pydegensac, for all the other OPENCV methods: https://docs.opencv.org/4.5.2/d9/d0c/group__calib3d.html#ga13f7e34de8fa516a686a56af1196247f

Returns:
  • Tuple[ndarray, ndarray]

    A tuple containing:

    - F: The estimated fundamental matrix.
    - inlMask: A boolean array that masks the correspondences identified as inliers.
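
A minimal usage sketch (the import path is assumed from the source location shown below; adjust it to wherever the package exposes GeometricVerification. The random arrays are placeholders, real kpts0/kpts1 would come from a feature matcher):

```python
import numpy as np

from deep_image_matching.utils.geometric_verification import (
    GeometricVerification,
    geometric_verification,
)

# Placeholder matched keypoint coordinates (N x 2); replace with real matches.
kpts0 = np.random.rand(200, 2) * 1000
kpts1 = kpts0 + np.random.randn(200, 2)

F, inliers = geometric_verification(
    kpts0=kpts0,
    kpts1=kpts1,
    method=GeometricVerification.PYDEGENSAC,  # falls back to OpenCV RANSAC if pydegensac is missing
    threshold=1.0,
    confidence=0.9999,
    max_iters=10000,
)
print(f"{inliers.sum()} / {len(kpts0)} correspondences kept as inliers")
```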

Source code in src/deep_image_matching/utils/geometric_verification.py
def geometric_verification(
    kpts0: np.ndarray = None,
    kpts1: np.ndarray = None,
    method: GeometricVerification = GeometricVerification.PYDEGENSAC,
    threshold: float = 1,
    confidence: float = 0.9999,
    max_iters: int = 10000,
    quiet: bool = False,
    **kwargs,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Computes the fundamental matrix and inliers between the two images using geometric verification.

    Args:
        method (str): The method used for geometric verification. Can be one of ['pydegensac', 'opencv'].
        threshold (float): Pixel error threshold for considering a correspondence an inlier.
        confidence (float): The required confidence level in the results.
        max_iters (int): The maximum number of iterations for estimating the fundamental matrix.
        quiet (bool): If True, disables logging.
        **kwargs: Additional parameters for the selected method. Check the documentation of the selected method for more information. For pydegensac: https://github.com/ducha-aiki/pydegensac, for all the other OPENCV methods: https://docs.opencv.org/4.5.2/d9/d0c/group__calib3d.html#ga13f7e34de8fa516a686a56af1196247f

    Returns:
        [np.ndarray, np.ndarray]: A tuple containing:
            - F: The estimated fundamental matrix.
            - inlMask: a Boolean array that masks the correspondences that were identified as inliers.

    """

    assert isinstance(
        method, GeometricVerification
    ), "Invalid method. It must be a GeometricVerification enum in GeometricVerification.PYDEGENSAC or GeometricVerification.MAGSAC."

    fallback = False
    F = None
    inlMask = np.ones(len(kpts0), dtype=bool)

    if len(kpts0) < 8:
        if not quiet:
            logger.warning("Not enough matches to perform geometric verification.")
        return F, inlMask

    if method == GeometricVerification.PYDEGENSAC:
        try:
            pydegensac = importlib.import_module("pydegensac")
        except ImportError:
            logger.warning(
                "Pydegensac not available. Using RANSAC (OpenCV) for geometric verification."
            )
            fallback = True

    if method == GeometricVerification.PYDEGENSAC and not fallback:
        try:
            params = {**pydegesac_default_params, **kwargs}
            F, inlMask = pydegensac.findFundamentalMatrix(
                kpts0,
                kpts1,
                px_th=threshold,
                conf=confidence,
                max_iters=max_iters,
                laf_consistensy_coef=params["laf_consistensy_coef"],
                error_type=params["error_type"],
                symmetric_error_check=params["symmetric_error_check"],
                enable_degeneracy_check=params["enable_degeneracy_check"],
            )
            if not quiet:
                log_result(inlMask, method.name)
        except Exception as err:
            # Fall back to RANSAC if pydegensac fails
            fallback = True
            log_error(err, method.name, fallback)

    if method == GeometricVerification.MAGSAC:
        try:
            F, inliers = cv2.findFundamentalMat(
                kpts0, kpts1, cv2.USAC_MAGSAC, threshold, confidence, max_iters
            )
            inlMask = (inliers > 0).squeeze()
            if not quiet:
                log_result(inlMask, method.name)
        except Exception as err:
            # Fall back to RANSAC if MAGSAC fails
            fallback = True
            log_error(err, method.name, fallback)

    # Use a generic OpenCV method
    if method.name not in ["PYDEGENSAC", "MAGSAC", "RANSAC"]:
        logger.debug(f"Method was set to {method}, trying to use it from OPENCV...")
        met = opencv_methods_mapping[method.name]
        try:
            F, inliers = cv2.findFundamentalMat(
                kpts0, kpts1, met, threshold, confidence, max_iters
            )
            inlMask = (inliers > 0).squeeze()
            if not quiet:
                log_result(inlMask, method.name)
        except Exception as err:
            fallback = True
            log_error(err, method.name, fallback)
            inlMask = np.ones(len(kpts0), dtype=bool)

    # Use RANSAC as fallback
    if method == GeometricVerification.RANSAC or fallback:
        try:
            F, inliers = cv2.findFundamentalMat(
                kpts0, kpts1, cv2.RANSAC, threshold, confidence, max_iters
            )
            inlMask = (inliers > 0).squeeze()
            if not quiet:
                log_result(inlMask, method.name)
        except Exception as err:
            log_error(err, method.name)
            inlMask = np.ones(len(kpts0), dtype=bool)

    if not quiet:
        logger.debug(f"Estiamted Fundamental matrix: \n{F}")

    return F, inlMask



tiling

Tiler

Class for dividing an image into tiles.

Source code in src/deep_image_matching/utils/tiling.py
class Tiler:
    """
    Class for dividing an image into tiles.
    """

    def __init__(
        self,
        tiling_mode=TilingMode.SIZE,
    ) -> None:
        """
        Initialize class.

        Parameters:
        - tiling_mode (TilingMode or str, default=TilingMode.SIZE): The tiling mode to use. Can be a TilingMode enum or a string with the name of the enum.

        Returns:
        None
        """
        if isinstance(tiling_mode, str):
            tiling_mode = TilingMode[tiling_mode.upper()]
        elif not isinstance(tiling_mode, TilingMode):
            raise TypeError(
                "tiling_mode must be a TilingMode enum or a string with the name of the enum"
            )
        self._tiling_mode = tiling_mode

    def compute_tiles(self, input: Union[np.ndarray, torch.Tensor], **kwargs):
        if self._tiling_mode == TilingMode.SIZE:
            return self.compute_tiles_by_size(input=input, **kwargs)
        elif self._tiling_mode == TilingMode.GRID:
            return self.compute_tiles_by_grid(input=input, **kwargs)
        else:
            return self.compute_tiles_auto(input=input, **kwargs)

    def compute_tiles_by_size(
        self,
        input: Union[np.ndarray, torch.Tensor],
        window_size: Union[int, Tuple[int, int]],
        overlap: Union[int, Tuple[int, int]] = 0,
    ) -> Tuple[
        Dict[int, np.ndarray], Dict[int, Tuple[int, int]], Tuple[int, int, int, int]
    ]:
        """
        Compute tiles by specifying the window size and overlap.

        Parameters:
            input (np.ndarray or torch.Tensor): The input image.
            window_size (int or Tuple[int, int]): The size of each tile. If int, the same size is used for both height and width. If Tuple[int, int], the first element represents the x coordinate (horizontal) and the second element represents the y coordinate (vertical).
            overlap (int or Tuple[int, int], default=0): The overlap between adjacent tiles. If int, the same overlap is used for both height and width. If Tuple[int, int], the first element represents the overlap in the horizontal direction and the second element represents the overlap in the vertical direction.

        Returns:
            Tuple[Dict[int, np.ndarray], Dict[int, Tuple[int, int]]]: A tuple containing two dictionaries. The first dictionary contains the extracted tiles, where the key is the index of the tile and the value is the tile itself. The second dictionary contains the x, y coordinates of the top-left corner of each tile in the original image (before padding), where the key is the index of the tile and the value is a tuple of two integers representing the x and y coordinates.

        Raises:
            TypeError: If the input is not a numpy array or a torch tensor.
            TypeError: If the window_size is not an integer or a tuple of integers.
            TypeError: If the overlap is not an integer or a tuple of integers.

        Note:
            - If the input is a numpy array, it is assumed to be in the format (H, W, C). If C > 1, it is converted to (C, H, W).
            - The output tiles are in the format (H, W, C).
            - The output origins are expressed in x, y coordinates, where x is the horizontal axis and y is the vertical axis (pointing down, as in OpenCV).
        """
        if isinstance(window_size, int):
            window_size = (window_size, window_size)
        elif isinstance(window_size, tuple) or isinstance(window_size, List):
            # transpose to be (H, W)
            window_size = (window_size[1], window_size[0])
        else:
            raise TypeError("window_size must be an integer or a tuple of integers")

        if isinstance(overlap, int):
            overlap = (overlap, overlap)
        elif isinstance(overlap, (tuple, list)):
            # transpose to be (H, W)
            overlap = (overlap[1], overlap[0])
        else:
            raise TypeError("overlap must be an integer or a tuple of integers")

        if isinstance(input, np.ndarray):
            input = torch.from_numpy(input)
            # If input is a numpy array, it is assumed to be in the format (H, W, C). If C>1, it is converted to (C, H, W)
            if input.dim() > 2:
                input = input.permute(2, 0, 1)

        # Add dimensions to the tensor to be (B, C, H, W)
        if input.dim() == 2:
            input = input.unsqueeze(0).unsqueeze(0)
        if input.dim() == 3:
            input = input.unsqueeze(0)

        H, W = input.shape[2:]

        # Compute padding to make the image divisible by the window size.
        # This returns a tuple of 2 int (vertical, horizontal)
        # NOTE: from version 0.7.1 compute_padding() returns a tuple of 2 int and not 4 ints (top, bottom, left, right) anymore.
        padding = K.contrib.compute_padding((H, W), window_size)
        stride = [w - o for w, o in zip(window_size, overlap)]
        patches = K.contrib.extract_tensor_patches(
            input, window_size, stride=stride, padding=padding
        )

        # Remove batch dimension
        patches = patches.squeeze(0)

        # Compute number of rows and columns
        if konria_071():
            n_rows = (H + 2 * padding[0] - window_size[0]) // stride[0] + 1
            n_cols = (W + 2 * padding[1] - window_size[1]) // stride[1] + 1
        else:
            n_rows = (H + padding[0] + padding[1] - window_size[0]) // stride[0] + 1
            n_cols = (W + padding[2] + padding[3] - window_size[1]) // stride[1] + 1

        # compute x,y coordinates of the top-left corner of each tile in the original image (before padding)
        origins = {}
        for row in range(n_rows):
            for col in range(n_cols):
                tile_idx = np.ravel_multi_index((row, col), (n_rows, n_cols), order="C")
                if konria_071():
                    x = -padding[1] + col * stride[1]
                    y = -padding[0] + row * stride[0]
                else:
                    x = -padding[2] + col * stride[1]
                    y = -padding[0] + row * stride[0]
                origins[tile_idx] = (x, y)

        # Convert patches to numpy array (H, W, C)
        patches = patches.permute(0, 2, 3, 1).numpy()

        # arrange patches in a dictionary with the index of the patch as key
        patches = {i: patches[i] for i in range(patches.shape[0])}

        return patches, origins, padding

    def compute_tiles_by_grid(
        self,
        input: Union[np.ndarray, torch.Tensor],
        grid: List[int] = [1, 1],
        overlap: int = 0,
        origin: List[int] = [0, 0],
    ) -> Tuple[Dict[int, np.ndarray], Dict[int, Tuple[int, int]]]:
        raise NotImplementedError(
            "compute_tiles_by_grid is not fully implemented yet (need to add padding and testing.)"
        )

        if not isinstance(grid, list) or len(grid) != 2:
            raise TypeError("grid must be a list of two integers")

        if not isinstance(input, np.ndarray):
            raise TypeError(
                "input must be a numpy array. Tile selection by grid is not implemented for torch tensors yet."
            )

        H, W = input.shape[:2]
        n_rows = grid[0]
        n_cols = grid[1]

        DX = round(W / n_cols / 10) * 10
        DY = round(H / n_rows / 10) * 10

        origins = {}
        for col in range(n_cols):
            for row in range(n_rows):
                tile_idx = np.ravel_multi_index((row, col), (n_rows, n_cols), order="C")
                xmin = col * DX - overlap
                ymin = row * DY - overlap
                origins[tile_idx] = (xmin, ymin)

        patches = {}
        for idx, origin in origins.items():
            xmin, ymin = origin
            xmax = xmin + DX + overlap - 1
            ymax = ymin + DY + overlap - 1
            patches[idx] = input[ymin:ymax, xmin:xmax]

        return patches, origins

    def compute_tiles_auto(self, input: Union[np.ndarray, torch.Tensor]):
        raise NotImplementedError("compute_tiles_auto is not implemented yet")

__init__(tiling_mode=TilingMode.SIZE)

Initialize class.

Parameters:
  • tiling_mode (TilingMode or str, default: TilingMode.SIZE ) –

    The tiling mode to use. Can be a TilingMode enum or a string with the name of the enum.

Returns:
  • None
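
A small construction sketch (assuming the module path deep_image_matching.utils.tiling from the source reference in this section):

```python
from deep_image_matching.utils.tiling import Tiler, TilingMode

tiler_a = Tiler(tiling_mode=TilingMode.SIZE)  # pass the enum directly
tiler_b = Tiler(tiling_mode="size")           # or its name as a string, resolved via TilingMode["SIZE"]
```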

Source code in src/deep_image_matching/utils/tiling.py
def __init__(
    self,
    tiling_mode=TilingMode.SIZE,
) -> None:
    """
    Initialize class.

    Parameters:
    - tiling_mode (TilingMode or str, default=TilingMode.SIZE): The tiling mode to use. Can be a TilingMode enum or a string with the name of the enum.

    Returns:
    None
    """
    if isinstance(tiling_mode, str):
        tiling_mode = TilingMode[tiling_mode.upper()]
    elif not isinstance(tiling_mode, TilingMode):
        raise TypeError(
            "tiling_mode must be a TilingMode enum or a string with the name of the enum"
        )
    self._tiling_mode = tiling_mode

compute_tiles_by_size(input, window_size, overlap=0)

Compute tiles by specifying the window size and overlap.

Parameters:
  • input (ndarray or Tensor) –

    The input image.

  • window_size (int or Tuple[int, int]) –

    The size of each tile. If int, the same size is used for both height and width. If Tuple[int, int], the first element represents the x coordinate (horizontal) and the second element represents the y coordinate (vertical).

  • overlap (int or Tuple[int, int], default: 0 ) –

    The overlap between adjacent tiles. If int, the same overlap is used for both height and width. If Tuple[int, int], the first element represents the overlap in the horizontal direction and the second element represents the overlap in the vertical direction.

Returns:
  • Tuple[Dict[int, ndarray], Dict[int, Tuple[int, int]], Tuple[int, int, int, int]]

    A tuple containing three elements: a dictionary of the extracted tiles, where the key is the index of the tile and the value is the tile itself; a dictionary with the x, y coordinates of the top-left corner of each tile in the original image (before padding), where the key is the index of the tile and the value is a tuple of two integers; and the padding that was applied to the image before tiling.

Raises:
  • TypeError

    If the input is not a numpy array or a torch tensor.

  • TypeError

    If the window_size is not an integer or a tuple of integers.

  • TypeError

    If the overlap is not an integer or a tuple of integers.

Note
  • If the input is a numpy array, it is assumed to be in the format (H, W, C). If C > 1, it is converted to (C, H, W).
  • The output tiles are in the format (H, W, C).
  • The output origins are expressed in x, y coordinates, where x is the horizontal axis and y is the vertical axis (pointing down, as in OpenCV).
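
A minimal usage sketch (assuming the import path deep_image_matching.utils.tiling and that torch and kornia are installed; the image and tile sizes are invented for illustration):

```python
import numpy as np

from deep_image_matching.utils.tiling import Tiler, TilingMode

image = np.zeros((1500, 2000, 3), dtype=np.uint8)  # (H, W, C) input image

tiler = Tiler(tiling_mode=TilingMode.SIZE)
tiles, origins, padding = tiler.compute_tiles_by_size(
    input=image,
    window_size=(1024, 768),  # (x, y) = (width, height) of each tile
    overlap=50,               # 50 px overlap in both directions
)

for idx, tile in tiles.items():
    x, y = origins[idx]  # top-left corner of the tile in the original image
    print(f"tile {idx}: shape={tile.shape}, origin=({x}, {y})")
```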
Source code in src/deep_image_matching/utils/tiling.py
def compute_tiles_by_size(
    self,
    input: Union[np.ndarray, torch.Tensor],
    window_size: Union[int, Tuple[int, int]],
    overlap: Union[int, Tuple[int, int]] = 0,
) -> Tuple[
    Dict[int, np.ndarray], Dict[int, Tuple[int, int]], Tuple[int, int, int, int]
]:
    """
    Compute tiles by specifying the window size and overlap.

    Parameters:
        input (np.ndarray or torch.Tensor): The input image.
        window_size (int or Tuple[int, int]): The size of each tile. If int, the same size is used for both height and width. If Tuple[int, int], the first element represents the x coordinate (horizontal) and the second element represents the y coordinate (vertical).
        overlap (int or Tuple[int, int], default=0): The overlap between adjacent tiles. If int, the same overlap is used for both height and width. If Tuple[int, int], the first element represents the overlap in the horizontal direction and the second element represents the overlap in the vertical direction.

    Returns:
        Tuple[Dict[int, np.ndarray], Dict[int, Tuple[int, int]]]: A tuple containing two dictionaries. The first dictionary contains the extracted tiles, where the key is the index of the tile and the value is the tile itself. The second dictionary contains the x, y coordinates of the top-left corner of each tile in the original image (before padding), where the key is the index of the tile and the value is a tuple of two integers representing the x and y coordinates.

    Raises:
        TypeError: If the input is not a numpy array or a torch tensor.
        TypeError: If the window_size is not an integer or a tuple of integers.
        TypeError: If the overlap is not an integer or a tuple of integers.

    Note:
        - If the input is a numpy array, it is assumed to be in the format (H, W, C). If C > 1, it is converted to (C, H, W).
        - The output tiles are in the format (H, W, C).
        - The output origins are expressed in x, y coordinates, where x is the horizontal axis and y is the vertical axis (pointing down, as in OpenCV).
    """
    if isinstance(window_size, int):
        window_size = (window_size, window_size)
    elif isinstance(window_size, tuple) or isinstance(window_size, List):
        # transpose to be (H, W)
        window_size = (window_size[1], window_size[0])
    else:
        raise TypeError("window_size must be an integer or a tuple of integers")

    if isinstance(overlap, int):
        overlap = (overlap, overlap)
    elif isinstance(overlap, (tuple, list)):
        # transpose to be (H, W)
        overlap = (overlap[1], overlap[0])
    else:
        raise TypeError("overlap must be an integer or a tuple of integers")

    if isinstance(input, np.ndarray):
        input = torch.from_numpy(input)
        # If input is a numpy array, it is assumed to be in the format (H, W, C). If C>1, it is converted to (C, H, W)
        if input.dim() > 2:
            input = input.permute(2, 0, 1)

    # Add dimensions to the tensor to be (B, C, H, W)
    if input.dim() == 2:
        input = input.unsqueeze(0).unsqueeze(0)
    if input.dim() == 3:
        input = input.unsqueeze(0)

    H, W = input.shape[2:]

    # Compute padding to make the image divisible by the window size.
    # This returns a tuple of 2 int (vertical, horizontal)
    # NOTE: from version 0.7.1 compute_padding() returns a tuple of 2 int and not 4 ints (top, bottom, left, right) anymore.
    padding = K.contrib.compute_padding((H, W), window_size)
    stride = [w - o for w, o in zip(window_size, overlap)]
    patches = K.contrib.extract_tensor_patches(
        input, window_size, stride=stride, padding=padding
    )

    # Remove batch dimension
    patches = patches.squeeze(0)

    # Compute number of rows and columns
    if konria_071():
        n_rows = (H + 2 * padding[0] - window_size[0]) // stride[0] + 1
        n_cols = (W + 2 * padding[1] - window_size[1]) // stride[1] + 1
    else:
        n_rows = (H + padding[0] + padding[1] - window_size[0]) // stride[0] + 1
        n_cols = (W + padding[2] + padding[3] - window_size[1]) // stride[1] + 1

    # compute x,y coordinates of the top-left corner of each tile in the original image (before padding)
    origins = {}
    for row in range(n_rows):
        for col in range(n_cols):
            tile_idx = np.ravel_multi_index((row, col), (n_rows, n_cols), order="C")
            if konria_071():
                x = -padding[1] + col * stride[1]
                y = -padding[0] + row * stride[0]
            else:
                x = -padding[2] + col * stride[1]
                y = -padding[0] + row * stride[0]
            origins[tile_idx] = (x, y)

    # Convert patches to numpy array (H, W, C)
    patches = patches.permute(0, 2, 3, 1).numpy()

    # arrange patches in a dictionary with the index of the patch as key
    patches = {i: patches[i] for i in range(patches.shape[0])}

    return patches, origins, padding



image

ImageList

Represents a collection of Image objects

Attributes:
  • IMAGE_EXT (tuple) –

    Supported image file extensions.
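
A minimal usage sketch (the directory path is hypothetical; any folder containing supported image files will do):

```python
from pathlib import Path

from deep_image_matching.utils.image import ImageList

image_list = ImageList(Path("assets/example_images"))  # hypothetical folder of images

print(len(image_list), "images found")
print(image_list.img_names)  # file names (strings)
print(image_list.img_paths)  # full paths (Path objects)

for image in image_list:  # the list is iterable and yields Image objects
    print(image.name)
```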

Source code in src/deep_image_matching/utils/image.py
class ImageList:
    """
    Represents a collection of Image objects

    Attributes:
        IMAGE_EXT (tuple): Supported image file extensions.
    """

    IMAGE_EXT = IMAGE_EXT

    def __init__(self, img_dir: Path):
        """
        Initializes an ImageList object

        Args:
            img_dir (Path): The path to the directory containing the images.

        Raises:
            ValueError: If the directory does not exist, is not a directory, or
                does not contain any valid images.
        """
        if not img_dir.exists():
            raise ValueError(f"Directory {img_dir} does not exist")

        if not img_dir.is_dir():
            raise ValueError(f"{img_dir} is not a directory")

        self.images = []
        self.current_idx = 0
        i = 0
        all_imgs = [
            image for image in img_dir.glob("*") if image.suffix in self.IMAGE_EXT
        ]
        all_imgs.sort()

        if len(all_imgs) == 0:
            raise ValueError(f"{img_dir} does not contain any image")

        for image in all_imgs:
            self.add_image(image, i)
            i += 1

    def __len__(self):
        return len(self.images)

    def __repr__(self) -> str:
        return f"ImageList with {len(self.images)} images"

    def __getitem__(self, img_id):
        return self.images[img_id]

    def __iter__(self):
        return self

    def __next__(self):
        if self.current_idx >= len(self.images):
            raise StopIteration
        cur = self.current_idx
        self.current_idx += 1
        return self.images[cur]

    def add_image(self, path: Path, img_id: int):
        """
        Adds a new Image object to the ImageList.

        Args:
            path (Path): The path to the image file.
            img_id (int): The ID to assign to the image.
        """
        new_image = Image(path, img_id)
        self.images.append(new_image)

    @property
    def img_names(self):
        """
        Returns a list of image names in the ImageList.

        Returns:
            list: A list of image names (strings).
        """
        return [im.name for im in self.images]

    @property
    def img_paths(self):
        """
        Returns a list of image paths in the ImageList

        Returns:
            list: A list of image paths (Path objects).
        """
        return [im.path for im in self.images]

img_names property

Returns a list of image names in the ImageList.

Returns:
  • list

    A list of image names (strings).

img_paths property

Returns a list of image paths in the ImageList

Returns:
  • list

    A list of image paths (Path objects).

__init__(img_dir)

Initializes an ImageList object

Parameters:
  • img_dir (Path) –

    The path to the directory containing the images.

Raises:
  • ValueError

    If the directory does not exist, is not a directory, or does not contain any valid images.

Source code in src/deep_image_matching/utils/image.py
def __init__(self, img_dir: Path):
    """
    Initializes an ImageList object

    Args:
        img_dir (Path): The path to the directory containing the images.

    Raises:
        ValueError: If the directory does not exist, is not a directory, or
            does not contain any valid images.
    """
    if not img_dir.exists():
        raise ValueError(f"Directory {img_dir} does not exist")

    if not img_dir.is_dir():
        raise ValueError(f"{img_dir} is not a directory")

    self.images = []
    self.current_idx = 0
    i = 0
    all_imgs = [
        image for image in img_dir.glob("*") if image.suffix in self.IMAGE_EXT
    ]
    all_imgs.sort()

    if len(all_imgs) == 0:
        raise ValueError(f"{img_dir} does not contain any image")

    for image in all_imgs:
        self.add_image(image, i)
        i += 1

add_image(path, img_id)

Adds a new Image object to the ImageList.

Parameters:
  • path (Path) –

    The path to the image file.

  • img_id (int) –

    The ID to assign to the image.

Source code in src/deep_image_matching/utils/image.py
def add_image(self, path: Path, img_id: int):
    """
    Adds a new Image object to the ImageList.

    Args:
        path (Path): The path to the image file.
        img_id (int): The ID to assign to the image.
    """
    new_image = Image(path, img_id)
    self.images.append(new_image)

read_image(path, color=True)

Reads image with OpenCV and returns it as a NumPy array.

Parameters:
  • path (Union[str, Path]) –

    The path of the image.

  • color (bool, default: True ) –

    Whether to read the image as color (RGB) or grayscale. Defaults to True.

Returns:
  • ndarray

    np.ndarray: The image as a NumPy array.
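
A short usage sketch (the file path is hypothetical):

```python
from deep_image_matching.utils.image import read_image

rgb = read_image("assets/example_images/img_0001.jpg")                # (H, W, 3) RGB array
gray = read_image("assets/example_images/img_0001.jpg", color=False)  # (H, W) grayscale array

print(rgb.shape, gray.shape)
```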

Source code in src/deep_image_matching/utils/image.py
def read_image(
    path: Union[str, Path],
    color: bool = True,
) -> np.ndarray:
    """
    Reads image with OpenCV and returns it as a NumPy array.

    Args:
        path (Union[str, Path]): The path of the image.
        color (bool, optional): Whether to read the image as color (RGB) or grayscale. Defaults to True.

    Returns:
        np.ndarray: The image as a NumPy array.
    """

    if not Path(path).exists():
        raise ValueError(f"File {path} does not exist")

    if color:
        flag = cv2.IMREAD_COLOR
    else:
        flag = cv2.IMREAD_GRAYSCALE

    image = cv2.imread(str(path), flag)

    if color:
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    return image



logger

deprecated(func)

This is a decorator which can be used to mark functions as deprecated. It will result in a warning being emitted and a logging warning when the function is used.
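
A short usage sketch (the decorated function is made up for illustration):

```python
from deep_image_matching.utils.logger import deprecated


@deprecated
def old_helper():
    """Kept only for backward compatibility."""
    return 42


old_helper()  # emits a DeprecationWarning and logs a warning, then runs normally
```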

Source code in src/deep_image_matching/utils/logger.py
def deprecated(func):
    """This is a decorator which can be used to mark functions
    as deprecated. It will result in a warning being emitted
    and a logging warning when the function is used."""

    @functools.wraps(func)
    def new_func(*args, **kwargs):
        message = kwargs.get("message", None)
        if message is None:
            message = f"Depracated {func.__name__}."
        warnings.simplefilter("always", DeprecationWarning)  # turn off filter
        msg = f"Call to deprecated function {func.__name__}."
        warnings.warn(
            msg,
            category=DeprecationWarning,
            stacklevel=2,
        )
        logging.warning(msg)
        warnings.simplefilter("default", DeprecationWarning)  # reset filter
        return func(*args, **kwargs)

    return new_func

setup_logger(name=None, log_level='info', log_folder=None, logfile_basename='log')

Configures and returns a logging.Logger instance.

This function checks for existing loggers with the same name. It provides flexible configuration for both console and file-based logging with customizable log levels, formats, and an optional log file.

Parameters:
  • name (str, default: None ) –

    The name of the logger. If None, the root logger will be used. Defaults to None.

  • log_level (str, default: 'info' ) –

    The logging level for both console and file outputs. Valid options are 'debug', 'info', 'warning', 'error', 'critical'. Defaults to 'info'.

  • log_folder (str, default: None ) –

    The path to the directory for storing log files. If None, no file output will be generated. Defaults to None.

  • logfile_basename (str, default: 'log' ) –

    The base name for the log file. A timestamp will be appended. Defaults to "log".

Returns:
  • Logger

    logging.Logger: A configured logger instance.
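
A minimal configuration sketch (the logger name and folder are examples, not requirements):

```python
from deep_image_matching.utils.logger import setup_logger

# Console + file logging: a timestamped log file is created inside ./logs
logger = setup_logger(
    name="deep_image_matching",
    log_level="info",
    log_folder="logs",
    logfile_basename="matching",
)
logger.info("Logger configured")
```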

Source code in src/deep_image_matching/utils/logger.py
def setup_logger(
    name: str = None,
    log_level: str = "info",
    log_folder: str = None,
    logfile_basename: str = "log",
) -> logging.Logger:
    """
    Configures and returns a logging.Logger instance.

    This function checks for existing loggers with the same name. It provides
    flexible configuration for both console and file-based logging with customizable
    log levels, formats, and an optional log file.

    Args:
        name (str, optional): The name of the logger. If None, the root logger
            will be used. Defaults to None.
        log_level (str, optional): The logging level for both console and file
            outputs. Valid options are 'debug', 'info', 'warning', 'error',
            'critical'. Defaults to 'info'.
        log_folder (str, optional): The path to the directory for storing log files.
            If None, no file output will be generated. Defaults to None.
        logfile_basename (str, optional): The base name for the log file. A timestamp
            will be appended. Defaults to "log".

    Returns:
        logging.Logger: A configured logger instance.
    """
    # Check if logger already exists
    if logging.getLogger(name).hasHandlers():
        logger = logging.getLogger(name)
        logger.debug(f"Logger {logger.name} already exists")
        return logger

    # Set log line template
    if log_level == "debug":
        log_line_template = "%(color_on)s%(asctime)s | | [%(filename)s -> %(funcName)s], line %(lineno)d - [%(levelname)-8s] %(message)s%(color_off)s"
    else:
        log_line_template = (
            "%(color_on)s%(asctime)s | [%(levelname)-8s] %(message)s%(color_off)s"
        )

    # Set log file
    if log_folder is not None:
        log_folder = Path(log_folder)
        log_folder.mkdir(exist_ok=True, parents=True)
        today = date.today()
        now = datetime.now()
        current_date = f"{today.strftime('%Y_%m_%d')}_{now.strftime('%H:%M')}"
        log_file = log_folder / f"{logfile_basename}_{current_date}.log"
    else:
        log_file = None

    # Setup logging
    logger = configure_logging(
        name=name,
        console_log_output="stdout",
        console_log_level=log_level,
        console_log_color=True,
        logfile_file=log_file,
        logfile_log_level=log_level,
        logfile_log_color=False,
        log_line_template=log_line_template,
    )
    return logger



timer

Timer

Class to help manage printing simple timing of code execution.

Attributes:
  • smoothing (float) –

    A smoothing factor for the time measurements.

  • times (OrderedDict) –

    A dictionary to store timing information for different named sections.

  • will_print (OrderedDict) –

    A dictionary to track whether to print the timing information for each section.

  • logger (Logger) –

    The logger object to use for printing timing information.
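
A minimal usage sketch (section names and sleep times stand in for real pipeline steps):

```python
import time

from deep_image_matching.utils.timer import Timer

timer = Timer(log_level="info", cumulate_by_key=True)

time.sleep(0.2)
timer.update("extraction")  # time elapsed since the Timer was created (or last reset/update)

time.sleep(0.1)
timer.update("matching")    # time elapsed since the previous update

timer.print("Pipeline")     # logs e.g. "[Timer] | [Pipeline] extraction=0.200, matching=0.100, Total execution=0.300"
```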

Source code in src/deep_image_matching/utils/timer.py
class Timer:
    """
    Class to help manage printing simple timing of code execution.

    Attributes:
        smoothing (float): A smoothing factor for the time measurements.
        times (OrderedDict): A dictionary to store timing information for different named sections.
        will_print (OrderedDict): A dictionary to track whether to print the timing information for each section.
        logger (logging.Logger): The logger object to use for printing timing information.
    """

    def __init__(
        self,
        smoothing: float = 0.3,
        logger: logging.Logger = logger,
        log_level: str = "info",
        cumulate_by_key: bool = False,
    ):
        """
        Initializes the Timer object.

        Args:
            smoothing (float, optional): A smoothing factor for the time measurements. Defaults to 0.3.
            logger (logging.Logger, optional): The logger object to use for printing timing information. Defaults to logger.
        """
        self.smoothing = smoothing
        self.times = OrderedDict()
        self.will_print = OrderedDict()
        self.logger = logger
        self.log_level = logging.getLevelName(log_level.upper())
        self.cumulate = cumulate_by_key

        self.reset()

    def reset(self):
        """
        Resets the Timer object, setting initial time values.
        """
        now = time.time()
        self.start = now
        self.last_time = now
        self.times.clear()
        self.will_print.clear()

    def update(self, name: str):
        """
        Updates the timing information for a specific named section. If the section does not exist, it is created, otherwise the timing information is updated. If cumulate_by_key was set to True, the timing information is accumulated for each key, otherwise the timing information is smoothed.

        Args:
            name (str): The name of the section.
        """
        now = time.time()
        dt = now - self.last_time
        self.last_time = now

        if name in self.times:
            if self.cumulate:
                self.times[name] += dt
            else:
                self.times[name] = (
                    self.smoothing * dt + (1 - self.smoothing) * self.times[name]
                )
        else:
            self.times[name] = dt

        self.will_print[name] = True

    def print(self, text: str = "Timer", sep: str = ", "):
        """
        Prints the accumulated timing information.

        Args:
            text (str, optional): Additional text to include in the printed message. Defaults to "Timer".
        """
        total = 0.0
        msg = f"[Timer] | [{text}] "
        for key in self.times:
            val = self.times[key]
            if self.will_print[key]:
                msg = msg + f"{key}={val:.3f}{sep}"
                total += val
        exec_time = time.time() - self.start
        msg = msg + f"Total execution={exec_time:.3f}"
        self.logger.log(self.log_level, msg)

        self.reset()

__init__(smoothing=0.3, logger=logger, log_level='info', cumulate_by_key=False)

Initializes the Timer object.

Parameters:
  • smoothing (float, default: 0.3 ) –

    A smoothing factor for the time measurements. Defaults to 0.3.

  • logger (Logger, default: logger ) –

    The logger object to use for printing timing information. Defaults to logger.

Source code in src/deep_image_matching/utils/timer.py
def __init__(
    self,
    smoothing: float = 0.3,
    logger: logging.Logger = logger,
    log_level: str = "info",
    cumulate_by_key: bool = False,
):
    """
    Initializes the Timer object.

    Args:
        smoothing (float, optional): A smoothing factor for the time measurements. Defaults to 0.3.
        logger (logging.Logger, optional): The logger object to use for printing timing information. Defaults to logger.
    """
    self.smoothing = smoothing
    self.times = OrderedDict()
    self.will_print = OrderedDict()
    self.logger = logger
    self.log_level = logging.getLevelName(log_level.upper())
    self.cumulate = cumulate_by_key

    self.reset()

print(text='Timer', sep=', ')

Prints the accumulated timing information.

Parameters:
  • text (str, default: 'Timer' ) –

    Additional text to include in the printed message. Defaults to "Timer".

Source code in src/deep_image_matching/utils/timer.py
def print(self, text: str = "Timer", sep: str = ", "):
    """
    Prints the accumulated timing information.

    Args:
        text (str, optional): Additional text to include in the printed message. Defaults to "Timer".
    """
    total = 0.0
    msg = f"[Timer] | [{text}] "
    for key in self.times:
        val = self.times[key]
        if self.will_print[key]:
            msg = msg + f"{key}={val:.3f}{sep}"
            total += val
    exec_time = time.time() - self.start
    msg = msg + f"Total execution={exec_time:.3f}"
    self.logger.log(self.log_level, msg)

    self.reset()

reset()

Resets the Timer object, setting initial time values.

Source code in src/deep_image_matching/utils/timer.py
def reset(self):
    """
    Resets the Timer object, setting initial time values.
    """
    now = time.time()
    self.start = now
    self.last_time = now
    self.times.clear()
    self.will_print.clear()

update(name)

Updates the timing information for a specific named section. If the section does not yet exist, it is created; otherwise its timing is updated. If cumulate_by_key was set to True, the elapsed time is accumulated per key; otherwise it is exponentially smoothed.

Parameters:
  • name (str) –

    The name of the section.

Source code in src/deep_image_matching/utils/timer.py
def update(self, name: str):
    """
    Updates the timing information for a specific named section. If the section does not exist, it is created, otherwise the timing information is updated. If cumulate_by_key was set to True, the timing information is accumulated for each key, otherwise the timing information is smoothed.

    Args:
        name (str): The name of the section.
    """
    now = time.time()
    dt = now - self.last_time
    self.last_time = now

    if name in self.times:
        if self.cumulate:
            self.times[name] += dt
        else:
            self.times[name] = (
                self.smoothing * dt + (1 - self.smoothing) * self.times[name]
            )
    else:
        self.times[name] = dt

    self.will_print[name] = True

timeit(func)

Decorator that measures the execution time of a function and prints the duration.

Parameters:
  • func (callable) –

    The function to be decorated.

Returns:
  • callable

    The decorated function.
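
A short usage sketch (the decorated function is an arbitrary example):

```python
from deep_image_matching.utils.timer import timeit


@timeit
def match_pair(a, b):
    return a + b


match_pair(1, 2)  # prints "Function match_pair took 0.0000 s."
```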

Source code in src/deep_image_matching/utils/timer.py
def timeit(func):
    """
    Decorator that measures the execution time of a function and prints the duration.

    Args:
        func (callable): The function to be decorated.

    Returns:
        callable: The decorated function.
    """

    @wraps(func)
    def timeit_wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        end_time = time.perf_counter()
        total_time = end_time - start_time
        print(f"Function {func.__name__} took {total_time:.4f} s.")
        return result

    return timeit_wrapper