Saturday, May 2, 2020

Multicast non-stop sending setup: primary and secondary servers

Aim

This example shows how to use multicast to build a "non-stop" sending server: if the active sender goes away, another one takes over. The class supports Linux.

Logic

In this example we start with 2 servers. They select a primary between themselves, and only the primary sends out the multicast data. In the picture above, Server 2 is the primary.

After a few seconds, the example turns off Server 2, and the servers select a primary again. Server 2 may win the role back, but in most runs Server 1 takes it first.
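The selection step can be pictured as a small state machine. The sketch below is only an illustration of that idea, not the code inside MCastArbitrator, whose internals are not shown in this post. The names `Arbiter`, `Role`, `ticket`, `tick`, `onPrimaryHeartbeat`, `onPeerRaisedHand` and `demote` are all hypothetical. The 500 ms and 250 ms values are read off the timestamps in the sample output further down, and the tie-break (the higher ticket wins) is an assumption made so that the example is deterministic.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical sketch of a primary/secondary election.
// None of these names come from MCastArbitrator.
enum class Role { Secondary, RaiseHand, Primary };

struct Arbiter {
    using Ms = std::chrono::milliseconds;

    int instance;                // id of this node
    unsigned ticket;             // assumed tie-break value, higher wins
    Role role = Role::Secondary; // every node starts as secondary
    Ms sinceLastPrimaryBeat{0};  // time since a primary was last heard
    Ms sinceRaised{0};           // time spent in RaiseHand
    Ms silenceTimeout{500};      // assumed, taken from the log timestamps
    Ms claimWindow{250};         // assumed, taken from the log timestamps

    Arbiter(int id, unsigned t) : instance(id), ticket(t) {}

    // Advance the local timers by `elapsed`.
    void tick(Ms elapsed) {
        assert(elapsed.count() >= 0);
        switch (role) {
        case Role::Secondary:
            sinceLastPrimaryBeat += elapsed;
            if (sinceLastPrimaryBeat >= silenceTimeout) {
                role = Role::RaiseHand; // no primary heard for too long
                sinceRaised = Ms{0};
            }
            break;
        case Role::RaiseHand:
            sinceRaised += elapsed;
            if (sinceRaised >= claimWindow) {
                role = Role::Primary;   // nobody stronger objected in time
            }
            break;
        case Role::Primary:
            break;
        }
    }

    // A peer announced that it is the primary.
    void onPrimaryHeartbeat() {
        if (role != Role::Primary) {
            role = Role::Secondary;
            sinceLastPrimaryBeat = Ms{0};
        }
    }

    // A peer raised its hand; yield if its ticket beats ours.
    void onPeerRaisedHand(unsigned peerTicket) {
        if (role == Role::RaiseHand && peerTicket > ticket) {
            role = Role::Secondary;
            sinceLastPrimaryBeat = Ms{0};
        }
    }

    // Forced demotion, similar in spirit to a test hook.
    void demote() {
        role = Role::Secondary;
        sinceLastPrimaryBeat = Ms{0};
    }

    // Only the primary is allowed to send data.
    bool maySend() const { return role == Role::Primary; }
};
```

With two such objects, demoting the current primary makes both nodes raise their hands once the silence timeout expires, and the one with the higher ticket ends up as primary. A real implementation could draw the ticket at random, which would match the observation that either server can win.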

Source



#include <iostream>
#include <string>
#include <vector>
#include <string.h>   // memcpy
#include <unistd.h>   // usleep
#include <thread>
#include <chrono>
#include <atomic>

#include "MCastArbitrator.h"
#include "Logger.h"
#include "LinuxThread.h"

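// usage: ./MCastArbitrator

std::string MCastAddress = "225.1.32.28"; // multicast group the senders publish to
short MCastPort = 6000;                   // UDP port used with the group
std::string SenderIP = "192.168.1.208";   // IP address of the sending host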
class Sender : public LinuxThread
{
public:
    Sender() { m_running = true; }
    ~Sender() { stopThread(); m_running = false; joinThread(); }

    bool InitComponent(int instance) { m_instance = instance; startThread(); return true; }
    void ChangeStatusForTest() { MCArbi.ChangeStatus(MCastArbitrator::MCArbiStatus::SECONDARY); }
private:
    // override
    void *Main()
    {
        std::vector<char> sendData;

        if (MCArbi.InitComponent(SenderIP, MCastPort, 32, m_instance) == MCastArbitrator::MCArbiStatus::SUCCESS)
        {
            MCArbi.JoinGroup(MCastAddress);
            sendData.resize(4);
            int count = 0;
            while(m_running.load())
            {
                memcpy(&sendData[0], &count, sizeof(int));
                MCastArbitrator::MCArbiStatus retStatus = MCArbi.Send(&sendData[0], sizeof(int));
                if (retStatus == MCastArbitrator::MCArbiStatus::SUCCESS)
                {
                    LOGMSG_MSG("Send success! count: %d instance: %d\n", count, m_instance);
                }
                else
                {
                    // LOGMSG_ERR("Send fail! instance: %d\n", m_instance);
                }
                count++;
                usleep(1000000); // 1 sec
            }
        }
        return NULL;
    }
public:
    int m_instance;
private:
    std::atomic<bool> m_running;
    MCastArbitrator MCArbi;
};
int main(int argc, char *argv[])
{
    Sender *sender001 = new Sender(); sender001->InitComponent(1);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    Sender *sender002 = new Sender(); sender002->InitComponent(2);

    int count = 1;
    int lastDie = 2;
    while (true)
    {
        if (count % 5 == 0)
        {
            if (lastDie == 2)
            {
                sender001->ChangeStatusForTest();
                lastDie = 1;
            }
            else
            {
                sender002->ChangeStatusForTest();
                lastDie = 2;
            }
            count = 0;
        }
        count++;
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
    return 0;
}

Output

$ ./src/MCastArbitrator 
20200502_183146_131 [MSG]           MCastArbitrator::          SendArbitration:    70,  9136,Start thread
20200502_183146_132 [MSG]           MCastArbitrator::       ReceiveArbitration:   106,  9137,Start thread
20200502_183146_632 [MSG]           MCastArbitrator::SwitchStatus_LongTimeNoUpdate:   211,  9137,IN
20200502_183146_632 [MSG]           MCastArbitrator::              PrintStatus:   174,  9137,Status updated Raise hand
20200502_183146_882 [MSG]           MCastArbitrator::              PrintStatus:   174,  9137,Status updated Primary
20200502_183147_131 [MSG]                    Sender::                     Main:    41,  9135,Send success! count: 1 instance: 1
20200502_183147_132 [MSG]           MCastArbitrator::          SendArbitration:    70,  9139,Start thread
20200502_183147_132 [MSG]           MCastArbitrator::       ReceiveArbitration:   106,  9140,Start thread
20200502_183147_132 [MSG]           MCastArbitrator::              PrintStatus:   174,  9140,Status updated Secondary
20200502_183148_132 [MSG]                    Sender::                     Main:    41,  9135,Send success! count: 2 instance: 1
20200502_183149_132 [MSG]                    Sender::                     Main:    41,  9135,Send success! count: 3 instance: 1
20200502_183150_132 [MSG]                    Sender::                     Main:    41,  9135,Send success! count: 4 instance: 1
20200502_183151_386 [MSG]           MCastArbitrator::SwitchStatus_LongTimeNoUpdate:   211,  9140,IN
20200502_183151_386 [MSG]           MCastArbitrator::              PrintStatus:   174,  9140,Status updated Raise hand
20200502_183151_386 [MSG]           MCastArbitrator::SwitchStatus_LongTimeNoUpdate:   211,  9137,IN
20200502_183151_386 [MSG]           MCastArbitrator::              PrintStatus:   174,  9137,Status updated Raise hand
20200502_183151_634 [MSG]           MCastArbitrator::              PrintStatus:   174,  9137,Status updated Secondary
20200502_183151_634 [MSG]           MCastArbitrator::              PrintStatus:   174,  9140,Status updated Primary
20200502_183152_132 [MSG]                    Sender::                     Main:    41,  9138,Send success! count: 5 instance: 2
20200502_183153_132 [MSG]                    Sender::                     Main:    41,  9138,Send success! count: 6 instance: 2

The output above shows that instance 1 becomes the first primary. After about five seconds the test forces instance 1 back to secondary (ChangeStatusForTest), both instances raise their hands, and instance 2 takes over as primary and starts sending.
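To watch the hand-over from the receiving side, a small receiver can join the same group and print the counter it gets. The sketch below is not part of the original project. The function names `decodeCount`, `openReceiver` and `receiveCount` are hypothetical, and it assumes that each data datagram carries only the 4-byte counter copied in Sender::Main, that sender and receiver use the same `int` size and byte order, and that the arbitration traffic does not share this group and port. It uses plain POSIX sockets.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cassert>
#include <cstddef>
#include <cstring>

// Decode the counter from a received datagram.
// Returns false if the datagram is too short to hold an int.
bool decodeCount(const char* data, std::size_t len, int& out) {
    assert(data != nullptr);
    if (len < sizeof(int)) return false;
    std::memcpy(&out, data, sizeof(int));
    return true;
}

// Open a UDP socket bound to `port` and join `group` on the interface
// that owns `localIp`. Returns the descriptor, or -1 on failure.
int openReceiver(const char* group, unsigned short port, const char* localIp) {
    int fd = socket(AF_INET, SOCK_DGRAM, 0);
    if (fd < 0) return -1;

    // Allow several receivers on the same host to bind the same port.
    int reuse = 1;
    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));

    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0) {
        close(fd);
        return -1;
    }

    // Ask the kernel to deliver datagrams sent to the multicast group.
    ip_mreq mreq{};
    if (inet_pton(AF_INET, group, &mreq.imr_multiaddr) != 1 ||
        inet_pton(AF_INET, localIp, &mreq.imr_interface) != 1 ||
        setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) {
        close(fd);
        return -1;
    }
    return fd;
}

// Block until one datagram arrives and decode its counter.
bool receiveCount(int fd, int& out) {
    char buf[64];
    ssize_t n = recv(fd, buf, sizeof(buf), 0);
    if (n < 0) return false;
    return decodeCount(buf, static_cast<std::size_t>(n), out);
}
```

A caller would pass the group and port from the listing ("225.1.32.28" and 6000) plus the address of its own network interface to `openReceiver`, then call `receiveCount` in a loop. Because each Sender keeps its own counter, the printed value may jump when the primary changes.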
