FW3.xシリーズでは、2.xシリーズの各機能に加え、以下のようなアップデートが行われています。
・メッセージの完全な双方向化
・Dockerに対応。エッジ処理モジュールをDockerコンテナで提供可能
・エッジ処理のみをNode-REDでも記述可能
・Luaによるデバイスハンドラ記述が可能
・Node-REDの最新環境を提供し、カスタマイズを容易に
今回は、FW3.xで実装された、メッセージの双方向化における、下流方向のデータを用いたデバイスハンドラについて、まずは、C言語による記述と、設定方法について簡単に解説します。
メッセージの完全な双方向化に関しての詳細な情報は以下のドキュメントをご覧ください。
OpenBlocks IoT Family データハンドリングガイド
OpenBlocks IoT Family データハンドリング設定リファレンスガイド
先にご理解いただきたいのですが、メッセージの双方向化に関しては、現在は、
MQTT系の接続サービス、TCP接続、当社PD Exchangeとの接続、当社独自仕様のWebサーバーのみでご利用いただけます。対応の詳細に関しましては、上記ドキュメントを参照してください。
1.Userデバイスの作成と送受信設定
・Userデバイスの作成
サービスメニューより、基本機能、Userデバイスタブ、の順に選択します。
ユーザーメモを入力し、保存すると、一覧に作成したUserデバイスが表示されます。
サービスメニューより、IoTデータ、送受信設定の順に選択します。
今回は、例としてAzure IoT Hubへの送受信設定を行っています。
上記、送受信の設定を行った後、Userデバイスに対してのデバイス設定を行います。
デバイス設定メニュー内の送受信設定にて、iothubにチェックを入れ、接続に必要なデバイスIDおよびデバイスキーを入力します。
なお、Azure IoT Hubへの接続方法に関しては、以下のOBDNマガジンの記事にて解説を行っています。
下流方向への制御を行うには、「受信設定」を「有効」にします。
制御に使用する、Unixドメインソケットは、作成したデバイス番号より生成されます。
2.送受信に使用するUnixドメインソケット
制御メッセージの送受信に使用するUnixドメインソケットは、データハンドリングガイドにて解説しています。
PD Repeaterからの下流方向
¥0/pd_handler/<デバイス番号>.sock
PD Repeaterへの上流方向
¥0/pd_repeater/<デバイス番号>.sock
今回の設定例では、デバイス番号はuserdev_0000001となります。
3.Cで記述した下流方向制御アプリケーションのサンプル
本サンプルは、Cで記述した下流方向制御アプリケーションのサンプルです。
PD Repeater側がクライアントとなり、下流方向の制御アプリケーションはUnixドメインソケットを使うサーバ側の実装となります。
本プログラムは下流制御機能の評価用に作成されたもので、Userデバイスとして定義されるUnixドメインソケットのuserdev_0000000からuserdev_0000031までをポーリングして受信したデータを表示します。
# apt-get update
# apt-get install libssl-dev
インストールされていないならば、libssl-devパッケージをインストールし、
# make rd_header
シンプルにmakeしてください。
/*
* Copyright (c) 2018
* Plat'Home CO., LTD. <support@plathome.co.jp>. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. Neither the name of the Plat'Home CO., LTD. nor the names of
* its contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <time.h>
#include <sys/param.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/select.h>
#include <openssl/md5.h>
#ifdef __linux__
#include <sys/epoll.h>
#endif
#define N_DEVICE 32
#ifdef __linux__
#define ROOT_PATH "/pd_handler"
#else
#define ROOT_PATH "/tmp/pd_handler"
#endif
#define LOCALNAME "userdev_"
#define SOCKET_BUF 4096
#ifdef __linux__
#define MAX_SOCKET_CONN 512
#else
#define MAX_SOCKET_CONN 64
#endif
#define SELECT_TIMEOUT 0
#define SELECT_UTIMEOUT 1
#define EPOLL_WAIT 1
struct s_socket_t {
int fd;
struct sockaddr_un addr;
};
struct c_socket_t {
unsigned char id;
int fd;
struct sockaddr_un addr;
};
int main(int argc,char *argv[])
{
int i,j,k,m,n,rc;
char *buf, *socket_buf, *msg_buf;
#ifdef __linux__
int num_fd, epfd;
struct epoll_event ev;
struct epoll_event events[MAX_SOCKET_CONN];
int fcntl_flag;
#else
struct timeval timeout;
fd_set rd_set, rd_set_orig;
#endif
struct timeval tv;
socklen_t len;
struct s_socket_t *ss[N_DEVICE];
struct c_socket_t *cs[MAX_SOCKET_CONN - N_DEVICE];
u_int16_t topic_size;
char *p, *topic, *topic_p, *msg_buf_p;
unsigned char pl;
unsigned char ph;
unsigned char cloud_id, sub_id;
unsigned char md5[MD5_DIGEST_LENGTH];
char hash[MD5_DIGEST_LENGTH * 2 + 1];
buf = (char *)malloc(sizeof(char)*BUFSIZ);
if(!buf) {
perror("buf = malloc()");
exit(1);
}
socket_buf = (unsigned char *)malloc(sizeof(unsigned char)*SOCKET_BUF);
if(!socket_buf) {
perror("socket_buf = malloc()");
exit(1);
}
msg_buf = (unsigned char *)malloc(sizeof(unsigned char)*SOCKET_BUF);
if(!msg_buf) {
perror("msg_buf = malloc()");
exit(1);
}
topic = (char *)malloc(sizeof(char)*BUFSIZ);
if(!topic) {
perror("socket_buf = malloc()");
exit(1);
}
#ifdef __linux__
if ((epfd = epoll_create(MAX_SOCKET_CONN)) < 0) {
perror("epfd = epoll_create()");
exit(1);
}
#else
FD_ZERO(&rd_set_orig);
#endif
for(i=0;i<N_DEVICE;i++) {
ss[i] = (struct s_socket_t *)malloc(sizeof(struct s_socket_t));
if(!ss[i]) {
perror("ss[] = malloc()");
exit(1);
}
memset(ss[i], 0, sizeof(struct s_socket_t));
ss[i]->fd = socket(AF_UNIX, SOCK_STREAM, 0);
ss[i]->addr.sun_family = AF_UNIX;
#ifdef __linux__
snprintf(ss[i]->addr.sun_path + 1,sizeof(ss[i]->addr.sun_path) - 1,"%s/%s%07d.sock",
ROOT_PATH, LOCALNAME, i);
unlink(ss[i]->addr.sun_path);
rc = bind(ss[i]->fd, (struct sockaddr *)&ss[i]->addr, sizeof(sa_family_t) + strlen(ss[i]->
addr.sun_path + 1) + 1);
#else
snprintf(ss[i]->addr.sun_path,sizeof(ss[i]->addr.sun_path),"%s/%s%07d.sock",
ROOT_PATH, LOCALNAME, i);
unlink(ss[i]->addr.sun_path);
rc = bind(ss[i]->fd, (struct sockaddr *)&ss[i]->addr, sizeof(ss[i]->addr));
#endif
if (rc) {
snprintf(buf,sizeof(char)*BUFSIZ,"failed to bind(%s)",ss[i]->addr.sun_path);
perror(buf);
exit(1);
}
rc = listen(ss[i]->fd, N_DEVICE);
if (rc) {
snprintf(buf,sizeof(char)*BUFSIZ,"failed to listen(%s)",ss[i]->addr.sun_path);
perror(buf);
exit(1);
}
#ifdef __linux__
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = ss[i]->fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, ss[i]->fd, &ev);
#else
FD_SET(ss[i]->fd,&rd_set_orig);
#endif
printf("listen %s/%s%07d.sock\n", ROOT_PATH, LOCALNAME, i);
}
for(i=0;i<(MAX_SOCKET_CONN - N_DEVICE);i++) {
cs[i] = (struct c_socket_t *)malloc(sizeof(struct c_socket_t));
if(!cs[i]) {
perror("cs[] = malloc()");
exit(1);
}
memset(cs[i], 0, sizeof(struct c_socket_t));
cs[i]->fd = -1;
}
while(1) {
#ifdef __linux__
num_fd = epoll_wait(epfd, events, MAX_SOCKET_CONN, EPOLL_WAIT);
for(i=0;i<num_fd;i++) {
n=1;
for (j = 0; j < N_DEVICE; j++) {
if (events[i].data.fd == ss[j]->fd) {
for(k=0;k<(MAX_SOCKET_CONN - N_DEVICE);k++) {
if(cs[k]->fd == -1) {
break;
}
}
if(k == (MAX_SOCKET_CONN - N_DEVICE)) {
printf("exceeded maximum fd(%d) for accept()", MAX_SOCKET_CONN - N_DEVICE);
}
else {
len = sizeof(cs[k]->addr);
cs[k]->fd = accept(ss[j]->fd, (struct sockaddr *)&cs[k]->addr, &len);
if (cs[k]->fd != -1 ) {
cs[k]->id = j;
fcntl_flag = fcntl(cs[k]->fd, F_GETFL, 0);
fcntl(cs[k]->fd, F_SETFL, fcntl_flag | O_NONBLOCK);
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = cs[k]->fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, cs[k]->fd, &ev);
}
}
n = 0;
}
}
if(n) {
for(j=0;j<(MAX_SOCKET_CONN - N_DEVICE);j++) {
if ( events[i].data.fd == cs[j]->fd ) {
memset(socket_buf, 0, sizeof(char)*SOCKET_BUF);
rc = read(cs[j]->fd,(char *)socket_buf,SOCKET_BUF);
if(rc > 0) {
p = socket_buf;
cloud_id = *p++;
sub_id = *p++;
pl = (unsigned char)*p++;
ph = (unsigned char)*p++;
for(m=0;m<MD5_DIGEST_LENGTH;m++)
md5[m] = (unsigned char)*p++;
for(m=0; m<MD5_DIGEST_LENGTH;m++)
snprintf(hash + m + m, sizeof(char)*3, "%.2x", md5[m]);
topic_size = (u_int16_t)((ph & 0x00ff) * 256 + (pl & 0x00ff));
topic_p = topic;
for(m=0;m<topic_size;m++)
*topic_p++ = *p++;
*topic_p = 0x00;
msg_buf_p = msg_buf;
while(*p != 0x00)
*msg_buf_p++ = *p++;
*msg_buf_p = 0x0;
printf("subscrib: topic_size=%d topic=%s payload=%s md5=%s\n",
topic_size, topic, msg_buf, hash);
}
epoll_ctl(epfd, EPOLL_CTL_DEL, cs[j]->fd, &ev);
shutdown(cs[j]->fd,SHUT_RDWR);
close(cs[j]->fd);
cs[j]->fd = -1;
}
}
}
}
#else
timeout.tv_sec = SELECT_TIMEOUT;
timeout.tv_usec = SELECT_UTIMEOUT;
memcpy(&rd_set,&rd_set_orig,sizeof(rd_set_orig));
if(select(FD_SETSIZE,&rd_set,(fd_set *)NULL,(fd_set *)NULL,&timeout) == -1) {
perror("select()");
exit(1);
}
for(i=0;i<FD_SETSIZE;i++) {
if(FD_ISSET(i, &rd_set)) {
n=1;
for(j=0;j<N_DEVICE;j++) {
if ( i == ss[j]->fd ) {
for(k=0;k<(MAX_SOCKET_CONN - N_DEVICE);k++) {
if(cs[k]->fd == -1) {
break;
}
}
if(k == (MAX_SOCKET_CONN - N_DEVICE)) {
printf("exceeded maximum fd(%d) for accept()", MAX_SOCKET_CONN);
}
else {
len = sizeof(cs[k]->addr);
cs[k]->fd = accept(i, (struct sockaddr *)&cs[k]->addr, &len);
if (cs[k]->fd != -1 ) {
setsockopt(cs[k]->fd, SOL_SOCKET, SO_RCVTIMEO,(char*)&timeout, sizeof(struct timeval));
FD_SET(cs[k]->fd, &rd_set_orig);
cs[k]->id = j;
}
}
n = 0;
}
}
if(n) {
for(j=0;j<(MAX_SOCKET_CONN - N_DEVICE);j++) {
if (i == cs[j]->fd ) {
memset(socket_buf, 0, sizeof(char)*SOCKET_BUF);
rc = read(i,(char *)socket_buf,SOCKET_BUF);
if(rc > 0) {
p = socket_buf;
cloud_id = *p++;
sub_id = *p++;
pl = (unsigned char)*p++;
ph = (unsigned char)*p++;
for(m=0;m<MD5_DIGEST_LENGTH;m++)
md5[m] = (unsigned char)*p++;
for(m=0; m<MD5_DIGEST_LENGTH;m++)
snprintf(hash + m + m, sizeof(char)*3, "%.2x", md5[m]);
topic_size = (u_int16_t)((ph & 0x00ff) * 256 + (pl & 0x00ff));
topic_p = topic;
for(m=0;m<topic_size;m++)
*topic_p++ = *p++;
*topic_p = 0x00;
msg_buf_p = msg_buf;
while(*p != 0x00)
*msg_buf_p++ = *p++;
*msg_buf_p = 0x0;
printf("subscrib: topic_size=%d topic=%s payload=%s md5=%s\n",
topic_size, topic, msg_buf, hash);
}
shutdown(i,SHUT_RDWR);
close(i);
FD_CLR(i,&rd_set_orig);
cs[j]->fd = -1;
}
}
}
}
}
#endif
}
}
* Copyright (c) 2018
* Plat'Home CO., LTD. <support@plathome.co.jp>. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. Neither the name of the Plat'Home CO., LTD. nor the names of
* its contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <time.h>
#include <sys/param.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/select.h>
#include <openssl/md5.h>
#ifdef __linux__
#include <sys/epoll.h>
#endif
#define N_DEVICE 32
#ifdef __linux__
#define ROOT_PATH "/pd_handler"
#else
#define ROOT_PATH "/tmp/pd_handler"
#endif
#define LOCALNAME "userdev_"
#define SOCKET_BUF 4096
#ifdef __linux__
#define MAX_SOCKET_CONN 512
#else
#define MAX_SOCKET_CONN 64
#endif
#define SELECT_TIMEOUT 0
#define SELECT_UTIMEOUT 1
#define EPOLL_WAIT 1
struct s_socket_t {
int fd;
struct sockaddr_un addr;
};
struct c_socket_t {
unsigned char id;
int fd;
struct sockaddr_un addr;
};
int main(int argc,char *argv[])
{
int i,j,k,m,n,rc;
char *buf, *socket_buf, *msg_buf;
#ifdef __linux__
int num_fd, epfd;
struct epoll_event ev;
struct epoll_event events[MAX_SOCKET_CONN];
int fcntl_flag;
#else
struct timeval timeout;
fd_set rd_set, rd_set_orig;
#endif
struct timeval tv;
socklen_t len;
struct s_socket_t *ss[N_DEVICE];
struct c_socket_t *cs[MAX_SOCKET_CONN - N_DEVICE];
u_int16_t topic_size;
char *p, *topic, *topic_p, *msg_buf_p;
unsigned char pl;
unsigned char ph;
unsigned char cloud_id, sub_id;
unsigned char md5[MD5_DIGEST_LENGTH];
char hash[MD5_DIGEST_LENGTH * 2 + 1];
buf = (char *)malloc(sizeof(char)*BUFSIZ);
if(!buf) {
perror("buf = malloc()");
exit(1);
}
socket_buf = (unsigned char *)malloc(sizeof(unsigned char)*SOCKET_BUF);
if(!socket_buf) {
perror("socket_buf = malloc()");
exit(1);
}
msg_buf = (unsigned char *)malloc(sizeof(unsigned char)*SOCKET_BUF);
if(!msg_buf) {
perror("msg_buf = malloc()");
exit(1);
}
topic = (char *)malloc(sizeof(char)*BUFSIZ);
if(!topic) {
perror("socket_buf = malloc()");
exit(1);
}
#ifdef __linux__
if ((epfd = epoll_create(MAX_SOCKET_CONN)) < 0) {
perror("epfd = epoll_create()");
exit(1);
}
#else
FD_ZERO(&rd_set_orig);
#endif
for(i=0;i<N_DEVICE;i++) {
ss[i] = (struct s_socket_t *)malloc(sizeof(struct s_socket_t));
if(!ss[i]) {
perror("ss[] = malloc()");
exit(1);
}
memset(ss[i], 0, sizeof(struct s_socket_t));
ss[i]->fd = socket(AF_UNIX, SOCK_STREAM, 0);
ss[i]->addr.sun_family = AF_UNIX;
#ifdef __linux__
snprintf(ss[i]->addr.sun_path + 1,sizeof(ss[i]->addr.sun_path) - 1,"%s/%s%07d.sock",
ROOT_PATH, LOCALNAME, i);
unlink(ss[i]->addr.sun_path);
rc = bind(ss[i]->fd, (struct sockaddr *)&ss[i]->addr, sizeof(sa_family_t) + strlen(ss[i]->
addr.sun_path + 1) + 1);
#else
snprintf(ss[i]->addr.sun_path,sizeof(ss[i]->addr.sun_path),"%s/%s%07d.sock",
ROOT_PATH, LOCALNAME, i);
unlink(ss[i]->addr.sun_path);
rc = bind(ss[i]->fd, (struct sockaddr *)&ss[i]->addr, sizeof(ss[i]->addr));
#endif
if (rc) {
snprintf(buf,sizeof(char)*BUFSIZ,"failed to bind(%s)",ss[i]->addr.sun_path);
perror(buf);
exit(1);
}
rc = listen(ss[i]->fd, N_DEVICE);
if (rc) {
snprintf(buf,sizeof(char)*BUFSIZ,"failed to listen(%s)",ss[i]->addr.sun_path);
perror(buf);
exit(1);
}
#ifdef __linux__
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = ss[i]->fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, ss[i]->fd, &ev);
#else
FD_SET(ss[i]->fd,&rd_set_orig);
#endif
printf("listen %s/%s%07d.sock\n", ROOT_PATH, LOCALNAME, i);
}
for(i=0;i<(MAX_SOCKET_CONN - N_DEVICE);i++) {
cs[i] = (struct c_socket_t *)malloc(sizeof(struct c_socket_t));
if(!cs[i]) {
perror("cs[] = malloc()");
exit(1);
}
memset(cs[i], 0, sizeof(struct c_socket_t));
cs[i]->fd = -1;
}
while(1) {
#ifdef __linux__
num_fd = epoll_wait(epfd, events, MAX_SOCKET_CONN, EPOLL_WAIT);
for(i=0;i<num_fd;i++) {
n=1;
for (j = 0; j < N_DEVICE; j++) {
if (events[i].data.fd == ss[j]->fd) {
for(k=0;k<(MAX_SOCKET_CONN - N_DEVICE);k++) {
if(cs[k]->fd == -1) {
break;
}
}
if(k == (MAX_SOCKET_CONN - N_DEVICE)) {
printf("exceeded maximum fd(%d) for accept()", MAX_SOCKET_CONN - N_DEVICE);
}
else {
len = sizeof(cs[k]->addr);
cs[k]->fd = accept(ss[j]->fd, (struct sockaddr *)&cs[k]->addr, &len);
if (cs[k]->fd != -1 ) {
cs[k]->id = j;
fcntl_flag = fcntl(cs[k]->fd, F_GETFL, 0);
fcntl(cs[k]->fd, F_SETFL, fcntl_flag | O_NONBLOCK);
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = cs[k]->fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, cs[k]->fd, &ev);
}
}
n = 0;
}
}
if(n) {
for(j=0;j<(MAX_SOCKET_CONN - N_DEVICE);j++) {
if ( events[i].data.fd == cs[j]->fd ) {
memset(socket_buf, 0, sizeof(char)*SOCKET_BUF);
rc = read(cs[j]->fd,(char *)socket_buf,SOCKET_BUF);
if(rc > 0) {
p = socket_buf;
cloud_id = *p++;
sub_id = *p++;
pl = (unsigned char)*p++;
ph = (unsigned char)*p++;
for(m=0;m<MD5_DIGEST_LENGTH;m++)
md5[m] = (unsigned char)*p++;
for(m=0; m<MD5_DIGEST_LENGTH;m++)
snprintf(hash + m + m, sizeof(char)*3, "%.2x", md5[m]);
topic_size = (u_int16_t)((ph & 0x00ff) * 256 + (pl & 0x00ff));
topic_p = topic;
for(m=0;m<topic_size;m++)
*topic_p++ = *p++;
*topic_p = 0x00;
msg_buf_p = msg_buf;
while(*p != 0x00)
*msg_buf_p++ = *p++;
*msg_buf_p = 0x0;
printf("subscrib: topic_size=%d topic=%s payload=%s md5=%s\n",
topic_size, topic, msg_buf, hash);
}
epoll_ctl(epfd, EPOLL_CTL_DEL, cs[j]->fd, &ev);
shutdown(cs[j]->fd,SHUT_RDWR);
close(cs[j]->fd);
cs[j]->fd = -1;
}
}
}
}
#else
timeout.tv_sec = SELECT_TIMEOUT;
timeout.tv_usec = SELECT_UTIMEOUT;
memcpy(&rd_set,&rd_set_orig,sizeof(rd_set_orig));
if(select(FD_SETSIZE,&rd_set,(fd_set *)NULL,(fd_set *)NULL,&timeout) == -1) {
perror("select()");
exit(1);
}
for(i=0;i<FD_SETSIZE;i++) {
if(FD_ISSET(i, &rd_set)) {
n=1;
for(j=0;j<N_DEVICE;j++) {
if ( i == ss[j]->fd ) {
for(k=0;k<(MAX_SOCKET_CONN - N_DEVICE);k++) {
if(cs[k]->fd == -1) {
break;
}
}
if(k == (MAX_SOCKET_CONN - N_DEVICE)) {
printf("exceeded maximum fd(%d) for accept()", MAX_SOCKET_CONN);
}
else {
len = sizeof(cs[k]->addr);
cs[k]->fd = accept(i, (struct sockaddr *)&cs[k]->addr, &len);
if (cs[k]->fd != -1 ) {
setsockopt(cs[k]->fd, SOL_SOCKET, SO_RCVTIMEO,(char*)&timeout, sizeof(struct timeval));
FD_SET(cs[k]->fd, &rd_set_orig);
cs[k]->id = j;
}
}
n = 0;
}
}
if(n) {
for(j=0;j<(MAX_SOCKET_CONN - N_DEVICE);j++) {
if (i == cs[j]->fd ) {
memset(socket_buf, 0, sizeof(char)*SOCKET_BUF);
rc = read(i,(char *)socket_buf,SOCKET_BUF);
if(rc > 0) {
p = socket_buf;
cloud_id = *p++;
sub_id = *p++;
pl = (unsigned char)*p++;
ph = (unsigned char)*p++;
for(m=0;m<MD5_DIGEST_LENGTH;m++)
md5[m] = (unsigned char)*p++;
for(m=0; m<MD5_DIGEST_LENGTH;m++)
snprintf(hash + m + m, sizeof(char)*3, "%.2x", md5[m]);
topic_size = (u_int16_t)((ph & 0x00ff) * 256 + (pl & 0x00ff));
topic_p = topic;
for(m=0;m<topic_size;m++)
*topic_p++ = *p++;
*topic_p = 0x00;
msg_buf_p = msg_buf;
while(*p != 0x00)
*msg_buf_p++ = *p++;
*msg_buf_p = 0x0;
printf("subscrib: topic_size=%d topic=%s payload=%s md5=%s\n",
topic_size, topic, msg_buf, hash);
}
shutdown(i,SHUT_RDWR);
close(i);
FD_CLR(i,&rd_set_orig);
cs[j]->fd = -1;
}
}
}
}
}
#endif
}
}
4.[補足] Cで記述したPD Repeaterへの上流方向へのデータ送信
他の記事で上流方向へデータを送信するカスタムハンドラについて記述していますが、FW3.xでの書き込み方法としては、以下のコードを参考にしてください。
|
|||||||||||||
/* write data to unix domain socket */ |
write_uds( uint8_t *jbuf
) {
int uds, ret;
socklen_t socklen;
struct sockaddr_un addr;
uds = socket(AF_UNIX, SOCK_STREAM, 0);
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strcpy(addr.sun_path+1,
"/pd_repeater/userdev_0000001.sock");
socklen =
sizeof(sa_family_t) + strlen(addr.sun_path+1)+1;
ret = connect(uds, (struct sockaddr
*)&addr, socklen);
if (ret < 0) {
int err = errno;
printf("connect NG (errno:
%d)\n", err);
close(uds);
return 1;
}
else {
printf("connect OK\n");
}
write(uds, jbuf, strlen(jbuf));
close( uds );
return 0;
}
|
5.さいごに
下流方向の制御に関して、PD Repeaterがクライアントであり、制御アプリケーションはサーバとしての実装が必要になります。
また、WebUIに連動して、起動、停止を行うためには、プロセスを外部から制御する実装を行ってください。