Commit 72a0730d authored by Ivan Tyagov

Use the proper script to get all Data Streams for a Data Set rather than relying on the reference. Test it.

parent eb51cb86
@@ -4,7 +4,7 @@
 Note: This code is quite computationally costly (for Data Streams having thousands of files) as it needs to:
 1. Query MariaDB to find ingestion lines
-2. Read from ZODB both Data Ingestion Lines and Data Streams (whoch itself can be big too)
+2. Read from ZODB both Data Ingestion Lines and Data Streams (which itself can be big too)
 """
 data_ingestion_line_list = context.portal_catalog(
   portal_type = "Data Ingestion Line",
...
"""
This script is called from ebulk client to get list of Data Streams for a
Data set.
"""
import re import re
import json import json
from Products.ERP5Type.Log import log from Products.ERP5Type.Log import log
...@@ -20,14 +25,8 @@ data_set = portal.data_set_module.get(data_set_reference) ...@@ -20,14 +25,8 @@ data_set = portal.data_set_module.get(data_set_reference)
if data_set is None: if data_set is None:
return [] return []
# XXX: use DataSet_getDataStreamList instead!
query_dict = {
"portal_type": "Data Stream",
"reference": data_set.getReference() + reference_separator + "%"}
data_stream_list = [] data_stream_list = []
for stream in data_set.DataSet_getDataStreamList():
for stream in portal_catalog(**query_dict):
if stream.getVersion() == "": if stream.getVersion() == "":
return { "status_code": 2, "result": [] } return { "status_code": 2, "result": [] }
data_stream_list.append({ 'id': 'data_stream_module/'+stream.getId(), data_stream_list.append({ 'id': 'data_stream_module/'+stream.getId(),
......
@@ -84,7 +84,6 @@ class TestDataIngestion(SecurityTestCase):
   def ingest(self, data_chunk, reference, extension, eof, randomize_ingestion_reference=False):
     ingestion_reference = self.getIngestionReference(reference, extension, randomize_ingestion_reference)
-    self.portal.log(ingestion_reference)
     # use default ebulk policy
     ingestion_policy = self.portal.portal_ingestion_policies.wendelin_embulk
@@ -197,6 +196,9 @@ class TestDataIngestion(SecurityTestCase):
     """
     data_set, data_stream_list = self.stepIngest(self.CSV, ",", randomize_ingestion_reference=True)
     self.tic()
+    # check the data relation between Data Set and Data Streams works
+    self.assertSameSet(data_stream_list, data_set.DataSet_getDataStreamList())
     # publish data set and have all Data Streams published automatically
     data_set.publish()
...
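
The note in the first hunk above describes the work DataSet_getDataStreamList has to do. The following is only a minimal sketch of that lookup, assuming the usual Wendelin relation layout (Data Ingestion Lines aggregating both the Data Set and its Data Stream) and standard ERP5 accessors (getAggregateValue, the aggregate_uid catalog key); the real skin script may differ:

    # Hedged sketch of a DataSet_getDataStreamList-style lookup, not the actual script.
    portal = context.getPortalObject()
    data_stream_list = []
    # 1. Query MariaDB (portal_catalog) to find the ingestion lines of this Data Set
    for brain in portal.portal_catalog(portal_type="Data Ingestion Line",
                                       aggregate_uid=context.getUid()):
      # 2. Read the Data Ingestion Line and its Data Stream from the ZODB
      data_stream = brain.getObject().getAggregateValue(portal_type="Data Stream")
      if data_stream is not None:
        data_stream_list.append(data_stream)
    return data_stream_list
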
  • @Tyagov these changes, together with the one removing the validation state filter in 98f0da9d, result in the getDataStreams script returning invalid data streams, and then ebulk finds inconsistencies between the user's local dataset and the server's dataset.

    I don't have the full list of scenarios where this causes problems yet, but one of them is the bug about resuming interrupted ingestions (probably a split file).

    Another scenario is after a file deletion. The server script still returns the data stream that corresponds to the removed file. For example, a dataset containing three valid files and one deleted file returns this result list:

    • {"hash": "b2bc78a5dd45a0692f89d0bc2ad9372e", #invalid data stream, see the reference
      • "id": "data_stream_module/roque_test_dataset_20200526-115155-575_END",
      • "reference": "test_dataset/file_to_be_deleted/none_invalid", "size": 8},
    • {"hash": "b2bc78a5dd45a0692f89d0bc2ad9372e",
      • "id": "data_stream_module/roque_test_dataset_20200520-081423-825_END",
      • "reference": "test_dataset/example1/csv", "size": 8},
    • {"hash": "928497b06986a4e97f85dba74196f3b5",
      • "id": "data_stream_module/roque_test_dataset_20200520-081425-437_END",
      • "reference": "test_dataset/example2/csv", "size": 6},
    • {"hash": "d41d8cd98f00b204e9800998ecf8427e",
      • "id": "data_stream_module/roque_test_dataset_20200520-081426-137_END",
      • "reference": "test_dataset/empty/txt", "size": 0}

    As a consequence, ebulk receives more data streams than the dataset really has, and informs the user that the local dataset is outdated.

    I'll fix the query so that only valid data streams are listed by the ERP5Site_getDataStreamList script.
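
    A hedged sketch of that kind of filter (an assumption about the eventual fix, not committed code): keep only Data Streams in a valid workflow state, so the stream of a deleted file, like the first entry in the list above, is no longer returned to ebulk.

        # Hypothetical filter in ERP5Site_getDataStreamList, shown for illustration only.
        data_stream_list = []
        for stream in data_set.DataSet_getDataStreamList():
          if stream.getValidationState() != "validated":
            continue  # e.g. the invalidated stream of the deleted file
          data_stream_list.append({'id': 'data_stream_module/' + stream.getId(),
                                   'reference': stream.getReference()})  # hash and size as in the existing script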

  • @rporchetto, I see, but this shows a bigger issue:

    A Data Set and its related Data Streams can also be in the published state. Filtering only on "validated" is an oversimplification. The getDataStreams script should return what the user may see / access.

    Just imagine that you first update ebulk and the server-side code to support 50 MB+ chunks which are saved as-is on the server side and reconstructed on the client (ebulk) side. This would make the code that "merges" and "invalidates" Data Streams unnecessary, and then the filtering problem is automatically gone.

    I strongly suggest implementing the above first and then fixing the filtering if still needed.
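
    For illustration only, a rough sketch of the client-side reconstruction described above; the function name and chunk naming scheme are assumptions, not existing ebulk code:

        CHUNK_SIZE = 50 * 1024 * 1024  # the "50 MB+" chunks mentioned above

        def reconstruct_file(chunk_path_list, output_path):
          """Concatenate chunks downloaded as-is from the server back into the
          original file on the ebulk (client) side, so the server never has to
          merge or invalidate Data Streams. Illustrative sketch only."""
          with open(output_path, "wb") as output:
            # chunks are assumed to sort in upload order, e.g. data.csv.0000, data.csv.0001, ...
            for chunk_path in sorted(chunk_path_list):
              with open(chunk_path, "rb") as chunk:
                while True:
                  block = chunk.read(CHUNK_SIZE)
                  if not block:
                    break
                  output.write(block)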

  • A Data Set and its related Data Streams can also be in the published state. Filtering only on "validated" is an oversimplification. The getDataStreams script should return what the user may see / access.

    Independently of the split-file change, the script as it is right now does not respect what you said: it currently returns things that the user shouldn't see / access.

    The script should never return a data stream for a file that was deleted, regardless of the split-file topic.

    Unless the idea is that the client side is in charge of filtering out and discarding those cases, but that doesn't make much sense from a semantic point of view.

    Edited by Roque
  • @rporchetto, I see your point. What I really meant is that no Data Stream should ever be deleted in the first place (I meant the case of 50 MB chunks which are deleted after concatenation). This is inefficient for any DB approach.

    Still, it raises a fundamental question: which side stores what.

    Imo ebulk (the client side) stores content, and the human there makes sure the content is OK (validated). The server side (Wendelin) is responsible for storing it efficiently and handling security (i.e. what a user may see) through a proper workflow, which itself has the needed states to indicate status (i.e. Validated, Public, etc.).

    This way the spirit of ebulk, which more or less mimics git, is kept: GitLab stores what git sends to it and handles security.

    But back to the topic: I'm OK with filtering by state now.
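
    A sketch of that compromise (the state names are taken from the discussion, the rest is assumed, not committed code): filter on the workflow states a user may legitimately see / access, covering both validated and published documents.

        # Hypothetical state filter, shown for illustration only.
        visible_state_list = ("validated", "published")
        data_stream_list = [stream for stream in data_set.DataSet_getDataStreamList()
                            if stream.getValidationState() in visible_state_list]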
