DETECCIÓN DE NEUMONÍA EN RADIOGRAFÍAS: 3 – DIAGNÓSTICO CON ALGORITMOS TRADICIONALES

Publicado por @JoseMa_deCuenca el March 8, 2020, 9:23 p.m.
A pesar de que la versatilidad y potencia de los modelos basados en redes neuronales profundas les han puesto de moda, no debemos desechar a priori la validez de los algoritmos tradicionales en muchos problemas de diagnóstico de imágenes. Vamos a repasar el uso de algunos de ellos. Comenzaremos con un modelo de máquina de vectores de soporte o SVM. En primer lugar, su aplicación a las radiografías requiere adaptar el formato de las imágenes cargadas en los arrays a la entrada del modelo SVM, aplanando su formato de resolución y canales. Una vez realizada esa transformación específica para este modelo, realizamos un análisis PCA de 150 componentes, que se considera suficiente para las dimensiones de entrada de los datos. Se usa para proyectar los datos de entrada en la base ortonormal de los autovalores de la matriz de componentes principales, con la que finalmente se alimentará el algoritmo. Después se define el modelo SVM, con autotunning para múltiples parámetros de penalización C y coeficiente gamma del kernel. Este modelo tuneado permite elegir su combinación óptima, lo que se traduce en un buen rendimiento. La función de tuneado es GridSearchCV, que también se volverá a usar más adelante para fijar las épocas de entrenamiento en las redes neuronales finalmente seleccionadas. ### CLASIFICADOR SVM ## PREPARACION FORMATO DEL CONJUNTO DE DATOS PARA MODELOS #Formato de imagenes en los conjuntos de carga print("Formato imagenes en conjuntos de carga") print(X_train.shape) print(X_test.shape) print(X_val.shape) # Aseguro las dimensiones del array como imagenes de entrada para el modelo SVM X_train_svm=X_train.reshape(5216,67500) X_test_svm=X_test.reshape(624,67500) X_val_svm=X_val.reshape(16,67500) # Compruebo print("Formato imagenes para entrada del modelo") print(X_train_svm.shape) print(X_test_svm.shape) print(X_val_svm.shape) ## ANALISIS DE COMPONENTES PRINCIPALES PCA from sklearn.decomposition import PCA n_components = 150 print("\nExtrae los %d eigenfaces principales de %d datos" % (n_components, X_train_svm.shape[0])) # Inicia cronometro start_time = time.time() # Define modelo PCA de n_componentes y lo entrena pca = PCA(n_components=n_components, svd_solver='randomized', whiten=True).fit(X_train_svm) print("Dimensiones matriz pca generada:", pca.components_.shape) # Detiene el cronometro end_time = time.time() # Informa cronometro dt = round(end_time - start_time, 2) print("La reduccion PCA tardo: ", dt, "segundos (", round(dt/60,1) , "minutos)") print("\nProyectando los datos de entrada sobre la base ortonormal de los eigenfaces") # Inicia cronometro start_time = time.time() X_train_pca = pca.transform(X_train_svm) X_test_pca = pca.transform(X_test_svm) # Detiene el cronometro end_time = time.time() # Informa cronometro dt = round(end_time - start_time, 2) print("La proyeccion ortonormal de los datos tardo: ", dt, "segundos (", round(dt/60,1) , "minutos)") print(X_train_pca.shape) print(X_test_pca.shape) # ############################################################################# # Entreno un clasificador SVM tuneandolo para los mejores parametros from sklearn.model_selection import GridSearchCV from sklearn.svm import SVC print("\nProbando el clasificador SVM con el conjunto de entrenamiento") # Inicia cronometro start_time = time.time() # Defino un diccionario de parametros para probar (SVM tuneado) param_grid = {'C': [1e3, 5e3, 1e4, 5e4, 1e5], 'gamma': [0.0001, 0.0005, 0.001, 0.005, 0.01, 0.1], } # Defino un modelo SVM con kernel rbf clf = GridSearchCV(SVC(kernel='rbf', class_weight='balanced'), param_grid, cv=5) # Modelo de entrenamiento clf = clf.fit(X_train_pca, y_train) # Detiene el cronometro end_time = time.time() # Informa cronometro dt = round(end_time - start_time, 2) print("El entrenamiento del SVM tuneado ha tardado: ", dt, "segundos (", round(dt/60,1) , "minutos)") print("\nParametros del mejor estimador encontrado en la busqueda") print(clf.best_estimator_) # Pruebo el modelo con el conjunto de datos de prueba from sklearn.metrics import classification_report from sklearn.metrics import confusion_matrix print("\nPrediciendo los nombres de las personas en el conjunto de prueba") # Inicia cronometro start_time = time.time() # Uso el modelo para predecir el diagnostico en el conjunto de prueba tras PCA y_pred = clf.predict(X_test_pca) # Detiene el cronometro end_time = time.time() # Informa cronometro dt = round(end_time - start_time, 2) print("La prediccion con el modelo SVM tarda: ", dt, "segundos (", round(dt/60,1) , "minutos)") # Informe de rendimiento sobre el conjunto de prueba print("\nInforme de rendimiento para el conjunto de prueba:\n") print(classification_report(y_test, y_pred, target_names=['NORMAL', 'PNEUMONIA'])) print("\nMatriz de confusion para el conjunto de prueba:\n") print(confusion_matrix(y_test, y_pred)) # Libreria mlxtend para dibujo con diseño de la matriz de confusion CM = confusion_matrix(y_test, y_pred) from mlxtend.plotting import plot_confusion_matrix fig, ax = plot_confusion_matrix(conf_mat=CM , figsize=(5, 5)) plt.show() En segundo lugar vamos a probar un algoritmo de árbol de decisión tradicional. Utilizaremos el mismo dataframe de ingeniería de características y los datos preprocesados con análisis PCA. Una vez generado el árbol, representaremos gráficamente el resultado, y lo guardaremos en un archivo pdf en la ruta del usuario. Además, haremos un recuento de los nodos utilizados y cuantos de ellos son finales, almacenándose estos datos en variables para su uso posterior en el mapeo del perceptrón multicapa. También hallaremos las métricas de rendimiento del algoritmo. # Entreno un clasificador ARBOL DE DECISION from sklearn.tree import DecisionTreeClassifier print("\nProbando el clasificador ARBOL DE DECISION con el conjunto de entrenamiento") # Inicia cronometro start_time = time.time() # Defino un modelo ARBOL DE DECISION con criterio gini (es un clasificador) clf_AD = DecisionTreeClassifier(criterion = 'gini', random_state = 0, max_depth=10, min_samples_split=3) # Modelo de entrenamiento clf_AD = clf_AD.fit(X_train_pca, y_train) # Detiene el cronometro end_time = time.time() # Informa cronometro dt = round(end_time - start_time, 2) print("El entrenamiento del ARBOL ha tardado: ", dt, "segundos (", round(dt/60,1) , "minutos)") # Creo grafico con el diagrama del arbol de decision import graphviz from sklearn import tree # Configuro el grafico dot_data = tree.export_graphviz(clf_AD, out_file=None, class_names=['NORMAL','PNEUMONIA'], filled=True, rounded=True, special_characters=True) # Genero el grafico graph = graphviz.Source(dot_data) # Salida del grafico graph.render("Arbol Decision") # Grabo el arbol en un archivo pdf con el nombre especificado graph # Lo represento en pantalla # RECUENTO DE NODOS, Y NODOS FINALES print("Numero de nodos del arbol: ", clf_AD.tree_.node_count) # Utilizo matrices para almacenar la estructura de conexiones del arbol de decision n_nodes = clf_AD.tree_.node_count children_left = clf_AD.tree_.children_left children_right = clf_AD.tree_.children_right # Matrices para almacenar la profundidad de cada nodo y si es final node_depth = np.zeros(shape=n_nodes, dtype=np.int64) is_leaves = np.zeros(shape=n_nodes, dtype=bool) # Semilla para el nodo inicial stack = [(0, -1)] # Variable para contar el numero de nodos finales final_node = 0 # Bucle de comparacion que recorre cada nivel del arbol while len(stack) > 0: node_id, parent_depth = stack.pop() node_depth[node_id] = parent_depth + 1 # Si el nodo es de test se incrementa el nivel if (children_left[node_id] != children_right[node_id]): stack.append((children_left[node_id], parent_depth + 1)) stack.append((children_right[node_id], parent_depth + 1)) # Si no el nodo es final y se incrementa el contador else: is_leaves[node_id] = True final_node = final_node + 1 print("Nodos finales: ", final_node) # Pruebo el modelo con el conjunto de datos de prueba from sklearn.metrics import classification_report from sklearn.metrics import confusion_matrix print("\nPrediciendo con Arbol de Decisión en el conjunto de prueba") # Inicia cronometro start_time = time.time() # Uso el modelo para predecir el diagnostico en el conjunto de prueba tras PCA y_pred = clf_AD.predict(X_test_pca) # Detiene el cronometro end_time = time.time() # Informa cronometro dt = round(end_time - start_time, 2) print("La prediccion con el modelo AD tarda: ", dt, "segundos (", round(dt/60,1) , "minutos)") # Informe de rendimiento sobre el conjunto de prueba print("\nInforme de rendimiento para el conjunto de prueba:\n") print(classification_report(y_test, y_pred, target_names=['NORMAL', 'PNEUMONIA'])) print("\nMatriz de confusion para el conjunto de prueba:\n") print(confusion_matrix(y_test, y_pred)) print('\nPrecisión modelo conjunto train: ', round(clf_AD.score(X_train_pca, y_train), 3)) print('Precisión modelo conjunto test: ', round(clf_AD.score(X_test_pca, y_test),3)) # Libreria mlxtend para dibujo con diseño de la matriz de confusion CM = confusion_matrix(y_test, y_pred) from mlxtend.plotting import plot_confusion_matrix fig, ax = plot_confusion_matrix(conf_mat=CM , figsize=(5, 5)) plt.show() # Indice de JACARD from sklearn.metrics import jaccard_similarity_score print("jaccard_similarity_score: ", round(jaccard_similarity_score(y_test, y_pred),3)) Para tratar de mejorar los resultados del árbol de clasificación, y dado su escaso tiempo de proceso, vamos a realizar un ensemble secuencial (boosting) de árboles de decisión de tipo adaptativo (AdaBoost). Veremos que al partir de un Árbol de Decisión ya muy ajustado, los resultados prácticamente no registran mejoría. # Clasificador ADA BOOST from sklearn.ensemble import AdaBoostClassifier print("\nProbando el clasificador ARBOL DE DECISION con el conjunto de entrenamiento") # Inicia cronometro start_time = time.time() # Defino un modelo ADA BOOST con base en el arbol de decision clf_ADA = AdaBoostClassifier(base_estimator=clf_AD, n_estimators=95, learning_rate=1.71, random_state=1) # Modelo de entrenamiento clf_ADA = clf_ADA.fit(X_train_pca, y_train) # Detiene el cronometro end_time = time.time() # Informa cronometro dt = round(end_time - start_time, 2) print("El entrenamiento del ADA BOOST ha tardado: ", dt, "segundos (", round(dt/60,1) , "minutos)") # Utilizando AdaBoost para tratar de aumentar la precisión # Pruebo el modelo con el conjunto de datos de prueba print("\nPrediciendo con AdaBoost") # Inicia cronometro start_time = time.time() # Uso el modelo para predecir el diagnostico en el conjunto de prueba tras PCA y_pred = clf_ADA.predict(X_test_pca) # Detiene el cronometro end_time = time.time() # Informa cronometro dt = round(end_time - start_time, 2) print("La prediccion con el modelo AD tarda: ", dt, "segundos (", round(dt/60,1) , "minutos)") # Informe de rendimiento sobre el conjunto de prueba print("\nInforme de rendimiento para el conjunto de prueba:\n") print(classification_report(y_test, y_pred, target_names=['NORMAL', 'PNEUMONIA'])) print("\nMatriz de confusion para el conjunto de prueba:\n") print(confusion_matrix(y_test, y_pred)) print('\nPrecisión modelo conjunto train: ', round(clf_ADA.score(X_train_pca, y_train), 3)) print('Precisión modelo conjunto test: ', round(clf_ADA.score(X_test_pca, y_test),3)) # Libreria mlxtend para dibujo con diseño de la matriz de confusion CM = confusion_matrix(y_test, y_pred) from mlxtend.plotting import plot_confusion_matrix fig, ax = plot_confusion_matrix(conf_mat=CM , figsize=(5, 5)) plt.show() # Indice de JACARD from sklearn.metrics import jaccard_similarity_score print("jaccard_similarity_score: ", round(jaccard_similarity_score(y_test, y_pred),3)) Para finalizar la prueba de algoritmos tradicionales con este problema de clasificación de imágenes, vamos a simular un perceptrón multicapa clásico, el predecesor de los modelos de aprendizaje profundo que veremos en una próxima entrega. La librería Keras no permite la construcción de un perceptrón multicapa propiamente dicho, pero podemos emularlo mediante una red que mapea el diseño del árbol de decisión, con un optimizador sgd y solamente una época de entrenamiento. El fin de este código es por una parte académico para comprobar la bondad del mapeo, y por otra para aproximar el diseño de una primera red neuronal, sin esperar a priori grandes rendimientos. # Carga de librerias y funciones de Keras para modelos RN import keras from keras.models import Sequential from keras.layers import Dense from keras.layers import Flatten from keras import optimizers # Habilitamos funciones de llamada interna de keras para monitorizar el entrenamiento # Y para reducir la tasa de aprendizaje ReduceLROnPlateau cuando cerca de minimo from keras.callbacks import ReduceLROnPlateau , ModelCheckpoint lr_reduce = ReduceLROnPlateau(monitor='val_acc', factor=0.1, epsilon=0.0001, patience=1, verbose=1) # Definicion de perceptron multicapa # Aplico las reglas de conversion del arbol de decision en un perceptron multicapa perceptron_multi = Sequential() # Introduzco datos con Flatten perceptron_multi.add(keras.layers.Flatten(input_shape=(150,150,3))) # Nº neuronas primera capa oculta = nº nodos arbol decision perceptron_multi.add(keras.layers.Dense (n_nodes,activation='sigmoid')) # Nº neuronas segunda capa oculta = nº nodos finales arbol de decision perceptron_multi.add(keras.layers.Dense (final_node,activation='sigmoid')) # Tras el aprendizaje de los detalles, incluyo el clasificador con dense perceptron_multi.add(keras.layers.Dense (1,activation='softmax')) # Definición de optimizador sgd = optimizers.SGD(lr=0.01, clipnorm=1.) perceptron_multi.compile(optimizer = 'sgd', loss='binary_crossentropy', metrics=['accuracy']) # Muestra arquitectura de la red antes de entrenar el modelo print(perceptron_multi.summary()) ## ENTRENAMIENTO MODELO PERCEPTRON MULTICAPA # Parametros de entrenamiento batch_size = 1024 epochs = 1 # Habilito una ruta y un archivo para guardar los pesos de entrenamiento de la red filepath="./checkpoints/chest_xray_weights_perceptron.hdf5" checkpoint = ModelCheckpoint(filepath, monitor='val_acc', verbose=1, save_best_only=True, mode='max') # Entrenamiento del modelo # Inicia cronometro start_time = time.time() history_perceptr = perceptron_multi.fit(X_train, y_train, validation_data = (X_test , y_test), callbacks=[lr_reduce,checkpoint], epochs=epochs) # Detiene el cronometro end_time = time.time() # Informa cronometro dt = round(end_time - start_time, 2) print("El entrenamiento de este modelo tardó: ", dt, "segundos (", round(dt/60,1) , "minutos)") # Definimos variables para guardar los resultados del modelo de referencia entrenado print("Precision acc: ", history_perceptr.history['acc']) print("Precision en prueba val_acc: ", history_perceptr.history['val_acc']) print("Perdida loss: ", history_perceptr.history['loss']) print("Perdida en prueba val_loss: ", history_perceptr.history['val_loss']) Los códigos completos pueden descargarse el repositorio https://github.com/watershed-lab/pneumo-detector