Unverified Commit 6222f508 authored by 高岩's avatar 高岩 Committed by GitHub

Feature support custom k8sapp submit at studio (#1076)

* Support custom k8sapp submit interface

* fix JcaPEMKey error for k8s commit tasks

* Auto registers cluster set the id as same as taskid, if not , submit cluster twice will get error on updata this table

* add UI on k8s app
submit

* fix code style

* Fix the problem of failure of k8s mode online

* 1,upper case some dialect
2,remove setId in Cluster.java
3,use jobId instead of clustername

* Optimize k8s tab UI
parent 5b2b3347
......@@ -474,10 +474,13 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
Task task = getTaskInfoById(id);
Assert.check(task);
if (JobLifeCycle.DEVELOP.equalsValue(task.getStep())) {
List<SqlExplainResult> sqlExplainResults = explainTask(id);
for (SqlExplainResult sqlExplainResult : sqlExplainResults) {
if (!sqlExplainResult.isParseTrue() || !sqlExplainResult.isExplainTrue()) {
return Result.failed("语法校验和逻辑检查有误,发布失败");
//KubernetesApplaction is not sql, skip sqlExplain verify
if (!Dialect.KUBERNETES_APPLICATION.equalsVal(task.getDialect())) {
List<SqlExplainResult> sqlExplainResults = explainTask(id);
for (SqlExplainResult sqlExplainResult : sqlExplainResults) {
if (!sqlExplainResult.isParseTrue() || !sqlExplainResult.isExplainTrue()) {
return Result.failed("语法校验和逻辑检查有误,发布失败");
}
}
}
task.setStep(JobLifeCycle.RELEASE.getValue());
......@@ -678,6 +681,14 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
JobConfig jobConfig = new JobConfig();
jobConfig.setAddress(cluster.getJobManagerHost());
jobConfig.setType(cluster.getType());
if (GatewayType.KUBERNETES_APPLICATION.equalsValue(cluster.getType())) {
Statement statement = statementService.getById(cluster.getTaskId());
Map<String, Object> gatewayConfig = JSONUtil.toMap(statement.getStatement(),String.class,Object.class);
jobConfig.buildGatewayConfig(gatewayConfig);
jobConfig.getGatewayConfig().getClusterConfig().setAppId(cluster.getName());
useGateway = true;
}
if (Asserts.isNotNull(cluster.getClusterConfigurationId())) {
Map<String, Object> gatewayConfig = clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
jobConfig.buildGatewayConfig(gatewayConfig);
......@@ -714,7 +725,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
}
private JobConfig buildJobConfig(Task task) {
boolean isJarTask = Dialect.FLINKJAR.equalsVal(task.getDialect());
boolean isJarTask = Dialect.FLINKJAR.equalsVal(task.getDialect())
|| Dialect.KUBERNETES_APPLICATION.equalsVal(task.getDialect());
if (!isJarTask && Asserts.isNotNull(task.getFragment()) ? task.getFragment() : false) {
String flinkWithSql = dataBaseService.getEnabledFlinkWithSql();
if (Asserts.isNotNullString(flinkWithSql)) {
......@@ -731,8 +743,15 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
config.setJarTask(isJarTask);
if (!JobManager.useGateway(config.getType())) {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), task.getClusterId()));
}
//support custom K8s app submit, rather than clusterConfiguration
else if (Dialect.KUBERNETES_APPLICATION.equalsVal(task.getDialect())
&& GatewayType.KUBERNETES_APPLICATION.equalsValue(config.getType())) {
Map<String, Object> gatewayConfig = JSONUtil.toMap(task.getStatement(),String.class,Object.class);
config.buildGatewayConfig(gatewayConfig);
} else {
Map<String, Object> gatewayConfig = clusterConfigurationService.getGatewayConfig(task.getClusterConfigurationId());
//submit application type with clusterConfiguration
if (GatewayType.YARN_APPLICATION.equalsValue(config.getType()) || GatewayType.KUBERNETES_APPLICATION.equalsValue(config.getType())) {
if (!isJarTask) {
SystemConfiguration systemConfiguration = SystemConfiguration.getInstances();
......
......@@ -31,7 +31,7 @@ public enum Dialect {
FLINKSQL("FlinkSql"), FLINKJAR("FlinkJar"), FLINKSQLENV("FlinkSqlEnv"), SQL("Sql"), JAVA("Java"),
MYSQL("Mysql"), ORACLE("Oracle"), SQLSERVER("SqlServer"), POSTGRESQL("PostgreSql"), CLICKHOUSE("ClickHouse"),
DORIS("Doris"), PHOENIX("Phoenix"), HIVE("Hive"), STARROCKS("StarRocks");
DORIS("Doris"), PHOENIX("Phoenix"), HIVE("Hive"), STARROCKS("StarRocks"), KUBERNETES_APPLICATION("KubernetesApplaction");
private String value;
......
......@@ -29,6 +29,9 @@ import com.dlink.gateway.config.GatewayConfig;
import com.dlink.gateway.config.SavePointStrategy;
import com.dlink.session.SessionConfig;
import org.apache.http.util.TextUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
......@@ -202,7 +205,16 @@ public class JobConfig {
appConfig.setUserJarMainAppClass(config.get("userJarMainAppClass").toString());
}
if (config.containsKey("userJarParas") && Asserts.isNotNullString((String) config.get("userJarParas"))) {
appConfig.setUserJarParas(config.get("userJarParas").toString().split(" "));
//There may be multiple spaces between the parameter and value during user input,
// which will directly lead to a parameter passing error and needs to be eliminated
String[] temp = config.get("userJarParas").toString().split(" ");
List<String> paraSplit = new ArrayList<>();
for (String s : temp) {
if (!TextUtils.isEmpty(s.trim())) {
paraSplit.add(s);
}
}
appConfig.setUserJarParas(paraSplit.toArray(new String[0]));
}
gatewayConfig.setAppConfig(appConfig);
}
......
......@@ -71,6 +71,25 @@
<artifactId>dlink-client-hadoop</artifactId>
<scope>${scope.runtime}</scope>
</dependency>
<!-- delete this dependency will result in a JcaPEMKey error
for k8s commit tasks-->
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<version>1.69</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
<version>1.69</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-ext-jdk15on</artifactId>
<version>1.69</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -38,6 +38,7 @@ import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.http.util.TextUtils;
import java.util.ArrayList;
import java.util.Collection;
......@@ -108,8 +109,17 @@ public class KubernetesApplicationGateway extends KubernetesGateway {
}
result.setJids(jids);
}
String clusterId = clusterClient.getClusterId();
result.setClusterId(clusterId);
String jobId = "";
//application mode only have one job, so we can get any one to be jobId
for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
jobId = jobStatusMessage.getJobId().toHexString();
}
//if JobStatusMessage not have job id, use timestamp
//and... it`s maybe wrong with submit
if (TextUtils.isEmpty(jobId)) {
jobId = "unknown" + System.currentTimeMillis();
}
result.setClusterId(jobId);
result.setWebURL(clusterClient.getWebInterfaceURL());
result.success();
} catch (Exception e) {
......
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import {Divider, Form, Input, Space,} from 'antd';
import {Dispatch, DocumentStateType} from "@@/plugin-dva/connect";
import {connect} from "umi";
import Button from "antd/es/button/button";
import {useState} from "react";
import {
APP_CONFIG_LIST,
Config,
FLINK_CONFIG_LIST, FLINK_CONFIG_NAME_LIST,
KUBERNETES_CONFIG_LIST, KUBERNETES_CONFIG_NAME_LIST
} from "@/pages/ClusterConfiguration/conf";
import {MinusCircleOutlined, PlusOutlined} from "@ant-design/icons";
import {getConfig} from "@/pages/ClusterConfiguration/function";
const formLayout = {
labelCol: {span: 7},
wrapperCol: {span: 13},
};
const StudioKubernetes = (props: any) => {
const {
height = '100%',
width = '100%',
conf,
} = props;
const [form] = Form.useForm();
//Separate normal configuration and user-defined configuration.
function addMapToList(map: Record<string, unknown>, keys: string[]) {
const list = [];
for (const i in map) {
if (!keys.includes(i)) {
list.push({
name: i,
value: map[i],
})
}
}
return list;
}
//Merge pre-defined config
function mergeConfig(source: Record<string, unknown>, from: Record<string, unknown>) {
for (const key in from) {
source[key] = from[key];
}
return source;
}
const initValue = (configJson: string) => {
//formValue cannot show hierarchical relationships
// and needs to be converted from a profile within the database to display
const config = configJson ? JSON.parse(configJson) : {}
let initValues = {}
//Normal configuration and user-defined configuration are mixed together
// need to be separated
//Normal config set
if (config["kubernetesConfig"]) {
initValues = mergeConfig(initValues, config["kubernetesConfig"])
}
if (config["flinkConfig"]) {
initValues = mergeConfig(initValues, config["flinkConfig"])
}
//App config is mix in flink config , need separated
APP_CONFIG_LIST.forEach((value, index) => {
initValues[APP_CONFIG_LIST[index].name] = config[APP_CONFIG_LIST[index].name]
})
//user custom config set
initValues["flinkConfigList"] = addMapToList(config["flinkConfig"], FLINK_CONFIG_NAME_LIST())
initValues["kubernetesConfigList"] = addMapToList(config["kubernetesConfig"], KUBERNETES_CONFIG_NAME_LIST())
initValues["flinkConfigPath"] = config["flinkConfigPath"]
return initValues;
}
const [formVals, setFormVals] = useState<Record<string, unknown>>(initValue(conf));
const onValuesChange = (change: any, all: any) => {
all.type = "Kubernetes"
const values = getConfig(all)
APP_CONFIG_LIST.forEach((value, index) => {
values[APP_CONFIG_LIST[index].name] = all[APP_CONFIG_LIST[index].name]
})
setFormVals(all)
props.saveSql(JSON.stringify(values))
}
// build pre-defined config item
const buildConfig = (config: Config[]) => {
const itemList: JSX.Element[] = [];
config.forEach(configItem => {
itemList.push(<Form.Item
key={configItem.name}
name={configItem.name}
label={configItem.lable}
help={configItem.help}
>
<Input placeholder={configItem.placeholder}
// defaultValue={configItem.defaultValue}
/>
</Form.Item>)
});
return itemList;
};
// build user-defined config item
const buildOtherConfig = (itemName: string, configName: string, addDescription: string) => {
return (
<Form.Item
label={itemName}
>
<Form.List name={configName}>
{(fields, {add, remove}) => (
<>
{fields.map(({key, name, fieldKey, ...restField}) => (
<Space key={key} style={{display: 'flex'}} align="baseline">
<Form.Item
{...restField}
name={[name, 'name']}
fieldKey={[fieldKey, 'name']}
>
<Input placeholder="name"/>
</Form.Item>
<Form.Item
{...restField}
name={[name, 'value']}
fieldKey={[fieldKey, 'value']}
>
<Input placeholder="value"/>
</Form.Item>
<MinusCircleOutlined onClick={() => remove(name)}/>
</Space>
))}
<Form.Item>
<Button type="dashed" onClick={() => add()} block icon={<PlusOutlined/>}>
{addDescription}
</Button>
</Form.Item>
</>
)}
</Form.List>
</Form.Item>
)
}
// render
const renderContent = () => {
return (
<>
<Divider>Kubernetes 配置(必选)</Divider>
{buildConfig(KUBERNETES_CONFIG_LIST)}
{buildOtherConfig("其他配置", "kubernetesConfigList", "添加一个自定义项")}
<Divider>App 配置(必选)</Divider>
<Form.Item
name="flinkConfigPath"
label="配置文件路径"
rules={[{required: true, message: '请输入 flink-conf.yaml 路径!'}]}
help="指定 flink-conf.yaml 的路径(末尾无/)"
>
<Input placeholder="值如 /opt/flink/conf"/>
</Form.Item>
{buildConfig(APP_CONFIG_LIST)}
<Divider>其他配置(可选)</Divider>
{buildConfig(FLINK_CONFIG_LIST)}
{buildOtherConfig("其他配置", "flinkConfigList", "添加一个自定义项")}
</>
);
};
return (
<div style={{
width: width,
height: height,
padding: "10px",
overflowY: "scroll"
}}>
<Form
style={{width: "1000px"}}
labelAlign={"left"}
{...formLayout}
form={form}
initialValues={formVals}
onValuesChange={onValuesChange}
>
{renderContent()}
</Form>
</div>
);
}
const mapDispatchToProps = (dispatch: Dispatch) => ({
saveSql: (val: any) => dispatch({
type: "Studio/saveSql",
payload: val,
}), saveSqlMetaData: (sqlMetaData: any, key: number) => dispatch({
type: "Studio/saveSqlMetaData",
payload: {
activeKey: key,
sqlMetaData,
isModified: true,
}
}),
})
export default connect(({Document}: { Document: DocumentStateType }) => ({
fillDocuments: Document.fillDocuments,
}), mapDispatchToProps)(StudioKubernetes);
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
@import '~antd/es/style/themes/default.less';
.form_setting{
padding-left: 10px!important;
}
.form_item{
margin-bottom: 5px!important;
}
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import {connect} from "umi";
import {StateType} from "@/pages/DataStudio/model";
import {Form, Input, Select, Row, Col, Tooltip, Button} from "antd";
import { MinusSquareOutlined} from "@ant-design/icons";
import styles from "./index.less";
import {useEffect} from "react";
import {JarStateType} from "@/pages/Jar/model";
import {Scrollbars} from "react-custom-scrollbars";
import {RUN_MODE} from "@/components/Studio/conf";
import {AlertStateType} from "@/pages/AlertInstance/model";
const {Option} = Select;
const StudioKubernetesConfig = (props: any) => {
const { current, form, dispatch, tabs, group, toolHeight} = props;
const getGroupOptions = () => {
const itemList = [<Option key={0} value={0} label='禁用'>
禁用
</Option>];
for (const item of group) {
itemList.push(<Option key={item.id} value={item.id} label={item.name}>
{item.name}
</Option>)
}
return itemList;
};
useEffect(() => {
//Force set type k8s
current.task.type=RUN_MODE.KUBERNETES_APPLICATION
form.setFieldsValue(current.task);
}, [current.task]);
const onValuesChange = (change: any, all: any) => {
const newTabs = tabs;
for (let i = 0; i < newTabs.panes.length; i++) {
if (newTabs.panes[i].key === newTabs.activeKey) {
for (const key in change) {
newTabs.panes[i].task[key] = all[key];
}
break;
}
}
dispatch({
type: "Studio/saveTabs",
payload: newTabs,
});
};
return (
<>
<Row>
<Col span={24}>
<div style={{float: "right"}}>
<Tooltip title="最小化">
<Button
type="text"
icon={<MinusSquareOutlined/>}
/>
</Tooltip>
</div>
</Col>
</Row>
<Scrollbars style={{height: (toolHeight - 32)}}>
<Form
form={form}
layout="vertical"
className={styles.form_setting}
onValuesChange={onValuesChange}
>
<Form.Item
label="SavePoint策略" className={styles.form_item} name="savePointStrategy"
tooltip='指定 SavePoint策略,默认为禁用'
>
<Select defaultValue={0}>
<Option value={0}>禁用</Option>
<Option value={1}>最近一次</Option>
<Option value={2}>最早一次</Option>
<Option value={3}>指定一次</Option>
</Select>
</Form.Item>
{current.task.savePointStrategy === 3 ?
(<Form.Item
label="SavePointPath" className={styles.form_item} name="savePointPath"
tooltip='从SavePointPath恢复Flink任务'
>
<Input placeholder="hdfs://..."/>
</Form.Item>) : ''
}
<Row>
<Col span={24}>
<Form.Item label="报警组" tooltip={`选择报警组`} name="alertGroupId"
className={styles.form_item}>
<Select
style={{width: '100%'}}
placeholder="选择报警组"
optionLabelProp="label"
defaultValue={0}
>
{getGroupOptions()}
</Select>
</Form.Item>
</Col>
</Row>
</Form>
</Scrollbars>
</>
);
};
export default connect(({Studio, Jar, Alert}: { Studio: StateType, Jar: JarStateType , Alert: AlertStateType }) => ({
sessionCluster: Studio.sessionCluster,
clusterConfiguration: Studio.clusterConfiguration,
current: Studio.current,
tabs: Studio.tabs,
session: Studio.session,
currentSession: Studio.currentSession,
toolHeight: Studio.toolHeight,
jars: Jar.jars,
env: Studio.env,
group: Alert.group,
}))(StudioKubernetesConfig);
......@@ -33,6 +33,7 @@ import StudioJarSetting from "./StudioJarSetting";
import StudioGuide from "./StudioGuide";
import StudioTaskInfo from "./StudioTaskInfo";
import {DIALECT, isSql} from "@/components/Studio/conf";
import StudioKubernetesConfig from "@/components/Studio/StudioRightTool/StudioKubernetesConfig";
const {TabPane} = Tabs;
......@@ -53,6 +54,9 @@ const StudioRightTool = (props: any) => {
if (DIALECT.JAVA === current.task.dialect) {
return renderUDFContent();
}
if (DIALECT.KUBERNETES_APPLICATION === current.task.dialect) {
return renderKubernetesContent();
}
return renderFlinkSqlContent();
};
......@@ -72,6 +76,17 @@ const StudioRightTool = (props: any) => {
</>)
};
const renderKubernetesContent = () => {
return (<>
<TabPane tab={<span><SettingOutlined/> 执行配置</span>} key="StudioSqlConfig">
<StudioKubernetesConfig form={form}/>
</TabPane>
<TabPane tab={<span><ScheduleOutlined/> 保存点</span>} key="StudioSavePoint">
<StudioSavePoint/>
</TabPane>
</>)
};
const renderJarContent = () => {
return (<>
<TabPane tab={<span><SettingOutlined/> 作业配置</span>} key="StudioJarSetting">
......
......@@ -27,6 +27,7 @@ import StudioEdit from '../StudioEdit';
import {DIALECT} from '../conf';
import StudioHome from "@/components/Studio/StudioHome";
import {Dispatch} from "@@/plugin-dva/connect";
import StudioKubernetes from "@/components/Studio/StudioKubernetes";
const {TabPane} = Tabs;
......@@ -103,6 +104,35 @@ const EditorTabs = (props: any) => {
</span>
);
// as different dialet return different Panle
const getTabPane = (pane, i) => {
if (pane.task.dialect == DIALECT.KUBERNETES_APPLICATION) {
return (
<TabPane tab={Tab(pane)} key={pane.key} closable={pane.closable}>
<StudioKubernetes
tabsKey={pane.key}
conf={pane.value}
monaco={pane.monaco}
height={height ? height : (toolHeight - 32)}
width={width}
/>
</TabPane>
)
} else {
return (<TabPane tab={Tab(pane)} key={pane.key} closable={pane.closable}>
<StudioEdit
tabsKey={pane.key}
sql={pane.value}
monaco={pane.monaco}
// sqlMetaData={pane.sqlMetaData}
height={height ? height : (toolHeight - 32)}
width={width}
language={current.task.dialect === DIALECT.JAVA ? 'java' : 'sql'}
/>
</TabPane>)
}
}
return (
<>
{tabs.panes.length === 0 ? <StudioHome width={width}/> :
......@@ -116,19 +146,7 @@ const EditorTabs = (props: any) => {
className={styles['edit-tabs']}
style={{height: height ? height : toolHeight}}
>
{tabs.panes.map((pane, i) => (
<TabPane tab={Tab(pane)} key={pane.key} closable={pane.closable}>
<StudioEdit
tabsKey={pane.key}
sql={pane.value}
monaco={pane.monaco}
// sqlMetaData={pane.sqlMetaData}
height={height ? height : (toolHeight - 32)}
width={width}
language={current.task.dialect === DIALECT.JAVA ? 'java' : 'sql'}
/>
</TabPane>
))}
{tabs.panes.map((pane, i) => getTabPane(pane, i))}
</Tabs>}
</>
);
......
......@@ -72,6 +72,7 @@ const SimpleTaskForm: React.FC<UpdateFormProps> = (props) => {
>
<Select defaultValue={DIALECT.FLINKSQL} value={DIALECT.FLINKSQL}>
<Option value={DIALECT.FLINKSQL}>{DIALECT.FLINKSQL}</Option>
<Option value={DIALECT.KUBERNETES_APPLICATION}>{DIALECT.KUBERNETES_APPLICATION}</Option>
<Option value={DIALECT.FLINKJAR}>{DIALECT.FLINKJAR}</Option>
<Option value={DIALECT.FLINKSQLENV}>{DIALECT.FLINKSQLENV}</Option>
<Option value={DIALECT.MYSQL}>{DIALECT.MYSQL}</Option>
......
......@@ -42,6 +42,7 @@ export const DIALECT = {
HIVE: 'Hive',
PHOENIX: 'Phoenix',
STARROCKS: 'StarRocks',
KUBERNETES_APPLICATION: 'KubernetesApplaction',
JAVA: 'Java',
};
......@@ -108,6 +109,7 @@ export const isTask = (dialect: string) => {
case DIALECT.FLINKJAR:
case DIALECT.HIVE:
case DIALECT.STARROCKS:
case DIALECT.KUBERNETES_APPLICATION:
return true;
default:
return false;
......
......@@ -23,6 +23,7 @@ export type Config = {
lable: string,
placeholder: string
defaultValue?: string
help?: string
}
export const HADOOP_CONFIG_LIST: Config[] = [{
......@@ -44,14 +45,20 @@ export const KUBERNETES_CONFIG_LIST: Config[] = [{
placeholder: 'NodePort',
defaultValue: 'NodePort',
}];
export const FLINK_CONFIG_LIST: Config[] = [{
export const FLINK_CONFIG_LIST: Config[] = [
{
name: 'kubernetes.pod-template-file',
lable: 'kubernetes.pod-template-file',
placeholder: 'pod-template-file path',
help: '可选,如果配置需要指定到具体文件'
},{
name: 'jobmanager.memory.process.size',
lable: 'jobmanager.memory.process.size',
placeholder: '1600m',
placeholder: '1600m jobmanager內存設置',
}, {
name: 'taskmanager.memory.process.size',
lable: 'taskmanager.memory.process.size',
placeholder: '2048m',
placeholder: '2048m taskmanager內存設置',
}, {
name: 'taskmanager.memory.framework.heap.size',
lable: 'taskmanager.memory.framework.heap.size',
......@@ -59,7 +66,15 @@ export const FLINK_CONFIG_LIST: Config[] = [{
}, {
name: 'taskmanager.numberOfTaskSlots',
lable: 'taskmanager.numberOfTaskSlots',
placeholder: '4',
placeholder: '1',
}, {
name: 'kubernetes.jobmanager.cpu',
lable: 'kubernetes.jobmanager.cpu',
placeholder: '1',
}, {
name: 'kubernetes.taskmanager.cpu',
lable: 'kubernetes.taskmanager.cpu',
placeholder: '1',
}, {
name: 'parallelism.default',
lable: 'parallelism.default',
......@@ -71,6 +86,25 @@ export const FLINK_CONFIG_LIST: Config[] = [{
}
];
export const APP_CONFIG_LIST: Config[] = [{
name: 'userJarPath',
lable: '镜像内Jar路径',
placeholder: 'local:///opt/example.jar',
}, {
name: 'userJarMainAppClass',
lable: '启动类',
placeholder: 'com.example.app',
}, {
name: 'userJarParas',
lable: '启动参数',
placeholder: '值如 -conf test.properties',
},{
name: 'state.savepoints.dir',
lable: 'state.savepoints.dir',
placeholder: 'hdfs:///flink/savepoints/',
}
];
export function HADOOP_CONFIG_NAME_LIST () {
const list: string[] = [];
HADOOP_CONFIG_LIST.forEach(item => {
......@@ -86,6 +120,13 @@ export function KUBERNETES_CONFIG_NAME_LIST () {
});
return list;
}
export function APP_CONFIG_NAME_LIST () {
const list: string[] = [];
APP_CONFIG_LIST.forEach(item => {
list.push(item.name);
});
return list;
}
export function FLINK_CONFIG_NAME_LIST() {
const list: string[] = [];
......
......@@ -47,6 +47,9 @@ export function getConfig(values:any) {
kubernetesConfig,
flinkConfig,
};
}else {
//all code paths must return a value.
return {}
}
}
......@@ -57,7 +60,10 @@ type ConfigItem = {
function addListToMap(list:[ConfigItem],config:{}){
for(let i in list){
config[list[i].name]=list[i].value;
//the param maybe undefind
if (list[i] != undefined){
config[list[i].name]=list[i].value;
}
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment