root / code / trunk / backend / agent.py

Revision 343, 7.2 kB (checked in by bwilson, 6 months ago)

fix the flooder all run problem, add a skaion agent

Line 
1#!/usr/bin/python
2
3# Copyright (C) 2007 SPARTA, Inc.
4# This software is licensed under the GPLv3 license, included in
5# ./GPLv3-LICENSE.txt in the source distribution
6
7import time
8import os
9import signal
10import re
11import sys
12import logging
13import pprint
14import socket
15import seer
16from distributions import *
17
18class Agent(object):
19       
20        def __init__(self):
21                self.statedump = {}  # For storage of strings used to set variables for dumping state quickly
22                self.storage = {}  # For storage of state variables
23                self.vartypes = {} # For indicating what vars we except or not
24                self.pids = []  # Array for storage of subprocess PID's
25                self.addVarType('NODES', 'array', [])
26
27        def agentInit(self, group, whiteboard, mystate, pidkill):
28                """ Used by main program to initialize common variables.
29                        We do this in a subfunction so that actual implementations
30                        don't have to deal with it in their init function
31                """
32                self.type = self.agenttype # pull from static, maybe just use static?
33                self.group = group
34                self.name = mystate.node
35                self.whiteboard = whiteboard # Shared storage location
36                self.mystate = mystate # link to MyState object
37                self.log = logging.getLogger(self.type+"/"+self.group)
38                self.pidkill = pidkill
39               
40        def setVar(self, key, val):
41                """ Always set local (ME.key) """
42                self.storage[self.name+"."+key] = val
43       
44        def getVar(self, key):
45                """ Attempt ME.key first, then default to key """
46                return self.storage.get(self.name+"."+key, self.storage.get(key))
47       
48        def getGroupVar(self, key):
49                """ Just look up key in storage """
50                return self.storage.get(key)
51
52        def getNodeVar(self, key):
53                """ Just look up ME.key in storage """
54                return self.storage.get(self.name+"."+key)
55               
56        def getOtherNodeVar(self, node, key):
57                """ Look up node.key then default to key """
58                return self.storage.get(node+"."+key, self.storage.get(key))
59               
60
61        def myNodeMemberOf(self, arrayname):
62                """ Check if my node name is in the particular list """
63                array = self.getVar(arrayname)
64                compare = self.name.lower()
65
66                for node in array:
67                        if (node == '*') or (node.lower() == compare):
68                                return True
69                return False
70
71
72        def myIPMemberOf(self, arrayname):
73                """ Check if one of my ip addrs is in the particular list and return them """
74                array = self.getVar(arrayname)
75                iplist = self.mystate.GetIPList()
76                collect = {}
77
78                for aip in array:
79                        for myip in iplist:
80                                if (aip == myip):
81                                        collect[myip] = 1
82                return collect.keys()
83
84
85        def addVarType(self, key, type, default):
86                self.vartypes[key] = type
87                self.storage[key] = default
88       
89        def processArgs(self, args):
90                """ Insert all of the variables, verifying/converting each """
91                for k in args:
92                        singlekey = k[k.rfind('.')+1:]
93                        if (singlekey not in self.vartypes):
94                                self.log.error("Ignoring unknown key (%s)" % (k))
95                                continue
96               
97                        # Find out what type of variable we expect this to be                   
98                        type = self.vartypes.get(singlekey)
99                        val = args[k]
100
101                        try:
102                                if (type == 'array'):
103                                        self.storage[k] = re.split('[\s,]+', val)
104                                elif (type == 'float'):
105                                        self.storage[k] = float(val)
106                                elif (type == 'int'):
107                                        self.storage[k] = int(val)
108                                elif (type == 'bool'):
109                                        self.storage[k] = bool(int(val) == 1)
110                                elif (type == 'proto'):
111                                        if re.match('[a-zA-Z]+', val): 
112                                                self.storage[k] = socket.getprotobyname(val)
113                                        else:
114                                                self.storage[k] = int(val)
115                                elif (type == 'cidr'):
116                                        self.storage[k] = seer.CIDR(inputstr=val)
117                                else:
118                                        self.storage[k] = val
119
120                        except Exception:
121                                self.log.error("Can't convert value (%s) to %s " % (val, type), exc_info=1)
122
123                        else:
124                                # If we accepted the value, put its string in the statedump hash
125                                self.statedump[k] = val
126                               
127                self.configDone()
128
129
130        def configDone(self):
131                """ Called when all variables in an event have been processed """
132                pass
133
134
135        def launchProgram(self):
136                """ Base function that is called with basic agent receives a START """
137                pass
138
139
140        def handleSTART(self):
141                if len(self.pids) > 0:
142                        self.log.info("Already running, not restarting")
143                        return
144
145                if (self.myNodeMemberOf('NODES')):
146                        self.launchProgram()
147
148
149        def handleSTOP(self):
150                for pid in self.pids:
151                        self.pidkill.kill(pid, signal.SIGTERM)
152                self.pids = []
153
154
155        def __repr__(self):
156                return "Agent(%s, %s, %s)\n%s" % (self.type, self.group, self.name, pprint.pformat(self.storage, 6))
157
158
159
160"""
161Trafgen extends the regular agent and provides some callbacks for starting a server and clients
162"""
163class TrafgenAgent(Agent):
164
165        def __init__(self):
166                Agent.__init__(self)
167                self.runningserver = 0
168                self.addVarType('servers', 'array', [])
169                self.addVarType('think', 'string', '1')
170                self.addVarType('sizes', 'string', '1')
171                self.addVarType('autoquit', 'int', None)
172
173
174        def handleSTART(self):
175                if len(self.pids) > 0:
176                        self.log.info("Already running, not restarting")
177                        return
178
179                if (self.myNodeMemberOf('servers')):
180                        self.runningserver = 1
181                        self.serverExec()
182
183                if (self.myNodeMemberOf('NODES')):
184                        self.launchTrafficController()
185
186
187        def handleSTOP(self):
188                if (self.runningserver):
189                        self.serverStop()
190                        self.runningserver = 0
191
192                for pid in self.pids:
193                        self.pidkill.killpg(pid, signal.SIGTERM)
194                self.pids = []
195
196
197        def serverExec(self):
198                """ To be overriden by subclass, called when the local server should be started """
199                pass
200       
201        def serverStop(self):
202                """ To be overriden by subclass, called when the local server should be stopped """
203                pass
204
205        def clientInit(self):
206                """ To be overriden by subclass, called after fork but only once before loop starts """
207                pass
208       
209        def clientExec(self, src, dst, size):
210                """ To be overriden by subclass, this will exec or perform the necessary trafgen process multiple times """
211                pass
212
213
214        def launchTrafficController(self):
215                pid = os.fork()
216                if (pid > 0):
217                        self.pids.append(pid)
218                        return
219
220                os.setsid()
221                signal.signal(signal.SIGCHLD, signal.SIG_IGN)
222
223                spool = seer.AddressPool(self.whiteboard.get('FAKE.'+self.name))
224                dpool = seer.AddressPool()
225                serv = self.getGroupVar("servers")
226                for s in serv:
227                        dpool.Add(s, self.whiteboard.get('FAKE.'+s))
228
229                think = self.getVar("think")
230                sizes = self.getVar("sizes")
231                autoquit = self.getVar("autoquit")
232
233                logfile = self.type+"."+self.group
234                starttime = time.time()
235
236                # Redirect stdout for exec'd applications
237                try:
238                        self.log.info("starting launcher process at %d - logging output to %s\n" % (starttime, logfile))
239                        fp = open("/local/logs/%s" % (logfile), 'a', 1)
240                        sys.stdout = fp   # For python prints from here on (not logging)
241                        sys.stderr = fp
242                        os.dup2(fp.fileno(), 1)  # For anything we exec from here on
243                        os.dup2(fp.fileno(), 2)
244                except Exception:
245                        self.log.error("Failed to redirect output", exc_info=1);
246
247                # Call overridden init method
248                self.clientInit() 
249       
250                # Loop based on wait times
251                try:
252                        while (True):
253                                elapsed = time.time() - starttime;
254                                if ((autoquit > 0) and (elapsed > autoquit)):
255                                        os.killpg(0, signal.SIGTERM) # This should kill me and my children as I forked/setsid()
256                                        return
257       
258                                self.clientOneLoop(spool, dpool, think, sizes)
259
260                except Exception,e:
261                        self.log.error("error in client process", exc_info=1);
262               
263
264        def clientOneLoop(self, spool, dpool, think, sizes):
265                """ This provides an internal point to override just the loop behaviour """
266                src = spool.Random()
267                dst = dpool.Random()
268                size = int(eval(sizes))
269
270                # Check memory here     
271                self.clientExec(src, dst, size)  # Call overridden method
272
273                waitfor = eval(think)
274                time.sleep(waitfor)
Note: See TracBrowser for help on using the browser.