PRODUCTION_MODE) { define('AMQP_DEBUG', true); } /** Utilise pour le test de retour de requete curl */ define('HTTP_RESULT_OK', '200'); /** * Fait le parse d'une chaine de characters recu comme * reponse a une requette curl * @param string $return La chaine de caracteres contenant un tableau * en format JSON * @return mixed Tableau associative contenant ce qui se trouve en * format JSON dans l'entree */ function parse($return) { // remplace les '{' et '}' par chaine vide $return = str_replace('{', '', $return); $return = str_replace('}', '', $return); // trimme les espaces et tabs $return = trim($return, " \t"); // remplace les retours de ligne $return = preg_replace('/\s\s+/', '', $return); // remplace les '"' par chaine vide $return = str_replace("\"", "", $return); // remplace ": " par ":" $return = str_replace(": ", ":", $return); $result = array(); // obtiens les (clef => valeur) paires $resparray = explode(',', $return); // si on a trouve des (clef => valeur) paires if ($resparray) { foreach ($resparray as $resp) { // obtiens le clef et la valeur $keyvalue = explode(':', $resp); // stocke le clef et la valeur dans le $result $result[trim(str_replace('"', '',$keyvalue[0]))] = trim(str_replace('"', '', $keyvalue[1])); } } // retourne le tableau associative return $result; } /** * Utilise curl pour faire POST a la service/messages d'ERP, et * log le resultat dans le fichier de log * @param string $msg Objet de la classe AMQPMessage */ function postViaCurl($msg) { $data = $msg->body; global $ERP_URL_MESSAGES; // si l'url de resource services/messages d'ERP est vide, ne traite pas // le message recu if (empty($ERP_URL_MESSAGES)) { logger::instance()->cleanLog(); logger::instance()->log("Le URL de destination manque", EXTRA_VERBOSE_MODE); logger::instance()->writeLogToFile(); return; } // cree une resource curl et set les options pour l'envoi d'un POST $curl_resource = curl_init($ERP_URL_MESSAGES); // set la methode d'envoi, POST curl_setopt($curl_resource, CURLOPT_CUSTOMREQUEST, "POST"); // set le message a envoyer curl_setopt($curl_resource, CURLOPT_POSTFIELDS, $msg->body); // une reponse est attendue curl_setopt($curl_resource, CURLOPT_RETURNTRANSFER, true); // set le header curl_setopt($curl_resource, CURLOPT_HTTPHEADER, array( 'Content-Type: application/json', 'Content-Length: ' . strlen($msg->body)) ); // execute la requete curl et obtiens le resultat de la requete $result = curl_exec($curl_resource); // ferme la resource curl curl_close($curl_resource); // le resultat recu par curl est une chaine des caracteres qui contient // un tableau JSON, donc appelle parse pour obtenir un tableau associative $result = parse($result); // log le retour de curl dans le fichier de log logger::instance()->cleanLog(); logger::instance()->log(date('d/m/Y H:i'). ': '. $result['http_code_message'] . ' ' . $result['message'].' Message: '. $msg->body, EXTRA_VERBOSE_MODE); logger::instance()->writeLogToFile(); // si le resultat du curl est OK, fais une accuse d'acception pour le message if ($result['http_code'] == HTTP_RESULT_OK) { // accuse de reception du message $msg->delivery_info['channel']-> basic_ack($msg->delivery_info['delivery_tag']); } else { // le resultat de l'execution de curl indique un erreur // dors un peu avant de refuser le message sleep(2); // refuse le message $msg->delivery_info['channel']->basic_reject( $msg->delivery_info['delivery_tag'], true); } // envoie un message avec la chaine "quit" pour ce consumeur if ($msg->body === 'quit') { $msg->delivery_info['channel']-> basic_cancel($msg->delivery_info['consumer_tag']); } } // pour des besoins de log il faut set $_SERVER["REQUEST_URI"] if (!isset($_SERVER["REQUEST_URI"])) { $_SERVER["REQUEST_URI"] = __FILE__; } // si le $argv[1] manque on ne peut pas cree un tag unique pour le // consumeur et dans ce cas-la on termine l'execution $suffix = ''; if (!empty($argv) && count($argv) > 1) { $suffix = $argv[1]; } else { logger::instance()->log('Missing the command line argument used for tag suffix', EXTRA_VERBOSE_MODE); logger::instance()->writeLogToFile(); exit; } // un exchange fait le routage des messages vers les piles $exchange = 'router'; // on va utiliser la pile $ERP_QUEUE $queue = $ERP_QUEUE; // l'identifiant unique de consumeur ; il est utilise par RabbitMQ $consumer_tag = 'consumer'.$suffix; // etablis une connexion avec RabbitMQ instance $conn = new AMQPConnection($ERP_CONNECTION_HOST, $ERP_CONNECTIOn_PORT, $ERP_CONNECTION_USER, $ERP_CONNECTION_PASSWORD, $ERP_CONNECTION_VHOST); // obtiens un canal de la connexion $ch = $conn->channel(); // si le canal n'est pas disponible arretes l'execution if (is_null($ch)) { // probleme avec la connexion, log le fait et arrete l'execution logger::instance()->log('Probleme dans l\'etablisement de la connexion', EXTRA_VERBOSE_MODE); logger::instance()->writeLogToFile(); exit; } // declare une pile (la creer s'il faut) $ch->queue_declare($queue, false, true, false, false); // declare un exchange qui va diriger les messages $ch->exchange_declare($exchange, 'direct', false, true, false); // relier la pile d'ERP au exchange $ch->queue_bind($queue, $exchange); // registre le callback pour ce consumer aupres de RabbitMQ $ch->basic_consume($queue, $consumer_tag, false, false, false, false, 'postViaCurl'); // loop as long as the channel has callbacks registered while (count($ch->callbacks)) { $ch->wait(); } ?>