Commit 83671764 authored by wenmo's avatar wenmo

flink远程任务停止

parent 3afbb1de
...@@ -108,11 +108,19 @@ public class StudioController { ...@@ -108,11 +108,19 @@ public class StudioController {
} }
/** /**
* 获取session列表 * 获取flinkjobs列表
*/ */
@GetMapping("/listJobs") @GetMapping("/listJobs")
public Result listJobs(@RequestParam Integer clusterId) { public Result listJobs(@RequestParam Integer clusterId) {
List<JsonNode> jobs = studioService.listJobs(clusterId); List<JsonNode> jobs = studioService.listJobs(clusterId);
return Result.succeed(jobs.toArray(),"获取成功"); return Result.succeed(jobs.toArray(),"获取成功");
} }
/**
* 获取session列表
*/
@GetMapping("/cancel")
public Result cancel(@RequestParam Integer clusterId,@RequestParam String jobId) {
return Result.succeed(studioService.cancel(clusterId,jobId),"停止成功");
}
} }
...@@ -40,4 +40,6 @@ public interface StudioService { ...@@ -40,4 +40,6 @@ public interface StudioService {
List<ColumnCANode> getColumnCAByStatement(String statement); List<ColumnCANode> getColumnCAByStatement(String statement);
List<JsonNode> listJobs(Integer clusterId); List<JsonNode> listJobs(Integer clusterId);
boolean cancel(Integer clusterId,String jobId);
} }
...@@ -130,4 +130,11 @@ public class StudioServiceImpl implements StudioService { ...@@ -130,4 +130,11 @@ public class StudioServiceImpl implements StudioService {
Asserts.checkNotNull(cluster,"该集群不存在"); Asserts.checkNotNull(cluster,"该集群不存在");
return FlinkAPI.build(cluster.getJobManagerHost()).listJobs(); return FlinkAPI.build(cluster.getJobManagerHost()).listJobs();
} }
@Override
public boolean cancel(Integer clusterId,String jobId) {
Cluster cluster = clusterService.getById(clusterId);
Asserts.checkNotNull(cluster,"该集群不存在");
return FlinkAPI.build(cluster.getJobManagerHost()).stop(jobId);
}
} }
package com.dlink.api; package com.dlink.api;
import cn.hutool.http.HttpUtil; import cn.hutool.http.HttpUtil;
import cn.hutool.http.Method;
import com.dlink.constant.FlinkRestAPIConstant; import com.dlink.constant.FlinkRestAPIConstant;
import com.dlink.constant.NetConstant; import com.dlink.constant.NetConstant;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
...@@ -51,8 +52,14 @@ public class FlinkAPI { ...@@ -51,8 +52,14 @@ public class FlinkAPI {
return parse(res); return parse(res);
} }
private JsonNode patch(String route, String body) {
String res = HttpUtil.createRequest(Method.PATCH,NetConstant.HTTP + address + NetConstant.SLASH + route).timeout(NetConstant.SERVER_TIME_OUT_ACTIVE).body(body).execute().body();
return parse(res);
}
public List<JsonNode> listJobs() { public List<JsonNode> listJobs() {
JsonNode result = get(FlinkRestAPIConstant.JOBS); JsonNode result = get(FlinkRestAPIConstant.JOBSLIST);
JsonNode jobs = result.get("jobs"); JsonNode jobs = result.get("jobs");
List<JsonNode> joblist = new ArrayList<>(); List<JsonNode> joblist = new ArrayList<>();
if (jobs.isArray()) { if (jobs.isArray()) {
...@@ -62,4 +69,9 @@ public class FlinkAPI { ...@@ -62,4 +69,9 @@ public class FlinkAPI {
} }
return joblist; return joblist;
} }
public boolean stop(String jobId){
get(FlinkRestAPIConstant.JOBS+jobId+FlinkRestAPIConstant.CANCEL);
return true;
}
} }
...@@ -10,5 +10,13 @@ public interface FlinkRestAPIConstant { ...@@ -10,5 +10,13 @@ public interface FlinkRestAPIConstant {
/** /**
* jobs * jobs
*/ */
String JOBS = "jobs"; String JOBS = "jobs/";
/**
* JOBSLIST
*/
String JOBSLIST = "jobs/overview";
/**
* cancel
*/
String CANCEL = "/yarn-cancel";
} }
...@@ -20,4 +20,9 @@ public class FlinkRestAPITest { ...@@ -20,4 +20,9 @@ public class FlinkRestAPITest {
List<JsonNode> jobs = FlinkAPI.build(address).listJobs(); List<JsonNode> jobs = FlinkAPI.build(address).listJobs();
System.out.println(jobs.toString()); System.out.println(jobs.toString());
} }
@Test
public void stopTest(){
FlinkAPI.build(address).stop("0727f796fcf9e07d89e724f7e15598cf");
}
} }
import {Input, Tag, Divider, Empty, message, Select} from "antd"; import {Empty, Tag, Divider, Tooltip, message, Select, Button, Space, Modal} from "antd";
import {StateType} from "@/pages/FlinkSqlStudio/model"; import {StateType} from "@/pages/FlinkSqlStudio/model";
import {connect} from "umi"; import {connect} from "umi";
import {useState} from "react"; import {useState} from "react";
import { SearchOutlined } from '@ant-design/icons'; import {SearchOutlined,CheckCircleOutlined,SyncOutlined,CloseCircleOutlined,ClockCircleOutlined,MinusCircleOutlined} from '@ant-design/icons';
import ProTable from '@ant-design/pro-table'; import ProTable from '@ant-design/pro-table';
import {showFlinkJobs} from "../../StudioEvent/DDL"; import {cancelJob, showFlinkJobs} from "../../StudioEvent/DDL";
const { Option } = Select; const {Option} = Select;
const StudioProcess = (props:any) => { const StudioProcess = (props: any) => {
const {cluster} = props; const {cluster} = props;
const [jobsData, setJobsData] = useState<any>({}); const [jobsData, setJobsData] = useState<any>({});
const [clusterId, setClusterId] = useState<number>();
const getColumns=()=>{ const getColumns = () => {
let columns: any = [{ let columns: any = [{
title: "JobId", title: "作业ID",
dataIndex: "id", dataIndex: "jid",
key: "id", key: "jid",
sorter: true, sorter: true,
},{ }, {
title: "status", title: "作业名",
dataIndex: "status", dataIndex: "name",
sorter: true,
}, {
title: "状态",
dataIndex: "state",
sorter: true,
render: (_, row) => {
return (
<>
{(row.state == 'FINISHED') ?
(<Tag icon={<CheckCircleOutlined />} color="success">
FINISHED
</Tag>) :
(row.state == 'RUNNING') ?
(<Tag icon={<SyncOutlined spin />} color="processing">
RUNNING
</Tag>) :
(row.state == 'FAILED') ?
(<Tag icon={<CloseCircleOutlined />} color="error">
FAILED
</Tag>) :
(row.state == 'CANCELED') ?
(<Tag icon={<MinusCircleOutlined />} color="default">
CANCELED
</Tag>) :
(row.state == 'INITIALIZE') ?
(<Tag icon={<ClockCircleOutlined />} color="default">
INITIALIZE
</Tag>) :
(<Tag color="default">
UNKNOWEN
</Tag>)
}</>)
;
}
}, {
title: "开始时间",
dataIndex: "start-time",
sorter: true, sorter: true,
valueType: 'dateTime',
}, {
title: "最近修改时间",
dataIndex: "last-modification",
sorter: true,
valueType: 'dateTime',
}, {
title: "结束时间",
dataIndex: "end-time",
sorter: true,
valueType: 'dateTime',
}, {
title: "耗时",
dataIndex: "duration",
sorter: true,
valueType: 'second',
}, {
title: "tasks",
dataIndex: "tasks",
sorter: true,
render: (_, row) => {
return (<>
{row.tasks.total>0?(<Tooltip title="TOTAL"><Tag color="#666">{row.tasks.total}</Tag></Tooltip>):''}
{row.tasks.created>0?(<Tooltip title="CREATED"><Tag color="#666">{row.tasks.created}</Tag></Tooltip>):''}
{row.tasks.deploying>0?(<Tooltip title="DEPLOYING"><Tag color="#666">{row.tasks.deploying}</Tag></Tooltip>):''}
{row.tasks.running>0?(<Tooltip title="RUNNING"><Tag color="#44b549">{row.tasks.running}</Tag></Tooltip>):''}
{row.tasks.failed>0?(<Tooltip title="FAILED"><Tag color="#666">{row.tasks.failed}</Tag></Tooltip>):''}
{row.tasks.finished>0?(<Tooltip title="CREATED"><Tag color="#108ee9">{row.tasks.finished}</Tag></Tooltip>):''}
{row.tasks.reconciling>0?(<Tooltip title="RECONCILING"><Tag color="#666">{row.tasks.reconciling}</Tag></Tooltip>):''}
{row.tasks.scheduled>0?(<Tooltip title="SCHEDULED"><Tag color="#666">{row.tasks.scheduled}</Tag></Tooltip>):''}
{row.tasks.canceling>0?(<Tooltip title="CANCELING"><Tag color="#feb72b">{row.tasks.canceling}</Tag></Tooltip>):''}
{row.tasks.canceled>0?(<Tooltip title="CANCELED"><Tag color="#db970f">{row.tasks.canceled}</Tag></Tooltip>):''}
</>
)
}
}, { }, {
title: '操作', title: '操作',
dataIndex: 'option', dataIndex: 'option',
valueType: 'option', valueType: 'option',
render: (_, record) => [ render: (_, record) => {
<a let option = [<a
onClick={() => { onClick={() => {
message.success('敬请期待'); message.success('敬请期待');
}} }}
> >
详情 详情
</a>, <Divider type="vertical"/>, <a </a>];
if(record.state=='RUNNING'||record.state=='RECONCILING'||record.state=='SCHEDULED'){
option.push(<Divider type="vertical"/>);
option.push(<a
onClick={() => { onClick={() => {
message.success('敬请期待'); onCancel(record.jid);
}} }}
> >
停止 停止
</a> </a>);
], }
return option;
},
},]; },];
return columns; return columns;
}; };
const getClusterOptions = ()=>{ const onCancel = (jobId:string)=>{
Modal.confirm({
title: `确认停止作业【${jobId}】?`,
okText: '停止',
cancelText: '取消',
onOk: async () => {
if (!clusterId) return;
let res = cancelJob(clusterId, jobId);
res.then((result) => {
if (result.datas == true) {
message.success("停止成功");
onRefreshJobs();
} else {
message.error("停止失败");
}
});
}
});
};
const getClusterOptions = () => {
let itemList = []; let itemList = [];
for(let item of cluster){ for (let item of cluster) {
let tag =(<><Tag color={item.enabled?"processing":"error"}>{item.type}</Tag>{item.alias}</>); let tag = (<><Tag color={item.enabled ? "processing" : "error"}>{item.type}</Tag>{item.alias}</>);
itemList.push(<Option value={item.id} label={tag}> itemList.push(<Option value={item.id} label={tag}>
{tag} {tag}
</Option>) </Option>)
...@@ -57,30 +155,45 @@ const StudioProcess = (props:any) => { ...@@ -57,30 +155,45 @@ const StudioProcess = (props:any) => {
return itemList; return itemList;
}; };
const onChangeCluster= (value:number)=>{ const onChangeCluster = (value: number) => {
let res = showFlinkJobs(value); setClusterId(value);
onRefreshJobs();
};
const onRefreshJobs = ()=>{
if(!clusterId) return;
let res = showFlinkJobs(clusterId);
res.then((result) => { res.then((result) => {
for(let i in result.datas){
result.datas[i].duration = result.datas[i].duration*0.001;
if(result.datas[i]['end-time']==-1){
result.datas[i]['end-time']=null;
}
}
setJobsData(result.datas); setJobsData(result.datas);
}); });
}; };
return ( return (
<div style={{width: '100%'}}> <div style={{width: '100%'}}>
<Space>
<Select <Select
style={{ width: '100%' }} // style={{width: '100%'}}
placeholder="选择Flink集群" placeholder="选择Flink集群"
optionLabelProp="label" optionLabelProp="label"
onChange={onChangeCluster} onChange={onChangeCluster}
> >
{getClusterOptions()} {getClusterOptions()}
</Select> </Select>
{jobsData.length>0? <Button type="primary" icon={<SearchOutlined />} onClick={onRefreshJobs} />
(<ProTable dataSource={jobsData} columns={getColumns()} search={false} </Space>
/>):(<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />)} {jobsData.length > 0 ?
(<ProTable dataSource={jobsData} columns={getColumns()} size="small" search={false} toolBarRender={false}
/>) : (<Empty image={Empty.PRESENTED_IMAGE_SIMPLE}/>)}
</div> </div>
); );
}; };
export default connect(({ Studio }: { Studio: StateType }) => ({ export default connect(({Studio}: { Studio: StateType }) => ({
cluster: Studio.cluster, cluster: Studio.cluster,
}))(StudioProcess); }))(StudioProcess);
...@@ -124,3 +124,7 @@ export function showDataBase(dispatch: any) { ...@@ -124,3 +124,7 @@ export function showDataBase(dispatch: any) {
export function showFlinkJobs(clusterId:number) { export function showFlinkJobs(clusterId:number) {
return getData('api/studio/listJobs',{clusterId:clusterId}); return getData('api/studio/listJobs',{clusterId:clusterId});
} }
/*--- 停止 Flink Jobs ---*/
export function cancelJob(clusterId:number,jobId:string) {
return getData('api/studio/cancel',{clusterId:clusterId,jobId:jobId});
}
import {Table, Tag, Divider, Empty, message, Select} from "antd";
import {StateType} from "@/pages/FlinkSqlStudio/model";
import {connect} from "umi";
import {useState} from "react";
import { SearchOutlined } from '@ant-design/icons';
import {showFlinkJobs} from "../../StudioEvent/DDL";
const { Option } = Select;
const StudioJobs = (props:any) => {
const {cluster} = props;
const [jobsData, setJobsData] = useState<any>({});
const getColumns=()=>{
let columns: any = [{
title: "JobId",
dataIndex: "id",
key: "id",
sorter: true,
},{
title: "status",
dataIndex: "status",
sorter: true,
}, {
title: '操作',
dataIndex: 'option',
valueType: 'option',
render: (_, record) => [
<a
onClick={() => {
message.success('敬请期待');
}}
>
详情
</a>, <Divider type="vertical"/>, <a
onClick={() => {
message.success('敬请期待');
}}
>
停止
</a>
],
},];
return columns;
};
const getClusterOptions = ()=>{
let itemList = [];
for(let item of cluster){
let tag =(<><Tag color={item.enabled?"processing":"error"}>{item.type}</Tag>{item.alias}</>);
itemList.push(<Option value={item.id} label={tag}>
{tag}
</Option>)
}
return itemList;
};
const onChangeCluster= (value:number)=>{
let res = showFlinkJobs(value);
res.then((result) => {
setJobsData(result.datas);
});
};
return (
<>
<Select
style={{ width: '100%' }}
placeholder="选择Flink集群"
optionLabelProp="label"
onChange={onChangeCluster}
>
{getClusterOptions()}
</Select>
{jobsData.length>0?
(<Table dataSource={jobsData} columns={getColumns()}
/>):(<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />)}
</>
);
};
export default connect(({ Studio }: { Studio: StateType }) => ({
cluster: Studio.cluster,
}))(StudioJobs);
...@@ -8,13 +8,9 @@ import StudioConnector from "./StudioConnector"; ...@@ -8,13 +8,9 @@ import StudioConnector from "./StudioConnector";
import React from "react"; import React from "react";
import StudioDataBase from "./StudioDataBase"; import StudioDataBase from "./StudioDataBase";
import StudioCluster from "./StudioCluster"; import StudioCluster from "./StudioCluster";
import StudioJobs from "./StudioJobs";
const { TabPane } = Tabs; const { TabPane } = Tabs;
const StudioLeftTool = (props:any) => { const StudioLeftTool = (props:any) => {
return ( return (
...@@ -34,9 +30,6 @@ const StudioLeftTool = (props:any) => { ...@@ -34,9 +30,6 @@ const StudioLeftTool = (props:any) => {
<TabPane tab={<span><AppstoreOutlined /> 元数据</span>} key="MetaData" > <TabPane tab={<span><AppstoreOutlined /> 元数据</span>} key="MetaData" >
<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} /> <Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />
</TabPane> </TabPane>
<TabPane tab={<span><FireOutlined /> 任务</span>} key="FlinkTask" >
<StudioJobs />
</TabPane>
<TabPane tab={<span><FunctionOutlined /> 函数</span>} key="Function" > <TabPane tab={<span><FunctionOutlined /> 函数</span>} key="Function" >
<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} /> <Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />
</TabPane> </TabPane>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment