
Parts of this article draw on the blogs below; if there is any infringement, please contact me and I will remove it. Thank you.

https://www.cnblogs.com/superhedantou/p/9004820.html

https://zhuanlan.zhihu.com/p/354769480

https://zhuanlan.zhihu.com/p/638648224

https://developer.aliyun.com/article/1288649

A checkpoint simply writes an RDD's intermediate result to disk. When the lineage grows too long, recovering from a failure by replaying it becomes expensive, so it is cheaper to checkpoint at an intermediate stage: if a node fails after the checkpoint, the lineage only has to be replayed from the checkpoint onward. This reduces the recovery cost and keeps the data highly available and fault tolerant.

There are two types of checkpoint (a minimal sketch follows the list):

  • reliable: in Spark Core, saves the actual intermediate RDD data to a reliable distributed file system
  • local: used by Spark Streaming and GraphX, truncates the RDD lineage without writing to reliable storage
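A sketch of the two variants, assuming an already-created sparkContext and rdd. The HDFS path is a hypothetical placeholder, and localCheckpoint() lives on the underlying Scala RDD, so from the Java API it is reached through rdd.rdd():

// Reliable checkpoint: written to the directory set via setCheckpointDir
sparkContext.setCheckpointDir("hdfs://namenode:8020/checkpoints"); // hypothetical path
rdd.checkpoint();            // only marked here; data is written when the next action runs

// Local checkpoint: truncates the lineage using executor-local storage,
// which is faster but is lost if an executor dies
rdd.rdd().localCheckpoint();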

The difference between checkpoint and cache/persist

cache/persist computes the RDD and keeps it in memory or on disk, managed by each executor's BlockManager; the RDD's lineage is still kept, not discarded. If an executor dies, the RDD blocks cached on it are lost and have to be recomputed through the dependency chain.

checkpoint saves the RDD to HDFS, a reliable multi-replica store, and the lineage is discarded: high fault tolerance is achieved through replication instead. Checkpointing targets exactly the steps of an RDD computation chain where persisting the data matters most, and its high availability rests on the strengths of the distributed file system.

Using Spark checkpoint

// demo
SparkConf sparkAppConf = new SparkConf().setMaster("local[2]").setAppName("sparkRddDemo");
JavaSparkContext sparkContext = new JavaSparkContext(sparkAppConf);
sparkContext.setCheckpointDir("D:\\project\\sparkDemo\\checkpoint_dir");

List<String> values = Arrays.asList("hello", "java", "hello", "spark");

JavaRDD<String> rdd = sparkContext.parallelize(values);
// Option 1: cache in memory only
rdd.cache();  // cache() simply calls persist(StorageLevel.MEMORY_ONLY)
// Option 2: cache in memory and on disk, spilling to disk what does not fit in memory.
// An RDD's storage level can only be assigned once, so pick one of the two options:
// rdd.persist(StorageLevel.MEMORY_AND_DISK());
rdd.checkpoint();  // requires setCheckpointDir on the SparkContext first
rdd.collect();     // checkpoint data is only written when an action triggers a job

sparkContext.stop();

It is recommended to cache() an RDD before checkpointing it; the checkpoint job can then read the data straight from the cache instead of recomputing the whole RDD from scratch, as illustrated below.
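A small illustration of why (the two snippets below are alternatives, not one program): the checkpoint runs as a separate job after the action, so without a cache the whole lineage is computed twice.

// Without cache: the lineage is evaluated twice
rdd.checkpoint();
rdd.collect();   // job 1 computes rdd; the follow-up checkpoint job recomputes it from scratch

// With cache: the checkpoint job reads the cached blocks instead
rdd.cache();
rdd.checkpoint();
rdd.collect();   // job 1 computes and caches rdd; the checkpoint job just reads the cache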

On checkpointing multiple RDDs in one Spark program

I did not have a definite answer to this at first. My understanding was that every RDD that calls checkpoint gets saved once. To find out, I wrote a simple demo to test it:

package src.main.rdd;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

import java.util.Arrays;
import java.util.List;


public class rddAppDemo_checkpoint {
    public static void main(String[] args) {

        String checkpointPath = "/home/saberbin/logs/checkpoints";

        SparkConf sparkAppConf = new SparkConf().setMaster("local[2]").setAppName("sparkRddDemo");
        JavaSparkContext sparkContext = new JavaSparkContext(sparkAppConf);
        sparkContext.setCheckpointDir(checkpointPath);

        JavaRDD<String> textFile = sparkContext.textFile("/home/saberbin/logs/QswaS.txt");

        JavaRDD<List<String>> mapRDD = textFile.map(new Function<String, List<String>>() {
            @Override
            public List<String> call(String s) throws Exception {
                return Arrays.asList(s.split(","));
            }
        });

        JavaRDD<String> rdd = mapRDD.map(new Function<List<String>, String>() {
            @Override
            public String call(List<String> strings) throws Exception {
                return strings.get(0);
            }
        });

        rdd.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });

        sparkContext.stop();

    }

}

Three RDDs are used here:

textFile
mapRDD
rdd

Now checkpoint each of the three RDDs in turn, starting with the first one, textFile:

package src.main.rdd;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

import java.util.Arrays;
import java.util.List;


public class rddAppDemo_checkpoint {
    public static void main(String[] args) {

        String checkpointPath = "/home/saberbin/logs/checkpoints";

        SparkConf sparkAppConf = new SparkConf().setMaster("local[2]").setAppName("sparkRddDemo");
        JavaSparkContext sparkContext = new JavaSparkContext(sparkAppConf);
        sparkContext.setCheckpointDir(checkpointPath);

        JavaRDD<String> textFile = sparkContext.textFile("/home/saberbin/logs/QswaS.txt");

        JavaRDD<String> textFile2 = saveCheckpoint(textFile);

        JavaRDD<List<String>> mapRDD = textFile2.map(new Function<String, List<String>>() {
            @Override
            public List<String> call(String s) throws Exception {
                return Arrays.asList(s.split(","));
            }
        });

        JavaRDD<String> rdd = mapRDD.map(new Function<List<String>, String>() {
            @Override
            public String call(List<String> strings) throws Exception {
                return strings.get(0);
            }
        });

        rdd.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });

        sparkContext.stop();

    }

    // cache first so the checkpoint job reads cached blocks instead of recomputing
    public static <T> JavaRDD<T> saveCheckpoint(JavaRDD<T> rdd){
        rdd.cache();
        rdd.checkpoint();
        return rdd;
    }
}

After the run completes, inspect the checkpoint directory:

[screenshot: contents of the checkpoint directory]

Next, modify the code to checkpoint the second RDD, mapRDD:

package src.main.rdd;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

import java.util.Arrays;
import java.util.List;


public class rddAppDemo_checkpoint {
    public static void main(String[] args) {

        String checkpointPath = "/home/saberbin/logs/checkpoints";

        SparkConf sparkAppConf = new SparkConf().setMaster("local[2]").setAppName("sparkRddDemo");
        JavaSparkContext sparkContext = new JavaSparkContext(sparkAppConf);
        sparkContext.setCheckpointDir(checkpointPath);

        JavaRDD<String> textFile = sparkContext.textFile("/home/saberbin/logs/QswaS.txt");

        JavaRDD<List<String>> mapRDD = textFile.map(new Function<String, List<String>>() {
            @Override
            public List<String> call(String s) throws Exception {
                return Arrays.asList(s.split(","));
            }
        });

        JavaRDD<List<String>> mapRDD2 = saveCheckpoint(mapRDD);

        JavaRDD<String> rdd = mapRDD2.map(new Function<List<String>, String>() {
            @Override
            public String call(List<String> strings) throws Exception {
                return strings.get(0);
            }
        });

        rdd.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });

        sparkContext.stop();

    }

    public static <T> JavaRDD<T> saveCheckpoint(JavaRDD<T> rdd){
        rdd.cache();
        rdd.checkpoint();
        return rdd;
    }
}

[screenshot: contents of the checkpoint directory]

Then modify it to checkpoint the third RDD, rdd:

package src.main.rdd;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

import java.util.Arrays;
import java.util.List;


public class rddAppDemo_checkpoint {
    public static void main(String[] args) {

        String checkpointPath = "/home/saberbin/logs/checkpoints";

        SparkConf sparkAppConf = new SparkConf().setMaster("local[2]").setAppName("sparkRddDemo");
        JavaSparkContext sparkContext = new JavaSparkContext(sparkAppConf);
        sparkContext.setCheckpointDir(checkpointPath);

        JavaRDD<String> textFile = sparkContext.textFile("/home/saberbin/logs/QswaS.txt");

        JavaRDD<List<String>> mapRDD = textFile.map(new Function<String, List<String>>() {
            @Override
            public List<String> call(String s) throws Exception {
                return Arrays.asList(s.split(","));
            }
        });

        JavaRDD<String> rdd = mapRDD.map(new Function<List<String>, String>() {
            @Override
            public String call(List<String> strings) throws Exception {
                return strings.get(0);
            }
        });

        saveCheckpoint(rdd);

        rdd.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });

        sparkContext.stop();

    }

    public static <T> JavaRDD<T> saveCheckpoint(JavaRDD<T> rdd){
        rdd.cache();
        rdd.checkpoint();
        return rdd;
    }
}

[screenshot: contents of the checkpoint directory]

Finally, modify it to checkpoint all of the RDDs:

package src.main.rdd;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

import java.util.Arrays;
import java.util.List;


public class rddAppDemo_checkpoint {
    public static void main(String[] args) {

        String checkpointPath = "/home/saberbin/logs/checkpoints";

        SparkConf sparkAppConf = new SparkConf().setMaster("local[2]").setAppName("sparkRddDemo");
        JavaSparkContext sparkContext = new JavaSparkContext(sparkAppConf);
        sparkContext.setCheckpointDir(checkpointPath);

        JavaRDD<String> textFile = sparkContext.textFile("/home/saberbin/logs/QswaS.txt");
        
        saveCheckpoint(textFile);

        JavaRDD<List<String>> mapRDD = textFile.map(new Function<String, List<String>>() {
            @Override
            public List<String> call(String s) throws Exception {
                return Arrays.asList(s.split(","));
            }
        });

        saveCheckpoint(mapRDD);
        
        JavaRDD<String> rdd = mapRDD.map(new Function<List<String>, String>() {
            @Override
            public String call(List<String> strings) throws Exception {
                return strings.get(0);
            }
        });

        saveCheckpoint(rdd);

        rdd.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });

        sparkContext.stop();

    }

    public static <T> JavaRDD<T> saveCheckpoint(JavaRDD<T> rdd){
        rdd.cache();
        rdd.checkpoint();
        return rdd;
    }
}

[screenshot: contents of the checkpoint directory]

Conclusion

From the experiments above, the following conclusions can be drawn:

  • If multiple RDDs in one Spark program all call checkpoint, only one of them is actually saved
  • When a checkpoint is written, a fresh directory for the current run is created under the configured checkpoint directory, and inside it a subdirectory named after the RDD's id holds that RDD's data; for example, an RDD with id 1 gets a directory named rdd-1 (see the layout sketch below)
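The resulting on-disk layout looks roughly like this (the UUID is the one from my run; the partition count is illustrative):

<checkpoint dir>/
└── 09a9acfc-e2d2-4e1d-a18a-558c84220f6c/    one subdirectory per application run
    └── rdd-3/                               one directory per checkpointed RDD id
        ├── part-00000                       one part file per partition
        └── part-00001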

As for why only one RDD gets saved, I asked the iFlytek Spark AI assistant and got the following answer:

Although checkpoint provides fault tolerance, it also consumes extra storage and compute resources.
For this reason, checkpoint is usually not applied to every RDD; instead, key RDDs are checkpointed selectively, balancing resource usage against fault-tolerance needs.

The actual reason is in the source: when a job finishes, Spark calls doCheckpoint() on the final RDD, which walks up the lineage and checkpoints only the first RDD it finds that was marked for checkpointing; marked ancestors further up are skipped unless the spark.checkpoint.checkpointAllMarkedAncestors flag is enabled (see RDD.doCheckpoint in the Spark source). Interested readers can dig into the details there.
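A hedged sketch of how to change that behavior: the flag is read as a local property in RDD.doCheckpoint, so it is set on the context before the action runs.

// Checkpoint every RDD marked with checkpoint(), not only the one closest to the action
sparkContext.setLocalProperty("spark.checkpoint.checkpointAllMarkedAncestors", "true");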

Reading a checkpoint back in Spark

A checkpoint can be read back with the sparkContext.checkpointFile method. It takes a path that must point all the way down to the data directory of the specific RDD, i.e. the directory level the arrow points at in the figure below.

[screenshot: checkpoint directory tree; the arrow marks the rdd-N data directory to pass to checkpointFile]

package src.main.rdd;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;



public class rddAppDemo_checkpoint2 {
    public static void main(String[] args) {
        SparkConf sparkAppConf = new SparkConf().setMaster("local[2]").setAppName("sparkRddDemo");
        JavaSparkContext sparkContext = new JavaSparkContext(sparkAppConf);
        sparkContext.setCheckpointDir("/home/saberbin/logs/checkpoints");


        JavaRDD<Object> rdd = sparkContext.checkpointFile("/home/saberbin/logs/checkpoints/09a9acfc-e2d2-4e1d-a18a-558c84220f6c/rdd-3");

        rdd.foreach(new VoidFunction<Object>() {
            @Override
            public void call(Object strings) throws Exception {
                System.out.println(strings);
            }
        });

        sparkContext.stop();

    }
}

If you know the element type of the RDD being read back, you can give the concrete generic type directly:

JavaRDD<String> rdd = sparkContext.checkpointFile("/home/saberbin/logs/checkpoints/09a9acfc-e2d2-4e1d-a18a-558c84220f6c/rdd-3");

rdd.foreach(new VoidFunction<String>() {
    @Override
    public void call(String strings) throws Exception {
        System.out.println(strings);
    }
});