Couplage nxlog et stack ElasticSearch
Je suis actuellement en train de tester le couple Logstash/ES/Kibana. Dans ce cadre, j'ai voulu regarder différents mécanismes pour remonter des informations venant de mes différentes machines sans avoir pour autant à installer un agent Logstash partout.
J'ai donc commencé à regarder le fonctionnement de nxlog qui permet de faire ce genre de chose.
J'ai donc commencé à regarder le fonctionnement de nxlog qui permet de faire ce genre de chose.
Mise en place de nxlog
Dans ce qui va suivre, on va remonter les events venant de notre nœud Windows (ici un 2003) dans le serveur Logstash. Le côté intéressant de ce produit est qu'il fonctionne aussi bien sous Windows que Linux et que ce dernier fait de la rétention de message le temps que le serveur Logstash soit de nouveau disponible.
Pour en revenir à nos moutons, ci-dessous un exemple de configuration :
define ROOT C:\\Program Files\\nxlog Moduledir %ROOT%\\modules CacheDir %ROOT%\\data Pidfile %ROOT%\\data\\nxlog.pid SpoolDir %ROOT%\\data LogFile %ROOT%\\data\\nxlog.log <Input in> # Module im_msvistalog # For windows 2003 and earlier use the following: Module im_mseventlog </Input> <Extension json> Module xm_json </Extension> <Output logstash> Module om_tcp Port 5140 Host IP_SERVEUR_LOGSTASH Exec to_json(); </Output> # Let's tie all pieces together with a NXlog route <Route local_to_logstash> Path in => logstash </Route>
En substance, on charge le module im_mseventlog, on indique un point d'envoi (Output logstash) et enfin, on précise un chemin (in => logstash).
Configuration de Logstash
Nous allons maintenant configurer Logstash afin de récupérer les messages de notre ami nxlog.
Pour se faire, nous allons créer un fichier logstash.conf avec une entrée (précisant l'encodage) :
input {
tcp {
port => 5140
type => "nxlog"
codec => line {
charset => "CP1252"
}
}
}
NB : il est possible d'utiliser le codec json directement dans l'input tcp. Le problème est que l'agent nxlog peut renvoyer plusieurs messages JSON d'un seul coup (notamment dans le cas d'un arrêt/relance du serveur Logstash). Dans ce cas, Logstash n'arrive pas à bien l'interpréter et ne récupère les différents champs.
On va ensuite récupérer le contenu et le traiter comme étant du json :
filter {
if [type] == "nxlog" {
json {
source => "message"
}
mutate {
rename => [ "Message", "message" ]
# remove_field => [ "champ1", "champ2" ]
}
}
}
A noter que nxlog renvoie pas mal de champ. Si vous voulez en supprimer quelques uns, décommenter le champ remove_field ci-dessus.
Il ne nous reste plus qu'à renvoyer le contenu sur la sortie standard pour débogage :
output {
stdout { debug => true }
}
Reste maintenant à lancer notre agent logstash :
java -jar logstash.jar --conf logstash.conf agent
Exemple de message
Une fois lancé, nous pouvons recevoir nos premiers messages :
{
"message" => "xxx",
"@version" => "1",
"@timestamp" => "2014-02-13T14:53:57.156Z",
"type" => "nxlog",
"host" => "xxxx:xxx",
"EventTime" => "2014-02-13 15:53:57",
"EventTimeWritten" => "2014-02-13 15:53:57",
"Hostname" => "XXX",
"EventType" => "AUDIT_SUCCESS",
"SeverityValue" => 2,
"Severity" => "INFO",
"SourceName" => "Security",
"FileName" => "Security",
"EventID" => 538,
"CategoryNumber" => 2,
"Category" => "Ouverture/Fermeture de session ",
"RecordNumber" => 410222,
"Domain" => "XXX",
"AccountName" => "Administrateur",
"AccountType" => "User",
"EventReceivedTime" => "2014-02-13 15:53:57",
"SourceModuleName" => "in",
"SourceModuleType" => "im_mseventlog"
}
Il ne reste plus qu'à envoyer tout ça dans le moteur ElasticSearch et le tour est joué !