瀏覽代碼

更新argo日志获取

Zhangvinjo 3 年之前
父節點
當前提交
d66d5f6099

+ 17 - 0
pom.xml

@@ -298,6 +298,23 @@
 			<!--<scope>system</scope>-->
 			<!--<systemPath>D:/JAVA8/lib/tools.jar</systemPath>-->
 		<!--</dependency>-->
+<!--		<dependency>-->
+<!--			<groupId>javax.websocket</groupId>-->
+<!--			<artifactId>javax.websocket-api</artifactId>-->
+<!--			<version>1.1</version>-->
+<!--		</dependency>-->
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-starter-websocket</artifactId>
+		</dependency>
+
+		<!--HttpClient-->
+		<dependency>
+			<groupId>commons-httpclient</groupId>
+			<artifactId>commons-httpclient</artifactId>
+			<version>3.1</version>
+		</dependency>
+
 	</dependencies>
 
 	<build>

+ 146 - 0
src/main/java/io/renren/common/utils/ArgoUtils.java

@@ -1,5 +1,6 @@
 package io.renren.common.utils;
 
+import com.alibaba.fastjson.JSONObject;
 import io.argoproj.workflow.ApiException;
 import io.argoproj.workflow.Configuration;
 import io.argoproj.workflow.JSON;
@@ -12,8 +13,13 @@ import io.argoproj.workflow.models.WorkflowTemplateCreateRequest;
 import lombok.extern.slf4j.Slf4j;
 
 import io.argoproj.workflow.ApiClient;
+//import org.apache.http.client.HttpClient;
 import org.springframework.beans.factory.annotation.Value;
 
+import java.io.IOException;
+import org.apache.commons.httpclient.*;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.params.HttpMethodParams;
 /**
  * Argo 工具
  * @author Zhang
@@ -186,6 +192,146 @@ public class ArgoUtils {
         return apiInstance.workflowServiceGetWorkflow(namespace, workflowname, null, null);
     }
 
+    /**
+     * 根据工作流名称获取工作流状态
+     * @param namespace 命名空间
+     * @param workflowname workflow名称
+     * @return 获取到的工作流
+     * @throws ApiException api访问异常
+     */
+    public static String getWorkflowStatus(String namespace,String workflowname) throws ApiException {
+        WorkflowServiceApi apiInstance = new WorkflowServiceApi(apiClient);
+        Workflow workflow = apiInstance.workflowServiceGetWorkflow(namespace, workflowname, null, null);
+        return workflow.getStatus().getPhase();
+    }
+
+    /**
+     * 根据工作流名称获取工作流
+     * @param namespace 命名空间
+     * @param workflowname workflow名称
+     * @return 获取到的工作流
+     * @throws ApiException api访问异常
+     */
+    public static String getWorkflowlog(String namespace,String workflowname,String containertype){
+//        WorkflowServiceApi apiInstance = new WorkflowServiceApi(apiClient);
+//        return apiInstance.workflowServiceGetWorkflow(namespace, workflowname, null, null);
+
+        String getlogurl = basepath+"/api/v1/workflows/"+namespace+"/"+workflowname+"/log?logOptions.container="+containertype+"&logOptions.follow=true";
+//        String response = doGet("https://42.192.195.253:31691/api/v1/workflows/argo/algorithm-visual-2rqhq/log?logOptions.container=wait&logOptions.follow=true", "GBK");
+        System.out.println(getlogurl);
+        String response = doGet(getlogurl, "GBK");
+//        System.out.println("---------------------切分");
+        StringBuilder sblog = new StringBuilder();
+
+        if(!response.equals("")){
+            String[] split = response.split("\\r?\\n");
+            for(String strsplit:split){
+                JSONObject parse = com.alibaba.fastjson.JSON.parseObject(strsplit);
+                System.out.println(parse.toString());
+                JSONObject result = parse.getJSONObject("result");
+                System.out.println(result.getString("podName"));
+                System.out.println(result.getString("content"));
+                sblog.append(result.getString("podName")+": "+result.getString("content"));
+                sblog.append("\n");
+            }
+        }
+
+        return sblog.toString();
+    }
+
+    /**
+     * 根据工作流名称获取工作流
+     * @param namespace 命名空间
+     * @param workflowname workflow名称
+     * @return 获取到的工作流
+     * @throws ApiException api访问异常
+     */
+    public static String getpodlog(String namespace,String workflowname,String containertype,String podname){
+//        WorkflowServiceApi apiInstance = new WorkflowServiceApi(apiClient);
+//        return apiInstance.workflowServiceGetWorkflow(namespace, workflowname, null, null);
+
+        String getlogurl = basepath+"/api/v1/workflows/"+namespace+"/"+workflowname+"/log?logOptions.container="+containertype+"&logOptions.follow=true&podName="+podname;
+//        String response = doGet("https://42.192.195.253:31691/api/v1/workflows/argo/algorithm-visual-2rqhq/log?logOptions.container=wait&logOptions.follow=true&podName=algorithm-visual-2rqhq-2795439875", "GBK");
+        System.out.println(getlogurl);
+        String response = doGet(getlogurl, "GBK");
+//        System.out.println("---------------------切分");
+        StringBuilder sblog = new StringBuilder();
+
+        if(!response.equals("")){
+            String[] split = response.split("\\r?\\n");
+            for(String strsplit:split){
+                JSONObject parse = com.alibaba.fastjson.JSON.parseObject(strsplit);
+//                System.out.println(parse.toString());
+                JSONObject result = parse.getJSONObject("result");
+//                System.out.println(result.getString("podName"));
+//                System.out.println(result.getString("content"));
+                sblog.append(result.getString("podName")+": "+result.getString("content"));
+                sblog.append("\n");
+            }
+        }
+
+        return sblog.toString();
+    }
+
+
+    /**
+     * httpClient的get请求方式
+     * 使用GetMethod来访问一个URL对应的网页实现步骤:
+     * 1.生成一个HttpClient对象并设置相应的参数;
+     * 2.生成一个GetMethod对象并设置响应的参数;
+     * 3.用HttpClient生成的对象来执行GetMethod生成的Get方法;
+     * 4.处理响应状态码;
+     * 5.若响应正常,处理HTTP响应内容;
+     * 6.释放连接。
+     * @param url
+     * @param charset
+     * @return
+     */
+    public static String doGet(String url, String charset) {
+        //1.生成HttpClient对象并设置参数
+        HttpClient httpClient = new HttpClient();
+        //设置Http连接超时为10秒
+        httpClient.getHttpConnectionManager().getParams().setConnectionTimeout(10000);
+        //2.生成GetMethod对象并设置参数
+        GetMethod getMethod = new GetMethod(url);
+        //设置get请求超时为10秒
+        getMethod.getParams().setParameter(HttpMethodParams.SO_TIMEOUT, 10000);
+        //设置请求重试处理,用的是默认的重试处理:请求三次
+        getMethod.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler());
+        String response = "";
+        //3.执行HTTP GET 请求
+        try {
+            int statusCode = httpClient.executeMethod(getMethod);
+            //4.判断访问的状态码
+            if (statusCode != HttpStatus.SC_OK) {
+                System.err.println("请求出错:" + getMethod.getStatusLine());
+            }
+            //5.处理HTTP响应内容
+            //HTTP响应头部信息,这里简单打印
+            Header[] headers = getMethod.getResponseHeaders();
+//            for(Header h : headers) {
+//                System.out.println(h.getName() + "---------------" + h.getValue());
+//            }
+            //读取HTTP响应内容,这里简单打印网页内容
+            //读取为字节数组
+            byte[] responseBody = getMethod.getResponseBody();
+
+            response = new String(responseBody, charset);
+
+        } catch (HttpException e) {
+            //发生致命的异常,可能是协议不对或者返回的内容有问题
+            System.out.println("请检查输入的URL!");
+            e.printStackTrace();
+        } catch (IOException e) {
+            //发生网络异常
+            System.out.println("发生网络异常!");
+        } finally {
+            //6.释放连接
+            getMethod.releaseConnection();
+        }
+        return response;
+    }
+
 
 
 

+ 176 - 3
src/main/java/io/renren/common/utils/KubernetesUtils.java

@@ -1,13 +1,23 @@
 package io.renren.common.utils;
 
+import com.alibaba.fastjson.JSONObject;
+import io.argoproj.workflow.models.NodeStatus;
+import io.argoproj.workflow.models.Workflow;
 import io.kubernetes.client.PodLogs;
 import io.kubernetes.client.openapi.ApiException;
+import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.models.V1Container;
+import io.kubernetes.client.openapi.models.V1Pod;
 import io.kubernetes.client.util.ClientBuilder;
 import io.kubernetes.client.util.KubeConfig;
 import lombok.extern.slf4j.Slf4j;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
+
+import java.io.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
 import io.kubernetes.client.openapi.ApiClient;
 import io.kubernetes.client.openapi.Configuration;
 /**
@@ -83,5 +93,168 @@ public class KubernetesUtils {
         return inputStream;
     }
 
+    /**
+     * 根据podname 获取pod ,获取pod中容器个数,获取每个容器的日志,返回日志结果
+     * @param namespace 命名空间
+     * @param podname pod名称
+     * @return 日志结果
+     * @throws IOException
+     * @throws ApiException
+     */
+    public static Map<String,String> getPodLog2(String namespace, String podname,Map<String,String> podcontent) throws IOException{
+        PodLogs podLogs = new PodLogs(kubeClient);
+        CoreV1Api apiInstance = new CoreV1Api(kubeClient);
+        V1Pod resultpod;
+        try {
+            resultpod = apiInstance.readNamespacedPod(podname, namespace, "true", true, true);
+            List<V1Container> containers1 = resultpod.getSpec().getContainers();
+            for(V1Container container:containers1){
+//                container.getName();
+//                System.out.println(container.getName());
+                InputStream inputStream1 = podLogs.streamNamespacedPodLog(namespace, podname, container.getName());
+                String podcontentmain1= new BufferedReader(new InputStreamReader(inputStream1)).lines().collect(Collectors.joining(System.lineSeparator()));
+                podcontent.put(container.getName(),podcontentmain1);
+            }
+
+        } catch (ApiException e) {
+            System.out.println();
+        }
+        return podcontent;
+
+    }
+
+    /**
+     * 根据podname 获取pod
+     * @param namespace 命名空间
+     * @param podname pod名称
+     * @return 日志结果
+     * @throws IOException
+     * @throws ApiException
+     */
+    public static V1Pod getPod(String namespace, String podname) {
+        CoreV1Api apiInstance = new CoreV1Api(kubeClient);
+        V1Pod resultpod = null;
+        try {
+            resultpod = apiInstance.readNamespacedPod(podname, namespace, "true", true, true);
+        } catch (ApiException e) {
+            System.out.println();
+        }
+        return resultpod;
+
+    }
+
+    /**
+     * 根据podname 获取pod 中镜像列表
+     * @param namespace 命名空间
+     * @param podname pod名称
+     * @return 日志结果
+
+     */
+    public static List<V1Container> getPodContainers(String namespace, String podname) {
+        System.out.println("podnameL "+podname+"    namespace: "+namespace);
+        CoreV1Api apiInstance = new CoreV1Api(kubeClient);
+        V1Pod resultpod;
+        List<V1Container> containers1 = null;
+        try {
+            resultpod = apiInstance.readNamespacedPod(podname, namespace, "true", true, true);
+            System.out.println(resultpod);
+            containers1 = resultpod.getSpec().getContainers();
+            System.out.println("containers");
+            System.out.println(containers1);
+
+        } catch (ApiException e) {
+            System.out.println(e);
+        }
+        return containers1;
+
+    }
+
+
+    /**
+     * 根据podname 获取pod ,获取pod中容器个数,获取每个容器的日志,返回日志结果
+     * @param namespace 命名空间
+     * @return 日志结果
+     * @throws IOException
+     * @throws ApiException
+     */
+    public static void watchWorkflowlog(String namespace,String workflowName,OutputStream outputStream) throws IOException{
+        Map<String,String> workflowcontent = new HashMap<String,String>();//日志
+        StringBuilder sbmain = new StringBuilder();
+        StringBuilder sbinit = new StringBuilder();
+        StringBuilder sbwait = new StringBuilder();
+
+        //根据提交后的工作流名称获取该工作流完整信息
+        Workflow successworkflow = null;
+        try {
+            successworkflow = ArgoUtils.getWorkflow(namespace, workflowName);
+            //创建一个api对象
+            CoreV1Api api = new CoreV1Api();
+            Map<String, NodeStatus> nodeStatusMap = successworkflow.getStatus().getNodes();
+            for (Map.Entry<String, NodeStatus> entry : nodeStatusMap.entrySet()) {
+                //
+                if (entry.getValue().getType().equals("Pod")) {
+//                    String templateName = entry.getValue().getTemplateName();
+//                    String displayName = entry.getValue().getDisplayName();   //dag  taskname唯一
+                    String podname = entry.getValue().getId();
+                    System.out.println("podname");
+                    System.out.println(podname);
+                    String podphase = entry.getValue().getPhase();
+                    if(podphase.equals("Pending")){
+//                            sbmain.append("pod初始化中");
+//                            sbmain.append("\n");
+                    }else{
+                        List<V1Container> podContainers = KubernetesUtils.getPodContainers(namespace, podname);
+                        if(podContainers!=null){
+                            for(V1Container container:podContainers){
+//                container.getName();
+//                System.out.println(container.getName());
+                                InputStream inputStream1 = getPodLog(namespace,podname,container.getName());
+                                String podcontentmain1= new BufferedReader(new InputStreamReader(inputStream1)).lines().collect(Collectors.joining(System.lineSeparator()));
+                                if(container.getName().equals("main")){
+                                    System.out.println("main: " +podcontentmain1);
+                                    sbmain.append(podcontentmain1);
+                                    sbmain.append("\n");
+                                }else if(container.getName().equals("init")){
+                                    System.out.println("init: " +podcontentmain1);
+                                    sbinit.append(podcontentmain1);
+                                    sbinit.append("\n");
+                                }else if(container.getName().equals("wait")){
+                                    System.out.println("wait: " +podcontentmain1);
+                                    sbwait.append(podcontentmain1);
+                                    sbwait.append("\n");
+                                }
+                            }
+                        }else {
+                            System.out.println("当前pod中镜像为空");
+                        }
+                    }
+                }
+
+            }
+
+        } catch (io.argoproj.workflow.ApiException | FileNotFoundException e) {
+            e.printStackTrace();
+        } catch (io.kubernetes.client.openapi.ApiException e) {
+            e.printStackTrace();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        workflowcontent.put("main",sbmain.toString());
+        workflowcontent.put("init",sbinit.toString());
+        workflowcontent.put("wait",sbwait.toString());
+        System.out.println(workflowcontent);
+        String  paramjson= JSONObject.toJSONString(workflowcontent);
+
+        outputStream.write(paramjson.getBytes());
+//        MyWebSocket.sendMessage(paramjson);
+//        if(successworkflow.getStatus().getPhase().equals("Succeed"))
+
+//        kubeClient.pods().inNamespace(namespace).withName(podName).tailingLines(10).watchLog(System.out);
+
+    }
+
+
+
+
 
 }

+ 1 - 0
src/main/java/io/renren/config/ShiroConfig.java

@@ -62,6 +62,7 @@ public class ShiroConfig {
         filterMap.put("/aaa.txt", "anon");
         filterMap.put("/hello/**", "anon");
         filterMap.put("/aiplat/**", "anon");
+        filterMap.put("/webSocket/**", "anon");
         filterMap.put("/**", "oauth2");
         shiroFilter.setFilterChainDefinitionMap(filterMap);
 

File diff suppressed because it is too large
+ 299 - 267
src/main/java/io/renren/modules/sys/controller/VisiWorkflowController.java


Some files were not shown because too many files changed in this diff