Newer
Older
from operator import attrgetter
from arkindex_export import Dataset, DatasetElement, Element, Transcription
from arkindex_export.queries import list_children
from atr_data_generator.extract.arguments import MANUAL
def get_dataset_elements(dataset: Dataset, split: str):
    """
    Retrieve dataset elements in a specific split from an SQLite export of an Arkindex corpus.

    :param dataset: Dataset object from which the elements come.
    :param split: Set name of the dataset to use.
    :return: A query selecting the ``element`` column of the DatasetElement rows
        that belong to ``dataset`` and whose set name equals ``split``.
    """
    # NOTE(review): the docstring terminator and the opening of the returned
    # expression were absent from this block; the parenthesised query is
    # assumed to be the function's return value -- confirm against history.
    return (
        DatasetElement.select(DatasetElement.element)
        .join(Element)
        .where(
            DatasetElement.dataset == dataset,
            DatasetElement.set_name == split,
        )
    )
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def parse_sources(sources: List[str]):
    """Build the transcription source filter. Manual source has a different treatment.

    The ``sources`` list is only read: it is never modified, so callers can
    safely reuse it after the call.

    :param sources: List of values to match against Transcription.worker_version, and/or MANUAL.
    :return: A peewee filter on Transcription.worker_version, or None when ``sources`` is empty.
    """
    # Split MANUAL apart from the other sources without touching the input list.
    # Every occurrence of MANUAL is dropped, so a duplicated MANUAL can never
    # end up in the IN (...) clause below.
    worker_versions = [source for source in sources if source != MANUAL]
    has_manual = len(worker_versions) != len(sources)

    query_filter = None

    # Manual filtering: the filter matches transcriptions whose worker_version is NULL
    if has_manual:
        query_filter = Transcription.worker_version.is_null()

    # Filter by worker_versions
    if worker_versions:
        versions_filter = Transcription.worker_version.in_(worker_versions)
        # Compare against None explicitly rather than relying on the
        # truthiness of a peewee expression object.
        if query_filter is not None:
            query_filter = query_filter | versions_filter
        else:
            query_filter = versions_filter

    return query_filter
def get_children_info(
    parent_id: str,
    type: Optional[str],
    sources: Optional[List[str]],
):
    """Get the transcriptions of a parent element and of its children. Apply all needed filters.

    :param parent_id: ID of the parent element.
    :param type: Element type to keep. A falsy value disables the type filter.
    :param sources: Transcription sources to keep (values matched against
        Transcription.worker_version, and/or MANUAL). A falsy value disables the source filter.
    :return: A Transcription query ordered by element name, then by element ID.
    """
    # Insert parent in the query to allow to process it
    element_ids = [child.id for child in list_children(parent_id)]
    element_ids.append(parent_id)
    elements = Element.select().where(Element.id.in_(element_ids))

    # Filter by type
    if type:
        elements = elements.where(Element.type == type)

    # Get transcriptions
    transcriptions = Transcription.select().join(
        elements, on=(Transcription.element == elements.c.id)
    )

    # Filter by transcription source
    if sources:
        # A copy is handed over so the caller's list is left untouched
        # whatever the parsing step does with it.
        transcriptions = transcriptions.where(parse_sources(sources.copy()))

    # Order by the name column of the joined elements subquery.
    # `Transcription.element.name` must not be used here: on a peewee field,
    # `.name` is the field's own name (the string "element"), not the related
    # element's name column, so it does not sort by element name.
    # Additional ordering by element ID in case there are identical names.
    return transcriptions.order_by(elements.c.name, Transcription.element_id)