diff --git a/MQTTSNGateway/Makefile b/MQTTSNGateway/Makefile index 1fd2233..731d88e 100644 --- a/MQTTSNGateway/Makefile +++ b/MQTTSNGateway/Makefile @@ -77,6 +77,7 @@ $(SUBDIR)/MQTTSNUnsubscribeServer.c CXX := g++ CPPFLAGS += +# include directories, for MacOS/homebrew add -I/usr/local/opt/openssl/include/ INCLUDE := INCLUDES += $(INCLUDE) -I$(SRCDIR) \ -I$(SRCDIR)/$(OS) \ @@ -84,12 +85,16 @@ INCLUDES += $(INCLUDE) -I$(SRCDIR) \ -I$(SUBDIR) \ -I$(SRCDIR)/$(TEST) +# preprocessor defines DEFS := + +# library search paths, for MacOS/homebrew add -L/usr/local/opt/openssl/lib/ LIB := LIBS += $(LIB) -L/usr/local/lib -LDFLAGS := + +LDFLAGS := CXXFLAGS := -Wall -O3 -std=c++11 -LDADD := -lpthread -lssl -lcrypto -lrt +LDADD := -lpthread -lssl -lcrypto OUTDIR := Build PROG := $(OUTDIR)/$(PROGNAME) diff --git a/MQTTSNGateway/src/MQTTSNGWProcess.cpp b/MQTTSNGateway/src/MQTTSNGWProcess.cpp index 88ba292..ac9c915 100644 --- a/MQTTSNGateway/src/MQTTSNGWProcess.cpp +++ b/MQTTSNGateway/src/MQTTSNGWProcess.cpp @@ -103,7 +103,7 @@ void Process::initialize(int argc, char** argv) } } } - _rbsem = new Semaphore(MQTTSNGW_RB_SEMAPHOR_NAME, 0); + _rbsem = new NamedSemaphore(MQTTSNGW_RB_SEMAPHOR_NAME, 0); _rb = new RingBuffer(_configDir.c_str()); if (getParam("ShearedMemory", param) == 0) diff --git a/MQTTSNGateway/src/MQTTSNGWProcess.h b/MQTTSNGateway/src/MQTTSNGWProcess.h index 1fb02a9..3d94f2e 100644 --- a/MQTTSNGateway/src/MQTTSNGWProcess.h +++ b/MQTTSNGateway/src/MQTTSNGWProcess.h @@ -66,7 +66,7 @@ private: string _configDir; string _configFile; RingBuffer* _rb; - Semaphore* _rbsem; + NamedSemaphore* _rbsem; Mutex _mt; int _log; char _rbdata[PROCESS_LOG_BUFFER_SIZE + 1]; diff --git a/MQTTSNGateway/src/linux/Network.cpp b/MQTTSNGateway/src/linux/Network.cpp index 4f6f5a9..8880b90 100644 --- a/MQTTSNGateway/src/linux/Network.cpp +++ b/MQTTSNGateway/src/linux/Network.cpp @@ -145,7 +145,11 @@ bool TCPStack::accept(TCPStack& new_socket) int TCPStack::send(const uint8_t* buf, int length) { +#ifdef __APPLE__ + return ::send(_sockfd, buf, length, SO_NOSIGPIPE); +#else return ::send(_sockfd, buf, length, MSG_NOSIGNAL); +#endif } int TCPStack::recv(uint8_t* buf, int len) diff --git a/MQTTSNGateway/src/linux/Threading.cpp b/MQTTSNGateway/src/linux/Threading.cpp index b0b3ac4..da8cf34 100644 --- a/MQTTSNGateway/src/linux/Threading.cpp +++ b/MQTTSNGateway/src/linux/Threading.cpp @@ -21,27 +21,40 @@ #include #include #include -#include #include #include #include #include +#include using namespace std; using namespace MQTTSNGW; -#if defined(OSX) -int sem_timedwait(sem_type sem, const struct timespec *timeout) +#ifdef __APPLE__ + +int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout) { - int rc = -1; - int64_t tout = timeout->tv_sec * 1000L + tv_nsec * 1000000L - rc = (int)dispatch_semaphore_wait(sem, dispatch_time(DISPATCH_TIME_NOW, tout)); - if (rc != 0) + while (true) { - rc = ETIMEDOUT; + // try to lock the semaphore + int result = sem_trywait(sem); + if (result != -1 || errno != EAGAIN) + return result; + + // spin lock + sched_yield(); + + // check if timeout reached + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + if (ts.tv_sec > abs_timeout->tv_sec + || (ts.tv_sec == abs_timeout->tv_sec && abs_timeout->tv_nsec >= ts.tv_nsec)) + { + return ETIMEDOUT; + } } - return rc; } + #endif /*===================================== @@ -69,7 +82,7 @@ Mutex::Mutex(const char* fileName) throw Exception( -1, "Mutex can't create a shared memory."); } _pmutex = (pthread_mutex_t*) shmat(_shmid, NULL, 0); - if (_pmutex < 0) + if (_pmutex == (void*) -1) { throw Exception( -1, "Mutex can't attach shared memory."); } @@ -153,21 +166,61 @@ void Mutex::unlock(void) Class Semaphore =====================================*/ -Semaphore::Semaphore() -{ - sem_init(&_sem, 0, 0); - _name = 0; - _psem = 0; -} - Semaphore::Semaphore(unsigned int val) { +#ifdef __APPLE__ + _sem = dispatch_semaphore_create(val); +#else sem_init(&_sem, 0, val); - _name = 0; - _psem = 0; +#endif } -Semaphore::Semaphore(const char* name, unsigned int val) +Semaphore::~Semaphore() +{ +#ifdef __APPLE__ + dispatch_release(_sem); +#else + sem_destroy(&_sem); +#endif +} + +void Semaphore::post(void) +{ +#ifdef __APPLE__ + dispatch_semaphore_signal(_sem); +#else + sem_post(&_sem); +#endif +} + +void Semaphore::wait(void) +{ +#ifdef __APPLE__ + dispatch_semaphore_wait(_sem, DISPATCH_TIME_FOREVER); +#else + sem_wait(&_sem); +#endif +} + +void Semaphore::timedwait(uint16_t millsec) +{ +#ifdef __APPLE__ + dispatch_semaphore_wait(_sem, dispatch_time(DISPATCH_TIME_NOW, int64_t(millsec) * 1000000)); +#else + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + int nsec = ts.tv_nsec + (millsec % 1000) * 1000000; + ts.tv_nsec = nsec % 1000000000; + ts.tv_sec += millsec / 1000 + nsec / 1000000000; + sem_timedwait(&_sem, &ts); +#endif +} + +/*===================================== + Class NamedSemaphore + =====================================*/ + +NamedSemaphore::NamedSemaphore(const char* name, unsigned int val) { _psem = sem_open(name, O_CREAT, 0666, val); if (_psem == SEM_FAILED) @@ -181,67 +234,30 @@ Semaphore::Semaphore(const char* name, unsigned int val) } } -Semaphore::~Semaphore() +NamedSemaphore::~NamedSemaphore() { - if (_name) - { - sem_close(_psem); - sem_unlink(_name); - free(_name); - } - else - { - sem_destroy(&_sem); - } + sem_close(_psem); + sem_unlink(_name); + free(_name); } -void Semaphore::post(void) +void NamedSemaphore::post(void) { - int val = 0; - if (_psem) - { - sem_getvalue(_psem, &val); - if (val <= 0) - { - sem_post(_psem); - } - } - else - { - sem_getvalue(&_sem, &val); - if (val <= 0) - { - sem_post(&_sem); - } - } + sem_post(_psem); } -void Semaphore::wait(void) +void NamedSemaphore::wait(void) { - if (_psem) - { - sem_wait(_psem); - } - else - { - sem_wait(&_sem); - } + sem_wait(_psem); } -void Semaphore::timedwait(uint16_t millsec) +void NamedSemaphore::timedwait(uint16_t millsec) { struct timespec ts; clock_gettime(CLOCK_REALTIME, &ts); ts.tv_sec += millsec / 1000; ts.tv_nsec = (millsec % 1000) * 1000000; - if (_psem) - { - sem_timedwait(_psem, &ts); - } - else - { - sem_timedwait(&_sem, &ts); - } + sem_timedwait(_psem, &ts); } /*========================================= @@ -274,7 +290,7 @@ RingBuffer::RingBuffer(const char* keyDirectory) if ((_shmid = shmget(key, PROCESS_LOG_BUFFER_SIZE, IPC_CREAT | IPC_EXCL | 0666)) >= 0) { - if ((_shmaddr = (uint16_t*) shmat(_shmid, NULL, 0)) > 0) + if ((_shmaddr = (uint16_t*) shmat(_shmid, NULL, 0)) != (void*) -1) { _length = (uint16_t*) _shmaddr; _start = (uint16_t*) _length + sizeof(uint16_t*); @@ -290,9 +306,9 @@ RingBuffer::RingBuffer(const char* keyDirectory) throw Exception(-1, "RingBuffer can't attach shared memory."); } } - else if ((_shmid = shmget(key, PROCESS_LOG_BUFFER_SIZE, IPC_CREAT | 0666)) >= 0) + else if ((_shmid = shmget(key, PROCESS_LOG_BUFFER_SIZE, IPC_CREAT | 0666)) != -1) { - if ((_shmaddr = (uint16_t*) shmat(_shmid, NULL, 0)) > 0) + if ((_shmaddr = (uint16_t*) shmat(_shmid, NULL, 0)) != (void*) -1) { _length = (uint16_t*) _shmaddr; _start = (uint16_t*) _length + sizeof(uint16_t*); @@ -330,7 +346,7 @@ RingBuffer::~RingBuffer() } } - if (_pmx > 0) + if (_pmx != NULL) { delete _pmx; } diff --git a/MQTTSNGateway/src/linux/Threading.h b/MQTTSNGateway/src/linux/Threading.h index df58f1c..795ee5a 100644 --- a/MQTTSNGateway/src/linux/Threading.h +++ b/MQTTSNGateway/src/linux/Threading.h @@ -19,6 +19,9 @@ #include #include +#ifdef __APPLE__ +#include +#endif #include "MQTTSNGWDefines.h" namespace MQTTSNGW @@ -52,17 +55,34 @@ private: class Semaphore { public: - Semaphore(); - Semaphore(unsigned int val); - Semaphore(const char* name, unsigned int val); + Semaphore(unsigned int val = 0); ~Semaphore(); void post(void); void wait(void); void timedwait(uint16_t millsec); +private: +#ifdef __APPLE__ + dispatch_semaphore_t _sem; +#else + sem_t _sem; +#endif +}; + +/*===================================== + Class NamedSemaphore + ====================================*/ +class NamedSemaphore +{ +public: + NamedSemaphore(const char* name, unsigned int val); + ~NamedSemaphore(); + void post(void); + void wait(void); + void timedwait(uint16_t millsec); + private: sem_t* _psem; - sem_t _sem; char* _name; }; diff --git a/MQTTSNGateway/src/linux/udp6/SensorNetwork.cpp b/MQTTSNGateway/src/linux/udp6/SensorNetwork.cpp index 5d6b7b8..11affaa 100644 --- a/MQTTSNGateway/src/linux/udp6/SensorNetwork.cpp +++ b/MQTTSNGateway/src/linux/udp6/SensorNetwork.cpp @@ -121,11 +121,8 @@ char* SensorNetAddress::getAddress(void) bool SensorNetAddress::isMatch(SensorNetAddress* addr) { - return ((this->_portNo == addr->_portNo) && \ - (this->_IpAddr.sin6_addr.__in6_u.__u6_addr32[0] == addr->_IpAddr.sin6_addr.__in6_u.__u6_addr32[0]) && \ - (this->_IpAddr.sin6_addr.__in6_u.__u6_addr32[1] == addr->_IpAddr.sin6_addr.__in6_u.__u6_addr32[1]) && \ - (this->_IpAddr.sin6_addr.__in6_u.__u6_addr32[2] == addr->_IpAddr.sin6_addr.__in6_u.__u6_addr32[2]) && \ - (this->_IpAddr.sin6_addr.__in6_u.__u6_addr32[3] == addr->_IpAddr.sin6_addr.__in6_u.__u6_addr32[3])); + return (this->_portNo == addr->_portNo) && \ + (memcmp(this->_IpAddr.sin6_addr.s6_addr, addr->_IpAddr.sin6_addr.s6_addr, sizeof(this->_IpAddr.sin6_addr.s6_addr)) == 0); } SensorNetAddress& SensorNetAddress::operator =(SensorNetAddress& addr) @@ -319,8 +316,13 @@ int UDPPort6::open(const char* ipAddress, uint16_t uniPortNo, const char* broadc //if given, set a given device name to bind to if(strlen(interfaceName) > 0) { +#ifdef __APPLE__ + int idx = if_nametoindex(interfaceName); + setsockopt(_sockfdUnicast, IPPROTO_IP, IP_BOUND_IF, &idx, sizeof(idx)); +#else //socket option: bind to a given interface name setsockopt(_sockfdUnicast, SOL_SOCKET, SO_BINDTODEVICE, interfaceName, strlen(interfaceName)); +#endif } //socket option: reuse address @@ -370,7 +372,7 @@ int UDPPort6::unicast(const uint8_t* buf, uint32_t length, SensorNetAddress* add strcpy(destStr, addr->getAddress()); strcat(destStr,"%"); strcat(destStr,_interfaceName); - if(IN6_IS_ADDR_LINKLOCAL(addr->getAddress())) + if(IN6_IS_ADDR_LINKLOCAL(&addr->getIpAddress()->sin6_addr)) { getaddrinfo(destStr, portStr.c_str(), &hints, &res); } @@ -411,8 +413,8 @@ int UDPPort6::broadcast(const uint8_t* buf, uint32_t length) strcpy(destStr, _grpAddr.getAddress()); strcat(destStr,"%"); strcat(destStr,_interfaceName); - if(IN6_IS_ADDR_MC_NODELOCAL(_grpAddr.getAddress()) || - IN6_IS_ADDR_MC_LINKLOCAL(_grpAddr.getAddress())) + if(IN6_IS_ADDR_MC_NODELOCAL(&_grpAddr.getIpAddress()->sin6_addr) || + IN6_IS_ADDR_MC_LINKLOCAL(&_grpAddr.getIpAddress()->sin6_addr)) { err = getaddrinfo(destStr, std::to_string(_uniPortNo).c_str(), &hint, &info ); } diff --git a/MQTTSNPacket/src/MQTTSNSearchClient.c b/MQTTSNPacket/src/MQTTSNSearchClient.c index c55e134..af93c19 100644 --- a/MQTTSNPacket/src/MQTTSNSearchClient.c +++ b/MQTTSNPacket/src/MQTTSNSearchClient.c @@ -116,7 +116,7 @@ int MQTTSNDeserialize_gwinfo(unsigned char* gatewayid, unsigned short* gatewayad *gatewayid = readChar(&curdata); *gatewayaddress_len = enddata - curdata; - *gatewayaddress = (gatewayaddress_len > 0) ? curdata : NULL; + *gatewayaddress = (*gatewayaddress_len > 0) ? curdata : NULL; rc = 1; exit: