Broker calls directly Router. Operated machines now signalled by the Router...

Broker calls directly Router. Operated machines now signalled by the Router whenever a resource is made available
parent cd548e43
...@@ -129,9 +129,38 @@ class Broker(ObjectInterruption): ...@@ -129,9 +129,38 @@ class Broker(ObjectInterruption):
yield release,self,self.victim.operatorPool.getResource(self.victim.currentOperator) yield release,self,self.victim.operatorPool.getResource(self.victim.currentOperator)
# signal the other brokers waiting for the same operators that they are now free # signal the other brokers waiting for the same operators that they are now free
# also signal the stations that were not requested to receive because the operator was occupied # also signal the stations that were not requested to receive because the operator was occupied
# self.victim.router.isCalled.signal(now()) #===============================================================
self.signalLoadStations() # TESTING
# print now(), self.victim.id, 'broker signalling ROUTER'
#===============================================================
# TODO: signalling the router must be done more elegantly, router must be set as global variable
# if the router is already invoked then do not signal it again
if not self.victim.router.invoked:
self.victim.router.invoked=True
self.victim.router.isCalled.signal(now())
# TODO: signalling the router will give the chance to it to take the control, but when will it eventually receive it.
# after signalling the broker will signal it's victim that it has finished it's processes
# TODO: this wont work for the moment. The actions that follow must be performed by all operated brokers.
# self.signalLoadStations()
#===============================================================
# # TESTING
# print now(), self.victim.currentOperator.objName, 'released', self.victim.id
#===============================================================
# the victim current operator must be cleared after the operator is released
self.timeLastOperationEnded = now()
self.victim.currentOperator = None
else:
pass
# TODO: the victim must have a new event brokerIsSet
self.victim.brokerIsSet.signal(now())
# #===========================================================================
# # signal stations that wait for load operators
# #===========================================================================
# def signalLoadStations(self):
# # signal the other brokers waiting for the same operators that they are now free
# # also signal the stations that were not requested to receive because the operator was occupied
# # but now must have the option to proceed # # but now must have the option to proceed
# from Globals import G # from Globals import G
# candidateMachines=[] # candidateMachines=[]
...@@ -142,7 +171,6 @@ class Broker(ObjectInterruption): ...@@ -142,7 +171,6 @@ class Broker(ObjectInterruption):
# for operatorpool in G.OperatorPoolsList: # for operatorpool in G.OperatorPoolsList:
# # print operatorpool.id # # print operatorpool.id
# # and find the machines the share the currentOperator with the Broker.victim # # and find the machines the share the currentOperator with the Broker.victim
# # TODO: find the machineManagedJobs.entityToGet.managerers and search there
# if self.victim.currentOperator in operatorpool.operators: # if self.victim.currentOperator in operatorpool.operators:
# # print ' current operator in other operatorPools', operatorpool.id # # print ' current operator in other operatorPools', operatorpool.id
# # print ' ', [str(x.id) for x in operatorpool.coreObjects] # # print ' ', [str(x.id) for x in operatorpool.coreObjects]
...@@ -155,28 +183,29 @@ class Broker(ObjectInterruption): ...@@ -155,28 +183,29 @@ class Broker(ObjectInterruption):
# loadPendingMachines.append(machine) # loadPendingMachines.append(machine)
# #=============================================== # #===============================================
# # # TESTING # # # TESTING
# # print now(), self.victim.id, 'broker signalling', machine.id, 'loadOperatorAvailable' # print now(), self.victim.id, 'broker signalling', machine.id, 'loadOperatorAvailable1'
# #===============================================
# machine.loadOperatorAvailable.signal(now())
#
# # if the machines are MachineManagedJobs their OperatorPool is empty while their canAcceptAndIsRequested has not returned True
# # In order to signal them that the loadOperator is free, find the entities that have that operator, search for the possible receivers that
# # can accept signal them
# for machine in G.MachineManagedJobList:
# if self.victim.currentOperator in machine.operatorPool.operators:
# if machine.broker.waitForOperator:
# candidateMachines.append(machine)
# for entity in G.pendingEntities:
# if machine.canAcceptEntity(entity) and any(type=='Load' for type in machine.multOperationTypeList):
# loadPendingMachines.append(machine)
# #===============================================
# # # TESTING
# print now(), self.victim.id, 'broker signalling ', machine.id, 'loadOperatorAvailable2'
# #=============================================== # #===============================================
# machine.loadOperatorAvailable.signal(now()) # machine.loadOperatorAvailable.signal(now())
#
# # print 'machines waitingForLoadOperator',[str(x.id) for x in loadPendingMachines] # # print 'machines waitingForLoadOperator',[str(x.id) for x in loadPendingMachines]
# # print 'machines waitingForOperator',[str(x.id) for x in candidateMachines] # # print 'machines waitingForOperator',[str(x.id) for x in candidateMachines]
# # # for the candidateMachines #
# # if loadPendingMachines:
# # maxTimeWaiting=0
# # receiver=None
# # # choose the one that waits the most time and give it the chance to grasp the resource
# # # TODO: failures after the end of processing are not considered here
# # for machine in loadPendingMachines:
# # timeWaiting=now()-machine.timeLastEntityEnded
# # if(timeWaiting>maxTimeWaiting or maxTimeWaiting==0):
# # maxTimeWaiting=timeWaiting
# # receiver=machine
# # #===============================================
# # # # TESTING
# # # print now(), self.victim.id, 'broker signalling', machine.id, 'loadOperatorAvailable'
# # #===============================================
# # # finally signal the machine to receive the operator
# # receiver.loadOperatorAvailable.signal(now())
# #
# # for the candidateMachines # # for the candidateMachines
# if candidateMachines: # if candidateMachines:
...@@ -195,85 +224,3 @@ class Broker(ObjectInterruption): ...@@ -195,85 +224,3 @@ class Broker(ObjectInterruption):
# # finally signal the machine to receive the operator # # finally signal the machine to receive the operator
# receiver.broker.resourceAvailable.signal(now()) # receiver.broker.resourceAvailable.signal(now())
\ No newline at end of file
#===============================================================
# # TESTING
# print now(), self.victim.currentOperator.objName, 'released', self.victim.id
#===============================================================
# the victim current operator must be cleared after the operator is released
self.timeLastOperationEnded = now()
self.victim.currentOperator = None
else:
pass
# TODO: exit method can perform the signalling
# TODO: the victim must have a new event brokerIsSet
self.victim.brokerIsSet.signal(now())
#===========================================================================
# signal stations that wait for load operators
#===========================================================================
def signalLoadStations(self):
# signal the other brokers waiting for the same operators that they are now free
# also signal the stations that were not requested to receive because the operator was occupied
# but now must have the option to proceed
from Globals import G
candidateMachines=[]
loadPendingMachines=[]
# print ' have to signal machines that are waiting for operator', self.victim.id, 'broker'
# run through the operatorPools
# TODO: MachineManagedJob operatorPool is not in the global OperatorPoolsList
for operatorpool in G.OperatorPoolsList:
# print operatorpool.id
# and find the machines the share the currentOperator with the Broker.victim
if self.victim.currentOperator in operatorpool.operators:
# print ' current operator in other operatorPools', operatorpool.id
# print ' ', [str(x.id) for x in operatorpool.coreObjects]
for machine in operatorpool.coreObjects:
# if the machine waits to get an operator add it to the candidateMachines local list
if machine.broker.waitForOperator:
candidateMachines.append(machine)
# cause an loadOperatorAvailable event if any of this machines can accept and has Load operation type defined
if machine.canAccept() and any(type=='Load' for type in machine.multOperationTypeList):
loadPendingMachines.append(machine)
#===============================================
# # TESTING
# print now(), self.victim.id, 'broker signalling', machine.id, 'loadOperatorAvailable1'
#===============================================
machine.loadOperatorAvailable.signal(now())
# if the machines are MachineManagedJobs their OperatorPool is empty while their canAcceptAndIsRequested has not returned True
# In order to signal them that the loadOperator is free, find the entities that have that operator, search for the possible receivers that
# can accept signal them
for machine in G.MachineManagedJobList:
if self.victim.currentOperator in machine.operatorPool.operators:
if machine.broker.waitForOperator:
candidateMachines.append(machine)
for entity in G.pendingEntities:
if machine.canAcceptEntity(entity) and any(type=='Load' for type in machine.multOperationTypeList):
loadPendingMachines.append(machine)
#===============================================
# # TESTING
# print now(), self.victim.id, 'broker signalling ', machine.id, 'loadOperatorAvailable2'
#===============================================
machine.loadOperatorAvailable.signal(now())
# print 'machines waitingForLoadOperator',[str(x.id) for x in loadPendingMachines]
# print 'machines waitingForOperator',[str(x.id) for x in candidateMachines]
# for the candidateMachines
if candidateMachines:
maxTimeWaiting=0
receiver=None
# choose the one that waits the most time and give it the chance to grasp the resource
for machine in candidateMachines:
timeWaiting=now()-machine.broker.timeWaitForOperatorStarted
if(timeWaiting>maxTimeWaiting or maxTimeWaiting==0):
maxTimeWaiting=timeWaiting
receiver=machine
#===========================================================
# # TESTING
# print now(), self.victim.id, 'broker signalling', machine.id, 'resourceAvailable'
#===========================================================
# finally signal the machine to receive the operator
receiver.broker.resourceAvailable.signal(now())
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment