`
rjhym
  • 浏览: 64499 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Flume-ng生产环境实践(四)实现log格式化interceptor

阅读更多
续上篇,由于filesink中需要使用/data/log/%{dayStr}/log-%{hourStr}%{minStr}-这样文件格式的,为了使file-sink能使用%{dayStr}这样的标签,需要在数据传输过程中,给event的header中添加对应的键值对。在flume-ng中提供了很方便的方式:Interceptor
以下为实现的interceptor,首先使用正则表达式匹配nginx日志,如果匹配成功,则获取匹配到的数据,并且对url中的参数进行处理,最后所有日志信息都被存储在Map中。根据配置文件中需要输出的键找到对应的值,按照顺序输出为csv格式的行。
原始日志格式:
112.245.239.72 - - [29/Dec/2012:15:00:00 +0800] "GET /p.gif?a=1&b=2 HTTP/1.1" 200 0 "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4
.0; 4399Box.1357; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; AskTbPTV2/5.9.1.14019; 4399Box.1357)"

最终结果:
1,2
配置信息为:
agent.sources = source
agent.channels = channel
agent.sinks = sink

agent.sources.source.type = exec
#agent.sources.source.command = tail -n +0 -F /data/tmp/accesspvpb_2012-11-18.log
agent.sources.source.command = cat /opt/nginx/logs/vvaccess_log_pipe
agent.sources.source.interceptors = logformat

agent.sources.source.interceptors.logformat.type = org.apache.flume.interceptor.LogFormatInterceptor$Builder
agent.sources.source.interceptors.logformat.confpath = /usr/programs/flume/conf/logformat_vv.properties
agent.sources.source.interceptors.logformat.dynamicprop = true
agent.sources.source.interceptors.logformat.hostname = vv111
agent.sources.source.interceptors.logformat.prop.monitor.rollInterval = 100000
# The channel can be defined as follows.
agent.sources.source.channels = channel


agent.sinks.sink.type = avro
agent.sinks.sink.hostname = 192.168.0.100
agent.sinks.sink.port = 44444
agent.sinks.sink.channel = channel

# Each channel's type is defined.
agent.channels.channel.type = file
agent.channels.channel.checkpointDir = /data/tmpc/checkpoint
agent.channels.channel.dataDirs = /data/tmpc/data
agent.channels.channel.transactionCapacity = 15000

/usr/programs/flume/conf/logformat_vv.properties文件内容为:
keys=a,b
regexp=([0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3})\\s-\\s-\\s\\[([^]]+)\\]\\s\"GET\\s/p.gif\\?(.+)\\s.*\"\\s[0-9]+\\s[0-9]+\\s\"(.+)\"

interceptor的代码:
package org.apache.flume.interceptor;

import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.CONF_PATH;
import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.DYNAMICPROP;
import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.DYNAMICPROP_DFLT;
import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.HOSTNAME;
import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.HOSTNAME_DFLT;
import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.PROPMONITORINTERVAL;
import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.PROPMONITORINTERVAL_DFLT;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.oro.text.regex.MalformedPatternException;
import org.apache.oro.text.regex.MatchResult;
import org.apache.oro.text.regex.Pattern;
import org.apache.oro.text.regex.PatternCompiler;
import org.apache.oro.text.regex.PatternMatcher;
import org.apache.oro.text.regex.Perl5Compiler;
import org.apache.oro.text.regex.Perl5Matcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

publicclassLogFormatInterceptorimplementsInterceptor{

privatestaticfinalLoggerlogger= LoggerFactory
.getLogger(LogFormatInterceptor.class);

privateStringconf_path=null;
privatebooleandynamicProp=false;
privateStringhostname=null;

privatelongpropLastModify= 0;
privatelongpropMonitorInterval;

privateStringregexp=null;
privateList<String>keys=null;

privatePatternpattern=null;
privatePatternCompilercompiler=null;
privatePatternMatchermatcher=null;
privateSimpleDateFormatsdf=null;
privateSimpleDateFormatsd=null;
privateSimpleDateFormatsh=null;
privateSimpleDateFormatsm=null;
privateSimpleDateFormatsdfAll=null;

privatelongeventCount= 0l;

publicLogFormatInterceptor(String conf_path,booleandynamicProp,
String hostname,longpropMonitorInterval) {
this.conf_path= conf_path;
this.dynamicProp= dynamicProp;
this.hostname= hostname;
this.propMonitorInterval= propMonitorInterval;
}

@Override
publicvoidclose() {

}

@Override
publicvoidinitialize() {
try{
// 读取配置文件,初始化正在表达式和输出的key列表
File file =newFile(conf_path);
propLastModify= file.lastModified();
Properties props =newProperties();
FileInputStream fis;
fis =newFileInputStream(file);
props.load(fis);
regexp= props.getProperty("regexp");
String strKey = props.getProperty("keys");
if(strKey !=null) {
String[] strkeys = strKey.split(",");
keys=newLinkedList<String>();
for(String key : strkeys) {
keys.add(key);
}
}
if(keys==null) {
logger.error("====================keys is null====================");
}else{
logger.info("keys="+keys);
}
if(regexp==null) {
logger.error("====================regexp is null====================");
}else{
logger.info("regexp="+regexp);
}

// 初始化正在表达式以及时间格式化类
compiler=newPerl5Compiler();
pattern=compiler.compile(regexp);
matcher=newPerl5Matcher();

sdf=newSimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z",
java.util.Locale.US);
sd=newSimpleDateFormat("yyyyMMdd");
sh=newSimpleDateFormat("HH");
sm=newSimpleDateFormat("mm");
sdfAll=newSimpleDateFormat("yyyyMMddHHmmss");

}catch(MalformedPatternException e) {
logger.error("Could not complile pattern!", e);
}catch(FileNotFoundException e) {
logger.error("conf file is not found!", e);
}catch(IOException e) {
logger.error("conf file can not be read!", e);
}
}

@Override
publicEventintercept(Event event) {
++eventCount;
try{
if(dynamicProp&&eventCount>propMonitorInterval) {
File file =newFile(conf_path);
if(file.lastModified() >propLastModify) {
propLastModify= file.lastModified();
Properties props =newProperties();
FileInputStream fis;
fis =newFileInputStream(file);
props.load(fis);
String strKey = props.getProperty("keys");
if(strKey !=null) {
String[] strkeys = strKey.split(",");
List<String> keystmp =newLinkedList<String>();
for(String key : strkeys) {
keystmp.add(key);
}
if(keystmp.size() >keys.size()) {
keys= keystmp;
logger.info("dynamicProp status updated = "+keys);
}else{
logger.error("dynamicProp status new keys size less than old,so status update fail = "
+keys);
}
}else{
logger.error("dynamicProp status get keys fail ,so status update fail = "
+keys);
}

}
}

Map<String, String> headers = event.getHeaders();
headers.put("host",hostname);
String body =newString(event.getBody());
if(pattern!=null) {
StringBuffer stringBuffer =newStringBuffer();
Date date =null;
Map<String, String> index =newHashMap<String, String>();
if(matcher.contains(body,pattern)) {
index.put("host",hostname);
MatchResult result =matcher.getMatch();
index.put("ip", result.group(1));
try{
date =sdf.parse(result.group(2));
index.put("loc_time",sdfAll.format(date));
}catch(ParseException e1) {

}
String url = result.group(3).replaceAll(",","|");
String[] params = url.split("&");
for(String param : params) {
String[] p = param.split("=");
if(p.length== 2) {
index.put(p[0], p[1]);
}
}
index.put("browser", result.group(4).replaceAll(",","|"));
for(String key :keys) {
if(index.containsKey(key)) {
stringBuffer.append(index.get(key) +",");
}else{
stringBuffer.append("~,");
}
}
if(stringBuffer.length() > 0) {
stringBuffer.deleteCharAt(stringBuffer.length() - 1);
}else{
stringBuffer.append("error="+ body);
}

if(date !=null) {
headers.put("dayStr",sd.format(date));
headers.put("hourStr",sh.format(date));
Integer m = Integer.parseInt(sm.format(date));
String min ="";
if(m >= 0 && m < 10) {
min ="0"+ (m / 5) * 5;
}else{
min = (m / 5) * 5 +"";
}
headers.put("minStr", min);
}else{
headers.put("dayStr","errorLog");
}
Event e = EventBuilder.withBody(stringBuffer.toString()
.getBytes(), headers);
returne;
}
}
}catch(Exception e) {
logger.error("LogFormat error!", e);
}
returnnull;
}

@Override
publicList<Event>intercept(List<Event> events) {
List<Event> list =newLinkedList<Event>();
for(Event event : events) {
Event e = intercept(event);
if(e !=null) {
list.add(e);
}
}
returnlist;
}

/**
* Builder which builds new instances of the HostInterceptor.
*/
publicstaticclassBuilderimplementsInterceptor.Builder {

privateStringconfPath;
privatebooleandynamicProp;
privateStringhostname;
privatelongpropMonitorInterval;

@Override
publicInterceptor build() {
returnnewLogFormatInterceptor(confPath,dynamicProp,hostname,
propMonitorInterval);
}

@Override
publicvoidconfigure(Context context) {
confPath= context.getString(CONF_PATH);
dynamicProp= context.getBoolean(DYNAMICPROP,DYNAMICPROP_DFLT);
hostname= context.getString(HOSTNAME,HOSTNAME_DFLT);
propMonitorInterval= context.getLong(PROPMONITORINTERVAL,
PROPMONITORINTERVAL_DFLT);
}

}

publicstaticclassConstants {

publicstaticStringCONF_PATH="confpath";

publicstaticStringDYNAMICPROP="dynamicprop";
publicstaticbooleanDYNAMICPROP_DFLT=false;

publicstaticStringHOSTNAME="hostname";
publicstaticStringHOSTNAME_DFLT="hostname";

publicstaticStringPROPMONITORINTERVAL="prop.monitor.rollInterval";
publicstaticlongPROPMONITORINTERVAL_DFLT= 500000l;

}

}
至此,获取nginx日志,进行格式化清洗,传输到collector机器,按照格式化的目录和文件名进行输出全部完成。
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics