台式电脑

索尼电脑VpCcB17怎么样(Knative 实战:基于阿里云 Kafka 实现消息推送)

在Knative中已经提供了对Kafka事件源的支持,那么如何在阿里云上基于Kafka实现消息推送,本文给大家解锁这一新的姿势。

背景

消息队列forApacheKafka是阿里云提供的分布式、高吞吐、可扩展的消息队列服务。消息队列forApacheKafka广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,已成为大数据生态中不可或缺的部分。

另外Knative中提供了KafkaSource事件源的支持,通过这个事件源可以方便的对接Kafka消息服务。

在阿里云上创建Kafka实例

创建Kafka实例

登录消息队列Kafka控制台,选择【购买实例】。由于当前Knative中Kafka事件源支持2.0.0及以上版本,在阿里云上创建Kafka实例需要选择包年包月、专业版本进行购买,购买之后升级到2.0.0即可。

Knative 实战:基于阿里云 Kafka 实现消息推送

部署实例并绑定VPC

购买完成之后,进行部署,部署时设置Knative集群所在的VPC即可:

Knative 实战:基于阿里云 Kafka 实现消息推送

创建Topic和ConsumerGroup

接下来我们创建Topic和消费组。

进入【Topic管理】,点击创建Topic,这里我们创建名称为demo的topic:

Knative 实战:基于阿里云 Kafka 实现消息推送

进入【ConsumerGroup管理】,点击创建ConsumerGroup,这里我们创建名称为demo-consumer的消费组:

Knative 实战:基于阿里云 Kafka 实现消息推送

部署Kafka数据源

部署Kafkaaddon组件

登录容器服务控制台,进入【Knative组件管理】,部署Kafkaaddon组件。

Knative 实战:基于阿里云 Kafka 实现消息推送

创建KafkaSource实例

首先创建用于接收事件的服务event-display:

apiVersion:serving.knative.dev/v1kind:Servicemetadata:name:event-displayspec:template:spec:containers:-image:registry.cn-hangzhou.aliyuncs.com/knative-sample/eventing-sources-cmd-event_display:bf45b3eb1e7fc4cb63d6a5a6416cf696295484a7662e0cf9ccdf5c080542c21d

接下来创建KafkaSource

apiVersion:sources.eventing.knative.dev/v1alpha1kind:KafkaSourcemetadata:name:alikafka-sourcespec:consumerGroup:demo-consumer#BrokerURL.ReplacethiswiththeURLsforyourkafkacluster,#whichisintheformatofmy-cluster-kafka-bootstrap.my-kafka-namespace:9092.bootstrapServers:192.168.0.6x:9092,192.168.0.7x:9092,192.168.0.8x:9092topics:demosink:apiVersion:serving.knative.dev/v1alpha1kind:Servicename:event-display

说明:

bootstrapServers:KafkaVPC访问地址consumerGroup:设置消费组topics:设置Topic

创建完成之后,我们可以查看对应的实例已经运行:

[root@iZ2zeae8wzyq0ypgjowzq2Z~]#kubectlgetpodsNAMEREADYSTATUSRESTARTSAGEalikafka-source-k22vz-db44cc7f8-879pj1/1Running08h

索尼电脑VpCcB17怎么样(Knative 实战:基于阿里云 Kafka 实现消息推送)

验证

在Kafka控制台,选择topic发送消息,注意这里的消息格式必须是json:

Knative 实战:基于阿里云 Kafka 实现消息推送

我们可以看到已经接收到了发送过来的Kafka消息:

[root@iZ2zeae8wzyq0ypgjowzq2Z~]#kubectllogsevent-display-zl6m5-deployment-6bf9596b4f-8psx4user-container?CloudEvent:validContextAttributes,SpecVersion:0.2Type:dev.knative.kafka.eventSource:/apis/v1/namespaces/default/kafkasources/alikafka-source#demoID:partition:7/offset:1Time:2019-10-18T08:50:32.492ZContentType:application/jsonExtensions:key:demoTransportContext,URI:/Host:event-display.default.svc.cluster.localMethod:POSTData,{"key":"test"}

作者:元毅

相关新闻

返回顶部