【Flink Sink 流数据批量写入数据库】

概要

Flink流数据常常存在写入数据库的场景,一般是通过继承RichSinkFunction来实现对数据的写入。如果sink之前不做优化处理,写入时都是单条写入。单条写入有许多弊端:

1、写入频繁造成数据库压力大

2、写入速度慢、效率低,造成反压

所以需要使用批量写入的方式。本文通过开窗(window)定时缓存周期数据形成批,下发给sink节点。该方案经过大数据量生产环境验证,不仅实现了批量写入,还在防止数据倾斜、支持并行等方面做了优化,乃呕心之作。

批量写入功能实现

主函数

// 1) Key the stream into hash buckets so each parallel sink instance receives its own
//    partition (HashModKeySelector maps the configured key fields to an md5'd bucket id).
KeyedStream keyedStream=sinkStream.keyBy(new  HashModKeySelector(keyIndexList,paralleSize));
// 2) Buffer each processing-time tumbling window and emit its contents as one Row[] batch.
winStream=keyedStream.window(TumblingProcessingTimeWindows.of(Time.milliseconds(windowSize))) .process(new RowProcessWindowFunction(keyIndexList));
// 3) Write each Row[] batch to the database via JDBC batching in the sink.
DataStreamSink  sink=winStream.addSink(new DbSinkFunction(conf,writeSql));

1、对业务数据进行分组HashModKeySelector

public class HashModKeySelector implements KeySelector {
	private static final Logger logger = LoggerFactory.getLogger(HashModKeySelector2.class);
	private static final long serialVersionUID = 1L;
	/**
	 * key在row中的索引
	 */
	private List keyIndexList=null;
	private Integer paralleSize;
	private Map md5Map = new ConcurrentHashMap();
	public HashModKeySelector2(List keyIndexList, Integer paralleSize) {
		this.keyIndexList=keyIndexList;
		this.paralleSize=paralleSize;
	}
	@Override
	public String getKey(Row value) {
		int size=keyIndexList.size();
		Row keyRow=new Row(size);		
		for(int i=0;i<size;i++) {
			int index=keyIndexList.get(i);
			keyRow.setField(i, value.getField(index));
		}
		int keyHash=keyRow.hashCode()%paralleSize;
		String strKey=String.valueOf(keyHash);
		String md5Value = md5Map.get(strKey);
		if(StringUtils.isBlank(md5Value)){
			md5Value=md5(strKey);
			md5Map.put(strKey,md5Value);
		}
		return md5Value;
	}
	public static String md5(String key) {
		String result="";
		try {
			// 创建MD5消息摘要对象
			MessageDigest md = MessageDigest.getInstance("MD5");
			// 计算消息的摘要
			byte[] digest = md.digest(key.getBytes());
			// 将摘要转换为十六进制字符串
			String hexString = bytesToHex(digest);
			result=hexString;
		} catch (Exception e) {
			logger.error("计算{}md5值失败:",key,e);
			return key;
		}
		return result;
	}
	public static String bytesToHex(byte[] bytes) {
		StringBuilder hexString = new StringBuilder();
		for (byte b : bytes) {
			String hex = Integer.toHexString(0xff & b);
			if (hex.length() == 1) {
				hexString.append('0');
			}
			hexString.append(hex);
		}
		return hexString.toString();
	}
}

2、 使用滚动窗口缓存数据,将单条数据放入集合中,发送到下游

public class RowProcessWindowFunction extends ProcessWindowFunction{
	private static final Logger LOG = LoggerFactory.getLogger(RowProcessWindowFunction.class);
	/**
	 * Indexes of the key fields within the Row (retained for downstream use).
	 */
	private List keyIndexList;

	/**
	 * @param keyIndexList indexes of the key fields; must be non-null and non-empty
	 * @throws IllegalArgumentException if {@code keyIndexList} is null or empty
	 */
	public RowProcessWindowFunction(List keyIndexList) {
		if (keyIndexList == null || keyIndexList.isEmpty()) {
			LOG.error("keyIndexList is empty");
			// IllegalArgumentException is a RuntimeException, so existing
			// callers catching RuntimeException are unaffected.
			throw new IllegalArgumentException("keyIndexList is empty");
		}
		this.keyIndexList = keyIndexList;
	}

	/**
	 * Gathers every Row buffered in the window and emits them downstream as
	 * a single Row[] so the sink can write them in one JDBC batch.
	 */
	@Override
	public void process(String key, Context context, Iterable inRow, Collector out) throws Exception {
		List<Row> rowList = new ArrayList<>();
		// Raw Iterable yields Object; cast each element explicitly.
		for (Object row : inRow) {
			rowList.add((Row) row);
		}
		// toArray replaces the original manual index-copy loop.
		out.collect(rowList.toArray(new Row[0]));
	}
}

3、批量写入

public class DbSinkFunction extends RichSinkFunction {
    private static final Logger LOG = LoggerFactory.getLogger(DbSinkFunction.class);
    private String driver = null;
    private String sql = null;
    DbConnectionPool pool = null;
    private Integer laodRate;
    private int columnTypes[];

    /**
     * @param dbDriver database driver / connection configuration string
     * @param dmlSql   parameterized INSERT statement executed for each row
     */
    public DbSinkFunction(String dbDriver, String dmlSql) {
        this.sql = dmlSql;
        this.driver = dbDriver;
    }
   

    /**
     * Initializes the database connection pool when the sink task starts.
     *
     * NOTE(review): {@code conf} is not declared anywhere in this class, so
     * this method does not compile as shown. The caller constructs the sink
     * as {@code new DbSinkFunction(conf, writeSql)}, so {@code conf} is
     * presumably a configuration object that should be stored as a field by
     * the constructor — confirm and wire it through.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Create the connection pool shared by all invoke() calls of this subtask.
        pool = new DbConnectionPool(conf, driver); 
    }

    /**
     * Releases sink resources when the task shuts down.
     */
    @Override
    public void close() throws Exception {
        super.close();
        // Guard against open() having failed before the pool was created;
        // the original would NPE here in that case.
        if (pool != null) {
            pool.close();
        }
    }

    /**
     * 写入数据库
     */
    /**
     * Writes the incoming record to the database. A Row[] (produced by the
     * window function) is written as a JDBC batch, flushed every
     * {@code batchSize} rows; a single Row is written directly. If a batch
     * fails, the whole array is retried row-by-row via doOneInsert so one
     * bad row does not discard the rest of the window.
     *
     * @param record  either a Row or a Row[]
     * @param context sink context (unused)
     */
    @Override
    public void invoke(I record, Context context) throws Exception {
        final int batchSize = 3000;
        PreparedStatement ps = null;
        boolean isBatch = false;
        Connection connection = null;
        try {
            connection = pool.getConnection();
            ps = connection.prepareStatement(sql);
            if (record instanceof Row[]) {
                // Batched path: accumulate and flush every batchSize rows.
                isBatch = true;
                connection.setAutoCommit(false);
                Row[] rowArray = (Row[]) record;
                LOG.info("Row array:{}", rowArray.length);
                int pending = 0;
                for (Row row : rowArray) {
                    fillPreparedStatement(ps, row);
                    ps.addBatch();
                    pending++;
                    if (pending == batchSize) {
                        ps.executeBatch();
                        connection.commit();
                        ps.clearBatch();
                        pending = 0;
                    }
                }
                // Flush the remainder. (The original reset its counter before
                // incrementing it, so the tail check miscounted and could
                // execute an empty batch.)
                if (pending > 0) {
                    ps.executeBatch();
                    connection.commit();
                    ps.clearBatch();
                }
            } else if (record instanceof Row) {
                // Single-row path.
                fillPreparedStatement(ps, (Row) record);
                ps.execute();
            } else {
                throw new RuntimeException("不支持的数据类型 " + record.getClass());
            }
        } catch (SQLException e) {
            // Original swallowed this exception silently; log it, then roll
            // back and retry row-by-row. Guard the rollback: if
            // pool.getConnection() itself threw, connection is still null.
            LOG.error("写入失败", e);
            if (connection != null) {
                try {
                    connection.rollback();
                } catch (SQLException rollbackEx) {
                    LOG.error("写入失败", rollbackEx);
                }
                if (isBatch) {
                    doOneInsert(sql, connection, (Row[]) record);
                }
            }
        } catch (Exception e) {
            LOG.error("写入失败", e);
        } finally {
            closeDBResources(ps, connection);
        }
    }
    /**
     * 批量失败时 单条写入
     *
     * @param sql
     * @param connection
     * @param rowArray
     */
    /**
     * Fallback used when a batch write fails: re-inserts the rows one at a
     * time with auto-commit enabled, so a single bad row cannot discard the
     * whole batch. Failed rows are counted and reported.
     *
     * @param sql        parameterized INSERT statement
     * @param connection open connection the failed batch was using
     * @param rowArray   rows of the failed batch
     */
    protected void doOneInsert(String sql, Connection connection, Row[] rowArray) {
        PreparedStatement ps = null;
        try {
            int errCount = 0;
            connection.setAutoCommit(true);
            ps = connection.prepareStatement(sql);
            for (Row row : rowArray) {
                try {
                    fillPreparedStatement(ps, row);
                    ps.execute();
                } catch (SQLException e) {
                    // Keep going: subsequent rows may still be valid.
                    errCount++;
                } finally {
                    ps.clearParameters();
                }
            }
            // The original counted failures but never reported them.
            if (errCount > 0) {
                LOG.error("单条重试完成,总数:{},失败:{}", rowArray.length, errCount);
            }
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        } finally {
            closeDBResources(ps, null);
        }
    }
    /**
     * Closes the statement and connection (presumably returning the pooled
     * connection to DbConnectionPool — verify against its close() semantics).
     * Each resource gets its own try block: in the original, a failure
     * closing the statement skipped conn.close() and leaked the connection.
     * Close failures are logged, not propagated, so they cannot mask the
     * original write error.
     */
    private void closeDBResources(Statement stmt, Connection conn) {
        try {
            if (stmt != null && !stmt.isClosed()) {
                stmt.close();
            }
        } catch (SQLException e) {
            LOG.error("写入失败", e);
        }
        try {
            if (conn != null && !conn.isClosed()) {
                conn.close();
            }
        } catch (SQLException e) {
            LOG.error("写入失败", e);
        }
    }

本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://www.net2asp.com/22e4acea46.html