package org.apache.flume.interceptor;

import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.CONF_PATH;
import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.DYNAMICPROP;
import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.DYNAMICPROP_DFLT;
import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.HOSTNAME;
import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.HOSTNAME_DFLT;
import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.PROPMONITORINTERVAL;
import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.PROPMONITORINTERVAL_DFLT;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.oro.text.regex.MalformedPatternException;
import org.apache.oro.text.regex.MatchResult;
import org.apache.oro.text.regex.Pattern;
import org.apache.oro.text.regex.PatternCompiler;
import org.apache.oro.text.regex.PatternMatcher;
import org.apache.oro.text.regex.Perl5Compiler;
import org.apache.oro.text.regex.Perl5Matcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
publicclassLogFormatInterceptorimplementsInterceptor{
privatestaticfinalLoggerlogger=
LoggerFactory
.getLogger(LogFormatInterceptor.class);
privateStringconf_path=null;
privatebooleandynamicProp=false;
privateStringhostname=null;
privatelongpropLastModify=
0;
privatelongpropMonitorInterval;
privateStringregexp=null;
privateList<String>keys=null;
privatePatternpattern=null;
privatePatternCompilercompiler=null;
privatePatternMatchermatcher=null;
privateSimpleDateFormatsdf=null;
privateSimpleDateFormatsd=null;
privateSimpleDateFormatsh=null;
privateSimpleDateFormatsm=null;
privateSimpleDateFormatsdfAll=null;
privatelongeventCount=
0l;
publicLogFormatInterceptor(String
conf_path,booleandynamicProp,
String hostname,longpropMonitorInterval)
{
this.conf_path=
conf_path;
this.dynamicProp=
dynamicProp;
this.hostname=
hostname;
this.propMonitorInterval=
propMonitorInterval;
}
@Override
publicvoidclose()
{
}
@Override
publicvoidinitialize()
{
try{
// 读取配置文件,初始化正在表达式和输出的key列表
File file =newFile(conf_path);
propLastModify=
file.lastModified();
Properties props =newProperties();
FileInputStream fis;
fis =newFileInputStream(file);
props.load(fis);
regexp=
props.getProperty("regexp");
String strKey = props.getProperty("keys");
if(strKey
!=null) {
String[] strkeys = strKey.split(",");
keys=newLinkedList<String>();
for(String
key : strkeys) {
keys.add(key);
}
}
if(keys==null)
{
logger.error("====================keys
is null====================");
}else{
logger.info("keys="+keys);
}
if(regexp==null)
{
logger.error("====================regexp
is null====================");
}else{
logger.info("regexp="+regexp);
}
// 初始化正在表达式以及时间格式化类
compiler=newPerl5Compiler();
pattern=compiler.compile(regexp);
matcher=newPerl5Matcher();
sdf=newSimpleDateFormat("dd/MMM/yyyy:HH:mm:ss
Z",
java.util.Locale.US);
sd=newSimpleDateFormat("yyyyMMdd");
sh=newSimpleDateFormat("HH");
sm=newSimpleDateFormat("mm");
sdfAll=newSimpleDateFormat("yyyyMMddHHmmss");
}catch(MalformedPatternException
e) {
logger.error("Could
not complile pattern!", e);
}catch(FileNotFoundException
e) {
logger.error("conf
file is not found!", e);
}catch(IOException
e) {
logger.error("conf
file can not be read!", e);
}
}
@Override
publicEventintercept(Event
event) {
++eventCount;
try{
if(dynamicProp&&eventCount>propMonitorInterval)
{
File file =newFile(conf_path);
if(file.lastModified()
>propLastModify) {
propLastModify=
file.lastModified();
Properties props =newProperties();
FileInputStream fis;
fis =newFileInputStream(file);
props.load(fis);
String strKey = props.getProperty("keys");
if(strKey
!=null) {
String[] strkeys = strKey.split(",");
List<String> keystmp =newLinkedList<String>();
for(String
key : strkeys) {
keystmp.add(key);
}
if(keystmp.size()
>keys.size()) {
keys=
keystmp;
logger.info("dynamicProp
status updated = "+keys);
}else{
logger.error("dynamicProp
status new keys size less than old,so status update fail = "
+keys);
}
}else{
logger.error("dynamicProp
status get keys fail ,so status update fail = "
+keys);
}
}
}
Map<String, String> headers = event.getHeaders();
headers.put("host",hostname);
String body =newString(event.getBody());
if(pattern!=null)
{
StringBuffer stringBuffer =newStringBuffer();
Date date =null;
Map<String, String> index =newHashMap<String,
String>();
if(matcher.contains(body,pattern))
{
index.put("host",hostname);
MatchResult result =matcher.getMatch();
index.put("ip",
result.group(1));
try{
date =sdf.parse(result.group(2));
index.put("loc_time",sdfAll.format(date));
}catch(ParseException
e1) {
}
String url = result.group(3).replaceAll(",","|");
String[] params = url.split("&");
for(String
param : params) {
String[] p = param.split("=");
if(p.length==
2) {
index.put(p[0], p[1]);
}
}
index.put("browser",
result.group(4).replaceAll(",","|"));
for(String
key :keys) {
if(index.containsKey(key))
{
stringBuffer.append(index.get(key) +",");
}else{
stringBuffer.append("~,");
}
}
if(stringBuffer.length()
> 0) {
stringBuffer.deleteCharAt(stringBuffer.length() - 1);
}else{
stringBuffer.append("error="+
body);
}
if(date
!=null) {
headers.put("dayStr",sd.format(date));
headers.put("hourStr",sh.format(date));
Integer m = Integer.parseInt(sm.format(date));
String min ="";
if(m
>= 0 && m < 10) {
min ="0"+
(m / 5) * 5;
}else{
min = (m / 5) * 5 +"";
}
headers.put("minStr",
min);
}else{
headers.put("dayStr","errorLog");
}
Event e = EventBuilder.withBody(stringBuffer.toString()
.getBytes(), headers);
returne;
}
}
}catch(Exception
e) {
logger.error("LogFormat
error!", e);
}
returnnull;
}
@Override
publicList<Event>intercept(List<Event>
events) {
List<Event> list =newLinkedList<Event>();
for(Event
event : events) {
Event e = intercept(event);
if(e
!=null) {
list.add(e);
}
}
returnlist;
}
/**
* Builder which builds new instances of the HostInterceptor.
*/
publicstaticclassBuilderimplementsInterceptor.Builder
{
privateStringconfPath;
privatebooleandynamicProp;
privateStringhostname;
privatelongpropMonitorInterval;
@Override
publicInterceptor
build() {
returnnewLogFormatInterceptor(confPath,dynamicProp,hostname,
propMonitorInterval);
}
@Override
publicvoidconfigure(Context
context) {
confPath=
context.getString(CONF_PATH);
dynamicProp=
context.getBoolean(DYNAMICPROP,DYNAMICPROP_DFLT);
hostname=
context.getString(HOSTNAME,HOSTNAME_DFLT);
propMonitorInterval=
context.getLong(PROPMONITORINTERVAL,
PROPMONITORINTERVAL_DFLT);
}
}
publicstaticclassConstants
{
publicstaticStringCONF_PATH="confpath";
publicstaticStringDYNAMICPROP="dynamicprop";
publicstaticbooleanDYNAMICPROP_DFLT=false;
publicstaticStringHOSTNAME="hostname";
publicstaticStringHOSTNAME_DFLT="hostname";
publicstaticStringPROPMONITORINTERVAL="prop.monitor.rollInterval";
publicstaticlongPROPMONITORINTERVAL_DFLT=
500000l;
}
}
|
相关推荐
Flume-ng-1.6.0-cdh.zip 内压缩了 3 个项目,分别为:flume-ng-1.6.0-cdh5.5.0.tar.gz、flume-ng-1.6.0-cdh5.7.0.tar.gz 和 flume-ng-1.6.0-cdh5.10.1.tar.gz,选择你需要的版本。
flume-ng安装
flume-ng-sql-source-1.5.2.jar从数据库中增量读取数据到hdfs中的jar包
flume-ng-sql-source-release-1.5.2.jar 用flume-ng-sql-source 从数据库抽取数据到kafka,支持sql
flume-ng-1.6.0-cdh5.5.0.tar.gz
flume-ng-sql-source-1.5.3.jar,flume采集mysql数据jar包,将此文件拖入FLUME_HOME/lib目录下,如果是CM下CDH版本的flume,则放到/opt/cloudera/parcels/CDH-xxxx/lib/flume-ng/lib下,同样需要的包还有mysql-...
flume-ng-sql-source-1.5.2源码
flume-ng-1.5.0-cdh5.3.6.rarflume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume...
flume-ng-sql-source-1.5.1 flume连接数据库 很好用的工具
Flume-ng在windows环境搭建并测试+log4j日志通过Flume输出到HDFS 11111
flume-ng-1.6.0 cdh5.7.0安装包,稳定版本。大家可以自由下载
包含 flume-ng-sql-source-1.5.1 和 flume-ng-sql-source-1.4.1,此内容均为网上下载
flume-ng-sql-source-1.4.3.jar
flume-ng-sql-source实现oracle增量数据读取 flume连接oracle增量数据读取
flume-ng-sdk-1.6.0.ja,如果想要通过log4j将日志直接导入到flume需要导入这个jar包
flume1.9采集数据入存入elasticsearch6.2.4,flume1.9本身只支持低版本的elasticsearch,基于apache-flume-1.9.0-src的flume-ng-sinks/flume-ng-elasticsearch-sink源码修改,支持es6.2.4,打的包,直接替换flume/...
flume是一个日志收集器,更多详细的介绍可以参照官网:http://flume.apache.org/ flume-ng-sql-source实现oracle增量数据读取 有了这个jar文件即可从关系型数据库拉去数据到flume
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可...