Newer
Older
# -*- coding: utf-8 -*-
from collections import namedtuple
from io import BytesIO
from math import ceil
import requests
from PIL import Image
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
from arkindex_worker import logger
# Timeout handed to the requests.get calls of this module, as a
# (connect timeout, read timeout) pair expressed in seconds.
# See http://docs.python-requests.org/en/master/user/advanced/#timeouts
DOWNLOAD_TIMEOUT = (30, 60)
# Axis-aligned rectangle described by its minimum x/y corner and its dimensions.
BoundingBox = namedtuple("BoundingBox", ["x", "y", "width", "height"])
def open_image(path, mode="RGB", rotation_angle=0, mirrored=False):
    """
    Open an image from a local path or a URL.

    The image is downloaded when ``path`` is an HTTP(S) URL, when it does not
    exist on the local filesystem, or when Pillow fails to open the local file.

    :param path: Local file path or HTTP(S) URL of the image.
    :param mode: Pillow mode the image is converted to when its own mode differs.
    :param rotation_angle: Angle in degrees. The image is rotated by
        ``-rotation_angle`` with ``expand=True``; Pillow angles are
        counter-clockwise, so a positive value turns the image clockwise.
        A falsy value skips the rotation.
    :param mirrored: When true, the image is flipped left-to-right before
        any rotation is applied.
    :returns: The opened (and possibly converted, mirrored, rotated) Pillow image.
    :raises AssertionError: From ``download_image`` when a download is attempted
        with a ``path`` that does not start with ``http``.
    """
    # `os` is imported locally: the module-level import block does not provide it,
    # which made every call with a local path fail with a NameError.
    import os

    is_url = path.startswith(("http://", "https://"))
    if is_url or not os.path.exists(path):
        image = download_image(path)
    else:
        try:
            image = Image.open(path)
        except (IOError, ValueError):
            # The local file could not be read: fall back to a download attempt
            image = download_image(path)

    if image.mode != mode:
        image = image.convert(mode)

    if mirrored:
        image = image.transpose(Image.FLIP_LEFT_RIGHT)

    if rotation_angle:
        image = image.rotate(-rotation_angle, expand=True)

    return image
def download_image(url):
    """
    Download an image and open it with Pillow.

    When the first request fails with an SSL error, the download is attempted
    once more with a weaker cipher configuration. The previous configuration is
    always restored afterwards, even when the second request fails as well.

    :param url: HTTP(S) URL of the image.
    :returns: The downloaded Pillow image.
    :raises AssertionError: When ``url`` does not start with ``http``.
    :raises requests.HTTPError: When the server answers with an error status.
    """
    assert url.startswith("http"), "Image URL must be HTTP(S)"

    # Download the image
    # Cannot use stream=True as urllib's responses do not support the seek(int) method,
    # which is explicitly required by Image.open on file-like objects
    try:
        resp = requests.get(url, timeout=DOWNLOAD_TIMEOUT)
    except requests.exceptions.SSLError:
        logger.warning(
            "An SSLError occurred during image download, retrying with a weaker and unsafe SSL configuration"
        )
        # NOTE(review): DEFAULT_CIPHERS is a urllib3 module attribute reached through
        # requests.packages; confirm it exists in the urllib3 version in use.
        ssl_utils = requests.packages.urllib3.util.ssl_
        # Saving current ciphers
        previous_ciphers = ssl_utils.DEFAULT_CIPHERS
        # Downgrading ciphers to download the image
        ssl_utils.DEFAULT_CIPHERS = "ALL:@SECLEVEL=1"
        try:
            resp = requests.get(url, timeout=DOWNLOAD_TIMEOUT)
        finally:
            # Restoring previous ciphers, even when the retry raised: this is
            # process-wide state and the weak setting must not leak to other requests
            ssl_utils.DEFAULT_CIPHERS = previous_ciphers
    resp.raise_for_status()

    # Open the downloaded bytes from memory
    image = Image.open(BytesIO(resp.content))
    logger.info(
        "Downloaded image {} - size={}x{}".format(url, image.size[0], image.size[1])
    )

    return image
def polygon_bounding_box(polygon):
    """
    Compute the axis-aligned bounding box of a polygon.

    :param polygon: Iterable of ``(x, y)`` points.
    :returns: A ``BoundingBox`` holding the smallest x and y coordinates
        and the width and height spanned by the points.
    """
    xs, ys = zip(*polygon)
    min_x = min(xs)
    min_y = min(ys)
    return BoundingBox(
        x=min_x,
        y=min_y,
        width=max(xs) - min_x,
        height=max(ys) - min_y,
    )
def _retry_log(retry_state, *args, **kwargs):
    """
    Log a warning before tenacity sleeps between two attempts of a request.

    Meant to be used as a ``before_sleep`` callback.

    :param retry_state: The tenacity call state of the retried function.
    """
    # The retried function takes the URL as its first argument; also support
    # a keyword call instead of failing with an IndexError while logging.
    if retry_state.args:
        url = retry_state.args[0]
    else:
        url = retry_state.kwargs.get("url")
    # `idle_for` accumulates the time slept over all retries so far, so it
    # over-reports from the second retry on. The delay of the upcoming sleep
    # is carried by the next action.
    delay = retry_state.next_action.sleep
    logger.warning(
        f"Request to {url} failed ({repr(retry_state.outcome.exception())}), "
        f"retrying in {delay} seconds"
    )
@retry(
    retry=retry_if_exception_type(requests.RequestException),
    wait=wait_exponential(multiplier=2),
    stop=stop_after_attempt(3),
    before_sleep=_retry_log,
    reraise=True,
)
def _retried_request(url):
    """
    Perform a GET request, retried on request errors.

    The request is attempted at most 3 times with an exponential wait between
    attempts. Only ``requests.RequestException`` errors (which includes error
    statuses raised by ``raise_for_status``) trigger a retry, and the last
    error is re-raised once the attempts are exhausted.

    :param url: URL to fetch.
    :returns: The successful ``requests`` response.
    """
    response = requests.get(url, timeout=DOWNLOAD_TIMEOUT)
    response.raise_for_status()
    return response
def download_tiles(url):
    """
    Reconstruct a full IIIF image on servers that cannot serve the full-sized image using tiles.

    The image dimensions and the available tile sizes are read from the
    ``info.json`` document of the image. The widest advertised tile size is
    used to request every region of the image, and each downloaded tile is
    pasted at its position in a new RGB image.

    :param url: Base IIIF URL of the image, with or without a trailing slash.
    :returns: The reassembled Pillow image.
    :raises AssertionError: When ``info.json`` lacks the image dimensions or tiles.
    :raises ValueError: When a returned tile is smaller than the requested region.
    """
    base_url = url if url.endswith("/") else url + "/"

    logger.debug("Downloading image information")
    info = _retried_request(base_url + "info.json").json()

    image_width = info.get("width")
    image_height = info.get("height")
    assert image_width and image_height, "Missing image dimensions in info.json"
    assert info.get(
        "tiles"
    ), "Image cannot be retrieved at full size and tiles are not supported"

    # Take the biggest available tile size
    tile = max(info["tiles"], key=lambda candidate: candidate.get("width", 0))
    tile_width = tile["width"]
    # Tile height is optional and defaults to the width
    tile_height = tile.get("height", tile_width)

    full_image = Image.new("RGB", (image_width, image_height))

    for tile_x in range(ceil(image_width / tile_width)):
        region_x = tile_x * tile_width
        # Prevent trying to crop outside the bounds of an image
        region_width = min(tile_width, image_width - region_x)

        for tile_y in range(ceil(image_height / tile_height)):
            region_y = tile_y * tile_height
            region_height = min(tile_height, image_height - region_y)

            logger.debug(f"Downloading tile {tile_x},{tile_y}")
            region = f"{region_x},{region_y},{region_width},{region_height}"
            response = _retried_request(f"{base_url}{region}/full/0/default.jpg")
            tile_image = Image.open(BytesIO(response.content))

            # Some bad IIIF image server implementations may sometimes return tiles with a few pixels of difference
            # with the expected sizes, causing Pillow to raise ValueError('images do not match').
            got_width, got_height = tile_image.size
            too_small = got_width < region_width or got_height < region_height
            if too_small:
                # Fail when tiles are too small
                raise ValueError(
                    f"Expected size {region_width}×{region_height} for tile {tile_x},{tile_y}, "
                    f"but got {got_width}×{got_height}"
                )

            too_large = got_width > region_width or got_height > region_height
            if too_large:
                # Warn and crop when tiles are too large
                logger.warning(
                    f"Cropping tile {tile_x},{tile_y} from {got_width}×{got_height} "
                    f"to {region_width}×{region_height}"
                )
                tile_image = tile_image.crop((0, 0, region_width, region_height))

            paste_box = (
                region_x,
                region_y,
                region_x + region_width,
                region_y + region_height,
            )
            full_image.paste(tile_image, box=paste_box)

    return full_image
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
def trim_polygon(polygon, image_width: int, image_height: int):
    """
    Trim a polygon so that all of its points are located within an image.

    Each coordinate is clamped to ``[0, image_width]`` / ``[0, image_height]``.
    Points that become duplicates of an earlier point are dropped, so some
    points can disappear or have their coordinates modified. When the input
    polygon is closed (its last point equals its first point), the output
    polygon is closed again after the de-duplication.

    :param polygon: A list or tuple of points, each a list or tuple of 2 integers.
    :param image_width: Width of the image.
    :param image_height: Height of the image.
    :returns: A new polygon, as a list of ``[x, y]`` lists.
    :raises AssertionError: When the polygon is malformed, or when none of its
        points has both ``x <= image_width`` and ``y <= image_height``.
    """
    assert isinstance(
        polygon, (list, tuple)
    ), "Input polygon must be a valid list or tuple of points."
    assert all(
        isinstance(point, (list, tuple)) for point in polygon
    ), "Polygon points must be tuples or lists."
    assert all(
        len(point) == 2 for point in polygon
    ), "Polygon points must be tuples or lists of 2 elements."
    assert all(
        isinstance(point[0], int) and isinstance(point[1], int) for point in polygon
    ), "Polygon point coordinates must be integers."
    assert any(
        point[0] <= image_width and point[1] <= image_height for point in polygon
    ), "This polygon is entirely outside the image's bounds."

    # Clamp every point and keep only its first occurrence; the set makes the
    # duplicate lookup constant-time instead of scanning the output list.
    seen = set()
    updated_polygon = []
    for x, y in polygon:
        clamped = (
            min(image_width, max(0, x)),
            min(image_height, max(0, y)),
        )
        if clamped not in seen:
            seen.add(clamped)
            updated_polygon.append(list(clamped))

    # Add back the matching last point, if it was present in the original polygon.
    # A single point is not a closing point of itself, and the closing point is
    # a copy so that mutating one end of the result does not alter the other.
    if len(polygon) > 1 and polygon[-1] == polygon[0]:
        updated_polygon.append(list(updated_polygon[0]))

    return updated_polygon