package org.apache.flume.sink;

import java.io.IOException;
import java.util.Calendar;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.formatter.output.BucketPath;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.serialization.EventSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
publicclassFileSinkextendsAbstractSinkimplementsConfigurable
{
privatestaticfinalLoggerlogger=
LoggerFactory
.getLogger(FileSink.class);
privateStringpath;
privatestaticfinalStringdefaultFileName="FlumeData";
privatestaticfinalintdefaultMaxOpenFiles=
50;
/**
* Default length of time we wait for blocking BucketWriter calls before
* timing out the operation. Intended to prevent server hangs.
*/
privatelongtxnEventMax;
privateFileWriterLinkedHashMapsfWriters;
privateStringserializerType;
privateContextserializerContext;
privatebooleanneedRounding=false;
privateintroundUnit=
Calendar.SECOND;
privateintroundValue=
1;
privateSinkCountersinkCounter;
privateintmaxOpenFiles;
privateScheduledExecutorServicetimedRollerPool;
privatelongrollInterval;
@Override
publicvoidconfigure(Context
context) {
String directory = Preconditions.checkNotNull(
context.getString("file.path"),"file.path
is required");
String fileName = context.getString("file.filePrefix",defaultFileName);
this.path=
directory +"/"+ fileName;
maxOpenFiles=
context.getInteger("file.maxOpenFiles",
defaultMaxOpenFiles);
serializerType=
context.getString("sink.serializer","TEXT");
serializerContext=newContext(
context.getSubProperties(EventSerializer.CTX_PREFIX));
txnEventMax=
context.getLong("file.txnEventMax", 1l);
if(sinkCounter==null)
{
sinkCounter=newSinkCounter(getName());
}
rollInterval=
context.getLong("file.rollInterval", 30l);
String rollerName ="hdfs-"+
getName() +"-roll-timer-%d";
timedRollerPool=
Executors.newScheduledThreadPool(maxOpenFiles,
newThreadFactoryBuilder().setNameFormat(rollerName).build());
}
@Override
publicStatus
process()throwsEventDeliveryException
{
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
List<BucketFileWriter> writers = Lists.newArrayList();
transaction.begin();
try{
Event event =null;
inttxnEventCount
= 0;
for(txnEventCount
= 0; txnEventCount <txnEventMax; txnEventCount++) {
event = channel.take();
if(event
==null) {
break;
}
// reconstruct the path
name by substituting place holders
String realPath = BucketPath
.escapeString(path,
event.getHeaders(),needRounding,
roundUnit,roundValue);
BucketFileWriter bucketFileWriter =sfWriters.get(realPath);
// we haven't seen this
file yet, so open it and cache the
// handle
if(bucketFileWriter
==null) {
bucketFileWriter =newBucketFileWriter();
bucketFileWriter.open(realPath,serializerType,
serializerContext,rollInterval,timedRollerPool,
sfWriters);
sfWriters.put(realPath,
bucketFileWriter);
}
// track the buckets getting
written in this transaction
if(!writers.contains(bucketFileWriter))
{
writers.add(bucketFileWriter);
}
// Write the data to File
bucketFileWriter.append(event);
}
if(txnEventCount
== 0) {
sinkCounter.incrementBatchEmptyCount();
}elseif(txnEventCount
==txnEventMax) {
sinkCounter.incrementBatchCompleteCount();
}else{
sinkCounter.incrementBatchUnderflowCount();
}
// flush all pending buckets
before committing the transaction
for(BucketFileWriter
bucketFileWriter : writers) {
if(!bucketFileWriter.isBatchComplete())
{
flush(bucketFileWriter);
}
}
transaction.commit();
if(txnEventCount
> 0) {
sinkCounter.addToEventDrainSuccessCount(txnEventCount);
}
if(event
==null) {
returnStatus.BACKOFF;
}
returnStatus.READY;
}catch(IOException
eIO) {
transaction.rollback();
logger.warn("File
IO error", eIO);
returnStatus.BACKOFF;
}catch(Throwable
th) {
transaction.rollback();
logger.error("process
failed", th);
if(thinstanceofError)
{
throw(Error)
th;
}else{
thrownewEventDeliveryException(th);
}
}finally{
transaction.close();
}
}
privatevoidflush(BucketFileWriter
bucketFileWriter)throwsIOException
{
bucketFileWriter.flush();
}
@Override
publicsynchronizedvoidstart()
{
super.start();
this.sfWriters=newFileWriterLinkedHashMap(maxOpenFiles);
sinkCounter.start();
}
}
/*
 * NOTE(review): the text below is web-scrape residue (a "related downloads"
 * listing from the page this source was copied from). It is not part of the
 * FileSink source. Preserved verbatim inside this comment so the file compiles.
 *
 * 相关推荐
 * Flume-ng-1.6.0-cdh.zip 内压缩了 3 个项目,分别为:flume-ng-1.6.0-cdh5.5.0.tar.gz、flume-ng-1.6.0-cdh5.7.0.tar.gz 和 flume-ng-1.6.0-cdh5.10.1.tar.gz,选择你需要的版本。
 * flume-ng安装
 * flume1.9采集数据入存入elasticsearch6.2.4,flume1.9本身只支持低版本的elasticsearch,基于apache-flume-1.9.0-src的flume-ng-sinks/flume-ng-elasticsearch-sink源码修改,支持es6.2.4,打的包,直接替换flume/...
 * flume-ng-sql-source-1.5.2.jar从数据库中增量读取数据到hdfs中的jar包
 * flume-ng-sql-source-1.5.3.jar,flume采集mysql数据jar包,将此文件拖入FLUME_HOME/lib目录下,如果是CM下CDH版本的flume,则放到/opt/cloudera/parcels/CDH-xxxx/lib/flume-ng/lib下,同样需要的包还有mysql-...
 * flume-ng-sql-source-release-1.5.2.jar 用flume-ng-sql-source 从数据库抽取数据到kafka,支持sql
 * flume-ng-sql-source-1.5.2源码
 * flume-ng-1.6.0-cdh5.5.0.tar.gz
 * flume-ng-sql-source-1.5.1 flume连接数据库 很好用的工具
 * ng-1.5.0-cdh5.3.6.rarflume-ng-1.5.0-cdh5.3.6.rar (repeated)
 * Flume-ng在windows环境搭建并测试+log4j日志通过Flume输出到HDFS 11111
 * 项目源码:https://github.com/dugangandy/flume-ng-elasticsearch-sink flume1.7支持elasticsearch 6.5.4的sink
 * flume-ng-1.6.0 cdh5.7.0安装包,稳定版本。大家可以自由下载
 * 包含flume-ng-sql-source-1.5.1&flume;-ng-sql-source-1.4.1 此内容均为网上下载
 * flume-ng-sql-source-1.4.3.jar
 * flume-ng-sql-source实现oracle增量数据读取 flume连接oracle增量数据读取
 * flume是一个日志收集器,更多详细的介绍可以参照官网:http://flume.apache.org/ flume-ng-sql-source实现oracle增量数据读取 有了这个jar文件即可从关系型数据库拉去数据到flume
 * flume-ng-sdk-1.6.0.ja,如果想要通过log4j将日志直接导入到flume需要导入这个jar包
 * 修改以后的flume-ng-core-1.7.0.jar,将原来的文件按行读取修改为按文件读取。使用时,直接替换到集群中flume安装目录下面lib文件夹中的flume-ng-core-1.7.0.jar即可使用。
 */