我们提供安全,免费的手游软件下载!
背景:需要将1000万条某类型数据同步到另一个数据源。使用公司的大数据平台可以快速处理完毕,且仅使用了极少量内存(公司的大数据平台底层是flink,但连接器使用的是chunjun开源产品)。由于个人想尝试使用flink原生的连接器,因此模拟了1000万条数据,并启动了flink单节点,通过flinksql的方式提交了同步任务。最终结果却出现了内存溢出的问题。
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.LoggerFactory;
import java.util.List;
public class Main2 {
static {
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
List loggerList = loggerContext.getLoggerList();
loggerList.forEach(logger -> {
logger.setLevel(Level.INFO);
});
}
public static void main(String[] args) throws Exception {
// 代码内容
}
}
以上是一个简单的示例,定义了三个SQL语句。首先是定义了两个数据源,然后进行查询插入操作。运行之后将开始执行flinksql。如果在启动时指定JVM内存大小为 -Xms512m -Xmx1g,会发现根本无法启动,直接出现了OOM错误。如果不指定JVM内存,则程序能够启动,但内存使用量会逐渐升高,甚至需要使用近4GB内存。如果在flink集群上运行,将直接出现OOM错误。
通过调试模式分析代码是如何运行的,经过一番调试后发现了以下代码:
public void openInputFormat() {
try {
// 代码内容
} catch (SQLException var2) {
throw new IllegalArgumentException("open() failed." + var2.getMessage(), var2);
} catch (ClassNotFoundException var3) {
throw new IllegalArgumentException("JDBC-Class not found. - " + var3.getMessage(), var3);
}
}
flink是使用游标来分批拉取数据,因此需要确定是否真正使用了游标。于是,我编写了一个原生的JDBC程序读取数据的程序(没有限制JVM内存):
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
public class Main3 {
public static void main(String[] args) {
// 代码内容
}
}
最终打印的结果是:(插入图片)
很显然,数据是全部读取出来的。这时需要确认程序是否真正使用了游标。经过一番查看后发现,需要在JDBC的参数里加上&useCursorFetch=true,才能使游标生效。修改完JDBC参数后,问题得到了完全的解决:(插入图片)
除此之外,我使用过Apache的Seatunnel,这个同步数据的速度非常快。然而,在使用时可能会漏掉一些JDBC相关的参数(以MySQL为例)。例如:"rewriteBatchedStatements" : "true",这个批量参数Apache Seatunnel也不会自动添加,需要手动添加,否则数据将逐条插入。我也曾踩过这个坑。
热门资讯