1. 说明

在《外部数据统一接入》文章中,实现了一套框架用于封装底层通信细节,上层应用只需关注数据的内容然后根据业务需求使用数据即可,该框架理论上可以支持任何通信协议的扩展。但是之前实现的时候只考虑了接收外部数据并分发到内容插件,没有实现内容插件对外发布数据的能力。

afsim内置的对外推送数据的模块是dis和xio,这两种通过编写脚本即可直接使用,外部程序连接dis或xio后即可获取afsim推送的数据,这个就不多讲了。本文要讲的是通过统一的接口封装通信细节,上层应用只关心业务处理,在适时通过统一的接口发送数据即可。

2. 原理

大致上如果将通信模式高度抽象,可以归纳为有两种:请求响应模式和广播模式

请求响应模式:Client向Server发送数据处理请求,Server接收到Client发送的数据进行处理,处理完成后向Client返回响应数据,如http、tcp等

  广播模式:Server定时向网络上报送数据,而不关心哪个Client接收,也不关心Client接收数据的顺序是否正确,如UDP广播、UDP组播、Kafka、MQ等。

所以对于第一种模式(请求响应模式),Server端需要记录Client的信息,用于发送响应数据,而第二种模式则只需要在Server配置好相应的参数调用接口发送即可。

3. 实现思路

之前实现的接入分发中间件采用的是发布订阅模式,这个模式如果大家熟悉的话或者看之前的实现代码就会知道在数据分发出去后,数据没有携带Client的信息,只能被动接收处理但不能返回响应数据,对应请求响应的通信模式就不能发送反馈数据了。我这里的想法是,将所有客户端都统一让中间件层来管理,业务层不去关心来的数据是哪个客户端发送的,只需要告知中间件层将反馈数据返回给正确的客户端就行了。这种情况下数据类AbstractData携带的客户端信息就只需要使用唯一标识来表示就足够了,因此需要为AbstractData增加一个成员变量clientKey。

现在回到业务处理层的逻辑上去:业务处理层收到订阅的数据并进行处理后,需要将处理的反馈结果发送出去。那么它只需要将clientKey和待发送的数据给到中间件,中间件根据clientKey找到对应的客户端信息,然后进行发送即可。这样就简化了业务层的处理。

上图极简的反应了上述过程,下面同样以相对简单的udp点对点为例来将上图框架进行实现(注:《外部数据统一接入》中有些类已经给出了代码,为减小篇幅下面就只把关键代码贴出)。

3.1. AbstractData添加成员变量

在AbstractData添加一个成员变量来表示发送请求的客户端,并在创建时传入保存:

3.2. 封装客户端代理

上面的clientKey是用来保存客户端的唯一标识,而此标识应该在中间件接收到请求的第一时间生成,不同的请求协议,客户端的信息也是不同的,那么就需要先定义一个抽象类来封装和实现多态。抽象基类封装共同信息和接口,子类来具体实现接口,抽象类的主要定义如下:

// ClientProxy.h
class HSLINKPLUGIN_EXPORT ClientProxy
{
public:
    enum Type
    {
        Unknown,    // 未知
        UDP_P2P,    // udp点对点客户端
    };

    ClientProxy(Type type);
virtual ~ClientProxy();

    ClientProxy::Type getType() const { return m_type; }
const QString& getKey() const { return m_key; }

protected:
    // 将反馈数据发送给客户端
virtual int send(const QByteArray& data) = 0;
friend class Dispatcher; // 只能通过Dispatcher发送

protected:
    ClientProxy::Type m_type = Unknown;
QString m_key;

    friend class ClientManager;
};
typedef QSharedPointer<ClientProxy> ClientPtr;
typedef QList<ClientPtr> ClientList;
typedef QMap<QString, ClientPtr> ClientMap;
// ClientProxy.cpp
ClientProxy::ClientProxy(ClientProxy::Type type)
    : m_type(type)
{
}

ClientProxy::~ClientProxy()
{
}

3.3. 客户端代理管理类

此类为单例类,用来管理所有的客户端连接:

// ClientManager.h
class ClientManager
{
    DEC_SINGLETON(ClientManager)
public:
    ~ClientManager();

QString addClient(ClientPtr client);
// 获取指定标识的客户端代理,remove为true表示返回后删除此客户端
ClientPtr getClient(const QString& clientKey, bool remove = true);

protected:
    ClientManager() {}
    Q_DISABLE_COPY(ClientManager);
private:
QMutex m_mutex;  // 多线程安全
// 客户端集合
    ClientMap m_clientMap;
};
// ClientManager.cpp
IMP_SINGLETON(ClientManager)
ClientManager::~ClientManager()
{
    QMutexLocker locker(&m_mutex);
    m_clientMap.clear();
}

QString ClientManager::addClient(ClientPtr client)
{
    auto clientKey = QUuid::createUuid().toString(QUuid::Id128);
    client->m_key = clientKey;
    QMutexLocker locker(&m_mutex);
    m_clientMap[clientKey] = client;
    return clientKey;
}
ClientPtr ClientManager::getClient(const QString& clientKey, bool remove /*= true*/)
{
	QMutexLocker locker(&m_mutex);
	if (remove)
	{
		ClientPtr client = m_clientMap.take(clientKey);
		return client;
	}
	else
	{
		return m_clientMap[clientKey];
	}
}

上面代码中getClient有个默认参数remove,这样设计的原因是有些客户端使用随机端口,因此不能将所有客户端都进行缓存,而是用后即焚,当然明确的固定端口则可以传入false参数就行了。以上完成了基本的客户端代理框架,下面在此框架基础上实现一下UDP点对点的发送和反馈流程。

3.4. UDP点对点实现

首先通过继承ClientProxy抽象类,实现UDP点对点的客户端代理。UDP点对点的通信需要接收方绑定一个端口,而发送方则可以使用固定端口或随机端口,在发送请求时会将IP和端口发送到接收端,接收进行保存。而接收端如果要发送反馈,则直接通过接口方法send向保存的IP和端口号发送即可,实现如下:

// UDP_P2PClientProxy.h
class UDP_P2PClientProxy : public ClientProxy
{
public:
    UDP_P2PClientProxy(const QHostAddress& hostAddress, quint16 port);

virtual int send(const QByteArray& data) override;

private:
    QHostAddress m_hostAddress;
quint16 m_port;

    QUdpSocket m_udpSocket;
};
typedef QSharedPointer<UDP_P2PClientProxy> UDP_P2PClientPtr;
// UDP_P2PClientProxy.cpp
UDP_P2PClientProxy::UDP_P2PClientProxy(const QHostAddress& hostAddress, quint16 port)
    : ClientProxy(ClientProxy::UDP_P2P)
    , m_hostAddress(hostAddress)
    , m_port(port)
{

}
int UDP_P2PClientProxy::send(const QByteArray& data)
{
    if (!m_hostAddress.isNull())
    {
        m_udpSocket.writeDatagram(data, m_hostAddress, m_port);
    }
}

上面的实现应该不难理解,就是通过send函数向构造函数保存的IP和端口发送data数据。

3.5. 客户端创建

上面准备好了抽象的客户端代理、客户端管理器,以及具体的客户端代理实现,下面就将之前UDPListener接收数据并转发的实现进行改造,一方面创建UDP_P2PClientProxy代理类并添加到管理器中,另一方面把客户端标识保存到待分发的数据中供发送反馈时使用。

下面的代码是之前的实现,收到数据后即刻送一个收到数据的反馈,但不含业务处理的反馈数据:

QObject::connect(udpSocket, &QUdpSocket::readyRead, [udpSocket]
{
    QByteArray data;
    while (udpSocket->hasPendingDatagrams()) {
        int pendingDatagramSize = udpSocket->pendingDatagramSize();
        if (-1 != pendingDatagramSize)
        {
            QByteArray datagram(pendingDatagramSize, 0);
            QHostAddress hostAddress;
            quint16 port;
            udpSocket->readDatagram(datagram.data(), datagram.size(), &hostAddress, &port);
            data.append(datagram);

            auto dataPtr = BinaryDataPtr::create();
            if (dataPtr->parse(data))
            {
                Dispatcher::getInstance()->dispatch(dataPtr);
                udpSocket->writeDatagram(QStringLiteral("收到数据,已转发").toLocal8Bit(), hostAddress, port);
            }
            else
            {
                auto errString = dataPtr->getErrorString();
                qInfo() << errString;
                udpSocket->writeDatagram(errString.toLocal8Bit(), hostAddress, port);
            }
        }
    }
});

将其修改为下面的代码逻辑:

当Dispatcher将数据分发到订阅端后,订阅端就能获取到客户端的唯一标识,然后在适当的时候通过此唯一标识将反馈数据再次通过Dispatcher发给客户端。下面来实现Dispatcher的发送(publish)能力。

3.6. 为Dispatcher增加发送

经过上面的搭建,要实现通过Dispatcher将反馈数据发送给客户端就很简单了,因为实际的发送逻辑是在各具体实现的ClientProxy中,而ClientManger管理了所有的ClientProxy,因此只需要通过客户端唯一标识获取到ClientProxy,然后调用send即可:

// 向客户端发送反馈数据,发送成功,返回发送的字节数,失败返回-1,,removeClient为true表示返回后删除此客户端
int Dispatcher::publish(const QString& clientKey, const QByteArray& responseData, QString* errMsg /*= nullptr*/, bool removeClient = true)
{
	QMutexLocker locker(&g_mutex);
	auto client = ClientManager::getInstance()->getClient(clientKey, removeClient);
	int ret = client->send(responseData);
	if (ret == -1 && errMsg != nullptr)
	{
		*errMsg = client->lastErrorMessage();
	}
	return ret;
}

3.7. 订阅端发送反馈数据

这个就没啥说的了,直接调用上面的方法即可:

case AbstractData::Binary:
{
    auto bData = data.dynamicCast<BinaryData>();
    std::cout << QStringLiteral("收到客户端数据:\n").toLocal8Bit().data();

    // 获取客户端信息
    auto client = ClientManager::getInstance()->getClient(data->getClientKey(), false);
    switch (client->getType())
    {
    case ClientProxy::UDP_P2P:
    {
        auto udp_p2pClient = client.dynamicCast<UDP_P2PClientProxy>();
        std::cout << "    IP: " << udp_p2pClient->getHostAddress().toString().toStdString() << std::endl;
        std::cout << "    Port: " << udp_p2pClient->getPort() << std::endl;
    }
    default:
        break;
    }
    std::cout << "    Key: " << data->getKey().toStdString() << std::endl;
std::cout << "    Data: " << QString::fromUtf8(data->getData()).toLocal8Bit().data() << std::endl;

    // TODO,这里开始业务处理,处理完成后发送处理结果给客户端

    // 发送反馈数据
    QString responseData = QStringLiteral("这是反馈数据");
    QString errorMessage;
    int size = Dispatcher::getInstance()->publish(data->getClientKey(), responseData.toLocal8Bit(), &errorMessage);
    if (size == -1)
    {
        std::cout << errorMessage.toLocal8Bit().data() << std::endl;
    }
    else
    {
        std::cout << QStringLiteral("发送反馈数据成功: %1").arg(size).toLocal8Bit().data() << std::endl;
    }
}break;

看下视频演示:

4. 作为客户端

上面的实现是以服务端的模式处理的,即客户端发送请求到HSLinkPlugin,分发处理后返回响应数据给客户端。那么如果AFSim仅作为客户端向外面发送数据的话,上面的框架依然支持!!!

框架里面的ClientProxy是作为独立的客户端代理实现,当然也可以自己把自己作为客户端代理添加进管理器,就可以通过Dispatcher对外发送数据。下面也以UDP点对点的方式来看看怎么做(因为我们已经有了这个类了^_^)。

跟上面的UDPListener接到数据后创建客户端类似,只是我们在自己的业务层里面去创建:

bool WsfPluginTemplate::Initialize()
{
    mCallbacks.Add(WsfObserver::AdvanceTime(&GetSimulation()).Connect(&WsfPluginTemplate::AdvanceTime, this));

    // 创建客户端
    auto udp_p2p = UDP_P2PClientPtr::create(QHostAddress("127.0.0.1"), 5678);
    // 添加到管理器
    g_clientKey = ClientManager::getInstance()->addClient(udp_p2p);
    // 在AdvanceTime里面去模拟发送
    return true;
}

发送:

void WsfPluginTemplate::AdvanceTime(double aSimTime)
{
    // 模拟发送数据到外部,注:因为要重复使用客户端,所以传入fasle
    Dispatcher::getInstance()->publish(g_clientKey, QStringLiteral("模拟客户端对外主动发送的数据\n").toLocal8Bit(), nullptr, false);
}

视频:

5.Kafka发送 

下面再实现一个广播类型的发送示例,以Kafka发送数据为例。

首先创建一个Kafka的生产者producer,这个类也是继承自ClientProxy(当然也可以创建为独立的类,在自己的代码中实现发送也行,我这里是统一到中间件中,所有需要继承):

// KafkaProducer.h
class HSLINKPLUGIN_EXPORT KafkaProducer : public ClientProxy
{
public:
    KafkaProducer(QObject* parent = nullptr);
    virtual ~KafkaProducer();

    // servers  ip:port;ip:port
    bool init(const QString &servers);
protected:
    virtual int send(const QByteArray& data, const QMap<QString, QVariant>& auxData = QMap<QString, QVariant>()) override;
protected:
    QString                 m_servers;
    RdKafka::Conf       *   m_kafkaConf = nullptr;
    RdKafka::Producer   *   m_kafkaProducer = nullptr;
};

发送的实现如下:

int KafkaProducer::send(const QByteArray& data, const QMap<QString, QVariant>& auxData /*= QMap<QString, QVariant>()*/)
{
    auto topic = auxData["Topic"].toString();
    if (topic.isEmpty())
    {
        m_errorMessage = QObject::tr("auxData miss \"Topic\"");
        qDebug() << m_errorMessage;
        return -1;
    }

retry:
    RdKafka::ErrorCode errCode = m_kafkaProducer->produce(
        topic.toStdString(),
        0, // 
        RdKafka::Producer::RK_MSG_COPY, // 复制消息发送
        (void*)(data.toStdString().c_str()), data.toStdString().length(),
        NULL, 0, // key
        0,  // 时间戳,0表示当前时间
        NULL,// 消息头
        NULL); // 
    if (errCode != RdKafka::ERR_NO_ERROR)
    {
        m_errorMessage = QObject::tr("Failed to produce to topic %1: %2").arg(topic).arg(RdKafka::err2str(errCode).c_str());
        qDebug() << m_errorMessage;
        if (errCode == RdKafka::ERR__QUEUE_FULL)
        {
            // 队列满,重发,队列长度限制,可通过配置设置, queue.buffering.max.messages 和 queue.buffering.max.kbytes
            m_kafkaProducer->poll(500); // 等待500 ms
            goto retry;
        }
        return -1;
    }
    else
    {
        // 发送成功
        //qDebug() << QString("Enqueued message (%1 bytes) for topic %2").arg(message.size()).arg(topic);
        m_kafkaProducer->poll(0);
    }
    //qDebug() << QString("Flushing final messages...");
    m_kafkaProducer->flush(1000); // 刷新
    // 余下多少没有发送
    if (m_kafkaProducer->outq_len() > 0)
    {
        qDebug() << QString("%1 message(s) were not delivered").arg(m_kafkaProducer->outq_len());
    }
    return data.toStdString().length();
}

上面的发送实现需要知道发送到哪个主题,而主题的定义是在业务层的,因此需要通过Dispatcher的send函数增加auxData数据设置Topic。在Dispatcher发送时通过auxData指定发送的Topic。下面是创建KafkaProducer和发送的代码,我这里结合之前做的event_pipe数据通过kafka发送,创建KafkaProducer:

KafkaProducerPtr kafkaProducer = KafkaProducerPtr::create();
kafkaProducer->init("47.108.230.188:19092");
g_kafkaClientKey = ClientManager::getInstance()->addClient(kafkaProducer);

在自定义的EventPipeMsg这个Observer回调函数中通过Dispatcher发送到kafka:

void CustomEventPipePlugin::EventPipeMsg(double simTime, WsfEventPipe::MsgBase* msg)
{
    if (msg->GetMessageId() == 1)
    {
        WsfEventPipe::MsgEntityState* state = dynamic_cast<WsfEventPipe::MsgEntityState*>(msg);
        if (state != nullptr)
        {
            QJsonObject entityStateObj;
            entityStateObj["PlatformIndex"] = (int)state->state().platformIndex();
            entityStateObj["damageFactor"] = state->state().damageFactor();

            QJsonObject locationWCSObj;
            locationWCSObj["x"] = state->state().locationWCS().x();
            locationWCSObj["y"] = state->state().locationWCS().y();
            locationWCSObj["z"] = state->state().locationWCS().z();
            entityStateObj["locationWCS"] = locationWCSObj;
            QJsonObject velocityWCSObj;
            velocityWCSObj["x"] = state->state().velocityWCS().x();
            velocityWCSObj["y"] = state->state().velocityWCS().y();
            velocityWCSObj["z"] = state->state().velocityWCS().z();
            entityStateObj["velocityWCS"] = velocityWCSObj;
            QJsonObject accelerationWCSObj;
            accelerationWCSObj["x"] = state->state().accelerationWCS().x();
            accelerationWCSObj["y"] = state->state().accelerationWCS().y();
            accelerationWCSObj["z"] = state->state().accelerationWCS().z();
            entityStateObj["accelerationWCS"] = accelerationWCSObj;
            QJsonObject orientationWCSObj;
            orientationWCSObj["x"] = state->state().orientationWCS().x();
            orientationWCSObj["y"] = state->state().orientationWCS().y();
            orientationWCSObj["z"] = state->state().orientationWCS().z();
            entityStateObj["orientationWCS"] = orientationWCSObj;
            entityStateObj["fuelCurrent"] = (int)state->state().fuelCurrent();
            entityStateObj["machNumber"] = (int)state->state().machNumber();

            QJsonDocument doc(entityStateObj);
            auto jsonStr = doc.toJson(QJsonDocument::Compact);
            // 发送到kafka
            QMap<QString, QVariant> auxData;
            auxData["Topic"] = "EntityState"; // 设置主题名称
            QString errMsg;
            Dispatcher::getInstance()->publish(g_kafkaClientKey, jsonStr, &errMsg, false, auxData);
        }
    }
}

6.后记

本文是对上一篇《外部数据接入分发中间件》的补充,增加对外发送的功能,同时具备了UDP点对点、Kafka消息的发送和接收能力。基本上这两篇即完成了中间件的主体框架,后续只需要在此框架上就能比较轻松的增加其他协议的支持。

如本文对各位的实现有所启发或帮助,不妨点赞、收藏、分享、赞赏!研究不易,敬请支持!!

淘宝链接:

往期推荐

外部数据统一接入分发中间件

天线方向图实时绘制插件

基于wsf插件扩展内置platform

AFSim在飞腾D2000麒麟V10国防版下编译

文末二维码.png