Verified Commit 3efef58f authored by Yoann Schneider

remove obsolete code

parent bc01528c
Showing with 10 additions and 4482 deletions
# Only run on the DAN python module
files: '^dan'
repos:
  - repo: https://github.com/PyCQA/isort
    rev: 5.10.1
......
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
The arkindex_utils module
=========================
"""
import errno
import logging
import sys
from apistar.exceptions import ErrorResponse
def retrieve_corpus(client, corpus_name: str) -> str:
    """
    Retrieve the corpus id from the corpus name.
    :param client: The arkindex client.
    :param corpus_name: The name of the corpus to retrieve.
    :return: The id of the retrieved corpus.
    """
    for corpus in client.request("ListCorpus"):
        if corpus["name"] == corpus_name:
            logging.info(f"Corpus id retrieved: {corpus['id']}")
            return corpus["id"]
    # No corpus matched the requested name
    logging.error(f"Corpus {corpus_name} not found")
    sys.exit(errno.EINVAL)
def retrieve_subsets(
client, corpus: str, parents_types: list, parents_names: list
) -> list:
"""
Retrieve the requested subsets.
:param client: The arkindex client.
:param corpus: The id of the retrieved corpus.
:param parents_types: The types of parents of the elements to retrieve.
:param parents_names: The names of parents of the elements to retrieve.
:return subsets: The retrieved subsets.
"""
subsets = []
for parent_type in parents_types:
try:
subsets.extend(
client.request("ListElements", corpus=corpus, type=parent_type)[
"results"
]
)
except ErrorResponse as e:
logging.error(f"{e.content}: {parent_type}")
sys.exit(errno.EINVAL)
# Retrieve subsets with name in parents-names. If no parents-names given, keep all subsets.
if parents_names is not None:
logging.info(f"Retrieving {parents_names} subset(s)")
subsets = [subset for subset in subsets if subset["name"] in parents_names]
else:
logging.info("Retrieving all subsets")
if len(subsets) == 0:
logging.info("No subset found")
return subsets
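For context, the two helpers above were typically chained as follows. A minimal usage sketch, assuming an ArkindexClient authenticated with a valid API token; the client construction, token, and names below are illustrative, not part of this module:

from arkindex import ArkindexClient

# Hypothetical usage: resolve a corpus by name, then fetch its "folder" subsets.
client = ArkindexClient(token="my-api-token")  # illustrative token
corpus_id = retrieve_corpus(client, "My corpus")
subsets = retrieve_subsets(
    client, corpus_id, parents_types=["folder"], parents_names=["train"]
)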
import os
import shutil
import tarfile
import pickle
import re
from PIL import Image
import numpy as np
class DatasetFormatter:
"""
Global pipeline/functions for dataset formatting
"""
def __init__(self, dataset_name, level, extra_name="", set_names=["train", "valid", "test"]):
self.dataset_name = dataset_name
self.level = level
self.set_names = set_names
self.target_fold_path = os.path.join(
"Datasets", "formatted", "{}_{}{}".format(dataset_name, level, extra_name))
self.map_datasets_files = dict()
self.extract_with_dirname = False
def format(self):
self.init_format()
self.map_datasets_files[self.dataset_name][self.level]["format_function"]()
self.end_format()
def init_format(self):
"""
Load and extracts needed files
"""
os.makedirs(self.target_fold_path, exist_ok=True)
for set_name in self.set_names:
os.makedirs(os.path.join(self.target_fold_path, set_name), exist_ok=True)
class OCRDatasetFormatter(DatasetFormatter):
"""
Specific pipeline/functions for OCR/HTR dataset formatting
"""
def __init__(self, source_dataset, level, extra_name="", set_names=["train", "valid", "test"]):
super(OCRDatasetFormatter, self).__init__(source_dataset, level, extra_name, set_names)
self.charset = set()
self.gt = dict()
for set_name in set_names:
self.gt[set_name] = dict()
def format_text_label(self, label):
"""
Remove extra space or line break characters
"""
temp = re.sub("(\n)+", '\n', label)
return re.sub("( )+", ' ', temp).strip(" \n")
def load_resize_save(self, source_path, target_path):
"""
Load image, apply resolution modification and save it
"""
shutil.copyfile(source_path, target_path)
def resize(self, img, source_dpi, target_dpi):
"""
Apply resolution modification to image
"""
if source_dpi == target_dpi:
return img
if isinstance(img, np.ndarray):
h, w = img.shape[:2]
img = Image.fromarray(img)
else:
w, h = img.size
ratio = target_dpi / source_dpi
img = img.resize((int(w*ratio), int(h*ratio)), Image.BILINEAR)
return np.array(img)
def end_format(self):
"""
Save label and charset files
"""
with open(os.path.join(self.target_fold_path, "labels.pkl"), "wb") as f:
pickle.dump({
"ground_truth": self.gt,
"charset": sorted(list(self.charset)),
}, f)
with open(os.path.join(self.target_fold_path, "charset.pkl"), "wb") as f:
pickle.dump(sorted(list(self.charset)), f)
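The labels file written above is a plain pickle. A minimal sketch of reading it back; the folder name matches what the simara page formatter below would produce and is only an example:

import os
import pickle

# Load the ground truth and charset produced by end_format().
with open(os.path.join("Datasets", "formatted", "simara_page_sem", "labels.pkl"), "rb") as f:
    data = pickle.load(f)
ground_truth = data["ground_truth"]  # {set_name: {image_name: {"text": ...}}}
charset = data["charset"]  # sorted list of characters seen in the labels
print(f"{len(charset)} characters, {len(ground_truth['train'])} train samples")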
from Datasets.dataset_formatters.generic_dataset_formatter import OCRDatasetFormatter
import os
import numpy as np
from Datasets.dataset_formatters.utils_dataset import natural_sort
from PIL import Image
import xml.etree.ElementTree as ET
import re
from tqdm import tqdm
# Layout string to token
SEM_MATCHING_TOKENS_STR = {
"INTITULE": "",
"DATE": "",
"COTE_SERIE": "",
"ANALYSE_COMPL": "",
"PRECISIONS_SUR_COTE": "",
"COTE_ARTICLE": ""
}
# Layout begin-token to end-token
SEM_MATCHING_TOKENS = {
"": "",
"": "",
"": "",
"": "",
"": "",
"": ""
}
class SimaraDatasetFormatter(OCRDatasetFormatter):
def __init__(self, level, set_names=["train", "valid", "test"], dpi=150, sem_token=True):
super(SimaraDatasetFormatter, self).__init__("simara", level, "_sem" if sem_token else "", set_names)
self.dpi = dpi
self.sem_token = sem_token
self.map_datasets_files.update({
"simara": {
# (1,050 for train, 100 for validation and 100 for test)
"page": {
"format_function": self.format_simara_page,
},
}
})
self.matching_tokens_str = SEM_MATCHING_TOKENS_STR
self.matching_tokens = SEM_MATCHING_TOKENS
def preformat_simara_page(self):
"""
Extract all information from dataset and correct some annotations
"""
dataset = {
"train": list(),
"valid": list(),
"test": list()
}
img_folder_path = os.path.join("Datasets", "raw", "simara", "images")
labels_folder_path = os.path.join("Datasets", "raw", "simara", "labels")
sem_labels_folder_path = os.path.join("Datasets", "raw", "simara", "labels_sem")
train_files = [
os.path.join(labels_folder_path, 'train', name)
for name in os.listdir(os.path.join(sem_labels_folder_path, 'train'))]
valid_files = [
os.path.join(labels_folder_path, 'valid', name)
for name in os.listdir(os.path.join(sem_labels_folder_path, 'valid'))]
test_files = [
os.path.join(labels_folder_path, 'test', name)
for name in os.listdir(os.path.join(sem_labels_folder_path, 'test'))]
for set_name, files in zip(self.set_names, [train_files, valid_files, test_files]):
for i, label_file in enumerate(tqdm(files, desc='Pre-formatting '+set_name)):
with open(label_file, 'r') as f:
text = f.read()
with open(label_file.replace('labels', 'labels_sem'), 'r') as f:
sem_text = f.read()
dataset[set_name].append({
"img_path": os.path.join(
img_folder_path, set_name, label_file.split('/')[-1].replace('txt', 'jpg')),
"label": text,
"sem_label": sem_text,
})
print(dataset['test'], len(dataset['test']))
return dataset
def format_simara_page(self):
"""
Format simara page dataset
"""
dataset = self.preformat_simara_page()
for set_name in self.set_names:
fold = os.path.join(self.target_fold_path, set_name)
for sample in tqdm(dataset[set_name], desc='Formatting '+set_name):
new_name = sample['img_path'].split('/')[-1]
new_img_path = os.path.join(fold, new_name)
self.load_resize_save(sample["img_path"], new_img_path)#, 300, self.dpi)
page = {
"text": sample["label"] if not self.sem_token else sample["sem_label"],
}
self.charset = self.charset.union(set(page["text"]))
self.gt[set_name][new_name] = page
if __name__ == "__main__":
SimaraDatasetFormatter("page", sem_token=True).format()
#SimaraDatasetFormatter("page", sem_token=False).format()
import os
import sys
DOSSIER_COURRANT = os.path.dirname(os.path.abspath(__file__))
DOSSIER_PARENT = os.path.dirname(DOSSIER_COURRANT)
sys.path.append(os.path.dirname(DOSSIER_PARENT))
sys.path.append(os.path.dirname(os.path.dirname(DOSSIER_PARENT)))
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(DOSSIER_PARENT))))
from torch.optim import Adam
from basic.transforms import aug_config
from OCR.ocr_dataset_manager import OCRDataset, OCRDatasetManager
from OCR.document_OCR.dan.trainer_dan import Manager
from dan.decoder import GlobalHTADecoder
from dan.models import FCN_Encoder
from basic.scheduler import exponential_dropout_scheduler, linear_scheduler
import torch
import numpy as np
import random
import torch.multiprocessing as mp
def train_and_test(rank, params):
torch.manual_seed(0)
torch.cuda.manual_seed(0)
np.random.seed(0)
random.seed(0)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
params["training_params"]["ddp_rank"] = rank
model = Manager(params)
model.load_model()
model.train()
# load weights giving best CER on valid set
model.params["training_params"]["load_epoch"] = "best"
model.load_model()
metrics = ["cer", "wer", "time", "map_cer", "loer"]
for dataset_name in params["dataset_params"]["datasets"].keys():
for set_name in ["test", "valid", "train"]:
model.predict("{}-{}".format(dataset_name, set_name), [(dataset_name, set_name), ], metrics, output=True)
if __name__ == "__main__":
dataset_name = "simara" # ["RIMES", "READ_2016"]
dataset_level = "page" # ["page", "double_page"]
dataset_variant = "_sem"
# max number of lines for synthetic documents
max_nb_lines = {
"RIMES": 40,
"READ_2016": 30,
}
params = {
"dataset_params": {
"dataset_manager": OCRDatasetManager,
"dataset_class": OCRDataset,
"datasets": {
dataset_name: "../../../Datasets/formatted/{}_{}{}".format(dataset_name, dataset_level, dataset_variant),
},
"train": {
"name": "{}-train".format(dataset_name),
"datasets": [(dataset_name, "train"), ],
},
"valid": {
"{}-valid".format(dataset_name): [(dataset_name, "valid"), ],
},
"config": {
"load_in_memory": True, # Load all images in CPU memory
"worker_per_gpu": 4, # Num of parallel processes per gpu for data loading
"width_divisor": 8, # Image width will be divided by 8
"height_divisor": 32, # Image height will be divided by 32
"padding_value": 0, # Image padding value
"padding_token": None, # Label padding value
"charset_mode": "seq2seq", # add end-of-transcription ans start-of-transcription tokens to charset
"constraints": ["add_eot", "add_sot"], # add end-of-transcription ans start-of-transcription tokens in labels
"normalize": True, # Normalize with mean and variance of training dataset
"preprocessings": [
{
"type": "to_RGB",
# if grayscaled image, produce RGB one (3 channels with same value) otherwise do nothing
},
],
"augmentation": aug_config(0.9, 0.1),
"synthetic_data": None,
#"synthetic_data": {
# "init_proba": 0.9, # begin proba to generate synthetic document
# "end_proba": 0.2, # end proba to generate synthetic document
# "num_steps_proba": 200000, # linearly decrease the percent of synthetic document from 90% to 20% through 200000 samples
# "proba_scheduler_function": linear_scheduler, # decrease proba rate linearly
# "start_scheduler_at_max_line": True, # start decreasing proba only after curriculum reach max number of lines
# "dataset_level": dataset_level,
# "curriculum": True, # use curriculum learning (slowly increase number of lines per synthetic samples)
# "crop_curriculum": True, # during curriculum learning, crop images under the last text line
# "curr_start": 0, # start curriculum at iteration
# "curr_step": 10000, # interval to increase the number of lines for curriculum learning
# "min_nb_lines": 1, # initial number of lines for curriculum learning
# "max_nb_lines": max_nb_lines[dataset_name], # maximum number of lines for curriculum learning
# "padding_value": 255,
# # config for synthetic line generation
# "config": {
# "background_color_default": (255, 255, 255),
# "background_color_eps": 15,
# "text_color_default": (0, 0, 0),
# "text_color_eps": 15,
# "font_size_min": 35,
# "font_size_max": 45,
# "color_mode": "RGB",
# "padding_left_ratio_min": 0.00,
# "padding_left_ratio_max": 0.05,
# "padding_right_ratio_min": 0.02,
# "padding_right_ratio_max": 0.2,
# "padding_top_ratio_min": 0.02,
# "padding_top_ratio_max": 0.1,
# "padding_bottom_ratio_min": 0.02,
# "padding_bottom_ratio_max": 0.1,
# },
#}
}
},
"model_params": {
"models": {
"encoder": FCN_Encoder,
"decoder": GlobalHTADecoder,
},
#"transfer_learning": None,
"transfer_learning": {
# model_name: [state_dict_name, checkpoint_path, learnable, strict]
"encoder": ["encoder", "dan_rimes_page.pt", True, True],
"decoder": ["decoder", "dan_rimes_page.pt", True, False],
},
"transfered_charset": True, # Transfer learning of the decision layer based on charset of the line HTR model
"additional_tokens": 1, # for decision layer = [<eot>, ], only for transfered charset
"input_channels": 3, # number of channels of input image
"dropout": 0.5, # dropout rate for encoder
"enc_dim": 256, # dimension of extracted features
"nb_layers": 5, # encoder
"h_max": 500, # maximum height for encoder output (for 2D positional embedding)
"w_max": 1000, # maximum width for encoder output (for 2D positional embedding)
"l_max": 15000, # max predicted sequence (for 1D positional embedding)
"dec_num_layers": 8, # number of transformer decoder layers
"dec_num_heads": 4, # number of heads in transformer decoder layers
"dec_res_dropout": 0.1, # dropout in transformer decoder layers
"dec_pred_dropout": 0.1, # dropout rate before decision layer
"dec_att_dropout": 0.1, # dropout rate in multi head attention
"dec_dim_feedforward": 256, # number of dimension for feedforward layer in transformer decoder layers
"use_2d_pe": True, # use 2D positional embedding
"use_1d_pe": True, # use 1D positional embedding
"use_lstm": False,
"attention_win": 100, # length of attention window
# Curriculum dropout
"dropout_scheduler": {
"function": exponential_dropout_scheduler,
"T": 5e4,
}
},
"training_params": {
"output_folder": "dan_simara_page", # folder name for checkpoint and results
"max_nb_epochs": 50000, # maximum number of epochs before to stop
"max_training_time": 3600 * 24 * 1.9, # maximum time before to stop (in seconds)
"load_epoch": "last", # ["best", "last"]: last to continue training, best to evaluate
"interval_save_weights": None, # None: keep best and last only
"batch_size": 2, # mini-batch size for training
"valid_batch_size": 4, # mini-batch size for valdiation
"use_ddp": False, # Use DistributedDataParallel
"ddp_port": "20027",
"use_amp": True, # Enable automatic mix-precision
"nb_gpu": torch.cuda.device_count(),
"optimizers": {
"all": {
"class": Adam,
"args": {
"lr": 0.0001,
"amsgrad": False,
}
},
},
"lr_schedulers": None, # Learning rate schedulers
"eval_on_valid": True, # Whether to eval and logs metrics on validation set during training or not
"eval_on_valid_interval": 5, # Interval (in epochs) to evaluate during training
"focus_metric": "cer", # Metrics to focus on to determine best epoch
"expected_metric_value": "low", # ["high", "low"] What is best for the focus metric value
"set_name_focus_metric": "{}-valid".format(dataset_name), # Which dataset to focus on to select best weights
"train_metrics": ["loss_ce", "cer", "wer", "syn_max_lines"], # Metrics name for training
"eval_metrics": ["cer", "wer", "map_cer"], # Metrics name for evaluation on validation set during training
"force_cpu": False, # True for debug purposes
"max_char_prediction": 1000, # max number of token prediction
# Keep teacher forcing rate to 20% during whole training
"teacher_forcing_scheduler": {
"min_error_rate": 0.2,
"max_error_rate": 0.2,
"total_num_steps": 5e4
},
},
}
if params["training_params"]["use_ddp"] and not params["training_params"]["force_cpu"]:
mp.spawn(train_and_test, args=(params,), nprocs=params["training_params"]["nb_gpu"])
else:
train_and_test(0, params)
from OCR.ocr_manager import OCRManager
from torch.nn import CrossEntropyLoss
import torch
from dan.ocr_utils import LM_ind_to_str
import numpy as np
from torch.cuda.amp import autocast
import time
class Manager(OCRManager):
def __init__(self, params):
super(Manager, self).__init__(params)
def load_save_info(self, info_dict):
if "curriculum_config" in info_dict.keys():
if self.dataset.train_dataset is not None:
self.dataset.train_dataset.curriculum_config = info_dict["curriculum_config"]
def add_save_info(self, info_dict):
info_dict["curriculum_config"] = self.dataset.train_dataset.curriculum_config
return info_dict
def get_init_hidden(self, batch_size):
num_layers = 1
hidden_size = self.params["model_params"]["enc_dim"]
return torch.zeros(num_layers, batch_size, hidden_size), torch.zeros(num_layers, batch_size, hidden_size)
def apply_teacher_forcing(self, y, y_len, error_rate):
y_error = y.clone()
for b in range(len(y_len)):
for i in range(1, y_len[b]):
if np.random.rand() < error_rate and y[b][i] != self.dataset.tokens["pad"]:
y_error[b][i] = np.random.randint(0, len(self.dataset.charset)+2)
return y_error, y_len
def train_batch(self, batch_data, metric_names):
loss_func = CrossEntropyLoss(ignore_index=self.dataset.tokens["pad"])
sum_loss = 0
x = batch_data["imgs"].to(self.device)
y = batch_data["labels"].to(self.device)
reduced_size = [s[:2] for s in batch_data["imgs_reduced_shape"]]
y_len = batch_data["labels_len"]
# add errors in teacher forcing
if "teacher_forcing_error_rate" in self.params["training_params"] and self.params["training_params"]["teacher_forcing_error_rate"] is not None:
error_rate = self.params["training_params"]["teacher_forcing_error_rate"]
simulated_y_pred, y_len = self.apply_teacher_forcing(y, y_len, error_rate)
elif "teacher_forcing_scheduler" in self.params["training_params"]:
error_rate = self.params["training_params"]["teacher_forcing_scheduler"]["min_error_rate"] + min(self.latest_step, self.params["training_params"]["teacher_forcing_scheduler"]["total_num_steps"]) * (self.params["training_params"]["teacher_forcing_scheduler"]["max_error_rate"]-self.params["training_params"]["teacher_forcing_scheduler"]["min_error_rate"]) / self.params["training_params"]["teacher_forcing_scheduler"]["total_num_steps"]
simulated_y_pred, y_len = self.apply_teacher_forcing(y, y_len, error_rate)
else:
simulated_y_pred = y
with autocast(enabled=self.params["training_params"]["use_amp"]):
hidden_predict = None
cache = None
raw_features = self.models["encoder"](x)
features_size = raw_features.size()
b, c, h, w = features_size
pos_features = self.models["decoder"].features_updater.get_pos_features(raw_features)
features = torch.flatten(pos_features, start_dim=2, end_dim=3).permute(2, 0, 1)
enhanced_features = pos_features
enhanced_features = torch.flatten(enhanced_features, start_dim=2, end_dim=3).permute(2, 0, 1)
output, pred, hidden_predict, cache, weights = self.models["decoder"](features, enhanced_features,
simulated_y_pred[:, :-1],
reduced_size,
[max(y_len) for _ in range(b)],
features_size,
start=0,
hidden_predict=hidden_predict,
cache=cache,
keep_all_weights=True)
loss_ce = loss_func(pred, y[:, 1:])
sum_loss += loss_ce
with autocast(enabled=False):
self.backward_loss(sum_loss)
self.step_optimizers()
self.zero_optimizers()
predicted_tokens = torch.argmax(pred, dim=1).detach().cpu().numpy()
predicted_tokens = [predicted_tokens[i, :y_len[i]] for i in range(b)]
str_x = [LM_ind_to_str(self.dataset.charset, t, oov_symbol="") for t in predicted_tokens]
values = {
"nb_samples": b,
"str_y": batch_data["raw_labels"],
"str_x": str_x,
"loss": sum_loss.item(),
"loss_ce": loss_ce.item(),
"syn_max_lines": self.dataset.train_dataset.get_syn_max_lines() if self.params["dataset_params"]["config"]["synthetic_data"] else 0,
}
return values
def evaluate_batch(self, batch_data, metric_names):
x = batch_data["imgs"].to(self.device)
reduced_size = [s[:2] for s in batch_data["imgs_reduced_shape"]]
max_chars = self.params["training_params"]["max_char_prediction"]
start_time = time.time()
with autocast(enabled=self.params["training_params"]["use_amp"]):
b = x.size(0)
reached_end = torch.zeros((b, ), dtype=torch.bool, device=self.device)
prediction_len = torch.zeros((b, ), dtype=torch.int, device=self.device)
predicted_tokens = torch.ones((b, 1), dtype=torch.long, device=self.device) * self.dataset.tokens["start"]
predicted_tokens_len = torch.ones((b, ), dtype=torch.int, device=self.device)
whole_output = list()
confidence_scores = list()
cache = None
hidden_predict = None
if b > 1:
features_list = list()
for i in range(b):
pos = batch_data["imgs_position"]
features_list.append(self.models["encoder"](x[i:i+1, :, pos[i][0][0]:pos[i][0][1], pos[i][1][0]:pos[i][1][1]]))
max_height = max([f.size(2) for f in features_list])
max_width = max([f.size(3) for f in features_list])
features = torch.zeros((b, features_list[0].size(1), max_height, max_width), device=self.device, dtype=features_list[0].dtype)
for i in range(b):
features[i, :, :features_list[i].size(2), :features_list[i].size(3)] = features_list[i]
else:
features = self.models["encoder"](x)
features_size = features.size()
coverage_vector = torch.zeros((features.size(0), 1, features.size(2), features.size(3)), device=self.device)
pos_features = self.models["decoder"].features_updater.get_pos_features(features)
features = torch.flatten(pos_features, start_dim=2, end_dim=3).permute(2, 0, 1)
enhanced_features = pos_features
enhanced_features = torch.flatten(enhanced_features, start_dim=2, end_dim=3).permute(2, 0, 1)
for i in range(0, max_chars):
output, pred, hidden_predict, cache, weights = self.models["decoder"](features, enhanced_features, predicted_tokens, reduced_size, predicted_tokens_len, features_size, start=0, hidden_predict=hidden_predict, cache=cache, num_pred=1)
whole_output.append(output)
confidence_scores.append(torch.max(torch.softmax(pred[:, :], dim=1), dim=1).values)
coverage_vector = torch.clamp(coverage_vector + weights, 0, 1)
predicted_tokens = torch.cat([predicted_tokens, torch.argmax(pred[:, :, -1], dim=1, keepdim=True)], dim=1)
reached_end = torch.logical_or(reached_end, torch.eq(predicted_tokens[:, -1], self.dataset.tokens["end"]))
predicted_tokens_len += 1
prediction_len[reached_end == False] = i + 1
if torch.all(reached_end):
break
confidence_scores = torch.cat(confidence_scores, dim=1).cpu().detach().numpy()
predicted_tokens = predicted_tokens[:, 1:]
prediction_len[torch.eq(reached_end, False)] = max_chars - 1
predicted_tokens = [predicted_tokens[i, :prediction_len[i]] for i in range(b)]
confidence_scores = [confidence_scores[i, :prediction_len[i]].tolist() for i in range(b)]
str_x = [LM_ind_to_str(self.dataset.charset, t, oov_symbol="") for t in predicted_tokens]
process_time = time.time() - start_time
values = {
"nb_samples": b,
"str_y": batch_data["raw_labels"],
"str_x": str_x,
"confidence_score": confidence_scores,
"time": process_time,
}
return values
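The teacher-forcing error rate computed in train_batch above is a plain linear ramp. The same formula as a standalone sketch (the function name is illustrative):

def teacher_forcing_error_rate(step, min_error_rate, max_error_rate, total_num_steps):
    # Linearly interpolate the label-corruption rate from min to max over total_num_steps.
    progress = min(step, total_num_steps) / total_num_steps
    return min_error_rate + progress * (max_error_rate - min_error_rate)

# With the configuration above (min == max == 0.2), the rate stays at 20% for the whole training.
assert teacher_forcing_error_rate(0, 0.2, 0.2, 5e4) == teacher_forcing_error_rate(10**6, 0.2, 0.2, 5e4) == 0.2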
import os
import sys
from os.path import dirname
DOSSIER_COURRANT = dirname(os.path.abspath(__file__))
ROOT_FOLDER = dirname(dirname(dirname(DOSSIER_COURRANT)))
sys.path.append(ROOT_FOLDER)
from OCR.line_OCR.ctc.trainer_line_ctc import TrainerLineCTC
from OCR.line_OCR.ctc.models_line_ctc import Decoder
from dan.models import FCN_Encoder
from torch.optim import Adam
from basic.transforms import line_aug_config
from basic.scheduler import exponential_dropout_scheduler, exponential_scheduler
from OCR.ocr_dataset_manager import OCRDataset, OCRDatasetManager
import torch.multiprocessing as mp
import torch
import numpy as np
import random
def train_and_test(rank, params):
torch.manual_seed(0)
torch.cuda.manual_seed(0)
np.random.seed(0)
random.seed(0)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
params["training_params"]["ddp_rank"] = rank
model = TrainerLineCTC(params)
model.load_model()
# Model trains until max_time_training or max_nb_epochs is reached
model.train()
# load weights giving best CER on valid set
model.params["training_params"]["load_epoch"] = "best"
model.load_model()
# compute metrics on train, valid and test sets (in eval conditions)
metrics = ["cer", "wer", "time", ]
for dataset_name in params["dataset_params"]["datasets"].keys():
for set_name in ["test", "valid", "train", ]:
model.predict("{}-{}".format(dataset_name, set_name), [(dataset_name, set_name), ], metrics, output=True)
def main():
dataset_name = "READ_2016" # ["RIMES", "READ_2016"]
dataset_level = "syn_line"
params = {
"dataset_params": {
"dataset_manager": OCRDatasetManager,
"dataset_class": OCRDataset,
"datasets": {
dataset_name: "../../../Datasets/formatted/{}_{}".format(dataset_name, dataset_level),
},
"train": {
"name": "{}-train".format(dataset_name),
"datasets": [(dataset_name, "train"), ],
},
"valid": {
"{}-valid".format(dataset_name): [(dataset_name, "valid"), ],
},
"config": {
"load_in_memory": True, # Load all images in CPU memory
"worker_per_gpu": 8, # Num of parallel processes per gpu for data loading
"width_divisor": 8, # Image width will be divided by 8
"height_divisor": 32, # Image height will be divided by 32
"padding_value": 0, # Image padding value
"padding_token": 1000, # Label padding value (None: default value is chosen)
"padding_mode": "br", # Padding at bottom and right
"charset_mode": "CTC", # add blank token
"constraints": ["CTC_line", ], # Padding for CTC requirements if necessary
"normalize": True, # Normalize with mean and variance of training dataset
"padding": {
"min_height": "max", # Pad to reach max height of training samples
"min_width": "max", # Pad to reach max width of training samples
"min_pad": None,
"max_pad": None,
"mode": "br", # Padding at bottom and right
"train_only": False, # Add padding at training time and evaluation time
},
"preprocessings": [
{
"type": "to_RGB",
# if grayscale image, produce RGB one (3 channels with same value) otherwise do nothing
},
],
# Augmentation techniques to use at training time
"augmentation": line_aug_config(0.9, 0.1),
#
"synthetic_data": {
"mode": "line_hw_to_printed",
"init_proba": 1,
"end_proba": 1,
"num_steps_proba": 1e5,
"proba_scheduler_function": exponential_scheduler,
"config": {
"background_color_default": (255, 255, 255),
"background_color_eps": 15,
"text_color_default": (0, 0, 0),
"text_color_eps": 15,
"font_size_min": 30,
"font_size_max": 50,
"color_mode": "RGB",
"padding_left_ratio_min": 0.02,
"padding_left_ratio_max": 0.1,
"padding_right_ratio_min": 0.02,
"padding_right_ratio_max": 0.1,
"padding_top_ratio_min": 0.02,
"padding_top_ratio_max": 0.2,
"padding_bottom_ratio_min": 0.02,
"padding_bottom_ratio_max": 0.2,
},
},
}
},
"model_params": {
# Model classes to use for each module
"models": {
"encoder": FCN_Encoder,
"decoder": Decoder,
},
"transfer_learning": None,
"input_channels": 3, # 1 for grayscale images, 3 for RGB ones (or grayscale as RGB)
"enc_size": 256,
"dropout_scheduler": {
"function": exponential_dropout_scheduler,
"T": 5e4,
},
"dropout": 0.5,
},
"training_params": {
"output_folder": "FCN_read_2016_line_syn", # folder names for logs and weigths
"max_nb_epochs": 10000, # max number of epochs for the training
"max_training_time": 3600 * 24 * 1.9, # max training time limit (in seconds)
"load_epoch": "last", # ["best", "last"], to load weights from best epoch or last trained epoch
"interval_save_weights": None, # None: keep best and last only
"use_ddp": False, # Use DistributedDataParallel
"use_amp": True, # Enable automatic mix-precision
"nb_gpu": torch.cuda.device_count(),
"batch_size": 16, # mini-batch size per GPU
"optimizers": {
"all": {
"class": Adam,
"args": {
"lr": 0.0001,
"amsgrad": False,
}
}
},
"lr_schedulers": None, # Learning rate schedulers
"eval_on_valid": True, # Whether to eval and logs metrics on validation set during training or not
"eval_on_valid_interval": 2, # Interval (in epochs) to evaluate during training
"focus_metric": "cer", # Metrics to focus on to determine best epoch
"expected_metric_value": "low", # ["high", "low"] What is best for the focus metric value
"set_name_focus_metric": "{}-valid".format(dataset_name), # Which dataset to focus on to select best weights
"train_metrics": ["loss_ctc", "cer", "wer"], # Metrics name for training
"eval_metrics": ["loss_ctc", "cer", "wer"], # Metrics name for evaluation on validation set during training
"force_cpu": False, # True for debug purposes to run on cpu only
},
}
if params["training_params"]["use_ddp"] and not params["training_params"]["force_cpu"]:
mp.spawn(train_and_test, args=(params,), nprocs=params["training_params"]["nb_gpu"])
else:
train_and_test(0, params)
if __name__ == "__main__":
main()
import os
import sys
from os.path import dirname
DOSSIER_COURRANT = dirname(os.path.abspath(__file__))
ROOT_FOLDER = dirname(dirname(dirname(DOSSIER_COURRANT)))
sys.path.append(ROOT_FOLDER)
from OCR.line_OCR.ctc.trainer_line_ctc import TrainerLineCTC
from OCR.line_OCR.ctc.models_line_ctc import Decoder
from dan.models import FCN_Encoder
from torch.optim import Adam
from basic.transforms import line_aug_config
from basic.scheduler import exponential_dropout_scheduler, exponential_scheduler
from OCR.ocr_dataset_manager import OCRDataset, OCRDatasetManager
import torch.multiprocessing as mp
import torch
import numpy as np
import random
def train_and_test(rank, params):
torch.manual_seed(0)
torch.cuda.manual_seed(0)
np.random.seed(0)
random.seed(0)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
params["training_params"]["ddp_rank"] = rank
model = TrainerLineCTC(params)
model.generate_syn_line_dataset("READ_2016_syn_line") # ["RIMES_syn_line", "READ_2016_syn_line"]
def main():
dataset_name = "READ_2016" # ["RIMES", "READ_2016"]
dataset_level = "page"
params = {
"dataset_params": {
"dataset_manager": OCRDatasetManager,
"dataset_class": OCRDataset,
"datasets": {
dataset_name: "../../../Datasets/formatted/{}_{}".format(dataset_name, dataset_level),
},
"train": {
"name": "{}-train".format(dataset_name),
"datasets": [(dataset_name, "train"), ],
},
"valid": {
"{}-valid".format(dataset_name): [(dataset_name, "valid"), ],
},
"config": {
"load_in_memory": False, # Load all images in CPU memory
"worker_per_gpu": 4,
"width_divisor": 8, # Image width will be divided by 8
"height_divisor": 32, # Image height will be divided by 32
"padding_value": 0, # Image padding value
"padding_token": 1000, # Label padding value (None: default value is chosen)
"padding_mode": "br", # Padding at bottom and right
"charset_mode": "CTC", # add blank token
"constraints": [], # Padding for CTC requirements if necessary
"normalize": True, # Normalize with mean and variance of training dataset
"preprocessings": [],
# Augmentation techniques to use at training time
"augmentation": line_aug_config(0.9, 0.1),
#
"synthetic_data": {
"mode": "line_hw_to_printed",
"init_proba": 1,
"end_proba": 1,
"num_steps_proba": 1e5,
"proba_scheduler_function": exponential_scheduler,
"config": {
"background_color_default": (255, 255, 255),
"background_color_eps": 15,
"text_color_default": (0, 0, 0),
"text_color_eps": 15,
"font_size_min": 30,
"font_size_max": 50,
"color_mode": "RGB",
"padding_left_ratio_min": 0.02,
"padding_left_ratio_max": 0.1,
"padding_right_ratio_min": 0.02,
"padding_right_ratio_max": 0.1,
"padding_top_ratio_min": 0.02,
"padding_top_ratio_max": 0.2,
"padding_bottom_ratio_min": 0.02,
"padding_bottom_ratio_max": 0.2,
},
},
}
},
"model_params": {
# Model classes to use for each module
"models": {
"encoder": FCN_Encoder,
"decoder": Decoder,
},
"transfer_learning": None,
"input_channels": 3, # 1 for grayscale images, 3 for RGB ones (or grayscale as RGB)
"enc_size": 256,
"dropout_scheduler": {
"function": exponential_dropout_scheduler,
"T": 5e4,
},
"dropout": 0.5,
},
"training_params": {
"output_folder": "FCN_Encoder_read_syn_line_all_pad_max_cursive", # folder names for logs and weigths
"max_nb_epochs": 10000, # max number of epochs for the training
"max_training_time": 3600 * 24 * 1.9, # max training time limit (in seconds)
"load_epoch": "last", # ["best", "last"], to load weights from best epoch or last trained epoch
"interval_save_weights": None, # None: keep best and last only
"use_ddp": False, # Use DistributedDataParallel
"use_amp": True, # Enable automatic mix-precision
"nb_gpu": torch.cuda.device_count(),
"batch_size": 1, # mini-batch size per GPU
"optimizers": {
"all": {
"class": Adam,
"args": {
"lr": 0.0001,
"amsgrad": False,
}
}
},
"lr_schedulers": None,
"eval_on_valid": True, # Whether to eval and logs metrics on validation set during training or not
"eval_on_valid_interval": 2, # Interval (in epochs) to evaluate during training
"focus_metric": "cer", # Metrics to focus on to determine best epoch
"expected_metric_value": "low", # ["high", "low"] What is best for the focus metric value
"set_name_focus_metric": "{}-valid".format(dataset_name),
"train_metrics": ["loss_ctc", "cer", "wer"], # Metrics name for training
"eval_metrics": ["loss_ctc", "cer", "wer"], # Metrics name for evaluation on validation set during training
"force_cpu": False, # True for debug purposes to run on cpu only
},
}
if params["training_params"]["use_ddp"] and not params["training_params"]["force_cpu"]:
mp.spawn(train_and_test, args=(params,), nprocs=params["training_params"]["nb_gpu"])
else:
train_and_test(0, params)
if __name__ == "__main__":
main()
\ No newline at end of file
from torch.nn.functional import log_softmax
from torch.nn import AdaptiveMaxPool2d, Conv1d
from torch.nn import Module
class Decoder(Module):
def __init__(self, params):
super(Decoder, self).__init__()
self.vocab_size = params["vocab_size"]
self.ada_pool = AdaptiveMaxPool2d((1, None))
self.end_conv = Conv1d(in_channels=params["enc_size"], out_channels=self.vocab_size+1, kernel_size=1)
def forward(self, x):
x = self.ada_pool(x).squeeze(2)
x = self.end_conv(x)
return log_softmax(x, dim=1)
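As a quick shape check of this decoder (sizes are illustrative): AdaptiveMaxPool2d((1, None)) collapses the height axis, and the 1x1 Conv1d maps encoder features to per-frame log-probabilities over the vocabulary plus the CTC blank. A minimal sketch:

import torch

params = {"vocab_size": 80, "enc_size": 256}  # illustrative sizes
decoder = Decoder(params)
x = torch.randn(2, 256, 4, 120)  # (batch, enc_size, height, width)
out = decoder(x)
print(out.shape)  # torch.Size([2, 81, 120]): (batch, vocab_size + blank, frames)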
from basic.metric_manager import MetricManager
from OCR.ocr_manager import OCRManager
from dan.ocr_utils import LM_ind_to_str
import torch
from torch.cuda.amp import autocast
from torch.nn import CTCLoss
import re
import time
class TrainerLineCTC(OCRManager):
def __init__(self, params):
super(TrainerLineCTC, self).__init__(params)
def train_batch(self, batch_data, metric_names):
"""
Forward and backward pass for training
"""
x = batch_data["imgs"].to(self.device)
y = batch_data["labels"].to(self.device)
x_reduced_len = [s[1] for s in batch_data["imgs_reduced_shape"]]
y_len = batch_data["labels_len"]
loss_ctc = CTCLoss(blank=self.dataset.tokens["blank"])
self.zero_optimizers()
with autocast(enabled=self.params["training_params"]["use_amp"]):
x = self.models["encoder"](x)
global_pred = self.models["decoder"](x)
loss = loss_ctc(global_pred.permute(2, 0, 1), y, x_reduced_len, y_len)
self.backward_loss(loss)
self.step_optimizers()
pred = torch.argmax(global_pred, dim=1).cpu().numpy()
values = {
"nb_samples": len(batch_data["raw_labels"]),
"loss_ctc": loss.item(),
"str_x": self.pred_to_str(pred, x_reduced_len),
"str_y": batch_data["raw_labels"]
}
return values
def evaluate_batch(self, batch_data, metric_names):
"""
Forward pass only for validation and test
"""
x = batch_data["imgs"].to(self.device)
y = batch_data["labels"].to(self.device)
x_reduced_len = [s[1] for s in batch_data["imgs_reduced_shape"]]
y_len = batch_data["labels_len"]
loss_ctc = CTCLoss(blank=self.dataset.tokens["blank"])
start_time = time.time()
with autocast(enabled=self.params["training_params"]["use_amp"]):
x = self.models["encoder"](x)
global_pred = self.models["decoder"](x)
loss = loss_ctc(global_pred.permute(2, 0, 1), y, x_reduced_len, y_len)
pred = torch.argmax(global_pred, dim=1).cpu().numpy()
str_x = self.pred_to_str(pred, x_reduced_len)
process_time = time.time() - start_time
values = {
"nb_samples": len(batch_data["raw_labels"]),
"loss_ctc": loss.item(),
"str_x": str_x,
"str_y": batch_data["raw_labels"],
"time": process_time
}
return values
def ctc_remove_successives_identical_ind(self, ind):
res = []
for i in ind:
if res and res[-1] == i:
continue
res.append(i)
return res
def pred_to_str(self, pred, pred_len):
"""
convert prediction tokens to string
"""
ind_x = [pred[i][:pred_len[i]] for i in range(pred.shape[0])]
ind_x = [self.ctc_remove_successives_identical_ind(t) for t in ind_x]
str_x = [LM_ind_to_str(self.dataset.charset, t, oov_symbol="") for t in ind_x]
str_x = [re.sub("( )+", ' ', t).strip(" ") for t in str_x]
return str_x
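For reference, the greedy CTC post-processing above boils down to collapsing consecutive repeats (done by ctc_remove_successives_identical_ind) and dropping blank tokens during the index-to-string mapping. A self-contained sketch with illustrative token ids, assuming blank index 0:

def ctc_greedy_collapse(ids, blank=0):
    # Collapse consecutive duplicates, then remove blank tokens.
    collapsed = [i for n, i in enumerate(ids) if n == 0 or ids[n - 1] != i]
    return [i for i in collapsed if i != blank]

assert ctc_greedy_collapse([0, 3, 3, 0, 0, 4, 4, 4, 0]) == [3, 4]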
from basic.generic_training_manager import GenericTrainingManager
import os
from PIL import Image
import pickle
class OCRManager(GenericTrainingManager):
def __init__(self, params):
super(OCRManager, self).__init__(params)
self.params["model_params"]["vocab_size"] = len(self.dataset.charset)
def generate_syn_line_dataset(self, name):
"""
Generate synthetic line dataset from currently loaded dataset
"""
dataset_name = list(self.params['dataset_params']["datasets"].keys())[0]
path = os.path.join(os.path.dirname(self.params['dataset_params']["datasets"][dataset_name]), name)
os.makedirs(path, exist_ok=True)
charset = set()
dataset = None
gt = {
"train": dict(),
"valid": dict(),
"test": dict()
}
for set_name in ["train", "valid", "test"]:
set_path = os.path.join(path, set_name)
os.makedirs(set_path, exist_ok=True)
if set_name == "train":
dataset = self.dataset.train_dataset
elif set_name == "valid":
dataset = self.dataset.valid_datasets["{}-valid".format(dataset_name)]
elif set_name == "test":
self.dataset.generate_test_loader("{}-test".format(dataset_name), [(dataset_name, "test"), ])
dataset = self.dataset.test_datasets["{}-test".format(dataset_name)]
samples = list()
for sample in dataset.samples:
for line_label in sample["label"].split("\n"):
for chunk in [line_label[i:i+100] for i in range(0, len(line_label), 100)]:
charset = charset.union(set(chunk))
if len(chunk) > 0:
samples.append({
"path": sample["path"],
"label": chunk,
"nb_cols": 1,
})
for i, sample in enumerate(samples):
ext = sample['path'].split(".")[-1]
img_name = "{}_{}.{}".format(set_name, i, ext)
img_path = os.path.join(set_path, img_name)
img = dataset.generate_typed_text_line_image(sample["label"])
Image.fromarray(img).save(img_path)
gt[set_name][img_name] = {
"text": sample["label"],
"nb_cols": sample["nb_cols"] if "nb_cols" in sample else 1
}
if "line_label" in sample:
gt[set_name][img_name]["lines"] = sample["line_label"]
with open(os.path.join(path, "labels.pkl"), "wb") as f:
pickle.dump({
"ground_truth": gt,
"charset": sorted(list(charset)),
}, f)
\ No newline at end of file
@@ -2,10 +2,10 @@
This repository is a public implementation of the paper: "DAN: a Segmentation-free Document Attention Network for Handwritten Document Recognition".
-![Prediction visualization](visual.png)
+![Prediction visualization](images/visual.png)
The model uses a character-level attention to handle slanted lines:
-![Prediction visualization on slanted lines](visual_slanted_lines.png)
+![Prediction visualization on slanted lines](images/visual_slanted_lines.png)
The paper is available at https://arxiv.org/abs/2203.12273.
......
import torch
import random
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.distributed import DistributedSampler
from basic.transforms import apply_data_augmentation
from Datasets.dataset_formatters.utils_dataset import natural_sort
import os
import numpy as np
import pickle
from PIL import Image
import cv2
class DatasetManager:
def __init__(self, params):
self.params = params
self.dataset_class = params["dataset_class"]
self.img_padding_value = params["config"]["padding_value"]
self.my_collate_function = None
self.train_dataset = None
self.valid_datasets = dict()
self.test_datasets = dict()
self.train_loader = None
self.valid_loaders = dict()
self.test_loaders = dict()
self.train_sampler = None
self.valid_samplers = dict()
self.test_samplers = dict()
self.generator = torch.Generator()
self.generator.manual_seed(0)
self.batch_size = {
"train": self.params["batch_size"],
"valid": self.params["valid_batch_size"] if "valid_batch_size" in self.params else self.params["batch_size"],
"test": self.params["test_batch_size"] if "test_batch_size" in self.params else 1,
}
def apply_specific_treatment_after_dataset_loading(self, dataset):
raise NotImplementedError
def load_datasets(self):
"""
Load training and validation datasets
"""
self.train_dataset = self.dataset_class(self.params, "train", self.params["train"]["name"], self.get_paths_and_sets(self.params["train"]["datasets"]))
self.params["config"]["mean"], self.params["config"]["std"] = self.train_dataset.compute_std_mean()
self.my_collate_function = self.train_dataset.collate_function(self.params["config"])
self.apply_specific_treatment_after_dataset_loading(self.train_dataset)
for custom_name in self.params["valid"].keys():
self.valid_datasets[custom_name] = self.dataset_class(self.params, "valid", custom_name, self.get_paths_and_sets(self.params["valid"][custom_name]))
self.apply_specific_treatment_after_dataset_loading(self.valid_datasets[custom_name])
def load_ddp_samplers(self):
"""
Load training and validation data samplers
"""
if self.params["use_ddp"]:
self.train_sampler = DistributedSampler(self.train_dataset, num_replicas=self.params["num_gpu"], rank=self.params["ddp_rank"], shuffle=True)
for custom_name in self.valid_datasets.keys():
self.valid_samplers[custom_name] = DistributedSampler(self.valid_datasets[custom_name], num_replicas=self.params["num_gpu"], rank=self.params["ddp_rank"], shuffle=False)
else:
for custom_name in self.valid_datasets.keys():
self.valid_samplers[custom_name] = None
def load_dataloaders(self):
"""
Load training and validation data loaders
"""
# batch_sampler is mutually exclusive with batch_size/shuffle/sampler in PyTorch,
# so only sampler is passed here (None when DDP is disabled).
self.train_loader = DataLoader(self.train_dataset,
                               batch_size=self.batch_size["train"],
                               shuffle=True if self.train_sampler is None else False,
                               drop_last=False,
                               sampler=self.train_sampler,
                               num_workers=self.params["num_gpu"] * self.params["worker_per_gpu"],
                               pin_memory=True,
                               collate_fn=self.my_collate_function,
                               worker_init_fn=self.seed_worker,
                               generator=self.generator)
for key in self.valid_datasets.keys():
self.valid_loaders[key] = DataLoader(self.valid_datasets[key],
                                     batch_size=self.batch_size["valid"],
                                     sampler=self.valid_samplers[key],
                                     shuffle=False,
                                     num_workers=self.params["num_gpu"] * self.params["worker_per_gpu"],
                                     pin_memory=True,
                                     drop_last=False,
                                     collate_fn=self.my_collate_function,
                                     worker_init_fn=self.seed_worker,
                                     generator=self.generator)
@staticmethod
def seed_worker(worker_id):
worker_seed = torch.initial_seed() % 2 ** 32
np.random.seed(worker_seed)
random.seed(worker_seed)
def generate_test_loader(self, custom_name, sets_list):
"""
Load test dataset, data sampler and data loader
"""
if custom_name in self.test_loaders.keys():
return
paths_and_sets = list()
for set_info in sets_list:
paths_and_sets.append({
"path": self.params["datasets"][set_info[0]],
"set_name": set_info[1]
})
self.test_datasets[custom_name] = self.dataset_class(self.params, "test", custom_name, paths_and_sets)
self.apply_specific_treatment_after_dataset_loading(self.test_datasets[custom_name])
if self.params["use_ddp"]:
self.test_samplers[custom_name] = DistributedSampler(self.test_datasets[custom_name], num_replicas=self.params["num_gpu"], rank=self.params["ddp_rank"], shuffle=False)
else:
self.test_samplers[custom_name] = None
self.test_loaders[custom_name] = DataLoader(self.test_datasets[custom_name],
batch_size=self.batch_size["test"],
sampler=self.test_samplers[custom_name],
shuffle=False,
num_workers=self.params["num_gpu"]*self.params["worker_per_gpu"],
pin_memory=True,
drop_last=False,
collate_fn=self.my_collate_function,
worker_init_fn=self.seed_worker,
generator=self.generator)
def remove_test_dataset(self, custom_name):
del self.test_datasets[custom_name]
del self.test_samplers[custom_name]
del self.test_loaders[custom_name]
def remove_valid_dataset(self, custom_name):
del self.valid_datasets[custom_name]
del self.valid_samplers[custom_name]
del self.valid_loaders[custom_name]
def remove_train_dataset(self):
self.train_dataset = None
self.train_sampler = None
self.train_loader = None
def remove_all_datasets(self):
self.remove_train_dataset()
for name in list(self.valid_datasets.keys()):
self.remove_valid_dataset(name)
for name in list(self.test_datasets.keys()):
self.remove_test_dataset(name)
def get_paths_and_sets(self, dataset_names_folds):
paths_and_sets = list()
for dataset_name, fold in dataset_names_folds:
path = self.params["datasets"][dataset_name]
paths_and_sets.append({
"path": path,
"set_name": fold
})
return paths_and_sets
class GenericDataset(Dataset):
"""
Main class to handle dataset loading
"""
def __init__(self, params, set_name, custom_name, paths_and_sets):
self.params = params
self.name = custom_name
self.set_name = set_name
self.mean = np.array(params["config"]["mean"]) if "mean" in params["config"].keys() else None
self.std = np.array(params["config"]["std"]) if "std" in params["config"].keys() else None
self.load_in_memory = self.params["config"]["load_in_memory"] if "load_in_memory" in self.params["config"] else True
self.samples = self.load_samples(paths_and_sets, load_in_memory=self.load_in_memory)
if self.load_in_memory:
self.apply_preprocessing(params["config"]["preprocessings"])
self.padding_value = params["config"]["padding_value"]
if self.padding_value == "mean":
if self.mean is None:
_, _ = self.compute_std_mean()
self.padding_value = self.mean
self.params["config"]["padding_value"] = self.padding_value
self.curriculum_config = None
self.training_info = None
def __len__(self):
return len(self.samples)
@staticmethod
def load_image(path):
with Image.open(path) as pil_img:
img = np.array(pil_img)
## grayscale images
if len(img.shape) == 2:
img = np.expand_dims(img, axis=2)
return img
@staticmethod
def load_samples(paths_and_sets, load_in_memory=True):
"""
Load images and labels
"""
samples = list()
for path_and_set in paths_and_sets:
path = path_and_set["path"]
set_name = path_and_set["set_name"]
with open(os.path.join(path, "labels.pkl"), "rb") as f:
info = pickle.load(f)
gt = info["ground_truth"][set_name]
for filename in natural_sort(gt.keys()):
name = os.path.join(os.path.basename(path), set_name, filename)
full_path = os.path.join(path, set_name, filename)
if isinstance(gt[filename], dict) and "text" in gt[filename]:
label = gt[filename]["text"]
else:
label = gt[filename]
samples.append({
"name": name,
"label": label,
"unchanged_label": label,
"path": full_path,
"nb_cols": 1 if "nb_cols" not in gt[filename] else gt[filename]["nb_cols"]
})
if load_in_memory:
samples[-1]["img"] = GenericDataset.load_image(full_path)
if type(gt[filename]) is dict:
if "lines" in gt[filename].keys():
samples[-1]["raw_line_seg_label"] = gt[filename]["lines"]
if "paragraphs" in gt[filename].keys():
samples[-1]["paragraphs_label"] = gt[filename]["paragraphs"]
if "pages" in gt[filename].keys():
samples[-1]["pages_label"] = gt[filename]["pages"]
return samples
def apply_preprocessing(self, preprocessings):
for i in range(len(self.samples)):
self.samples[i] = apply_preprocessing(self.samples[i], preprocessings)
def compute_std_mean(self):
"""
Compute cumulated variance and mean of whole dataset
"""
if self.mean is not None and self.std is not None:
return self.mean, self.std
if not self.load_in_memory:
sample = self.samples[0].copy()
sample["img"] = self.get_sample_img(0)
img = apply_preprocessing(sample, self.params["config"]["preprocessings"])["img"]
else:
img = self.get_sample_img(0)
_, _, c = img.shape
total = np.zeros((c,))  # per-channel pixel sum (avoids shadowing the built-in sum)
nb_pixels = 0
for i in range(len(self.samples)):
if not self.load_in_memory:
sample = self.samples[i].copy()
sample["img"] = self.get_sample_img(i)
img = apply_preprocessing(sample, self.params["config"]["preprocessings"])["img"]
else:
img = self.get_sample_img(i)
total += np.sum(img, axis=(0, 1))
nb_pixels += np.prod(img.shape[:2])
mean = total / nb_pixels
diff = np.zeros((c,))
for i in range(len(self.samples)):
if not self.load_in_memory:
sample = self.samples[i].copy()
sample["img"] = self.get_sample_img(i)
img = apply_preprocessing(sample, self.params["config"]["preprocessings"])["img"]
else:
img = self.get_sample_img(i)
diff += [np.sum((img[:, :, k] - mean[k]) ** 2) for k in range(c)]
std = np.sqrt(diff / nb_pixels)
self.mean = mean
self.std = std
return mean, std
def apply_data_augmentation(self, img):
"""
Apply data augmentation strategy on the input image
"""
augs = [self.params["config"][key] if key in self.params["config"].keys() else None for key in ["augmentation", "valid_augmentation", "test_augmentation"]]
for aug, set_name in zip(augs, ["train", "valid", "test"]):
if aug and self.set_name == set_name:
return apply_data_augmentation(img, aug)
return img, list()
def get_sample_img(self, i):
"""
Get image by index
"""
if self.load_in_memory:
return self.samples[i]["img"]
else:
return GenericDataset.load_image(self.samples[i]["path"])
def denormalize(self, img):
"""
Get original image, before normalization
"""
return img * self.std + self.mean
def apply_preprocessing(sample, preprocessings):
"""
Apply preprocessings on each sample
"""
resize_ratio = [1, 1]
img = sample["img"]
for preprocessing in preprocessings:
if preprocessing["type"] == "dpi":
ratio = preprocessing["target"] / preprocessing["source"]
temp_img = img
h, w, c = temp_img.shape
temp_img = cv2.resize(temp_img, (int(np.ceil(w * ratio)), int(np.ceil(h * ratio))))
if len(temp_img.shape) == 2:
temp_img = np.expand_dims(temp_img, axis=2)
img = temp_img
resize_ratio = [ratio, ratio]
if preprocessing["type"] == "to_grayscaled":
temp_img = img
h, w, c = temp_img.shape
if c == 3:
img = np.expand_dims(
0.2125 * temp_img[:, :, 0] + 0.7154 * temp_img[:, :, 1] + 0.0721 * temp_img[:, :, 2],
axis=2).astype(np.uint8)
if preprocessing["type"] == "to_RGB":
temp_img = img
h, w, c = temp_img.shape
if c == 1:
img = np.concatenate([temp_img, temp_img, temp_img], axis=2)
if preprocessing["type"] == "resize":
keep_ratio = preprocessing["keep_ratio"]
max_h, max_w = preprocessing["max_height"], preprocessing["max_width"]
temp_img = img
h, w, c = temp_img.shape
ratio_h = max_h / h if max_h else 1
ratio_w = max_w / w if max_w else 1
if keep_ratio:
ratio_h = ratio_w = min(ratio_w, ratio_h)
new_h = min(max_h, int(h * ratio_h))
new_w = min(max_w, int(w * ratio_w))
temp_img = cv2.resize(temp_img, (new_w, new_h))
if len(temp_img.shape) == 2:
temp_img = np.expand_dims(temp_img, axis=2)
img = temp_img
resize_ratio = [ratio_h, ratio_w]
if preprocessing["type"] == "fixed_height":
new_h = preprocessing["height"]
temp_img = img
h, w, c = temp_img.shape
ratio = new_h / h
temp_img = cv2.resize(temp_img, (int(w*ratio), new_h))
if len(temp_img.shape) == 2:
temp_img = np.expand_dims(temp_img, axis=2)
img = temp_img
resize_ratio = [ratio, ratio]
if resize_ratio != [1, 1] and "raw_line_seg_label" in sample:
for li in range(len(sample["raw_line_seg_label"])):
for side, ratio in zip((["bottom", "top"], ["right", "left"]), resize_ratio):
for s in side:
sample["raw_line_seg_label"][li][s] = sample["raw_line_seg_label"][li][s] * ratio
sample["img"] = img
sample["resize_ratio"] = resize_ratio
return sample
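A minimal usage sketch of apply_preprocessing with a DPI-adjustment entry (values are illustrative):

import numpy as np

sample = {"img": np.full((300, 200, 3), 255, dtype=np.uint8)}  # white 300x200 RGB image
sample = apply_preprocessing(sample, [{"type": "dpi", "source": 300, "target": 150}])
print(sample["img"].shape, sample["resize_ratio"])  # (150, 100, 3) [0.5, 0.5]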
from torch.nn import Dropout, Dropout2d
import numpy as np
class DropoutScheduler:
def __init__(self, models, function, T=1e5):
"""
T: number of gradient updates to converge
"""
self.teta_list = list()
self.init_teta_list(models)
self.function = function
self.T = T
self.step_num = 0
def step(self, num=1):
    self.step_num += num
def init_teta_list(self, models):
for model_name in models.keys():
self.init_teta_list_module(models[model_name])
def init_teta_list_module(self, module):
for child in module.children():
if isinstance(child, Dropout) or isinstance(child, Dropout2d):
self.teta_list.append([child, child.p])
else:
self.init_teta_list_module(child)
def update_dropout_rate(self):
for (module, p) in self.teta_list:
module.p = self.function(p, self.step_num, self.T)
def exponential_dropout_scheduler(dropout_rate, step, max_step):
return dropout_rate * (1 - np.exp(-10 * step / max_step))
def exponential_scheduler(init_value, end_value, step, max_step):
step = min(step, max_step-1)
return init_value - (init_value - end_value) * (1 - np.exp(-10*step/max_step))
def linear_scheduler(init_value, end_value, step, max_step):
return init_value + step * (end_value - init_value) / max_step
\ No newline at end of file
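To make the schedules above concrete, a few sampled values (the dropout rate, T, and step counts are illustrative):

# Dropout ramps up from 0 toward the configured rate of 0.5 as training progresses.
for step in [0, 10_000, 50_000]:
    print(step, round(exponential_dropout_scheduler(0.5, step, 5e4), 3))  # 0.0, 0.432, 0.5

# The linear scheduler interpolates between the initial and final values.
print(linear_scheduler(0.9, 0.2, 100_000, 200_000))  # 0.55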
import numpy as np
from numpy import random
from PIL import Image, ImageOps
from cv2 import erode, dilate, normalize
import cv2
import math
from basic.utils import randint, rand_uniform, rand
from torchvision.transforms import RandomPerspective, RandomCrop, ColorJitter, GaussianBlur, RandomRotation
from torchvision.transforms.functional import InterpolationMode
"""
Each transform class defined here takes as input a PIL Image and returns the modified PIL Image
"""
class SignFlipping:
"""
Color inversion
"""
def __init__(self):
pass
def __call__(self, x):
return ImageOps.invert(x)
class DPIAdjusting:
"""
Resolution modification
"""
def __init__(self, factor, preserve_ratio):
    self.factor = factor
    self.preserve_ratio = preserve_ratio
def __call__(self, x):
w, h = x.size
return x.resize((int(np.ceil(w * self.factor)), int(np.ceil(h * self.factor))), Image.BILINEAR)
class Dilation:
"""
OCR: stroke width increasing
"""
def __init__(self, kernel, iterations):
self.kernel = np.ones(kernel, np.uint8)
self.iterations = iterations
def __call__(self, x):
return Image.fromarray(dilate(np.array(x), self.kernel, iterations=self.iterations))
class Erosion:
"""
OCR: stroke width decreasing
"""
def __init__(self, kernel, iterations):
self.kernel = np.ones(kernel, np.uint8)
self.iterations = iterations
def __call__(self, x):
return Image.fromarray(erode(np.array(x), self.kernel, iterations=self.iterations))
class GaussianNoise:
"""
Add Gaussian Noise
"""
def __init__(self, std):
self.std = std
def __call__(self, x):
x_np = np.array(x)
mean, std = np.mean(x_np), np.std(x_np)
std = math.copysign(max(abs(std), 0.000001), std)
min_, max_ = np.min(x_np,), np.max(x_np)
normal_noise = np.random.randn(*x_np.shape)
if len(x_np.shape) == 3 and x_np.shape[2] == 3 and np.all(x_np[:, :, 0] == x_np[:, :, 1]) and np.all(x_np[:, :, 0] == x_np[:, :, 2]):
normal_noise[:, :, 1] = normal_noise[:, :, 2] = normal_noise[:, :, 0]
x_np = ((x_np-mean)/std + normal_noise*self.std) * std + mean
x_np = normalize(x_np, x_np, max_, min_, cv2.NORM_MINMAX)
return Image.fromarray(x_np.astype(np.uint8))
class Sharpen:
"""
Sharpen the image
"""
def __init__(self, alpha, strength):
self.alpha = alpha
self.strength = strength
def __call__(self, x):
x_np = np.array(x)
id_matrix = np.array([[0, 0, 0],
[0, 1, 0],
[0, 0, 0]]
)
effect_matrix = np.array([[1, 1, 1],
[1, -(8+self.strength), 1],
[1, 1, 1]]
)
kernel = (1 - self.alpha) * id_matrix - self.alpha * effect_matrix
kernel = np.expand_dims(kernel, axis=2)
kernel = np.concatenate([kernel, kernel, kernel], axis=2)
sharpened = cv2.filter2D(x_np, -1, kernel=kernel[:, :, 0])
return Image.fromarray(sharpened.astype(np.uint8))
class ZoomRatio:
"""
Crop by ratio
Preserve dimensions if keep_dim = True (= zoom)
"""
def __init__(self, ratio_h, ratio_w, keep_dim=True):
self.ratio_w = ratio_w
self.ratio_h = ratio_h
self.keep_dim = keep_dim
def __call__(self, x):
w, h = x.size
x = RandomCrop((int(h * self.ratio_h), int(w * self.ratio_w)))(x)
if self.keep_dim:
x = x.resize((w, h), Image.BILINEAR)
return x
class ElasticDistortion:
def __init__(self, kernel_size=(7, 7), sigma=5, alpha=1):
self.kernel_size = kernel_size
self.sigma = sigma
self.alpha = alpha
def __call__(self, x):
x_np = np.array(x)
h, w = x_np.shape[:2]
dx = np.random.uniform(-1, 1, (h, w))
dy = np.random.uniform(-1, 1, (h, w))
x_gauss = cv2.GaussianBlur(dx, self.kernel_size, self.sigma)
y_gauss = cv2.GaussianBlur(dy, self.kernel_size, self.sigma)
n = np.sqrt(x_gauss**2 + y_gauss**2)
nd_x = self.alpha * x_gauss / n
nd_y = self.alpha * y_gauss / n
ind_y, ind_x = np.indices((h, w), dtype=np.float32)
map_x = nd_x + ind_x
map_x = map_x.reshape(h, w).astype(np.float32)
map_y = nd_y + ind_y
map_y = map_y.reshape(h, w).astype(np.float32)
dst = cv2.remap(x_np, map_x, map_y, cv2.INTER_LINEAR)
return Image.fromarray(dst.astype(np.uint8))
class Tightening:
"""
Reduce interline spacing
"""
def __init__(self, color=255, remove_proba=0.75):
self.color = color
self.remove_proba = remove_proba
def __call__(self, x):
x_np = np.array(x)
interline_indices = [np.all(line == self.color) for line in x_np]
indices_to_removed = np.logical_and(np.random.choice([True, False], size=len(x_np), replace=True, p=[self.remove_proba, 1-self.remove_proba]), interline_indices)
new_x = x_np[np.logical_not(indices_to_removed)]
return Image.fromarray(new_x.astype(np.uint8))
def get_list_augmenters(img, aug_configs, fill_value):
"""
Randomly select a list of data augmentation techniques to use, based on aug_configs
"""
augmenters = list()
for aug_config in aug_configs:
if rand() > aug_config["proba"]:
continue
if aug_config["type"] == "dpi":
valid_factor = False
while not valid_factor:
factor = rand_uniform(aug_config["min_factor"], aug_config["max_factor"])
valid_factor = not (("max_width" in aug_config and factor*img.size[0] > aug_config["max_width"]) or \
("max_height" in aug_config and factor * img.size[1] > aug_config["max_height"]) or \
("min_width" in aug_config and factor*img.size[0] < aug_config["min_width"]) or \
("min_height" in aug_config and factor * img.size[1] < aug_config["min_height"]))
augmenters.append(DPIAdjusting(factor, preserve_ratio=aug_config["preserve_ratio"]))
elif aug_config["type"] == "zoom_ratio":
ratio_h = rand_uniform(aug_config["min_ratio_h"], aug_config["max_ratio_h"])
ratio_w = rand_uniform(aug_config["min_ratio_w"], aug_config["max_ratio_w"])
augmenters.append(ZoomRatio(ratio_h=ratio_h, ratio_w=ratio_w, keep_dim=aug_config["keep_dim"]))
elif aug_config["type"] == "perspective":
scale = rand_uniform(aug_config["min_factor"], aug_config["max_factor"])
augmenters.append(RandomPerspective(distortion_scale=scale, p=1, interpolation=InterpolationMode.BILINEAR, fill=fill_value))
elif aug_config["type"] == "elastic_distortion":
kernel_size = randint(aug_config["min_kernel_size"], aug_config["max_kernel_size"]) // 2 * 2 + 1
sigma = rand_uniform(aug_config["min_sigma"], aug_config["max_sigma"])
            alpha = rand_uniform(aug_config["min_alpha"], aug_config["max_alpha"])
augmenters.append(ElasticDistortion(kernel_size=(kernel_size, kernel_size), sigma=sigma, alpha=alpha))
elif aug_config["type"] == "dilation_erosion":
kernel_h = randint(aug_config["min_kernel"], aug_config["max_kernel"] + 1)
kernel_w = randint(aug_config["min_kernel"], aug_config["max_kernel"] + 1)
if randint(0, 2) == 0:
augmenters.append(Erosion((kernel_w, kernel_h), aug_config["iterations"]))
else:
augmenters.append(Dilation((kernel_w, kernel_h), aug_config["iterations"]))
elif aug_config["type"] == "color_jittering":
augmenters.append(ColorJitter(contrast=aug_config["factor_contrast"],
brightness=aug_config["factor_brightness"],
saturation=aug_config["factor_saturation"],
hue=aug_config["factor_hue"],
))
elif aug_config["type"] == "gaussian_blur":
max_kernel_h = min(aug_config["max_kernel"], img.size[1])
max_kernel_w = min(aug_config["max_kernel"], img.size[0])
kernel_h = randint(aug_config["min_kernel"], max_kernel_h + 1) // 2 * 2 + 1
kernel_w = randint(aug_config["min_kernel"], max_kernel_w + 1) // 2 * 2 + 1
sigma = rand_uniform(aug_config["min_sigma"], aug_config["max_sigma"])
augmenters.append(GaussianBlur(kernel_size=(kernel_w, kernel_h), sigma=sigma))
elif aug_config["type"] == "gaussian_noise":
augmenters.append(GaussianNoise(std=aug_config["std"]))
elif aug_config["type"] == "sharpen":
alpha = rand_uniform(aug_config["min_alpha"], aug_config["max_alpha"])
strength = rand_uniform(aug_config["min_strength"], aug_config["max_strength"])
augmenters.append(Sharpen(alpha=alpha, strength=strength))
        else:
            raise ValueError("Unknown augmentation type: {}".format(aug_config["type"]))
return augmenters
def apply_data_augmentation(img, da_config):
"""
Apply data augmentation strategy on input image
"""
applied_da = list()
if da_config["proba"] != 1 and rand() > da_config["proba"]:
return img, applied_da
# Convert to PIL Image
img = img[:, :, 0] if img.shape[2] == 1 else img
img = Image.fromarray(img)
fill_value = da_config["fill_value"] if "fill_value" in da_config else 255
augmenters = get_list_augmenters(img, da_config["augmentations"], fill_value=fill_value)
if da_config["order"] == "random":
random.shuffle(augmenters)
for augmenter in augmenters:
img = augmenter(img)
applied_da.append(type(augmenter).__name__)
# convert to numpy array
img = np.array(img)
img = np.expand_dims(img, axis=2) if len(img.shape) == 2 else img
return img, applied_da
def apply_transform(img, transform):
"""
Apply data augmentation technique on input image
"""
img = img[:, :, 0] if img.shape[2] == 1 else img
img = Image.fromarray(img)
img = transform(img)
img = np.array(img)
return np.expand_dims(img, axis=2) if len(img.shape) == 2 else img
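def _demo_apply_transform():
    """
    Illustrative sketch, not part of the original module: apply a single
    torchvision transform to a (H, W, 1) numpy image.
    """
    img = (np.random.rand(64, 256, 1) * 255).astype(np.uint8)
    out = apply_transform(img, GaussianBlur(kernel_size=(3, 3), sigma=1.0))
    print(out.shape)  # (64, 256, 1)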
def line_aug_config(proba_use_da, p):
return {
"order": "random",
"proba": proba_use_da,
"augmentations": [
{
"type": "dpi",
"proba": p,
"min_factor": 0.5,
"max_factor": 1.5,
"preserve_ratio": True,
},
{
"type": "perspective",
"proba": p,
"min_factor": 0,
"max_factor": 0.4,
},
{
"type": "elastic_distortion",
"proba": p,
"min_alpha": 0.5,
"max_alpha": 1,
"min_sigma": 1,
"max_sigma": 10,
"min_kernel_size": 3,
"max_kernel_size": 9,
},
{
"type": "dilation_erosion",
"proba": p,
"min_kernel": 1,
"max_kernel": 3,
"iterations": 1,
},
{
"type": "color_jittering",
"proba": p,
"factor_hue": 0.2,
"factor_brightness": 0.4,
"factor_contrast": 0.4,
"factor_saturation": 0.4,
},
{
"type": "gaussian_blur",
"proba": p,
"min_kernel": 3,
"max_kernel": 5,
"min_sigma": 3,
"max_sigma": 5,
},
{
"type": "gaussian_noise",
"proba": p,
"std": 0.5,
},
{
"type": "sharpen",
"proba": p,
"min_alpha": 0,
"max_alpha": 1,
"min_strength": 0,
"max_strength": 1,
},
{
"type": "zoom_ratio",
"proba": p,
"min_ratio_h": 0.8,
"max_ratio_h": 1,
"min_ratio_w": 0.99,
"max_ratio_w": 1,
"keep_dim": True
},
]
}
def aug_config(proba_use_da, p):
return {
"order": "random",
"proba": proba_use_da,
"augmentations": [
{
"type": "dpi",
"proba": p,
"min_factor": 0.75,
"max_factor": 1,
"preserve_ratio": True,
},
{
"type": "perspective",
"proba": p,
"min_factor": 0,
"max_factor": 0.4,
},
{
"type": "elastic_distortion",
"proba": p,
"min_alpha": 0.5,
"max_alpha": 1,
"min_sigma": 1,
"max_sigma": 10,
"min_kernel_size": 3,
"max_kernel_size": 9,
},
{
"type": "dilation_erosion",
"proba": p,
"min_kernel": 1,
"max_kernel": 3,
"iterations": 1,
},
{
"type": "color_jittering",
"proba": p,
"factor_hue": 0.2,
"factor_brightness": 0.4,
"factor_contrast": 0.4,
"factor_saturation": 0.4,
},
{
"type": "gaussian_blur",
"proba": p,
"min_kernel": 3,
"max_kernel": 5,
"min_sigma": 3,
"max_sigma": 5,
},
{
"type": "gaussian_noise",
"proba": p,
"std": 0.5,
},
{
"type": "sharpen",
"proba": p,
"min_alpha": 0,
"max_alpha": 1,
"min_strength": 0,
"max_strength": 1,
},
]
}
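def _demo_data_augmentation():
    """
    Illustrative sketch, not part of the original module: build the page-level
    augmentation config and apply it to a random RGB image. Probabilities are
    arbitrary; output shape may change if the dpi augmenter fires.
    """
    img = (np.random.rand(128, 512, 3) * 255).astype(np.uint8)
    augmented, applied = apply_data_augmentation(img, aug_config(proba_use_da=0.9, p=0.1))
    print(augmented.shape, applied)  # e.g. (128, 512, 3) ['GaussianNoise', 'Sharpen']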
import numpy as np
import torch
from torch.distributions.uniform import Uniform
import cv2
def randint(low, high):
"""
    Call torch.randint to keep randomness consistent across dataloader workers
"""
return int(torch.randint(low, high, (1, )))
def rand():
"""
    Call torch.rand to keep randomness consistent across dataloader workers
"""
return float(torch.rand((1, )))
def rand_uniform(low, high):
"""
    Sample from a torch Uniform distribution to keep randomness consistent across dataloader workers
"""
return float(Uniform(low, high).sample())
def pad_sequences_1D(data, padding_value):
"""
Pad data with padding_value to get same length
"""
x_lengths = [len(x) for x in data]
longest_x = max(x_lengths)
padded_data = np.ones((len(data), longest_x)).astype(np.int32) * padding_value
for i, x_len in enumerate(x_lengths):
padded_data[i, :x_len] = data[i][:x_len]
return padded_data
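def _demo_pad_sequences():
    """
    Illustrative sketch, not part of the original module: pad variable-length
    token sequences to the longest length in the batch.
    """
    batch = [[1, 2, 3], [4, 5]]
    print(pad_sequences_1D(batch, padding_value=0))
    # [[1 2 3]
    #  [4 5 0]]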
def resize_max(img, max_width=None, max_height=None):
if max_width is not None and img.shape[1] > max_width:
ratio = max_width / img.shape[1]
new_h = int(np.floor(ratio * img.shape[0]))
new_w = int(np.floor(ratio * img.shape[1]))
img = cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_LINEAR)
if max_height is not None and img.shape[0] > max_height:
ratio = max_height / img.shape[0]
new_h = int(np.floor(ratio * img.shape[0]))
new_w = int(np.floor(ratio * img.shape[1]))
img = cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_LINEAR)
return img
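def _demo_resize_max():
    """
    Illustrative sketch, not part of the original module: downscale only when a
    dimension exceeds the bound, preserving the aspect ratio.
    """
    img = np.zeros((300, 900, 3), dtype=np.uint8)
    print(resize_max(img, max_width=450).shape)  # (150, 450, 3)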
def pad_images(data, padding_value, padding_mode="br"):
"""
    data: list of numpy arrays (H, W, C)
    padding_mode: "br"/"tl"/"random" (bottom-right, top-left, random)
"""
x_lengths = [x.shape[0] for x in data]
y_lengths = [x.shape[1] for x in data]
longest_x = max(x_lengths)
longest_y = max(y_lengths)
padded_data = np.ones((len(data), longest_x, longest_y, data[0].shape[2])) * padding_value
for i, xy_len in enumerate(zip(x_lengths, y_lengths)):
x_len, y_len = xy_len
if padding_mode == "br":
padded_data[i, :x_len, :y_len, ...] = data[i]
elif padding_mode == "tl":
padded_data[i, -x_len:, -y_len:, ...] = data[i]
elif padding_mode == "random":
xmax = longest_x - x_len
ymax = longest_y - y_len
xi = randint(0, xmax) if xmax >= 1 else 0
yi = randint(0, ymax) if ymax >= 1 else 0
padded_data[i, xi:xi+x_len, yi:yi+y_len, ...] = data[i]
else:
raise NotImplementedError("Undefined padding mode: {}".format(padding_mode))
return padded_data
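def _demo_pad_images():
    """
    Illustrative sketch, not part of the original module: batch two images of
    different sizes with bottom-right padding.
    """
    batch = [np.zeros((3, 4, 1)), np.zeros((5, 2, 1))]
    print(pad_images(batch, padding_value=255, padding_mode="br").shape)  # (2, 5, 4, 1)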
def pad_image(image, padding_value, new_height=None, new_width=None, pad_width=None, pad_height=None, padding_mode="br", return_position=False):
"""
data: list of numpy array
mode: "br"/"tl"/"random" (bottom-right, top-left, random)
"""
if pad_width is not None and new_width is not None:
raise NotImplementedError("pad_with and new_width are not compatible")
if pad_height is not None and new_height is not None:
raise NotImplementedError("pad_height and new_height are not compatible")
h, w, c = image.shape
pad_width = pad_width if pad_width is not None else max(0, new_width - w) if new_width is not None else 0
pad_height = pad_height if pad_height is not None else max(0, new_height - h) if new_height is not None else 0
if not (pad_width == 0 and pad_height == 0):
padded_image = np.ones((h+pad_height, w+pad_width, c)) * padding_value
if padding_mode == "br":
hi, wi = 0, 0
elif padding_mode == "tl":
hi, wi = pad_height, pad_width
elif padding_mode == "random":
hi = randint(0, pad_height) if pad_height >= 1 else 0
wi = randint(0, pad_width) if pad_width >= 1 else 0
else:
raise NotImplementedError("Undefined padding mode: {}".format(padding_mode))
padded_image[hi:hi + h, wi:wi + w, ...] = image
output = padded_image
else:
hi, wi = 0, 0
output = image
if return_position:
return output, [[hi, hi+h], [wi, wi+w]]
return output
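def _demo_pad_image():
    """
    Illustrative sketch, not part of the original module: pad a single image to
    a fixed width and recover where the original pixels were placed.
    """
    img = np.zeros((4, 6, 1))
    padded, position = pad_image(img, padding_value=255, new_width=10, padding_mode="tl", return_position=True)
    print(padded.shape, position)  # (4, 10, 1) [[0, 4], [4, 10]]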
def pad_image_width_right(img, new_width, padding_value):
"""
Pad img to right side with padding value to reach new_width as width
"""
h, w, c = img.shape
pad_width = max((new_width - w), 0)
pad_right = np.ones((h, pad_width, c), dtype=img.dtype) * padding_value
img = np.concatenate([img, pad_right], axis=1)
return img
def pad_image_width_left(img, new_width, padding_value):
"""
Pad img to left side with padding value to reach new_width as width
"""
h, w, c = img.shape
pad_width = max((new_width - w), 0)
pad_left = np.ones((h, pad_width, c), dtype=img.dtype) * padding_value
img = np.concatenate([pad_left, img], axis=1)
return img
def pad_image_width_random(img, new_width, padding_value, max_pad_left_ratio=1):
"""
    Randomly pad img on left and right sides with padding value to reach new_width as width
"""
h, w, c = img.shape
pad_width = max((new_width - w), 0)
max_pad_left = int(max_pad_left_ratio*pad_width)
pad_left = randint(0, min(pad_width, max_pad_left)) if pad_width != 0 and max_pad_left > 0 else 0
pad_right = pad_width - pad_left
pad_left = np.ones((h, pad_left, c), dtype=img.dtype) * padding_value
pad_right = np.ones((h, pad_right, c), dtype=img.dtype) * padding_value
img = np.concatenate([pad_left, img, pad_right], axis=1)
return img
def pad_image_height_random(img, new_height, padding_value, max_pad_top_ratio=1):
"""
    Randomly pad img on top and bottom sides with padding value to reach new_height as height
"""
h, w, c = img.shape
pad_height = max((new_height - h), 0)
max_pad_top = int(max_pad_top_ratio*pad_height)
pad_top = randint(0, min(pad_height, max_pad_top)) if pad_height != 0 and max_pad_top > 0 else 0
pad_bottom = pad_height - pad_top
pad_top = np.ones((pad_top, w, c), dtype=img.dtype) * padding_value
pad_bottom = np.ones((pad_bottom, w, c), dtype=img.dtype) * padding_value
img = np.concatenate([pad_top, img, pad_bottom], axis=0)
return img
def pad_image_height_bottom(img, new_height, padding_value):
"""
Pad img to bottom side with padding value to reach new_height as height
"""
h, w, c = img.shape
pad_height = max((new_height - h), 0)
pad_bottom = np.ones((pad_height, w, c)) * padding_value
img = np.concatenate([img, pad_bottom], axis=0)
return img
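def _demo_pad_width():
    """
    Illustrative sketch, not part of the original module: right-pad a text-line
    image to a fixed width before batching.
    """
    line = np.zeros((32, 100, 1), dtype=np.uint8)
    print(pad_image_width_right(line, new_width=128, padding_value=255).shape)  # (32, 128, 1)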
arkindex-client==1.0.11
editdistance==0.6.0
fontTools==4.29.1
imageio==2.16.0
networkx==2.6.3
numpy==1.22.3
opencv-python==4.5.5.64
PyYAML==6.0
tensorboard==0.2.1
torch==1.11.0
torchvision==0.12.0
tqdm==4.62.3