1. 说明
在afsim提供的与外部系统交互的接口中只有xio和dis,其中xio需要集成afsim的库来使用,而dis是作为分布式系统交互的通用标准定义了相当多的Pdu,并在github上的opendis提供了各种语言的封装实现,但仅仅这两种方式对于使用是完全不够的,我们还需要其他的通信方式如socket、http、kafka、mq、zmq、hla等等。那么就需要实现不同的通信插件并集成到afsim中。这样做的不足之处是要维护很多的插件,前期要陷入很多的时间到调试通信功能上,要处理不同通信方式的接口并转换为统一的与afsim交互的接口数据,然后才能开始业务开发,工作量大且,重复工作较多。
经过一番折腾,我这里先来做一个接收外部各种通信协议发送的数据并在afsim内部分发的工具(对外发送的后续再做),大致框架如下:

上图表达的意思就是通过统一的中间件HSLink来接收外部系统发送的数据,由中间件按需将数据转发到afsim各个业务插件进行处理。
2. mission改造
PS:由于本人习惯于使用Qt的模块来实现功能,而mission默认是不依赖与Qt库的,所以需要让其增加对Qt的支持,大家也可以根据自己熟悉的方式来决定是否需要做这个改造,这不是必须的。
由于mission的主线程中没有创建QCoreApplication,因此使用mission跑想定,在实现的插件中无法使用信号槽等Qt相关的特性,所以需要给mission添加Qt运行时支持,这里主要在CMakeList.txt中添加Qt库和配置即可:
# 添加Qt支持
set(SWDEV_QT_PACKAGE "qt-5.12.11" CACHE STRING "" FORCE)
mark_as_advanced(FORCE SWDEV_QT_PACKAGE)
swdev_acquire_packages(${CMAKE_CURRENT_SOURCE_DIR} ${SWDEV_QT_PACKAGE})
# Include macros and configuration to support builds of Qt projects
include(qt_project)
configure_qt_project(UI UI_HEADERS ${UIS})改好CMake后就可以在代码中使用Qt相关模块了。我们查看mission.cpp的代码可知它只有一个main主函数,在这里面调用aSimPtr的RunEventLoop来进入仿真循环,一旦结束后,mission也就退出了。而我们知道,要使用Qt的信号槽及RTTI等特性,需要创建一个QCoreApplication并调用app.exec来启动Qt的事件循环,那么这里就会有个问题:此处就会出现两个阻塞循环函数的调用,但如果在同一个线程中执行,那么总会有一个循环无法执行。所以这里的解决思路是将两个阻塞循环函数分别放到不同的线程中去执行。这里按Qt的常用写法将app.exe放在了主线程中,然后开启新线程来跑mission的事件循环。
1)新建一个线程类来封装mission的实现:
// MissionThread.h
class MissionThread : public QThread
{
Q_OBJECT
public:
MissionThread(int argc, char* argv[], QObject* parent);
// 必须覆写此方法,用于安全退出仿真
void terminate();
protected:
virtual void run() override;
private:
std::unique_ptr<WsfSimulation> mSimPtr;
int argc;
char** argv;
};// MissionThread.cpp
#include "MissionThread.h"
MissionThread::MissionThread(int argc, char* argv[], QObject* parent)
: QThread(parent)
{
this->argc = argc;
this->argv = argv;
}
void MissionThread::terminate()
{
if (mSimPtr != nullptr)
{
mSimPtr->RequestTermination();
}
}
void MissionThread::run()
{
// 这里的实现基本与mission原本的main中的实现一致
// ...
}2)修改main主函数实现
// mission.cpp
int main(int argc, char* argv[])
{
// 创建Qt应用
QCoreApplication app(argc, argv);
// 创建Mission执行线程
MissionThread* missionThread = new MissionThread(argc, argv, &app);
QObject::connect(missionThread, &MissionThread::finished, [missionThread, &app]
{
missionThread ->deleteLater();
app.quit();
});
// 启动mission线程
missionThread->start();
return app.exec();
}编译后通过命令行运行的效果与原本的mission是一致的,大家可以自行执行看看。
3. HSLinkPlugin插件
其实这个中间件应该作为wsf核心库的一部分进行集成,这样在其他插件链接的时候,就不用专门添加依赖了。但本着尽量不修改afsim源码的前提,本文还是以插件的方式来搭框架,其他插件要使用中间件数据分发的能力,则需要链接这个插件(很简单^_^见下文),先根据下面代码创建wsf插件:
// HSLinkPlugin.h
class HSLinkPlugin : public WsfSimulationExtension
{
public:
static HSLinkPlugin* Find(const WsfSimulation* aSimulation);
static constexpr const char* cNAME = "wsf_hslinkplugin"; // 插件唯一名称
HSLinkPlugin();
~HSLinkPlugin() noexcept override;
bool Initialize() override;
};// HSLinkPlugin.cpp
HSLinkPlugin* HSLinkPlugin::Find(const WsfSimulation* aSimulation)
{
auto& ret = aSimulation->GetExtension(HSLinkPlugin::cNAME);
return static_cast<HSLinkPlugin*>(&ret);
}
HSLinkPlugin::HSLinkPlugin()
{
}
HSLinkPlugin::~HSLinkPlugin() noexcept
{
}
bool HSLinkPlugin::Initialize()
{
return true;
}// PluginRegistration.cpp
class RegisterPlugin : public WsfScenarioExtension
{
public:
~RegisterPlugin() noexcept override = default;
void SimulationCreated(WsfSimulation& aSimulation) override
{
// Simulation对象创建完成后,注册Simulation扩展
aSimulation.RegisterExtension(HSLinkPlugin::cNAME, ut::make_unique<HSLinkPlugin>());
}
};
extern "C"
{
HSLINKPLUGIN_EXPORT void WsfPluginVersion(UtPluginVersion& aVersion)
{
aVersion = UtPluginVersion(
WSF_PLUGIN_API_MAJOR_VERSION,
WSF_PLUGIN_API_MINOR_VERSION,
WSF_PLUGIN_API_COMPILER_STRING
);
}
HSLINKPLUGIN_EXPORT void WsfPluginSetup(WsfApplication& aApplication)
{
// 注册本插件工程
// 此处使用默认Application扩展
aApplication.RegisterExtension(HSLinkPlugin::cNAME,ut::make_unique<WsfDefaultApplicationExtension<RegisterPlugin>>());
}
}上面都是创建wsf插件的套路了,添加好afsim的头文件和lib文件依赖就能加载了。

4. 基本通信
首先以最简单的udp点对点通信来把插件跑通,然后再来抽象框架。第二节已经对mission做了改造,支持Qt的运行时,所以这里通过QUdpSocket来监听udp数据:
// UDPListener.h
class UDPListener: public QThread
{
Q_OBJECT
public:
UDPListener(QObject* parent = nullptr);
protected:
virtual void run() override;
};// UDPListener.cpp
UDPListener::UDPListener(QObject* parent /*= nullptr*/)
:QThread(parent)
{
this->start();
}
void UDPListener::run()
{
QUdpSocket* udpSocket = new QUdpSocket();
bool ret = udpSocket->bind(QHostAddress::AnyIPv4, 5800);
QObject::connect(udpSocket, &QUdpSocket::readyRead, [udpSocket]
{
QByteArray data;
while (udpSocket->hasPendingDatagrams()) {
int pendingDatagramSize = udpSocket->pendingDatagramSize();
if (-1 != pendingDatagramSize)
{
QByteArray datagram(pendingDatagramSize, 0);
QHostAddress hostAddress;
quint16 port;
udpSocket->readDatagram(datagram.data(), datagram.size(), &hostAddress, &port);
data.append(datagram);
qInfo() << QDateTime::currentDateTime().toString("yyyyMMdd hh:mm:ss")
<< QStringLiteral("收到<%1:%2>的数据:").arg(hostAddress.toString()).arg(port)
<< QString(data);
udpSocket->writeDatagram(QStringLiteral("我收到了").toLocal8Bit(), hostAddress, port);
}
}
});
exec();
udpSocket->deleteLater();
}注:上面之所以是以继承QThread来实现udp数据接收,是因为Qt的网络数据处理是异步的,需要链接信号槽,就需要事件循环,但是呢afsim的插件不是在主线程创建的,不会进入到Qt的事件循环,因此需要创建线程并执行exec()来驱动事件循环。更深层的原理请查看Qt相关文档^_^
将上面的类在HSLinkPlugin的初始化中创建出实例后就能监听5800端口发来的数据,这里也简单发送了一个回复给客户端的IP和端口。
5. 基本框架
要实现文章开头所示的结构,需要搭建一些基础类来实现底层的逻辑,后续需要添加新的协议类型就只需要针对协议实现数据解析和打包即可。强调一下:本文的实现是接收外部数据并分发给需要的插件,而不是对外发送数据的!!!
5.1. 抽象数据类AbstractData
为了实现统一的数据分发逻辑,这样需要抽象一个基本的用于表示客户端发送的数据的基础类,并且为了支持扩展而定义的数据规范:

根据上面的数据结构,定义一个抽象数据类:
// AbstractData.h
class AbstractData
{
public:
enum Type {
Unknown, // 未知
Json, // Json型
Binary, // 二进制型
};
AbstractData(int type, const QByteArray& rawNetData);
virtual ~AbstractData();
int getType() const { return m_type; }
const QString& getKey() const { return m_key; }
const QString& getName() const { return m_name; }
const QString& getErrorString() const { return m_errString; }
// 由具体的实现类负责解析网络数据
virtual bool parse() = 0;
protected:
int m_type = Unknown;
QByteArray m_rawNetData;
QString m_key;
QString m_name;
QString m_errString;
};
typedef QSharedPointer<AbstractData> DataPtr;// AbstractData.cpp
#include "AbstractData.h"
AbstractData::AbstractData(int type, const QByteArray& rawNetData)
: m_type(type)
, m_rawNetData(rawNetData)
{
}在抽象类的基础上,实现Json型和二进制型的具体数据类:
// JsonData.h
class JsonData : public AbstractData
{
public:
JsonData(const QByteArray& rawNetData);
const QString& getData() const { return m_data; }
// 解析Json文本型数据
virtual bool parse() override;
protected:
QString m_data; // Json数据体Data的内容
};
typedef QSharedPointer<JsonData> JsonDataPtr;// JsonData.cpp
bool JsonData::parse()
{
QJsonParseError err;
auto doc = QJsonDocument::fromJson(m_rawNetData, &err);
if (err.error != QJsonParseError::NoError)
{
auto errString = err.errorString();
m_errString = errString;
return false;
}
auto root = doc.object();
m_key = root["Key"].toString();
m_name = root["Name"].toString();
auto dataObj = root["Data"].toObject();
m_data = QJsonDocument(dataObj).toJson(QJsonDocument::Compact);
return true;
}// BinaryData.h
class BinaryData : public AbstractData
{
public:
BinaryData(const QByteArray& rawNetData);
const QByteArray& getData() const { return m_data; }
// 解析二进制数据
virtual bool parse() override;
protected:
QByteArray m_data; // utf-8编码保存的实际数据内容
};
typedef QSharedPointer<BinaryData> BinaryDataPtr;// BinaryData.cpp
bool BinaryData::parse()
{
QDataStream dataStream(m_rawNetData);
dataStream.setVersion(QDataStream::Qt_4_5);
//dataStream.setByteOrder(QDataStream::LittleEndian);
quint16 header = 0;
dataStream >> header;
if (header != 0xa0a0)
{
m_errString = QStringLiteral("数据头不正确:<0x%1>").arg(header, 0, 16);
return false;
}
char tmp[32] = { 0 };
dataStream.readRawData(tmp, 32);
m_key = QString::fromLocal8Bit(tmp);
dataStream.readRawData(tmp, 32);
m_name = QString::fromUtf8(tmp);
qint32 length = 0;
dataStream >> length;
if (length < 0)
{
m_errString = QStringLiteral("数据长度异常:<%1>").arg(length);
return false;
}
char* buf = new char[length];
dataStream.readRawData(buf, length);
m_data = QByteArray(buf, length);
quint16 tail = 0;
dataStream >> tail;
if (tail != 0xf0f0)
{
m_errString = QStringLiteral("数据尾不正确:<0x%1>").arg(tail, 0, 16);
return false;
}
return true;
}然后就可将之前的UDPListener接收数据的部分改为使用BinaryData的方式进行处理:

编译后运行即可接收并解析二进制格式的数据

Json型的数据类似,通过创建JsonData类来解析即可。接着来看看怎么实现订阅和分发的机制。
5.2. 抽象订阅者类AbstraceSubscriber
上面我们定义的数据协议包含了一个关键字Key,这个关键字是用来区别不同的数据需保持唯一,有了这个Key我们就可以来建立发布订阅机制。
首先需要定义一个发布器Dispatcher,这个类一方面提供通过Key来订阅需要的数据,一方面根据接收的数据的Key将数据进行分发。它是个单例类(有个隐藏Bug:多想定运行时数据会串,不过先这样吧后续再改进)。
// Dispatcher.h
class Dispatcher
{
DEC_SINGLETON(Dispatcher)
public:
~Dispatcher();
// 根据数据关键字Key进行订阅,注需自行维护指针
void subscribe(const QString& dataKey, AbstractSubscriber* subcriber);
// 根据数据关键字Key取消订阅,注需自行维护指针
void unSubscribe(const QString& dataKey, AbstractSubscriber* subscriber);
// 取消所有指定Subscriber的订阅
void unSubscribe(AbstractSubscriber* subscriber);
// 分发数据
void dispatch(DataPtr data);
private:
QHash<QString, QList<AbstractSubscriber*>> m_subscribers;
};// Dispatcher.cpp
static QMutex g_mutex; // 多线程锁
IMP_SINGLETON(Dispatcher)
Dispatcher::~Dispatcher()
{
QMutexLocker locker(&g_mutex);
m_subscribers.clear();
}
void Dispatcher::subscribe(const QString& dataKey, AbstractSubscriber* subcriber)
{
QMutexLocker locker(&g_mutex);
if (!m_subscribers[dataKey].contains(subcriber))
{
m_subscribers[dataKey].append(subcriber);
}
}
void Dispatcher::unSubscribe(const QString& dataKey, AbstractSubscriber* subscriber)
{
QMutexLocker locker(&g_mutex);
m_subscribers[dataKey].removeAll(subscriber);
}
void Dispatcher::unSubscribe(AbstractSubscriber* subscriber)
{
QMutexLocker locker(&g_mutex);
for (auto dataKey : m_subscribers.keys())
{
m_subscribers[dataKey].removeAll(subscriber);
}
}
void Dispatcher::dispatch(DataPtr data)
{
QMutexLocker locker(&g_mutex);
auto dataKey = data->getKey();
auto& subscribers = m_subscribers[dataKey];
for (auto& subscriber : subscribers)
{
// 分发数据到订阅端
subscriber->process(data);
}
}上面代码中的subscribe的参数分别是数据的唯一关键字Key以及订阅者AbstractSubscriber(即其他插件),dispatch方法就是通过关键字获取到订阅端列表然后将数据进行分发。
订阅者AbstractSubscriber的实现如下:
// AbstractSubscriber.h
class AbstractSubscriber
{
public:
AbstractSubscriber(const QString& name);
virtual ~AbstractSubscriber();
void subscribe(const QString& dataKey);
void unSubscribe(const QString& dataKey);
const QString& getName() const { return m_name; }
protected:
// 订阅者实现数据处理
virtual void process(DataPtr data) = 0;
protected:
QString m_name;
friend class Dispatcher;
};// AbstractSubscriber.cpp
AbstractSubscriber::AbstractSubscriber(const QString& name)
: m_name(name)
{}
AbstractSubscriber::~AbstractSubscriber()
{
Dispatcher::getInstance()->unSubscribe(this);
}
void AbstractSubscriber::subscribe(const QString& dataKey)
{
Dispatcher::getInstance()->subscribe(dataKey, this);
}
void AbstractSubscriber::unSubscribe(const QString& dataKey)
{
Dispatcher::getInstance()->unSubscribe(dataKey, this);
}编译后运行能够正常进入到dispatch的实现中,但我们还没有订阅端所以不会有任何处理直接返回了。

5.3. 发布订阅测试
下面我通过在本插件订阅Binary数据(唯一关键字:DataKey),并在之前的wsf_plugin_demo插件中订阅Json型的数据来测试上面的框架流程,数据内容如下:
首先让HSLinkPlugin继承AbstractSubscriber,并在构造函数订阅数据:
HSLinkPlugin::HSLinkPlugin()
: AbstractSubscriber(HSLinkPlugin::cNAME)
{
this->subscribe("DataKey");
}然后实现process纯虚函数:
void HSLinkPlugin::process(DataPtr data)
{
int type = data->getType();
switch (type)
{
case AbstractData::Json: {} break;
case AbstractData::Binary:
{
auto bData = data.dynamicCast<BinaryData>();
qInfo() << QString::fromUtf8(bData->getData());
}break;
default:
break;
}
}编译后运行,数据收到后即可触发process方法

其他插件订阅和处理数据的方式与上面类似,在插件的CMakeLists.txt文件中的target_link_libraries中增加链接中间件的库名称即可:
target_link_libraries(${PROJECT_NAME} wsf_mil HSLinkPlugin)CMake后编译执行,HSLinkPlugin接到数据后就会分发到此插件:

6. http协议添加
上面是通过UDP点对点协议接收数据并分发的,那如果要添加新的协议(如http)来接收数据并分发应该怎么实现呢?下面就以添加一个http协议到HSLinkPlugin中间件插件的过程进行说明:
6.1. 集成libhv
这个第三方库我在《arm64版的麒麟V10服务器容器集成仿真引擎》的第7节已经讲过编译和集成方法了,这里就不多讲,只把HSLinkPlugin插件的CMake文件修改部分贴出来:
# 添加libhv
set(SWDEV_HV_PACKAGE "hv-1.3.3" CACHE STRING "" FORCE)
mark_as_advanced(FORCE SWDEV_HV_PACKAGE)
swdev_acquire_packages(${CMAKE_CURRENT_SOURCE_DIR} ${SWDEV_HV_PACKAGE})
######################
# 添加libhv
include_directories(. ${HVINC})
install_third_party(${HVROOT})
install_source_files(${TOOLS_DIRECTORY}/3rd_party-cmake/${SWDEV_HV_PACKAGE}.cmake)
link_hv(${PROJECT_NAME})6.2. HttpListener
通过新建HttpListener类,用来封装http服务端的代码来接收客户端发送的数据并进行分发:
// HttpListener.h
class HttpListener : public QObject
{
public:
HttpListener(
quint16 port = 8898,
int threadCount = 4,
QObject* parent = nullptr);
bool init();
bool exit();
quint16 getPort() const { return m_port; }
protected:
void initRouter();
protected:
quint16 m_port = 8898;
int m_threadCount = 4;
QString m_errMsg;
http_server_t m_server;
HttpService m_router;
};// HttpListener.cpp
HttpListener::HttpListener(
quint16 port /* = 8898 */,
int threadCount /* = 4 */,
QObject* parent /* = nullptr */)
: QObject(parent)
, m_port(port)
, m_threadCount(threadCount)
{
}
bool HttpListener::init()
{
initRouter();
m_server.port = m_port;
m_server.worker_threads = m_threadCount;
m_server.service = &m_router;
// http_server_run(&m_server, 1); // 阻塞
int ret = http_server_run(&m_server, 0); // 不阻塞
if (ret != 0)
{
m_errMsg = hv_strerror(ret);
return false;
}
else
{
m_errMsg.clear();
return true;
}
}
bool HttpListener::exit()
{
http_server_stop(&m_server);
return true;
}
void HttpListener::initRouter()
{
m_router.POST("/hsimengine", [this](const HttpContextPtr& ctx)
{
auto req = ctx->request;
auto resp = ctx->response;
auto ipport = QString("%1:%2").arg(req->client_addr.ip.c_str()).arg(req->client_addr.port);
auto body = req->body;
QByteArray reqData(body.c_str());
qInfo() << QStringLiteral("收到客户端<%1>的请求数据Url:%2:%3").arg(ipport).arg(req->url.c_str()).arg(reqData.data());
QString ret;
JsonDataPtr data = JsonDataPtr::create(reqData);
if (data->parse())
{
resp->status_code = HTTP_STATUS_OK;
ret = QStringLiteral("{\"ResponseMsg\":%1}").arg(QStringLiteral("收到数据"));
// 分发
Dispatcher::getInstance()->dispatch(data);
}
else
{
resp->status_code = HTTP_STATUS_INTERNAL_SERVER_ERROR;
ret = QStringLiteral("{\"ResponseMsg\":%1}").arg(data->getErrorString());
}
return ctx->send(ret.toUtf8().data(), ctx->type());
});
}上面的代码即是http响应的代码,即接收到数据后进行转发然后返回接收状态,但不会返回处理数据处理结果。创建这个HttpListener并初始化后就能通过Http客户端(postman)发送数据:
从添加http协议来看,我们并没有修改插件处理数据的逻辑,只增加了http请求的监听和数据转发,然后之前的插件就能获取到http转发的数据并处理。这就是框架、抽象和统一接口规范提供的能力,所以要扩展其他协议也只需要在HSLinkPlugin中添加对应的Listener并转发数据即可。
7. Python测试
上面的实现提供了两种协议和数据类型的实现,很多粉丝希望演示一下与Python的通信,我这里也通过Python来写两个测试代码,将两种协议和两种数据类型都通过Python发送到HSLinkPlugin由其进行分发。
7.1 UDP-Binary型数据
import socket
def main():
# 1.创建一个udp套接字
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
b_data = bytes([
0xA0,0xA0,0x44,0x61,0x74,0x61,0x4B,0x65,0x79,0x00,0x00,0x00,0x00,0x00,0x00,0x00,
0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,
0x00,0x00,0xE6,0x88,0x91,0xE6,0x98,0xAF,0xE5,0x90,0x8D,0xE7,0xA7,0xB0,0x00,0x00,
0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,
0x00,0x00,0x00,0x00,0x00,0x0D,0xE6,0x88,0x91,0xE6,0x98,0xAF,0xE6,0x95,0xB0,0xE6,
0x8D,0xAE,0x00,0xF0,0xF0])
# 2.准备接收方的地址
udp_socket.sendto(b_data, ("127.0.0.1", 5800))
# 3.关闭套接字
udp_socket.close()
if __name__ == "__main__":
main()7.2 http-Json型数据
import requests
def main():
url = "http://127.0.0.1:8898/hsimengine"
data = '''{
"Key": "DataKey2",
"Name": "JsonData",
"Data": {
"Property1": "123",
"Property2": "456"
}
}'''
res = requests.post(url, data=data)
res.encoding = res.apparent_encoding
print(res.text) # 返回文本内容
if __name__ == "__main__":
main()8. 后记
本中间件的目的是通过将各类协议发送的数据在同一个插件进行接收并通过统一的接口将数据转发到不同的插件,不含业务处理,只定义了数据结构,不定义具体的数据内容,除了dis,其他如socket、http、kafka、mq这种通用协议中的数据需要根据业务自行定义,并通过中间件进行接收并转发后在afsim中自定义处理。
上面实现的dispatch分发并没有通过线程的方式进行,会造成一些性能损失,建议改造为线程。
以上中间件接收转发的思路已经介绍完毕,其中的框架仅是能驱动起来的基础结构。后续还需要根据具体业务对结构进行调整。
如本文对各位的实现有所启发或帮助,不妨点赞、收藏、分享、赞赏!研究不易,敬请支持!!
淘宝链接

往期推荐
event_pipe数据实时接收处理框架
飞腾D2000麒麟V10国防版下编译
服务端引擎增加fs可控帧步进模式
arm64版的麒麟V10服务器docker容器集成后台仿真引擎

评论