| 1 | #!/usr/bin/python |
|---|
| 2 | |
|---|
| 3 | # Copyright (C) 2007 SPARTA, Inc. |
|---|
| 4 | # This software is licensed under the GPLv3 license, included in |
|---|
| 5 | # ./GPLv3-LICENSE.txt in the source distribution |
|---|
| 6 | |
|---|
| 7 | import time |
|---|
| 8 | import os |
|---|
| 9 | import signal |
|---|
| 10 | import re |
|---|
| 11 | import sys |
|---|
| 12 | import logging |
|---|
| 13 | import pprint |
|---|
| 14 | import socket |
|---|
| 15 | import seer |
|---|
| 16 | from distributions import * |
|---|
| 17 | |
|---|
class Agent(object):
    """Base class for agents: typed variable storage plus START/STOP handling.

    Variables live in ``self.storage`` under either a plain key (the shared
    value) or ``<node>.<key>`` (a value for one specific node).  Only keys
    registered through addVarType() are accepted by processArgs().

    Implementations supply an ``agenttype`` attribute, which agentInit()
    reads, and override launchProgram() / configDone() as needed.
    """

    # Separator for 'array' values: any run of whitespace and/or commas.
    _ARRAY_SEP = re.compile(r'[\s,]+')

    def __init__(self):
        self.statedump = {}  # Raw strings of accepted values, for dumping state quickly
        self.storage = {}    # Converted state variables
        self.vartypes = {}   # Type name of every variable we accept
        self.pids = []       # PIDs of subprocesses
        self.addVarType('NODES', 'array', [])

    def agentInit(self, group, whiteboard, mystate, pidkill):
        """ Used by main program to initialize common variables.
            We do this in a subfunction so that actual implementations
            don't have to deal with it in their init function.

            group      -- group name; also used in the logger name
            whiteboard -- shared storage location
            mystate    -- MyState object; its ``node`` attribute becomes our name
            pidkill    -- object whose kill() is used by handleSTOP()
        """
        self.type = self.agenttype  # static attribute supplied by the implementation
        self.group = group
        self.name = mystate.node
        self.whiteboard = whiteboard
        self.mystate = mystate
        self.log = logging.getLogger(self.type + "/" + self.group)
        self.pidkill = pidkill

    def setVar(self, key, val):
        """ Always set local (ME.key) """
        self.storage[self.name + "." + key] = val

    def getVar(self, key):
        """ Attempt ME.key first, then default to key (None if neither is set) """
        return self.storage.get(self.name + "." + key, self.storage.get(key))

    def getGroupVar(self, key):
        """ Just look up key in storage """
        return self.storage.get(key)

    def getNodeVar(self, key):
        """ Just look up ME.key in storage """
        return self.storage.get(self.name + "." + key)

    def getOtherNodeVar(self, node, key):
        """ Look up node.key then default to key """
        return self.storage.get(node + "." + key, self.storage.get(key))

    def myNodeMemberOf(self, arrayname):
        """ Check if my node name is in the particular list.

            Matching is case-insensitive and an entry of '*' matches any
            node.  A variable that is not set counts as an empty list.
        """
        mine = self.name.lower()
        for node in self.getVar(arrayname) or ():
            if node == '*' or node.lower() == mine:
                return True
        return False

    def myIPMemberOf(self, arrayname):
        """ Check if one of my ip addrs is in the particular list and return them.

            Returns a list of my addresses that appear in the variable, each
            one once, in the order mystate.GetIPList() reports them.  A
            variable that is not set counts as an empty list.
        """
        wanted = self.getVar(arrayname) or ()
        matched = []
        for myip in self.mystate.GetIPList():
            if myip in wanted and myip not in matched:
                matched.append(myip)
        return matched

    def addVarType(self, key, type, default):
        """ Register key as an accepted variable of the given type name and
            store its default value under the plain (shared) key """
        self.vartypes[key] = type
        self.storage[key] = default

    def processArgs(self, args):
        """ Insert all of the variables, verifying/converting each.

            args maps keys ("key" or "node.key") to string values.  The part
            after the last '.' selects the registered type.  Unknown keys and
            values that fail conversion are logged and skipped; accepted
            values also have their original string kept in statedump.
            configDone() is called once at the end.
        """
        for k in args:
            singlekey = k[k.rfind('.') + 1:]
            if singlekey not in self.vartypes:
                self.log.error("Ignoring unknown key (%s)" % (k))
                continue

            vartype = self.vartypes[singlekey]
            val = args[k]

            try:
                converted = self._convertValue(vartype, val)
            except Exception:
                self.log.error("Can't convert value (%s) to %s " % (val, vartype), exc_info=1)
            else:
                self.storage[k] = converted
                # If we accepted the value, put its string in the statedump hash
                self.statedump[k] = val

        self.configDone()

    def _convertValue(self, vartype, val):
        """ Convert the string val according to the type name vartype.
            Raises whatever the underlying conversion raises. """
        if vartype == 'array':
            # Drop the empty strings re.split yields for leading/trailing separators
            return [item for item in self._ARRAY_SEP.split(val) if item]
        if vartype == 'float':
            return float(val)
        if vartype == 'int':
            return int(val)
        if vartype == 'bool':
            return int(val) == 1
        if vartype == 'proto':
            # A value starting with a letter is a protocol name, otherwise a number
            if re.match('[a-zA-Z]+', val):
                return socket.getprotobyname(val)
            return int(val)
        if vartype == 'cidr':
            return seer.CIDR(inputstr=val)
        # Any other type name (e.g. 'string') is stored unchanged
        return val

    def configDone(self):
        """ Called when all variables in an event have been processed """
        pass

    def launchProgram(self):
        """ Base function that is called when a basic agent receives a START """
        pass

    def handleSTART(self):
        """ Call launchProgram() if this node is listed in NODES and nothing
            is recorded as running yet """
        if self.pids:
            self.log.info("Already running, not restarting")
            return

        if self.myNodeMemberOf('NODES'):
            self.launchProgram()

    def handleSTOP(self):
        """ Send SIGTERM to every recorded subprocess and forget the PIDs """
        for pid in self.pids:
            self.pidkill.kill(pid, signal.SIGTERM)
        self.pids = []

    def __repr__(self):
        # type/group/name only exist after agentInit(); don't fail before that
        return "Agent(%s, %s, %s)\n%s" % (
            getattr(self, 'type', None),
            getattr(self, 'group', None),
            getattr(self, 'name', None),
            pprint.pformat(self.storage, 6))
|---|
| 157 | |
|---|
| 158 | |
|---|
| 159 | |
|---|
| 160 | """ |
|---|
| 161 | Trafgen extends the regular agent and provides some callbacks for starting a server and clients |
|---|
| 162 | """ |
|---|
class TrafgenAgent(Agent):
    """Traffic generation agent: Agent plus callbacks for a server and clients.

    On START, a node listed in 'servers' runs serverExec() and a node listed
    in 'NODES' forks a controller process that calls clientExec() in a loop.
    Implementations override serverExec(), serverStop(), clientInit() and
    clientExec().
    """

    def __init__(self):
        Agent.__init__(self)
        self.runningserver = 0  # 1 between serverExec() and serverStop()
        self.addVarType('servers', 'array', [])
        self.addVarType('think', 'string', '1')    # expression eval'd for the wait between client runs
        self.addVarType('sizes', 'string', '1')    # expression eval'd for the size of each client run
        self.addVarType('autoquit', 'int', None)   # seconds before the controller quits; unset or <= 0 means never

    def handleSTART(self):
        """ Start the local server and/or the traffic controller, depending on
            whether this node is listed in 'servers' and/or 'NODES' """
        if self.pids:
            self.log.info("Already running, not restarting")
            return

        if self.myNodeMemberOf('servers'):
            self.runningserver = 1
            self.serverExec()

        if self.myNodeMemberOf('NODES'):
            self.launchTrafficController()

    def handleSTOP(self):
        """ Stop the local server if it was started, then SIGTERM the process
            group of every recorded controller and forget the PIDs """
        if self.runningserver:
            self.serverStop()
            self.runningserver = 0

        for pid in self.pids:
            self.pidkill.killpg(pid, signal.SIGTERM)
        self.pids = []

    def serverExec(self):
        """ To be overridden by subclass, called when the local server should be started """
        pass

    def serverStop(self):
        """ To be overridden by subclass, called when the local server should be stopped """
        pass

    def clientInit(self):
        """ To be overridden by subclass, called after fork but only once before loop starts """
        pass

    def clientExec(self, src, dst, size):
        """ To be overridden by subclass, this will exec or perform the necessary trafgen process multiple times """
        pass

    def launchTrafficController(self):
        """ Fork a controller process that runs the client loop.

            The parent records the child's PID in self.pids and returns.
            The child never returns from this call: it always terminates via
            os._exit(), so that it cannot fall back into the parent's code
            and continue as a second copy of the agent.
        """
        pid = os.fork()
        if pid > 0:
            self.pids.append(pid)
            return

        # --- child process from here on ---
        status = 1
        try:
            self._runTrafficController()
            status = 0
        except Exception:
            self.log.error("error in client process", exc_info=1)
        finally:
            # os._exit() skips interpreter cleanup, so flush what we can first;
            # a failed flush must not keep the child alive
            for stream in (sys.stdout, sys.stderr):
                try:
                    stream.flush()
                except Exception:
                    pass
            os._exit(status)

    def _runTrafficController(self):
        """ Body of the forked controller: set up, then loop until autoquit """
        os.setsid()  # own session/process group, so killpg() reaches only us and our children
        signal.signal(signal.SIGCHLD, signal.SIG_IGN)

        # Pools that the source and destination of each client run are drawn from
        spool = seer.AddressPool(self.whiteboard.get('FAKE.' + self.name))
        dpool = seer.AddressPool()
        for s in self.getGroupVar("servers"):
            dpool.Add(s, self.whiteboard.get('FAKE.' + s))

        think = self.getVar("think")
        sizes = self.getVar("sizes")
        autoquit = self.getVar("autoquit")

        logfile = self.type + "." + self.group
        starttime = time.time()

        # Redirect stdout for exec'd applications; carry on unredirected if this fails
        try:
            self.log.info("starting launcher process at %d - logging output to %s\n" % (starttime, logfile))
            fp = open("/local/logs/%s" % (logfile), 'a', 1)
            sys.stdout = fp  # For python prints from here on (not logging)
            sys.stderr = fp
            os.dup2(fp.fileno(), 1)  # For anything we exec from here on
            os.dup2(fp.fileno(), 2)
        except Exception:
            self.log.error("Failed to redirect output", exc_info=1)

        # Call overridden init method
        self.clientInit()

        # Loop based on wait times
        while True:
            elapsed = time.time() - starttime
            # autoquit defaults to None, which means "never quit"
            if autoquit is not None and autoquit > 0 and elapsed > autoquit:
                os.killpg(0, signal.SIGTERM)  # This should kill me and my children as I forked/setsid()
                return

            self.clientOneLoop(spool, dpool, think, sizes)

    def clientOneLoop(self, spool, dpool, think, sizes):
        """ This provides an internal point to override just the loop behaviour.

            Picks a random source and destination, runs clientExec() with a
            size computed from sizes, then sleeps for the time computed from
            think.
        """
        src = spool.Random()
        dst = dpool.Random()
        # NOTE(review): eval() of a configured string - safe only if the
        # 'sizes' and 'think' values come from a trusted source; confirm.
        size = int(eval(sizes))

        # Check memory here
        self.clientExec(src, dst, size)  # Call overridden method

        waitfor = eval(think)
        time.sleep(waitfor)
|---|