public class Biz { public static Single<String> method() { RespTransformer<String> transformer = RespTransform.newInstance(); return ApiGenerator.createApi(Api.class) .method() .compose(transformer) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) } } public class presenter { public void method() { Biz.method() .subscribe(new Consumer<List<Resp>>() { @Override public void accept(List<Resp> resps) { ... } }, new BaseRespThrowableObserver() { @Override public void onV2Error(String code, String errorMsg) { ... } })); } } public class ApiGenerator { public static <S> createApi(Class<S> apiClass) { // serviceCreator()返回Retrofit实例; return NetManager.getInstance().serviceCreator().create(apiClass); } } 复制代码
上面就是 Retrofit 的调用流程。下面沿着这个调用流程, 分析 Retrofit 是如何完成一次网络请求的。
@SuppressWarnings("unchecked") // Single-interface proxy creation guarded by parameter safety. public <T> T create(final Class<T> service) { validateServiceInterface(service); return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service }, new InvocationHandler() { private final Platform platform = Platform.get(); private final Object[] emptyArgs = new Object[0]; @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // If the method is a method from Object then defer to normal invocation. if (method.getDeclaringClass() == Object.class) { return method.invoke(this, args); } if (platform.isDefaultMethod(method)) { return platform.invokeDefaultMethod(method, service, proxy, args); } return loadServiceMethod(method).invoke(args != null ? args : emptyArgs); } }); } 复制代码
Retrofit 使用了动态代理技术, 使得不同接口方法的调用最终都会走到这里 (InvocationHandler.invoke), 再由它发起网络请求。
调用链: Retrofit.loadServiceMethod ---> ServiceMethod.parseAnnotations ---> HttpServiceMethod.parseAnnotations
static <ResponseT, ReturnT> HttpServiceMethod<ResponseT, ReturnT> parseAnnotations( Retrofit retrofit, Method method, RequestFactory requestFactory) { boolean isKotlinSuspendFunction = requestFactory.isKotlinSuspendFunction; boolean continuationWantsResponse = false; boolean continuationBodyNullable = false; Annotation[] annotations = method.getAnnotations(); Type adapterType; adapterType = method.getGenericReturnType(); // 1.获取adapter; CallAdapter<ResponseT, ReturnT> callAdapter = createCallAdapter(retrofit, method, adapterType, annotations); Type responseType = callAdapter.responseType(); // 2.获取convert; Converter<ResponseBody, ResponseT> responseConverter = createResponseConverter(retrofit, method, responseType); okhttp3.Call.Factory callFactory = retrofit.callFactory; // 3.构建CallAdapter; return new CallAdapted<>(requestFactory, callFactory, responseConverter, callAdapter); } 复制代码
Retrofit 的好处是支持自定义 Converter 与 CallAdapter。OkHttp 本身只负责单纯的网络请求, 与 RxJava 并没有关系, 但是通过 Retrofit 可以实现 OkHttp 与 RxJava 的结合。下面贴出一段代码, 说明 Retrofit 如何将 OkHttp 与 RxJava 结合:
public void initRetrofit() { Retrofit retrofit = new Retrofit.Builder() .addConverterFactory(GsonConverterFactory.create(/*自定义Gson*/)) .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) .build(); } public final class RxJava2CallAdapterFactory extends CallAdapter.Factory { public static RxJava2CallAdapterFactory create() { return new RxJava2CallAdapterFactory(null, false); } } 复制代码
class HttpServiceMethod::createCallAdapter private static <ResponseT, ReturnT> CallAdapter<ResponseT, ReturnT> createCallAdapter( Retrofit retrofit, Method method, Type returnType, Annotation[] annotations) { return (CallAdapter<ResponseT, ReturnT>) retrofit.callAdapter(returnType, annotations); } class Retrofit::callAdapter public CallAdapter<?, ?> nextCallAdapter(CallAdapter.Factory skipPast, Type returnType, Annotation[] annotations) { Objects.requireNonNull(returnType, "returnType == null"); Objects.requireNonNull(annotations, "annotations == null"); int start = callAdapterFactories.indexOf(skipPast) + 1; for (int i = start, count = callAdapterFactories.size(); i < count; i++) { CallAdapter<?, ?> adapter = callAdapterFactories.get(i).get(returnType, annotations, this); if (adapter != null) { return adapter; } } } class RxJava2CallAdapterFactory::get @Override public CallAdapter<?, ?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) { Class<?> rawType = getRawType(returnType); if (rawType == Completable.class) { // Completable is not parameterized (which is what the rest of this method deals with) so it // can only be created with a single configuration. return new RxJava2CallAdapter(Void.class, scheduler, isAsync, false, true, false, false, false, true); } boolean isFlowable = rawType == Flowable.class; boolean isSingle = rawType == Single.class; boolean isMaybe = rawType == Maybe.class; // ... // 这里为okhttp与rxjava结合做了铺垫 return new RxJava2CallAdapter(responseType, scheduler, isAsync, isResult, isBody, isFlowable, isSingle, isMaybe, false); } 复制代码
class CallAdapted::invoke @Override final ReturnT invoke(Object[] args) { Call<ResponseT> call = new OkHttpCall<>(requestFactory, args, callFactory, responseConverter); return adapt(call, args); } @Override protected ReturnT adapt(Call<ResponseT> call, Object[] args) { // callAdapter指向的是初始化Retrofit中传入的RxJava2CallAdapter return callAdapter.adapt(call); } 复制代码
2.1模块loadServiceMethod
返回CallAdapted对象, 当有网络请求时, 触发CallAdapted.invoke方法的执行 @Override public Object adapt(Call<R> call) { Observable<Response<R>> responseObservable = isAsync ? new CallEnqueueObservable<>(call) : new CallExecuteObservable<>(call); Observable<?> observable; if (isResult) { observable = new ResultObservable<>(responseObservable); } else if (isBody) { observable = new BodyObservable<>(responseObservable); } else { observable = responseObservable; } if (scheduler != null) { observable = observable.subscribeOn(scheduler); } if (isFlowable) { return observable.toFlowable(BackpressureStrategy.LATEST); } if (isSingle) { return observable.singleOrError(); } if (isMaybe) { return observable.singleElement(); } if (isCompletable) { return observable.ignoreElements(); } return RxJavaPlugins.onAssembly(observable); } 复制代码
retrofit端作为网络请求的入口, 将请求权和结果的处理权都交给了rxjava, 实现了rxjava与okhttp的结合.
结合 RxJava 的源码可知, subscribe() 会触发 CallExecuteObservable.subscribeActual 的执行:
@Override protected void subscribeActual(Observer<? super Response<T>> observer) { // Since Call is a one-shot type, clone it for each new observer. Call<T> call = originalCall.clone(); //... Response<T> response = call.execute(); //... } 复制代码
@Override public Response<T> execute() throws IOException { okhttp3.Call call; synchronized (this) { executed = true; // call = RealCall; call = getRawCall(); } return parseResponse(call.execute()); } 复制代码
/**
 * Executes this call (abridged excerpt): enters the timeout, signals the call start,
 * hands the call to the client's dispatcher and then runs the interceptor chain.
 *
 * @return the response produced by [getResponseWithInterceptorChain]
 */
override fun execute(): Response {
    timeout.enter()
    callStart()

    // Remaining flow omitted by the article; `client` refers to the OkHttpClient.
    val dispatcher = client.dispatcher
    dispatcher.executed(this)

    // Request handling is implemented with the chain-of-responsibility pattern.
    return getResponseWithInterceptorChain()
}
/**
 * Assembles the interceptor stack and sends the original request through it
 * (abridged excerpt).
 *
 * Order of the stack: the client's application interceptors, retry/follow-up, bridge,
 * cache, connect, the client's network interceptors (skipped for web sockets) and
 * finally the call-server interceptor.
 *
 * @return the response returned by the chain for [originalRequest]
 * @throws IOException declared via the `@Throws` annotation
 */
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>()
    interceptors += client.interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
        interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)

    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis,
    )

    // The unused `calledNoMoreExchanges` flag and the redundant `response` temporary
    // from the excerpt were dropped; the chain result is returned directly.
    return chain.proceed(originalRequest)
}