Simple
An aggregator is a singleton object that runs on a collector. It responds to REPORT messages from the node and data request from a GUI. It is also periodically called upon for any data that is to be pushed to the GUI. To add a new aggregator, place a python file in the modules directory that inherits from the Aggregator class defined in addons.py and implements the empty methods processReport(), processREQUEST(), and getUpdateMessage(). You must also make sure it is in the dependency tree so that it is included at startup.
When the controller process is started, a singleton object will be created. The aggreator method getUpdateMessage() will be called periodically to gather data that is to be pushed to the GUI. getUpdateMessage() should return data that is to be sent to the GUI or None if there is nothing to send. processREQUEST() is called when a GUI specifically requests data from the controller and the method should return the data requested or None if there is nothing to return. processReport() is called when data is received from a node. The method should process and save the data as necessary for resposne to getUpdateMessage() and processREQUEST().
class FileProcessor(Aggregator):
""" Processor that deals with FileList and File messages """
reporttype = 'FileOutput'
updatetype = 'FileList'
requesttypes = 'File', 'FileList',
storagedir = '/tmp/dir'
def __init__(self):
self.newfiles = []
if not os.path.isdir(self.storagedir):
os.makedirs(self.storagedir)
def processReport(self, msg):
""" Handle new file given to us from the experiment """
data = messages.Files()
data.ParseFromString(msg.data)
for f in data.files:
fp = open(os.path.join(self.storagedir, f.name), 'w')
fp.write(f.data)
fp.close()
self.newfiles.append(f.name)
def processREQUEST(self, msg):
""" Get/Gather FileList or File """
if msg.type == 'FileList':
return self.getFileList(False)
elif msg.type == 'File':
return self.getFileData(msg.filename)
def getUpdateMessage(self):
""" Send update FileList if something changed """
if len(self.newfiles) <= 0:
return None
self.newfiles = []
return self.getFileList(True)
def getFileList(self, adddata):
data = messages.Files()
for f in dircache.listdir(self.storagedir):
file = data.files.add()
file.name = f
file.modtime = os.path.getmtime(os.path.join(self.storagedir, file.name))
if adddata:
try:
fp = open(os.path.join(self.storagedir, file.name), 'r')
file.data = fp.read()
fp.close()
except IOError, (errno, strerror):
logging.warning("Failed open/read %s" % name)
return data.SerializeToString()
def getFileData(self, name):
try:
fp = open(os.path.join(self.storagedir, name), 'r')
data = messages.Files()
file = data.files.add()
file.name = name
file.modtime = os.path.getmtime(os.path.join(self.storagedir, name))
file.data = fp.read()
fp.close()
except IOError, (errno, strerror):
logging.warning("Failed open/read %s" % name)
return None
return data.SerializeToString()