python实时监控logstash日志代码
实时读取logstash日志,出现异常错误keyword即触发报警。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# __author__ = caozhi
# create_time 2018-11-12, update_time 2018-11-15
# version = 1.0
#录像高可用报警
#1读取日志使用游标移动
#2线上业务日志文件会切割,切割后,读取上一个切割的日志
import json
import os
import re
import sys
import time

import requests
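# State file holding the read offset, the log's inode and the log path
# between runs. The opening quotes of both literals were missing in the
# listing and are restored here.
cini = 'conf.ini'
# Log file that is scanned. Both paths are relative, so they resolve against
# the working directory the script is started from.
log_file = 'logstash.log'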
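def readconf():
    """Load the monitor's saved state from the ``cini`` file.

    The state is a JSON object with three keys: ``seek`` (offset to resume
    reading from), ``inode`` (inode of the log at the previous run) and
    ``last_file`` (log path used to derive the rotated file name).

    Returns:
        The parsed state. When the file is missing, unreadable or not valid
        JSON, a default state is written with ``writeconf`` and returned.
    """
    try:
        # Read-only is enough here; the original 'r+' also demanded write
        # permission just to load the state.
        with open(cini, 'r') as f:
            CONF = json.load(f)
    except (OSError, ValueError):
        # Only I/O and parse failures mean "no usable state". The original
        # bare ``except:`` also swallowed KeyboardInterrupt and SystemExit.
        # NOTE(review): 922817 is a hard-coded inode; the caller compares it
        # with the live log's inode and treats a mismatch as a rotation.
        CONF = {"seek": 0, "inode": 922817, "last_file": "logstash.log"}
        writeconf(CONF=CONF)
        print('conf.ini配置文件缺失,自动创建一个新的配置文件')
    return CONF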
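def writeconf(CONF):
    """Persist the monitor state ``CONF`` to the ``cini`` file as JSON.

    The JSON is written to a sibling temporary file that is then renamed
    over ``cini``. An interrupted run therefore leaves the previous state in
    place instead of an empty or truncated file, which the reader would
    treat as missing and replace with offset 0.

    Args:
        CONF: JSON-serialisable state (``seek``, ``inode``, ``last_file``).
    """
    tmp_path = cini + '.tmp'
    with open(tmp_path, 'w') as e:
        json.dump(CONF, e)
    # os.replace swaps the file in one step when both paths are on the same
    # filesystem, which holds here because the temp file sits next to cini.
    os.replace(tmp_path, cini)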
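def read_log(log_file, seek):
    """Scan ``log_file`` from offset ``seek`` and report recording events.

    Every line is expected to hold one JSON object. A line whose ``rtype`` is
    6, ``uri`` is ``'/publish'`` and ``event`` is 0 is reported through
    ``record`` with value 1 and the line's ``name`` as the stream.

    Args:
        log_file: Path of the log to read. If it does not exist, the
            hard-coded ``logstash.log`` is read from offset 0 instead.
        seek: Offset, as returned by ``tell()`` on an earlier run, to resume
            from.

    Returns:
        ``(value, stream, seek)``. ``value`` and ``stream`` describe the last
        well-formed line handled (``0, 0`` unless it matched); ``seek`` is the
        offset just after the last line consumed.

    Exits the process when the log cannot be opened or nothing was appended.
    """
    try:
        f = open(log_file, 'r')
    except FileNotFoundError:
        f = open('logstash.log', 'r')
        seek = 0
        print('上一个文件读取失败了,请检查切割的日志文件')
    except OSError:
        # Narrowed from a bare ``except:`` so that only open() failures end
        # the run with this message.
        print('日志文件打开错误,退出程序')
        sys.exit()

    value = 0
    stream = 0
    # The original ended with ``f.close`` (no call), so the handle was never
    # closed; the context manager closes it on every path, sys.exit included.
    with f:
        f.seek(seek)
        line = f.readline()
        if f.tell() == seek:
            print('没有追加日志,退出程序')
            sys.exit()
        while line:
            try:
                logstash = json.loads(line)
            except ValueError:
                if not line.endswith('\n'):
                    # The writer is still in the middle of this line: stop
                    # without advancing so the next run re-reads it whole.
                    break
                logstash = None
            if not isinstance(logstash, dict):
                # Log content is not under this script's control. The original
                # reset the state file and exited on a bad line, so a single
                # malformed line could keep the monitor from advancing; it is
                # skipped instead.
                print('日志行不是合法的JSON对象,已跳过')
            elif (logstash.get('rtype') == 6
                    and logstash.get('uri') == '/publish'
                    and logstash.get('event') == 0):
                value = 1
                stream = logstash.get('name')
                print('{}{}'.format(value, stream))
                record(value=value, stream=stream)
            else:
                value = 0
                stream = 0
            seek = f.tell()
            line = f.readline()
    return value, stream, seek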
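def record(value, stream):
    """Push one ``recording_high_availability_monitor`` sample to the agent.

    Builds a single-element metric list, POSTs it as JSON text to
    ``http://127.0.0.1:1988/v1/push`` and prints the response status and body.

    Args:
        value: Gauge value to report.
        stream: Stream name taken from the log line; sent inside ``Tags``.
    """
    # One clock read so the sample timestamp and the tag agree.
    now = int(time.time())
    sample = {
        'metric': 'recording_high_availability_monitor',
        'endpoint': os.uname()[1],
        'timestamp': now,
        'step': 60,
        'value': value,
        'counterType': 'GAUGE',
        # NOTE(review): stream is copied from the log line unmodified; confirm
        # that the agent's tag parsing tolerates ',' and '=' inside it.
        'Tags': '{}={}'.format(now, stream),
    }
    data = [sample]
    print('这是data的json数据')
    print(data)
    # requests waits indefinitely by default; the timeout keeps a hung agent
    # from blocking the whole run.
    falcon_request = requests.post(
        "http://127.0.0.1:1988/v1/push",
        data=json.dumps(data),
        timeout=5,
    )
    print('json参数请求返回状态码为:' + str(falcon_request.status_code))
    print('json参数请求返回为:' + str(falcon_request.text))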
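if __name__ == '__main__':
    # One run: load the saved state, decide whether the log was rotated since
    # the previous run, scan the chosen file, then save the new state.
    now = time.localtime()
    print()
    print('***************************************')
    print('本次执行脚本时间:{}'.format(time.strftime("%Y%m%d_%H%M", now)))
    CONF = readconf()
    print('first_CONF:{}'.format(CONF))
    print('NO1.log_file', log_file)

    live_log = log_file
    last_inode = CONF['inode']
    inode = os.stat(live_log).st_ino
    print('last_inode:{}inode:{}'.format(last_inode, inode))

    seek = CONF['seek']
    if inode == last_inode:
        # Same file as last time: continue from the saved offset.
        target = live_log
        next_file = 0
    else:
        # Inode changed, so the log was rotated. The rotated name is
        # last_file plus "-YYYYmmdd_HHMM" with the minute rounded down to a
        # multiple of ten. It is built from the single clock read above; the
        # original read the clock twice, which could straddle a boundary.
        stamp = time.strftime("-%Y%m%d_%H%M", now)
        target = CONF['last_file'] + stamp[:-1] + '0'
        next_file = 1
    print('NO2.log_file', target)

    value, stream, seek = read_log(log_file=target, seek=seek)

    # After finishing a rotated file the next run starts the live log at 0.
    CONF['seek'] = 0 if next_file else seek
    # Record the live log's inode (not the rotated file's) for the next run.
    CONF['inode'] = os.stat(live_log).st_ino
    writeconf(CONF=CONF)
    print('last_CONF:{}'.format(CONF))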
补充知识:logstash调用exec
我就废话不多说了,还是直接看代码吧!
[elk@Vsftp logstash]$ cat t3.conf
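input {
  stdin {
  }
}
filter {
  # Capture the leading timestamp as "time" and the token after it as "Level".
  # NOTE(review): the capture name was missing from the listing; "Level" is
  # restored from the [Level] test in the output section and the sample event.
  grok {
    match => ["message", "(?m)\s*%{TIMESTAMP_ISO8601:time}\s*(?<Level>(\S+)).*"]
  }
  # Parse the captured "time" field as the event timestamp.
  date {
    match => ["time", "yyyy-MM-dd HH:mm:ss,SSS"]
  }
  # Build "messager" as "<type>-<original message>" and drop "message".
  mutate {
    add_field => ["type", "tailong"]
    add_field => ["messager", "%{type}-%{message}"]
    remove_field => ["message"]
  }
}
output {
  # Alert on ERROR level or any "Exception" text, except three known messages.
  if ([Level] == "ERROR" or [messager] =~ "Exception") and [messager] !~ "温金服务未连接" and [messager] !~ "调用温金代理系统接口错误" and [messager] !~ "BusinessException" {
    # NOTE(review): %{messager} is raw log text substituted into a command
    # line. The exec output runs the resulting string as a command, so quotes,
    # backticks or $() in a log line change what is executed. Prefer an output
    # that does not build a command line from event fields (for example the
    # email output, or one that feeds the event to the program's stdin), or
    # restrict what may reach %{messager} before this point.
    exec {
      command => "/bin/smail.pl \"%{messager}\" \"%{type}\""
    }
  }
  stdout {
    codec => rubydebug
  }
}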
Vsftp:/root# cat /bin/smail.pl
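#!/usr/bin/perl
# Mail the text given on the command line through an authenticated SMTP session.
use Net::SMTP;
use HTTP::Date qw(time2iso str2time time2iso time2isoz);
use Data::Dumper;
use Getopt::Std;
use vars qw($opt_d);
getopts('d:');
# mail_user should be your_mail@163.com
# All remaining arguments, joined with spaces, become the message text.
$message = "@ARGV";
$env = "$opt_d";

# Send $message, prefixed with the current time, to the address passed as the
# first argument.
sub send_mail {
    my $CurrTime = time2iso(time());
    my $to_address = shift;
    my $mail_user = 'zhao.yangjian@163.com';
    # NOTE(review): the account password is embedded in the script; anyone who
    # can read the file can read it. Load it from a file or environment
    # variable that only the Logstash user can read.
    my $mail_pwd = 'xx';
    my $mail_server = 'smtp.163.com';
    my $from = "From:$mail_user\n";
    my $subject = "Subject:zjcapinfo\n";
    my $info = "$CurrTime--$message";
    # NOTE(review): the here-document that built the mail body and the
    # Net::SMTP constructor call were lost from the listing. The body below is
    # rebuilt from $info and the constructor is the plain one-argument form;
    # compare with the original script.
    my $body = $info;
    # NOTE(review): as written the session is opened without TLS, so the AUTH
    # credentials and the log excerpt would cross the network in clear text.
    # Enable TLS if the installed Net::SMTP supports it.
    my $smtp = Net::SMTP->new($mail_server)
        or die "ConnectError!$@";   # new() returns undef on failure
    $smtp->auth($mail_user, $mail_pwd) || die "AuthError!$!";
    $smtp->mail($mail_user);
    $smtp->to($to_address);
    $smtp->data();                  # begin the data
    $smtp->datasend($from);         # set user
    $smtp->datasend($subject);      # set subject
    $smtp->datasend("\n\n");
    $smtp->datasend("$body\n");     # set content
    $smtp->dataend();
    $smtp->quit();
};
send_mail('zhao.yangjian@163.com');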
2017-01-12 10:19:19,888 jjjjj Exception
{
"@version" => "1",
"@timestamp" => "2017-01-12T02:19:19.888Z",
"host" => "Vsftp",
"time" => "2017-01-12 10:19:19,888",
"Level" => "jjjjj",
"type" => "tailong",
"messager" => "tailong-2017-01-12 10:19:19,888 jjjjj Exception"
}