apeescape2.com
  • メイン
  • Uiデザイン
  • 仕事の未来
  • トレンド
  • モバイル
データサイエンスとデータベース

Apache Sparkストリーミングチュートリアル:Twitterのトレンドハッシュタグの特定

今日、データは以前よりも急速に成長し、蓄積されています。現在、私たちの世界で生成されたデータの約90%は、過去2年間に生成されたものです。このレートの増加により、プラットフォーム ビッグデータ このような大量のデータを維持できるようにするには、根本的なソリューションを採用する必要がありました。

処理(プログラミング言語)

今日の最も重要なデータソースの1つは、ソーシャルメディアです。実際の例を示しましょう。ソーシャルメディアデータから情報をリアルタイムで管理、分析、抽出し、次のエコソリューションの1つを使用します。 ビッグデータ そこにある最も重要なもの— ApacheSparkとPython。

Apache Spark Streamingを使用して、トレンドのTwitterハッシュタグなどのソーシャルメディアから情報を抽出できます。



この記事では、Pythonを使用してTwitterオンラインフィードを読み取り、を使用してツイートを処理する簡単なアプリを作成する方法を紹介します。 ApacheSparkストリーミング ハッシュタグを識別し、最後に最も重要なトレンドハッシュタグを返し、このデータをダッシュ​​ボードにリアルタイムで表示します。

TwitterAPI用の独自の資格情報の作成

Twitterからツイートを取得するには、で登録する必要があります TwitterApps 「新規アプリケーションの作成」をクリックし、以下のフォームに記入した後、「Twitterアプリケーションの作成」をクリックします。

スクリーンショット:Twitterアプリケーションの作成方法

次に、新しく作成したアプリケーションに移動し、[識別子とキーへのアクセス]ウィンドウを開きます。次に、「アクセス識別子を生成する」をクリックします。

スクリーンショット:Twitterアプリの資格情報、パスワード、アクセスIDのインストール

新しいログインIDは次のように表示されます。

スクリーンショット:TwiiterアプリケーションのアクセスIDのインストール

これで、次のステップの準備が整いました。

HTTPTwitterクライアントを構築する

このステップでは、Pythonを使用してTwitter APIからツイートをフェッチし、それらをインスタンスに渡す単純なクライアントを構築する方法を示します。 Sparkストリーミング 。誰でも簡単にフォローできるはずです Python開発者 プロフェッショナル。

まず、twitter_app.pyというファイルを作成します。次に、以下に示すようにコードを一緒に追加します。

モンテカルロシミュレーションの確率分布

以下に示すように、使用するライブラリをインポートします。

import socket import sys import requests import requests_oauthlib import json

そして、以下に示すように、Twitterに接続するためにOAuthで使用される変数を追加します。

# Reemplaza los valores de abajo con los tuyos ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)

それでは、get_tweetsという新しい関数を作成しましょう。 Twitter API URLを呼び出し、一連のツイートに対する応答を返します。

def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response

次に、上のビューから応答を受け取り、完全なツイートのJSONオブジェクトからツイートのテキストを抽出する関数を作成します。この後、各ツイートをインスタンスに送信します Sparkストリーミング (後で説明します)TCP接続を介して。

def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print('Tweet Text: ' + tweet_text) print ('------------------------------------------') tcp_connection.send(tweet_text + ' ') except: e = sys.exc_info()[0] print('Error: %s' % e)

次に、主要部分を実行します。これにより、アプリケーションが接続をホストするようになります ソケット 、後で接続します スパーク 。ここでIPをlocalhostに設定しましょう。すべてが同じマシンとポート9009で実行されるためです。次に、上記で行ったget_tweetsメソッドを呼び出して、Twitterからツイートを取得し、接続を使用して応答を渡します。 ソケット a send_tweets_to_sparkツイートをSparkに送信します。

TCP_IP = 'localhost' TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print('Waiting for TCP connection...') conn, addr = s.accept() print('Connected... Starting getting tweets.') resp = get_tweets() send_tweets_to_spark(resp, conn)

ApacheSparkストリーミングアプリケーションのインストール

アプリケーションを構築しましょう Sparkストリーミング 、着信ツイートをリアルタイムで処理し、それらからハッシュタグを抽出して、言及されたハッシュタグの数を計算します。

イラスト:* Sparkストリーミング*は、着信ツイートのリアルタイム処理とハッシュタグ抽出を可能にします

まず、インスタンスを作成する必要があります Sparkコンテキスト sc次に、作成します ストリーミングコンテキスト sscのsc 2秒ごとに受信したすべての送信で変換を実行する2秒間隔で。ログレベルをERRORに設定していることに注意してください。書き込むログのほとんどを無効にできるようにする スパーク 。

ここでは、定期的なRDDチェックを許可できるようにチェックポイントを定義します。これは、ステートフルな消防変換を使用するため、アプリケーションで使用する必要があります(同じセクションで後述します)。

次に、サーバーに接続するメインのDStreamdataStreamを定義します ソケット 以前にポートで作成したもの9009そのポートからツイートを読み取ります。 DStreamのすべてのレコードはツイートになります。

from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # crea una configuración spark conf = SparkConf() conf.setAppName('TwitterStreamApp') # crea un contexto spark con la configuración anterior sc = SparkContext(conf=conf) sc.setLogLevel('ERROR') # crea el Contexto Streaming desde el contexto spark visto arriba con intervalo de 2 segundos ssc = StreamingContext(sc, 2) # establece un punto de control para permitir la recuperación de RDD ssc.checkpoint('checkpoint_TwitterApp') # lee data del puerto 9009 dataStream = ssc.socketTextStream('localhost',9009)

次に、変換ロジックを定義します。まず、すべてのツイートを単語に分割し、RDD単語に変換します。次に、すべての単語のハッシュタグのみをフィルタリングし、(hashtag, 1)の横にプロットします。そしてそれらをRDDハッシュタグに入れます。

次に、ハッシュタグが言及された回数を計算する必要があります。 reduceByKey関数を使用してこれを行うことができます。この関数は、各グループがハッシュタグに言及した回数を計算します。つまり、各グループのアカウントをリセットします。

この場合、すべてのグループのカウントを計算する必要があるため、updateStateByKeyという別の関数を使用します。この機能を使用すると、RDDステータスを保持しながら、新しいデータで更新することができます。この形式はStateful Transformationと呼ばれます。

updateStateByKeyを使用するには、チェックポイントと前の手順で行ったことを構成する必要があることに注意してください。

# divide cada Tweet en palabras words = dataStream.flatMap(lambda line: line.split(' ')) # filtra las palabras para obtener solo hashtags, luego mapea cada hashtag para que sea un par de (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # agrega la cuenta de cada hashtag a su última cuenta tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # procesa cada RDD generado en cada intervalo tags_totals.foreachRDD(process_rdd) # comienza la computación de streaming ssc.start() # espera que la transmisión termine ssc.awaitTermination()

updateStateByKey update関数と呼ばれるパラメーターとして関数を取ります。これはRDDの各項目で実行され、必要なロジックを実行します。

この例では、aggregate_tags_countという更新関数を作成しました。これにより、各ハッシュタグのすべてのnew_values(新しい値)が合計され、total_sumに追加されます。 (合計)、これはすべてのグループの合計であり、RDD tags_totalsにデータを保存します。

def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)

次に、RDD処理を実行しますtags_totals各グループで、を使用して一時テーブルに変換できるようにします SparkSQLコンテキスト この後、アカウントで上位10個のハッシュタグを取得してデータフレーム内に配置できるようにステートメントを作成しますhashtag_counts_df。

def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print('----------- %s -----------' % str(time)) try: # obtén el contexto spark sql singleton desde el contexto actual sql_context = get_sql_context_instance(rdd.context) # convierte el RDD a Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # crea un DF desde el Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Registra el marco de data como tabla hashtags_df.registerTempTable('hashtags') # obtén los 10 mejores hashtags de la tabla utilizando SQL e imprímelos hashtag_counts_df = sql_context.sql('select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10') hashtag_counts_df.show() # llama a este método para preparar los 10 mejores hashtags DF y envíalos send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print('Error: %s' % e)

Sparkアプリケーションの最後のステップは、データフレームを送信することですhashtag_counts_dfダッシュボードアプリに。したがって、データフレームを2つの行列に変換します。1つはハッシュタグ用で、もう1つはアカウント用です。次に、RESTAPIを介してダッシュボードアプリにプッシュします。

def send_df_to_dashboard(df): # extrae los hashtags del marco de data y conviértelos en una matriz top_tags = [str(t.hashtag) for t in df.select('hashtag').collect()] # extrae las cuentas del marco de data y conviértelos en una matriz tags_count = [p.hashtag_count for p in df.select('hashtag_count').collect()] # inicia y envía la data a través de la API REST url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)

最後に、これはの出力のサンプルです Sparkストリーミング hashtag_counts_dfの実行および印刷中。各グループ間隔で、出力が2秒ごとに正確に出力されることがわかります。

Windowsフォームアプリケーションプロジェクトのフォルダ構造

グループ間隔設定ごとに印刷されたTwitter * Sparkストリーミング*出力の例

データを表すシンプルなリアルタイムダッシュボードを作成する

次に、Sparkによってリアルタイムで更新される単純なダッシュボードアプリケーションを作成します。 Python、Flask、およびを使用してビルドします Charts.js 。

まず、以下のような構造のPythonプロジェクトを作成し、ファイルをダウンロードして追加します。 Chart.js 静的ディレクトリ内。

イラスト:Twitterハッシュタグ分析で使用するPythonプロジェクトを作成する

次に、ファイルapp.pyに、update_dataという関数を作成します。この関数は、URL http://localhost:5001/updateDataを介してSparkによって呼び出されます。グローバルラベルと値の配列を更新できるようにします。

同様に、関数refresh_graph_dataこれは、AJAXリクエストによって呼び出され、新しく更新されたラベルと値の配列をJSONとして返すように作成されます。関数get_chart_pageページを離れますchart.html呼び出されたとき。

from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route('/') def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print('labels now: ' + str(labels)) print('data now: ' + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return 'error',400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print('labels received: ' + str(labels)) print('data received: ' + str(values)) return 'success',201 if __name__ == '__main__': app.run(host='localhost', port=5001)

次に、ファイルに簡単なグラフを作成しますchart.htmlハッシュタグデータを表示し、リアルタイムで更新できるようにします。以下に定義するように、JavaScriptライブラリをインポートする必要がありますChart.jsおよびjquery.min.js。

バイヤーの定義の交渉力

タグの本体で、キャンバスを作成し、次のステップでJavaScriptを使用するときにグラフを表示するときに参照できるようにIDを指定する必要があります。

Top Trending Twitter Hashtags

Top Trending Twitter Hashtags

次に、以下のJavaScriptコードを使用してグラフを作成します。まず、canvas要素を取得してから、新しいグラフオブジェクトを作成し、それにcanvas要素を渡して、以下に示すようにデータオブジェクトを定義します。

データラベルは、get_chart_pageを呼び出すときに、ページを離れるときに返されるラベルおよび値変数と結合されることに注意してください。ファイル内app.py。

最後の部分は、毎秒Ajaxリクエストを行い、URL /refreshDataを呼び出すように構成された関数です。これはrefresh_graph_dataを実行します。でapp.pyそして、新しく更新されたデータを返し、新しいデータが残すグラフを更新します。

var ctx = document.getElementById('chart'); var myChart = new Chart(ctx, { type: 'horizontalBar', data: { labels: [{% for item in labels %} '{{item}}', {% endfor %}], datasets: [{ label: '# of Mentions', data: [{% for item in values %} {{item}}, {% endfor %}], backgroundColor: [ 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)' ], borderColor: [ 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)' ], borderWidth: 1 }] }, options: { scales: { yAxes: [{ ticks: { beginAtZero:true } }] } } }); var src_Labels = []; var src_Data = []; setInterval(function(){ $.getJSON('/refreshData', { }, function(data) { src_Labels = data.sLabel; src_Data = data.sData; }); myChart.data.labels = src_Labels; myChart.data.datasets[0].data = src_Data; myChart.update(); },1000);

アプリケーションを一緒に実行する

以下の順序で3つのアプリケーションを実行します。1。Twitterアプリクライアント。 2.SparkApp。3。ダッシュボードWebアプリ。

次に、URLを探すことにより、リアルタイムでコントロールパネルにアクセスできます。

これで、グラフが以下で更新されていることがわかります。

アニメーション:Twitterでトレンドのハッシュタグをリアルタイムでグラフ化

Apacheストリーミングの実際の使用

Spark Streamingを使用してリアルタイムデータで簡単なデータ分析を行い、RESTfulWebサービスを使用して簡単なコントロールパネルと直接統合する方法を学びました。この例から、Sparkが大量のデータストリームをキャプチャして変換し、短時間で意思決定を行うために簡単に使用できる貴重な情報を抽出するため、Sparkがいかに強力であるかがわかります。実装でき、ニュースやマーケティングなどのさまざまな業界に役立つ、多くの便利なユースケースがあります。

イラスト:ハッシュタグは、情報や価値観を抽出するために使用でき、複数の業界に適用できます。

ニュース業界の例

最も頻繁に言及されるハッシュタグを追跡して、ソーシャルメディアで人々が話しているトピックを見つけることができます。また、特定のハッシュタグとそのツイートを追跡して、世界中の特定のトピックやイベントについて人々が何を言っているかを調べることもできます。

マーケティング例

ツイートの送信を収集し、意見分析を行うことで、それらを分類し、人々の興味を判断して、彼らの興味に関連するオファーを提供することができます。

また、特に分析に適用できる多くのユースケースがあります。 ビッグデータ そしてそれらは多くの産業に役立つことができます。一般的なApacheSparkのユースケースの詳細については、次のいずれかを確認することをお勧めします。 以前の投稿 。

私はあなたがについてもっと読むことをお勧めします Sparkストリーミング ここに その機能についてさらに学び、それを使用するときにリアルタイムでより多くの情報を得るために、より高度なデータ変換を行うため。

人間の行動に対する色の影響

Salesforceフロントエンドエンジニア

その他

Salesforceフロントエンドエンジニア
強化されたGitフローの説明

強化されたGitフローの説明

プロジェクト管理

人気の投稿
BEM方法論の紹介
BEM方法論の紹介
Ember.js開発者が犯す8つの最も一般的な間違い
Ember.js開発者が犯す8つの最も一般的な間違い
ソフトウェア設計ドキュメントを書くことが重要な理由
ソフトウェア設計ドキュメントを書くことが重要な理由
初期市場参入の課題
初期市場参入の課題
AngularMaterialを使用して最新のWebアプリを構築する
AngularMaterialを使用して最新のWebアプリを構築する
 
モバイルエクスペリエンスのためのeコマースUX
モバイルエクスペリエンスのためのeコマースUX
コンサルタントツールボックス:あらゆるものを解決するためのフレームワーク
コンサルタントツールボックス:あらゆるものを解決するためのフレームワーク
モバイルWebアプリケーションの開発:いつ、なぜ、そしてどのように
モバイルWebアプリケーションの開発:いつ、なぜ、そしてどのように
トップピッチデッキの間違い
トップピッチデッキの間違い
GraphQLとREST-GraphQLチュートリアル
GraphQLとREST-GraphQLチュートリアル
人気の投稿
  • CFOは毎日何をしていますか
  • CFOの義務は何ですか
  • C ++を使用するプログラム
  • python-telegram-bot
  • androidは開発者にクラッシュレポートを送信します
  • 閉鎖の原則は次のように述べています。
カテゴリー
Webフロントエンド 計画と予測 エンジニアリング管理 バックエンド 設計プロセス トレンド 投資家と資金調達 プロセスとツール 技術 仕事の未来

© 2021 | 全著作権所有

apeescape2.com