需求描述:
在业务系统的服务器上,业务程序会不断生成业务日志(比如网站的页面访问日志)
业务日志是用log4j生成的,会不断地切出日志文件,需要定期(比如每小时)从业务服务器上的日志目录中,探测需要采集的日志文件(access.log不能采),发往HDFS
注意点:业务服务器可能有多台(hdfs上的文件名不能直接用日志服务器上的文件名)
当天采集到的日志要放在hdfs的当天目录中,采集完成的日志文件,需要移动到到日志服务器的一个备份目录中定期检查(每小时检查一下备份目录),将备份时长超出24小时的日志文件清除
数据采集流程分析
1.流程启动一个定时任务 定时探测日志源目录 获取需要采集得文件 移动这些文件到一个待上传得临时目录 遍历待上传目录中得文件,逐一传输到HDFS得目标路径 同时将传输得文件移动到备份目录启动一个定时任务: 探测备份目录中得备份数据,检查是否超出最长备份时长,超出,则删除2.规划各种路径日志源路径:d:/logs/accesslog/待上传临时目录:d:/logs/toupload/备份目录:d:/logs/backup/日期HDFS 存储路径:/logs/日期HDFS中文件的前缀:acceaa_log_HDFS中文件的后缀:.log
准备工作
[root@hdp-01 ~]# start-dfs.shStarting namenodes on [hdp-01]hdp-01: starting namenode, logging to /root/apps/hadoop-2.7.2/logs/hadoop-root-namenode-hdp-01.outhdp-01: starting datanode, logging to /root/apps/hadoop-2.7.2/logs/hadoop-root-datanode-hdp-01.outhdp-03: starting datanode, logging to /root/apps/hadoop-2.7.2/logs/hadoop-root-datanode-hdp-03.outhdp-02: starting datanode, logging to /root/apps/hadoop-2.7.2/logs/hadoop-root-datanode-hdp-02.outhdp-04: starting datanode, logging to /root/apps/hadoop-2.7.2/logs/hadoop-root-datanode-hdp-04.outStarting secondary namenodes [hdp-02]hdp-02: starting secondarynamenode, logging to /root/apps/hadoop-2.7.2/logs/hadoop-root-secondarynamenode-hdp-02.out
代码如下
DataCollection
public class DataCollection { public static void main(String[] args) { Timer timer = new Timer(); //1.【定时探测日志源目录】,每一小时执行一次 timer.schedule(new CollectTask(),0,60*60*1000L); //2.【定时删除文件】 timer.schedule(new BackupCleanTask(),0,60*60*1000L); }}
collect.properties
LOG_SOURCE_DIR=d:/logs/accesslog/LOG_TOUPLOAD_DIR=d:/logs/toupload/LOG_BACKUP_BASE_DIR=d:/logs/backup/LOG_LEGAL_PREFIX=access.log.HDFS_URI=hdfs://hdp-01:9000/HDFS_DEST_BASE_DIR=/logs/HDFS_FILE_PREFIX=access_log_HDFS_FILE_SUFFIX=.log
Contants
/** * 日志目录参数key */public class Contants { /** * 本地要上传文件目录 */ public static final String LOG_SOURCE_DIR="LOG_SOURCE_DIR"; /** * 临时上传目录中的文件 */ public static final String LOG_TOUPLOAD_DIR="LOG_TOUPLOAD_DIR"; /** * d:/logs/backup/ */ public static final String LOG_BACKUP_BASE_DIR="LOG_BACKUP_BASE_DIR"; /** * 需要采集得文件 */ public static final String LOG_LEGAL_PREFIX="LOG_LEGAL_PREFIX"; /** * 上传到 HDFS ip+port */ public static final String HDFS_URI="HDFS_URI"; /** * hdfs:logs/目录 */ public static final String HDFS_DEST_BASE_DIR="HDFS_DEST_BASE_DIR"; /** * hdfs文件前缀:access_log_ */ public static final String HDFS_FILE_PREFIX="HDFS_FILE_PREFIX"; /** * hdfs文件后缀:.log */ public static final String HDFS_FILE_SUFFIX="HDFS_FILE_SUFFIX";}
单例设计方式一:饿汉式单例,程序启动时创建
PropertyHolderHungery
/** * 单例设计方式一:饿汉式单例,程序启动时创建 */public class PropertyHolderHungery { private static Properties prop=new Properties(); //静态代码块 static { try { prop.load(PropertyHolderHungery.class.getClassLoader() .getResourceAsStream("collect.properties")); }catch (Exception e){ e.printStackTrace(); } } public static Properties getProps()throws Exception{ return prop; }}
单例设计模式二:懒汉式,使用的时候创建,还考虑线程安全
PropertyHolderLazy
/** * 单例设计模式二:懒汉式,使用的时候创建,还考虑线程安全 */public class PropertyHolderLazy { //默认构造器私有化 private PropertyHolderLazy(){ } //禁止指令重排序 private volatile static Properties prop=null; public static Properties getProps()throws Exception{ if(prop==null){ //加锁,保证多线程场景下线程安全问题 synchronized (PropertyHolderLazy.class){ //防止再次new if(prop==null){ prop=new Properties(); prop.load(PropertyHolderLazy.class.getClassLoader() .getResourceAsStream("collect.properties")); } } } return prop; }}
CollectTask
public class CollectTask extends TimerTask { //构造一个log4j日志对象 public static Log log = LogFactory.getLog(CollectTask.class); public void run() { /** * 1.定时探测日志源目录 * 2.获取需要采集得文件 * 3.移动这些文件到一个待上传得临时目录 * 4.遍历待上传目录中得文件,逐一传输到HDFS得目标路径 * 5.同时将传输得文件移动到备份目录 */ try{ //获取配置参数 final Properties props = PropertyHolderLazy.getProps(); //获取本次采集时的日期 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH"); String day = sdf.format(new Date()); //获取本地要上传文件目录 File srcDir = new File(props.getProperty(Contants.LOG_SOURCE_DIR)); //2.【获取需要采集得文件】 File[] listFiles = srcDir.listFiles(new FilenameFilter() { public boolean accept(File dir, String name) { if(name.startsWith(props.getProperty(Contants.LOG_LEGAL_PREFIX))){ return true; } return false; } }); //记录日志 log.info("探测到如下文件需要采集:"+Arrays.toString(listFiles)); //获取临时上传目录中的文件 File toUploadDir = new File(props.getProperty(Contants.LOG_TOUPLOAD_DIR)); //3.【移动这些文件到一个待上传得临时目录】 for (File file:listFiles) { //将采集的文件移到临时上传目录,将源目录中需要上传的文件移动到临时上传目录中 FileUtils.moveFileToDirectory(file,toUploadDir,true); } //记录日志 log.info("上述文件移动到待上传目录"+toUploadDir.getAbsolutePath()); //构造一个HDFS的客户端对象 FileSystem fs = FileSystem.get(new URI(props.getProperty(Contants.HDFS_URI)), new Configuration(), "root"); //从临时上传目录中列出所有文件 File[] toUploadFiles = toUploadDir.listFiles(); //1.检查hdfs 中的日期是否存在 Path hdfsDestPath = new Path(props.getProperty(Contants.HDFS_DEST_BASE_DIR) + day); if(!fs.exists(hdfsDestPath)){ fs.mkdirs(hdfsDestPath); } //2.检查本地的备份目录是否存在 File backupDir = new File(props.getProperty(Contants.LOG_BACKUP_BASE_DIR) + day); if(!backupDir.exists()){ backupDir.mkdirs(); } //4.【遍历待上传目录中得文件,逐一传输到HDFS得目标路径】 for (File file:toUploadFiles) { //传输文件到HDFS并改名 Path destPath = new Path(hdfsDestPath +"/"+props.getProperty(Contants.HDFS_FILE_PREFIX) + UUID.randomUUID() +props.getProperty(Contants.HDFS_FILE_SUFFIX)); //将临时上传目录中的文件上传到hdfs中 fs.copyFromLocalFile(new Path(file.getAbsolutePath()),destPath); //记录日志 log.info("文件传输到hdfs完成:"+file.getAbsolutePath() +"-->"+destPath); //5.【同时将传输得文件移动到备份目录】 FileUtils.moveFileToDirectory(file,backupDir,true); //记录日志 log.info("文件备份完成:"+file.getAbsolutePath() +"-->"+backupDir); } } catch (Exception e) { e.printStackTrace(); } }}
BackupCleanTask
public class BackupCleanTask extends TimerTask { public void run() { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH"); long now = new Date().getTime(); //探测备份目录 try { //获取配置参数 final Properties props = PropertyHolderLazy.getProps(); File backupBaseDir = new File(props.getProperty(Contants.LOG_BACKUP_BASE_DIR)); File[] dayBackDir = backupBaseDir.listFiles(); //判断备份目录是否已经超过24h for (File dir : dayBackDir) { long time = sdf.parse(dir.getName()).getTime(); if(now-time>24*60*60*1000L){ //递归删除目录 FileUtils.deleteDirectory(dir); } } }catch(Exception e){ e.printStackTrace(); } }}
日志配置
log4j.rootLogger=CONSOLE,stdout,logfile#stdout控制器log4j.appender.stdout=org.apache.log4j.ConsoleAppenderlog4j.appender.stdout.Target=System.outlog4j.appender.stdout.layout=org.apache.log4j.PatternLayout#输出格式log4j.appender.stdout.layout.ConversionPattern=%d %p [%c]:%L - %m%n#文件路径输出log4j.appender.logfile=org.apache.log4j.RollingFileAppenderlog4j.appender.logfile.File=d:/logs/collect/collect.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayoutlog4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
pom依赖
junit junit RELEASE org.apache.logging.log4j log4j-core 2.8.2 org.apache.hadoop hadoop-common 2.7.2 org.apache.hadoop hadoop-client 2.7.2 org.apache.hadoop hadoop-hdfs 2.7.2
控制台输出
...47 - 探测到如下文件需要采集:[d:\logs\accesslog\access.log.1, d:\logs\accesslog\access.log.2, d:\logs\accesslog\access.log.3]57 - 上述文件移动到待上传目录d:\logs\toupload79 - 文件传输到hdfs完成:d:\logs\toupload\access.log.1-->/logs/2019-05-25-12/access_log_9dc0542e-0153-4bb3-b804-d85115c20153.log83 - 文件备份完成:d:\logs\toupload\access.log.1-->d:\logs\backup\2019-05-25-1279 - 文件传输到hdfs完成:d:\logs\toupload\access.log.2-->/logs/2019-05-25-12/access_log_5ce3450d-8874-4dd7-a23e-8d6a7a52c6d9.log83 - 文件备份完成:d:\logs\toupload\access.log.2-->d:\logs\backup\2019-05-25-1279 - 文件传输到hdfs完成:d:\logs\toupload\access.log.3-->/logs/2019-05-25-12/access_log_f7b7c741-bb87-4778-98ad-e33f06501441.log83 - 文件备份完成:d:\logs\toupload\access.log.3-->d:\logs\backup\2019-05-25-12...
效果图
版权