State server v Pythonu
05.09.2011
Jedná se software, který umí centralizovaně hlídat provádění naplánovaných ůloh a oznamovat jejich výsledek. State server vznikl jako variace na SMS server, kdy je využito stejného pythoního TCP serveru, pouze se mění logika zpracování zpráv zaslaných klienty.
Intro
Mějme nějaký Windows server a na něm naplánované úlohy. Čas od času se v nějaké úloze vyskytne chyba to jak očekávaná (chybné provedení nějakého příkazu) nebo neočekávaná (baťák a Windows si ošetřování vyjímek nehraje) a úloha se hryzne. Občas se taky stane, že se úloha nespustí vůbec - např. při změně hesla uživatele, pod kterým se úloha má spustit. V každém případě to skončí nějakou návratovou hodnotou v "Plánovači úloh", na kterou člověk přijde až v momentě, když čte záplavu emailů od rozladěných uživatelů, kterým zase "něco" nejde :-) Čili by se hodilo něco, co by výše uvedené stavy hlídalo.
Myšlenka je následující:
- Naplánová úloha se obvykle spustí v nějaký čas T a má trvat předpokládanou dobu X.
- Během provádění úlohy mohou nastat následující stavy:
- OK - úloha skončila správně a vše je v pořádku.
- FAIL - došlo k očekávané chybě. Tedy nikoliv k chybě úlohy samotné, ale například k chybě zálohy z důvodu nedostupného disku apod. Úloha se tedy spustila, zjistila, že je něco špatně a skončila.
- TIMEOUT - došlo k překročení doby X. Úloha se buď kousla a nebo se vůbec nespustila.
Funkce
State server řeší evidenci naplánovaných úloh - časy spuštění a maximální dobu jejich běhu. Od běžících úloh sbírá výsledky OK nebo FAIL a u těch, které neodpoví, generuje TIMEOUT. U všech těchto událostí lze nastavit alertaci na email nebo SMS (pomocí SMS serveru). U události OK to zpravidla nemá smysl, ale u FAIL nebo TIMEOUT už ano.
- tSTART - čas, kdy si StateServer myslí, že bude úloha spuštěna - zahájí se běh "virtuálního" procesu. U každé definice úlohy musí být určeno do kdy má skončit. Do databáze běžícího procesu se uloží tSTART a tTIMEOUT, kde tTIMEOUT = tSTART + předpokládaná doba běhu.
- tRUN - čas, kdy bude úloha skutečně spuštěna. Reálně je tRUN=tSTART
- tFAIL - čas, kdy úloha ohlásí StateServeru, že došlo k nějaké chybě. StateServer si tento čas zapíše do historie provedených úloh a ukončí proces.
- tOK - čas, kdy úloha ohlásí StateServeru, že vše dopadlo dobře. StateServer si tento čas zapíše do historie provedených úloh a ukončí proces.
- tTIMEOUT - čas, do kdy musela úloha StateServeru ohlásit svůj výsledek. Pokud se tak nestalo, StateServer vygeneruje událost TIMEOUT, tento čas zapíše do historie provedených úloh a ukončí proces.
- tOSKILL - čas, kdy operační systém "odstřelí" případně zatuhnutou úlohu. To však již StateServer nezajímá, neboť pro něj skončila v čase tTIMEOUT.
V textu jsou zmíněny dva termíny:
- úloha (task) - myšleno to, co se ve Windows nastavuje ve správci naplánovaných úloh. Tedy příkaz k tomu, že se nějaký spustitelný soubor má spustit v tolik a tolik - a má dělat to a to.
- proces - děj ve StateServeru, který má svůj začátek, konec a může nabývat různých stavů.
Dokumentace
Tabulky v MySQL
Databáze: proces
- defp - tabulka pro definici procesů
- hist - tabulka ukončených procesů
- msg - tabulka přijatých zpráv (OK a FAIL) od naplánovaných úloh
- run - tabulka aktuálně běžících procesů
Zobrazit/skrýt create table SQL
CREATE TABLE IF NOT EXISTS `defp` (
`idd` int(11) NOT NULL AUTO_INCREMENT,
`dnazev` varchar(100) COLLATE utf8_czech_ci NOT NULL,
`dcron` varchar(100) COLLATE utf8_czech_ci NOT NULL,
`dtrvani` int(11) NOT NULL,
`d_on_fail` varchar(60) COLLATE utf8_czech_ci NOT NULL,
`d_on_success` varchar(60) COLLATE utf8_czech_ci NOT NULL,
`d_on_timeout` varchar(60) COLLATE utf8_czech_ci NOT NULL,
`sms_on_fail` varchar(255) COLLATE utf8_czech_ci NOT NULL,
`sms_on_success` varchar(255) COLLATE utf8_czech_ci NOT NULL,
`sms_on_timeout` varchar(255) COLLATE utf8_czech_ci NOT NULL,
`derror` smallint(6) NOT NULL DEFAULT '0',
`dbarva` varchar(10) COLLATE utf8_czech_ci NOT NULL,
`dpozn` text COLLATE utf8_czech_ci NOT NULL,
PRIMARY KEY (`idd`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8 COLLATE=utf8_czech_ci ;
CREATE TABLE IF NOT EXISTS `hist` (
`idh` int(11) NOT NULL AUTO_INCREMENT,
`idd` int(11) NOT NULL,
`zacatek` datetime NOT NULL,
`ukonceni` datetime NOT NULL,
`typ` varchar(15) COLLATE utf8_czech_ci NOT NULL,
`pozn` varchar(100) COLLATE utf8_czech_ci NOT NULL DEFAULT '',
PRIMARY KEY (`idh`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8 COLLATE=utf8_czech_ci ;
CREATE TABLE IF NOT EXISTS `msg` (
`idm` int(11) NOT NULL AUTO_INCREMENT,
`msg` varchar(100) COLLATE utf8_czech_ci NOT NULL,
`ulozeno` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`idm`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8 COLLATE=utf8_czech_ci ;
CREATE TABLE IF NOT EXISTS `run` (
`idr` int(11) NOT NULL AUTO_INCREMENT,
`idd` int(11) NOT NULL,
`start` datetime NOT NULL,
`stop` datetime NOT NULL,
`ulozeno` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`idr`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8 COLLATE=utf8_czech_ci ;
TCP Server
Principem činnosti je otevření portu 2020 pro konzolovou TCP komunikaci (telnet). Na tomto portu očekává připojení od klientů a předání stavu běžícímu procesu. Pokud klient zašle požadavek ve správném formátu, server uloží jeho obsah do tabulky msg.
- Umístění: /usr/local/procesy/vss
- Spouštění: /usr/local/procesy/start_stop
- PID file: /var/run/vss.pid
- Logy: /var/log/vss/vss.log
Zobrazit/skrýt soubor /usr/local/procesy/vss
#!/usr/bin/env python
# -*- coding: utf-8 -*-
####################################################################################################
# PROCESY: STATE SERVER #
# Skript na prijem stavu o probihajicich procesech na serverech. Musi bezet v nohupu na pozadi #
# Jan Matuska, walda@starhill.org #
####################################################################################################
logfile = "/var/log/vss/vss.log"
pidfile = "/var/run/vss.pid"
import sys, socket, select, time, os
import MySQLdb
import logger, daemon
# Seznam socketu prichozich spojeni
open_sockets = []
# Slovnik bufferu na prichozi zpravy
messages = dict()
# Instance tridy na logovani
log = logger.logger(logfile)
# Instance tridy daemon a nastaveni parametru
proc = daemon.daemon(pidfile)
proc.logcl = log
# Funkce na ulozeni zpravy do DB -------------------------------------------------------------------
def todb(zprava):
# pripojeni k MySQL
try:
db=MySQLdb.connect(host="localhost",user="uzivatel",passwd="heslo",db="proces");
except Exception:
log.log("Cannot to connnect database")
return
# provedeni vlozeni
try:
cursor = db.cursor()
cursor.execute("INSERT INTO msg(msg) VALUES('%s')" % zprava);
cursor.close()
except Exception:
log.log("Error in SQL query")
db.close()
# --------------------------------------------------------------------------------------------------
# Funkce na zpracovani bufferu ---------------------------------------------------------------------
def ceb(msgid, socket):
if messages[msgid].find('\n') == -1:
return False
msg = messages[msgid][0:messages[msgid].rfind('\n')]
for value in msg.split('\n'):
if value.find('#')==0:
socket.send("Command(s) stored\n\r");
for prikaz in value.split('#'):
if len(prikaz)>0:
log.log("Buffer "+str(msgid)+" stored ["+prikaz+"]")
todb(prikaz)
else:
socket.send("Syntax error\n\r");
return True
# --------------------------------------------------------------------------------------------------
# Pokus o spusteni serveru
log.log("Trying to start server")
# Test, zda-li process jiz bezi nebo ne
if proc.IsProcessRunning():
sys.exit()
# Deamonizujeme proces
proc.DaemonizeProcess()
# Regulerne spustime (pidfile)
proc.RunProcess()
# Naslouchajici socket
listening_socket = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
# Binding
binded=False
while binded==False:
try:
listening_socket.bind( ("", 2020) )
binded = True
except Exception:
log.log("Unable to bind. Waiting 30 seconds.")
time.sleep(30)
# Pocet cekajicich spojeni v TCP stacku na obsluhu
listening_socket.listen(5)
# Serverova smycka
log.log("Server running. PID %s" % os.getpid())
while True:
# Cekame na prichozi paket nebo udalost
try:
rlist, wlist, xlist = select.select( [listening_socket] + open_sockets, [], [] )
except Exception:
#log.log("Stopping server")
listening_socket.close()
proc.ExitProcess()
for i in rlist:
if i is listening_socket:
try:
new_socket, addr = listening_socket.accept()
open_sockets.append(new_socket)
ip, port = addr;
msgid = open_sockets.index(new_socket);
messages[msgid]="";
new_socket.send("Vema State Server v.1.0\r\n")
log.log("New connection from "+ip+":"+str(port)+", created buffer "+str(msgid))
except Exception:
try:
msgid = open_sockets.index(new_socket)
del messages[msgid]
except:
None
open_sockets.remove(new_socket);
log.log("Broken connection from "+ip+":"+str(port)+", buffer "+str(msgid)+" created and removed")
new_socket.close()
else:
try:
data, adresa = i.recvfrom(1024)
addr = i.getpeername();
ip, port = addr;
msgid = open_sockets.index(i);
if data == "":
del messages[msgid];
open_sockets.remove(i)
log.log("Connection from "+ip+":"+str(port)+" terminated by host, buffer "+str(msgid)+" removed")
else:
data = data.replace('\r','\n').replace('\n\n','\n');
messages[msgid] = messages[msgid] + data;
log.log("Data received from "+ip+" added to buffer "+str(msgid)+": "+repr(data))
if ceb(msgid, i) is True:
del messages[msgid];
open_sockets.remove(i)
#i.send("\n\rHello goodbye\n\r");
i.close()
log.log("Connection from "+ip+":"+str(port)+" terminated by server, buffer "+str(msgid)+" removed")
except Exception:
try:
del messages[open_sockets.index(i)]
except:
None
open_sockets.remove(i);
i.close()
log.log("Connection from "+ip+":"+str(port)+" terminated by host, buffer "+str(msgid)+" removed")
Skript /usr/local/procesy/vss potřebuje ke svému běhu třídu daemon. Obsahuje metody na daemononizaci procesu, tedy to, že se tcp server spustí na pozadí a zůstane běžet.
Zobrazit/skrýt třídu /usr/local/vss/daemon.py
#!/usr/bin/env python
########################################################################################################################
# D A E M O N C L A S S #
# Tato trida je urcena pro daemonizaci a spravu procesu pomoci PID souboru (lze vyuzit nezavisle) #
########################################################################################################################
import os, sys
from signal import *
class daemon:
# aktualni cislo procesu
pid = None
# soubor s pid beziciho procesu
pidfile = "/tmp/none"
# instance na tridu s logovanim
logcl = None
# ------------------------------------------------------------------------------------------------------------------
# Konstruktor
# Vstup: pidfile
# ------------------------------------------------------------------------------------------------------------------
def __init__(self, pidfile):
self.pid = os.getpid()
self.pidfile = pidfile
# ------------------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------------
# Logovani na terminal a do tridy pro logovani
# Vstup: retezec logu
# ------------------------------------------------------------------------------------------------------------------
def __log(self, text):
print text
try:
self.logcl.log(text)
except Exception:
None
# ------------------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------------
# Test, zda-li proces bezi nebo ne
# Vystup: true/false podle toho, zda-li proces bezi nebo ne
# sys.exit() pokud se vyskytne problem s pidfile (prava)
# ------------------------------------------------------------------------------------------------------------------
def IsProcessRunning(self):
if os.path.exists(self.pidfile):
try:
file = open(self.pidfile, "r")
lines = file.readlines()
file.close()
if os.path.exists("/proc/%s" % str(lines[0])):
self.__log("Process is running, try kill him or delete pidfile. PID=%s" % lines[0])
return True
else:
os.remove(self.pidfile)
except Exception:
self.__log("Cannot open existing pidfile. Check permissions.")
sys.exit()
return False
# ------------------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------------
# Zalozeni PID file s aktualnim pid a nastaveni reakce na TERM signal
# Vystup: sys.exit() pokud se vyskytne problem s pidfile (prava)
# ------------------------------------------------------------------------------------------------------------------
def RunProcess(self):
try:
file = open(self.pidfile,'w')
file.write(str(self.pid))
file.close()
except Exception:
self.__log("Cannot create pidfile. Check permissions.")
sys.exit()
signal(SIGTERM, self.__eventterm)
# ------------------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------------
# Obsluha po prichodu TERM signalu
# Vystup: sys.exit() - koreknti ukonceni
# ------------------------------------------------------------------------------------------------------------------
def __eventterm(self, signum, frame):
self.__log("Stopping server SIGTERM")
self.Exit()
sys.exit()
# ------------------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------------
# Korektni ukonceni
# Vystup: sys.exit()
# ------------------------------------------------------------------------------------------------------------------
def ExitProcess(self):
try:
os.remove(self.pidfile)
except Exception:
self.__log("Cannot delete pidfile. Check permissions.")
sys.exit()
# ------------------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------------
# Daemonizace procesu
# Vystup: nic
# ------------------------------------------------------------------------------------------------------------------
def DaemonizeProcess(self):
# Udelame fork, tedy klon procesu - rodic (pid=cislo potomka) a prvni potomek (pid=0)
try:
pid = os.fork()
except OSError, e:
raise Exception, "%s [%d]" % (e.strerror, e.errno)
if (pid == 0): # Pokud je proces prvni potomek
os.setsid() # Odpojime od terminalu
try:
pid = os.fork() # Vytvorime druheho potomka z prvniho potomka
except OSError, e:
raise Exception, "%s [%d]" % (e.strerror, e.errno)
if (pid == 0): # Pokud je proces druhy potomek
os.chdir("/") # Nastaveni pracovniho adresare
os.umask(0) # Nastaveni masky
else:
os._exit(0) # Ukonceni rodice druheho potomka (=prvniho potomka)
else:
os._exit(0) # Ukonceni rodice prvniho potomka
# Nyni by mel zustat bezet druhy potomek procesu a to je to, co chceme
# Preventivni uzavreni vsech filedektriptoru - preventivne
import resource
maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
if (maxfd == resource.RLIM_INFINITY):
maxfd = 1024
# Iterate through and close all file descriptors.
for fd in range(0, maxfd):
try:
os.close(fd)
except OSError: # ERROR, fd wasn't open to begin with (ignored)
pass
# Presmerovani vstupu a vystupu
if (hasattr(os, "devnull")):
REDIRECT_TO = os.devnull
else:
REDIRECT_TO = "/dev/null"
os.open(REDIRECT_TO, os.O_RDWR) # standard input (0)
os.dup2(0, 1) # standard output (1)
os.dup2(0, 2) # standard error (2)
# Hotovo"
self.pid = os.getpid()
# ------------------------------------------------------------------------------------------------------------------
Skript /usr/local/procesy/vss potřebuje ke svému běhu třídu logger. Obsahuje jednoduché metody logování do souboru.
Zobrazit/skrýt třídu /usr/local/smss/logger.py
#!/usr/bin/env python
########################################################################################################################
# L O G G E R C L A S S #
# Tato trida je urcena pro realizaci logovani na terminal a do souboru #
########################################################################################################################
from time import gmtime, strftime, localtime
class logger:
# logovy soubor
logfile = "/tmp/nonelog"
# logovani na terminal
log_to_console = False
# ------------------------------------------------------------------------------------------------------------------
# Konstruktor
# Vstup: logfile
# ------------------------------------------------------------------------------------------------------------------
def __init__(self, logfile, truncatefile=False):
self.logfile = logfile
if truncatefile==True:
FW = open(self.logfile,'w').close()
# ------------------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------------
# Metoda na logovani
# Vstup: text logu
# Vystup: nic
# ------------------------------------------------------------------------------------------------------------------
def log(self, text):
if self.log_to_console:
print strftime("%Y.%m.%d %H:%M:%S ", localtime()) + text
try:
FW = open(self.logfile,'a')
FW.write(strftime("%Y.%m.%d %H:%M:%S ", localtime()) + str(text) + '\n');
FW.close()
except:
print "Cannot open logfile"
# ------------------------------------------------------------------------------------------------------------------
TCP se dá spustit nebo restartovat následujícím způsobem.
Zobrazit/skrýt soubor /usr/local/smss/stop_start
#!/bin/bash
# Zastavi bezici proces serveru, resp. posle SIGTERM
kill `cat /var/run/vss.pid`
# Spusti server
/usr/local/procesy/vss
Pokud Python zakřičí, že nemá nějaký modul, stačí ho doinstalovat z debianích repozitářů.
PID file je implementován pro snadnou identifikaci procesu, neboť TCP server je po spuštění daemonizován. Obsahem PID file je číslo procesu v OS. V případě problémů se spuštěním je třeba PID soubor smazat.
Formát zprávy: #id=IDPROCESU;state=STAV;name=NAZEVULOHY
- IDPROCESU - atribut idd z tabulky defp.
- STAV - výsledek úlohy OK nebo FAIL.
- NAZEVULOHY - libovolný řetezěc identifikující úlohu
Př. #id=25;state=OK;name=JajaBackupData
Skript check_events
Spuštění tohoto skriptu způsobí vyhodnocení přijatých zpráv z tabulky msg v kombinaci s běžícími procesy uvedenými v tabulce run. Odešle SMS (přes SMS server), emaily (přes SMTP server) a nakonec zapíše informaci o ukončeném procesu do tabulky hist.
- Umístění: /usr/local/procesy/check_events
- Logy: /var/log/vss/check_events.log
- Spouštění: Nutno cronem každou minutu, v /etc/crontab
0-59/1 * * * * root /usr/local/procesy/check_events
Zobrazit/skrýt soubor /usr/local/procesy/check_events
#!/usr/bin/env python
# -*- coding: utf-8 -*-
##########################################################################################################
# PROCESY: CHECK EVENT #
# Skript detekuje udalosti OK, FAIL a TIMEOUT na bezicich procesech #
# Pouziti: check_events (spousteni kazdou minutu z CRONu #
##########################################################################################################
logfile = "/var/log/vss/check_events.log"
smssclient = "/usr/local/smss/smss_client"
import sys
import os
import re
import MySQLdb
import smtplib
import logger
from time import gmtime, strftime, localtime
# Povoleni/zakazani emailove notifikace
NotifyEnable = True
# Instance tridy na logovani
log = logger.logger(logfile)
log.log_to_console = True
# --------------------------------------------------------------------------------------------------------
# Funkce na odeslani notifikace
# --------------------------------------------------------------------------------------------------------
def notify(email, name, stav, pstart , aderror):
now = strftime("%Y-%m-%d %H:%M:%S ", localtime())
server = smtplib.SMTP('smtp.domena.cz')
if aderror==1:
fromaddr='error@domena.cz'
else:
fromaddr='warning@domena.cz'
subject = "Proces '%s': %s" % (name, stav)
zprava = "\
From: %s\r\nTo: %s\r\nSubject: %s\r\nX-Mailer: ProcessCheckEvent-Mail\r\n\r\n\
Proces '%s' ohlasil stav %s.\n\n\
Cas spusteni procesu: %s\n\
Cas nastale udalosti: %s\n\n\
Tato zprava byla vygenerovana skriptem GLUM:\usr\local\procesy\check_evenets systemu Procesy.\n\
Nastaveni lze provest na adrese http://net.vema.cz/" % (fromaddr,email,subject,name,stav,pstart,now)
for mail in email.split(';'):
server.sendmail(fromaddr, mail, zprava)
None
server.quit()
# --------------------------------------------------------------------------------------------------------
# --------------------------------------------------------------------------------------------------------
# Funkce na odeslani notifikace pres SMS
# --------------------------------------------------------------------------------------------------------
def smsnotify(phones, name, stav, pstart , aderror):
now = strftime("%Y-%m-%d %H:%M:%S ", localtime())
for cislo in phones.split(';'):
if re.match("^[0-9]{12}$", cislo):
command = "#%s;Proces %s hlasi %s. Spusteni procesu %s. Udalost nastala %s" % (cislo,name,stav,pstart,now)
os.system("%s localhost \"%s\" > /dev/null" % (smssclient, command))
# --------------------------------------------------------------------------------------------------------
# pripojeni k MySQL
try:
db=MySQLdb.connect(host="localhost",user="uzivatel",passwd="heslo",db="proces");
except Exception:
log("ERROR: Nelze se pripojit k MySQL")
sys.exit()
# --- BEGIN: detekce udalosti OK|FAIL ----
# SQL dotaz na seznam vsech prijatych zprav
try:
cursor = db.cursor()
cursor.execute("SELECT msg, idm FROM msg ORDER BY ulozeno ASC")
result = cursor.fetchall()
except Exception:
log.log("ERROR: Chyba v SQL dotazu 1 - fatalita - KONEC")
sys.exit()
# prochazeni seznamem prijatych zprav
for record in result:
ProcessThis = False
# test korektnosti zpravy
try:
(msg, idm) = record;
idd = msg.split(';')[0].split('=')[1]
state = msg.split(';')[1].split('=')[1].upper()
if not re.match("^(FAIL|OK)$", state):
raise
if not idd.isdigit():
raise
ProcessThis = True
except Exception:
log.log("ERROR: Spatny format zpravy [" + msg + "] -> mazu")
# zpracovani zpravy
if ProcessThis:
try:
cursor2 = db.cursor()
cursor3 = db.cursor()
cursor2.execute("SELECT run.idr, dnazev, d_on_fail, d_on_success, sms_on_fail, sms_on_success, derror, start\
FROM defp, run\
WHERE run.idd=defp.idd AND\
run.idd=%s\
ORDER BY run.start ASC" % idd)
if cursor2.rowcount >= 1:
(idr, dnazev, d_on_fail, d_on_success, sms_on_fail, sms_on_success, derror, start) = cursor2.fetchall()[0]
# vlozeni to tabulky s historii
cursor3.execute("INSERT INTO hist(idd, zacatek, ukonceni, typ, pozn)\
VALUES("+str(idd)+", '"+str(start)+"', now(), '"+state+"', '"+msg+"')")
# smazani procesu z tabulky bezicich procesu
cursor3.execute("DELETE FROM run WHERE idr="+str(idr))
# emailova notifikace, paklize je kam odeslat
if (len(d_on_success)!=0) & (NotifyEnable==True) & (re.match("^(OK)$", state) != None):
notify(d_on_success, dnazev, 'SUCCESS :-)', str(start), derror)
# emailova notifikace, paklize je kam odeslat
if (len(d_on_success)!=0) & (NotifyEnable==True) & (re.match("^(FAIL)$", state) != None):
notify(d_on_fail, dnazev, 'FAIL :-(', str(start), derror)
# sms notifikace, paklize je kam odeslat
if (len(sms_on_success)!=0) & (NotifyEnable==True) & (re.match("^(OK)$", state) != None):
smsnotify(sms_on_success, dnazev, 'SUCCESS', str(start), derror)
# sms notifikace, paklize je kam odeslat
if (len(sms_on_success)!=0) & (NotifyEnable==True) & (re.match("^(FAIL)$", state) != None):
smsnotify(sms_on_fail, dnazev, 'FAIL', str(start), derror)
except Exception:
log.log("ERROR: Chyba pri zpracovani zpravy " + str(idm) + " " + msg)
finally:
cursor3.close()
cursor2.close()
# smazani zpravy z tabulky prijatych zprav
try:
cursor2 = db.cursor()
cursor2.execute("DELETE FROM msg WHERE idm="+str(idm))
cursor2.close()
except Exception:
log.log("ERROR: Chyba pri mazani zpravy z tabulky")
# uzavreni kurzoru
cursor.close()
# --- END: detekce udalosti OK|FAIL ----
# --- BEGIN: detekce udalosti TIMEOUT ----
# SQL dotaz na seznam vsech bezicich procesu po timeoutu
try:
cursor = db.cursor()
cursor.execute("SELECT run.idr, run.idd, dnazev, d_on_timeout, sms_on_timeout, derror, start, stop\
FROM defp, run\
WHERE run.idd=defp.idd AND\
run.stop<=now()\
ORDER BY run.stop");
result = cursor.fetchall()
except Exception:
log("ERROR: Chyba v SQL dotazu 2 - fatalita - KONEC")
sys.exit()
# otevreni dalsiho kurzoru pro vnorene dotazy
cursor2 = db.cursor()
# prochazeni seznamu bezich procesu po timeoutu
for record in result:
try:
(idr, idd, dnazev, d_on_timeout, sms_on_timeout, derror, start, stop) = record;
# vlozeni to tabulky s historii
cursor2.execute("INSERT INTO hist(idd, zacatek, ukonceni, typ)\
VALUES("+str(idd)+", '"+str(start)+"', now(), 'TIMEOUT')")
# smazani z tabulky bezicich procesu
cursor2.execute("DELETE FROM run WHERE idr="+str(idr))
# emailova notifikace, paklize je kam odeslat
if (len(d_on_timeout)!=0) & (NotifyEnable==True):
notify(d_on_timeout, dnazev, 'TIMEOUT', str(start), derror)
# sms notifikace, paklize je kam odeslat
if (len(sms_on_timeout)!=0) & (NotifyEnable==True):
smsnotify(sms_on_timeout, dnazev, 'TIMEOUT', str(start), derror)
except Exception:
log("ERROR: Chyba pri zpracovani zaznamu 2")
# uzavreni kurzoru
cursor.close()
cursor2.close()
# --- END: detekce udalosti TIMEOUT ----
# Ukonceni spojeni do DB
db.close()
Skript create_process
Tento skript je spouštěn cronem s parametrem IDPROCESU. Způsobí zápis do tabulky run, tzn. spuštění procesu. Do uvedené tabulky se zapíše čas spuštění a čas ukončení vypočítaný dle definice procesu z tabulky defp. Záznamy do souboru /etc/crontab vytváří skript regenerate_cron spouštěný z www rozhraní, čili není potřeba nic upravovat ručně.
- Umístění: /usr/local/procesy/create_process
- Logy: /var/log/vss/create_process.log
- Spouštění: Cronem dle pravidel vytvořených skriptem regenerate_cron.
Zobrazit/skrýt soubor /usr/local/procesy/create_process
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#########################################################################################################
# PROCESY: PROCESS CREATOR #
# Skript vytvori novy proces z existujicich definic #
# Pouziti: create_proces .... #
#########################################################################################################
# Soubor s logem
logfile = "/var/log/vss/create_process.log"
import sys
import _mysql
import logger
# Instance tridy na logovani
log = logger.logger(logfile)
log.log_to_console = True
# pripojeni k MySQL
try:
db=_mysql.connect(host="localhost",user="uzivatel",passwd="heslo",db="proces");
except Exception:
log.log("ERROR: Nelze se pripojit k MySQL")
sys.exit()
# seznam pro ukladani korektnich idd
idds = []
# mame nejaky argument?
if len(sys.argv)>1:
# kontrola numericnosti a naplneni seznamu idds
for idd in sys.argv[1:]:
try:
i = float(idd)
except ValueError:
None
else:
idds.append(idd)
# prosel alespon jeden idd kontrolou - je v idds neco?
if len(idds)>0:
try:
sql = "INSERT INTO run (idd, start, stop)\
SELECT idd, now(), now() + INTERVAL dtrvani MINUTE FROM defp WHERE idd IN (" + ','.join(idds)+")"
db.query(sql)
except Exception:
log.log("ERROR: Chyba pri provadeni SQL dotazu")
Skript regenerate_cron
Skript vytvoří pravidla pro spouštění procesů v /etc/crontab. Je spouštěn kliknutím ve webovém rozhraní - práva jsou řešena přes sudo - viz man sudoers.
- Umístění: /usr/local/procesy/regenerate_cron
- Logy: /var/log/vss/regenerate_cron.log
- Spouštění: z www rohraní přes sudo (/etc/sudoers)
Zobrazit/skrýt soubor /usr/local/procesy/create_process
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#########################################################################################################
# PROCESY: REGENERATE CRON #
# Skript modifikuje crontab a restartuje cron #
# Pouziti: regenerate_cron (spusti se z web aplikace via sudo) #
#########################################################################################################
# Soubor s logem
logfile = "/var/log/vss/regenerate_cron.log"
# Cesty k souborum
crontab="/etc/crontab"
crontabtmp="/tmp/crontab"
import sys
import os
import MySQLdb
import logger
# Instance tridy na logovani
log = logger.logger(logfile)
log.log_to_console = True
# pripojeni k MySQL
try:
db=MySQLdb.connect(host="localhost",user="uzivatel",passwd="heslo",db="proces");
except Exception:
log.log("ERROR: Nelze se pripojit k MySQL")
sys.exit()
# provedeni SQL dotazu
try:
cursor = db.cursor()
cursor.execute("SELECT REPLACE(dcron,';',' '),\
( SELECT GROUP_CONCAT(s.idd SEPARATOR ' ')\
FROM defp s WHERE s.dcron=defp.dcron ) AS idds\
FROM defp\
GROUP BY defp.dcron");
result = cursor.fetchall()
except Exception:
log.log("ERROR: Chyba v SQL dotazu")
sys.exit()
# vykopirovani crontabu bez predchozich radku procesu
os.system("grep -v create_process " + crontab + " > " + crontabtmp)
os.system("echo '# Vytvareni procesu (create_process) - generovano skriptem regenerate_cron' >> " + crontabtmp)
# pridani novych radku procesu do crontabu
for record in result:
print record[0] + "\troot\t/usr/local/procesy/create_process " + record[1]
line = record[0] + "\troot\t/usr/local/procesy/create_process " + record[1]
os.system("echo '"+line+"' >> " + crontabtmp)
# nakopirovani crontabu na puvdoni misto
os.system("cat " + crontabtmp + " > " + crontab)
# restart cronu
os.system("/etc/init.d/cron restart")
# uzavreni spojeni s DB
cursor.close()
db.close()
Webové rozhraní
Seznam definic procesů. Tabulka defp.
Editace procesu. Tabulka defp.
Běžící procesy. Tabulka run.
Historie procesů. Tabulka hist.
Běžící procesy. Tabulka run. (update 29.10.2014)
Nezpracované zprávy od úloh. Tabulka msgs.
Vytvoření pravidel pro cron skriptem regenerate_cron
Web je možné stáhnout zde. Jedná se o klasiku LAMP (Linux, Apache, MySQL, PHP), napsáno lajdácky, neboť je to web pro autorizované správce a přes SSL.