Bugfix of Exception Handling

Signed-off-by: tomoaki <tomoaki@tomy-tech.com>
This commit is contained in:
tomoaki
2021-05-03 19:20:52 +09:00
parent 8a2d28a5e4
commit 4478eafc8d
11 changed files with 43 additions and 30 deletions

View File

@@ -1,4 +1,4 @@
# MQTT-SN Transparent / Aggregating Gateway MQTT-SN Transparent / Aggregating Gateway
**MQTT-SN** requires a MQTT-SN Gateway which acts as a protocol converter to convert **MQTT-SN messages to MQTT messages**. MQTT-SN client over SensorNetwork can not communicate directly with MQTT broker(TCP/IP). **MQTT-SN** requires a MQTT-SN Gateway which acts as a protocol converter to convert **MQTT-SN messages to MQTT messages**. MQTT-SN client over SensorNetwork can not communicate directly with MQTT broker(TCP/IP).
This Gateway can run as a transparent or aggregating Gateway by specifying the gateway.conf. This Gateway can run as a transparent or aggregating Gateway by specifying the gateway.conf.
@@ -7,11 +7,10 @@ This Gateway can run as a transparent or aggregating Gateway by specifying the g
```` ````
$ git clone -b develop https://github.com/eclipse/paho.mqtt-sn.embedded-c $ git clone -b develop https://github.com/eclipse/paho.mqtt-sn.embedded-c
$ cd paho.mqtt-sn.embedded-c/MQTTSNGateway $ cd paho.mqtt-sn.embedded-c/MQTTSNGateway
$ ./build.sh {udp|udp6|xbee|loralink}] $ ./build.sh {udp|udp6|xbee|loralink}
```` ````
By default, a gateway for UDP is built. In order to build a gateway, an argument is required.
In order to create a gateway for UDP6, XBee or LoRaLink, -DSENSORNET argument is required.
MQTT-SNGateway and MQTT-SNLogmonitor (executable programs) are built in the Build directory. MQTT-SNGateway and MQTT-SNLogmonitor (executable programs) are built in the Build directory.
@@ -19,16 +18,16 @@ MQTT-SNGateway and MQTT-SNLogmonitor (executable programs) are built in the Buil
```` ````
$ ./bin/MQTT-SNGateway -f gateway.conf $ ./bin/MQTT-SNGateway
```` ````
If you get the error message as follows: If you get the error message as follows:
```` ````
what(): RingBuffer can't create a shared memory. RingBuffer can't create a shared memory.
Aborted (core dumped) ABORT Gateway!!!
```` ````
You have to start using sudo command only once for the first time. You have to start using sudo command only once for the first time.
```` ````
$ sudo ./bin/MQTT-SNGateway -f gateway.conf $ sudo ./bin/MQTT-SNGateway
```` ````
### **How to Change the configuration of the gateway** ### **How to Change the configuration of the gateway**
@@ -132,7 +131,4 @@ Uncomment the line 62, 63 in MQTTSNDefines.h then you can get more precise logs.
==================================*/ ==================================*/
//#define DEBUG // print out log for debug //#define DEBUG // print out log for debug
//#define DEBUG_NWSTACK // print out SensorNetwork log //#define DEBUG_NWSTACK // print out SensorNetwork log
``` ```

View File

@@ -11,7 +11,7 @@ else
mkdir build.gateway mkdir build.gateway
cd build.gateway cd build.gateway
cmake .. -DSENSORNET=$1 cmake .. -DSENSORNET=$1
make MQTTSNPacke make MQTTSNPacket
make MQTT-SNGateway make MQTT-SNGateway
make MQTT-SNLogmonitor make MQTT-SNLogmonitor
fi fi

View File

@@ -30,7 +30,7 @@ char* currentDateTime(void);
BrokerRecvTask::BrokerRecvTask(Gateway* gateway) BrokerRecvTask::BrokerRecvTask(Gateway* gateway)
{ {
_gateway = gateway; _gateway = gateway;
_gateway->attach((Thread*) this); Runnable::threadNo =_gateway->attach((Thread*) this);
_light = nullptr; _light = nullptr;
setTaskName("BrokerRecvTask"); setTaskName("BrokerRecvTask");
} }

View File

@@ -34,7 +34,7 @@ char* currentDateTime();
BrokerSendTask::BrokerSendTask(Gateway* gateway) BrokerSendTask::BrokerSendTask(Gateway* gateway)
{ {
_gateway = gateway; _gateway = gateway;
_gateway->attach((Thread*) this); Runnable::threadNo =_gateway->attach((Thread*) this);
_gwparams = nullptr; _gwparams = nullptr;
_light = nullptr; _light = nullptr;
setTaskName("BrokerSendTask"); setTaskName("BrokerSendTask");

View File

@@ -30,7 +30,7 @@ char* currentDateTime(void);
ClientRecvTask::ClientRecvTask(Gateway* gateway) ClientRecvTask::ClientRecvTask(Gateway* gateway)
{ {
_gateway = gateway; _gateway = gateway;
_gateway->attach((Thread*) this); Runnable::threadNo =_gateway->attach((Thread*) this);
_sensorNetwork = _gateway->getSensorNetwork(); _sensorNetwork = _gateway->getSensorNetwork();
setTaskName("ClientRecvTask"); setTaskName("ClientRecvTask");
} }

View File

@@ -29,7 +29,7 @@ char* currentDateTime(void);
ClientSendTask::ClientSendTask(Gateway* gateway) ClientSendTask::ClientSendTask(Gateway* gateway)
{ {
_gateway = gateway; _gateway = gateway;
_gateway->attach((Thread*) this); Runnable::threadNo =_gateway->attach((Thread*) this);
_sensorNetwork = _gateway->getSensorNetwork(); _sensorNetwork = _gateway->getSensorNetwork();
setTaskName("ClientSendTask"); setTaskName("ClientSendTask");
} }

View File

@@ -45,7 +45,7 @@ char* currentDateTime(void);
PacketHandleTask::PacketHandleTask(Gateway* gateway) PacketHandleTask::PacketHandleTask(Gateway* gateway)
{ {
_gateway = gateway; _gateway = gateway;
_gateway->attach((Thread*) this); Runnable::threadNo =_gateway->attach((Thread*) this);
_mqttConnection = new MQTTGWConnectionHandler(_gateway); _mqttConnection = new MQTTGWConnectionHandler(_gateway);
_mqttPublish = new MQTTGWPublishHandler(_gateway); _mqttPublish = new MQTTGWPublishHandler(_gateway);
_mqttSubscribe = new MQTTGWSubscribeHandler(_gateway); _mqttSubscribe = new MQTTGWSubscribeHandler(_gateway);

View File

@@ -13,12 +13,13 @@
* Contributors: * Contributors:
* Tomoaki Yamaguchi - initial API and implementation and/or initial documentation * Tomoaki Yamaguchi - initial API and implementation and/or initial documentation
**************************************************************************************/ **************************************************************************************/
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <stdarg.h> #include <stdarg.h>
#include <signal.h> #include <signal.h>
#include <sys/types.h>
#include <unistd.h>
#include <Timer.h> #include <Timer.h>
#include <exception> #include <exception>
#include <getopt.h> #include <getopt.h>
@@ -252,13 +253,17 @@ MultiTaskProcess::MultiTaskProcess()
theMultiTaskProcess = this; theMultiTaskProcess = this;
_threadCount = 0; _threadCount = 0;
_stopCount = 0; _stopCount = 0;
_abortThreadNo = -1;
} }
MultiTaskProcess::~MultiTaskProcess() MultiTaskProcess::~MultiTaskProcess()
{ {
for (int i = 0; i < _threadCount; i++) for (int i = 0; i < _threadCount; i++)
{ {
_threadList[i]->stop(); if ( i != _abortThreadNo)
{
_threadList[i]->stop();
}
} }
} }
@@ -281,20 +286,20 @@ void MultiTaskProcess::run(void)
while (true) while (true)
{ {
if (theProcess->checkSignal() == SIGINT ) if (theProcess->checkSignal() == SIGINT || _abortThreadNo > -1)
{ {
return; return;
} }
else if (_stopCount > 0)
{
throw Exception("Task stopped !!\n\n");
}
sleep(1); sleep(1);
} }
} }
void MultiTaskProcess::waitStop(void) void MultiTaskProcess::waitStop(void)
{ {
/* Let threads exit from Select() */
pid_t pid = getpid();
kill(pid, SIGINT);
while (_stopCount < _threadCount) while (_stopCount < _threadCount)
{ {
sleep(1); sleep(1);
@@ -309,11 +314,19 @@ void MultiTaskProcess::threadStopped(void)
} }
void MultiTaskProcess::attach(Thread* thread) void MultiTaskProcess::abort(int threadNo)
{ {
_abortThreadNo = threadNo;
threadStopped();
}
int MultiTaskProcess::attach(Thread* thread)
{
int indexNo = 0;
_mutex.lock(); _mutex.lock();
if (_threadCount < MQTTSNGW_MAX_TASK) if (_threadCount < MQTTSNGW_MAX_TASK)
{ {
indexNo = _threadCount;
_threadList[_threadCount] = thread; _threadList[_threadCount] = thread;
_threadCount++; _threadCount++;
} }
@@ -323,6 +336,7 @@ void MultiTaskProcess::attach(Thread* thread)
throw Exception("Full of Threads"); throw Exception("Full of Threads");
} }
_mutex.unlock(); _mutex.unlock();
return indexNo;
} }
int MultiTaskProcess::getParam(const char* parameter, char* value) int MultiTaskProcess::getParam(const char* parameter, char* value)

View File

@@ -85,13 +85,15 @@ public:
void run(void); void run(void);
void waitStop(void); void waitStop(void);
void threadStopped(void); void threadStopped(void);
void attach(Thread* thread); int attach(Thread* thread);
void abort(int threadNo);
private: private:
Thread* _threadList[MQTTSNGW_MAX_TASK]; Thread* _threadList[MQTTSNGW_MAX_TASK];
Mutex _mutex; Mutex _mutex;
int _threadCount; int _threadCount;
int _stopCount; int _stopCount;
int _abortThreadNo;
}; };
/*===================================== /*=====================================

View File

@@ -118,6 +118,7 @@ public:
Runnable(){} Runnable(){}
virtual ~Runnable(){} virtual ~Runnable(){}
virtual void EXECRUN(){} virtual void EXECRUN(){}
int threadNo {0};
}; };
#define MAGIC_WORD_FOR_THREAD \ #define MAGIC_WORD_FOR_THREAD \
@@ -130,9 +131,9 @@ public: void EXECRUN() \
} \ } \
catch ( Exception &ex ) \ catch ( Exception &ex ) \
{ \ { \
theMultiTaskProcess->threadStopped(); \
WRITELOG("%s catch exception\n", getTaskName()); \ WRITELOG("%s catch exception\n", getTaskName()); \
ex.writeMessage(); \ ex.writeMessage(); \
theMultiTaskProcess->abort(threadNo); \
} \ } \
} }
@@ -151,6 +152,7 @@ public:
void stop(void); void stop(void);
const char* getTaskName(void); const char* getTaskName(void);
void setTaskName(const char* name); void setTaskName(const char* name);
void abort(int threadNo);
private: private:
static void* _run(void*); static void* _run(void*);
pthread_t _threadID; pthread_t _threadID;

View File

@@ -43,6 +43,5 @@ int main(int argc, char** argv)
{ {
ex.writeMessage(); ex.writeMessage();
WRITELOG("ABORT Gateway!!!\n\n\n"); WRITELOG("ABORT Gateway!!!\n\n\n");
abort();
} }
} }