Code-Cookbook

6. [Flink]Flink-connector-http

This post shows how to call an HTTP endpoint from Flink and how to send data from Flink to an HTTP endpoint.

6.1. Source

As preparation, add the following dependency to the Maven pom:

<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.10</version>
</dependency>

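You also need an HttpUtil helper that actually sends the HTTP requests. Many such utilities can be found online; the one used here is listed in the appendix (6.3).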

6.1.1. Get

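import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.nio.charset.StandardCharsets;

public class HttpGetSource extends RichSourceFunction<String> {

    private volatile boolean isRunning = true;
    private final String url;
    // pause between two requests, in milliseconds
    private final long requestInterval;
    private final DeserializationSchema<String> deserializer;
    // counts emitted events
    private transient Counter counter;

    public HttpGetSource(String url, long requestInterval, DeserializationSchema<String> deserializer) {
        this.url = url;
        this.requestInterval = requestInterval;
        this.deserializer = deserializer;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // register the counter with this operator's metric group
        this.counter = getRuntimeContext()
                .getMetricGroup()
                .counter("myCounter");
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            try {
                // receive http message, csv format
                String message = HttpUtil.doGet(url);
                // HttpUtil returns null when the request fails, so skip that round
                if (message != null) {
                    // deserialize the csv message
                    ctx.collect(deserializer.deserialize(message.getBytes(StandardCharsets.UTF_8)));
                    this.counter.inc();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            // sleep outside the try block so a failed request does not turn into a busy loop
            Thread.sleep(requestInterval);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

}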

6.1.2. Post

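import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.nio.charset.StandardCharsets;

public class HttpPostSource extends RichSourceFunction<String> {

    private volatile boolean isRunning = true;
    private final String url;
    // pause between two requests, in milliseconds
    private final long requestInterval;
    private final DeserializationSchema<String> deserializer;
    // counts emitted events
    private transient Counter counter;
    // request body sent with every POST
    private final String body;

    public HttpPostSource(String url, long requestInterval, String body, DeserializationSchema<String> deserializer) {
        this.url = url;
        this.requestInterval = requestInterval;
        this.deserializer = deserializer;
        this.body = body;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // register the counter with this operator's metric group
        this.counter = getRuntimeContext()
                .getMetricGroup()
                .counter("myCounter");
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            try {
                // receive http message, csv format
                String message = HttpUtil.doPost(url, body, 1000);
                // HttpUtil returns null when the request fails, so skip that round
                if (message != null) {
                    // deserialize the csv message
                    ctx.collect(deserializer.deserialize(message.getBytes(StandardCharsets.UTF_8)));
                    this.counter.inc();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            // sleep outside the try block so a failed request does not turn into a busy loop
            Thread.sleep(requestInterval);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}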
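
Both sources share the same poll, emit, sleep, cancel shape. As a rough illustration only, the sketch below isolates that loop using plain JDK types so it can be run without a Flink cluster. PollingLoop, fetch and emit are hypothetical names standing in for the source class, the HttpUtil call and ctx.collect; they are not part of Flink. In this sketch a null or failed poll is skipped and the loop still waits before trying again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for the run()/cancel() pair of the sources above; not a Flink class.
class PollingLoop {
    private volatile boolean running = true;
    private final Supplier<String> fetch;  // stands in for HttpUtil.doGet(url)
    private final Consumer<String> emit;   // stands in for ctx.collect(...)
    private final long intervalMillis;

    PollingLoop(Supplier<String> fetch, Consumer<String> emit, long intervalMillis) {
        this.fetch = fetch;
        this.emit = emit;
        this.intervalMillis = intervalMillis;
    }

    void run() {
        while (running) {
            try {
                String message = fetch.get();
                if (message != null) {     // null means the request failed: skip this round
                    emit.accept(message);
                }
            } catch (RuntimeException e) {
                System.err.println("poll failed, will retry: " + e.getMessage());
            }
            try {
                Thread.sleep(intervalMillis);  // wait after every attempt, successful or not
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;                        // treat interruption as a stop request
            }
        }
    }

    void cancel() {
        running = false;  // volatile write, seen by the loop on its next check
    }
}

class PollingLoopDemo {
    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        AtomicInteger polls = new AtomicInteger();
        PollingLoop[] holder = new PollingLoop[1];
        // The fake fetch cancels the loop on its third call so the demo terminates.
        holder[0] = new PollingLoop(() -> {
            if (polls.incrementAndGet() == 3) {
                holder[0].cancel();
            }
            return "id,name";
        }, out::add, 10);
        holder[0].run();
        System.out.println("emitted " + out.size() + " records");
    }
}
```

In a real job Flink calls cancel() from a different thread than run(), which is why the flag is declared volatile.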

6.1.3. Usage

  • Using Post

    DataStreamSource<String> streamSource = env.addSource(new HttpPostSource(URL, 1000, "", new SimpleStringSchema()));

  • Using Get

    DataStreamSource<String> streamSource = env.addSource(new HttpGetSource(URL, 1000, new SimpleStringSchema()));


6.2. Sink

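Data can likewise be sent out over HTTP, that is, sinked to another system.

This section uses a connector that someone else has already written, which has to be added as a Maven dependency. The artifact is not published to Maven Central, so follow the developer's instructions: clone the repository from GitHub, run mvn clean install to build it and install it into your local repository, and then reference it in the project's pom.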

<!--flink-connector-http--> 
<dependency>
    <groupId>net.galgus</groupId>
    <artifactId>flink-connector-http</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>

First define the connection configuration:

// set the endpoint
String endpoint = "http://localhost:8080/api/postdata/";
// set the headers
HashMap<String, String> headerMap = new HashMap<>();
headerMap.put("Content-Type", "application/json");
HTTPConnectionConfig httpConnectionConfig = new HTTPConnectionConfig(
        endpoint,
        HTTPConnectionConfig.HTTPMethod.POST,
        headerMap,
        false
);

Then add the sink to the data stream:

stream.addSink(new HTTPSink<>(httpConnectionConfig));
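
To get a feel for what sending one record to such an endpoint involves, here is a minimal sketch using only the JDK. It is not the connector's implementation. JsonPoster and JsonPosterDemo are hypothetical names, the 3000 ms timeouts are an assumption, and the demo starts a throwaway local server in place of the real http://localhost:8080/api/postdata/ service.

```java
import com.sun.net.httpserver.HttpServer;

import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of flink-connector-http.
class JsonPoster {
    /** Sends one record as the body of a POST request and returns the HTTP status code. */
    static int post(String endpoint, Map<String, String> headers, String record) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
        try {
            conn.setRequestMethod("POST");
            conn.setConnectTimeout(3000);  // assumed timeout values, in milliseconds
            conn.setReadTimeout(3000);
            conn.setDoOutput(true);        // required before writing a request body
            for (Map.Entry<String, String> header : headers.entrySet()) {
                conn.setRequestProperty(header.getKey(), header.getValue());
            }
            try (OutputStream os = conn.getOutputStream()) {
                os.write(record.getBytes(StandardCharsets.UTF_8));
            }
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }
}

class JsonPosterDemo {
    public static void main(String[] args) throws IOException {
        // Throwaway local server on a free port so the demo needs no external service.
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        server.createContext("/api/postdata/", exchange -> {
            exchange.sendResponseHeaders(200, -1);  // 200 with an empty response body
            exchange.close();
        });
        server.start();
        try {
            Map<String, String> headers = new HashMap<>();
            headers.put("Content-Type", "application/json");
            String endpoint = "http://127.0.0.1:" + server.getAddress().getPort() + "/api/postdata/";
            System.out.println("status = " + JsonPoster.post(endpoint, headers, "{\"id\":1}"));
        } finally {
            server.stop(0);
        }
    }
}
```

A helper like this can also serve as a quick check that the target endpoint accepts the headers and payload before the sink is wired into a job.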

6.3. Appendix

  • HttpUtil

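import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class HttpUtil {
    private static final Logger log = LoggerFactory.getLogger(HttpUtil.class);

    /**
     * Default timeout in milliseconds
     */
    private static final int DEFAULT_TIME_OUT = 3000;

    /**
     * GET request with the default timeout
     * @param api request URL
     * @return response body as a string, or null if the request failed
     */
    public static String doGet(String api) {
        return doGet(api, DEFAULT_TIME_OUT);
    }

    /**
     * GET request with an explicit timeout
     * @param api request URL
     * @param timeOut request timeout in milliseconds
     * @return response body as a string, or null if the request failed
     */
    public static String doGet(String api, int timeOut) {
        HttpGet httpGet = new HttpGet(api);
        RequestConfig config = RequestConfig.custom()
                .setConnectTimeout(timeOut)
                .setConnectionRequestTimeout(timeOut)
                // also bound the wait for response data, otherwise a stalled server blocks forever
                .setSocketTimeout(timeOut)
                .build();
        httpGet.setConfig(config);

        try (CloseableHttpClient client = HttpClients.createDefault();
             CloseableHttpResponse response = client.execute(httpGet)) {
            // fall back to UTF-8 when the response does not declare a charset
            return EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            log.error("get " + api + " failed!", e);
        }
        return null;
    }

    /**
     * POST request with the default timeout
     * @param api request URL
     * @param body request body as a JSON string
     * @return response body as a string, or null if the request failed
     */
    public static String doPost(String api, String body) {
        return doPost(api, body, DEFAULT_TIME_OUT);
    }

    /**
     * POST request with an explicit timeout
     * @param api request URL
     * @param body request body as a JSON string
     * @param timeOut request timeout in milliseconds
     * @return response body as a string, or null if the request failed
     */
    public static String doPost(String api, String body, int timeOut) {
        HttpPost httpPost = new HttpPost(api);
        // sets "Content-Type: application/json; charset=UTF-8"; Content-Encoding is left unset
        // because that header describes compression, not the character set
        StringEntity entity = new StringEntity(body, ContentType.APPLICATION_JSON);
        httpPost.setEntity(entity);
        RequestConfig config = RequestConfig.custom()
                .setConnectTimeout(timeOut)
                .setConnectionRequestTimeout(timeOut)
                .setSocketTimeout(timeOut)
                .build();
        httpPost.setConfig(config);

        try (CloseableHttpClient client = HttpClients.createDefault();
             CloseableHttpResponse response = client.execute(httpPost)) {
            return EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            log.error("post " + api + " failed!", e);
        }
        return null;
    }
}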

  • Complete pom file used for testing

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>me.roohom</groupId>
    <artifactId>flink-http</artifactId>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.12.2</flink.version>
        <kafka.version>2.1.0</kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.10</version>
        </dependency>

        <!--flink-connector-http-->
        <dependency>
            <groupId>net.galgus</groupId>
            <artifactId>flink-connector-http</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

    </dependencies>

</project>

© Copyright 2020-2026, roohom.
