On the shared state of a Hive on Spark session

Spark SQL has a class:

org.apache.spark.sql.internal.SharedState

It is responsible for:

1. Warehouse path management (warehousePath)
2. Query result cache management (cacheManager)
3. Monitoring of the application's execution status and metrics (statusStore)
4. Catalog management of the default metastore database (externalCatalog)
5. Global temporary view management, mainly to prevent name clashes with the metastore (globalTempViewManager)

1. Warehouse path management (warehousePath)

The main purpose of this block is to obtain the warehouse path of the Spark SQL metastore. Typically Hive serves as the Spark SQL metastore, so the block first loads Hive's configuration file hive-site.xml and then reads the Hive warehouse path from the resulting configuration:

hive.metastore.warehouse.dir

If we are not using Hive as the Spark SQL metastore, the Hive warehouse path loaded at this point will be null:

val hiveWarehouseDir = sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir")

If hiveWarehouseDir is null (or spark.sql.warehouse.dir has been set explicitly), Spark SQL falls back to its own warehouse setting, spark.sql.warehouse.dir, and writes that value back into hive.metastore.warehouse.dir.

So the general flow is: resolve hiveWarehouseDir first, then fall back to spark.sql.warehouse.dir as described above.

The code:

val warehousePath: String = {
    val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
    if (configFile != null) {
      logInfo(s"loading hive config file: $configFile")
      sparkContext.hadoopConfiguration.addResource(configFile)
    }

    // hive.metastore.warehouse.dir only stay in hadoopConf
    sparkContext.conf.remove("hive.metastore.warehouse.dir")
    // Set the Hive metastore warehouse path to the one we use
    val hiveWarehouseDir = sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir")
    if (hiveWarehouseDir != null && !sparkContext.conf.contains(WAREHOUSE_PATH.key)) {
      // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
      // we will respect the value of hive.metastore.warehouse.dir.

      sparkContext.conf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
      logInfo(s"${WAREHOUSE_PATH.key} is not set, but hive.metastore.warehouse.dir " +
        s"is set. Setting ${WAREHOUSE_PATH.key} to the value of " +
        s"hive.metastore.warehouse.dir ('$hiveWarehouseDir').")
      hiveWarehouseDir
    } else {
      // If spark.sql.warehouse.dir is set, we will override hive.metastore.warehouse.dir using
      // the value of spark.sql.warehouse.dir.

      // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set,
      // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.

      val sparkWarehouseDir = sparkContext.conf.get(WAREHOUSE_PATH)
      logInfo(s"Setting hive.metastore.warehouse.dir ('$hiveWarehouseDir') to the value of " +
        s"${WAREHOUSE_PATH.key} ('$sparkWarehouseDir').")
      sparkContext.hadoopConfiguration.set("hive.metastore.warehouse.dir", sparkWarehouseDir)
      sparkWarehouseDir
    }
  }
  logInfo(s"Warehouse path is '$warehousePath'.")

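To see the precedence rules from the application side, here is a minimal, self-contained sketch (the local master, app name, and warehouse path are made up for the example):

import org.apache.spark.sql.SparkSession

object WarehousePathDemo {
  def main(args: Array[String]): Unit = {
    // Setting spark.sql.warehouse.dir explicitly takes the else-branch above:
    // it wins, and hive.metastore.warehouse.dir is overwritten to match.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("warehouse-path-demo")
      .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse") // hypothetical path
      .getOrCreate()

    // SharedState is lazy; run a trivial query to force it to initialize.
    spark.sql("SELECT 1").collect()

    // Both settings should now point at the same location.
    println(spark.conf.get("spark.sql.warehouse.dir"))
    println(spark.sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir"))

    spark.stop()
  }
}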

2. Query result caching (cacheManager)

It caches query results; the advantage is that if a later query needs the same results, the data source does not have to be queried again (this deserves a separate article when time permits).

The code:

/**
   * Class for caching query results reused in future executions.
   */
  val cacheManager: CacheManager = new CacheManager

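A quick usage sketch of the cache path (a minimal example; the column expressions are arbitrary). Note that because cacheManager lives in SharedState, cached data is shared by all sessions of the same application:

import org.apache.spark.sql.SparkSession

object CacheDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("cache-demo").getOrCreate()
    val df = spark.range(0, 1000000).selectExpr("id", "id % 10 AS bucket")

    df.cache()   // registers this plan with the shared CacheManager
    df.count()   // the first action materializes the cached data

    // A later query whose plan matches the cached one is served from the
    // cache instead of recomputing from the source.
    df.groupBy("bucket").count().show()

    df.unpersist() // drops the entry from the CacheManager
    spark.stop()
  }
}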

3. Execution status and metrics (statusStore)

The code:

/**
   * A status store to query SQL status/metrics of this Spark application, based on SQL-specific
   * [[org.apache.spark.scheduler.SparkListenerEvent]]s.
   */
  val statusStore: SQLAppStatusStore = {
    val kvStore = sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore]
    val listener = new SQLAppStatusListener(sparkContext.conf, kvStore, live = true)
    sparkContext.listenerBus.addToStatusQueue(listener)
    val statusStore = new SQLAppStatusStore(kvStore, Some(listener))
    sparkContext.ui.foreach(new SQLTab(statusStore, _))
    statusStore
  }


Put plainly, this code registers a listener that captures SQL execution status and metrics and writes them into the application's KV store, which also backs the SQL tab in the web UI.

Here a problem arises: the listeners must be read (dispatched to) in real time, while Spark keeps registering and writing to them. With a conventional list or map, modifying the structure while another thread is traversing it causes errors, typically a ConcurrentModificationException.

For this reason, Spark's listener bus keeps its listeners in a copy-on-write container:

private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])]

This way, new entries can keep being written without affecting concurrent reads.
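A standalone sketch of why copy-on-write sidesteps the problem (the names here are illustrative, not Spark's):

import java.util.concurrent.CopyOnWriteArrayList

object CopyOnWriteDemo {
  def main(args: Array[String]): Unit = {
    val listeners = new CopyOnWriteArrayList[String]()
    listeners.add("listener-0")

    // A writer thread keeps registering listeners while we iterate.
    val writer = new Thread(new Runnable {
      override def run(): Unit =
        (1 to 10000).foreach(i => listeners.add(s"listener-$i"))
    })
    writer.start()

    // Each iterator walks an immutable snapshot of the underlying array, so
    // this loop never throws ConcurrentModificationException even though the
    // writer mutates the list concurrently. An ordinary ArrayList would fail here.
    val it = listeners.iterator()
    var seen = 0
    while (it.hasNext) { it.next(); seen += 1 }

    writer.join()
    println(s"iterated over $seen listeners; the list now holds ${listeners.size()}")
  }
}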

4. externalCatalog

This obtains the catalog of the Spark session, rooted at the warehouse path resolved above (hiveWarehouseDir). If the default database does not exist, it is created at that path. Spark also propagates external catalog events to the listener bus through a callback:

externalCatalog.addListener(new ExternalCatalogEventListener {
      override def onEvent(event: ExternalCatalogEvent): Unit = {
        sparkContext.listenerBus.post(event)
      }
    })

The full code:

/**
   * A catalog that interacts with external systems.
   */
  lazy val externalCatalog: ExternalCatalog = {
    val externalCatalog = SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
      SharedState.externalCatalogClassName(sparkContext.conf),
      sparkContext.conf,
      sparkContext.hadoopConfiguration)

    val defaultDbDefinition = CatalogDatabase(
      SessionCatalog.DEFAULT_DATABASE,
      "default database",
      CatalogUtils.stringToURI(warehousePath),
      Map())
    // Create default database if it doesn't exist
    if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) {
      // There may be another Spark application creating default database at the same time, here we
      // set ignoreIfExists = true to avoid DatabaseAlreadyExists exception.

      externalCatalog.createDatabase(defaultDbDefinition, ignoreIfExists = true)
    }

    // Make sure we propagate external catalog events to the spark listener bus
    externalCatalog.addListener(new ExternalCatalogEventListener {
      override def onEvent(event: ExternalCatalogEvent): Unit = {
        sparkContext.listenerBus.post(event)
      }
    })

    externalCatalog
  }

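Because the callback re-posts catalog events onto the listener bus, an ordinary SparkListener can observe DDL activity. A minimal sketch (the table name and local setup are made up; ExternalCatalogEvent sits in the internal org.apache.spark.sql.catalyst.catalog package, so treat this as version-dependent):

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogEvent

object CatalogEventDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("catalog-events").getOrCreate()

    // Catalog events are delivered asynchronously on the listener bus.
    spark.sparkContext.addSparkListener(new SparkListener {
      override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
        case e: ExternalCatalogEvent => println(s"catalog event: $e")
        case _ => // ignore everything else
      }
    })

    // DDL that goes through the external catalog fires events such as
    // CreateDatabaseEvent / CreateTableEvent.
    spark.sql("CREATE TABLE IF NOT EXISTS demo_tbl (id INT) USING parquet")

    spark.stop()
  }
}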

5. Global temporary view management (globalTempViewManager)

The point here is to prevent Spark's global temporary view database from colliding with a database that already exists in the externalCatalog. GLOBAL_TEMP_DATABASE is a system-reserved name: if a real database with that name were present in the metastore, global temporary views could no longer be resolved unambiguously as the application runs. So if GLOBAL_TEMP_DATABASE is found in the externalCatalog, an exception is thrown and the user is asked to rename the conflicting database.

/**
   * A manager for global temporary views.
   */
  lazy val globalTempViewManager: GlobalTempViewManager = {
    // System preserved database should not exists in metastore. However it's hard to guarantee it
    // for every session, because case-sensitivity differs. Here we always lowercase it to make our
    // life easier.

    val globalTempDB = sparkContext.conf.get(GLOBAL_TEMP_DATABASE).toLowerCase(Locale.ROOT)
    if (externalCatalog.databaseExists(globalTempDB)) {
      throw new SparkException(
        s"$globalTempDB is a system preserved database, please rename your existing database " +
          "to resolve the name conflict, or set a different value for " +
          s"${GLOBAL_TEMP_DATABASE.key}, and launch your Spark application again.")
    }
    new GlobalTempViewManager(globalTempDB)
  }

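A short usage sketch of global temporary views (the view name and session setup are made up for the example):

import org.apache.spark.sql.SparkSession

object GlobalTempViewDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("gtv-demo").getOrCreate()
    val df = spark.range(0, 10).toDF("id")

    // Registered through the shared GlobalTempViewManager, so the view is
    // visible to every session of this application.
    df.createGlobalTempView("demo_view")

    // Global temp views live under GLOBAL_TEMP_DATABASE (default: global_temp).
    spark.sql("SELECT COUNT(*) FROM global_temp.demo_view").show()

    // A new session in the same application sees the same view.
    spark.newSession().sql("SELECT * FROM global_temp.demo_view").show()

    spark.stop()
  }
}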

Original: https://www.cnblogs.com/niutao/articles/10915322.html
Author: niutao
Title: On the shared state of a Hive on Spark session
