mirror of
https://github.com/eclipse/paho.mqtt-sn.embedded-c.git
synced 2025-12-13 15:36:51 +01:00
Update: set Max EventQue size to avoid Buffer over flow
Signed-off-by: tomoaki <tomoaki@tomy-tech.com>
This commit is contained in:
@@ -117,7 +117,10 @@ void BrokerRecvTask::run(void)
|
||||
/* post a BrokerRecvEvent */
|
||||
ev = new Event();
|
||||
ev->setBrokerRecvEvent(client, packet);
|
||||
_gateway->getPacketEventQue()->post(ev);
|
||||
if ( _gateway->getPacketEventQue()->post(ev) == 1 )
|
||||
{
|
||||
delete ev;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
|
||||
@@ -76,7 +76,10 @@ void ClientRecvTask::run()
|
||||
log(0, packet);
|
||||
ev = new Event();
|
||||
ev->setBrodcastEvent(packet);
|
||||
_gateway->getPacketEventQue()->post(ev);
|
||||
if ( _gateway->getPacketEventQue()->post(ev) == 1 )
|
||||
{
|
||||
delete ev;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -89,7 +92,10 @@ void ClientRecvTask::run()
|
||||
log(client, packet);
|
||||
ev = new Event();
|
||||
ev->setClientRecvEvent(client,packet);
|
||||
_gateway->getPacketEventQue()->post(ev);
|
||||
if ( _gateway->getPacketEventQue()->post(ev) == 1 )
|
||||
{
|
||||
delete ev;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -116,7 +122,10 @@ void ClientRecvTask::run()
|
||||
client->setClientAddress(_sensorNetwork->getSenderAddress());
|
||||
ev = new Event();
|
||||
ev->setClientRecvEvent(client, packet);
|
||||
_gateway->getPacketEventQue()->post(ev);
|
||||
if ( _gateway->getPacketEventQue()->post(ev) == 1 )
|
||||
{
|
||||
delete ev;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
|
||||
@@ -15,8 +15,8 @@
|
||||
**************************************************************************************/
|
||||
|
||||
#include "MQTTSNGateway.h"
|
||||
#include "MQTTSNGWProcess.h"
|
||||
#include "SensorNetwork.h"
|
||||
#include "MQTTSNGWProcess.h"
|
||||
|
||||
using namespace MQTTSNGW;
|
||||
|
||||
@@ -31,6 +31,7 @@ Gateway::Gateway()
|
||||
theProcess = this;
|
||||
_params.loginId = 0;
|
||||
_params.password = 0;
|
||||
_packetEventQue.setMaxSize(DEFAULT_INFLIGHTMESSAGE * DEFAULT_MAX_CLIENTS);
|
||||
}
|
||||
|
||||
Gateway::~Gateway()
|
||||
@@ -197,7 +198,7 @@ GatewayParams* Gateway::getGWParams(void)
|
||||
=====================================*/
|
||||
EventQue::EventQue()
|
||||
{
|
||||
|
||||
_maxSize = 0;
|
||||
}
|
||||
|
||||
EventQue::~EventQue()
|
||||
@@ -205,6 +206,11 @@ EventQue::~EventQue()
|
||||
|
||||
}
|
||||
|
||||
void EventQue::setMaxSize(uint16_t maxSize)
|
||||
{
|
||||
_maxSize = maxSize;
|
||||
}
|
||||
|
||||
Event* EventQue::wait(void)
|
||||
{
|
||||
Event* ev;
|
||||
@@ -249,14 +255,15 @@ Event* EventQue::timedwait(uint16_t millsec)
|
||||
|
||||
int EventQue::post(Event* ev)
|
||||
{
|
||||
if ( ev )
|
||||
if ( ev && ( _maxSize == 0 || size() < _maxSize ) )
|
||||
{
|
||||
_mutex.lock();
|
||||
_que.post(ev);
|
||||
_sem.post();
|
||||
_mutex.unlock();
|
||||
return 0;
|
||||
}
|
||||
return 0;
|
||||
return 1;
|
||||
}
|
||||
|
||||
int EventQue::size()
|
||||
|
||||
@@ -16,10 +16,9 @@
|
||||
#ifndef MQTTSNGATEWAY_H_
|
||||
#define MQTTSNGATEWAY_H_
|
||||
|
||||
#include "MQTTSNGWProcess.h"
|
||||
#include "MQTTSNGWClient.h"
|
||||
#include "MQTTSNGWProcess.h"
|
||||
#include "MQTTSNPacket.h"
|
||||
#include "MQTTGWPacket.h"
|
||||
|
||||
namespace MQTTSNGW
|
||||
{
|
||||
@@ -133,6 +132,7 @@ public:
|
||||
~EventQue();
|
||||
Event* wait(void);
|
||||
Event* timedwait(uint16_t millsec);
|
||||
void setMaxSize(uint16_t maxSize);
|
||||
int post(Event*);
|
||||
int size();
|
||||
|
||||
@@ -140,6 +140,7 @@ private:
|
||||
Que<Event> _que;
|
||||
Mutex _mutex;
|
||||
Semaphore _sem;
|
||||
uint16_t _maxSize;
|
||||
};
|
||||
|
||||
/*
|
||||
|
||||
Reference in New Issue
Block a user