| |
| import argparse |
| import json |
| import pickle |
| import random |
| from itertools import product |
| import numpy as np |
| import torch |
| import torch.nn as nn |
| import torchvision.transforms as transforms |
| from torch.utils.data import DataLoader |
| from torchvision.datasets import ImageFolder |
| from tqdm import tqdm |
| from common.evaluation import Evaluator |
| from model import chmnet |
| from model.base.geometry import Geometry |
|
|
| from Utils import ( |
| CosineCustomDataset, |
| PairedLayer4Extractor, |
| compute_spatial_similarity, |
| generate_mask, |
| normalize_array, |
| get_transforms, |
| arg_topK, |
| ) |
|
|
| |
# Seed Python's `random` module once at import time so that any use of it
# later in the process is repeatable.
random.seed(42)


def to_np(x):
    """Return the values of tensor ``x`` as a NumPy array held on the CPU.

    ``detach()`` replaces the legacy ``.data`` attribute: it also yields a
    tensor cut off from the autograd graph, which is what ``.numpy()``
    requires.
    """
    return x.detach().to("cpu").numpy()


# Settings consumed when the CHM model is set up in this file:
#   "alpha"    -> passed to Evaluator.initialize
#   "img_size" -> passed to Geometry.initialize
#   "ktype"    -> passed to chmnet.CHMNet
#   "load"     -> checkpoint path handed to torch.load
chm_args = {
    "alpha": [0.05, 0.1],
    "img_size": 240,
    "ktype": "psi",
    "load": "pas_psi.pt",
}
|
|
|
|
class CHMGridTransfer:
    """Score every image of a support set against a single query image.

    ``build()`` runs two passes over (query, support image) pairs:

    1. ``find_correspondences`` sends a fixed 7x7 grid of source points
       through CHMNet and records, for every pair, which 7x7 cell each point
       lands in on the target side.
    2. ``compute_embeddings`` + ``compute_similarity_map`` extract feature
       maps for the same pairs and store the cosine similarity between each
       source cell and the cell it was transferred to.

    ``export()`` packages the results into a ``ModifiableCHMResults``.
    """

    # Side length of the square cell grid used for the source keypoints and
    # for indexing the feature maps (7 x 7 = 49 cells).
    GRID_SIZE = 7

    def __init__(
        self,
        query_image,
        support_set,
        support_set_labels,
        train_folder,
        top_N,
        top_K,
        binarization_threshold,
        chm_source_transform,
        chm_target_transform,
        cosine_source_transform,
        cosine_target_transform,
        batch_size=64,
    ):
        """Store the inputs; nothing is computed until ``build()`` is called.

        Args:
            query_image: Query handed to ``CosineCustomDataset``.
            support_set: Support images handed to ``CosineCustomDataset``.
            support_set_labels: Labels aligned with ``support_set``.
            train_folder: Accepted for interface compatibility; not used by
                this class.
            top_N: Stored as ``self.N`` and forwarded by ``export()``.
            top_K: Stored as ``self.K`` and forwarded by ``export()``.
            binarization_threshold: Threshold ``t`` given to
                ``generate_mask`` in ``compute_score_using_cc``.
            chm_source_transform: Source transform for the CHM pass.
            chm_target_transform: Target transform for the CHM pass.
            cosine_source_transform: Source transform for the embedding pass.
            cosine_target_transform: Target transform for the embedding pass.
            batch_size: Batch size of both data loaders.
        """
        self.N = top_N
        self.K = top_K
        self.BS = batch_size

        self.chm_source_transform = chm_source_transform
        self.chm_target_transform = chm_target_transform
        self.cosine_source_transform = cosine_source_transform
        self.cosine_target_transform = cosine_target_transform

        # Filled in by build().
        self.source_embeddings = None
        self.target_embeddings = None
        self.correspondence_map = None
        self.similarity_maps = None
        self.reverse_similarity_maps = None
        self.transferred_points = None

        self.binarization_threshold = binarization_threshold

        self.q = query_image
        self.support_set = support_set
        self.labels_ss = support_set_labels

    @classmethod
    def _source_grid_points(cls):
        """Return the fixed source keypoints as a ``(2, GRID_SIZE**2)`` float64 array."""
        # NOTE(review): 17 and 240 look like the half-cell margin and the
        # image side for a 7x7 grid on a 240 px image -- confirm before
        # changing GRID_SIZE or chm_args["img_size"].
        axis = np.linspace(1 + 17, 240 - 17 - 1, cls.GRID_SIZE)
        return np.asarray(list(product(axis, axis)), dtype=np.float64).T

    @classmethod
    def _to_grid_cell(cls, coord):
        """Map a coordinate in [-1, 1] to a cell index in ``[0, GRID_SIZE - 1]``.

        The result is clamped: a coordinate of exactly +1 would otherwise
        give ``GRID_SIZE`` (out of range), and coordinates well below -1
        would give negative indices that silently wrap around when used to
        index the feature maps.
        """
        cell = int(((coord + 1) / 2.0) * cls.GRID_SIZE)
        return min(max(cell, 0), cls.GRID_SIZE - 1)

    def _make_loader(self, source_transform, target_transform):
        """Build an unshuffled loader over (query, support image) pairs."""
        dataset = CosineCustomDataset(
            query_image=self.q,
            supporting_set=self.support_set,
            source_transform=source_transform,
            target_transform=target_transform,
        )
        # shuffle=False keeps both passes in the same pair order, so index i
        # of the correspondences matches index i of the embeddings.
        return DataLoader(dataset, batch_size=self.BS, shuffle=False)

    def build(self):
        """Run the CHM pass, then the embedding pass, then the similarity maps."""
        self.find_correspondences(
            self._make_loader(self.chm_source_transform, self.chm_target_transform)
        )

        embedding_loader = self._make_loader(
            self.cosine_source_transform, self.cosine_target_transform
        )
        self.compute_embeddings(embedding_loader)
        self.compute_similarity_map()

    def find_correspondences(self, test_dl):
        """Transfer the fixed source grid to every target image with CHMNet.

        Sets ``self.correspondence_map`` (one list of ``[cell, cell]`` pairs
        per image pair) and ``self.transferred_points`` (the raw output of
        ``Geometry.transfer_kps`` for all pairs, stacked along axis 0).

        Args:
            test_dl: Loader yielding dict batches with ``"src_img"`` and
                ``"trg_img"`` entries.
        """
        model = chmnet.CHMNet(chm_args["ktype"])
        # NOTE: torch.load unpickles the checkpoint; only load trusted files.
        model.load_state_dict(
            torch.load(chm_args["load"], map_location=torch.device("cpu"))
        )
        Evaluator.initialize(chm_args["alpha"])
        Geometry.initialize(img_size=chm_args["img_size"])

        # The source grid is identical for every pair, so build it once.
        src_grid = torch.tensor(self._source_grid_points())
        points_per_image = self.GRID_SIZE * self.GRID_SIZE

        grid_results = []
        transferred_points = []

        with torch.no_grad():
            model.eval()
            for batch in tqdm(test_dl):
                batch_len = batch["src_img"].shape[0]
                keypoints = src_grid.unsqueeze(0).repeat(batch_len, 1, 1)
                n_pts = torch.tensor(
                    np.asarray(batch_len * [points_per_image]), dtype=torch.long
                )

                corr_matrix = model(batch["src_img"], batch["trg_img"])
                prd_kps = Geometry.transfer_kps(
                    corr_matrix, keypoints, n_pts, normalized=False
                )
                transferred_points.append(prd_kps.cpu().numpy())

                # The conversion below treats the transferred coordinates as
                # lying in [-1, 1] -- TODO confirm against
                # Geometry.transfer_kps.
                for tgt_points in prd_kps:
                    grid_results.append(
                        [
                            [self._to_grid_cell(x), self._to_grid_cell(y)]
                            for x, y in zip(tgt_points[0], tgt_points[1])
                        ]
                    )

        self.correspondence_map = grid_results
        self.transferred_points = np.vstack(transferred_points)

    def compute_embeddings(self, test_dl):
        """Extract source/target embeddings for every pair in ``test_dl``.

        Sets ``self.source_embeddings`` and ``self.target_embeddings`` to the
        per-batch outputs of ``PairedLayer4Extractor`` concatenated along
        dimension 0.
        """
        paired_extractor = PairedLayer4Extractor()

        source_embeddings = []
        target_embeddings = []

        with torch.no_grad():
            for batch in test_dl:
                s_e, t_e = paired_extractor((batch["src_img"], batch["trg_img"]))
                source_embeddings.append(s_e)
                target_embeddings.append(t_e)

        self.source_embeddings = torch.cat(source_embeddings, dim=0)
        self.target_embeddings = torch.cat(target_embeddings, dim=0)

    def compute_similarity_map(self):
        """Fill the forward and reverse cosine-similarity maps.

        For each image pair and each source cell ``S`` with transferred cell
        ``T``, the cosine similarity of the two feature vectors is written to
        ``similarity_maps[i][S]`` and to ``reverse_similarity_maps[i][T]``.
        When several source cells land on the same ``T``, the reverse map
        keeps the value written last.
        """
        cos_sim = nn.CosineSimilarity(dim=0, eps=1e-6)
        side = self.GRID_SIZE

        # Same enumeration order as the source keypoints: first index outer.
        source_cells = [(a, b) for a in range(side) for b in range(side)]

        similarity_maps = []
        rsimilarity_maps = []

        for i, target_cells in enumerate(self.correspondence_map):
            cosine_map = np.zeros((side, side))
            reverse_cosine_map = np.zeros((side, side))

            for (s0, s1), (t0, t1) in zip(source_cells, target_cells):
                value = cos_sim(
                    self.source_embeddings[i][:, s0, s1],
                    self.target_embeddings[i][:, t0, t1],
                )
                cosine_map[s0, s1] = value
                reverse_cosine_map[t0, t1] = value

            similarity_maps.append(cosine_map)
            rsimilarity_maps.append(reverse_cosine_map)

        self.similarity_maps = similarity_maps
        self.reverse_similarity_maps = rsimilarity_maps

    def compute_score_using_cc(self):
        """Return one score per image pair: the mean of its 5 largest masked similarities.

        Each similarity map is multiplied element-wise by
        ``generate_mask(normalize_array(sim_source), t=binarization_threshold)``,
        where ``sim_source`` is the first value returned by
        ``compute_spatial_similarity`` for that pair.
        """
        sims_source = np.stack(
            [
                compute_spatial_similarity(to_np(src), to_np(tgt))[0]
                for src, tgt in zip(self.source_embeddings, self.target_embeddings)
            ],
            axis=0,
        )

        top_cos_values = []
        for i, sim_map in enumerate(self.similarity_maps):
            masked = np.multiply(
                sim_map,
                generate_mask(
                    normalize_array(sims_source[i]), t=self.binarization_threshold
                ),
            )
            flat = masked.T.reshape(-1)
            top_5 = np.argsort(flat)[::-1][:5]
            top_cos_values.append(np.mean(flat[top_5]))

        return top_cos_values

    def compute_score_using_custom_points(self, selected_keypoint_masks):
        """Return one score per image pair: the mean of its masked similarity map.

        Args:
            selected_keypoint_masks: Array multiplied element-wise with each
                7x7 similarity map. The mean is taken over all cells, so
                zeroed cells still count in the denominator.
        """
        top_cos_values = []
        for sim_map in self.similarity_maps:
            flat = np.multiply(sim_map, selected_keypoint_masks).T.reshape(-1)
            # Sum in descending order, as the values were originally averaged.
            descending = flat[np.argsort(flat)[::-1]]
            top_cos_values.append(np.mean(descending))

        return top_cos_values

    def export(self):
        """Bundle the computed state into a ``ModifiableCHMResults``."""
        storage = {
            "N": self.N,
            "K": self.K,
            "source_embeddings": self.source_embeddings,
            "target_embeddings": self.target_embeddings,
            "correspondence_map": self.correspondence_map,
            "similarity_maps": self.similarity_maps,
            "T": self.binarization_threshold,
            "query": self.q,
            "support_set": self.support_set,
            "labels_for_support_set": self.labels_ss,
            "rsimilarity_maps": self.reverse_similarity_maps,
            "transferred_points": self.transferred_points,
        }

        return ModifiableCHMResults(storage)
|
|
|
|
class ModifiableCHMResults:
    """Holds the outputs of a CHM grid transfer and re-ranks the support set.

    The instance is built from the ``storage`` dict and can score the
    support images either with a mask derived from
    ``compute_spatial_similarity`` (``predict_using_cc``) or with a
    caller-supplied mask (``predict_custom_pairs``).
    """

    def __init__(self, storage):
        """Copy every entry of ``storage`` onto the instance.

        Args:
            storage: Dict with the keys ``"N"``, ``"K"``,
                ``"source_embeddings"``, ``"target_embeddings"``,
                ``"correspondence_map"``, ``"similarity_maps"``, ``"T"``,
                ``"query"``, ``"support_set"``, ``"labels_for_support_set"``,
                ``"rsimilarity_maps"`` and ``"transferred_points"``.
        """
        self.N = storage["N"]
        self.K = storage["K"]
        self.source_embeddings = storage["source_embeddings"]
        self.target_embeddings = storage["target_embeddings"]
        self.correspondence_map = storage["correspondence_map"]
        self.similarity_maps = storage["similarity_maps"]
        self.T = storage["T"]
        self.q = storage["query"]
        self.support_set = storage["support_set"]
        self.labels_ss = storage["labels_for_support_set"]
        self.rsimilarity_maps = storage["rsimilarity_maps"]
        self.transferred_points = storage["transferred_points"]
        # Caches written by the scoring methods below.
        self.similarity_maps_masked = None
        self.SIMS_source = None
        self.SIMS_target = None
        self.masked_sim_values = []
        self.top_cos_values = []

    def compute_score_using_cc(self):
        """Return one score per support image: the mean of its 5 largest masked similarities.

        Side effects: refreshes ``SIMS_source``, ``SIMS_target``,
        ``masked_sim_values`` and ``top_cos_values``. The lists are rebuilt
        on every call, so calling this more than once does not accumulate
        stale entries.
        """
        sims_source, sims_target = [], []
        for src, tgt in zip(self.source_embeddings, self.target_embeddings):
            sim_a, sim_b = compute_spatial_similarity(to_np(src), to_np(tgt))
            sims_source.append(sim_a)
            sims_target.append(sim_b)

        sims_source = np.stack(sims_source, axis=0)
        sims_target = np.stack(sims_target, axis=0)
        self.SIMS_source = sims_source
        self.SIMS_target = sims_target

        masked_sim_values = []
        top_cos_values = []
        for i, sim_map in enumerate(self.similarity_maps):
            masked = np.multiply(
                sim_map,
                generate_mask(normalize_array(sims_source[i]), t=self.T),
            )
            masked_sim_values.append(masked)
            flat = masked.T.reshape(-1)
            top_5 = np.argsort(flat)[::-1][:5]
            top_cos_values.append(np.mean(flat[top_5]))

        self.masked_sim_values = masked_sim_values
        self.top_cos_values = top_cos_values

        return top_cos_values

    def compute_score_using_custom_points(self, selected_keypoint_masks):
        """Return one score per support image: the mean of its masked similarity map.

        Side effect: stores the masked maps in ``similarity_maps_masked``.

        Args:
            selected_keypoint_masks: Array multiplied element-wise with each
                similarity map. The mean is taken over all cells, so zeroed
                cells still count in the denominator.
        """
        top_cos_values = []
        similarity_maps_masked = []

        for sim_map in self.similarity_maps:
            masked = np.multiply(sim_map, selected_keypoint_masks)
            similarity_maps_masked.append(masked)
            flat = masked.T.reshape(-1)
            # Sum in descending order, as the values were originally averaged.
            descending = flat[np.argsort(flat)[::-1]]
            top_cos_values.append(np.mean(descending))

        self.similarity_maps_masked = similarity_maps_masked
        return top_cos_values

    def _rerank(self, top_cos_values):
        """Vote among the K best-scoring support images.

        Returns:
            ``(prediction, prediction_weight, reranked_nns_files, topK_idx)``
            where ``prediction`` is the majority label among the K highest
            scores, ``prediction_weight`` its vote count,
            ``reranked_nns_files`` the whole support set sorted by
            descending score, and ``topK_idx`` the indices (descending
            score, not truncated) of all support images carrying the
            predicted label.

        Raises:
            ValueError: If there is nothing to vote on (empty scores or
                ``K == 0``).
        """
        # One argsort shared by every step, so they all see the same order.
        order = np.argsort(top_cos_values)[::-1]

        top_labels = [self.labels_ss[x] for x in order[: self.K]]
        if not top_labels:
            raise ValueError("cannot predict: no support images to vote on")

        votes = np.bincount(top_labels)
        prediction = np.argmax(votes)
        prediction_weight = np.max(votes)

        reranked_nns_files = [self.support_set[x] for x in order]
        topK_idx = [x for x in order if self.labels_ss[x] == prediction]

        return prediction, prediction_weight, reranked_nns_files, topK_idx

    def predict_using_cc(self):
        """Predict a label from the scores of ``compute_score_using_cc``.

        Returns:
            A 10-tuple ``(topK_idx, prediction, predicted_folder_name,
            prediction_weight, topK_files, reranked_nns_files, topK_cmaps,
            topK_similarity_maps, topK_rsimilarity_maps,
            topK_transferred_points)``. ``topK_idx`` is not truncated; every
            list after ``prediction_weight`` holds at most ``K`` items.
            ``predicted_folder_name`` is the parent directory name of the
            best-ranked file with the predicted label (paths are split on
            ``"/"``).
        """
        top_cos_values = self.compute_score_using_cc()
        prediction, prediction_weight, reranked_nns_files, topK_idx = self._rerank(
            top_cos_values
        )

        kept = topK_idx[: self.K]
        topK_files = [self.support_set[x] for x in kept]
        predicted_folder_name = topK_files[0].split("/")[-2]

        return (
            topK_idx,
            prediction,
            predicted_folder_name,
            prediction_weight,
            topK_files,
            reranked_nns_files[: self.K],
            [self.correspondence_map[x] for x in kept],
            [self.similarity_maps[x] for x in kept],
            [self.rsimilarity_maps[x] for x in kept],
            [self.transferred_points[x] for x in kept],
        )

    def predict_custom_pairs(self, selected_keypoint_masks):
        """Predict a label from the scores of ``compute_score_using_custom_points``.

        Args:
            selected_keypoint_masks: Mask forwarded to
                ``compute_score_using_custom_points``.

        Returns:
            The same 10 leading items as ``predict_using_cc`` followed by
            ``topK_masked_sims`` (at most ``K`` masked similarity maps) and
            ``non_zero_mask`` (number of non-zero entries of the mask).
        """
        top_cos_values = self.compute_score_using_custom_points(selected_keypoint_masks)
        prediction, prediction_weight, reranked_nns_files, topK_idx = self._rerank(
            top_cos_values
        )

        kept = topK_idx[: self.K]
        topK_files = [self.support_set[x] for x in kept]
        predicted_folder_name = topK_files[0].split("/")[-2]

        non_zero_mask = np.count_nonzero(selected_keypoint_masks)

        return (
            topK_idx,
            prediction,
            predicted_folder_name,
            prediction_weight,
            topK_files,
            reranked_nns_files[: self.K],
            [self.correspondence_map[x] for x in kept],
            [self.similarity_maps[x] for x in kept],
            [self.rsimilarity_maps[x] for x in kept],
            [self.transferred_points[x] for x in kept],
            [self.similarity_maps_masked[x] for x in kept],
            non_zero_mask,
        )
|
|
|
|
def export_visualizations_results(
    reranker_output,
    knn_predicted_label,
    knn_confidence,
    topK_knns,
    K=20,
    N=50,
    T=0.55,
):
    """Collect everything needed to visualise one re-ranked prediction.

    Runs ``reranker_output.predict_using_cc()`` and, for each returned
    neighbour, picks the ``non_zero_mask`` (fixed to 5) best cells of its
    masked similarity map via ``arg_topK`` and pairs the matching source grid
    points with their transferred target points in pixel coordinates.

    Args:
        reranker_output: Object providing ``predict_using_cc()``, ``q`` and,
            after that call, ``SIMS_source``.
        knn_predicted_label: Copied into the output unchanged.
        knn_confidence: Copied into the output unchanged.
        topK_knns: Copied into the output unchanged.
        K: Copied into the output unchanged.
        N: Copied into the output unchanged.
        T: Threshold ``t`` given to ``generate_mask`` here.

    Returns:
        Dict with the query, the kNN inputs, the CHM prediction and
        neighbours, the masked similarity maps and the selected
        source/target keypoints.
    """
    non_zero_mask = 5
    (
        topK_idx,
        _prediction,  # numeric label; only the folder name is exported
        pfn,
        pr,
        rfiles,
        reranked_nns,
        cmaps,
        sims,
        _rsims,  # reverse similarity maps are not exported
        trns_kpts,
    ) = reranker_output.predict_using_cc()

    # Recompute the masked maps for the returned neighbours only, using the
    # threshold passed to this function.
    MASKED_COSINE_VALUES = [
        np.multiply(
            sim_map,
            generate_mask(normalize_array(reranker_output.SIMS_source[idx]), t=T),
        )
        for sim_map, idx in zip(sims, topK_idx)
    ]

    # The 7x7 source grid is the same for every neighbour, so build it once.
    axis = np.linspace(1 + 17, 240 - 17 - 1, 7)
    point_list = list(product(axis, axis))

    list_of_source_points = []
    list_of_target_points = []

    for CK in range(len(sims)):
        topk_index = arg_topK(MASKED_COSINE_VALUES[CK], topK=non_zero_mask)

        target_keypoints = []
        for i in range(non_zero_mask):
            # Transferred points are read as (x, y) and rescaled from
            # [-1, 1] to a 240-pixel axis; int() truncates toward zero.
            x, y = trns_kpts[CK].T[topk_index[i]]
            target_keypoints.append(
                (int(((x + 1) / 2.0) * 240), int(((y + 1) / 2.0) * 240))
            )

        list_of_source_points.append(np.asarray([point_list[x] for x in topk_index]))
        list_of_target_points.append(np.asarray(target_keypoints))

    detailed_output = {
        "q": reranker_output.q,
        "K": K,
        "N": N,
        "knn-prediction": knn_predicted_label,
        "knn-prediction-confidence": knn_confidence,
        "knn-nearest-neighbors": topK_knns,
        "chm-prediction": pfn,
        "chm-prediction-confidence": pr,
        "chm-nearest-neighbors": rfiles,
        "chm-nearest-neighbors-all": reranked_nns,
        "correspondance_map": cmaps,
        "masked_cos_values": MASKED_COSINE_VALUES,
        "src-keypoints": list_of_source_points,
        "tgt-keypoints": list_of_target_points,
        "non_zero_mask": non_zero_mask,
        "transferred_kpoints": trns_kpts,
    }

    return detailed_output
|
|
|
|
def chm_classify_and_visualize(
    query_image, kNN_results, support, TRAIN_SET, N=50, K=20, T=0.55, BS=64
):
    """Re-rank ``support`` for ``query_image`` with CHM and return the visualisation dict.

    Args:
        query_image: Query passed to ``CHMGridTransfer``.
        kNN_results: Triple ``(predicted_label, confidence, topK_neighbours)``.
        support: Indexable whose item 0 is the support set and item 1 its
            labels.
        TRAIN_SET: Forwarded as ``train_folder``.
        N: Forwarded as ``top_N``.
        K: Forwarded as ``top_K``.
        T: Binarization threshold.
        BS: Batch size.

    Returns:
        The dict produced by ``export_visualizations_results``.
    """
    # chm_args is only read here, so no `global` declaration is needed.
    source_chm, target_chm, source_cos, target_cos = get_transforms(
        "single", chm_args
    )
    knn_predicted_label, knn_confidence, topK_knns = kNN_results

    grid_transfer = CHMGridTransfer(
        query_image=query_image,
        support_set=support[0],
        support_set_labels=support[1],
        train_folder=TRAIN_SET,
        top_N=N,
        top_K=K,
        binarization_threshold=T,
        chm_source_transform=source_chm,
        chm_target_transform=target_chm,
        cosine_source_transform=source_cos,
        cosine_target_transform=target_cos,
        batch_size=BS,
    )

    # Compute correspondences and similarity maps, then snapshot them.
    grid_transfer.build()
    results = grid_transfer.export()

    return export_visualizations_results(
        results,
        knn_predicted_label,
        knn_confidence,
        topK_knns,
        K=K,
        N=N,
        T=T,
    )
|
|