Couplage nxlog et stack ElasticSearch

Je suis actuellement en train de tester le couple Logstash/ES/Kibana. Dans ce cadre, j'ai voulu regarder différents mécanismes pour remonter des informations venant de mes différentes machines sans avoir pour autant à installer un agent Logstash partout.

J'ai donc commencé à regarder le fonctionnement de nxlog qui permet de faire ce genre de chose.

Mise en place de nxlog

Dans ce qui va suivre, on va remonter les events venant de notre nœud Windows (ici un 2003) dans le serveur Logstash. Le côté intéressant de ce produit est qu'il fonctionne aussi bien sous Windows que Linux et que ce dernier fait de la rétention de message le temps que le serveur Logstash soit de nouveau disponible.

Pour en revenir à nos moutons, ci-dessous un exemple de configuration :

define ROOT C:\\Program Files\\nxlog

Moduledir %ROOT%\\modules
CacheDir %ROOT%\\data
Pidfile %ROOT%\\data\\nxlog.pid
SpoolDir %ROOT%\\data
LogFile %ROOT%\\data\\nxlog.log

<Input in>
    # Module      im_msvistalog
# For windows 2003 and earlier use the following:
    Module      im_mseventlog
</Input>

<Extension json>
    Module      xm_json
</Extension>

<Output logstash>
    Module      om_tcp
    Port        5140
    Host        IP_SERVEUR_LOGSTASH
    Exec        to_json();
</Output>

# Let's tie all pieces together with a NXlog route
<Route local_to_logstash>
    Path        in => logstash
</Route>
En substance, on charge le module im_mseventlog, on indique un point d'envoi (Output logstash) et enfin, on précise un chemin (in => logstash).

Configuration de Logstash

Nous allons maintenant configurer Logstash afin de récupérer les messages de notre ami nxlog.

Pour se faire, nous allons créer un fichier logstash.conf avec une entrée (précisant l'encodage) :

input {
  tcp {
    port => 5140
    type => "nxlog"
    codec => line {
      charset => "CP1252"
    }
  }
}

NB : il est possible d'utiliser le codec json directement dans l'input tcp. Le problème est que l'agent nxlog peut renvoyer plusieurs messages JSON d'un seul coup (notamment dans le cas d'un arrêt/relance du serveur Logstash). Dans ce cas, Logstash n'arrive pas à bien l'interpréter et ne récupère les différents champs.

On va ensuite récupérer le contenu et le traiter comme étant du json :

filter {
  if [type] == "nxlog" {
    json {
      source => "message"
    }
    mutate {
      rename => [ "Message", "message" ]
      # remove_field => [ "champ1", "champ2" ]
    }
  }
}

A noter que nxlog renvoie pas mal de champ. Si vous voulez en supprimer quelques uns, décommenter le champ remove_field ci-dessus.

Il ne nous reste plus qu'à renvoyer le contenu sur la sortie standard pour débogage :

output {
  stdout { debug => true }
}

Reste maintenant à lancer notre agent logstash :

java -jar logstash.jar --conf logstash.conf agent

Exemple de message

Une fois lancé, nous pouvons recevoir nos premiers messages :

{
              "message" => "xxx",
             "@version" => "1",
           "@timestamp" => "2014-02-13T14:53:57.156Z",
                 "type" => "nxlog",
                 "host" => "xxxx:xxx",
            "EventTime" => "2014-02-13 15:53:57",
     "EventTimeWritten" => "2014-02-13 15:53:57",
             "Hostname" => "XXX",
            "EventType" => "AUDIT_SUCCESS",
        "SeverityValue" => 2,
             "Severity" => "INFO",
           "SourceName" => "Security",
             "FileName" => "Security",
              "EventID" => 538,
       "CategoryNumber" => 2,
             "Category" => "Ouverture/Fermeture de session ",
         "RecordNumber" => 410222,
               "Domain" => "XXX",
          "AccountName" => "Administrateur",
          "AccountType" => "User",
    "EventReceivedTime" => "2014-02-13 15:53:57",
     "SourceModuleName" => "in",
     "SourceModuleType" => "im_mseventlog"
}

Il ne reste plus qu'à envoyer tout ça dans le moteur ElasticSearch et le tour est joué !
Vus : 2271
Publié par Yannig : 63