Commit c61d1a6e authored by wenmo's avatar wenmo

执行持久化联调

parent 0ece94bc
...@@ -13,19 +13,19 @@ import lombok.Setter; ...@@ -13,19 +13,19 @@ import lombok.Setter;
@Getter @Getter
@Setter @Setter
public class StudioDDLDTO { public class StudioDDLDTO {
private boolean isResult; private boolean useResult;
private boolean isSession; private boolean useSession;
private String session; private String session;
private boolean isRemote; private boolean useRemote;
private Integer clusterId; private Integer clusterId;
private String statement; private String statement;
public JobConfig getJobConfig() { public JobConfig getJobConfig() {
return new JobConfig(isResult, isSession, getSession(), isRemote, clusterId); return new JobConfig(useResult, useSession, getSession(), useRemote, clusterId);
} }
public String getSession() { public String getSession() {
if(isRemote) { if(useRemote) {
return clusterId + "_" + session; return clusterId + "_" + session;
}else{ }else{
return "0_" + session; return "0_" + session;
......
...@@ -13,10 +13,10 @@ import lombok.Setter; ...@@ -13,10 +13,10 @@ import lombok.Setter;
@Getter @Getter
@Setter @Setter
public class StudioExecuteDTO { public class StudioExecuteDTO {
private boolean isResult; private boolean useResult;
private boolean isSession; private boolean useSession;
private String session; private String session;
private boolean isRemote; private boolean useRemote;
private Integer clusterId; private Integer clusterId;
private boolean fragment; private boolean fragment;
private String statement; private String statement;
...@@ -28,11 +28,11 @@ public class StudioExecuteDTO { ...@@ -28,11 +28,11 @@ public class StudioExecuteDTO {
private String savePointPath; private String savePointPath;
public JobConfig getJobConfig() { public JobConfig getJobConfig() {
return new JobConfig(isResult, isSession, getSession(), isRemote, clusterId, taskId, jobName, fragment, maxRowNum, checkPoint, parallelism, savePointPath); return new JobConfig(useResult, useSession, getSession(), useRemote, clusterId, taskId, jobName, fragment, maxRowNum, checkPoint, parallelism, savePointPath);
} }
public String getSession() { public String getSession() {
if(isRemote) { if(useRemote) {
return clusterId + "_" + session; return clusterId + "_" + session;
}else{ }else{
return "0_" + session; return "0_" + session;
......
...@@ -6,7 +6,7 @@ import lombok.Data; ...@@ -6,7 +6,7 @@ import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import java.io.Serializable; import java.io.Serializable;
import java.time.LocalDate; import java.time.LocalDateTime;
/** /**
* History * History
...@@ -32,8 +32,8 @@ public class History implements Serializable { ...@@ -32,8 +32,8 @@ public class History implements Serializable {
private String error; private String error;
private String result; private String result;
private String config; private String config;
private LocalDate startTime; private LocalDateTime startTime;
private LocalDate endTime; private LocalDateTime endTime;
private Integer taskId; private Integer taskId;
@TableField(exist = false) @TableField(exist = false)
......
...@@ -72,7 +72,7 @@ public class StudioServiceImpl implements StudioService { ...@@ -72,7 +72,7 @@ public class StudioServiceImpl implements StudioService {
@Override @Override
public JobResult executeSql(StudioExecuteDTO studioExecuteDTO) { public JobResult executeSql(StudioExecuteDTO studioExecuteDTO) {
JobConfig config = studioExecuteDTO.getJobConfig(); JobConfig config = studioExecuteDTO.getJobConfig();
if(config.isRemote()) { if(config.isUseRemote()) {
config.setHost(clusterService.getJobManagerAddress( config.setHost(clusterService.getJobManagerAddress(
clusterService.getById(studioExecuteDTO.getClusterId()) clusterService.getById(studioExecuteDTO.getClusterId())
)); ));
...@@ -84,7 +84,7 @@ public class StudioServiceImpl implements StudioService { ...@@ -84,7 +84,7 @@ public class StudioServiceImpl implements StudioService {
@Override @Override
public IResult executeDDL(StudioDDLDTO studioDDLDTO) { public IResult executeDDL(StudioDDLDTO studioDDLDTO) {
JobConfig config = studioDDLDTO.getJobConfig(); JobConfig config = studioDDLDTO.getJobConfig();
if(config.isRemote()) { if(config.isUseRemote()) {
config.setHost(clusterService.getJobManagerAddress( config.setHost(clusterService.getJobManagerAddress(
clusterService.getById(studioDDLDTO.getClusterId()) clusterService.getById(studioDDLDTO.getClusterId())
)); ));
......
...@@ -49,6 +49,6 @@ public class Job { ...@@ -49,6 +49,6 @@ public class Job {
} }
public JobResult getJobResult(){ public JobResult getJobResult(){
return new JobResult(id,jobConfig,jobManagerAddress,status,statement,jobId,error,result,executorSetting,startTime,endTime); return new JobResult(id,jobConfig,jobManagerAddress,status,statement,jobId,error,result,startTime,endTime);
} }
} }
...@@ -15,10 +15,10 @@ import lombok.Setter; ...@@ -15,10 +15,10 @@ import lombok.Setter;
@Setter @Setter
public class JobConfig { public class JobConfig {
private boolean isResult; private boolean useResult;
private boolean isSession; private boolean useSession;
private String sessionKey; private String sessionKey;
private boolean isRemote; private boolean useRemote;
private Integer clusterId; private Integer clusterId;
private String host; private String host;
private Integer taskId; private Integer taskId;
...@@ -29,13 +29,13 @@ public class JobConfig { ...@@ -29,13 +29,13 @@ public class JobConfig {
private Integer parallelism; private Integer parallelism;
private String savePointPath; private String savePointPath;
public JobConfig(boolean isResult, boolean isSession, String sessionKey, boolean isRemote, Integer clusterId, public JobConfig(boolean useResult, boolean useSession, String sessionKey, boolean useRemote, Integer clusterId,
Integer taskId, String jobName, boolean useSqlFragment, Integer maxRowNum, Integer checkpoint, Integer taskId, String jobName, boolean useSqlFragment, Integer maxRowNum, Integer checkpoint,
Integer parallelism, String savePointPath) { Integer parallelism, String savePointPath) {
this.isResult = isResult; this.useResult = useResult;
this.isSession = isSession; this.useSession = useSession;
this.sessionKey = sessionKey; this.sessionKey = sessionKey;
this.isRemote = isRemote; this.useRemote = useRemote;
this.clusterId = clusterId; this.clusterId = clusterId;
this.taskId = taskId; this.taskId = taskId;
this.jobName = jobName; this.jobName = jobName;
...@@ -46,17 +46,17 @@ public class JobConfig { ...@@ -46,17 +46,17 @@ public class JobConfig {
this.savePointPath = savePointPath; this.savePointPath = savePointPath;
} }
public JobConfig(boolean isResult, boolean isSession, String sessionKey, boolean isRemote, Integer clusterId) { public JobConfig(boolean useResult, boolean useSession, String sessionKey, boolean useRemote, Integer clusterId) {
this.isResult = isResult; this.useResult = useResult;
this.isSession = isSession; this.useSession = useSession;
this.sessionKey = sessionKey; this.sessionKey = sessionKey;
this.isRemote = isRemote; this.useRemote = useRemote;
this.clusterId = clusterId; this.clusterId = clusterId;
} }
public ExecutorSetting getExecutorSetting(){ public ExecutorSetting getExecutorSetting(){
String type = Executor.LOCAL; String type = Executor.LOCAL;
if(isRemote){ if(useRemote){
type = Executor.REMOTE; type = Executor.REMOTE;
} }
return new ExecutorSetting(host,type,checkpoint,parallelism,useSqlFragment,savePointPath,jobName); return new ExecutorSetting(host,type,checkpoint,parallelism,useSqlFragment,savePointPath,jobName);
......
...@@ -91,7 +91,7 @@ public class JobManager extends RunTime { ...@@ -91,7 +91,7 @@ public class JobManager extends RunTime {
} }
private Executor createExecutor() { private Executor createExecutor() {
if (config.isRemote()) { if (config.isUseRemote()) {
executor = Executor.build(new EnvironmentSetting(jobManagerHost, jobManagerPort), config.getExecutorSetting()); executor = Executor.build(new EnvironmentSetting(jobManagerHost, jobManagerPort), config.getExecutorSetting());
return executor; return executor;
} else { } else {
...@@ -116,7 +116,7 @@ public class JobManager extends RunTime { ...@@ -116,7 +116,7 @@ public class JobManager extends RunTime {
}*/ }*/
private Executor createExecutorWithSession() { private Executor createExecutorWithSession() {
if(config.isSession()) { if(config.isUseSession()) {
ExecutorEntity executorEntity = SessionPool.get(config.getSessionKey()); ExecutorEntity executorEntity = SessionPool.get(config.getSessionKey());
if (executorEntity != null) { if (executorEntity != null) {
executor = executorEntity.getExecutor(); executor = executorEntity.getExecutor();
...@@ -134,7 +134,7 @@ public class JobManager extends RunTime { ...@@ -134,7 +134,7 @@ public class JobManager extends RunTime {
public boolean init() { public boolean init() {
handler = JobHandler.build(); handler = JobHandler.build();
String host = config.getHost(); String host = config.getHost();
if (config.isRemote() && host != null && !("").equals(host)) { if (config.isUseRemote() && host != null && !("").equals(host)) {
String[] strs = host.split(NetConstant.COLON); String[] strs = host.split(NetConstant.COLON);
if (strs.length >= 2) { if (strs.length >= 2) {
jobManagerHost = strs[0]; jobManagerHost = strs[0];
...@@ -295,7 +295,7 @@ public class JobManager extends RunTime { ...@@ -295,7 +295,7 @@ public class JobManager extends RunTime {
if (tableResult.getJobClient().isPresent()) { if (tableResult.getJobClient().isPresent()) {
job.setJobId(tableResult.getJobClient().get().getJobID().toHexString()); job.setJobId(tableResult.getJobClient().get().getJobID().toHexString());
} }
if(config.isResult()) { if(config.isUseResult()) {
IResult result = ResultBuilder.build(operationType, maxRowNum, "", false).getResult(tableResult); IResult result = ResultBuilder.build(operationType, maxRowNum, "", false).getResult(tableResult);
job.setResult(result); job.setResult(result);
} }
......
...@@ -20,24 +20,24 @@ public class JobResult { ...@@ -20,24 +20,24 @@ public class JobResult {
private JobConfig jobConfig; private JobConfig jobConfig;
private String jobManagerAddress; private String jobManagerAddress;
private Job.JobStatus status; private Job.JobStatus status;
private boolean success;
private String statement; private String statement;
private String jobId; private String jobId;
private String error; private String error;
private IResult result; private IResult result;
private ExecutorSetting executorSetting;
private LocalDate startTime; private LocalDate startTime;
private LocalDate endTime; private LocalDate endTime;
public JobResult(Integer id, JobConfig jobConfig, String jobManagerAddress, Job.JobStatus status, String statement, String jobId, String error, IResult result, ExecutorSetting executorSetting, LocalDate startTime, LocalDate endTime) { public JobResult(Integer id, JobConfig jobConfig, String jobManagerAddress, Job.JobStatus status, String statement, String jobId, String error, IResult result, LocalDate startTime, LocalDate endTime) {
this.id = id; this.id = id;
this.jobConfig = jobConfig; this.jobConfig = jobConfig;
this.jobManagerAddress = jobManagerAddress; this.jobManagerAddress = jobManagerAddress;
this.status = status; this.status = status;
this.success = (status==(Job.JobStatus.SUCCESS))?true:false;
this.statement = statement; this.statement = statement;
this.jobId = jobId; this.jobId = jobId;
this.error = error; this.error = error;
this.result = result; this.result = result;
this.executorSetting = executorSetting;
this.startTime = startTime; this.startTime = startTime;
this.endTime = endTime; this.endTime = endTime;
} }
......
...@@ -74,7 +74,7 @@ const StudioConfig = (props: any) => { ...@@ -74,7 +74,7 @@ const StudioConfig = (props: any) => {
<Row> <Row>
<Col span={12}> <Col span={12}>
<Form.Item <Form.Item
label="预览结果" className={styles.form_item} name="isResult" valuePropName="checked" label="预览结果" className={styles.form_item} name="useResult" valuePropName="checked"
tooltip={{ title: '开启预览结果,将同步运行并返回数据结果', icon: <InfoCircleOutlined /> }} tooltip={{ title: '开启预览结果,将同步运行并返回数据结果', icon: <InfoCircleOutlined /> }}
> >
<Switch checkedChildren="启用" unCheckedChildren="禁用" <Switch checkedChildren="启用" unCheckedChildren="禁用"
...@@ -91,7 +91,7 @@ const StudioConfig = (props: any) => { ...@@ -91,7 +91,7 @@ const StudioConfig = (props: any) => {
</Col> </Col>
</Row> </Row>
<Form.Item <Form.Item
label="远程执行" className={styles.form_item} name="isRemote" valuePropName="checked" label="远程执行" className={styles.form_item} name="useRemote" valuePropName="checked"
tooltip={{ title: '开启远程执行,将在远程集群进行任务执行', icon: <InfoCircleOutlined /> }} tooltip={{ title: '开启远程执行,将在远程集群进行任务执行', icon: <InfoCircleOutlined /> }}
> >
<Switch checkedChildren="启用" unCheckedChildren="禁用" <Switch checkedChildren="启用" unCheckedChildren="禁用"
...@@ -100,7 +100,7 @@ const StudioConfig = (props: any) => { ...@@ -100,7 +100,7 @@ const StudioConfig = (props: any) => {
<Row> <Row>
<Col span={10}> <Col span={10}>
<Form.Item <Form.Item
label="共享会话" className={styles.form_item} name="isSession" valuePropName="checked" label="共享会话" className={styles.form_item} name="useSession" valuePropName="checked"
tooltip={{ title: '开启共享会话,将进行 Flink Catalog 的共享', icon: <InfoCircleOutlined /> }} tooltip={{ title: '开启共享会话,将进行 Flink Catalog 的共享', icon: <InfoCircleOutlined /> }}
> >
<Switch checkedChildren="启用" unCheckedChildren="禁用" <Switch checkedChildren="启用" unCheckedChildren="禁用"
......
...@@ -13,18 +13,18 @@ const StudioMsg = (props:any) => { ...@@ -13,18 +13,18 @@ const StudioMsg = (props:any) => {
{current.console.result.map((item,index)=> { {current.console.result.map((item,index)=> {
if(index==0) { if(index==0) {
return (<Paragraph> return (<Paragraph>
<blockquote><Link href={`http://${item.flinkHost}:${item.flinkPort}`} target="_blank"> <blockquote><Link href={`http://${item.jobConfig.host}`} target="_blank">
[{item.sessionId}:{item.flinkHost}:{item.flinkPort}] [{item.jobConfig.sessionKey}:{item.jobConfig.host}]
</Link> <Divider type="vertical"/>{item.finishDate} </Link> <Divider type="vertical"/>{item.startTime}
<Divider type="vertical"/>{item.endTime}
<Divider type="vertical"/> <Divider type="vertical"/>
{!item.success ? <><Badge status="error"/><Text type="danger">Error</Text></> : {!(item.status=='SUCCESS') ? <><Badge status="error"/><Text type="danger">Error</Text></> :
<><Badge status="success"/><Text type="success">Success</Text></>} <><Badge status="success"/><Text type="success">Success</Text></>}
<Divider type="vertical"/> <Divider type="vertical"/>
{item.jobName&&<Text code>{item.jobName}</Text>} {item.jobConfig.jobName&&<Text code>{item.jobConfig.jobName}</Text>}
{item.jobId&&<Text code>{item.jobId}</Text>} {item.jobId&&<Text code>{item.jobId}</Text>}
<Text keyboard>{item.time}ms</Text></blockquote> </blockquote>
{item.statement && (<pre style={{height: '100px'}}>{item.statement}</pre>)} {item.statement && (<pre style={{height: '100px'}}>{item.statement}</pre>)}
{item.msg ? item.msg : ''}
{item.error && (<pre style={{height: '100px'}}>{item.error}</pre>)} {item.error && (<pre style={{height: '100px'}}>{item.error}</pre>)}
</Paragraph>) </Paragraph>)
}else{ }else{
......
...@@ -109,7 +109,7 @@ const StudioTable = (props:any) => { ...@@ -109,7 +109,7 @@ const StudioTable = (props:any) => {
onChange={onChange} onChange={onChange}
> >
{current.console.result.map((item,index)=> { {current.console.result.map((item,index)=> {
if(item.success) { if(item.status=='SUCCESS') {
let tag = (<> <Tooltip placement="topLeft" title={item.statement}><Tag color="processing">{item.finishDate}</Tag> let tag = (<> <Tooltip placement="topLeft" title={item.statement}><Tag color="processing">{item.finishDate}</Tag>
<Text underline>[{item.sessionId}:{item.flinkHost}:{item.flinkPort}]</Text> <Text underline>[{item.sessionId}:{item.flinkHost}:{item.flinkPort}]</Text>
{item.jobName&&<Text code>{item.jobName}</Text>} {item.jobName&&<Text code>{item.jobName}</Text>}
......
...@@ -44,9 +44,9 @@ const StudioMenu = (props: any) => { ...@@ -44,9 +44,9 @@ const StudioMenu = (props: any) => {
fragment:current.task.fragment, fragment:current.task.fragment,
savePointPath:current.task.savePointPath, savePointPath:current.task.savePointPath,
jobName:current.task.jobName, jobName:current.task.jobName,
isResult:current.task.isResult, useResult:current.task.useResult,
isSession:current.task.isSession, useSession:current.task.useSession,
remote:current.task.isRemote, useRemote:current.task.useRemote,
}; };
const key = current.key; const key = current.key;
const taskKey = (Math.random()*1000)+''; const taskKey = (Math.random()*1000)+'';
......
...@@ -139,9 +139,9 @@ const StudioTree: React.FC<StudioTreeProps> = (props) => { ...@@ -139,9 +139,9 @@ const StudioTree: React.FC<StudioTreeProps> = (props) => {
session:'', session:'',
maxRowNum: 100, maxRowNum: 100,
jobName:node.name, jobName:node.name,
isResult:false, useResult:false,
isSession:false, useSession:false,
isRemote:true, useRemote:true,
...result.datas ...result.datas
}, },
console:{ console:{
......
...@@ -41,9 +41,9 @@ export type TaskType = { ...@@ -41,9 +41,9 @@ export type TaskType = {
session: string; session: string;
maxRowNum: number; maxRowNum: number;
jobName: string; jobName: string;
isResult:boolean; useResult:boolean;
isSession:boolean; useSession:boolean;
isRemote:boolean; useRemote:boolean;
}; };
export type ConsoleType = { export type ConsoleType = {
...@@ -151,9 +151,9 @@ const Model: ModelType = { ...@@ -151,9 +151,9 @@ const Model: ModelType = {
maxRowNum: 100, maxRowNum: 100,
session: '', session: '',
alias: '草稿', alias: '草稿',
isResult:true, useResult:true,
isSession:false, useSession:false,
isRemote:false, useRemote:false,
}, },
console: { console: {
result: [], result: [],
...@@ -182,9 +182,9 @@ const Model: ModelType = { ...@@ -182,9 +182,9 @@ const Model: ModelType = {
session: '', session: '',
maxRowNum: 100, maxRowNum: 100,
alias: '草稿', alias: '草稿',
isResult:true, useResult:true,
isSession:false, useSession:false,
isRemote:false, useRemote:false,
}, },
console: { console: {
result: [], result: [],
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment