从文件导数据到数据库的性能优化思路(笔记)

概述

最近公司一.NET 项目需要对其日志 Log 入数据库统计,写这个脚本导入的任务便落到我身上了。采用了熟练的 Java,这个日志也不是很大,一个文件大概几兆,有上万条数据,一天大概有 7,8 个文件需要这样的导入处理。由于之前写 Web 没有这么去批处理这么多数据,所以没有太注意性能,第一个版本程序导入速度慢的吓人,一个文件导完可能需要 10 多分钟,也就是说如果把每天的文件导完可能需要 2 个多小时的时间,听听就很蛋疼,最终经过优化后,一个文件导入也就几秒,甚至可以更短。目标日志文件的信息都是按行存储,所以程序中按行读取后,然后进行相应的字符串截取入库。下面则为思路分享以及主要代码的分享。

优化思路

1.程序流程: 程序先读取本地的文件到内存,然后把内存的数据批量 Insert 到数据库。 2.归纳:可以看出首先程序需要进行文件 IO 操作,然后则是数据 JDBC 操作,所以优化方向大致可以是以下几个: a.文件 IO 优化 b.JDBC 操作优化 c.使用多线程并行 JDBC 操作

文件常见 IO 简介

Java 的文件读写操作大概有这么几种方式,但是我们应该注意几种文件操作方式的区别,哪些操作方式适合不同的数据文件对象。 1.(InputStream/OutputStream) 为字节输入/输出流,这种读写方式都是按一定字节量读取数据。 2. (FileInputStream/FileOutputStream) 此方法继承自上面的(InputStream/OutpustStream),同样按字节流输入/输出,用于读取图像之类的原始字节流 3.(FileReader/FileWriter) 此方法适用于按字符流的文件操作 4. (BufferedReader/BufferedWriter) 从字符输入流中读取文本,缓冲各个字符,从而实现字符、数组和行的高效读取。

注:更详细的 IO 操作说明,请查看具体的 JDK 文档。 此处我采用的 BufferedReader 按行读取,效率比较好 代码片段:

public static List<String> getLogLinesByBuf(String filePath){
    List<String> items = new ArrayList<String>();
    File file = new File(filePath);
    BufferedReader reader;
    if (file.exists()) {

        try {
            reader = new BufferedReader(new FileReader(file));
            String temp = "";
            while((temp = reader.readLine()) != null) {
                items.add(temp);
            }
            //close
            reader.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    } else {
        System.out.println("该路径文件不存在.");
    }
    return items;
}

JDBC 批处理,PreparedStatement 和 Statement

JDBC 操作我们经常会用到 PreparedStatement 和 Statement,PreparedStatement 相对 Statement 来讲,PreparedStatement 拥有预编译能力,性能更好,2 者其它的优缺点比较可以查看相关的资料。另外,平常我们插入数据都是一条,2 条,当完成成千上万条数据插入操作的时候,你会看到性能是直线下降的,所以这里会采用 sql 批处理。

代码片段:

public static void insertLogInfo(List<String> data) {
    String sql = "INSERT INTO log_info(
                date_time,s_sitename,
                s_ip,cs_method,cs_uri_stem
                ,cs_uri_query,"
            + "s_port,cs_username,
            c_ip,cs_user_agent,sc_status,
            sc_substatus,sc_win32_status"
            + ") VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)";
    Connection conn = DBSource.getConnection();
    int count = 0;
    try {
        conn.setAutoCommit(false);
        PreparedStatement prest = conn.prepareStatement(sql);  

        for(String str : data) {
            String[] arr = str.split(" ");
            prest.setString(1, arr[0]+" "+arr[1]);
            prest.setString(2, arr[2]);
            prest.setString(3, arr[3]);
            prest.setString(4, arr[4]);
            prest.setString(5, arr[5]);
            prest.setString(6, arr[6]);
            prest.setString(7, arr[7]);
            prest.setString(8, arr[8]);
            prest.setString(9, arr[9]);
            prest.setString(10, arr[10]);
            prest.setString(11, arr[11]);
            prest.setString(12, arr[12]);
            prest.setString(13, arr[13]);
            //添加到批处理
            prest.addBatch();
        }

        int [] intarr = prest.executeBatch();
        conn.commit();  
        prest.clearBatch();
        prest.close();
        conn.close();
        for (int j = 0 ; j < intarr.length; j++) {
            if (intarr[j] > 0) {
                count +=1;
            }
        }
    } catch (Exception e) {
        System.out.println(new Date().toLocaleString()+":数据库插入操作失败"+e.getMessage());
    }
    System.out.println("本次操作成功插入"+count+"行数据");
}

采用多线程并行处理

例如本来 1 万条数据是一个线程进行 JDBC 批量提交,现在启用 5 个线程并行处理,每个线程 2000 条数据,甚至你可以根据数据量来分配更多线程来完成同步提交,性能提升会比较明显。

代码片段:

package com.xj.dbsource;

import java.io.File;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Date;
import java.util.List;

import com.json.utils.JsonFileUtils;
import com.xj.iislog.bean.JDBCInfo;


/**
 *
 * @author Ziv
 * 数据操作源
 */
public class DBSource extends Thread {

    //声明对象
    private static Statement statement;
    //连接对象
    private static Connection conn;

    private List<String> data;

    public DBSource(List<String> data) {
        super();
        this.data = data;
    }

    public void run(){
        System.out.println(System.currentTimeMillis());
        DBSource.insertLogInfo(data);
        System.out.println(System.currentTimeMillis());
    }

    /**
     *
     * @param sql
     * @return int
     */
    @SuppressWarnings("deprecation")
    public int insert(String sql) {

        int result = 0;
        try {
            conn = getConnection();
            statement = conn.createStatement();
            result = statement.executeUpdate(sql);
            //关闭连接
            conn.close();
        } catch (SQLException e) {
            System.out.println(new Date().toLocaleString()+":数据库插入操作失败" +e.getMessage());
        }

        return result;
    }

    /**
     * prepared方式入库
     * @param arr
     * @return
     * @throws SQLException
     */
    @SuppressWarnings("deprecation")
    public static void insertLogInfo(List<String> data) {

        String sql = "INSERT INTO log_info(
                    date_time,s_sitename,s_ip,
                    cs_method,cs_uri_stem,cs_uri_query,"
                + "s_port,cs_username,c_ip,cs_user_agent,
                sc_status,sc_substatus,sc_win32_status"
                + ") VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)";
        Connection conn = DBSource.getConnection();
        int count = 0;
        try {
            conn.setAutoCommit(false);
            PreparedStatement prest = conn.prepareStatement(sql);  

            for(String str : data) {
                String[] arr = str.split(" ");
                prest.setString(1, arr[0]+" "+arr[1]);
                prest.setString(2, arr[2]);
                prest.setString(3, arr[3]);
                prest.setString(4, arr[4]);
                prest.setString(5, arr[5]);
                prest.setString(6, arr[6]);
                prest.setString(7, arr[7]);
                prest.setString(8, arr[8]);
                prest.setString(9, arr[9]);
                prest.setString(10, arr[10]);
                prest.setString(11, arr[11]);
                prest.setString(12, arr[12]);
                prest.setString(13, arr[13]);
                //添加到批处理
                prest.addBatch();
            }

            int [] intarr = prest.executeBatch();
            conn.commit();  
            prest.clearBatch();
            prest.close();
            conn.close();
            for (int j = 0 ; j < intarr.length; j++) {
                if (intarr[j] > 0) {
                    count +=1;
                }
            }
        } catch (Exception e) {
            System.out.println(new Date().toLocaleString()+":数据库插入操作失败"+e.getMessage());
        }
        System.out.println("本次操作成功插入"+count+"行数据");
    }

    /**
     * 创建连接池
     * @return Connection
     */
    public static Connection getConnection() {
        Connection con = null;
        try {
            //从配置文件中获取jdbc config
            JDBCInfo jdbc = JsonFileUtils.readJsonFile(
                new File("resource/config.json"), JDBCInfo.class
            );
            if (jdbc != null) {
                //mysql驱动加载
                Class.forName(jdbc.getDriver());
                con = DriverManager.getConnection(jdbc.getUrl(),
                        jdbc.getUser(), jdbc.getPassword());
            }
        } catch (Exception e) {
            System.out.println("数据库连接失败" +e.getMessage());
        }
        return con;
    }


    /**
     * 获取Sql
     * @param arr
     * @return
     */
    public String getSql(String[] arr) {

        StringBuffer sql = new StringBuffer("INSERT INTO log_info (");
        sql.append("date_time,");
        sql.append("s_sitename,");
        sql.append("s_ip,");
        sql.append("cs_method,");
        sql.append("cs_uri_stem,");
        sql.append("cs_uri_query,");
        sql.append("s_port,");
        sql.append("cs_username,");
        sql.append("c_ip,");
        sql.append("cs_user_agent,");
        sql.append("sc_status,");
        sql.append("sc_substatus,");
        sql.append("sc_win32_status");
        sql.append(") VALUES ('");
        sql.append(arr[0]+" "+arr[1]);
        sql.append("','");
        sql.append(arr[2]);
        sql.append("','");
        sql.append(arr[3]);
        sql.append("','");
        sql.append(arr[4]);
        sql.append("','");
        sql.append(arr[5]);
        sql.append("','");
        sql.append(arr[6]);
        sql.append("','");
        sql.append(arr[7]);
        sql.append("','");
        sql.append(arr[8]);
        sql.append("','");
        sql.append(arr[9]);
        sql.append("','");
        sql.append(arr[10]);
        sql.append("','");
        sql.append(arr[11]);
        sql.append("','");
        sql.append(arr[12]);
        sql.append("','");
        sql.append(arr[13]);
        sql.append("')");

        return sql.toString();
    }
}
  调用代码


/**
     * 此方法采用递归操作,直至数据全部入库写入完毕
     * 同时调用5个线程进行入库操作
     * @param data
     * @param start
     * @param end
     */
    public static void threadsHandle(List<String> data, int start, int end) {

        int total = data.size();
        int size = (int)data.size()/5;
        //数据不越界
        if (start < total) {
            List<String> temp = null;
            if (end < total) {
                temp = data.subList(start, end);
            } else if (end >= total) {
                temp = data.subList(start, total);
            }
            //执行数据写入
            DBSource thread = new DBSource(temp);
            thread.start();

            start = end;
            end = start+size;
            threadsHandle(data, start, end);
        }
    }

最终结果

原来的 12 分钟,变成了 6 秒左右,效率大了一大截。其他朋友如果有更好的建议,可以跟我交流下 0.0。下次再把数据弄的更大些。


最后修改于 2014-04-25