Commit 30e62cde authored by Georgios Dagkakis's avatar Georgios Dagkakis

if it is a full batch buffered before the decomposition create it this way

parent 27e90ab1
......@@ -80,8 +80,21 @@ class BatchesWIPShort(plugin.InputPreparationPlugin):
proceeded=complete - (complete % workingBatchSize)
currentCompleted=awaiting % workingBatchSize
print buffered,proceeded,currentCompleted
bufferedSubBatches=int(buffered/workingBatchSize)
if self.checkIfStationIsAfterDecomposition(data, stationId):
if buffered>=standardBatchUnits:
bufferedBatches=int(buffered/standardBatchUnits)
bufferedSubBatches=int((buffered-bufferedBatches*standardBatchUnits)/workingBatchSize)
print bufferedBatches,bufferedSubBatches
for i in range(bufferedBatches):
bufferId=self.getBuffer(data, stationId)
self.createBatch(data, bufferId, currentBatchId, currentBatchId,standardBatchUnits)
batchCounter+=1
currentBatchId='Batch_'+str(batchCounter)+'_WIP'
# set the buffered sub-batches to the previous station
for i in range(int(buffered/workingBatchSize)):
for i in range(bufferedSubBatches):
bufferId=self.getPredecessors(data, stationId)[0]
self.createSubBatch(data, bufferId, currentBatchId, currentBatchId, subBatchCounter,
workingBatchSize,receiver=stationId)
......@@ -114,8 +127,7 @@ class BatchesWIPShort(plugin.InputPreparationPlugin):
batchCounter+=1
currentBatchId='Batch_'+str(batchCounter)+'_WIP'
unitsToCompleteBatch=standardBatchUnits
# for stations that do not share sub-batches with others
# for stations that operate on full batches
else:
pass
......@@ -124,7 +136,7 @@ class BatchesWIPShort(plugin.InputPreparationPlugin):
# creates a sub-batch in a station
def createSubBatch(self,data,stationId,parentBatchId,parentBatchName,subBatchId,numberOfUnits,
unitsToProcess=0,receiver=None):
print 'creating',stationId,parentBatchId
print 'creating sub-batch',stationId,parentBatchId
data['graph']['node'][stationId]['wip'].insert(0,{
"_class": 'Dream.SubBatch',
"id": parentBatchId+'_SB_'+str(subBatchId)+'_wip',
......@@ -136,6 +148,18 @@ class BatchesWIPShort(plugin.InputPreparationPlugin):
"receiver":receiver
}
)
# creates a batch in a station
def createBatch(self,data,stationId,batchId,batchName,numberOfUnits,unitsToProcess=0):
print 'creating batch',stationId,batchId,numberOfUnits
data['graph']['node'][stationId]['wip'].insert(0,{
"_class": 'Dream.Batch',
"id": batchId+'_wip',
"name":batchName+'_wip',
"numberOfUnits":numberOfUnits,
"unitsToProcess": unitsToProcess,
}
)
# gets the data and a station id and returns a list with all the stations that the station may share batches
def findSharingStations(self,data,stationId):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment