/*
 *	On-screen Display
 *
 *	(c) 2021 Jiri Kalvoda <jirikalvoda@kam.mff.cuni.cz>
 */

#include "osdd-mqtt.h"
#include "util.h"
#include "osdd-to-string.h"

#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <getopt.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>

#include <MQTTClient.h>

// ************************* STATE STRUCT **********

/*
 * Configuration of one MQTT output, filled in by mqtt_new().
 * The string members point directly into the argv passed to mqtt_new().
 */
struct mqtt_state
{
	char * mqtt_addres;
	char * mqtt_user;
	char * mqtt_passwd;
	char * mqtt_topic;
	struct osd_to_string_state to_string;
	int pipe_fd;		// Write end of the pipe leading to the forked sender
	bool mqtt_retained;
	int mqtt_qos;
};

/* State private to the forked sender process. */
struct mqtt_fork_state
{
	int fd;			// Read end of the pipe
	bool connected;		// Non-zero while `client` is a live, connected handle
	MQTTClient client;
	struct mqtt_state * state;
};

/* Retry limits used by the sender process. */
enum
{
	MQTT_CONNECT_ATTEMPTS = 2,
	MQTT_PUBLISH_ATTEMPTS = 2,
	MQTT_SEND_ROUNDS = 2,
};

// ************************* FORK *****************

/*
 * Try to create a client and connect it to the broker, at most
 * MQTT_CONNECT_ATTEMPTS times.  On success fork_state->connected is set
 * and fork_state->client holds the connected handle; on failure
 * connected is cleared and no client handle is kept.
 */
static void mqtt_fork_connect(struct mqtt_fork_state * fork_state)
{
	/* The client ID is formatted once and reused for every reconnect. */
	static char mqtt_con_name[300] = "";
	if(!mqtt_con_name[0])
		snprintf(mqtt_con_name, sizeof(mqtt_con_name), "led-include-%d-%d", (1), (2));

	for(int attempt=0; attempt<MQTT_CONNECT_ATTEMPTS; attempt++)
	{
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wmissing-field-initializers"
		MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
		conn_opts.httpProxy = 0;
#pragma GCC diagnostic pop

		fork_state->client = NULL;
		int rc = MQTTClient_create(&fork_state->client, fork_state->state->mqtt_addres,
			mqtt_con_name, MQTTCLIENT_PERSISTENCE_NONE, NULL);
		if(rc != MQTTCLIENT_SUCCESS)
		{
			/* Never hand a handle that failed to be created to connect/destroy. */
			fork_state->client = NULL;
			fprintf(stderr,"MQTT CONNECT FAILD\n");
			continue;
		}

		conn_opts.keepAliveInterval = 600;
		conn_opts.cleansession = 0;
		conn_opts.username = fork_state->state->mqtt_user;
		conn_opts.password = fork_state->state->mqtt_passwd;
		MQTTClient_setCallbacks(fork_state->client, NULL, NULL, NULL, NULL);

		rc = MQTTClient_connect(fork_state->client, &conn_opts);
		if(rc == MQTTCLIENT_SUCCESS)
		{
			fork_state->connected = 1;
			return;
		}

		MQTTClient_destroy(&fork_state->client);
		fprintf(stderr,"MQTT CONNECT FAILD\n");
	}
	fork_state->connected = 0;
}

/* Timeout passed to MQTTClient_waitForCompletion() for one publish. */
int TIMEOUT = 10000;

/*
 * Publish `data` (a NUL-terminated string) once on the configured topic
 * and wait for the delivery to complete.  Returns non-zero on success.
 */
static bool mqtt_fork_publish(struct mqtt_fork_state * fork_state, char * data)
{
	MQTTClient_message pubmsg = MQTTClient_message_initializer;
	MQTTClient_deliveryToken token;

	pubmsg.payload = data;
	pubmsg.payloadlen = (int)strlen(data);
	pubmsg.qos = fork_state->state->mqtt_qos;
	pubmsg.retained = fork_state->state->mqtt_retained;

	/* The token is only defined when the publish call itself succeeded. */
	if(MQTTClient_publishMessage(fork_state->client, fork_state->state->mqtt_topic,
			&pubmsg, &token) != MQTTCLIENT_SUCCESS)
		return 0;

	return MQTTClient_waitForCompletion(fork_state->client, token, TIMEOUT) == MQTTCLIENT_SUCCESS;
}

/*
 * Deliver one message.  Each round (re)connects if needed and tries to
 * publish MQTT_PUBLISH_ATTEMPTS times; when a round fails, the client is
 * destroyed so that the next round (or the next message) starts with a
 * fresh connection.  The message is dropped after MQTT_SEND_ROUNDS rounds.
 */
static void mqtt_fork_sent(struct mqtt_fork_state * fork_state, char * data)
{
	for(int round=0; round<MQTT_SEND_ROUNDS; round++)
	{
		if(!fork_state->connected)
			mqtt_fork_connect(fork_state);
		if(!fork_state->connected)
			continue;

		for(int attempt=0; attempt<MQTT_PUBLISH_ATTEMPTS; attempt++)
			if(mqtt_fork_publish(fork_state, data))
				return;

		MQTTClient_destroy(&fork_state->client);
		fork_state->connected = 0;
	}
}

/*
 * Read exactly `len` bytes from `fd`, retrying on short reads and EINTR.
 * Returns non-zero on success, zero on end of file or on a read error.
 */
static bool mqtt_fork_read_full(int fd, void * buf, size_t len)
{
	char * pos = buf;
	while(len > 0)
	{
		ssize_t got = read(fd, pos, len);
		if(got < 0)
		{
			if(errno == EINTR)
				continue;
			return 0;
		}
		if(got == 0)
			return 0;
		pos += got;
		len -= (size_t)got;
	}
	return 1;
}

/*
 * Main loop of the forked sender process.  Reads frames written by
 * mqtt_write() -- an int length followed by that many payload bytes --
 * and publishes each payload.  Terminates the process when the pipe
 * reaches end of file or a read fails.
 */
static void NONRET mqtt_fork_main(struct mqtt_state * state, int fd)
{
	struct mqtt_fork_state fork_state;
	memset(&fork_state, 0, sizeof(fork_state));
	fork_state.fd = fd;
	fork_state.state = state;

	while(1)
	{
		int size;
		if(!mqtt_fork_read_full(fd, &size, sizeof(size)) || size < 0)
			break;

		/* One extra byte so the payload is always NUL-terminated for strlen(). */
		char * in = xmalloc((size_t)size + 1);
		if(!mqtt_fork_read_full(fd, in, (size_t)size))
		{
			free(in);
			break;
		}
		in[size] = 0;

		mqtt_fork_sent(&fork_state, in);
		free(in);
	}

	/* _exit() so that stdio buffers and atexit handlers inherited over fork() are not run here. */
	_exit(0);
}

// ************************* MAIN FUNCTIONS *******

/*
 * Finish a frame whose first write was cut short.  The descriptor is
 * switched to blocking mode for the remainder, because abandoning the
 * frame half-way would desynchronise the length-prefixed stream.
 */
static void mqtt_write_rest(int fd, const char * buf, size_t len)
{
	int flags = fcntl(fd, F_GETFL);
	if(flags < 0)
		return;
	fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);

	while(len > 0)
	{
		ssize_t done = write(fd, buf, len);
		if(done < 0)
		{
			if(errno == EINTR)
				continue;
			break;
		}
		buf += done;
		len -= (size_t)done;
	}

	fcntl(fd, F_SETFL, flags);
}

/*
 * Send `out` to the sender process as one frame (int length + payload)
 * and free the vector.  A NULL vector is ignored.  The pipe is
 * non-blocking: when it is full the whole frame is dropped.
 */
static void mqtt_write(struct mqtt_state *state, VECTOR(char) out)
{
	if(!out) return;

	int size = VSIZE(out);
	size_t total = sizeof(int) + (size_t)size;
	char * buf = xmalloc(total);
	memcpy(buf, &size, sizeof(int));
	memcpy(buf + sizeof(int), out, size);

	ssize_t done = write(state->pipe_fd, buf, total);
	if(done < 0)
	{
		/* A full pipe means the frame is dropped; anything else is worth reporting. */
		if(errno != EAGAIN && errno != EWOULDBLOCK)
			perror("MQTT: write to sender pipe");
	}
	else if((size_t)done < total)
		mqtt_write_rest(state->pipe_fd, buf + done, total - (size_t)done);

	free(buf);
	VFREE(out);
}

CREATE_FUNC_USING_TO_STRING(mqtt)

// ************************* NEW ******************

/* Print the option summary of this module to `f`. */
static void mqtt_new_help(FILE * f)
{
	fprintf(f,
		"Module MQTT help:\n"
		"-R, --retained \t\tSending messages will be retained\n"
		"-q, --qos=<{}> \t\tSet QOS of sending messages. {0,1,2}\n"
		OSDD_TO_STRING_HELP
		"\n");
}

/*
 * Create an MQTT output from module arguments:
 *   [options] <address> <user> <password> <topic>
 * Forks the sender process and returns the filled-in osd_abstract.
 * Exits the process on invalid arguments or when the pipe/fork fails.
 */
static struct osd_abstract mqtt_new(int argc, char ** argv, Display * nope)
{
	(void)nope;
	/* Step back one slot so that getopt sees an argv[0] before the first option. */
	argc++;argv--;

	struct osd_abstract r;
	static const char short_opts[] = "+Rq:" OSDD_TO_STRING_SHORTOP;
	static const struct option long_opts[] = {
		{ "retained",	no_argument,		NULL,	'R' },
		{ "qos",	required_argument,	NULL,	'q' },
		OSDD_TO_STRING_LONGOP
		{ NULL,		0,			NULL,	0   },
	};
	memset(&r,0,sizeof(r));

	struct mqtt_state *state = xmalloc(sizeof(*state));
	memset(state,0,sizeof(*state));
	state->mqtt_qos = 1;
	state->mqtt_retained = 0;
	osd_to_string_state_init(&state->to_string);

	int opt;
	optind = 0;
	while ((opt = getopt_long(argc, argv, short_opts, long_opts, NULL)) >= 0)
	{
		if(osd_to_string_parse_arg(&state->to_string,opt))
			continue;

		switch (opt)
		{
			case 'R':
				state->mqtt_retained = 1;
				break;
			case 'q':
			{
				char * end;
				long qos = strtol(optarg, &end, 10);
				if(end == optarg || *end || qos < 0 || qos > 2)
				{
					fprintf(stderr,"Invalid QOS '%s', expected 0, 1 or 2\n\n",optarg);
					mqtt_new_help(stderr);
					exit(0);
				}
				state->mqtt_qos = (int)qos;
				break;
			}
			default:
				fprintf(stderr,"Option %c not exist\n\n",opt);
				mqtt_new_help(stderr);
				exit(0);
		}
	}

	int ind = optind;
	if(ind + 4 > argc)
	{
		fprintf(stderr,"Missing positional argument\n\n");
		mqtt_new_help(stderr);
		exit(0);
	}
	state->mqtt_addres = argv[ind++];
	state->mqtt_user = argv[ind++];
	state->mqtt_passwd = argv[ind++];
	state->mqtt_topic = argv[ind++];
	if(ind != argc)
	{
		fprintf(stderr,"Too many arguments\n\n");
		mqtt_new_help(stderr);
		exit(0);
	}

	ADD_FUNC_TO_ABSTRACT(r,mqtt);

	int fd[2];
	if(pipe(fd) < 0)
	{
		perror("MQTT: pipe");
		exit(1);
	}

	/* Without a reader, the first write to the pipe would raise SIGPIPE in this process. */
	pid_t pid = fork();
	if(pid < 0)
	{
		perror("MQTT: fork");
		exit(1);
	}
	if(pid == 0)
	{
		close(fd[1]);
		mqtt_fork_main(state, fd[0]);
	}

	close(fd[0]);
	fcntl(fd[1], F_SETFL, O_NONBLOCK);
	state->pipe_fd = fd[1];
	return r;
}

// ************************* CREATOR ****************************

CREATE_STANDARD_CREATOR(mqtt,"MQTT")