Code-Cookbook

博客

  • Blogs
    • 1. [Springboot x spark]java.util.concurrent.ExecutionException: Boxed Error
    • 2. [Apollo]Apollo Config Center
    • 3. [Confluent]Confluent快速上手
    • 4. [Flink]CommdLine+Springboot+flink无法指定配置文件启动
    • 5. [Flink]Flink on k8s任务的提交和实操
    • 6. [Flink]Flink-connector-http
    • 7. [Flink]FlinkKafkaProducer启用压缩
    • 8. [Flink]FlinkSQL时间处理函数
    • 9. [Flink]Flink Sources
      • 9.1. Hive
        • 9.1.1. Read From Hive
        • 9.1.2. Write To Hive
      • 9.2. Kafka
        • 9.2.1. Read From Kafka
        • 9.2.2. Sink to kafka
      • 9.3. HTTP
    • 10. [Flink]Flink中自定义watermark生成器
    • 11. [Flink]Flink的并行度与TaskSlot
    • 12. [Flink]ProcessFunction无法使用,抛出InvalidProgramException
    • 13. [Flink]使用状态算子将stream聚合输出
    • 14. [Flink]关于Flink的Checkpoint的一次问题排查
    • 15. [Flink]多源实时行为报告的设计思路
    • 16. [Flink]如何更通用地将Kafka(或其他)数据落地Hive?
    • 17. [Flink]监控Flink Metrics
    • 18. [Flink]自定义序列化消费Kafka数据
    • 19. [Flink源码]Flink任务是如何启动的
    • 20. [Flink源码]YarnApplication模式的任务启动
    • 21. [Flink源码]流式工厂模式与配置的延迟绑定
    • 22. [Git]Git问题
    • 23. [Git]误在Master分支开发并commit无法push
    • 24. [Hadoop]Hadoop distcp
    • 25. [Hadoop]一些Hadoop问题
    • 26. [Hive]Hive分区表批量删除分区
    • 27. [Hive]Hive的Analyze函数,Statistics in Hive
    • 28. [Hive]修改存储格式为Parquet的Hive表的字段类型
    • 29. [Hive]在指定位置添加字段
    • 30. [Hive]外部表修改为内部表
    • 31. [Hive]更新Metastore中的LastAccessTime
    • 32. [Hive]本地连接需要Kerberos认证的Hive
    • 33. [Java]如何根据需要动态生成Java的class
    • 34. [Java]元注解
    • 35. 注解解析
    • 36. [Java]集合
    • 37. 简单(常用)数据结构
    • 38. [Java]Java8 Stream API
    • 39. [Java]SPI和责任链模式
    • 40. [Java]Socket
    • 41. [Java]使用Java在服务端和客户端之间传送文件
    • 42. [Java]三种策略模式应用于服务的启动
    • 43. [Java]多线程
    • 44. [Java]生产者消费者模型问题
    • 45. [Java]让项目顺利读取resources目录下的文件
    • 46. [Java]设计模式
    • 47. Continuing…
    • 48. [Java]设计模式六大原则
    • 49. [Java]面向对象知识点梳理
    • 50. [Java]OOP防脱发指南
    • 51. [Kerberos]Message stream modified (41)错误
    • 52. [Kudu]关于Kudu Upsert列的问题
    • 53. [Kudu]关于Kudu列的顺序的修改
    • 54. [Logback]特定业务日志重定向
    • 55. [MongoDB]MongoDB基本查询
    • 56. [Paimon]Flink读写Nginx代理的OSS上的Paimon表
    • 57. [Pyspark]PySpark
    • 58. [PySpark]PySpark On Yarn
    • 59. [Spark]CDP上安装其他版本SPARK(SPARK3)
    • 60. [SQL]Druid SQL解析器
    • 61. [SQL]IN/NOT IN/EXISTS/NOT EXISTS的替代写法
    • 62. [SQL]IN OR NOT IN , IS A PROBLEM
    • 63. [SQL]SQLLineage解析SQL血缘
    • 64. [SQL]业务数据库中的create_time和update_time分析时的问题
    • 65. [SQL]为什么LEFT JOIN后总数却与右表的总数一样了?
    • 66. [SQL]求用户任意天连续登录(每天为第多少天连续登录)
    • 67. [SQL]计算指定日期的年-周(为某年的第多少周)
    • 68. [Scala]函数中闭包(Closure)和柯里化(Currying)
    • 69. [Shell]EOF
    • 70. [Shell] Zip命令
    • 71. [Shell]Shell脚本日期递增(起止日期内递增)
    • 72. [Shell]将字符串转换为数字进行大小比较
    • 73. [Shell]打印本机IP
    • 74. [SparkStreaming]消费kafka写入Hive失败的问题Lease timeout of 0 seconds expired
    • 75. [Spark]SparkSQL 列转行的一种方法
    • 76. [Spark]SparkSQL JDBC并发连接读取
    • 77. [Spark]Spark提交任务RSA premaster secret error
    • 78. [Spark]Springboot整合Spark, 本地、集群部署
    • 79. [Spark]如何使用Java创建一个Row
    • 80. [Spark]将Spark DataFrame中的数值取出
    • 81. [Springboot]okHttp错误:Exception in thread “OkHttp Dispatcher” java.lang.IllegalStateException: closed
    • 82. [Vim]Vim查找和替换命令
    • 83. [debezium]在启动任务时传入SQL语句生成Snapshot
    • 84. [debezium]热修改Debezium MySQL Connector配置
    • 85. [Paimon]Paimon表的读写
  • 文章
  • 项目

Random ramblings

  • Random ramblings

大数据

  • Bigdata
  • Bigdata Tools

大数据辅助工具

  • Auxiliary tools

SQL相关

  • SQL
Code-Cookbook
  • Blogs
  • 9. [Flink]Flink Sources
  • 查看页面源码

9. [Flink]Flink Sources

本文在下面记录了一些常用的Flink读取常见的数据源的方法,虽然在官网已经有详细的说明文档了,但是只看文档还是不行的,还得实际操练起来,毕竟代码只有写多了才会用,遇到的bug越多,经验也就越多。

先说明,Flink官网已经明确表示,DatasetAPI将在今后的版本中被删除,目前为软删除状态,强烈建议大家使用TableAndSQL API 或者DataStreamAPI批模式

Starting with Flink 1.12 the DataSet API has been soft deprecated.

We recommend that you use the Table API and SQL to run efficient batch pipelines in a fully unified API. Table API is well integrated with common batch connectors and catalogs.

Alternatively, you can also use the DataStream API with BATCH execution mode. The linked section also outlines cases where it makes sense to use the DataSet API but those cases will become rarer as development progresses and the DataSet API will eventually be removed. Please also see FLIP-131 for background information on this decision.

9.1. Hive

如今,Hive仍然是数据仓库构建的主要工具(这话说得好别扭),就是说,只要是在做数仓,你一定会接触到Hive,那么使用Flink读取或者写入到Hive就显得很有必要。

9.1.1. Read From Hive

虽然使用HiveCatalog连接到Hive不需要规划器,但是读写Hive都需要使用BlinkPlanner

Please note while HiveCatalog doesn’t require a particular planner, reading/writing Hive tables only works with blink planner. Therefore it’s highly recommended that you use blink planner when connecting to your Hive warehouse.

以下为读写Hive的一个实例:读支持了流的模式也支持批的模式,以下使用了流的模式,读可以,但是写不太好使,如果想写,可以使用批模式,在下面有说。

Note:本地运行,所以需要在resources目录下放置集群的四个site.xml

public class HiveSource {
    private static final String HIVE_CATALOG = "default";

    public static void main(String[] args) {
        //定义流处理环境
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings streamSetting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(streamEnv, streamSetting);

        HiveCatalog hive = new HiveCatalog(HIVE_CATALOG, "test", "lib/conf");
        tEnv.registerCatalog(HIVE_CATALOG, hive);
        tEnv.useCatalog(HIVE_CATALOG);
        tEnv.useDatabase("test");
        tEnv.executeSql("SHOW DATABASES").print();
        
        tEnv.executeSql("SELECT * FROM ods_user").print();
    }
}

结果:

+---------------+
| database name |
+---------------+
|       default |
|   dw_unicdata |
|          test |
|      unicdata |
+---------------+
4 rows in set
+-------------+--------------------------------+--------------------------------+
|          id |                           name |                             dt |
+-------------+--------------------------------+--------------------------------+
|           1 |                         长大强 |                     2021-07-24 |
|           2 |                         孙大壮 |                     2021-07-24 |
+-------------+--------------------------------+--------------------------------+
2 rows in set

9.1.2. Write To Hive

9.1.2.1. 批模式(Batch Mode)

和读的方式类似,写入,以下使用批模式

public class HiveSource {
    private static final String HIVE_CATALOG = "default";

    public static void main(String[] args) {
        //定义流处理环境
//        ExecutionEnvironment streamEnv = ExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings streamSetting = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
        TableEnvironment tEnv = TableEnvironment.create(streamSetting);

        HiveCatalog hive = new HiveCatalog(HIVE_CATALOG, "test", "lib/conf");
        tEnv.registerCatalog(HIVE_CATALOG, hive);
        tEnv.useCatalog(HIVE_CATALOG);
        tEnv.useDatabase("test");
        tEnv.executeSql("SHOW DATABASES").print();

        tEnv.executeSql("SELECT * FROM ods_user").print();
        tEnv.executeSql("INSERT INTO test.ods_user PARTITION(dt='2021-08-19') SELECT id, name FROM test.ods_user").print();
        tEnv.executeSql("SELECT * FROM ods_user").print();
    }
}

结果:

+-----------------------+
| default.test.ods_user |
+-----------------------+
|                    -1 |
+-----------------------+
1 row in set
+-------------+--------------------------------+--------------------------------+
|          id |                           name |                             dt |
+-------------+--------------------------------+--------------------------------+
|           1 |                         长大强 |                     2021-08-19 |
|           2 |                         孙大壮 |                     2021-08-19 |
|           2 |                         孙大壮 |                     2021-08-19 |
|           1 |                         长大强 |                     2021-07-24 |
|           2 |                         孙大壮 |                     2021-07-24 |
|           1 |                         长大强 |                     2021-08-19 |
|           2 |                         孙大壮 |                     2021-08-19 |
|           1 |                         长大强 |                     2021-08-19 |
+-------------+--------------------------------+--------------------------------+
8 rows in set

9.1.2.2. 流表模式(Streaming Mode)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//CHECKPOINT
env.enableCheckpointing(6000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(3000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
//RESTART STRATEGY
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2,
      Time.of(1000, TimeUnit.SECONDS)));

DataStreamSource<String> streamSource = env.readTextFile("a text file to generate source records");
EnvironmentSettings streamSetting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env, streamSetting);
//设置hive catalog
HiveCatalog hive = new HiveCatalog(flinkSinkHiveProperties.getCatalog(),
        "a database to use",
        "hive conf dir(在cdh上一般为/etc/hive/conf)"
);
streamTableEnv.registerCatalog("catalog名(自定义)", hive);
streamTableEnv.useCatalog("catalog名(自定义)");
streamTableEnv.useDatabase("数据库名,自己指定");
streamTableEnv.getConfig().getConfiguration().setString("table.exec.hive.fallback-mapred-reader", "true");

//--------------------------------------------------------------------------------------------------------
//  执行hive语句
//--------------------------------------------------------------------------------------------------------
//1、可以使用 StatementSet ,往StatementSet中添加语句然后批量执行,
StatementSet hiveStatementSet = streamTableEnv.createStatementSet();
hiveStatementSet.addInsertSql(sql);
hiveStatementSet.execute();

//2、也可以使用
streamTableEnv.executeSql("select ...");
//3、最后一定得用如下来执行整个任务,否则会报没有触发算子无法生成流图
env.execute();

说明,如果你只想使用了流表,并且都是使用SQL进行操作,那么你可以使用1的方式进行操作,而不需要使用2和3。但是,如果将流和表混合使用,在触发任务执行的时候,将1、2、3结合使用仍然会抛出异常,这时候你得抛弃1的使用方法,而将2和3结合使用(使用Flink1.12版本进行测试)

9.1.2.3. 一个问题:Hive怎么没数据?

  • 1、当你使用以上代码去往Hive写数据时,经过一番操作,发现任务跑起来了,并且没有抛异常(抛了也被你解决了),一切执行成功,非常nice,可是你去查询hive的时候却发现,表里却没有数据,这时候再去hdfs上文件目录里也是有文件的,并且可以看到inprogress

    • 解决办法:检查代码是否设置了checkpoint,如果设置了再检查程序是否checkpoint成功。Flink只有在checkpoint成功时才会将hive的分区信息commit并且输出的小文件由inprogress状态切换位success

  • 2、当发现1的问题后成功设置了checkpoint,但还是发现hive中没有数据

    • 这个时候当程序成功运行完之后,需要去Hive上修复目标表,修复表的元数据,即MSCK REPAIR TABLE xxx.xxx

9.2. Kafka

9.2.1. Read From Kafka

9.2.1.1. 使用DataStream API

以下使用了SimpleStringSchema去反序列化kafka中的消息,如果遇到了flink提供的反序列化类不足以支撑实现你的业务功能时,需要自定义反序列化Schema,可以参考前面文章,描述了一个自定义Schema去反序列化Kafka中的存储着的序列化的对象的消息

@SneakyThrows
@Test
public void QueryKafka() {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "cdh001:9092,cdh002:9092,cdh003:9092");
    properties.setProperty("group.id", "test");
    DataStream<String> stream = env.addSource(
            new FlinkKafkaConsumer<>("mos-uat-vds-charging_settings_snapshot-vcf_originated",
                    new SimpleStringSchema(), properties)
            .setStartFromEarliest()
    );
    stream.map(new MapFunction<String, String>() {
        @Override
        public String map(String value) throws Exception {
            return value;
        }
    }).print();

    env.execute("Kafka Source");
}

9.2.2. Sink to kafka

以下来自于flink官网(版本1.12)

DataStream<String> stream = ...

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");

FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
        "my-topic",                  // target topic
        new SimpleStringSchema(),    // serialization schema
        properties,                  // producer config
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance

stream.addSink(myProducer);

9.3. HTTP

以下为自定义的HttpSource用以获取http接口数据的核心代码,里面包含了一些复杂的业务代码(先在open方法中调用接口获取到token,再拿着token去run方法中调用接口获取数据),经过删繁就简就进行简单替换即可实现自己的业务需求。

package com.xxxx.xxxx.source;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.xxxx.xxxx.utils.HttpUtils;
import lombok.SneakyThrows;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;

/**
 * Flink从接口获取数据
 *
 * @author roohom
 */
public class HttpSource extends RichSourceFunction<String> {
    private final Logger LOG = LoggerFactory.getLogger(HttpSource.class);

    private volatile boolean isRunning = true;
    private final String getURL;
    private final String getSizeURL;
    private final String tokenPostURL;
    private final HashMap<String, String> postBody;
    private final long requestInterval;

    // count out event
    private transient Counter counter;

    public HttpSource(String getURL, String getSizeURL, String tokenPostURL, HashMap<String, String> postBody, long requestInterval) {
        this.getURL = getURL;
        this.getSizeURL = getSizeURL;
        this.tokenPostURL = tokenPostURL;
        this.postBody = postBody;
        this.requestInterval = requestInterval;
    }

    String token;
    ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void open(Configuration parameters) throws Exception {
        counter = new SimpleCounter();
        this.counter = getRuntimeContext()
                .getMetricGroup()
                .counter("httpCounter");
        //获取TOKEN
        String tokenJson = HttpUtils.doPost(tokenPostURL, postBody);
        JsonNode jsonNode = objectMapper.readTree(tokenJson);
        JsonNode accessToken = jsonNode.get("access_token");
        JsonNode tokenType = jsonNode.get("token_type");
        token = tokenType.toString().replace("\"", "") + " " + accessToken.toString().replaceAll("\"", "");

    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        String totalSizeMsg = "";
        int size = 0;
        try {
            //获取全部数据的个数
            totalSizeMsg = HttpUtils.doGet(getSizeURL, token);
            JsonNode sizeNode = objectMapper.readTree(totalSizeMsg);
            size = Integer.parseInt(sizeNode.get("data").toString());
            LOG.info("HttpSource -> Get the size of all omd data -> {}", size);
        } catch (Exception e) {
            LOG.info("HttpSource -> Do get request failed.");
            e.printStackTrace();
        }
        //将消息切分,每一页20条
        for (int i = 1; i <= (size / 20) + 1; i++) {
            String requestURL = getURL;
            requestURL += "?pageNo=" + i + "&pageSize=20";
            String message = HttpUtils.doGet(requestURL, token);
            LOG.info("HttpSource -> GET the url here, the url is -> {}", requestURL);

            collectMessage(ctx, objectMapper, message);
            Thread.sleep(requestInterval);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    /**
     * 收集数据以发送
     *
     * @param ctx 流上下文
     */
    @SneakyThrows
    public void collectMessage(SourceContext<String> ctx, ObjectMapper objectMapper, String message) {
        JsonNode dataNodes = objectMapper.readTree(message).get("data");
        List<Object> dataList = objectMapper.readValue(dataNodes.toString(), List.class);
        dataList.forEach(x -> {
            try {
                ctx.collect(objectMapper.writeValueAsString(x));
                this.counter.inc();
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
        });
    }
}

上面用到的HttpUtils如下:

package com.xxxx.xxxx.utils;

import lombok.SneakyThrows;
import org.apache.http.NameValuePair;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicHeader;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;


public class HttpUtils {


    private static final Logger log = LoggerFactory.getLogger(HttpUtils.class);

    /**
     * 默认超时时间
     */
    private static final int DEFAULT_TIME_OUT = 3000;

    /**
     * get请求,超时时间默认
     *
     * @param api 请求URL
     * @return 响应JSON字符串
     */
    public static String doGet(String api, String token) {
        return doGet(api, token, DEFAULT_TIME_OUT);
    }


    /**
     * get请求,超时时间传参
     *
     * @param api     请求URL
     * @param timeOut 请求超时时间(毫秒)
     * @return 响应JSON字符串
     */
    public static String doGet(String api, int timeOut) {
        HttpGet httpGet = new HttpGet(api);
        RequestConfig config = RequestConfig.custom()
                .setConnectTimeout(timeOut)
                .setConnectionRequestTimeout(timeOut)
                .build();
        httpGet.setConfig(config);

        try (CloseableHttpClient client = HttpClients.createDefault();
             CloseableHttpResponse response = client.execute(httpGet)) {
            return EntityUtils.toString(response.getEntity());
        } catch (IOException e) {
            log.error("get " + api + " failed!", e);
        }
        return null;
    }

    /**
     * get请求,超时时间传参
     *
     * @param api     请求URL
     * @param token   token
     * @param timeOut 请求超时时间(毫秒)
     * @return 响应JSON字符串
     */
    public static String doGet(String api, String token, int timeOut) {
        HttpGet httpGet = new HttpGet(api);
        RequestConfig config = RequestConfig.custom()
                .setConnectTimeout(timeOut)
                .setConnectionRequestTimeout(timeOut)
                .setAuthenticationEnabled(true)
                .build();

        httpGet.setConfig(config);
        httpGet.addHeader(new BasicHeader("Content-Type", "application/json"));
        httpGet.addHeader(new BasicHeader("Authorization", token));

        try (CloseableHttpClient client = HttpClients.createDefault();
             CloseableHttpResponse response = client.execute(httpGet)) {
            return EntityUtils.toString(response.getEntity());
        } catch (IOException e) {
            log.error("get " + api + " failed!", e);
        }
        return null;
    }

    /**
     * post请求,超时时间默认
     *
     * @param api  请求URL
     * @param body 请求体JSON字符串
     * @return 响应JSON字符串
     */
    public static String doPost(String api, String body) {
        return doPost(api, body, DEFAULT_TIME_OUT);
    }

    /**
     * post请求,超时时间默认
     *
     * @param api  请求URL
     * @param body 请求体JSON字符串
     * @return 响应JSON字符串
     */
    public static String doPost(String api, HashMap<String, String> body) {
        return doPost(api, body, DEFAULT_TIME_OUT);
    }


    /**
     * post请求,超时时间传参
     *
     * @param api     请求URL
     * @param body    请求体JSON字符串
     * @param timeOut 请求超时时间(毫秒)
     * @return 响应JSON字符串
     */
    public static String doPost(String api, String body, int timeOut) {
        HttpPost httpPost = new HttpPost(api);
        StringEntity entity = new StringEntity(body, "utf-8");
        entity.setContentType("application/json");
        entity.setContentEncoding("utf-8");
        httpPost.setEntity(entity);
        RequestConfig config = RequestConfig.custom()
                .setConnectTimeout(timeOut)
                .setConnectionRequestTimeout(timeOut)
                .build();
        httpPost.setConfig(config);

        try (CloseableHttpClient client = HttpClients.createDefault();
             CloseableHttpResponse response = client.execute(httpPost)) {
            return EntityUtils.toString(response.getEntity());
        } catch (IOException e) {
            log.error("post " + api + " failed!", e);
        }
        return null;
    }

    /**
     * post请求,超时时间传参
     *
     * @param api     请求URL
     * @param body    请求体Map
     * @param timeOut 请求超时时间(毫秒)
     * @return 响应JSON字符串
     */
    @SneakyThrows
    public static String doPost(String api, HashMap<String, String> body, int timeOut) {
        HttpPost httpPost = new HttpPost(api);

        ArrayList<NameValuePair> pairs = new ArrayList<>();
        body.forEach(
                (x, y) -> pairs.add(new BasicNameValuePair(x, y))
        );

        httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded;charset=utf-8");
        httpPost.setEntity(new UrlEncodedFormEntity(pairs, "UTF-8"));
        RequestConfig config = RequestConfig.custom()
                .setConnectTimeout(timeOut)
                .setConnectionRequestTimeout(timeOut)
                .build();
        httpPost.setConfig(config);

        try (CloseableHttpClient client = HttpClients.createDefault();
             CloseableHttpResponse response = client.execute(httpPost)) {
            return EntityUtils.toString(response.getEntity());
        } catch (IOException e) {
            log.error("post " + api + " failed!", e);
        }
        return null;
    }
}
上一页 下一页

© 版权所有 2020-2026, roohom。

利用 Sphinx 构建,使用的 主题 由 Read the Docs 开发.