Carbondata Source Code Series (1): The File Generation Process

I spent my two years at Didi constantly working overtime and got lazy, so I rarely wrote blog posts. Recently I have been working on integrating Carbondata with Hive, which means I need to dig deep into Carbondata.

So I am starting a new series to record the bits and pieces I learn about Carbondata along the way.

1. Environment setup

The version used here is 1.2.0-SNAPSHOT.

git clone https://github.com/apache/carbondata.git

Open the carbondata code in IDEA, click View -> Tool Windows -> Maven Projects in the top menu, tick the profiles you need, and build the format module first.

2. Finding the code entry point

Let's open the entry class CarbonDataFrameWriter and find the writeToCarbonFile method:

private def writeToCarbonFile(parameters: Map[String, String] = Map()): Unit = {
  val options = new CarbonOption(parameters)
  val cc = CarbonContext.getInstance(dataFrame.sqlContext.sparkContext)
  if (options.tempCSV) {
    loadTempCSV(options, cc)
  } else {
    loadDataFrame(options, cc)
  }
}

It offers two paths: loadTempCSV and loadDataFrame.

loadTempCSV first writes the data out as CSV files and then imports them with a LOAD DATA INPATH ... command.

Here we only look at loadDataFrame, the path that generates the data files directly.
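For context, here is a rough sketch of how a write reaches this code path from user code. It follows the DataFrame write API of the 1.x line; the option names (tableName, tempCSV) come from the CarbonOption parsing referenced above, while the store path and table name are made up for illustration:

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.CarbonSession._

// Create a CarbonSession backed by a local store path (path is illustrative).
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("carbon-dataframe-write")
  .getOrCreateCarbonSession("/tmp/carbon.store")

val df = spark.range(0, 1000).selectExpr("id", "concat('name_', id) as name")

// With tempCSV=false the write goes through loadDataFrame instead of the CSV detour.
df.write
  .format("carbondata")
  .option("tableName", "demo_table")
  .option("tempCSV", "false")
  .mode(SaveMode.Overwrite)
  .save()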

Clicking through the calls, we end up in the run method of LoadTable in carbonTableSchema, which is followed by a sprawling two hundred lines of setter calls. At its core it simply builds a CarbonLoadModel instance:

val carbonLoadModel = new CarbonLoadModel()
carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
carbonLoadModel.setStorePath(relation.tableMeta.storePath)
val table = relation.tableMeta.carbonTable
carbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
carbonLoadModel.setTableName(table.getFactTableName)
val dataLoadSchema = new CarbonDataLoadSchema(table)
// Need to fill dimension relation
carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)

Most of that code prepares for loading a plain text file; when loading from a DataFrame you can skip it entirely and jump straight to the line if (carbonLoadModel.getUseOnePass).

This flag controls how the dictionary is generated. It defaults to false, so let's ignore the true branch for now and follow the main flow. The two calls below are the ones we are after:

// generate the dictionary files
GlobalDictionaryUtil
  .generateGlobalDictionary(
    sparkSession.sqlContext,
    carbonLoadModel,
    relation.tableMeta.storePath,
    dictionaryDataFrame)
// generate the data files
CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
  carbonLoadModel,
  relation.tableMeta.storePath,
  columnar,
  partitionStatus,
  None,
  loadDataFrame,
  updateModel)

 

3. Dictionary generation process

Start with the GlobalDictionaryUtil.generateGlobalDictionary method:

if (StringUtils.isEmpty(allDictionaryPath)) {
  LOGGER.info("Generate global dictionary from source data files!")
  // load data by using dataSource com.databricks.spark.csv
  var df = dataFrame.getOrElse(loadDataFrame(sqlContext, carbonLoadModel))
  var headers = carbonLoadModel.getCsvHeaderColumns
  headers = headers.map(headerName => headerName.trim)
  val colDictFilePath = carbonLoadModel.getColDictFilePath
  if (colDictFilePath != null) {
    // generate predefined dictionary
    generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
      dimensions, carbonLoadModel, sqlContext, storePath, dictfolderPath)
  }
  if (headers.length > df.columns.length) {
    val msg = "The number of columns in the file header do not match the " +
              "number of columns in the data file; Either delimiter " +
              "or fileheader provided is not correct"
    LOGGER.error(msg)
    throw new DataLoadingException(msg)
  }
  // use fact file to generate global dict
  val (requireDimension, requireColumnNames) = pruneDimensions(dimensions,
    headers, df.columns)
  if (requireDimension.nonEmpty) {
    // select column to push down pruning
    df = df.select(requireColumnNames.head, requireColumnNames.tail: _*)
    val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
      requireDimension, storePath, dictfolderPath, false)
    // combine distinct value in a block and partition by column
    val inputRDD = new CarbonBlockDistinctValuesCombineRDD(df.rdd, model)
      .partitionBy(new ColumnPartitioner(model.primDimensions.length))
    // generate global dictionary files
    val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect()
    // check result status
    checkStatus(carbonLoadModel, sqlContext, model, statusList)
  } else {
    LOGGER.info("No column found for generating global dictionary in source data files")
  }
} else {
  generateDictionaryFromDictionaryFiles(sqlContext,
    carbonLoadModel,
    storePath,
    carbonTableIdentifier,
    dictfolderPath,
    dimensions,
    allDictionaryPath)
}

It covers two cases: the dictionary files do not exist yet, or they already exist.

Let's look at the case where they do not exist first:

// use fact file to generate global dict
val (requireDimension, requireColumnNames) = pruneDimensions(dimensions,
  headers, df.columns)
if (requireDimension.nonEmpty) {
  // keep only the dimension columns flagged for dictionary encoding
  df = df.select(requireColumnNames.head, requireColumnNames.tail: _*)
  val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
    requireDimension, storePath, dictfolderPath, false)
  // deduplicate, then partition by column
  val inputRDD = new CarbonBlockDistinctValuesCombineRDD(df.rdd, model)
    .partitionBy(new ColumnPartitioner(model.primDimensions.length))
  // generate the global dictionary files
  val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect()
  // check result status
  checkStatus(carbonLoadModel, sqlContext, model, statusList)
} else {
  LOGGER.info("No column found for generating global dictionary in source data files")
}

It first reads all the dictionary-encoded dimension columns from the source data, deduplicates the values, partitions them by column, and then writes them out; for the details of the write step, see the internalCompute method of CarbonGlobalDictionaryGenerateRDD.
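Before diving into that method, here is a minimal conceptual sketch of the dedup-and-partition stage in plain Spark. It is not the actual CarbonBlockDistinctValuesCombineRDD/ColumnPartitioner code, just the idea of collecting the distinct values of every dictionary column and grouping them by column index:

import org.apache.spark.sql.DataFrame

def distinctValuesPerColumn(df: DataFrame): Map[Int, Set[String]] = {
  val numCols = df.columns.length
  df.rdd
    .flatMap { row =>
      // emit (columnIndex, value) pairs so every column can be handled independently
      (0 until numCols).map { i =>
        val v = row.get(i)
        (i, if (v == null) "" else v.toString)
      }
    }
    .distinct()          // deduplicate the values of each column
    .groupByKey(numCols) // one partition per column, like ColumnPartitioner
    .mapValues(_.toSet)
    .collect()
    .toMap
}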

val dictWriteTask = new DictionaryWriterTask(valuesBuffer,
  dictionaryForDistinctValueLookUp,
  model.table,
  model.columnIdentifier(split.index),
  model.hdfsLocation,
  model.primDimensions(split.index).getColumnSchema,
  model.dictFileExists(split.index)
)
// execute dictionary writer task to get distinct values
val distinctValues = dictWriteTask.execute()
val dictWriteTime = System.currentTimeMillis() - t3
val t4 = System.currentTimeMillis()
// if new data came than rewrite sort index file
if (distinctValues.size() > 0) {
  val sortIndexWriteTask = new SortIndexWriterTask(model.table,
    model.columnIdentifier(split.index),
    model.primDimensions(split.index).getDataType,
    model.hdfsLocation,
    dictionaryForDistinctValueLookUp,
    distinctValues)
  sortIndexWriteTask.execute()
}
val sortIndexWriteTime = System.currentTimeMillis() - t4
CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordDicShuffleAndWriteTime()
// After sortIndex writing, update dictionaryMeta
dictWriteTask.updateMetaData()

The dictionary files live in the Metadata directory under the table directory. Three kinds of files are generated:

1. The dictionary file, named <column ID>.dict

2. The sort index file, named <column ID>.sortindex

3. The dictionary column's meta information, named <column ID>.dictmeta
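As an illustration, for a hypothetical table default.sale stored under <storePath>, with one dictionary column whose column ID is x123, the layout would look roughly like this (the names are made up, not taken from a real run):

<storePath>/default/sale/Metadata/
    x123.dict        <- the dictionary values
    x123.sortindex   <- the sort index of the dictionary
    x123.dictmeta    <- the dictionary column's meta information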

4. Data file generation process

Open CarbonDataRDDFactory and find the loadCarbonData method. It covers both loading via the LOAD command and loading from a DataFrame, and the code is admittedly long and unwieldy. We only care about the loadDataFrame path:

def loadDataFrame(): Unit = {
  try {
    val rdd = dataFrame.get.rdd
    // find the nodes where the data is located
    val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]]{ p =>
      DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
    }.distinct.size
    // make sure there are as many executors as data nodes
    val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData,
      sqlContext.sparkContext)
    val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct)
    // generate the data files
    status = new NewDataFrameLoaderRDD(sqlContext.sparkContext,
      new DataLoadResultImpl(),
      carbonLoadModel,
      currentLoadCount,
      tableCreationTime,
      schemaLastUpdatedTime,
      newRdd).collect()
  } catch {
    case ex: Exception =>
      LOGGER.error(ex, "load data frame failed")
      throw ex
  }
}

 

Open the NewDataFrameLoaderRDD class and look at its internalCompute method; its core is this line:

new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders.toArray)

Open DataLoadExecutor: the core of its execute method is DataLoadProcessBuilder's build method. Depending on how the table is configured, the build process differs a little:

public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String storeLocation,
    CarbonIterator[] inputIterators) throws Exception {
  CarbonDataLoadConfiguration configuration = createConfiguration(loadModel, storeLocation);
  SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
  if (!configuration.isSortTable() || sortScope.equals(SortScopeOptions.SortScope.NO_SORT)) {
    // no sort columns, or carbon.load.sort.scope is set to NO_SORT
    return buildInternalForNoSort(inputIterators, configuration);
  } else if (configuration.getBucketingInfo() != null) {
    // bucketed tables
    return buildInternalForBucketing(inputIterators, configuration);
  } else if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) {
    // carbon.load.sort.scope is set to BATCH_SORT
    return buildInternalForBatchSort(inputIterators, configuration);
  } else {
    return buildInternal(inputIterators, configuration);
  }
}

Below we only walk through the standard load path, buildInternal:

private AbstractDataLoadProcessorStep buildInternal(CarbonIterator[] inputIterators,
    CarbonDataLoadConfiguration configuration) {
  // 1. Reads the data input iterators and parses the data.
  AbstractDataLoadProcessorStep inputProcessorStep =
      new InputProcessorStepImpl(configuration, inputIterators);
  // 2. Converts the data like dictionary or non dictionary or complex objects depends on
  // data types and configurations.
  AbstractDataLoadProcessorStep converterProcessorStep =
      new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
  // 3. Sorts the data by SortColumn
  AbstractDataLoadProcessorStep sortProcessorStep =
      new SortProcessorStepImpl(configuration, converterProcessorStep);
  // 4. Writes the sorted data in carbondata format.
  return new DataWriterProcessorStepImpl(configuration, sortProcessorStep);
}

The pipeline has four main steps (a conceptual sketch of how the steps chain together follows this list):

1. Read the input iterators and parse the data; this step mainly serves CSV files, since data coming from a DataFrame has already been shaped.

2. Based on each column's data type and configuration, replace the values of dictionary columns with dictionary keys; non-dictionary columns are converted to byte arrays.

3. Sort the rows by the sort columns.

4. Write the sorted data out in the Carbondata format.
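To make the chaining clearer, here is a conceptual sketch of the decorator-style composition these steps use. It is not the real AbstractDataLoadProcessorStep API; each stand-in step simply wraps its child and transforms the iterator of rows it produces:

trait ProcessorStep {
  def execute(): Iterator[Seq[Any]]
}

// 1. reads/parses the raw input
class InputStep(rows: Iterator[Seq[Any]]) extends ProcessorStep {
  override def execute(): Iterator[Seq[Any]] = rows
}

// 2. converts each row (dictionary keys, byte arrays, ...)
class ConvertStep(child: ProcessorStep, convert: Seq[Any] => Seq[Any]) extends ProcessorStep {
  override def execute(): Iterator[Seq[Any]] = child.execute().map(convert)
}

// 3. sorts by the sort columns; a pipeline breaker, it must drain its child first
class SortStep(child: ProcessorStep, key: Seq[Any] => String) extends ProcessorStep {
  override def execute(): Iterator[Seq[Any]] = child.execute().toSeq.sortBy(key).iterator
}

// 4. writes the sorted rows out (printing is a stand-in for the carbondata writer)
class WriteStep(child: ProcessorStep) extends ProcessorStep {
  override def execute(): Iterator[Seq[Any]] = {
    child.execute().foreach(row => println(s"write $row"))
    Iterator.empty
  }
}

// wiring mirrors buildInternal: input -> convert -> sort -> write
// new WriteStep(new SortStep(new ConvertStep(new InputStep(rows), identity), _.toString)).execute()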

 

Let's start from step 2, DataConverterProcessorStepImpl. In its getIterator method you can see that every CarbonRowBatch goes through the convert method of localConverter, and localConverter holds a single converter, RowConverterImpl.

RowConverterImpl is composed of many FieldConverters; its initialize method shows that they are created by FieldEncoderFactory's createFieldEncoder method:

public FieldConverter createFieldEncoder(DataField dataField,
    Cache cache, CarbonTableIdentifier carbonTableIdentifier, int index, String nullFormat,
    DictionaryClient client, Boolean useOnePass, String storePath, boolean tableInitialize,
    Map localCache, boolean isEmptyBadRecord) throws IOException {
  // Converters are only needed for dimensions and measures it return null.
  if (dataField.getColumn().isDimension()) {
    if (dataField.getColumn().hasEncoding(Encoding.DIRECT_DICTIONARY) &&
        !dataField.getColumn().isComplex()) {
      return new DirectDictionaryFieldConverterImpl(dataField, nullFormat, index,
          isEmptyBadRecord);
    } else if (dataField.getColumn().hasEncoding(Encoding.DICTIONARY) &&
        !dataField.getColumn().isComplex()) {
      return new DictionaryFieldConverterImpl(dataField, cache, carbonTableIdentifier,
          nullFormat, index, client, useOnePass, storePath, tableInitialize, localCache,
          isEmptyBadRecord);
    } else if (dataField.getColumn().isComplex()) {
      return new ComplexFieldConverterImpl(
          createComplexType(dataField, cache, carbonTableIdentifier, client, useOnePass,
              storePath, tableInitialize, localCache), index);
    } else {
      return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index,
          isEmptyBadRecord);
    }
  } else {
    return new MeasureFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
  }
}

From this code you can see that the converters fall into several categories:

1. Dimension columns with Encoding.DIRECT_DICTIONARY that are not complex use DirectDictionaryFieldConverterImpl (mainly TIMESTAMP and DATE types); the value is converted into the offset from a base time (see the sketch after this list).

2. Dimension columns with Encoding.DICTIONARY that are not complex use DictionaryFieldConverterImpl (non-high-cardinality columns); the value is replaced with its key in the dictionary (an int).

3. Complex dimension columns use ComplexFieldConverterImpl (Struct and Array types); the value is converted to binary.

4. High-cardinality dimension columns use NonDictionaryFieldConverterImpl; the value is left exactly as it is.

5. Measure columns use MeasureFieldConverterImpl (numeric types such as float, double, int, bigint, decimal); the value is likewise left unchanged.
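To make category 1 concrete, a direct-dictionary DATE value is essentially stored as an integer offset from a fixed base date. The sketch below only illustrates the idea; it is not the actual DirectDictionaryFieldConverterImpl code, and the base date is an assumption:

import java.time.LocalDate
import java.time.temporal.ChronoUnit

// Encode a DATE as "days since a base date", decode by adding the offset back.
val baseDate = LocalDate.of(1970, 1, 1) // assumed base; Carbondata uses its own cut-off internally

def encodeDate(value: LocalDate): Int =
  ChronoUnit.DAYS.between(baseDate, value).toInt

def decodeDate(surrogate: Int): LocalDate =
  baseDate.plusDays(surrogate.toLong)

// encodeDate(LocalDate.of(2017, 4, 8)) == 17264, and decodeDate(17264) == 2017-04-08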

 

Step 3 is SortProcessorStepImpl; the key is how SorterFactory.createSorter is implemented:

public static Sorter createSorter(CarbonDataLoadConfiguration configuration, AtomicLong counter) {
  boolean offheapsort = Boolean.parseBoolean(CarbonProperties.getInstance()
      .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
          CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT));
  SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
  Sorter sorter;
  if (offheapsort) {
    if (configuration.getBucketingInfo() != null) {
      sorter = new UnsafeParallelReadMergeSorterWithBucketingImpl(configuration.getDataFields(),
          configuration.getBucketingInfo());
    } else {
      sorter = new UnsafeParallelReadMergeSorterImpl(counter);
    }
  } else {
    if (configuration.getBucketingInfo() != null) {
      sorter =
          new ParallelReadMergeSorterWithBucketingImpl(counter, configuration.getBucketingInfo());
    } else {
      sorter = new ParallelReadMergeSorterImpl(counter);
    }
  }
  if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) {
    if (configuration.getBucketingInfo() == null) {
      sorter = new UnsafeBatchParallelReadMergeSorterImpl(counter);
    } else {
      LOGGER.warn(
          "Batch sort is not enabled in case of bucketing. Falling back to " + sorter.getClass()
              .getName());
    }
  }
  return sorter;
}

Interestingly, it can even sort with off-heap memory; setting enable.unsafe.sort to true turns that on (see the snippet below). Here we look at the default ParallelReadMergeSorterImpl.
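For reference, the switch can be set in carbon.properties or programmatically; a small sketch, assuming the CarbonProperties API used in createSorter above:

import org.apache.carbondata.core.util.CarbonProperties

// Enable off-heap (unsafe) sorting for data loading; the default is false.
CarbonProperties.getInstance().addProperty("enable.unsafe.sort", "true")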

Once 100,000 records have accumulated, they are sorted and written to an intermediate file; once the number of intermediate files exceeds 20, a file merge is performed (a conceptual sketch follows the parameter list below).

The comparison rules live in NewRowComparator and NewRowComparatorForNormalDims.

Related parameters:

carbon.sort.size 100000

carbon.sort.intermediate.files.limit 20
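To make the spill-and-merge behaviour concrete, here is a small self-contained sketch of the idea. It is plain Scala rather than the actual ParallelReadMergeSorterImpl, the two thresholds above play the roles of sortSize and fileCountLimit, and the merge and I/O details are omitted:

import java.io.File
import scala.collection.mutable.ArrayBuffer

// Buffer rows, sort and spill every `sortSize` rows, and merge the intermediate
// files whenever more than `fileCountLimit` of them pile up.
class ExternalSorterSketch(sortSize: Int = 100000, fileCountLimit: Int = 20) {
  private val buffer = new ArrayBuffer[Array[String]]()
  private val spilledFiles = new ArrayBuffer[File]()

  def addRow(row: Array[String]): Unit = {
    buffer += row
    if (buffer.size >= sortSize) spill()
    if (spilledFiles.size >= fileCountLimit) mergeIntermediateFiles()
  }

  private def spill(): Unit = {
    val sorted = buffer.sortBy(_.mkString("\u0001")) // stand-in for NewRowComparator
    val file = File.createTempFile("sorttemp", ".sorttemp")
    writeRows(file, sorted)
    spilledFiles += file
    buffer.clear()
  }

  // k-way merge of the spilled files into one larger sorted file (omitted)
  private def mergeIntermediateFiles(): Unit = {}

  // write the sorted chunk to disk (omitted)
  private def writeRows(file: File, rows: Seq[Array[String]]): Unit = {}
}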

 

Now for the last step. Open the DataWriterProcessorStepImpl class: it creates a CarbonFactHandler via CarbonFactHandlerFactory.createCarbonFactHandler and feeds each CarbonRow to the handler's addDataToStore method.

The implementation of addDataToStore is simple: once the number of buffered rows reaches the blocklet size, it submits an asynchronous Producer task to the thread pool:

public void addDataToStore(CarbonRow row) throws CarbonDataWriterException {
  dataRows.add(row);
  this.entryCount++;
  // if entry count reaches to leaf node size then we are ready to write
  // this to leaf node file and update the intermediate files
  if (this.entryCount == this.blockletSize) {
    try {
      semaphore.acquire();
      producerExecutorServiceTaskList.add(
          producerExecutorService.submit(
              new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, false)
          )
      );
      blockletProcessingCount.incrementAndGet();
      // set the entry count to zero
      processedDataCount += entryCount;
      LOGGER.info("Total Number Of records added to store: " + processedDataCount);
      dataRows = new ArrayList<>(this.blockletSize);
      this.entryCount = 0;
    } catch (InterruptedException e) {
      LOGGER.error(e, e.getMessage());
      throw new CarbonDataWriterException(e.getMessage(), e);
    }
  }
}

A producer-consumer pattern is used here: the Producers run on multiple threads while the Consumer is single-threaded. The Producers are mainly responsible for compressing the data, the Consumer writes it out, and the two sides exchange data through blockletDataHolder.
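A stripped-down sketch of that producer-consumer handoff, using standard java.util.concurrent primitives rather than the actual blockletDataHolder:

import java.util.concurrent.{ArrayBlockingQueue, Executors, TimeUnit}

object ProducerConsumerSketch extends App {
  case class Blocklet(rows: Seq[String])             // stand-in for a compressed blocklet

  val holder = new ArrayBlockingQueue[Blocklet](4)   // stand-in for blockletDataHolder
  val producers = Executors.newFixedThreadPool(2)    // multiple Producer threads
  val consumer = Executors.newSingleThreadExecutor() // single Consumer thread

  consumer.submit(new Runnable {
    override def run(): Unit =
      while (!Thread.currentThread().isInterrupted) {
        val blocklet = holder.take()                 // blocks until a producer publishes one
        println(s"writing blocklet with ${blocklet.rows.size} rows")
      }
  })

  (1 to 4).foreach { batch =>
    producers.submit(new Runnable {
      override def run(): Unit =
        // "compress" the batch (here it is just built) and hand it to the writer thread
        holder.put(Blocklet(Seq.fill(32000)(s"row-of-batch-$batch")))
    })
  }

  producers.shutdown()
  producers.awaitTermination(10, TimeUnit.SECONDS)
  consumer.shutdownNow()                             // interrupts the blocking take()
}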

 

Related parameters:

carbon.number.of.cores.while.loading 2 (the number of Producer threads)

number.of.rows.per.blocklet.column.page 32000

 

That covers the main stages of file generation. To keep this article from getting too long, the next post will continue with the details of the Carbondata data file format.

 

岑玉海

Please credit the source when reposting. Thanks!

 

Reposted from: https://www.cnblogs.com/cenyuhai/p/6681239.html
