Support for MacOS

Split semaphore classes into Semaphore and NamedSemaphore
Semaphore is implemented with Grand Central Dispatch
NamedSemaphore uses the spin lock approach like in boost for sem_timedwait
sem_getvalue is not supported on MacOS and therefore was removed
Fixed bug (*gatewayaddress_len > 0) in MQTTSNSearchClient.c

Signed-off-by: Jochen Wilhelmy <jochen.wilhelmy@gmail.com>
This commit is contained in:
Jochen Wilhelmy
2020-01-17 17:40:56 +01:00
parent e64e817f80
commit d6effc8074
8 changed files with 135 additions and 88 deletions

View File

@@ -77,6 +77,7 @@ $(SUBDIR)/MQTTSNUnsubscribeServer.c
CXX := g++ CXX := g++
CPPFLAGS += CPPFLAGS +=
# include directories, for MacOS/homebrew add -I/usr/local/opt/openssl/include/
INCLUDE := INCLUDE :=
INCLUDES += $(INCLUDE) -I$(SRCDIR) \ INCLUDES += $(INCLUDE) -I$(SRCDIR) \
-I$(SRCDIR)/$(OS) \ -I$(SRCDIR)/$(OS) \
@@ -84,12 +85,16 @@ INCLUDES += $(INCLUDE) -I$(SRCDIR) \
-I$(SUBDIR) \ -I$(SUBDIR) \
-I$(SRCDIR)/$(TEST) -I$(SRCDIR)/$(TEST)
# preprocessor defines
DEFS := DEFS :=
# library search paths, for MacOS/homebrew add -L/usr/local/opt/openssl/lib/
LIB := LIB :=
LIBS += $(LIB) -L/usr/local/lib LIBS += $(LIB) -L/usr/local/lib
LDFLAGS :=
LDFLAGS :=
CXXFLAGS := -Wall -O3 -std=c++11 CXXFLAGS := -Wall -O3 -std=c++11
LDADD := -lpthread -lssl -lcrypto -lrt LDADD := -lpthread -lssl -lcrypto
OUTDIR := Build OUTDIR := Build
PROG := $(OUTDIR)/$(PROGNAME) PROG := $(OUTDIR)/$(PROGNAME)

View File

@@ -103,7 +103,7 @@ void Process::initialize(int argc, char** argv)
} }
} }
} }
_rbsem = new Semaphore(MQTTSNGW_RB_SEMAPHOR_NAME, 0); _rbsem = new NamedSemaphore(MQTTSNGW_RB_SEMAPHOR_NAME, 0);
_rb = new RingBuffer(_configDir.c_str()); _rb = new RingBuffer(_configDir.c_str());
if (getParam("ShearedMemory", param) == 0) if (getParam("ShearedMemory", param) == 0)

View File

@@ -66,7 +66,7 @@ private:
string _configDir; string _configDir;
string _configFile; string _configFile;
RingBuffer* _rb; RingBuffer* _rb;
Semaphore* _rbsem; NamedSemaphore* _rbsem;
Mutex _mt; Mutex _mt;
int _log; int _log;
char _rbdata[PROCESS_LOG_BUFFER_SIZE + 1]; char _rbdata[PROCESS_LOG_BUFFER_SIZE + 1];

View File

@@ -145,7 +145,11 @@ bool TCPStack::accept(TCPStack& new_socket)
int TCPStack::send(const uint8_t* buf, int length) int TCPStack::send(const uint8_t* buf, int length)
{ {
#ifdef __APPLE__
return ::send(_sockfd, buf, length, SO_NOSIGPIPE);
#else
return ::send(_sockfd, buf, length, MSG_NOSIGNAL); return ::send(_sockfd, buf, length, MSG_NOSIGNAL);
#endif
} }
int TCPStack::recv(uint8_t* buf, int len) int TCPStack::recv(uint8_t* buf, int len)

View File

@@ -21,27 +21,40 @@
#include <sys/ipc.h> #include <sys/ipc.h>
#include <sys/shm.h> #include <sys/shm.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <semaphore.h>
#include <fcntl.h> #include <fcntl.h>
#include <string.h> #include <string.h>
#include <pthread.h> #include <pthread.h>
#include <unistd.h> #include <unistd.h>
#include <errno.h>
using namespace std; using namespace std;
using namespace MQTTSNGW; using namespace MQTTSNGW;
#if defined(OSX) #ifdef __APPLE__
int sem_timedwait(sem_type sem, const struct timespec *timeout)
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout)
{ {
int rc = -1; while (true)
int64_t tout = timeout->tv_sec * 1000L + tv_nsec * 1000000L
rc = (int)dispatch_semaphore_wait(sem, dispatch_time(DISPATCH_TIME_NOW, tout));
if (rc != 0)
{ {
rc = ETIMEDOUT; // try to lock the semaphore
int result = sem_trywait(sem);
if (result != -1 || errno != EAGAIN)
return result;
// spin lock
sched_yield();
// check if timeout reached
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
if (ts.tv_sec > abs_timeout->tv_sec
|| (ts.tv_sec == abs_timeout->tv_sec && abs_timeout->tv_nsec >= ts.tv_nsec))
{
return ETIMEDOUT;
}
} }
return rc;
} }
#endif #endif
/*===================================== /*=====================================
@@ -69,7 +82,7 @@ Mutex::Mutex(const char* fileName)
throw Exception( -1, "Mutex can't create a shared memory."); throw Exception( -1, "Mutex can't create a shared memory.");
} }
_pmutex = (pthread_mutex_t*) shmat(_shmid, NULL, 0); _pmutex = (pthread_mutex_t*) shmat(_shmid, NULL, 0);
if (_pmutex < 0) if (_pmutex == (void*) -1)
{ {
throw Exception( -1, "Mutex can't attach shared memory."); throw Exception( -1, "Mutex can't attach shared memory.");
} }
@@ -153,21 +166,61 @@ void Mutex::unlock(void)
Class Semaphore Class Semaphore
=====================================*/ =====================================*/
Semaphore::Semaphore()
{
sem_init(&_sem, 0, 0);
_name = 0;
_psem = 0;
}
Semaphore::Semaphore(unsigned int val) Semaphore::Semaphore(unsigned int val)
{ {
#ifdef __APPLE__
_sem = dispatch_semaphore_create(val);
#else
sem_init(&_sem, 0, val); sem_init(&_sem, 0, val);
_name = 0; #endif
_psem = 0;
} }
Semaphore::Semaphore(const char* name, unsigned int val) Semaphore::~Semaphore()
{
#ifdef __APPLE__
dispatch_release(_sem);
#else
sem_destroy(&_sem);
#endif
}
void Semaphore::post(void)
{
#ifdef __APPLE__
dispatch_semaphore_signal(_sem);
#else
sem_post(&_sem);
#endif
}
void Semaphore::wait(void)
{
#ifdef __APPLE__
dispatch_semaphore_wait(_sem, DISPATCH_TIME_FOREVER);
#else
sem_wait(&_sem);
#endif
}
void Semaphore::timedwait(uint16_t millsec)
{
#ifdef __APPLE__
dispatch_semaphore_wait(_sem, dispatch_time(DISPATCH_TIME_NOW, int64_t(millsec) * 1000000));
#else
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
int nsec = ts.tv_nsec + (millsec % 1000) * 1000000;
ts.tv_nsec = nsec % 1000000000;
ts.tv_sec += millsec / 1000 + nsec / 1000000000;
sem_timedwait(&_sem, &ts);
#endif
}
/*=====================================
Class NamedSemaphore
=====================================*/
NamedSemaphore::NamedSemaphore(const char* name, unsigned int val)
{ {
_psem = sem_open(name, O_CREAT, 0666, val); _psem = sem_open(name, O_CREAT, 0666, val);
if (_psem == SEM_FAILED) if (_psem == SEM_FAILED)
@@ -181,67 +234,30 @@ Semaphore::Semaphore(const char* name, unsigned int val)
} }
} }
Semaphore::~Semaphore() NamedSemaphore::~NamedSemaphore()
{ {
if (_name) sem_close(_psem);
{ sem_unlink(_name);
sem_close(_psem); free(_name);
sem_unlink(_name);
free(_name);
}
else
{
sem_destroy(&_sem);
}
} }
void Semaphore::post(void) void NamedSemaphore::post(void)
{ {
int val = 0; sem_post(_psem);
if (_psem)
{
sem_getvalue(_psem, &val);
if (val <= 0)
{
sem_post(_psem);
}
}
else
{
sem_getvalue(&_sem, &val);
if (val <= 0)
{
sem_post(&_sem);
}
}
} }
void Semaphore::wait(void) void NamedSemaphore::wait(void)
{ {
if (_psem) sem_wait(_psem);
{
sem_wait(_psem);
}
else
{
sem_wait(&_sem);
}
} }
void Semaphore::timedwait(uint16_t millsec) void NamedSemaphore::timedwait(uint16_t millsec)
{ {
struct timespec ts; struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts); clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += millsec / 1000; ts.tv_sec += millsec / 1000;
ts.tv_nsec = (millsec % 1000) * 1000000; ts.tv_nsec = (millsec % 1000) * 1000000;
if (_psem) sem_timedwait(_psem, &ts);
{
sem_timedwait(_psem, &ts);
}
else
{
sem_timedwait(&_sem, &ts);
}
} }
/*========================================= /*=========================================
@@ -274,7 +290,7 @@ RingBuffer::RingBuffer(const char* keyDirectory)
if ((_shmid = shmget(key, PROCESS_LOG_BUFFER_SIZE, if ((_shmid = shmget(key, PROCESS_LOG_BUFFER_SIZE,
IPC_CREAT | IPC_EXCL | 0666)) >= 0) IPC_CREAT | IPC_EXCL | 0666)) >= 0)
{ {
if ((_shmaddr = (uint16_t*) shmat(_shmid, NULL, 0)) > 0) if ((_shmaddr = (uint16_t*) shmat(_shmid, NULL, 0)) != (void*) -1)
{ {
_length = (uint16_t*) _shmaddr; _length = (uint16_t*) _shmaddr;
_start = (uint16_t*) _length + sizeof(uint16_t*); _start = (uint16_t*) _length + sizeof(uint16_t*);
@@ -290,9 +306,9 @@ RingBuffer::RingBuffer(const char* keyDirectory)
throw Exception(-1, "RingBuffer can't attach shared memory."); throw Exception(-1, "RingBuffer can't attach shared memory.");
} }
} }
else if ((_shmid = shmget(key, PROCESS_LOG_BUFFER_SIZE, IPC_CREAT | 0666)) >= 0) else if ((_shmid = shmget(key, PROCESS_LOG_BUFFER_SIZE, IPC_CREAT | 0666)) != -1)
{ {
if ((_shmaddr = (uint16_t*) shmat(_shmid, NULL, 0)) > 0) if ((_shmaddr = (uint16_t*) shmat(_shmid, NULL, 0)) != (void*) -1)
{ {
_length = (uint16_t*) _shmaddr; _length = (uint16_t*) _shmaddr;
_start = (uint16_t*) _length + sizeof(uint16_t*); _start = (uint16_t*) _length + sizeof(uint16_t*);
@@ -330,7 +346,7 @@ RingBuffer::~RingBuffer()
} }
} }
if (_pmx > 0) if (_pmx != NULL)
{ {
delete _pmx; delete _pmx;
} }

View File

@@ -19,6 +19,9 @@
#include <pthread.h> #include <pthread.h>
#include <semaphore.h> #include <semaphore.h>
#ifdef __APPLE__
#include <dispatch/dispatch.h>
#endif
#include "MQTTSNGWDefines.h" #include "MQTTSNGWDefines.h"
namespace MQTTSNGW namespace MQTTSNGW
@@ -52,17 +55,34 @@ private:
class Semaphore class Semaphore
{ {
public: public:
Semaphore(); Semaphore(unsigned int val = 0);
Semaphore(unsigned int val);
Semaphore(const char* name, unsigned int val);
~Semaphore(); ~Semaphore();
void post(void); void post(void);
void wait(void); void wait(void);
void timedwait(uint16_t millsec); void timedwait(uint16_t millsec);
private:
#ifdef __APPLE__
dispatch_semaphore_t _sem;
#else
sem_t _sem;
#endif
};
/*=====================================
Class NamedSemaphore
====================================*/
class NamedSemaphore
{
public:
NamedSemaphore(const char* name, unsigned int val);
~NamedSemaphore();
void post(void);
void wait(void);
void timedwait(uint16_t millsec);
private: private:
sem_t* _psem; sem_t* _psem;
sem_t _sem;
char* _name; char* _name;
}; };

View File

@@ -121,11 +121,8 @@ char* SensorNetAddress::getAddress(void)
bool SensorNetAddress::isMatch(SensorNetAddress* addr) bool SensorNetAddress::isMatch(SensorNetAddress* addr)
{ {
return ((this->_portNo == addr->_portNo) && \ return (this->_portNo == addr->_portNo) && \
(this->_IpAddr.sin6_addr.__in6_u.__u6_addr32[0] == addr->_IpAddr.sin6_addr.__in6_u.__u6_addr32[0]) && \ (memcmp(this->_IpAddr.sin6_addr.s6_addr, addr->_IpAddr.sin6_addr.s6_addr, sizeof(this->_IpAddr.sin6_addr.s6_addr)) == 0);
(this->_IpAddr.sin6_addr.__in6_u.__u6_addr32[1] == addr->_IpAddr.sin6_addr.__in6_u.__u6_addr32[1]) && \
(this->_IpAddr.sin6_addr.__in6_u.__u6_addr32[2] == addr->_IpAddr.sin6_addr.__in6_u.__u6_addr32[2]) && \
(this->_IpAddr.sin6_addr.__in6_u.__u6_addr32[3] == addr->_IpAddr.sin6_addr.__in6_u.__u6_addr32[3]));
} }
SensorNetAddress& SensorNetAddress::operator =(SensorNetAddress& addr) SensorNetAddress& SensorNetAddress::operator =(SensorNetAddress& addr)
@@ -319,8 +316,13 @@ int UDPPort6::open(const char* ipAddress, uint16_t uniPortNo, const char* broadc
//if given, set a given device name to bind to //if given, set a given device name to bind to
if(strlen(interfaceName) > 0) if(strlen(interfaceName) > 0)
{ {
#ifdef __APPLE__
int idx = if_nametoindex(interfaceName);
setsockopt(_sockfdUnicast, IPPROTO_IP, IP_BOUND_IF, &idx, sizeof(idx));
#else
//socket option: bind to a given interface name //socket option: bind to a given interface name
setsockopt(_sockfdUnicast, SOL_SOCKET, SO_BINDTODEVICE, interfaceName, strlen(interfaceName)); setsockopt(_sockfdUnicast, SOL_SOCKET, SO_BINDTODEVICE, interfaceName, strlen(interfaceName));
#endif
} }
//socket option: reuse address //socket option: reuse address
@@ -370,7 +372,7 @@ int UDPPort6::unicast(const uint8_t* buf, uint32_t length, SensorNetAddress* add
strcpy(destStr, addr->getAddress()); strcpy(destStr, addr->getAddress());
strcat(destStr,"%"); strcat(destStr,"%");
strcat(destStr,_interfaceName); strcat(destStr,_interfaceName);
if(IN6_IS_ADDR_LINKLOCAL(addr->getAddress())) if(IN6_IS_ADDR_LINKLOCAL(&addr->getIpAddress()->sin6_addr))
{ {
getaddrinfo(destStr, portStr.c_str(), &hints, &res); getaddrinfo(destStr, portStr.c_str(), &hints, &res);
} }
@@ -411,8 +413,8 @@ int UDPPort6::broadcast(const uint8_t* buf, uint32_t length)
strcpy(destStr, _grpAddr.getAddress()); strcpy(destStr, _grpAddr.getAddress());
strcat(destStr,"%"); strcat(destStr,"%");
strcat(destStr,_interfaceName); strcat(destStr,_interfaceName);
if(IN6_IS_ADDR_MC_NODELOCAL(_grpAddr.getAddress()) || if(IN6_IS_ADDR_MC_NODELOCAL(&_grpAddr.getIpAddress()->sin6_addr) ||
IN6_IS_ADDR_MC_LINKLOCAL(_grpAddr.getAddress())) IN6_IS_ADDR_MC_LINKLOCAL(&_grpAddr.getIpAddress()->sin6_addr))
{ {
err = getaddrinfo(destStr, std::to_string(_uniPortNo).c_str(), &hint, &info ); err = getaddrinfo(destStr, std::to_string(_uniPortNo).c_str(), &hint, &info );
} }

View File

@@ -116,7 +116,7 @@ int MQTTSNDeserialize_gwinfo(unsigned char* gatewayid, unsigned short* gatewayad
*gatewayid = readChar(&curdata); *gatewayid = readChar(&curdata);
*gatewayaddress_len = enddata - curdata; *gatewayaddress_len = enddata - curdata;
*gatewayaddress = (gatewayaddress_len > 0) ? curdata : NULL; *gatewayaddress = (*gatewayaddress_len > 0) ? curdata : NULL;
rc = 1; rc = 1;
exit: exit: