Skip to content
Snippets Groups Projects
Commit 983a628c authored by Mélodie Boillet, committed by Bastien Abadie
Browse files

Add decoder code

parent 78616260
No related branches found
No related tags found
1 merge request: !1 Add decoder code
[flake8]
max-line-length = 150
exclude = .git,__pycache__
ignore = E203,E501,W503
*.pth filter=lfs diff=lfs merge=lfs -text
*.pyc
*.egg-info/
stages:
- lint
- test
test:
image: python:3.8
stage: test
cache:
paths:
- .cache/pip
variables:
PIP_CACHE_DIR: "$CI_PROJECT_DIR/.cache/pip"
before_script:
- pip install tox
except:
- schedules
script:
- tox
lint:
  image: python:3.8
  # Run in the dedicated "lint" stage declared under `stages`.
  # A job without an explicit stage is placed in the "test" stage.
  stage: lint
  cache:
    paths:
      - .cache/pip
      - .cache/pre-commit
  variables:
    PIP_CACHE_DIR: "$CI_PROJECT_DIR/.cache/pip"
    PRE_COMMIT_HOME: "$CI_PROJECT_DIR/.cache/pre-commit"
  before_script:
    - pip install pre-commit
  except:
    - schedules
  script:
    - pre-commit run -a
[settings]
known_third_party = cv2,numpy,pytest,setuptools,torch
repos:
- repo: https://github.com/asottile/seed-isort-config
rev: v2.2.0
hooks:
- id: seed-isort-config
- repo: https://github.com/pre-commit/mirrors-isort
rev: v4.3.21
hooks:
- id: isort
- repo: https://github.com/ambv/black
rev: 20.8b1
hooks:
- id: black
# flake8 is hosted on GitHub; the former GitLab repository is no longer available.
- repo: https://github.com/PyCQA/flake8
  rev: 3.8.3
  hooks:
  - id: flake8
    additional_dependencies:
    - 'flake8-coding==1.3.1'
    - 'flake8-copyright==0.2.2'
    - 'flake8-debugger==3.1.0'
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v3.1.0
hooks:
- id: check-ast
- id: check-docstring-first
- id: check-executables-have-shebangs
- id: check-merge-conflict
- id: check-symlinks
- id: debug-statements
- id: trailing-whitespace
- id: check-yaml
args: [--allow-multiple-documents]
- id: mixed-line-ending
- id: name-tests-test
args: ['--django']
- id: check-json
- id: requirements-txt-fixer
- repo: https://github.com/codespell-project/codespell
rev: v1.17.1
hooks:
- id: codespell
args: ['--write-changes']
- repo: meta
hooks:
- id: check-useless-excludes
default_language_version:
python: python3.8
include requirements.txt
include VERSION
0.1.0
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import cv2
import numpy as np
from torch import from_numpy
def resize(input_image, network_size, padding):
    """
    Scale an image to the network input size and pad it to a multiple of 8.
    The image is scaled so that its longest side equals the network input
    size (no scaling happens when it already does). Each side is then padded
    with a constant border up to the next multiple of 8, the border being
    split between both ends of the side.
    :param input_image: The input image to resize.
    :param network_size: The input size of the model.
    :param padding: The value to use as padding.
    :return: The resized and padded image, and the [top, left] padding sizes.
    """
    height, width = input_image.shape[:2]
    longest_side = max(height, width)

    resized_image = input_image
    if longest_side != network_size:
        scale = float(network_size) / longest_side
        target_height = int(height * scale)
        target_width = int(width * scale)
        # cv2.resize takes the target size as (width, height).
        resized_image = cv2.resize(input_image, (target_width, target_height))

    # Number of pixels missing to reach the next multiple of 8 (0 if aligned).
    pad_height = (-resized_image.shape[0]) % 8
    pad_width = (-resized_image.shape[1]) % 8

    # When the padding is odd, the extra pixel goes to the bottom / right.
    top = pad_height // 2
    bottom = pad_height - top
    left = pad_width // 2
    right = pad_width - left

    resized_image = cv2.copyMakeBorder(
        resized_image, top, bottom, left, right, cv2.BORDER_CONSTANT, value=padding
    )
    return resized_image, [top, left]
def preprocess_image(input_image, model_input_size, mean, std):
    """
    Turn an input image into the tensor expected by the network.
    The image is resized and padded (using the mean as padding value),
    normalized channel by channel, and converted to a tensor with a
    leading axis of size 1 and the channels moved before the spatial axes.
    :param input_image: The input image to preprocess.
    :param model_input_size: The size of the model input.
    :param mean: The mean value used to normalize the image.
    :param std: The standard deviation used to normalize the image.
    :return: The resized, normalized and padded input tensor, and the
             [top, left] padding applied during the resize.
    """
    resized, padding = resize(input_image, model_input_size, padding=mean)

    # Normalize each channel with its own mean and standard deviation.
    channel_count = resized.shape[2]
    normalized = np.zeros(resized.shape)
    for index in range(channel_count):
        plane = np.float32(resized[:, :, index])
        normalized[:, :, index] = (plane - mean[index]) / std[index]

    # (H, W, C) -> (C, H, W), then add the leading axis.
    batch = np.expand_dims(normalized.transpose((2, 0, 1)), axis=0)
    return from_numpy(batch), padding
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import os
import cv2
import numpy as np
import torch
from doc_ufcn import image, model, prediction
# Configure the root logger when this module is imported: records of level
# DEBUG and above are emitted, prefixed with their level name in brackets.
logging.basicConfig(
    format="[%(levelname)s] %(message)s",
    level=logging.DEBUG,
)
class DocUFCN:
    """
    The DocUFCN class is used to apply the Doc-UFCN model.
    The constructor stores the number of classes, the model input size
    and the device. The network and the normalization parameters are set
    by `load`, which must be called before `predict`.
    """

    def __init__(self, no_of_classes, model_input_size, device):
        """
        Constructor of the DocUFCN class.
        :param no_of_classes: The number of classes wanted at the
                              output of the network.
        :param model_input_size: The size of the model input.
        :param device: The device to use.
        :raises AssertionError: If no_of_classes or model_input_size is
                                not a positive integer.
        """
        super(DocUFCN, self).__init__()
        assert isinstance(no_of_classes, int), "Number of classes must be an integer"
        assert no_of_classes > 0, "Number of classes must be positive"
        assert isinstance(
            model_input_size, int
        ), "Model input size must be an integer"
        assert model_input_size > 0, "Model input size must be positive"
        self.no_of_classes = no_of_classes
        self.model_input_size = model_input_size
        self.device = device

    @staticmethod
    def _check_rgb_triplet(values, name):
        """
        Check that a normalization parameter is a list of 3 integers in [0, 255].
        :param values: The value to check.
        :param name: The parameter name used in the error message.
        :raises AssertionError: If the value is not valid.
        """
        message = f"{name} must be a list of 3 integers (RGB) between 0 and 255"
        assert isinstance(values, list), message
        assert len(values) == 3, message
        assert all(
            isinstance(element, int) and 0 <= element <= 255 for element in values
        ), message

    @staticmethod
    def _normalize_mask(mask):
        """
        Rescale a mask so that its highest value becomes 255.
        :param mask: The mask to rescale.
        :return: The rescaled mask. A mask whose maximum is 0 (nothing was
                 drawn) is returned unchanged to avoid a division by zero.
        """
        peak = np.max(mask)
        if peak == 0:
            return mask
        return mask * 255 / peak

    def load(self, model_path, mean, std):
        """
        Load a trained model.
        :param model_path: Path to the model.
        :param mean: The mean value to use to normalize the input image.
        :param std: The std value to use to normalize the input image.
        :raises AssertionError: If mean or std is not a list of 3 integers
                                between 0 and 255, or if model_path is not
                                an existing file.
        """
        # Validate every argument before building the network, so that an
        # invalid call fails early and leaves the instance untouched.
        self._check_rgb_triplet(mean, "mean")
        self._check_rgb_triplet(std, "std")
        assert os.path.isfile(model_path), f"Model file not found: {model_path}"

        net = model.DocUFCNModel(self.no_of_classes)
        net.to(self.device)

        # Restore the model weights.
        # NOTE(review): torch.load unpickles the file; only load checkpoints
        # that come from a trusted source.
        checkpoint = torch.load(model_path, map_location=self.device)
        # Strip the "module." prefix from the parameter names (presumably
        # added when the model was trained wrapped in DataParallel - confirm).
        loaded_checkpoint = {
            key.replace("module.", ""): weights
            for key, weights in checkpoint["state_dict"].items()
        }
        # strict=False: keys that do not match the network are ignored.
        net.load_state_dict(loaded_checkpoint, strict=False)
        logging.debug(f"Loaded model {model_path}")

        self.net = net
        self.mean, self.std = mean, std

    def predict(
        self,
        input_image,
        min_cc=50,
        raw_output=False,
        mask_output=False,
        overlap_output=False,
    ):
        """
        Run prediction on an input image.
        :param input_image: The image to predict.
        :param min_cc: The threshold to remove small connected components.
        :param raw_output: Return the raw probabilities.
        :param mask_output: Return a mask with the detected objects.
        :param overlap_output: Return the detected objects drawn over the input image.
        :return: A tuple (polygons, raw probabilities, mask, overlap). The
                 polygons are resized to the input image; each of the three
                 other elements is None unless requested.
        :raises AssertionError: If input_image is not a numpy array or
                                min_cc is not a positive integer.
        """
        # Validate the arguments before running the network.
        assert isinstance(
            input_image, np.ndarray
        ), "Input image must be an np.array in RGB"
        assert isinstance(min_cc, int), "min_cc must be a positive integer"
        assert min_cc > 0, "min_cc must be a positive integer"

        self.net.eval()
        input_size = (input_image.shape[0], input_image.shape[1])
        # Convert single-channel images to 3 channels.
        if len(input_image.shape) < 3:
            input_image = cv2.cvtColor(input_image, cv2.COLOR_GRAY2RGB)

        # Preprocess the input image.
        input_tensor, padding = image.preprocess_image(
            input_image, self.model_input_size, self.mean, self.std
        )
        logging.debug("Image pre-processed")

        # Run the prediction.
        with torch.no_grad():
            pred = self.net(input_tensor.float().to(self.device))
            # Keep the first (only) element of the batch.
            pred = pred[0].cpu().detach().numpy()

        # Get contours of the predicted objects.
        predicted_polygons = prediction.get_predicted_polygons(
            pred, self.no_of_classes
        )

        # Remove the small connected components.
        for channel in range(1, self.no_of_classes):
            predicted_polygons[channel] = [
                contour
                for contour in predicted_polygons[channel]
                if cv2.contourArea(contour["polygon"]) > min_cc
            ]

        # Resize the polygons.
        resized_predicted_polygons = prediction.resize_predicted_polygons(
            predicted_polygons, input_size, self.model_input_size, padding
        )

        # Generate the mask images if requested.
        mask = None
        if mask_output:
            mask = self._normalize_mask(
                prediction.get_prediction_image(resized_predicted_polygons, input_size)
            )
        overlap = None
        if overlap_output:
            overlap = prediction.get_prediction_image(
                resized_predicted_polygons, input_size, input_image
            )

        if not raw_output:
            pred = None
        return resized_predicted_polygons, pred, mask, overlap
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import torch
from torch.nn import Module as NNModule
class DocUFCNModel(NNModule):
    """
    The DocUFCNModel class defines the Doc-UFCN network.
    It declares the layers of the network and chains them in the
    forward pass.
    """

    def __init__(self, no_of_classes):
        """
        Constructor of the DocUFCNModel class.
        :param no_of_classes: The number of classes wanted at the
                              output of the network.
        """
        super().__init__()
        # Contracting path: four dilated blocks, with a pooling in between.
        self.dilated_block1 = self.dilated_block(3, 32)
        self.dilated_block2 = self.dilated_block(32, 64)
        self.dilated_block3 = self.dilated_block(64, 128)
        self.dilated_block4 = self.dilated_block(128, 256)
        self.pool = torch.nn.MaxPool2d(2, 2)
        # Expanding path: the input sizes of blocks 2 and 3 account for the
        # concatenation with the output of the matching dilated block.
        self.conv_block1 = self.conv_block(256, 128)
        self.conv_block2 = self.conv_block(256, 64)
        self.conv_block3 = self.conv_block(128, 32)
        self.last_conv = torch.nn.Conv2d(64, no_of_classes, 3, stride=1, padding=1)
        self.softmax = torch.nn.Softmax(dim=1)

    @staticmethod
    def dilated_block(input_size, output_size):
        """
        Define a dilated block.
        It consists in 5 successive convolutions with the dilation
        rates [1, 2, 4, 8, 16], each followed by a batch normalization,
        a ReLU and a dropout.
        :param input_size: The number of channels of the input tensor.
        :param output_size: The number of channels of the output tensor.
        :return: The sequence of the convolutions.
        """
        nn = torch.nn
        layers = []
        channels = input_size
        for rate in (1, 2, 4, 8, 16):
            # padding == dilation keeps the spatial size with a 3x3 kernel.
            layers += [
                nn.Conv2d(
                    channels, output_size, 3, stride=1, dilation=rate, padding=rate
                ),
                nn.BatchNorm2d(output_size, track_running_stats=False),
                nn.ReLU(inplace=True),
                nn.Dropout(p=0.4),
            ]
            # Only the first convolution changes the number of channels.
            channels = output_size
        return nn.Sequential(*layers)

    @staticmethod
    def conv_block(input_size, output_size):
        """
        Define a convolutional block.
        It consists in a convolution followed by a transposed convolution
        that upsamples by 2, each followed by a batch normalization,
        a ReLU and a dropout.
        :param input_size: The number of channels of the input tensor.
        :param output_size: The number of channels of the output tensor.
        :return: The sequence of the convolutions.
        """
        nn = torch.nn
        layers = [
            nn.Conv2d(input_size, output_size, 3, stride=1, padding=1),
            nn.BatchNorm2d(output_size, track_running_stats=False),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.4),
            # Does the upsampling.
            nn.ConvTranspose2d(output_size, output_size, 2, stride=2),
            nn.BatchNorm2d(output_size, track_running_stats=False),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.4),
        ]
        return nn.Sequential(*layers)

    def forward(self, input_tensor):
        """
        Define the forward step of the network.
        It consists in 4 successive dilated blocks followed by 3
        convolutional blocks, a final convolution and a softmax layer.
        The output of each convolutional block is concatenated, along the
        channel axis, with the output of the matching dilated block.
        :param input_tensor: The input tensor.
        :return: The output tensor.
        """
        # Contracting path, keeping the intermediate outputs.
        skip1 = self.dilated_block1(input_tensor)
        skip2 = self.dilated_block2(self.pool(skip1))
        skip3 = self.dilated_block3(self.pool(skip2))
        bottom = self.dilated_block4(self.pool(skip3))

        # Expanding path.
        merged = torch.cat([self.conv_block1(bottom), skip3], dim=1)
        merged = torch.cat([self.conv_block2(merged), skip2], dim=1)
        merged = torch.cat([self.conv_block3(merged), skip1], dim=1)

        return self.softmax(self.last_conv(merged))
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import cv2
import numpy as np
def get_predicted_polygons(prediction, no_of_classes):
    """
    Extract the contours of the predicted objects for each class.
    A pixel belongs to a class when this class has the highest probability
    across the channels. For every class but the first one (channel 0), the
    external contours of the connected components are extracted and
    returned with their confidence score.
    :param prediction: The probability maps.
    :param no_of_classes: The number of classes used to train the model.
    :return: A dictionary mapping each class index to its list of
             {"confidence", "polygon"} entries.
    """
    winning_class = np.argmax(prediction, axis=0)

    polygons_by_class = {}
    for class_index in range(1, no_of_classes):
        # Probabilities of the class where it wins, 0 elsewhere.
        class_probas = (
            np.uint8(winning_class == class_index) * prediction[class_index, :, :]
        )
        # Binary image of the pixels with a positive probability.
        binary_map = np.uint8(np.where(class_probas > 0, 1, class_probas))
        # Detect the objects contours.
        found_contours, _ = cv2.findContours(
            binary_map, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
        )
        entries = []
        for outline in found_contours:
            entries.append(
                {
                    "confidence": compute_confidence(outline, class_probas),
                    "polygon": outline,
                }
            )
        polygons_by_class[class_index] = entries
    return polygons_by_class
def compute_confidence(region, probs):
    """
    Compute the confidence of a given region from the probability map.
    The region is drawn, filled, on an empty mask of the size of the
    probability map. The confidence is the sum of the probabilities under
    the mask divided by the number of pixels of the mask.
    :param region: The region to compute the confidence.
    :param probs: The probability map used to compute the confidence score.
    :return: The mean of the region probabilities, rounded to 2 decimals.
    """
    region_mask = np.zeros(probs.shape)
    # A negative thickness fills the contour.
    cv2.drawContours(region_mask, [region], 0, 1, -1)
    probability_sum = np.sum(region_mask * probs)
    pixel_count = np.sum(region_mask)
    return round(probability_sum / pixel_count, 2)
def resize_predicted_polygons(polygons, original_image_size, model_input_size, padding):
    """
    Map the detected polygons back to the original input image size.
    The padding is removed from the coordinates, which are then scaled,
    clipped to the original image size and stored as (x, y) tuples. The
    polygons of each channel are sorted by the (y, x) coordinates of their
    first point. The input dictionary is updated in place.
    :param polygons: The polygons to resize.
    :param original_image_size: The original input size.
    :param model_input_size: The network input size.
    :param padding: The padding applied to the input image.
    :return polygons: The resized detected polygons.
    """

    def first_point_key(item):
        # Order by row first, then by column, of the first polygon point.
        first_x, first_y = item["polygon"][0][0], item["polygon"][0][1]
        return (first_y, first_x)

    # Size of the image once its longest side matches the network input.
    shrink = float(model_input_size) / max(original_image_size)
    network_size = tuple(int(side * shrink) for side in original_image_size)
    # Per-axis factor to go from the network size back to the original size.
    scale = [
        original / float(shrunk)
        for original, shrunk in zip(original_image_size, network_size)
    ]
    max_row, max_col = original_image_size[0], original_image_size[1]

    for channel in polygons.keys():
        for detection in polygons[channel]:
            # Contour points are stored as [[x, y]].
            points = np.asarray(detection["polygon"])[:, 0, :]
            rows = ((points[:, 1] - padding[0]) * scale[0]).astype(int)
            cols = ((points[:, 0] - padding[1]) * scale[1]).astype(int)
            rows = np.clip(rows, 0, max_row)
            cols = np.clip(cols, 0, max_col)
            detection["polygon"] = list(zip(cols, rows))
        # Sort the polygons.
        polygons[channel] = sorted(polygons[channel], key=first_point_key)
    return polygons
def get_prediction_image(polygons, image_size, image=None):
    """
    Generate an image with the detected polygons.
    Without an input image, the polygons are drawn filled on an empty mask.
    With an input image, their outlines are drawn in green on a copy of the
    image, so that the array given by the caller is left untouched.
    :param polygons: The detected polygons coordinates.
    :param image_size: The original input image size.
    :param image: The input image.
    :return: The mask, or the copy of the input image, with the polygons drawn.
    """
    if image is None:
        canvas = np.zeros((image_size[0], image_size[1]))
        # A negative thickness fills the polygons.
        thickness = -1
    else:
        # cv2.drawContours draws in place: work on a copy of the input.
        canvas = image.copy()
        thickness = 2

    channel_count = len(polygons)
    for channel in polygons.keys():
        # Spread the channel intensities over the 0-255 range.
        color = int(channel * 255 / channel_count)
        if image is not None:
            color = [0, color, 0]
        # Draw polygons.
        for polygon in polygons[channel]:
            cv2.drawContours(
                canvas, [np.array(polygon["polygon"])], 0, color, thickness
            )
    return canvas
numpy==1.21.2
opencv-python-headless==4.5.3.56
torch==1.9.0
setup.py 0 → 100755
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pathlib import Path
from setuptools import find_packages, setup
# Name of the distribution, passed as `name` to setup().
MODULE = "doc_ufcn"
def parse_requirements():
    """
    Read the requirements.txt file located next to this script.
    :return: The lines of the file, stripped of surrounding whitespace.
    """
    requirements_file = Path(__file__).parent.resolve() / "requirements.txt"
    assert requirements_file.exists(), f"Missing requirements: {requirements_file}"
    return [line.strip() for line in requirements_file.read_text().splitlines()]
setup(
    name=MODULE,
    # Read the version from the VERSION file located next to this script,
    # whatever the current working directory, and drop the trailing newline.
    version=(Path(__file__).parent.resolve() / "VERSION").read_text().strip(),
    description="Doc-UFCN",
    author="Mélodie Boillet",
    author_email="boillet@teklia.com",
    install_requires=parse_requirements(),
    packages=find_packages(),
)
tests/data/mask_image.png

4.45 KiB

tests/data/masked_image.png

1.11 MiB

tests/data/overlap_image.png

1.12 MiB

File added
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment