1、说明
本文是在《外部数据统一接入分发中间件》基础上增加Kafka数据接入并分发功能,所以需要先完成基础框架的搭建,才能继续。但本文中的关于集成librdkafka库的说明还是可以看看的^_^。
2、集成librdkafka
librdkafka是Apache Kafka的C/C++客户端库,提供高性能的生产者、消费者和管理客户端,可以与Kafka服务端建立高效的数据交互通道,编译过程我就不讲了,网上很多教程了。我这里主要说明怎么以afsim的3rd_party的方式集成到afsim的构建工具中。过程大概与《麒麟V10服务器docker容器集成后台仿真引擎》类似,只是内容需要修改为librdkafka相关的,并将编译好的文件打包放到源码dependencies目录的3rd_party目录下。
2.1、创建cmake文件
进入afsim源码目录tools\3rd_party-cmake目录,创建kafka-2.5.3.cmake文件,并写入下面的内容保存:
get_filename_component(CURRENT_LIST_DIR ${CMAKE_CURRENT_LIST_FILE} PATH)
include(${CURRENT_LIST_DIR}/shared.cmake)
set(KAFKAROOT ${kafka_ROOT_DIR})
set(KAFKAINC ${KAFKAROOT}/include)
set(KAFKALIBDIR ${KAFKAROOT}/${SWDEV_THIRD_PARTY_BUILD_TYPE})
set(KAFKALIB kafka)
set(KAFKADEBBINDIR ${KAFKAROOT}/debug)
set(KAFKARELBINDIR ${KAFKAROOT}/release)
file(GLOB KAFKA_INSTALL_LIBS ${KAFKALIBDIR}/*.so*)
macro(link_kafka TARGET)
if(WIN32)
target_link_libraries(${TARGET} debug
${KAFKAROOT}/debug/rdkafka.lib
${KAFKAROOT}/debug/rdkafka++.lib
optimized
${KAFKAROOT}/release/rdkafka.lib
${KAFKAROOT}/release/rdkafka++.lib
)
else(WIN32)
target_link_libraries(${TARGET} ${KAFKAROOT}/lib/librdkafka.so ${KAFKAROOT}/lib/librdkafka++.so)
endif(WIN32)
endmacro(link_kafka)
set(kafka_INCLUDE_DIR ${KAFKAROOT}/include)
set_property(GLOBAL PROPERTY kafka_BINDIR_DEBUG ${KAFKAROOT}/debug)
set_property(GLOBAL PROPERTY kafka_BINDIR_RELEASE ${KAFKAROOT}/release)
if(WIN32)
set(kafka_LIBRARY debug
${KAFKAROOT}/debug/rdkafka.lib
${KAFKAROOT}/debug/rdkafka++.lib
optimized
${KAFKAROOT}/release/rdkafka.lib
${KAFKAROOT}/release/rdkafka++.lib
)
else()
set(kafka_LIBRARY ${KAFKAROOT}/lib/librdkafka.so ${KAFKAROOT}/lib/librdkafka++.so)
endif()这样就完成了kafka-2.5.3的cmake配置。
2.2、集成到中间件插件
修改中间件的CMakeLists.txt添加kafka第三方库的引用,整个文件代码如下:
project(HSLinkPlugin)
include(swdev_project)
# 添加Qt支持
set(SWDEV_QT_PACKAGE "qt-5.12.11" CACHE STRING "" FORCE)
mark_as_advanced(FORCE SWDEV_QT_PACKAGE)
swdev_acquire_packages(${CMAKE_CURRENT_SOURCE_DIR} ${SWDEV_QT_PACKAGE} ${SWDEV_HV_PACKAGE})
######################
# 添加libhv
set(SWDEV_HV_PACKAGE "hv-1.3.3" CACHE STRING "" FORCE)
mark_as_advanced(FORCE SWDEV_HV_PACKAGE)
swdev_acquire_packages(${CMAKE_CURRENT_SOURCE_DIR} ${SWDEV_HV_PACKAGE})
######################
# 添加librdkafka
set(SWDEV_KAFKA_PACKAGE "kafka-2.5.3" CACHE STRING "" FORCE)
mark_as_advanced(FORCE SWDEV_KAFKA_PACKAGE)
swdev_acquire_packages(${CMAKE_CURRENT_SOURCE_DIR} ${SWDEV_KAFKA_PACKAGE})
######################
set(SUB_DIR source/client source/listener source/data)
foreach(SUB ${SUB_DIR})
file(GLOB SUB_SRCS ${SUB}/*.?pp ${SUB}/*.h)
set(SRCS ${SRCS} ${SUB_SRCS})
source_group(${SUB} FILES ${SUB_SRCS})
endforeach()
# Add source files in source
file(GLOB SUB_SRCS source/*.?pp source/*.h)
set(SRCS ${SRCS} ${SUB_SRCS})
file(GLOB QRC ui/resources/*.qrc)
file(GLOB UIS ui/*.ui)
# 添加Qt支持
# Include macros and configuration to support builds of Qt projects
include(qt_project)
configure_qt_project(UI UI_HEADERS ${UIS})
######################
# 添加libhv
include_directories(. ${HVINC})
install_third_party(${HVROOT})
install_source_files(${TOOLS_DIRECTORY}/3rd_party-cmake/${SWDEV_HV_PACKAGE}.cmake)
######################
# 添加librdkafka
include_directories(. ${KAFKAINC})
install_third_party(${KAFKAROOT})
install_source_files(${TOOLS_DIRECTORY}/3rd_party-cmake/${SWDEV_KAFKA_PACKAGE}.cmake)
######################
wsf_project_template(${PROJECT_NAME} PLUGIN
HDRS
${HDRS}
SRCS
${SRCS}
#GRAMMARS
# grammar/hslink.ag
)
try_add_subdirectory(${TOOLS_DIRECTORY}/utilqt/source utilqt)
link_hv(${PROJECT_NAME})
link_kafka(${PROJECT_NAME})
target_link_libraries(${PROJECT_NAME} util utilqt)
# Link with core WSF libs.
# target_link_libraries(${PROJECT_NAME} ${WSF_LIBS} Qt5::Core Qt5::Gui Qt5::Widgets)
if (MSVC)
include(user_file_config)
create_vs_debug_env(
${CMAKE_CURRENT_SOURCE_DIR}
"${TEMPLATE_DIR}/Template.VisualStudio.Settings.user"
)
endif()
# Install required qt core libs
install_qt_libs_all()
install_qt_plugins(${INSTALL_EXE_PATH}/qt_plugins)
install_qt_licenses()
# install 3rd_party-cmake files that are required to build mover creator not covered elsewhere
install_source_files(${TOOLS_DIRECTORY}/3rd_party-cmake/config.cmake
${TOOLS_DIRECTORY}/3rd_party-cmake/shared.cmake
${TOOLS_DIRECTORY}/3rd_party-cmake/${SWDEV_QT_PACKAGE}.cmake
tools/3rd_party-cmake
)配置好后,用cmake构建并打开工程后,可看到在项目链接输入中自动包含了rdkafka库的引用,即可使用librdkafka进行数据收发。

3、建立KafkaConsumer
要接收Kafka数据,需要创建consumer,网上有非常多的文章将consumer构建的方法,我自己也做了点封装,但本文就不献丑了。只是强调一下:在收到数据后需要创建AbstraceData并通过Dispatcher来进行分发。如下:
RdKafka::Message* msg = m_consumer->consume(1);
if (msg && (msg->err() != RdKafka::ERR_NO_ERROR))
{
//qDebug() << QString("Consumer error: %1").arg(msg.errstr().c_str());
}
else
{
QString topic = msg->topic_name().c_str();
auto partition = msg->partition();
QString key;
QByteArray data;
// 消息key
if (msg->key())
{
key = msg->key()->c_str();
}
// 消息Value
if (msg->payload())
{
data.append((const char*)(msg->payload()), msg->len());
}
JsonDataPtr dataPtr = JsonDataPtr::create(); // 创建待分发数据
if (dataPtr->parse(data)) // 解析数据
{
Dispatcher::getInstance()->dispatch(dataPtr);// 分发数据
}
m_consumer->commitAsync();
}
delete msg;4、测试
测试数据还是以Json字符串的形式进行,格式沿用以前的:
Kafka主题为:T_hslinkplugin
插件订阅数据Key:KafkaDataTestKey
下面直接看视频吧:
5、后记
以上为Kafka数据通过中间件进行接收并分发到afsim内部各订阅数据端的实现思路说明,感谢观看!
如本文对各位的实现有所启发或帮助,不妨点赞、收藏、分享、赞赏!研究不易,敬请支持!!
某宝连接(含HSLinkPlugin中间件实现,购买过的直接网盘下载^_^)

往期推荐
外部数据统一接入分发中间件
飞腾D2000麒麟V10国防版下编译
基于wsf插件扩展内置platform
arm64版的麒麟V10服务器docker容器集成后台仿真引擎

评论