博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
HDFS工作机制——自开发分布式数据采集系统
阅读量:7063 次
发布时间:2019-06-28

本文共 9748 字,大约阅读时间需要 32 分钟。

hot3.png

需求描述:

在业务系统的服务器上,业务程序会不断生成业务日志(比如网站的页面访问日志)

业务日志是用log4j生成的,会不断地切出日志文件,需要定期(比如每小时)从业务服务器上的日志目录中,探测需要采集的日志文件(access.log不能采),发往HDFS

注意点:业务服务器可能有多台(hdfs上的文件名不能直接用日志服务器上的文件名)

当天采集到的日志要放在hdfs的当天目录中,采集完成的日志文件,需要移动到到日志服务器的一个备份目录中定期检查(每小时检查一下备份目录),将备份时长超出24小时的日志文件清除

数据采集流程分析

1.流程启动一个定时任务    定时探测日志源目录    获取需要采集得文件    移动这些文件到一个待上传得临时目录    遍历待上传目录中得文件,逐一传输到HDFS得目标路径    同时将传输得文件移动到备份目录启动一个定时任务:    探测备份目录中得备份数据,检查是否超出最长备份时长,超出,则删除2.规划各种路径日志源路径:d:/logs/accesslog/待上传临时目录:d:/logs/toupload/备份目录:d:/logs/backup/日期HDFS 存储路径:/logs/日期HDFS中文件的前缀:acceaa_log_HDFS中文件的后缀:.log

准备工作

4da77452837cb7b9a71a521ca4935648e90.jpg

[root@hdp-01 ~]# start-dfs.shStarting namenodes on [hdp-01]hdp-01: starting namenode, logging to /root/apps/hadoop-2.7.2/logs/hadoop-root-namenode-hdp-01.outhdp-01: starting datanode, logging to /root/apps/hadoop-2.7.2/logs/hadoop-root-datanode-hdp-01.outhdp-03: starting datanode, logging to /root/apps/hadoop-2.7.2/logs/hadoop-root-datanode-hdp-03.outhdp-02: starting datanode, logging to /root/apps/hadoop-2.7.2/logs/hadoop-root-datanode-hdp-02.outhdp-04: starting datanode, logging to /root/apps/hadoop-2.7.2/logs/hadoop-root-datanode-hdp-04.outStarting secondary namenodes [hdp-02]hdp-02: starting secondarynamenode, logging to /root/apps/hadoop-2.7.2/logs/hadoop-root-secondarynamenode-hdp-02.out

154e99d03ac49fe292663b3e337aeef0e87.jpg

代码如下

DataCollection 

public class DataCollection {    public static void main(String[] args) {        Timer timer = new Timer();        //1.【定时探测日志源目录】,每一小时执行一次        timer.schedule(new CollectTask(),0,60*60*1000L);        //2.【定时删除文件】        timer.schedule(new BackupCleanTask(),0,60*60*1000L);    }}

collect.properties

LOG_SOURCE_DIR=d:/logs/accesslog/LOG_TOUPLOAD_DIR=d:/logs/toupload/LOG_BACKUP_BASE_DIR=d:/logs/backup/LOG_LEGAL_PREFIX=access.log.HDFS_URI=hdfs://hdp-01:9000/HDFS_DEST_BASE_DIR=/logs/HDFS_FILE_PREFIX=access_log_HDFS_FILE_SUFFIX=.log

Contants 

/** * 日志目录参数key */public class Contants {    /**     * 本地要上传文件目录     */    public static final String LOG_SOURCE_DIR="LOG_SOURCE_DIR";    /**     * 临时上传目录中的文件     */    public static final String LOG_TOUPLOAD_DIR="LOG_TOUPLOAD_DIR";    /**     * d:/logs/backup/     */    public static final String LOG_BACKUP_BASE_DIR="LOG_BACKUP_BASE_DIR";    /**     * 需要采集得文件     */    public static final String LOG_LEGAL_PREFIX="LOG_LEGAL_PREFIX";    /**     * 上传到 HDFS ip+port     */    public static final String HDFS_URI="HDFS_URI";    /**     * hdfs:logs/目录     */    public static final String HDFS_DEST_BASE_DIR="HDFS_DEST_BASE_DIR";    /**     * hdfs文件前缀:access_log_     */    public static final String HDFS_FILE_PREFIX="HDFS_FILE_PREFIX";    /**     * hdfs文件后缀:.log     */    public static final String HDFS_FILE_SUFFIX="HDFS_FILE_SUFFIX";}

单例设计方式一:饿汉式单例,程序启动时创建

PropertyHolderHungery

/** * 单例设计方式一:饿汉式单例,程序启动时创建 */public class PropertyHolderHungery {    private static Properties prop=new Properties();    //静态代码块    static {        try {            prop.load(PropertyHolderHungery.class.getClassLoader()                    .getResourceAsStream("collect.properties"));        }catch (Exception e){            e.printStackTrace();        }    }    public static Properties getProps()throws Exception{        return prop;    }}

单例设计模式二:懒汉式,使用的时候创建,还考虑线程安全

PropertyHolderLazy 

/** * 单例设计模式二:懒汉式,使用的时候创建,还考虑线程安全 */public class PropertyHolderLazy {     //默认构造器私有化    private PropertyHolderLazy(){    }    //禁止指令重排序    private volatile static Properties prop=null;    public static Properties getProps()throws Exception{        if(prop==null){            //加锁,保证多线程场景下线程安全问题            synchronized (PropertyHolderLazy.class){                //防止再次new                if(prop==null){                    prop=new Properties();                    prop.load(PropertyHolderLazy.class.getClassLoader()                            .getResourceAsStream("collect.properties"));                }            }        }        return prop;    }}

CollectTask 

public class CollectTask extends TimerTask {    //构造一个log4j日志对象    public static Log log = LogFactory.getLog(CollectTask.class);    public void run() {        /**         *     1.定时探测日志源目录         *     2.获取需要采集得文件         *     3.移动这些文件到一个待上传得临时目录         *     4.遍历待上传目录中得文件,逐一传输到HDFS得目标路径         *     5.同时将传输得文件移动到备份目录         */        try{            //获取配置参数            final Properties props = PropertyHolderLazy.getProps();            //获取本次采集时的日期            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH");            String day = sdf.format(new Date());            //获取本地要上传文件目录            File srcDir = new File(props.getProperty(Contants.LOG_SOURCE_DIR));            //2.【获取需要采集得文件】            File[] listFiles = srcDir.listFiles(new FilenameFilter() {            public boolean accept(File dir, String name) {                if(name.startsWith(props.getProperty(Contants.LOG_LEGAL_PREFIX))){                    return true;                }                return false;            }        });        //记录日志        log.info("探测到如下文件需要采集:"+Arrays.toString(listFiles));        //获取临时上传目录中的文件        File toUploadDir = new File(props.getProperty(Contants.LOG_TOUPLOAD_DIR));        //3.【移动这些文件到一个待上传得临时目录】        for (File file:listFiles) {           //将采集的文件移到临时上传目录,将源目录中需要上传的文件移动到临时上传目录中            FileUtils.moveFileToDirectory(file,toUploadDir,true);        }            //记录日志            log.info("上述文件移动到待上传目录"+toUploadDir.getAbsolutePath());            //构造一个HDFS的客户端对象            FileSystem fs = FileSystem.get(new URI(props.getProperty(Contants.HDFS_URI)), new Configuration(), "root");            //从临时上传目录中列出所有文件            File[] toUploadFiles = toUploadDir.listFiles();            //1.检查hdfs 中的日期是否存在            Path hdfsDestPath = new Path(props.getProperty(Contants.HDFS_DEST_BASE_DIR) + day);            if(!fs.exists(hdfsDestPath)){                fs.mkdirs(hdfsDestPath);            }            //2.检查本地的备份目录是否存在            File backupDir = new File(props.getProperty(Contants.LOG_BACKUP_BASE_DIR) + day);            if(!backupDir.exists()){                backupDir.mkdirs();            }            //4.【遍历待上传目录中得文件,逐一传输到HDFS得目标路径】            for (File file:toUploadFiles) {                //传输文件到HDFS并改名                Path destPath = new Path(hdfsDestPath +"/"+props.getProperty(Contants.HDFS_FILE_PREFIX)                        + UUID.randomUUID() +props.getProperty(Contants.HDFS_FILE_SUFFIX));                //将临时上传目录中的文件上传到hdfs中                fs.copyFromLocalFile(new Path(file.getAbsolutePath()),destPath);                //记录日志                log.info("文件传输到hdfs完成:"+file.getAbsolutePath() +"-->"+destPath);                //5.【同时将传输得文件移动到备份目录】                FileUtils.moveFileToDirectory(file,backupDir,true);                //记录日志                log.info("文件备份完成:"+file.getAbsolutePath() +"-->"+backupDir);            }        } catch (Exception e) {            e.printStackTrace();        }    }}

BackupCleanTask 

public class BackupCleanTask extends TimerTask {    public void run() {        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH");        long now = new Date().getTime();        //探测备份目录        try {            //获取配置参数            final Properties props = PropertyHolderLazy.getProps();            File backupBaseDir = new File(props.getProperty(Contants.LOG_BACKUP_BASE_DIR));            File[] dayBackDir = backupBaseDir.listFiles();            //判断备份目录是否已经超过24h            for (File dir : dayBackDir) {                long time = sdf.parse(dir.getName()).getTime();                if(now-time>24*60*60*1000L){                    //递归删除目录                    FileUtils.deleteDirectory(dir);                }            }        }catch(Exception e){            e.printStackTrace();        }    }}

日志配置

log4j.rootLogger=CONSOLE,stdout,logfile#stdout控制器log4j.appender.stdout=org.apache.log4j.ConsoleAppenderlog4j.appender.stdout.Target=System.outlog4j.appender.stdout.layout=org.apache.log4j.PatternLayout#输出格式log4j.appender.stdout.layout.ConversionPattern=%d %p [%c]:%L - %m%n#文件路径输出log4j.appender.logfile=org.apache.log4j.RollingFileAppenderlog4j.appender.logfile.File=d:/logs/collect/collect.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayoutlog4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

pom依赖

junit
junit
RELEASE
org.apache.logging.log4j
log4j-core
2.8.2
org.apache.hadoop
hadoop-common
2.7.2
org.apache.hadoop
hadoop-client
2.7.2
org.apache.hadoop
hadoop-hdfs
2.7.2

控制台输出

...47 - 探测到如下文件需要采集:[d:\logs\accesslog\access.log.1, d:\logs\accesslog\access.log.2, d:\logs\accesslog\access.log.3]57 - 上述文件移动到待上传目录d:\logs\toupload79 - 文件传输到hdfs完成:d:\logs\toupload\access.log.1-->/logs/2019-05-25-12/access_log_9dc0542e-0153-4bb3-b804-d85115c20153.log83 - 文件备份完成:d:\logs\toupload\access.log.1-->d:\logs\backup\2019-05-25-1279 - 文件传输到hdfs完成:d:\logs\toupload\access.log.2-->/logs/2019-05-25-12/access_log_5ce3450d-8874-4dd7-a23e-8d6a7a52c6d9.log83 - 文件备份完成:d:\logs\toupload\access.log.2-->d:\logs\backup\2019-05-25-1279 - 文件传输到hdfs完成:d:\logs\toupload\access.log.3-->/logs/2019-05-25-12/access_log_f7b7c741-bb87-4778-98ad-e33f06501441.log83 - 文件备份完成:d:\logs\toupload\access.log.3-->d:\logs\backup\2019-05-25-12...

效果图

1957d8298db1222ea298434ab8e38e6ab40.jpg

fb9178f17d44312474a123b0363b3bf4fd6.jpg

版权

转载于:https://my.oschina.net/u/3995125/blog/3053939

你可能感兴趣的文章
Skyline培训完,做的程序代码
查看>>
Header:请求头参数详解
查看>>
转:BASH数组
查看>>
第八课:日期的扩展与修复
查看>>
关于字符数组要注意的一个小问题
查看>>
GNU ARM嵌入式汇编注释方法
查看>>
xml file too big to import to wordpress website
查看>>
python初学小结二:Import、字符串切片、列表的切片、enumerate()说明、字典、ptyhon-copy()与deepcopy()区别...
查看>>
shell脚本切割tomcat的日志文件
查看>>
HTML5 Video Player概览
查看>>
学会使用Git创建分支
查看>>
Spring可扩展的XML Schema机制
查看>>
学习笔记之曾国藩家书
查看>>
libSVM 参数选择
查看>>
UVA10976 Fractions Again?!
查看>>
express 获取本地文件夹下的图片和文件
查看>>
Spring Boot定时任务应用实践
查看>>
java OO学习后的感悟
查看>>
扫呗、通联微信后台配置支付授权目录流程
查看>>
OpenGL坐标系之间的转换 http://blog.csdn.net/sac761/article/details/52179585
查看>>