1、说明

本文是在《外部数据统一接入分发中间件》基础上增加Kafka数据接入并分发功能,所以需要先完成基础框架的搭建,才能继续。但本文中的关于集成librdkafka库的说明还是可以看看的^_^。

2、集成librdkafka

librdkafka是Apache Kafka的C/C++客户端库,提供高性能的生产者、消费者和管理客户端,可以与Kafka服务端建立高效的数据交互通道,编译过程我就不讲了,网上很多教程了。我这里主要说明怎么以afsim的3rd_party的方式集成到afsim的构建工具中。过程大概与《麒麟V10服务器docker容器集成后台仿真引擎》类似,只是内容需要修改为librdkafka相关的,并将编译好的文件打包放到源码dependencies目录的3rd_party目录下。

2.1、创建cmake文件

进入afsim源码目录tools\3rd_party-cmake目录,创建kafka-2.5.3.cmake文件,并写入下面的内容保存:

get_filename_component(CURRENT_LIST_DIR ${CMAKE_CURRENT_LIST_FILE} PATH)
include(${CURRENT_LIST_DIR}/shared.cmake)
set(KAFKAROOT ${kafka_ROOT_DIR})
set(KAFKAINC ${KAFKAROOT}/include)
set(KAFKALIBDIR ${KAFKAROOT}/${SWDEV_THIRD_PARTY_BUILD_TYPE})
set(KAFKALIB kafka)
set(KAFKADEBBINDIR ${KAFKAROOT}/debug)
set(KAFKARELBINDIR ${KAFKAROOT}/release)

file(GLOB KAFKA_INSTALL_LIBS ${KAFKALIBDIR}/*.so*)
macro(link_kafka TARGET)
   if(WIN32)
      target_link_libraries(${TARGET} debug 
     ${KAFKAROOT}/debug/rdkafka.lib 
     ${KAFKAROOT}/debug/rdkafka++.lib
     optimized 
     ${KAFKAROOT}/release/rdkafka.lib 
     ${KAFKAROOT}/release/rdkafka++.lib
     )
   else(WIN32)
      target_link_libraries(${TARGET} ${KAFKAROOT}/lib/librdkafka.so ${KAFKAROOT}/lib/librdkafka++.so)
   endif(WIN32)
endmacro(link_kafka)

set(kafka_INCLUDE_DIR ${KAFKAROOT}/include)
set_property(GLOBAL PROPERTY kafka_BINDIR_DEBUG ${KAFKAROOT}/debug)
set_property(GLOBAL PROPERTY kafka_BINDIR_RELEASE ${KAFKAROOT}/release)
if(WIN32)
   set(kafka_LIBRARY debug 
   ${KAFKAROOT}/debug/rdkafka.lib 
     ${KAFKAROOT}/debug/rdkafka++.lib
     optimized 
     ${KAFKAROOT}/release/rdkafka.lib 
     ${KAFKAROOT}/release/rdkafka++.lib
     )
else()
   set(kafka_LIBRARY ${KAFKAROOT}/lib/librdkafka.so ${KAFKAROOT}/lib/librdkafka++.so)
endif()

这样就完成了kafka-2.5.3的cmake配置。

2.2、集成到中间件插件

修改中间件的CMakeLists.txt添加kafka第三方库的引用,整个文件代码如下:

project(HSLinkPlugin)
include(swdev_project)

#  添加Qt支持
set(SWDEV_QT_PACKAGE "qt-5.12.11" CACHE STRING "" FORCE)
mark_as_advanced(FORCE SWDEV_QT_PACKAGE)
swdev_acquire_packages(${CMAKE_CURRENT_SOURCE_DIR} ${SWDEV_QT_PACKAGE}  ${SWDEV_HV_PACKAGE})
######################

# 添加libhv
set(SWDEV_HV_PACKAGE "hv-1.3.3" CACHE STRING "" FORCE)
mark_as_advanced(FORCE SWDEV_HV_PACKAGE)
swdev_acquire_packages(${CMAKE_CURRENT_SOURCE_DIR} ${SWDEV_HV_PACKAGE})
######################

# 添加librdkafka
set(SWDEV_KAFKA_PACKAGE "kafka-2.5.3" CACHE STRING "" FORCE)
mark_as_advanced(FORCE SWDEV_KAFKA_PACKAGE)
swdev_acquire_packages(${CMAKE_CURRENT_SOURCE_DIR} ${SWDEV_KAFKA_PACKAGE})
######################


set(SUB_DIR source/client source/listener source/data)
foreach(SUB ${SUB_DIR})
   file(GLOB SUB_SRCS ${SUB}/*.?pp ${SUB}/*.h)
   set(SRCS ${SRCS} ${SUB_SRCS})
   source_group(${SUB} FILES ${SUB_SRCS})
endforeach()

# Add source files in source
file(GLOB SUB_SRCS source/*.?pp source/*.h)
set(SRCS ${SRCS} ${SUB_SRCS})

file(GLOB QRC ui/resources/*.qrc)
file(GLOB UIS ui/*.ui)

#  添加Qt支持
# Include macros and configuration to support builds of Qt projects
include(qt_project)
configure_qt_project(UI UI_HEADERS ${UIS})
######################

# 添加libhv
include_directories(. ${HVINC})
install_third_party(${HVROOT})
install_source_files(${TOOLS_DIRECTORY}/3rd_party-cmake/${SWDEV_HV_PACKAGE}.cmake)
######################

# 添加librdkafka
include_directories(. ${KAFKAINC})
install_third_party(${KAFKAROOT})
install_source_files(${TOOLS_DIRECTORY}/3rd_party-cmake/${SWDEV_KAFKA_PACKAGE}.cmake)
######################

wsf_project_template(${PROJECT_NAME} PLUGIN
HDRS
   ${HDRS}
SRCS
   ${SRCS}
#GRAMMARS
#   grammar/hslink.ag
)

try_add_subdirectory(${TOOLS_DIRECTORY}/utilqt/source  utilqt)

link_hv(${PROJECT_NAME})
link_kafka(${PROJECT_NAME})

target_link_libraries(${PROJECT_NAME} util utilqt)

# Link with core WSF libs.
# target_link_libraries(${PROJECT_NAME} ${WSF_LIBS} Qt5::Core Qt5::Gui Qt5::Widgets)

if (MSVC)
	include(user_file_config)
	create_vs_debug_env(
		${CMAKE_CURRENT_SOURCE_DIR}
		"${TEMPLATE_DIR}/Template.VisualStudio.Settings.user"
		)
endif()

# Install required qt core libs
install_qt_libs_all()
install_qt_plugins(${INSTALL_EXE_PATH}/qt_plugins)
install_qt_licenses()

# install 3rd_party-cmake files that are required to build mover creator not covered elsewhere
install_source_files(${TOOLS_DIRECTORY}/3rd_party-cmake/config.cmake
                     ${TOOLS_DIRECTORY}/3rd_party-cmake/shared.cmake
                     ${TOOLS_DIRECTORY}/3rd_party-cmake/${SWDEV_QT_PACKAGE}.cmake
                     tools/3rd_party-cmake
                    )

配置好后,用cmake构建并打开工程后,可看到在项目链接输入中自动包含了rdkafka库的引用,即可使用librdkafka进行数据收发。

图片-rQfE.png

3、建立KafkaConsumer

要接收Kafka数据,需要创建consumer,网上有非常多的文章将consumer构建的方法,我自己也做了点封装,但本文就不献丑了。只是强调一下:在收到数据后需要创建AbstraceData并通过Dispatcher来进行分发。如下:

RdKafka::Message* msg = m_consumer->consume(1);
if (msg && (msg->err() != RdKafka::ERR_NO_ERROR))
{
   //qDebug() << QString("Consumer error: %1").arg(msg.errstr().c_str());
}
else
{
   QString topic = msg->topic_name().c_str();
   auto partition = msg->partition();
   QString key;
   QByteArray data;

   // 消息key
   if (msg->key())
   {
      key = msg->key()->c_str();
   }
   // 消息Value
   if (msg->payload())
   {
      data.append((const char*)(msg->payload()), msg->len());
   }
   JsonDataPtr dataPtr = JsonDataPtr::create();	// 创建待分发数据
   if (dataPtr->parse(data))						// 解析数据
   {
      Dispatcher::getInstance()->dispatch(dataPtr);// 分发数据
   }
   m_consumer->commitAsync();
}
delete msg;

4、测试

测试数据还是以Json字符串的形式进行,格式沿用以前的:

1. 



Json型



{

    "Key": "KafkaDataTestKey",

    "Name": "KafkaDataTest",

    "Data": {

        "Property1": "10101",

        "Property2": "22222"

    }

}

Kafka主题为:T_hslinkplugin

插件订阅数据Key:KafkaDataTestKey

下面直接看视频吧:

5、后记

以上为Kafka数据通过中间件进行接收并分发到afsim内部各订阅数据端的实现思路说明,感谢观看!

如本文对各位的实现有所启发或帮助,不妨点赞、收藏、分享、赞赏!研究不易,敬请支持!!

某宝连接(含HSLinkPlugin中间件实现,购买过的直接网盘下载^_^)

往期推荐

外部数据统一接入分发中间件

飞腾D2000麒麟V10国防版下编译

基于wsf插件扩展内置platform

arm64版的麒麟V10服务器docker容器集成后台仿真引擎

文末二维码.png