基于livy的Spark连接方案

基于livy的Spark连接方案

背景

去年在厦门做项目实施过程,集群接入spark相关功能中,采用的最简单的连接方案,也就是“通过独立SparkSession会话来完成spark作业提交和执行”,但该方案存在着频繁申请和释放集群资源的问题,这一点也是行方比较难接受的。

客户现场提供了一个他们目前在其他项目中使用的方案,通过Livy接入Spark作业。该方案最大的特点,就是通过常驻Session实现了共享session,从而实现集群资源共享。同时,由于少了集群资源的频繁申请和释放以及资源包的分发,也带来了执行效率的提升。此外行里还演示了一个厂商的DataFrame数据集传递计算。

在此背景下,通过网络整理了Apache Livy的一份使用调研,并针对调研结果结合我们目前已经实施的spark方案的不足,提出一个共享SparkSession的方案设计。

一、Livy组件调研

1.1、概览

Apache Livy是一种通过REST接口实现与Spark集群轻松交互的服务解决方案。它允许通过简单的REST接口或RPC客户端库轻松提交Spark作业或Spark代码片段、同步或异步结果检索以及Spark上下文管理。ApacheLivy还简化了Spark和应用程序服务器之间的交互,从而使Spark能够用于交互式web/移动应用程序。其他功能包括:

(1) 具有可由多个客户端用于多个Spark作业的长期运行的Spark上下文

(2) 跨多个作业和客户端共享缓存的RDD或数据帧

(3) 可以同时管理多个Spark上下文,并且Spark上下文在集群(YARN/Mesos)上运行,而不是在Livy服务器上运行,以实现良好的容错性和并发性

(4) 作业可以作为预编译的jar、代码片段或通过java/scala客户端API提交

(5) 通过安全的认证通信确保安全

总结,通过Livy接入spark,具备如下3个特点:

从任何地方提交工作

Livy允许从网络/移动应用程序(无需Spark客户端)以编程、容错、多租户提交Spark作业。因此,多个用户可以同时可靠地与您的Spark集群进行交互。

使用交互式Scala或Python

Livy会说Scala或Python,因此客户端可以通过任何一种语言远程与您的Spark集群通信。此外,批处理作业提交可以用Scala、Java或Python完成。

无需更改代码

别担心,使用Livy不需要更改现有程序。只需与Maven一起构建Livy,将配置文件部署到您的Spark集群,您就离开了!

1.2、版本特性

0.7.0-incubating / 2020-02-02

New features

Livy 0.7.0 now requires Java 8, Scala 2.11 and Spark >= 2.2.0. With 0.7.0, JDBC/ODBC feature now becomes GA.

Added support for all current versions of Spark (2.2.x to 2.4.x).

[LIVY-575] Hive-compatible JDBC / ODBC server GA.

[LIVY-678] Add LDAP authorization support for REST, JDBC interface.

With various bugs fixed, details can be checked [here].

0.6.0-incubating / 2019-04-01

New features

Livy 0.6.0 now requires Java 8, Scala 2.11 and Spark >= 2.2.0.

Added support for all current versions of Spark (2.2.x to 2.4.x).

[LIVY-489] New, experimental Hive-compatible JDBC / ODBC server.

[LIVY-551] Impersonation support for all REST endpoints, for better integration with proxy servers.

[LIVY-41] Session naming support.

0.5.0-incubating / 2018-02-05

New features

[LIVY-7] Added autocompletion to REST API and Scala API for Interactive Sessions

[LIVY-19] Added Spark SQL interpreter for Interactive Sessions

[LIVY-104] Updated Livy project to build using Scala 2.11

[LIVY-245] Added support for shared variables across Jobs

[LIVY-299] Support multi-line output in statements

[LIVY-397] Support multiple languages in a single Session

0.4.0-incubating / 2017-09-01

Our first Apache release!

New features

[LIVY-87] Create a Livy Web UI to monitor sessions

[LIVY-348] Improve Livy’s ACLs

1.3、工作流程

基于livy的Spark连接方案

图:livy工作流程-01

基于livy的Spark连接方案

图:livy工作流程-02

客户端提交任务到Livy server后,Livy server启动相应的session,然后提交作业到Yarn集群,当Yarn拉起ApplicationMaster进程后启动SparkContext,并连接到Livy Server进行通信。后续执行的代*会码**通过Livy server发送到Application进程执行。

源码级别的详细的执行流程

1. live-server启动,启动BatchSessionManager, InteractiveSessionManager。

2. 初始化WebServer,通过ServletContextListener启动InteractiveSessionServlet和BatchSessionServlet。

3. 通过http调用SessionServlet的createSession接口,创建session并注册到sessionManager,InteractiveSession和BatchSession会创建SparkYarnApp,SparkYarnApp负责启动Spark作业,并维护yarnclient,获取作业信息、状态或kill作业。

4. BatchSession是以jar包的方式提交作业,运行结束后session作业就结束。

5. InteractiveSession会启动com.cloudera.livy.repl.ReplDriver,ReplDriver继承RSCDriver,初始化期间会通过RPC连接到livy-server,并启动RpcServer;其次会初始化Interpreter(支持PythonInterpreter,SparkInterpreter,SparkRInterpreter)。接收来自livy-server,并启动RpcServer;其次会初始化Interpreter(支持PythonInterpreter,SparkInterpreter,SparkRInterpreter)。接收来自livy-server的信息,然后通过Interpreter执行,livy-server通过RPC请求作业结果。

1.4、模块概述

基于livy的Spark连接方案

图:livy模块概述

1.4.1、Client

要将任务从用户的手里发送给 livy server,任务可以是代码片段(Scala、Python,R)的形式或可执行程序的形式(Jar)。这需要对最原始的任务按照 livy 的接口进行简单的封装,然后通过 http 的方式发送给 livy server。

rest-api 、pylivy、 livy-client-http 、sparkmagic

1.4.2、Router

livy server 提供的 api 是 rest api,Client 发送的请求也是针对各个资源(uri)的增删改查。router 的核心职责是管理好要把对什么资源的什么操作指派给哪个类的哪个函数来处理,

该模块核心类是 SessionServlet,继承于 ScalatraServlet,有两个子类:InteractiveSessionServlet 及 BatchSessionServlet,分别用来路由对 session 及 batch 相关的请求。

1.4.3、权限管理

livy 是一个有权限控制的系统(当然可以不开启),每个用户的每个请求是否有权限执行,都需要进行鉴权。

Livy 引入了 Hadoop 中的代理用户(proxy user)模式,代理用户模式广泛使用于多用户的环境,如 HiveServer2。在此模式中超级用户可以代理成普通用户去访问资源,并拥有普通用户相应的权限。开启了代理用户模式后,以用户 tom 所创建的会话所启动的 Spark 集群用户就会是 tom。

基于livy的Spark连接方案

图: Livy 多用户支持

1.4.4、任务生成

基于livy的Spark连接方案

对于 session 和 batch 的任务,生成 Spark App 的逻辑及最终生成的 Spark App 都是不同的。先来说说相对简单的生成 batch 的 Spark App 涉及的主要类:

SparkProcessBuilder:用于从 livyConf 中提取出运行一个 Spark App 所需的一切,包括 mainClass、executableFile、deployMode、conf、master、queue、env 及 driver 和 executors 的资源配置等等;并最终生成一条启动 Spark App 的 spark-submit 命令

SparkYarnApp:用来运行 SparkProcessBuilder 生成的启动命令,并监控管理启动运行起来的 Spark App,包括获取状态、日志、诊断信息、kill 等(目前 livy 只支持 local 和 yarn 两种模式,local 暂不进行介绍)

接下来是生成 session 的 Spark App 涉及的主要类:

ContextLauncher:用于启动一个新的 Spark App(通过 SparkLauncher)以及获取如何连接到其 driver 的信息(地址、clientId 及秘钥)

RSCClient:与 Spark Driver 建立连接,向其发送创建、查看状态结果日志、修改statement、job 等请求并获取响应

1.4.5、交互式driver

该模块仅对于 session 任务有,batch 并没有。该模块中,最核心的类是 RSCDriver,其继承与 RpcDispatcher,RpcDispatcher 接收来自 RSCClient 发送的 rpc 请求,根据请求的类型调用 RSCDriver 相应的方法去处理请求中包含的具体信息,对于最核心的执行代码片段(statement)请求,调用 repl/Session 去处理,repl/Session 最终会根据不同的 session kind 调用不同的 Interpreter 进行真正的代码执行,目前共有 Spark、Scala、Python、R 对应的 Interpreter

1.4.6、状态数据存储

一个 livy server 管理着众多 sessions、batches,需要维护大量相关信息并且在 livy server 重启后需要能够恢复对 sessions、batches 的管理,这就需要有能存取这些状态数据的模块。

核心类是 StateStore,状态数据的存储都是以 key-value 形式,目前有基于文件系统和 Zookeeper 的实现。另外,SessionStore 继承了该类提供高阶 Api 来进行 sessions 的存储和恢复

核心类是 StateStore,状态数据的存储都是以 key-value 形式,目前有基于文件系统和 Zookeeper 的实现。另外,SessionStore 继承了该类提供高阶 Api 来进行 sessions 的存储和恢复。存疑的是这块跟DateFrame的保存和本地是否有关系。(应用场景未明确)

# Where Livy should store state to for recovery. Possible values:
# <empty>: Default. State store disabled.
# filesystem: Store state on a file system.
# zookeeper: Store state in a Zookeeper instance.
# hdfs:///user/livy/session
# livy.server.recovery.state-store =

1.5、优点/缺点

1.5.1、优点

1、 从任何地方提交job--restapi特性;

2、 client模式下的交互式的Scala、Python语言与远程的spark集群进行通信;

3、 无须改动代码(仅仅改变了job提交);

4、 session复用。比独占式的session资源可以减少资源的独占式和提供利用率。

5、 对于我们来说,还有一个很重要的就是无须多个地方重复配置Spark客户端,尤其是在独立的docker镜像里面,减少因为实施环境的客户端和环境变量不同带来的代码重复修改和侵占。【jupyter的pyspark一定要配置,sparkmagic可以不用配置??】

SparkJobServer

Zeppelin

Livy

官网

支持jar包提交

支持

不支持

支持

支持代码段提交

不支持

支持

支持

支持SparkContext重用与管理

不支持

支持重用,不支持stopcontext

支持

运行模式

Client

Client

Client/Cluster

多个SparkContext运行

同一个JVM,可能存在问题

运行在不同JVM

运行在不同JVM

接口

RestFul

RestFul/WebScoket

RestFul

SQL支持

不支持

支持

不支持

1.5.2、缺点

1. 不支持提交SQL https://issues.cloudera.org/browse/LIVY-19 (实测应该是支持的。不是很理解这个提问者的实际需求)【0.5版本开始支持】

Session Kind

Value

Description

spark

Interactive Scala Spark session

pyspark

Interactive Python Spark session

sparkr

Interactive R Spark session

sql

Interactive SQL Spark session

2. session,app信息都维护在livy-server,livy-server挂掉信息丢失,需要HA。

3. livy-server的性能如何,能并行多少session。

4. 多个livy-server如何管理

1.6、同类产品

lighter 。 https://github.com/exacaster/lighter

源于livy的设计思想,交互式会话暂时还不稳定。sparkmagic基于rest 服务器端架构的2种适配:

1. Livy - for running interactive sessions on Yarn

2. Lighter - for running interactive sessions on Yarn or Kubernetes (only PySpark sessions are supported)

1.7、安装

1.7.1、*载下**

https://livy.incubator.apache.org/download/

基于livy的Spark连接方案

1.7.2、配置

解压,livy-env添加配置

export SPARK_HOME=spark安装目录
export HADOOP_CONF_DIR=Hadoop配置目录
此外,也可以配置一些额外的环境变量,例如:
export LIVY_HOME=livy-server目录
export PATH=$PATH:$LIVY_HOME/bin
export HADOOP_USER_NAME=root

conf/livy.conf可选配置

#作业集群模式
livy.spark.master=yarn
#作业集群部署模式
livy.spark.deploy-mode=client
#默认使用hiveContext
livy.repl.enable-hive-context=true
#开启用户代理
livy.impersonation.enabled=true
#空闲过期时间
livy.server.session.timeout=1h
#失败恢复 默认为off,关闭失败恢复功能;
livy.server.recovery.mode = recovery
#配置将元信息存储在何种可靠存储上,当前支持filesystem和ZooKeeper 
livy.server.recovery.state-store =zookeeper
#配置具体的存储路径,如果是filesystem则改配置为文件路径;而ZooKeeper则为ZooKeeper集群的URL。推荐和配置使用zookeeper,eg 192.168.1.232:2181,192.168.1.233:2181,192.168.1.234:2181
livy.server.recovery.state-store.url =host1:port,host2:port,host3:port

客户端配置

同我们以前的spark客户端配置一样。可补充行里的实现配置过程在这里

1.7.3、启动

依次启动zookeeper集群、hadoop集群 。(已有集群,可以忽略这块,通过 spark-gateway接入

zkServer.sh start

start-dfs.sh
start-yarn.sh

bin/livy-server start

1.7.4、访问

http://localhost:8998/ui

1.7.5、安全

安全设计到集群接入和客户鉴权。集群接入的话,从livy.conf配置上看,支持livy-server配置krb。

存疑是否统一的krb还是每个用户都需要自己的krb,通过auth信息配置??

此外行里实施是server端配置一个统一的krb权限信息还是每个接入者都自己提交keytab文件??

# Authentication support for Livy server
# Livy has a built-in SPnego authentication support for HTTP requests  with below configurations.
# livy.server.auth.type = kerberos
# livy.server.auth.kerberos.principal = <spnego principal>
# livy.server.auth.kerberos.keytab = <spnego keytab>
# livy.server.auth.kerberos.name-rules = DEFAULT

案例:通过KerberosRestTemplate设置请求。

当用户 tom 发起 REST 请求访问 Livy 服务端的时候,我们如何知道该用户是合法用户呢?Livy 采用了基于 Kerberos 的 Spnego 认证。在 Livy 服务端配置Spnego 认证后,用户发起 Http 请求之前必须先获得 Kerberos 认证,只有通过认证后才能正确访问 Livy 服务端,不然的话 Livy 服务端会返回401错误。

PostBatchesRequest postRequestBody = new PostBatchesRequest();
postRequestBody.setFile("/path/to/your/application"); // In HDFS
KerberosRestTemplate kerberosRestTemplate = new KerberosRestTemplate("path_to_your_key_tab_file", "your_user@your_realm");
// Add CSRF header if required:
HttpHeaders headers = new HttpHeaders();
headers.set("X-Requested-By", "your_user@your_realm");
headers.setContentType(MediaType.APPLICATION_JSON);

HttpEntity<PostBatchesRequest> postRequest = new HttpEntity<PostBatchesRequest>(postRequestBody, headers);

Batch batch = kerberosRestTemplate.postForObject("http://your_livy_server:8998" + "/batches", postRequest, Batch.class);

用户鉴权

从用户鉴权上看,通过配置user列表。存疑的地方是

# Allowed users to access Livy, by default any user is allowed to access Livy. If user want to
# limit who could access Livy, user should list all the permitted users with comma separated.
# livy.server.access-control.allowed-users = *

1.8、使用

通过使用livy-session,通过rest-api执行spark-shell,用户处理交互式请求;rest执行spark-submit, 处理非交互式请求。具体查看rest-api。包括新建session、提交代码执行、获取结果

具体参考不同客户端章节。

1.8.1、客户端

客户端支持使用支持livy-client(java端)、pylivy(python端client的livy包)、sparkmagic(用于在jupyter上的一个用于集成livy的kernel,依赖配置livy的server地址信息)

1.8.1.1、pylivy

python端使用livy的客户端pip包.

文档: https://pylivy.readthedocs.io/en/stable/api/client.html

pip install pylivy

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2022/11/16 2:38
# @Author  : zhangyunfei
# https://www.likecs.com/show-205106854.html
if __name__ == '__main__':
    from livy import LivySession

    LIVY_URL = 'http://192.168.1.43:8998'
    with LivySession(url=LIVY_URL, session_id=15) as session:
        # session.run("filtered = df.filter(df.name == 'Bob')")
        # # Retrieve the result
        # local_df = session.read('filtered')
        output = session.run("spark.sqlContext.sql(\"show databases\")")
        print(output.text)
        # session.run("df = spark.sqlContext.sql(\"show databases\")")
        # df = session.read("df")
        # print(df)

如果使用该方式,无须me_dbadapter介入。

1.8.1.2、livy-client-http

<dependency>
  <groupId>org.apache.livy</groupId>
  <artifactId>livy-client-http</artifactId>
  <version>0.7.0-incubating</version>
</dependency>

LivyClient client = new LivyClientBuilder()
  .setURI(new URI(livyUrl))
  .build();
try {
  System.err.printf("Uploading %s to the Spark context...\n", piJar);
  client.uploadJar(new File(piJar)).get();
  System.err.printf("Running PiJob with %d samples...\n", samples);
  double pi = client.submit(new PiJob(samples)).get();
  System.out.println("Pi is roughly: " + pi);
} finally {
  client.stop(true);
}

评估:不考虑代码这块以及数据落地到本地,对我们的dbadpater来说,只是替换掉SparkLauncher的独占式提交。影响比较小。(livy 的session id 传递??)

1.8.1.3、sparkmagic

案例: https://github.com/jupyter-incubator/sparkmagic/tree/master/examples

概述: https://github.com/jupyter-incubator/sparkmagic

jupyter-incubator_sparkmagic:用于远程 Spark 集群的 Jupyter 魔法和内核.pdf

1.8.1.3.1、安装

1、安装依赖

 pip install sparkmagic
 
 #pip show sparkmagic

2、Make sure that ipywidgets is properly installed by running

jupyter nbextension enable --py --sys-prefix widgetsnbextension

3、If you're using JupyterLab, you'll need to run another command:

jupyter labextension install "@jupyter-widgets/jupyterlab-manager"

4、(Optional) Install the wrapper kernels. Do pip show sparkmagic and it will show the path where sparkmagic is installed at. cd to that location and do:

pip show sparkmagic

输出样例:

C:\Users\Paul>pip show sparkmagic

WARNING: Ignoring invalid distribution -agic-db (d:\python36\lib\site-packages)

Name: sparkmagic

Version: 0.20.0

Summary: SparkMagic: Spark execution via Livy

Home-page: https://github.com/jupyter-incubator/sparkmagic

Author: Jupyter Development Team

Author-email: jupyter@googlegroups.org

License: BSD 3-clause

Location: d:\python36\lib\site-packages

Requires: autovizwidget, hdijupyterutils, ipykernel, ipython, ipywidgets, mock, nest-asyncio, nose, notebook, numpy, pandas, requests, requests-kerberos, tornado

Required-by:

cd d:\python36\lib\site-packages

jupyter-kernelspec install sparkmagic/kernels/sparkkernel
jupyter-kernelspec install sparkmagic/kernels/pysparkkernel
jupyter-kernelspec install sparkmagic/kernels/sparkrkernel

1、 (Optional) Modify the configuration file at ~/.sparkmagic/config.json.

参考如下 配置 。 核心是配置livy-server的服务端信息,例如:server的url、日志格式、认证实现、session过时设置。

{
  "kernel_python_credentials" : {
    "username": "",
    "password": "",
    "url": "http://localhost:8998",
    "auth": "None"
  },

  "kernel_scala_credentials" : {
    "username": "",
    "password": "",
    "url": "http://localhost:8998",
    "auth": "None"
  },
  "kernel_r_credentials": {
    "username": "",
    "password": "",
    "url": "http://localhost:8998"
  },

  "logging_config": {
    "version": 1,
    "formatters": {
      "magicsFormatter": { 
        "format": "%(asctime)s\t%(levelname)s\t%(message)s",
        "datefmt": ""
      }
    },
    "handlers": {
      "magicsHandler": { 
        "class": "hdijupyterutils.filehandler.MagicsFileHandler",
        "formatter": "magicsFormatter",
        "home_path": "~/.sparkmagic"
      }
    },
    "loggers": {
      "magicsLogger": { 
        "handlers": ["magicsHandler"],
        "level": "DEBUG",
        "propagate": 0
      }
    }
  },
  "authenticators": {
    "Kerberos": "sparkmagic.auth.kerberos.Kerberos",
    "None": "sparkmagic.auth.customauth.Authenticator", 
    "Basic_Access": "sparkmagic.auth.basic.Basic"
  },

  "wait_for_idle_timeout_seconds": 15,
  "livy_session_startup_timeout_seconds": 60,

  "fatal_error_suggestion": "The code failed because of a fatal error:\n\t{}.\n\nSome things to try:\na) Make sure Spark has enough available resources for Jupyter to create a Spark context.\nb) Contact your Jupyter administrator to make sure the Spark magics library is configured correctly.\nc) Restart the kernel.",

  "ignore_ssl_errors": false,

  "session_configs": {
    "driverMemory": "1000M",
    "executorCores": 2
  },

  "use_auto_viz": true,
  "coerce_dataframe": true,
  "max_results_sql": 2500,
  "pyspark_dataframe_encoding": "utf-8",
  
  "heartbeat_refresh_seconds": 30,
  "livy_server_heartbeat_timeout_seconds": 0,
  "heartbeat_retry_seconds": 10,

  "server_extension_default_kernel_name": "pysparkkernel",
  "custom_headers": {},
  
  "retry_policy": "configurable",
  "retry_seconds_to_sleep_list": [0.2, 0.5, 1, 3, 5],
  "configurable_retry_policy_max_retries": 8
}

2、 (Optional) Enable the server extension so that clusters can be programatically changed:

jupyter serverextension enable --py sparkmagic

1.8.1.3.2、使用

使用举例包含在普通的jupter 通过魔法函数引入调试spark代码、jupter新建使用pyspark 的kernel 、数据可视化。参考地方为:

https://github.com/jupyter-incubator/sparkmagic/tree/master/examples

jupyter lab

基于livy的Spark连接方案

jupyter notebook

基于livy的Spark连接方案

1.9、高可用

Livy 本身没提供官方的HA方案。考虑是否通过HAProxy 或者Nginx 负载均衡 ?至于元数据服务恢复,待验证是否统一一个URL或者文件系统地址,则通用。

二、MP接入方案

2.1、背景

需要支持在jupter中交互式调试和执行spark代码;能够共享session;(在专家建模能將DataFrame传递到不同Operator操作节点;能够实现在mm、mp、api、flow减少客户端配置的依赖性--行里实施这种配置有点麻烦,最终除了livyserver需要配置一次客户端之后,能够将这种spark接入的数据源看起来像jdbc数据源一样简单配置和使用)

目前已经实现了magicdb的DataSourceManager适配SparkYarn; flow端实现PySparkOperator操作器进行Spark代码执行和提交yarn ; jupter使用pyspark。

2.2、组件架构图

2.2.1、现方案

基于livy的Spark连接方案

2.2.1.2、说明

先要明白spark的基础使用步骤:

配置星环客户端TDH_Client 〉配置Spark客户端SparkClient 〉配置kerborasr认证环境 〉spark的api编写作业 〉通过认证并且提交作业到yarn集群 获取执行结果。

基于livy的Spark连接方案

图:spark的yarn-cluster执行模式任务提交流程图

1、在 YARN Cluster 模式下,Spark 任务提交之后会与 ResourceManager 建立通讯,并发出申请启动 ApplicationMaster 请求;

2、ResourceManage 接收到这个 Job 时,会在集群中选一个合适的 NodeManager 并分配一个 Container;以及启动 ApplicationMaster ,此时的 ApplicationMaster 就是 Driver ;

3、ApplicationMaster 启动后向 ResourceManager 申请资源,ResourceManager 接到 ApplicationMaster 的资源申请后会在合适(有资源的情况下)的 NodeManager 中分配 Container;

4、ApplicationMaster 对指定 NodeManager 分配的 Container 发出启动 Executor 进程请求;

Executor 进程启动后会向 Driver 反向注册,Executor 全部注册完成后 Driver 开始执行执行 Job 任务;

5、ApplicationMaster 中的 SparkContext 分配 Task 给 Executor 执行,Executor 运行 Task 并向ApplicationMaster 汇报运行的状态、进度、以及最终的计算结果;让 ApplicationMaster 随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;

6、应用程序运行完成后,ApplicationMaster 向 ResourceManager 申请注销并关闭自己

在上面的部署图中,可以看到几个不足的地方:

1、客户端环境重复性配置太高

其中,在上面的图中也可以看到,在pipeline-api通过magicdb 或者在pipeline-flow 通过PyspysparkOperator操作器执行pyspark代码、以及magiccmm通过magicdb完成spark数据源管理,我们都是要进行前3步的环境配置。也就是说,我们是*绑捆**了这3个步骤,尤其是在容器里面,重复性很高。

2、执行效率低

这个执行效率低,其实是跟我们的使用场景和使用方式相关的。上面的图,其实是没有session管理这个概念的。按理来说,也符合我们在dbadapter的使用习惯,动态创建jdbc连接,用完就跑。这里需要回顾一下Spark作业的提交流程,是避免不了上传或者通过HDFS分发我们的计算依赖、以及排队等待资源分配和调度执行。尤其是行里的集群为多用户,在实施过程中发现还需要分发我们的PYTHON和JAVA环境。

此外,我们每个SQL Mode的操作,当前实现为一个独立的Spark批任务。任务提交到Yarn上进行排队并且获取资源直到开始调度,期间时间消耗其实是不好评估的。当前实施的实验室环境,不考虑业务计算,观测到是1~3分钟之内完成一个作业。

综上,跟jdbc的交互过程来看,大数据的调度和执行是有一套完整的流程的。抛开计算任务负责的耗时比较长场景来说,对于频繁的作业调用来说而且资源占用不大的情况下,其实复用Spark Session或者说通过一个连接池来管理session或者使用spark-shell的交互式会话是一个比较理想的方案。

3、DataFrame无法传递/临时落地

【需要补充一个pyspark操作器的图】

SparkCore/Spark SQL里面有一个核心的概念DataFrame弹性分布式数据集,是一个逻辑上的分布式数据集。这个其实是强关联 Spark Session的生命周期的。Session关闭后也就释放了集群资源,随之的DataFram也就销毁了。类比jdbc的connection关闭后,QueryResultSet无法继续访问一个道理。

如果只是中间过程的落地,又会丢失了内存计算这个特性。不得不问,有没有一种想法,能过将这个数据集的定义存储起来,在使用的时候,能够恢复到 ?

2.2.2、新方案

基于livy的Spark连接方案

2.2.2.1、需要考虑的场景

小数据量方便调试场景

大数据量跑调试 / 跑批处理

支持SQL、core、Python、jar、pyfiels

流处理

加工流程编排接入: DAG 多节点

接入方便,待细化!

版本适配简单,

Jupyter支持

多用户

2.2.2.2、改造要点

将厂商客户端、SPARK客户端、KRB认证配置、集群配置文件配置从magicdb所在的容器服务里面移除,合并到一个Livy-Service容器服务。Livy- Server服务端所在容器/机器需要本地配置TDH-Client和SPARK- Client。

改造完成后,除了需要在Livy- Service配置一次,对于mp、me、Jupyter来说,本地也就不需要配置服务就不需要过分操心这些环境配置问题了。方便维护。

分别引入pylivy、livy-client-http、sparkmagic 去支持在python端、Java端、Jupyter端通过Livy- Server去跟远程集*交群**互。

改造完成后,充分利用上了 Livy的会话的资源共享特点。如果要避免共享会话导致的数据影响可以自由选择创建和维护session。

配置接入hdfs 或者 zookeeper 来完成session数据的持久化。

改造完成后,可用于恢复live的session-id。

这里面有个很重要的点,如果不接入sparkmagic ,pyspark还是免不了需要在本地配置spark客户端的。跟我们想要改造的点(使用者移除复杂的客户端配置)不符合。可以通过sparkmagic接入(同时支持pyspark和spark)

2.2.2.3、不足

Livy- Server的HA在官网未体现,设计上也未解决

在专家建模,pyspark操作器的数据如何跟sql操作器、Python操作器进行数据传递和串接未体现

数据源如何跟session-id做关联?1-1 ?还是允许session-id无法恢复时,动态创建?

2.3、实施步骤

2.3.1、LivyServer服务搭建

方式1:手工部署到宿主机 / 一个镜像服务,实现简单一点。

方式2:通过dockerfile打包一个镜像服务,类似jupter。方便后续拓展到k8s部署。

方式3:能够提供网络*载下**的形式,选择版本,然后选择在线构建和部署。

2.3.2、数据源

1、 安装包上传模式:择主机、填写部署安装地址、上传Livy部署包、Spark部署包、填写配置信息、keytab认证文件。

2、 容器镜像模式:选择容器镜像服务,spark客户端+livy 打包在一起 、spark客户端一起。上传部署脚本

3、 基本信息:

名称:必填

数据库类型:Livy

主机:文本输入IP地址,或者是根据培训信息直接发现进行下拉选择。

4、 读写权限:无

5、 并发限制:无

6、 文件上传方式:无

7、 使用SSH通道:无

数据源ID绑定一个SessionID,开启自动恢复重启功能。(待验证这个),否则使用共享sessionId。

2.3.3、mp-flow端pyspark

pyspark改造: 输入参数加入session_id / 或者读取一个默认的session 。输入输出参数如果出现DataFramee名字,需要去服务端进行拉取。

2.3.4、mp-api端DataSourceManager

1、 配置信息增加sessionid

db_adapter根据传入的DatasourceManager信息获取这个SessionId,在任务提交段执行提交,其余不动。 (需要验证sessionId不可变更性)。me_dbadapter增加配置信息字段sessionid

2、 补充环境信息、krb认证、默认执行参数设置。

{
	"sessionid": "",
	"livy-server": "考虑通过公共组件配置项或者是加入到datasource信息字段里面",
	"sparkConf": {
		"version": {
			"spark": "2.3.0",
			"pyspark": "2.3.0",
			"py4j": "py4j-0.10.6-src.zip",
			"hadoop": "transwarp-2.7.2"
		},
		"environ": [
			"SPARK_HOME=/home/magicpipeline/spark",
			"PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.6-src.zip:$PYTHONPATH",
			"HADOOP_HOME=/home/magicpipeline/TDH-Client/hadoop/hadoop",
			"HADOOP_CONF_DIR=/home/magicpipeline/TDH-Client/conf/hadoop",
			"YARN_CONF_DIR=/home/magicpipeline/TDH-Client/conf/hadoop",
			"PYSPARK_PYTHON=venv/miniconda3/bin/python"
		],
		"kerberos": {
			"ignorekinit": false,
			"using_keytab": true,
			"principal": "aml_admin@TDHTAB",
			"keytab_file": "/home/magicpipeline/spark/conf/aml_admin.keytab",
			"ccache_file": "/home/magicpipeline/spark/conf/ccahe_aml_admin",
			"password": ""
		},
		"spark-submit": {
			"dump": false,
			"path": "/home/magicpipeline/sparksubmitpy.sh",
			"default.conf": [				"spark.yarn.dist.archives=hdfs:///user/aml_admin/jdk1.8.0_211.tar.gz,hdfs:///user/aml_admin/miniconda3.tar.gz#venv",				"spark.yarn.appMasterEnv.JAVA_HOME=./jdk1.8.0_211.tar.gz/jdk1.8.0_211",				"spark.yarn*ex.e**cutorEnv.JAVA_HOME=./jdk1.8.0_211.tar.gz/jdk1.8.0_211",				"spark.yarn.appMasterEnv.PYSPARK_PYTHON=venv/miniconda3/bin/python",				"spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=venv/miniconda3/bin/python"],
			"options": [
				"--master yarn",
				"--deploy-mode cluster",
				"--driver-memory 8G",
				"--executor-memory 8G",
				"--num-executors 3",
				"--driver-cores 3",
				"--executor-cores 3",
				"--queue default",
				"$1 "
			]
		}
	}
}

这里有个问题需要考虑:datasource的执行参数建议做统一设置。

2、SparkLauncher的提交调整为livy-client-http的提交

public static void startApplication(SparkApplicationParam sparkAppParams, ApplicationCallback callback){
    //*************************************
}

2.3.5、jupyter lab增加同步sparkmagic的配置镜像包安装和配置项

参考前面安装步骤,将sparkmagic的kernel补充到jupterlab中。

2.3.6、mm/me接入spark

2.3.6.1、MagicDB方式接入

2.3.6.2、jupyter notebook / jupyter lab方式接入

2.4、综合评估

2.4.1、边界评估

2.4.2、性能评估

2.4.3、潜在风险

1、 本地如何使用文件数据或者是数据集,需要*载下**吗,请给出案例

session.read("dataframe")

2、 多用户认证,是否各自提交KRB认证

3、 HA方案

4、 连接星环

5、 是否支持华为

6、 是否支持Hive的JDBC还是说spark on hive而已 ???

7、

2.4.4、方案验证

三、参考资料

官网

配置kerboras认证

https://www.weijing.co/?p=1291

当连接到启用Kerberos的Hadoop集群时,需要配置

livy.server.launch.kerberos.principal

livy.server.launch.kerberos.keytab

当要求连接Livy的客户端需要提供Kerberos身份时,需要配置

livy.server.auth.type

livy.server.auth.kerberos.principal

livy.server.auth.kerberos.keytab。

当需要以Livy任务提交者身份创建Session时,需要配置

livy.server.auth.type = kerberos

在启用kerberos情况下,若livy.spark.master = yarn,则必须配置

livy.spark.deployMode = cluster

https://community.cloudera.com/t5/Support-Questions/kerberos-livy-quot-requirement-failed-Kerberos-requires-livy/td-p/179828

livy.spark.master = yarn
livy.spark.deployMode = cluster
livy.environment production
livy.impersonation.enabled true
livy.server.csrf_protection.enabled true
livy.server.port 8998
livy.server.session.timeout 3600000
livy.server.recovery.mode off

#livy.server.auth.type = kerberos
livy.server.launch.kerberos.keytab = /etc/security/keytabs/livy.headless.keytab
livy.server.launch.kerberos.principal = livy/_HOST@LBG.COM

#livy.server.kerberos.keytab = /etc/security/keytabs/livy.headless.keytab
livy.server.auth.kerberos.keytab /etc/security/keytabs/spnego.service.keytab
livy.server.auth.kerberos.principal HTTP/_HOST@LBG.COM

livy.superusers=livy

四、现场实施笔记

4.1、客户现场开发环境配置

配置环境:XX.XX.XXX.104. datascience-api

4.1.1、修改配置livy-env.sh

export JAVA_HOME=/home/magicpipeline/jdk1.8.0_171
export HADOOP_HOME=/home/magicpipeline/TDH-Client/hadoop/hadoop
export HADOOP_CONF_DIR=/home/magicpipeline/TDH-Client/conf/hadoop
export    SPARK_HOME=/home/magicpipeline/spark
export    SPARK_CONF_DIR=/home/magicpipeline/livy/conf/spark

在公司集群内做法将$SAPRK_HOME/conf下的目录copy一份到livy。单独管理

理由是搭建的节点为SPARK集群节点之一,避免配置更改影响集群节点

4.1.2、修改配置livy.conf

livy.spark.master=yarn
livy.spark.deploy-mode=cluster
livy.server.launch.kerberos.keytab=/home/magicpipeline/mip.keytab
livy.server.launch.kerberos.principal=mip@MLPUAT
livy.repl.enable-hive-context = true
livy.server.recovery.mode = recovery
livy.server.recovery.state-store =zookeeper
livy.server.recovery.state-store.url =192.168.1.232:2181,192.168.1.233:2181,192.168.1.234:2181

这里出现个踩坑的地方是,配置模板里面为配置livy.server.kerberos.keytab 和 livy.server.kerberos.principal ,实践上是错误的,无法通过认证

实践中,104服务器的spark-shell无法注册AM,改成了local 完成的验证KRB接入和HIVE访问

恢复是在公司集群环境配置ZOOKEEPER验证的。http://192.168.1.234:8998/ui

4.1.3、使用验证

4.1.3.1、创建session

curl -H "Content-Type: application/json" -X POST -d '{"kind": "spark","driverMemory": "800m","driverCores": 1,"executorMemory": "800m","executorCores": 1,"numExecutors": 1,"name": "livy-spark-hive"}' "http://xx.xx.xxx.104:8998/sessions"

4.1.3.2、执行SQL

curl -H "Content-Type: application/json" -X POST -d '{"code": "spark.sqlContext.sql(\"show databases\").show"}' "http://xx.xx.xxx.104:8998/sessions/0/statements"

4.1.3.3、执行SQL

curl -H "Content-Type: application/json" -X POST -d '{"code": "spark.sqlContext.sql(\"select * from temp.hdfs_sample_100W\").show"}' "http:/xx.xx.xxx.104:8998/sessions/0/statements"

4.2、踩坑指南

4.2.1、kerboras认证配置接入

https://issues.apache.org/jira/projects/LIVY/issues/LIVY-855?filter=doneissues

基于livy的Spark连接方案