做竞价的网站做优化有效果吗,微信公众平台登录入口官网,长春网站建设服务,涿州做网站kafka streamsMapR生态系统软件包2.0#xff08;MEP#xff09;随附了一些与MapR流有关的新功能#xff1a;  用于MapR Streams的Kafka REST代理为MapR Streams和Kafka集群提供了RESTful接口#xff0c;使其易于使用和产生消息以及执行管理操作。  Kafka Connect for MapR … kafka streams  MapR生态系统软件包2.0MEP随附了一些与MapR流有关的新功能   用于MapR Streams的Kafka REST代理为MapR Streams和Kafka集群提供了RESTful接口使其易于使用和产生消息以及执行管理操作。  Kafka Connect for MapR Streams是一个实用程序用于在MapR Streams与Apache Kafka和其他存储系统之间流式传输数据。   MapR生态系统软件包MEP是一种与核心升级脱钩的生态系统升级工具可让您独立于MapR融合数据平台升级工具。 您可以在本文中了解有关MEP 2.0的更多信息 。   在此博客中我们描述了如何使用Kafka REST代理向MapR流发布消息和从MapR流中使用消息。 REST代理是MapR融合数据平台的重要补充它允许任何编程语言使用MapR流。   MapR Streams工具随附的Kafka REST Proxy可以与MapR Streams默认以及Apache Kafka在混合模式下一起使用。 在本文中我们将重点介绍MapR流。  先决条件  具有MEP 2.0的MapR融合数据平台5.2 使用MapR Streams工具  curlwget或任何HTTP / REST客户端工具  创建MapR流和主题  流是主题的集合您可以通过以下方式将其分组管理   设置适用于该流中所有主题的安全策略  为流中创建的每个新主题设置默认的分区数  设置流中每个主题中消息的生存时间   您可以在文档中找到有关MapR Streams概念的更多信息。   在您的MapR群集或沙盒上运行以下命令  $ maprcli stream create -path /apps/iot-stream -produceperm p -consumeperm p -topicperm p
$ maprcli stream topic create -path /apps/iot-stream -topic sensor-json -partitions 3
$ maprcli stream topic create -path /apps/iot-stream -topic sensor-binary -partitions 3启动Kafka控制台的生产者和消费者  打开两个终端窗口并使用以下命令运行使用者的Kafka实用程序   消费者   主题传感器-json  $ /opt/mapr/kafka/kafka-0.9.0/bin/kafka-console-consumer.sh --new-consumer --bootstrap-server this.will.be.ignored:9092 --topic /apps/iot-stream:sensor-json 主题传感器二进制  $ /opt/mapr/kafka/kafka-0.9.0/bin/kafka-console-consumer.sh --new-consumer --bootstrap-server this.will.be.ignored:9092 --topic /apps/iot-stream:sensor-binary 这两个终端窗口将允许您查看有关不同主题的消息。  使用Kafka REST代理   端点/ topics / [topic_name]允许您获取有关该主题的一些信息。 在MapR Streams中主题是路径标识的流的一部分 要通过REST API访问主题您必须输入完整路径并在URL中进行编码 例如   / apps / iot-streamsensor-json将使用2Fapps2Fiot-stream3Asensor-json进行编码   运行以下命令以获取有关sensor-json主题的信息  $ curl -X GET  http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json 注意为简单起见我从运行Kafka REST代理的节点上运行命令因此可以使用localhost 。   您可以通过添加Python命令以漂亮的方式打印JSON例如  $ curl -X GET  http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json | python -m json.tool 默认流   如上所述流路径是您必须在命令中使用的主题名称的一部分。 但是可以将MapR Kafka REST代理配置为使用默认流。 对于此配置您应该在/opt/mapr/kafka-rest/kafka-rest-2.0.1/config/kafka-rest.properties文件中添加以下属性   stream.default.stream  / apps / iot-stream   更改Kafka REST代理配置时必须使用maprcli或MCS重新启动服务。   使用streams.default.stream属性的主要原因是简化应用程序使用的URL。 例如   通过streams.default.stream 可以使用curl -X GET http// localhost8082 / topics /  如果没有此配置或者要使用特定的流则必须在URL中指定它 http// localhost8082 / topics /2Fapps2Fiot-stream3Asensor-json   在本文中所有URL都包含编码的流名称因此您可以在不更改配置的情况下开始使用Kafka REST代理也可以将其用于其他流。    用于MapR流的Kafka REST代理允许应用程序将消息发布到MapR流。 消息可以作为JSON或二进制内容base64编码发送。   要发送JSON消息   查询应该是HTTP POST  内容类型应为application / vnd.kafka.json.v1  json  身体  {records:[{value:{temp : 10 ,speed : 40 ,direction : NW}  }]
} 完整的请求是  curl -X POST -H Content-Type: application/vnd.kafka.json.v1json \--data {records:[{value: {temp : 10 , speed : 40 , direction : NW}  }]} \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json 您应该在/ apps / iot-streamsensor-json使用者正在运行的终端窗口中看到打印的消息。   发送二进制消息   查询应该是HTTP POST  内容类型应为application / vnd.kafka.binary.v1  json  身体  {records:[{value:SGVsbG8gV29ybGQ}]
} 请注意 SGVsbG8gV29ybGQ 是在Base64中编码的字符串“ Hello World”。   完整的请求是  curl -X POST -H Content-Type: application/vnd.kafka.binary.v1json \--data {records:[{value:SGVsbG8gV29ybGQ}]} \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-binary 您应该在/ apps / iot-streamsensor-binary使用者正在运行的终端窗口中看到打印的消息。   发送多个消息   HTTP正文的记录字段允许您发送多个消息 例如您可以发送  curl -X POST -H Content-Type: application/vnd.kafka.json.v1json \--data {records:[{value: {temp : 12 , speed : 42 , direction : NW}  }, {value: {temp : 10 , speed : 37 , direction : N}  } ]} \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json 该命令将发送2条消息并将偏移量增加2。您可以通过在JSON数组中添加新元素来对二进制内容执行相同的操作 例如  curl -X POST -H Content-Type: application/vnd.kafka.binary.v1json \--data {records:[{value:SGVsbG8gV29ybGQ}, {value:Qm9uam91cg}]} \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-binary 您可能知道可以将密钥绑定到消息以确保所有具有相同密钥的消息都将到达同一分区。 为此将key属性添加到消息中如下所示  {records:[{key: K001,value:{temp : 10 ,speed : 40 ,direction : NW}  }]
} 现在您知道如何使用REST代理将消息发布到MapR Streams主题让我们看看如何使用消息。  消费信息  REST代理还可以用于消费主题消息。 为此您需要   创建使用者实例。  使用第一个调用返回的URL来阅读消息。  如果需要请删除使用者实例。   创建使用者实例   以下请求创建使用者实例  curl -X POST -H Content-Type: application/vnd.kafka.v1json \--data {name: iot_json_consumer, format: json, auto.offset.reset: earliest} \http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json 服务器的响应如下所示  {instance_id:iot_json_consumer,base_uri:http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json/instances/iot_json_consumer
} 请注意我们已经使用/ consumers / [topic_name]创建使用者。 后续请求将使用base_uri从主题获取消息。 与任何MapR Streams / Kafka使用者一样 auto.offset.reset定义其行为。 在此示例中该值设置为最早 这意味着使用者将从头开始阅读消息。 您可以在MapR Streams文档中找到有关使用者配置的更多信息。   消费信息   要使用消息只需将MapR Streams主题添加到使用者实例的URL。   以下请求使用了该主题的消息  curl -X GET -H Accept: application/vnd.kafka.json.v1json \
http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json/instances/iot_json_consumer/topics/%2Fapps%2Fiot-stream%3Asensor-json 此调用返回JSON文档中的消息  [{key:null,value:{temp:10,speed:40,direction:NW},topic:/apps/iot-stream:sensor-json,partition:1,offset:1},{key:null,value:{temp:12,speed:42,direction:NW},topic:/apps/iot-stream:sensor-json,partition:1,offset:2},{key:null,value:{temp:10,speed:37,direction:N},topic:/apps/iot-stream:sensor-json,partition:1,offset:3}
] 每次对API的调用都会根据上一次调用的偏移量返回发布的新消息。   请注意消费者将被销毁   在Consumer.instance.timeout.ms设置了一些空闲时间默认值设置为300000ms / 5分钟后使用REST API调用销毁了该空闲时间请参见下文。  消费二进制格式的消息  如果需要使用二进制消息则方法是相同的您需要更改格式和Accept标头。   调用此URL为二进制主题创建使用者实例  curl -X POST -H Content-Type: application/vnd.kafka.v1json \--data {name: iot_binary_consumer, format: binary, auto.offset.reset: earliest} \http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary 然后使用消息accept标头设置为application / vnd.kafka.binary.v1  json   curl -X GET -H Accept: application/vnd.kafka.binary.v1json \
http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary/instances/iot_binary_consumer/topics/%2Fapps%2Fiot-stream%3Asensor-binary 该调用返回JSON文档中的消息并且该值在Base64中编码  [{key:null,value:SGVsbG8gV29ybGQ,topic:/apps/iot-stream:sensor-binary,partition:1,offset:1},{key:null,value:Qm9uam91cg,topic:/apps/iot-stream:sensor-binary,partition:1,offset:2}
]删除使用者实例  如前所述将根据REST Proxy的consumer.instance.timeout.ms配置自动销毁使用者 。 也可以使用使用者实例URI和HTTP DELETE调用销毁实例如下所示  curl -X DELETE http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary/instances/iot_binary_consumer结论  在本文中您学习了如何将Kafka REST代理用于MapR流该代理允许任何应用程序使用在MapR融合数据平台中发布的消息。   您可以在MapR文档和以下资源中找到有关Kafka REST代理的更多信息   MapR Streams入门  Ted Dunning和Ellen Friedman撰写的“流传输体系结构使用Apache Kafka和MapR流的新设计”电子书  翻译自: https://www.javacodegeeks.com/2017/01/getting-started-kafka-rest-proxy-mapr-streams-2.htmlkafka streams