需求
在java中httpServer是非常普遍的一个功能,前端发出请求,后端根据url 以及参数做出相应的处理,返回数据,有get请求 post请求。
现在我需要flink实现这样的功能,http请求flink的source,sink进行处理并返回结果
最终实现结果截图如下


实现代码如下
Job代码
package Job;
import flink.HttpSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Job {
public static Logger logger = LoggerFactory.getLogger(Job.class);
public static void main(String[] args) throws Exception {
//1、获取流处理的环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource source = env.addSource(new HttpSource());
source.addSink(new Sink());
env*ex.e**cute("http计算");
}
}
HttpSource代码
监听端口数据
package flink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
public class HttpSource extends RichParallelSourceFunction<String> {
public static ServerSocket serverSocket = null;
public static Socket socket = null;
public static BufferedReader bufferedReader =null;
public static Logger logger = LoggerFactory.getLogger(HttpSource.class);
@Override
public void open(Configuration parameters) throws Exception {
int port = 8200;
try {
//1、开启端口
serverSocket = new ServerSocket(port);
} catch (IOException e) {
logger.error("open port error "+e.getMessage());
}
}
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (true) {
try {
logger.info("serverSocket accept ");
socket = serverSocket.accept();
logger.info("serverSocket bufferedReader ");
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));//获取输入流(请求)
StringBuilder stringBuilder = new StringBuilder();
String line = null;
Thread.sleep(500);
//得到请求的内容,注意这里作两个判断非空和""都要,只判断null会有问题
logger.info("serverSocket readLine ");
while ((line = bufferedReader.readLine()) != null && !line.equals("")) {
stringBuilder.append(line);
}
//过滤掉某些请求头之类的
String result = stringBuilder.toString().split(" ")[1];
ctx.collect(result);
} catch (Exception e) {
logger.error("HttpSource "+e.getMessage());
continue;
}finally {
if(bufferedReader!=null){
bufferedReader.close();
}
}
}
}
@Override
public void cancel() {
}
}
注意一个地方

正常情况下,前端进行请求,会返回请求方式 请求路径以及请求参数 后面会有浏览器整个信息,非常多,我这里做了处理,只获取请求路径以及请求参数
Sink类
package Job;
import Server.httpRequest;
import Server.httpResponse;
import com.alibaba.fastjson.JSONObject;
import flink.HttpSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Sink extends RichSinkFunction<String> {
public static Logger logger=LoggerFactory.getLogger(Sink.class);
public void open(Configuration parameters){
}
@Override
public void invoke(String result, Context context){
//1、处理数据,过滤干扰因素 /hello?name=dde
JSONObject jsonObject = new httpRequest().analysisContent(result);
//2、获取url
if(jsonObject.containsKey("url")){
//3、根据不同的url做出不同的处理
if(jsonObject.getString("url").equals("/sub")){
if(jsonObject.containsKey("params")){
System.out.println("hello "+result);
new httpResponse().sendsg(HttpSource.socket,jsonObject.getJSONObject("params"));
}
}
if(jsonObject.getString("url").equals("/add")){
if(jsonObject.containsKey("params")){
System.out.println("fail "+result);
new httpResponse().sendsg1(HttpSource.socket,jsonObject.getJSONObject("params"));
}
}
}
}
}
对不同的url进行不同的处理 httpResponse类
package Server;
import com.alibaba.fastjson.JSONObject;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;
public class httpResponse {
public void sendsg(Socket socket, JSONObject msg){
PrintWriter printWriter = null;//这里第二个参数表示自动刷新缓存
try {
printWriter = new PrintWriter(
socket.getOutputStream(), true);
} catch (IOException e) {
System.out.println(""+e.getMessage());
}
printWriter.println("HTTP/1.1 200 OK");
printWriter.println("Content-Type:text/html;charset=utf-8");
printWriter.println();
float a=0;
float b=0;
float c=0;
if(msg.containsKey("a")){
a= msg.getFloat("a");
}
if(msg.containsKey("b")){
b= msg.getFloat("b");
}
if(msg.containsKey("c")){
c=msg.getFloat("c");
}
c=a*2+b-c;
printWriter.write(c+"");//将日志输出到浏览器
printWriter.close();
}
public void sendsg1(Socket socket, JSONObject msg){
PrintWriter printWriter = null;//这里第二个参数表示自动刷新缓存
try {
printWriter = new PrintWriter(
socket.getOutputStream(), true);
} catch (IOException e) {
System.out.println(""+e.getMessage());
}
printWriter.println("HTTP/1.1 200 OK");
printWriter.println("Content-Type:text/html;charset=utf-8");
printWriter.println();
float a=0;
float b=0;
float c=0;
if(msg.containsKey("a")){
a= msg.getFloat("a");
}
if(msg.containsKey("b")){
b= msg.getFloat("b");
}
if(msg.containsKey("c")){
c=msg.getFloat("c");
}
c=a+b+c;
printWriter.write(c+"");//将日志输出到浏览器
printWriter.close();
}
}
对获取的参数进行过滤筛选
package Server;
import com.alibaba.fastjson.JSONObject;
public class httpRequest {
public JSONObject analysisContent(String result){
JSONObject obj=new JSONObject();
//1、过滤只留下url 和请求参数 值
String [] contents=result.split("\\?");
if(contents.length>1){
obj.put("url",contents[0]);
JSONObject keyObject=new JSONObject();
String [] params=contents[1].split("&");
for(int i=0;i<params.length;i++){
String [] map= params[i].split("=");
keyObject.put(map[0],map[1]);
}
obj.put("params",keyObject);
}
return obj;
}
}
打包成jar包

在flink上运行

测试运行15m以后是否有效
不同的url返回不同的处理结果,完美


感想
很早接触flink 非常的排斥,然后很多都是借用工具的,如果没有开发好的轮子,需要自己造的,就会非常排斥,比如这个功能,我就搞了很久,之前搞的版本是springboot+flink的方式,纯jar包,脱离了集群,我感觉那种方式跟搞着玩一样。
昨天看了一篇文章穷爸爸富爸爸,讲了穷人思维,突然觉得自己有必要思考,结果2h不到,就搞出来了,改变思维方式,不然会越忙越穷