97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
class ExtractorBase(metaclass=ABCMeta):
    """
    Abstract base class for local feature extractors.

    Subclasses implement `_extract` and `_frame2tensor`. The public `extract`
    method loads the image, optionally converts it to grayscale / float32,
    resizes it according to the configured quality, runs the extraction on the
    whole image or tile by tile, and saves the features to an HDF5 file.
    """

    # Defaults for the "general" section of the configuration. `__init__`
    # overlays the "general" entry of the user-supplied config on top of these.
    general_conf = dict(
        output_dir=None,
        quality=Quality.HIGH,
        tile_selection=TileSelection.NONE,
        tile_size=(1024, 1024),  # (x, y) or (width, height)
        tile_overlap=0,  # in pixels
        force_cpu=False,
        do_viz=False,
    )

    # Extractor-specific defaults for the "extractor" section of the configuration.
    default_conf = dict()

    # NOTE(review): not read anywhere in the visible part of this class;
    # presumably the input keys a concrete extractor requires - confirm.
    required_inputs = list()

    # When True, `extract` converts the loaded image from BGR to grayscale.
    grayscale = True

    # When True, `extract` casts the loaded image to float32.
    as_float = True

    # Interpolation mode handed to `_resize_image`.
    interp = "cv2_area"  # "cv2_area", "cv2_linear", or "pil_bilinear" (more accurate but slower)

    # Number of rows of the descriptor matrix (descriptors are stored as
    # descriptor_size x n_keypoints when extracting by tiles).
    descriptor_size = 128

    # Forwarded as `as_half` to `save_features_h5` when the features are saved.
    features_as_half = True
def __init__(self, custom_config: dict):
    """
    Build the extractor configuration, the output directory and the device.

    This is the initializer subclasses are expected to call.

    Args:
        custom_config: Dictionary of options. Its optional "general" and
            "extractor" entries are dictionaries whose items override
            `general_conf` and `default_conf` respectively (shallow merge).
            A missing or None section leaves the defaults untouched.

    Raises:
        TypeError: If `custom_config` is not a dictionary.
    """
    if not isinstance(custom_config, dict):
        raise TypeError("custom_config must be a dictionary")

    # Overlay the user options on the class-level defaults. `or {}` makes a
    # section that is explicitly set to None behave like a missing section
    # instead of failing on `**None`.
    self._config = {
        "general": {
            **self.general_conf,
            **(custom_config.get("general") or {}),
        },
        "extractor": {
            **self.default_conf,
            **(custom_config.get("extractor") or {}),
        },
    }

    # Keep the main processing parameters at hand as instance members
    self._quality = self._config["general"]["quality"]
    self._tiling = self._config["general"]["tile_selection"]
    logger.debug(
        f"Matching options: Quality: {self._quality.name} - Tiling: {self._tiling.name}"
    )

    # Saving directory: created here if configured, otherwise left as None
    output_dir = self._config["general"]["output_dir"]
    if output_dir is not None:
        self._output_dir = Path(output_dir)
        self._output_dir.mkdir(parents=True, exist_ok=True)
    else:
        self._output_dir = None
    logger.debug(f"Saving directory: {self._output_dir}")

    # Use CUDA when it is available, unless the CPU is explicitly forced
    self._device = (
        "cuda"
        if torch.cuda.is_available() and not self._config["general"]["force_cpu"]
        else "cpu"
    )
    logger.debug(f"Running inference on device {self._device}")
def extract(self, img: Union[Image, Path, str]) -> Path:
    """
    Extract features from an image and save them to disk.

    This is the main entry point of the feature extractor. The image is
    loaded, optionally converted to grayscale / float32, resized according
    to the configured quality and processed either as a whole or by tiles.
    Keypoints are then mapped back to the original image resolution and the
    features are written to `<output_dir>/features.h5`.

    Args:
        img: Image to extract features from. It can be a path (str or Path)
            or an Image object (its `path` attribute is used).

    Returns:
        Path of the HDF5 file the features were saved to.

    Raises:
        TypeError: If `img` is not a str, Path or Image, or if no output
            directory is configured.
        ValueError: If the image file does not exist or cannot be read.
    """
    if isinstance(img, str):
        im_path = Path(img)
    elif isinstance(img, Image):
        im_path = img.path
    elif isinstance(img, Path):
        im_path = img
    else:
        raise TypeError(
            "Invalid image path. 'img' must be a string, a Path or an Image object"
        )
    if not im_path.exists():
        raise ValueError(f"Image {im_path} does not exist")

    general_cfg = self._config["general"]

    # Fail with an explicit message instead of the cryptic error of Path(None)
    if general_cfg["output_dir"] is None:
        raise TypeError(
            "'output_dir' is not set in the general configuration: "
            "a directory is needed to save the extracted features"
        )
    output_dir = Path(general_cfg["output_dir"])
    feature_path = output_dir / "features.h5"

    # Load image (cv2.imread returns None instead of raising on failure)
    image = cv2.imread(str(im_path))
    if image is None:
        raise ValueError(f"Unable to read image {im_path}")
    if self.grayscale:
        image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    if self.as_float:
        image = image.astype(np.float32)

    # Resize image if needed
    image_ = self._resize_image(self._quality, image, interp=self.interp)

    if general_cfg["tile_selection"] == TileSelection.NONE:
        # Extract features from the whole image: every keypoint gets tile index 0
        features = self._extract(image_)
        features["tile_idx"] = np.zeros(
            features["keypoints"].shape[0], dtype=np.float32
        )
    else:
        # Extract features by tiles
        features = self._extract_by_tile(image_, select_unique=True)
    logger.debug(f"Extracted {len(features['keypoints'])} keypoints")

    # Retrieve original image coordinates if extraction was performed on
    # an up/down-sampled image
    features = self._resize_features(self._quality, features)

    # Store the size of the original (not resized) image as (height, width)
    features["image_size"] = np.array(image.shape[:2])

    # Save features to disk in h5 format
    save_features_h5(
        feature_path,
        features,
        im_path.name,
        as_half=self.features_as_half,
    )

    # For debug: visualize keypoints and save to disk. "verbose" has no
    # default in `general_conf`, hence the tolerant lookup.
    if general_cfg.get("verbose", False):
        viz_dir = output_dir / "debug" / "keypoints"
        viz_dir.mkdir(parents=True, exist_ok=True)
        # Reload the image: the working copy may have been converted to
        # grayscale / float32 above
        image = cv2.imread(str(im_path))
        self.viz_keypoints(
            image,
            features["keypoints"],
            viz_dir,
            im_path.stem,
            img_format="jpg",
            jpg_quality=70,
        )

    return feature_path
@abstractmethod
def _extract(self, image: np.ndarray) -> dict:
    """
    Extract features from a single image (or image tile).

    Called by `extract` (whole image) and `_extract_by_tile` (one tile at a
    time). Must be implemented by subclasses.

    Args:
        image: Image as a NumPy array.
            NOTE(review): `extract` converts the image to grayscale when
            `grayscale` is True, so the array is presumably 2D in that case
            rather than (height, width, 3) - confirm in the subclasses.

    Returns:
        A dictionary of extracted features. The callers read the
        "keypoints" and "descriptors" entries and, when present, "scores".

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    message = "Subclasses should implement _extract method!"
    raise NotImplementedError(message)
@abstractmethod
def _frame2tensor(self, image: np.ndarray, device: str = "cpu"):
    """
    Convert an image to a tensor in the format required by the extractor.

    Low-level hook for subclasses that need to adapt the input image before
    running inference. Must be implemented by subclasses; the returned type
    is not fixed by this base class.

    Args:
        image: The image to be converted.
        device: The device to place the tensor on (defaults to "cpu").

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    message = "Subclasses should implement _frame2tensor method to adapt the input image to the required format!"
    raise NotImplementedError(message)
def _extract_by_tile(
    self, image: np.ndarray, select_unique: bool = True
) -> FeaturesDict:
    """
    Extract features from an image tile by tile.

    The image is split into tiles with the configured "tile_size" and
    "tile_overlap", `_extract` is run on every tile, keypoints are shifted
    by the tile origin into the coordinates of `image`, and keypoints that
    fall outside `image` or closer than 2 pixels to its border are dropped.

    Args:
        image: The image to extract features from.
        select_unique: If True, duplicated keypoints (e.g. found in the
            overlap of two tiles) are removed with `np.unique`; note that
            this also sorts the keypoints.

    Returns:
        FeaturesDict with "keypoints" (n x 2), "descriptors"
        (descriptor_size x n), "scores" (n) and "tile_idx" (n). Scores are
        all ones when any tile with keypoints provided no scores.
    """
    general_cfg = self._config["general"]

    # Compute tiles limits
    tiler = Tiler(tiling_mode="size")
    tiles, tiles_origins, _padding = tiler.compute_tiles_by_size(
        input=image,
        window_size=general_cfg["tile_size"],
        overlap=general_cfg["tile_overlap"],
    )

    # Debug plots are optional ("verbose" has no default in `general_conf`)
    # and need a directory to be written to.
    save_viz = bool(general_cfg.get("verbose", False)) and (
        self._output_dir is not None
    )

    # Per-tile results are collected in lists and stacked once at the end.
    # The empty float32 seeds keep shape and dtype well defined when no
    # keypoint is found at all.
    kpts_chunks = [np.empty((0, 2), dtype=np.float32)]
    desc_chunks = [np.empty((self.descriptor_size, 0), dtype=np.float32)]
    idx_chunks = [np.empty(0, dtype=np.float32)]
    score_chunks = [np.empty(0, dtype=np.float32)]
    scores_missing = False

    border_thr = 2  # Adjust this threshold as needed

    # Extract features from each tile
    for idx, tile in tiles.items():
        logger.debug(f" - Extracting features from tile: {idx}")

        feat_tile = self._extract(tile)
        kp_tile = feat_tile["keypoints"]
        des_tile = feat_tile["descriptors"]
        scor_tile = feat_tile.get("scores")

        # For debug: visualize keypoints (still in tile coordinates)
        if save_viz:
            viz_dir = self._output_dir / "debug" / "tiles"
            viz_dir.mkdir(parents=True, exist_ok=True)
            self.viz_keypoints(
                np.uint8(tile),
                kp_tile,
                viz_dir,
                f"tile_{idx}",
                img_format="jpg",
                jpg_quality=70,
            )

        # Get keypoints in the coordinates of the full image
        kp_tile += np.array(tiles_origins[idx])

        # Drop keypoints outside the original (non-padded) image or too
        # close to its border
        mask = (
            (kp_tile[:, 0] >= border_thr)
            & (kp_tile[:, 0] < image.shape[1] - border_thr)
            & (kp_tile[:, 1] >= border_thr)
            & (kp_tile[:, 1] < image.shape[0] - border_thr)
        )
        kp_tile = kp_tile[mask]
        des_tile = des_tile[:, mask]
        if scor_tile is not None:
            scor_tile = scor_tile[mask]

        if len(kp_tile) == 0:
            continue

        kpts_chunks.append(kp_tile)
        desc_chunks.append(des_tile)
        idx_chunks.append(np.full(len(kp_tile), idx, dtype=np.float32))
        if scor_tile is None:
            # Remember the gap instead of discarding the accumulator, so a
            # later tile that does provide scores cannot break stacking
            scores_missing = True
        else:
            score_chunks.append(scor_tile)

    kpts_full = np.vstack(kpts_chunks)
    descriptors_full = np.hstack(desc_chunks)
    tile_idx_full = np.concatenate(idx_chunks)

    if scores_missing:
        logger.warning("No scores found in features")
        scores_full = np.ones(kpts_full.shape[0], dtype=np.float32)
    else:
        scores_full = np.concatenate(score_chunks)

    # Select unique keypoints (first occurrence of each coordinate pair)
    if select_unique:
        kpts_full, unique_idx = np.unique(kpts_full, axis=0, return_index=True)
        descriptors_full = descriptors_full[:, unique_idx]
        tile_idx_full = tile_idx_full[unique_idx]
        scores_full = scores_full[unique_idx]

    # Make FeaturesDict object
    features = FeaturesDict(
        keypoints=kpts_full,
        descriptors=descriptors_full,
        scores=scores_full,
        tile_idx=tile_idx_full,
    )

    return features
def _resize_image(
    self, quality: Quality, image: np.ndarray, interp: str = "cv2_area"
) -> np.ndarray:
    """
    Resize an image according to the specified quality.

    Args:
        quality (Quality): The quality level for resizing. With
            Quality.HIGH the image is returned as it is.
        image (np.ndarray): The image to resize.
        interp (str): Interpolation mode forwarded to `resize_image`.
            It is ignored for Quality.HIGHEST, where "cv2_cubic" is forced.

    Returns:
        np.ndarray: The resized image (the input itself for Quality.HIGH).
    """
    if quality == Quality.HIGH:
        return image  # No resize

    # If quality is HIGHEST, force interpolation to cv2_cubic
    if quality == Quality.HIGHEST:
        interp = "cv2_cubic"

    new_size = get_size_by_quality(quality, image.shape[:2])
    # The two components are swapped before calling `resize_image`;
    # presumably (height, width) -> (width, height) - TODO confirm against
    # `get_size_by_quality` and `resize_image`.
    return resize_image(image, (new_size[1], new_size[0]), interp=interp)
def _resize_features(
    self, quality: Quality, features: FeaturesDict
) -> FeaturesDict:
    """
    Rescale keypoint coordinates according to the specified quality.

    Maps keypoints found on a resized image back to the resolution of the
    original image. The "keypoints" entry of `features` is replaced and the
    same `features` object is returned.

    Args:
        quality (Quality): The quality level the image was resized with.
        features (FeaturesDict): The features whose keypoints are rescaled.

    Returns:
        FeaturesDict: The input features with rescaled keypoints.
    """
    # Multiplicative factor applied to keypoint coordinates per quality
    scale_by_quality = {
        Quality.HIGHEST: 0.5,
        Quality.HIGH: 1,
        Quality.MEDIUM: 2,
        Quality.LOW: 4,
        Quality.LOWEST: 8,
    }
    factor = scale_by_quality.get(quality, 1)

    if factor != 1:
        # Out-of-place multiply: unlike an in-place `/=`, this also works
        # when the keypoints are stored with an integer dtype.
        features["keypoints"] = features["keypoints"] * factor

    return features
def viz_keypoints(
    self,
    image: np.ndarray,
    keypoints: np.ndarray,
    output_dir: Path,
    im_name: str = "keypoints",
    resize_to: int = 2000,
    img_format: str = "jpg",
    jpg_quality: int = 90,
):
    """
    Visualizes keypoints on an image and saves the result to a file.

    Args:
        image (np.ndarray): The input image.
        keypoints (np.ndarray): The keypoints to visualize, one (x, y) pair per row.
        output_dir (Path): The directory to save the output image. It is created if missing.
        im_name (str, optional): The name of the output image file (without extension). Defaults to "keypoints".
        resize_to (int, optional): Length in pixels of the longest image side after resizing
            (smaller images are enlarged too); values <= 0 disable resizing. Defaults to 2000.
        img_format (str, optional): The format of the output image file. Defaults to "jpg".
        jpg_quality (int, optional): The JPEG quality of the output image (only applicable if
            img_format is "jpg" or "jpeg", case-insensitive). Defaults to 90.
    """
    if resize_to > 0:
        size = image.shape[:2][::-1]  # (width, height), as expected by cv2.resize
        scale = resize_to / max(size)
        size_new = tuple(int(round(x * scale)) for x in size)
        image = cv2.resize(image, size_new)
        # Keep the keypoints aligned with the resized image
        keypoints = keypoints * scale

    # Hand plain Python floats to OpenCV rather than NumPy scalars
    kk = [cv2.KeyPoint(float(x), float(y), 1.0) for x, y in keypoints]

    # outImage=None lets OpenCV allocate the output image
    out = cv2.drawKeypoints(
        image,
        kk,
        None,
        (0, 255, 0),
        flags=cv2.DRAW_MATCHES_FLAGS_DEFAULT,
    )

    # cv2.imwrite does not create missing directories
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    out_path = str(output_dir / f"{im_name}.{img_format}")

    if img_format.lower() in ("jpg", "jpeg"):
        cv2.imwrite(
            out_path,
            out,
            [int(cv2.IMWRITE_JPEG_QUALITY), jpg_quality],
        )
    else:
        cv2.imwrite(out_path, out)
|