Skip to content
Snippets Groups Projects

Helper for CreateMetaDataBulk

Merged Thibault Lavigne requested to merge helper-for-create-metadatabulk into master
2 files
+ 222
111
Compare changes
  • Side-by-side
  • Inline
Files
2
@@ -129,6 +129,8 @@ class MetaDataMixin(object):
metadatas, list
), "type shouldn't be null and should be of type list of Dict"
# Copy for don't erase metadatas
metas = []
for index, metadata in enumerate(metadatas):
assert isinstance(
metadata, dict
@@ -136,36 +138,47 @@ class MetaDataMixin(object):
assert metadata.get("type") and isinstance(
metadata.get("type"), MetaType
), f"Element at index {index} in metadata_list: type shouldn't be null and should be of type MetaType"
), "type shouldn't be null and should be of type MetaType"
assert metadata.get("name") and isinstance(
metadata.get("name"), str
), f"Element at index {index} in metadata_list: name shouldn't be null and should be of type str"
), "name shouldn't be null and should be of type str"
assert metadata.get("value") and isinstance(
metadata.get("value"), (str, float, int)
), f"Element at index {index} in metadata_list: value shouldn't be null and should be of type (str or float or int)"
), "value shouldn't be null and should be of type (str or float or int)"
assert metadata.get("entity_id") is None or isinstance(
metadata.get("entity_id"), str
), f"Element at index {index} in metadata_list: entity_id should be None or a str"
), "entity_id should be None or a str"
metas.append(
{
"type": metadata.get("type").value,
"name": metadata.get("name"),
"value": metadata.get("value"),
"entity_id": metadata.get("entity_id"),
}
)
if self.is_read_only:
logger.warning("Cannot create metadata as this worker is in read-only mode")
return
print(metadatas)
metadata = self.request(
created_metadatas = self.request(
"CreateMetaDataBulk",
id=element.id,
body={
"worker_version": self.worker_version_id,
"worker_run_id": self.worker_run_id,
"metadata_list": metadatas,
"metadata_list": metas,
},
)
for meta in metadata:
)["metadata_list"]
for meta in created_metadatas:
self.report.add_metadata(element.id, meta["id"], meta["type"], meta["name"])
return metadata["metadata_list"]
return created_metadatas
def list_metadata(self, element: Element):
"""
Loading